forked from mailru/tntlua
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathirina.lua
334 lines (281 loc) · 9.2 KB
/
irina.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
--
-- irina.lua
--
--
-- Space 0: Remote IMAP Collector Accounts (Instant and Usual)
-- Tuple: { email (STR), user_id (NUM), is_instant (NUM), is_expirable (NUM), inst_from (NUM), shard_id (NUM) }
-- Index 0: HASH { email }
-- Index 1: TREE { is_instant, shard_id }
--
-- Space 1: Remote IMAP Collector Listeners
-- Tuple: { shard_id (NUM), addr (STR) }
-- Index 0: TREE { shard_id }
--
local function parse_addr(addr)
local ind = addr:find(":")
if ind == nil then return nil end
return addr:sub(0, ind - 1), tonumber(addr:sub(ind + 1))
end
local function send_collector_cmd(addr, cmd)
local s = box.socket.tcp()
if s == nil then
print("can not create collector socket")
return false
end
local host, port = parse_addr(addr)
if not s:connect(host, port, 0.1) then
local _, errstr = s:error()
print("can not connect to collector[" .. host .. ":" .. port .. "]: " .. errstr)
s:close()
return false
end
local bytes_sent, status, errno, errstr = s:send(cmd, 0.1)
if bytes_sent ~= #cmd then
local _, errstr = s:error()
print("can not send data to collector[" .. host .. ":" .. port .. "]: " .. errstr)
s:close()
return false
end
s:close()
return true
end
local function get_collector_address(shardid)
local v = box.select_limit(1, 0, 0, 1, shardid)
if v == nil then return nil end
return v[1]
end
--
-- Send signals to Instant Remote IMAP Collector daemon
--
local function send_change_status(email, userid, shardid, enabled, expirable)
local addr = get_collector_address(shardid)
if addr == nil then return end
local data = email .. " " .. userid .. " " .. enabled .. " " .. expirable
send_collector_cmd(addr, data)
end
local function send_add_shard(addr, shardid)
local data = "add_shard " .. shardid
send_collector_cmd(addr, data)
end
local function send_del_shard(addr, shardid)
local data = "del_shard " .. shardid
send_collector_cmd(addr, data)
end
local function get_table_size(t)
local n = 0
for _, _ in pairs(t) do n = n + 1 end
return n
end
function irina_add_collector_for(addr, shardid, notify_added_shard, notify_deleted_shard)
shardid = tonumber(shardid)
notify_added_shard = tonumber(notify_added_shard)
notify_deleted_shard = tonumber(notify_deleted_shard)
if shardid < 0 or shardid > 1023 then error("shard id must be in [0,1023] range") end
if notify_added_shard == nil then notify_added_shard = 1 end
if notify_deleted_shard == nil then notify_deleted_shard = 1 end
local tuple = box.select_limit(1, 0, 0, 1, shardid)
if tuple == nil then
box.insert(1, shardid, addr)
if notify_added_shard ~= 0 then send_add_shard(addr, shardid) end
else
local prev_addr = tuple[1]
print("rebind shard #" .. shardid .. " from " .. prev_addr .. " to " .. addr)
box.replace(1, shardid, addr)
if notify_deleted_shard ~= 0 then
send_del_shard(prev_addr, shardid)
end
if notify_added_shard ~= 0 then
box.fiber.sleep(0.1)
send_add_shard(addr, shardid)
end
end
end
function irina_add_collector(addr, notify_added_shard)
notify_added_shard = tonumber(notify_added_shard)
local addrs = {}
for tuple in box.space[1].index[0]:iterator(box.index.ALL) do
local curr_addr = tuple[1]
if curr_addr == addr then
print("already has collector with addr " .. addr)
return
end
if addrs[curr_addr] == nil then addrs[curr_addr] = 1
else addrs[curr_addr] = addrs[curr_addr] + 1 end
end
local n_collectors = get_table_size(addrs)
if n_collectors == 0 then
for i = 0, 1023 do
print("bind shard #" .. i .. " to " .. addr)
box.insert(1, i, addr)
if notify_added_shard ~= 0 then send_add_shard(addr, i) end
end
return
end
local new_count = 1024 / (n_collectors + 1)
for tuple in box.space[1].index[0]:iterator(box.index.ALL) do
local curr_shardid = box.unpack('i', tuple[0])
local curr_addr = tuple[1]
if addrs[curr_addr] > new_count then
addrs[curr_addr] = addrs[curr_addr] - 1
print("rebind shard #" .. curr_shardid .. " from " .. curr_addr .. " to " .. addr)
box.replace(1, curr_shardid, addr)
send_del_shard(curr_addr, curr_shardid)
if notify_added_shard ~= 0 then
box.fiber.sleep(0.1)
send_add_shard(addr, curr_shardid)
end
end
end
end
function irina_del_collector(addr, notify_deleted_shard)
notify_deleted_shard = tonumber(notify_deleted_shard)
local addrs = {}
for tuple in box.space[1].index[0]:iterator(box.index.ALL) do
local curr_addr = tuple[1]
addrs[curr_addr] = 1
end
if addrs[addr] == nil then
print("collector with addr " .. addr .. " does not exist")
return
end
local n_collectors = get_table_size(addrs)
if n_collectors == 1 then
for i = 0, 1023 do
print("unbind shard #" .. i .. " from " .. addr)
box.delete(1, i)
if notify_deleted_shard ~= 0 then send_del_shard(addr, i) end
end
return
end
addrs[addr] = nil
local addr_shards = irina_get_shards_impl(addr)
local n_addr_shards = get_table_size(addr_shards)
while n_addr_shards > 0 do
for curr_addr, _ in pairs(addrs) do
if n_addr_shards > 0 then
local shardid = nil
for k, v in pairs(addr_shards) do
shardid = v
table.remove(addr_shards, k)
n_addr_shards = n_addr_shards - 1
do break end
end
print("rebind shard #" .. shardid .. " from " .. addr .. " to " .. curr_addr)
box.replace(1, shardid, curr_addr)
if notify_deleted_shard ~= 0 then
send_del_shard(addr, shardid)
box.fiber.sleep(0.1)
end
send_add_shard(curr_addr, shardid)
end
end
end
end
function irina_get_shards_impl(addr)
local result = {}
for tuple in box.space[1].index[0]:iterator(box.index.ALL) do
local curr_shardid = box.unpack('i', tuple[0])
if tuple[1] == addr then table.insert(result, curr_shardid) end
end
return result
end
function irina_get_shards(addr)
local result = irina_get_shards_impl(addr)
return unpack(result)
end
local function update_record(email, set_instant, set_expirable)
box.update(0, email, "=p=p=p", 2, set_instant, 3, set_expirable, 4, box.time())
end
function irina_add_user(email, userid, is_instant, shardid)
userid = box.unpack('i', userid)
is_instant = box.unpack('i', is_instant)
shardid = box.unpack('i', shardid)
local need_send = false
local tuple = box.select_limit(0, 0, 0, 1, email)
if tuple == nil then
box.insert(0, email, userid, is_instant, 0, box.time(), shardid)
need_send = (is_instant == 1)
elseif is_instant == 1 then
local is_old_instant, is_old_expirable = box.unpack('i', tuple[2]), box.unpack('i', tuple[3])
if (is_old_instant == 0 or is_old_expirable == 1) then
need_send = true
shardid = box.unpack('i', tuple[5])
update_record(email, is_instant, 0)
end
elseif box.unpack('i', tuple[2]) == 1 then
need_send = true
shardid = box.unpack('i', tuple[5])
update_record(email, 0, 0)
end
if need_send then send_change_status(email, userid, shardid, is_instant, 0) end
end
function irina_del_user(email)
local tuple = box.delete(0, email)
if tuple == nil then return end
local userid = box.unpack('i', tuple[1])
local is_old_instant = box.unpack('i', tuple[2])
local shardid = box.unpack('i', tuple[5])
if is_old_instant == 1 then send_change_status(email, userid, shardid, 0, 0) end
end
local function set_flags_impl(tuple, cond, set_instant, set_expirable)
local email = tuple[0]
local userid = box.unpack('i', tuple[1])
local is_instant = box.unpack('i', tuple[2])
local is_expirable = box.unpack('i', tuple[3])
local shardid = box.unpack('i', tuple[5])
if not cond(is_instant, is_expirable) then return end
update_record(email, set_instant, set_expirable)
if is_instant ~= set_instant or is_expirable ~= set_expirable then
send_change_status(email, userid, shardid, set_instant, set_expirable)
end
end
local function set_flags(email, cond, set_instant, set_expirable)
local tuple = box.select_limit(0, 0, 0, 1, email)
if tuple == nil then return end
set_flags_impl(tuple, cond, set_instant, set_expirable)
end
function irina_set_instant(email)
set_flags(email,
function(i, e) return i == 0 or e == 1 end,
1, 0)
end
function irina_del_instant(email)
set_flags(email,
function(i, e) return i == 1 and e == 0 end,
0, 0)
end
function irina_set_online(email)
set_flags(email,
function(i, e) return i == 0 or e == 1 end,
1, 1)
end
function irina_get_instant_users_ex(shardid)
shardid = box.unpack('i', shardid)
local result = {}
for tuple in box.space[0].index[1]:iterator(box.index.EQ, 1, shardid) do
table.insert(result, { tuple[0], box.unpack('i', tuple[1]), box.unpack('i', tuple[3]) })
end
return unpack(result)
end
function irina_get_usual_users(shardid)
shardid = box.unpack('i', shardid)
local result = {}
for tuple in box.space[0].index[1]:iterator(box.index.EQ, 0, shardid) do
table.insert(result, { tuple[0], box.unpack('i', tuple[1]) })
end
return unpack(result)
end
local function is_expired(args, tuple)
if tuple == nil or #tuple <= args.fieldno then return nil end
local is_expirable = box.unpack('i', tuple[3])
if is_expirable == 0 then return false end
local field = box.unpack('i', tuple[args.fieldno])
return box.time() >= field + args.expiration_time
end
local function clean_expired(spaceno, args, tuple)
set_flags_impl(tuple,
function (i, e) return i == 1 and e == 1 end,
0, 0)
end
dofile('expirationd.lua')
expirationd.run_task('expire_instant', 0, is_expired, clean_expired, {fieldno = 4, expiration_time = 5*60})