forked from waggle-sensor/beehive-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWaggleRouter.py
123 lines (98 loc) · 4.66 KB
/
WaggleRouter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# WaggleRouter.py
import sys
sys.path.append("..")
sys.path.append("/usr/lib/waggle/")
from multiprocessing import Process, Manager
from config import *
import pika
from waggle_protocol.protocol.PacketHandler import *
from waggle_protocol.utilities.packetassembler import PacketAssembler
import logging
#logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.CRITICAL)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class WaggleRouter(Process):
"""
The WaggleRouter class receives all messages from the incoming queue in the RabbitMQ server.
It then reads the packet header to learn the message Major type, and forwards it to the appropriate
queue for processing.
"""
def __init__(self,nodes_table):
logger.info("Initializing Routing Process")
logger.debug("debug mode")
super(WaggleRouter,self).__init__()
self.nodes_table = nodes_table
self.routeQueues = {
'r' : 'registration',
't' : 'util',
'p' : 'util',
's' : 'data'
}
#Connect to rabbitMQ
try:
self.rabbitConn = pika.BlockingConnection(pika_params)
except Exception as e:
logger.error("Could not connect to RabbitMQ server \"%s\": %s" % (pika_params.host, str(e)))
sys.exit(1)
logger.info("Connected to RabbitMQ server \"%s\"" % (pika_params.host))
self.channel = self.rabbitConn.channel()
self.channel.basic_qos(prefetch_count=1)
# self.assembler = PacketAssembler()
#Load all of the existing registered node queues
#if os.path.isfile('registrations/nodes.txt'):
# with open('registrations/nodes.txt','r') as nodes:
# for line in nodes:
# if line and line != '\n':
# info = line.strip().split(":")
# self.channel.queue_declare(info[1])
#declare the default queues
#TODO: Check to see if this section can be culled.
#queue_list = ["incoming","registration","util"]
#for queueName in queue_list:
# self.channel.queue_declare(queueName)
#Start consuming things from the incoming queue
self.channel.basic_consume(self.gotPacket,queue='incoming')
def gotPacket(self,ch,method,props,body):
try:
header = get_header(body)
except Exception as e:
logger.error(str(e))
ch.basic_ack(delivery_tag = method.delivery_tag)
return
s_uniqid_str = nodeid_int2hexstr(header['s_uniqid'])
r_uniqid_str = nodeid_int2hexstr(header['r_uniqid'])
major = chr(header["msg_mj_type"])
minor = chr(header["msg_mi_type"])
logger.debug("message (%s%s) from %s for %s" % (major, minor, s_uniqid_str, r_uniqid_str) )
if (header['r_uniqid'] == 0): # If the message is intended for the cloud...
#msg_type = chr(header["msg_mj_type"]),chr(header["msg_mi_type"])
#Figure out which queue this message belongs in.
msg_dest = self.routeQueues.get(major,'invalid')
if(msg_dest == 'invalid'):
logger.debug("msg_dest == 'invalid'")
ch.basic_ack(delivery_tag = method.delivery_tag)
return
logger.debug("forwarding message to %s" % (msg_dest))
self.channel.basic_publish(exchange='internal',routing_key=msg_dest, body=body)
ch.basic_ack(delivery_tag=method.delivery_tag)
else: # This is a message for someone else. Send it along.
try:
#TODO: This is where we have to check if the senderis allowed
#to send a message to the recipient. We need a permissions system
#here, with the permissions stored in the Cassandra data base.
# check if the sender is not impersonating someone else - is that
#possible? If so, where would that be done? - Check and see if
#RabbitMq permission system will help (Ben's idea)
#recipient = self.nodes_table[header['r_uniqid']]
recipient_node = self.nodes_table[r_uniqid_str]
recipient = recipient_node['queue']
self.channel.basic_publish(exchange='internal', routing_key = recipient, body=body)
except Exception as e:
print str(e)
finally:
ch.basic_ack(delivery_tag =method.delivery_tag)
def run(self):
self.channel.start_consuming()
def join(self):
super(WaggleRouter,self).terminate()
self.rabbitConn.close()