forked from hhuuggoo/ZmqWebBridge
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgeventbridgeutils.py
113 lines (91 loc) · 3.39 KB
/
geventbridgeutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import gevent
import gevent.queue
from gevent_zeromq import zmq
import logging
log = logging.getLogger(__name__)
import simplejson
from gevent import spawn
import collections
import logging
import time
log = logging.getLogger('__name__')
jsonapi = simplejson
class GeventZMQRPC(object):
#none, means we can rpc any function
#explicit iterable means, only those
#functions in the iterable can be executed
#(use a set)
authorized_functions = None
def __init__(self, reqrep_socket):
self.reqrep_socket = reqrep_socket
def run_rpc(self):
while True:
try:
#the follow code must be wrapped in an exception handler
#we don't know what we're getting
msg = self.reqrep_socket.recv()
msgobj = jsonapi.loads(msg)
response_obj = self.get_response(msgobj)
response = jsonapi.dumps(response_obj)
except Exception as e:
log.exception(e)
response_obj = self.error_obj('unknown error')
response = jsonapi.dumps(response_obj)
self.reqrep_socket.send(jsonapi.dumps(response_obj))
def error_obj(self, error_msg):
return {'status' : 'error',
'error_msg' : error_msg}
def returnval_obj(self, returnval):
return {'returnval' : returnval}
def get_response(self, msgobj):
funcname = msgobj['funcname']
args = msgobj.get('args', [])
kwargs = msgobj.get('kwargs', {})
auth = False
if self.authorized_functions is not None \
and funcname not in self.authorized_functions:
return self.error_obj('unauthorized access')
if hasattr(self, 'can_' + funcname):
auth = self.can_funcname(*args, **kwargs)
if not auth:
return self.error_obj('unauthorized access')
func = getattr(self, funcname)
retval = func(*args, **kwargs)
return self.returnval_obj(retval)
class PubSubRPCClient(object):
def __init__(self, socket):
self.socket = socket
self.queue = gevent.queue.Queue()
def rpc(self, funcname, *args, **kwargs):
msg = {'funcname' : funcname,
'args' : args}
self.queue.put(jsonapi.dumps(msg))
def run_pub(self):
while True:
msg = self.queue.get()
self.socket.send(msg)
class GeventRPCClient(object):
def __init__(self, socket, ident, timeout=1.0):
self.socket = socket
self.ident = ident
self.queue = gevent.queue.Queue()
self.timeout = timeout
def rpc(self, funcname, *args, **kwargs):
msg = {'funcname' : funcname,
'args' : args}
self.socket.send_multipart([jsonapi.dumps(msg), self.ident])
data = []
def recv():
val = self.socket.recv()
data.append(val)
recv_t = gevent.spawn(recv)
recv_t.join(timeout=self.timeout)
recv_t.kill()
if len(data) == 1:
return jsonapi.loads(data[0])['returnval']
else:
return None
def run_send(self):
while True:
msg = self.queue.get()
self.socket.send(msg)