-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
197 lines (167 loc) · 5.4 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#coding:utf-8
#File: main.py
#Auth: lixp(@500wan.com)
#Date: 2014-10-17 17:19:49
#Desc:
import tornado.tcpserver
import tornado.ioloop
import tornado.gen
import tornado.websocket
import tornadoredis
import time
import json
import sys,os
#reids-svr
HOST = '192.168.41.141'
PORT = 6379
DB = 0
TIMEOUT = 5
CALLBACK_PERIOD = 5
#pushkey到conn的正排拉链
g_p2c = {}
#conn到pushkey的倒排拉链
g_c2p = {}
#redis客户端连接
g_redis_sub = None
g_redis_blpop = None
"""定时清空超时连接
"""
def update_connections():
print 'remove timeout connection!',len(g_p2c),int(time.time())
now = time.time()
keys = g_p2c.keys()
for p in keys:
c = g_p2c[p]
if c['t'] < now-CALLBACK_PERIOD:
#超时关闭连接
del g_c2p[c['c']]
g_p2c[p]['c'].close()
del g_p2c[p]
"""订阅消息并监听
"""
@tornado.gen.engine
def sub_listen():
#异步提交cmd,必须yield
yield tornado.gen.Task(g_redis_sub.subscribe, 'android_msg_pubsub')
g_redis_sub.listen(sub_callback)
"""消息订阅回调
"""
def sub_callback(msg):
if msg.kind != 'message':
return
p = msg.body
if g_p2c.has_key(p):
#当客户端消息可用时主动推送
key = 'android_push_' + p.lower()
g_redis_blpop.blpop((key,), TIMEOUT, blpop_callback)
else:
#连接不可用时,被动拉取
pass
"""消息发送回调
"""
def blpop_callback(msg):
msg = msg.values()[0].encode('utf8')
if msg:
try:
dct = json.loads(msg)
token = dct.get('token', None)
if token and g_p2c.has_key(token):
#兼容被动拉取消息格式
resp = {}
data = {}
expire = dct.get('etc',{}).get('expire',0)
if expire >= time.time():
data['msg'] = dct.get('data',{})
else:
data['msg'] = {}
data['interval'] = 350
resp['data'] = data
resp['code'] = '1'
#发送msg
try:
tornado.ioloop.IOLoop.instance().add_callback(lambda: \
g_p2c[token]['c'].write_message(json.dumps(resp,ensure_ascii=False)))
except Exception as e:
print 'Connection already closed!:%s' % e
else:
#token不合法或客户端未连接,直接丢弃msg
print 'unreachable token:[ TOKEN:%s ]' % token
except Exception as e:
print 'push msg err![ MSG:%s ERR:%s]' % (msg,e)
"""客户端连接
"""
class Connection(object):
def __init__(self, stream, address):
self._stream = stream
self._address = address
self._stream.set_close_callback(self.on_close)
self.read_handler()
print "A new client has connected!", address
def read_handler(self):
if self._stream.closed():
return
self._stream.read_until('\n', self.biz_handler)
def biz_handler(self, data):
req = data[:-1]
if len(req) > 0:
#首次通信:pushkey
g_p2c.update({req:{'c':self, 't':time.time()}})
g_c2p.update({self:req})
resp = 'ACK\n'
else:
#心跳包:''
g_p2c[g_c2p[self]]['t'] = time.time()
resp = 'PING\n'
self.write_handler(resp)
self.read_handler()
def write_handler(self, data):
if self._stream.closed():
return
self._stream.write(data)
def on_close(self):
print "A connection has broken!", self._address
class PushServer(tornado.tcpserver.TCPServer):
def handle_stream(self, stream, address):
Connection(stream, address)
class MainHandler(tornado.websocket.WebSocketHandler):
def open(self):
print "A new client has connected!"
def on_close(self):
print "A connection has broken!"
def on_message(self, message):
req = message
if len(req) > 0:
#首次通信:pushkey
print 'First tcp pack: [ PUSHKEY:%s ]' % req
g_p2c.update({req:{'c':self, 't':time.time()}})
g_c2p.update({self:req})
try:
self.write_message('ACK')
except Exception as e:
print 'Connection already closed!:%s' % e
else:
#心跳包:'', 不响应心跳包节约流量
print 'Heart beat to keepalive: [ PING:%s ]' % req
g_p2c[g_c2p[self]]['t'] = time.time()
if __name__ == '__main__':
"""
#启动tcp-server
server = PushServer()
server.bind(8774)
server.start() # 0:Forks multiple sub-processes
"""
application = tornado.web.Application([(r'/websocket',MainHandler),])
application.listen(8774)
#redis-subscribe
g_redis_sub = tornadoredis.Client(host=HOST, port=PORT, selected_db=DB,
io_loop=tornado.ioloop.IOLoop.instance())
g_redis_sub.connect()
sub_listen()
#redis-blpop
g_redis_blpop = tornadoredis.Client(host=HOST, port=PORT, selected_db=DB,
io_loop=tornado.ioloop.IOLoop.instance())
g_redis_blpop.connect()
#定时清理超时connection
tornado.ioloop.PeriodicCallback(update_connections, CALLBACK_PERIOD*1000).start()
#启动ioloop
tornado.ioloop.IOLoop.instance().start()