-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutility.py
126 lines (79 loc) · 3.54 KB
/
utility.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import datetime
class Utility:
    '''Debug-printing and request-log helpers for a sliding-window rate limiter.

    The class defines no ``__init__``. Its methods read the following
    attributes, which must be set on the instance elsewhere:

    * ``is_log_enabled`` -- truthy when request logging is switched on.
    * ``is_ratelimit_reset_header_allowed`` -- selects which slot of the
      result sequence holds the log entries (see ``print_logs``).
    * ``slidingTimeWindow`` -- Unix timestamp (seconds) marking the start of
      the current window; entries older than this count as expired.
    * ``fixed_time_window_format_for_insertion`` -- ``strftime`` format used
      to render ``slidingTimeWindow``.
    * ``request_recieved_at`` -- time the request was received; a
      ``datetime`` is required by ``getRequestInsertionTimeStr``.
    * ``total_request_served`` / ``total_expired_time_windows`` -- counters
      that are only printed here.
    * ``pipeline`` -- object exposing ``execute_command(*args)``
      (presumably a Redis pipeline -- TODO confirm against the caller).
    * ``clientid`` -- string appended to ``'log:'`` to build the log key.
    * ``expiration_time_of_client_keys`` -- value passed to ``EXPIRE``.
    '''

    @staticmethod
    def _clock_time(unix_seconds):
        '''Return ``unix_seconds`` rendered as a local ``HH:MM:SS`` string.'''
        return datetime.datetime.fromtimestamp(unix_seconds).strftime("%H:%M:%S")

    def print_logs(self, result):
        '''Print each log in the user log.

        Does nothing when ``self.is_log_enabled`` is falsy. Otherwise the log
        entries are taken from ``result[6]`` when
        ``self.is_ratelimit_reset_header_allowed`` is truthy, else from
        ``result[4]``, and handed to ``count_logs``.

        result -- indexable sequence holding the log entries at that slot.
        '''
        if not self.is_log_enabled:
            return
        # NOTE(review): 6 and 4 are presumably the position of the ZRANGE
        # reply inside the executed pipeline's results -- confirm with caller.
        log_index = 6 if self.is_ratelimit_reset_header_allowed else 4
        self.count_logs(result[log_index])

    def count_logs(self, logs):
        '''Count expired and valid logs in the rate limit time window.

        Prints the window start and the request time, one ``HH:MM:SS`` stamp
        per log entry, and then the included/expired totals together with the
        window counters stored on the instance.

        logs -- iterable of byte strings, each the decimal text of a Unix
                timestamp in seconds.
        '''
        window_start = self.slidingTimeWindow
        window_start_text = datetime.datetime.fromtimestamp(window_start).strftime(
            self.fixed_time_window_format_for_insertion)
        print('Logs Sliding Time ', window_start_text, ' - ', self.request_recieved_at)

        # Decode once; both the listing and the counting use the integers.
        timestamps = [int(log.decode()) for log in logs]
        for stamp in timestamps:
            print(self._clock_time(stamp), end=" | ")
        if timestamps:
            # End the timestamp row so the summary starts on its own line.
            print()

        # An entry strictly older than the window start is expired.
        expired_count = sum(1 for stamp in timestamps if stamp < window_start)
        included_count = len(timestamps) - expired_count

        print('Log inlcuded : ', included_count)
        print('Log not included : ', expired_count)
        print('Included windows : ', self.total_request_served)
        print('Not Inlcuded windows : ', self.total_expired_time_windows)

    def print_all_timewindows(self, time_window_list):
        '''Print the valid and the expired windows of the rate limit window.

        Windows whose timestamp is at or after ``self.slidingTimeWindow`` are
        listed as included, older ones as not included; afterwards every
        window is printed with its raw timestamp and counter.

        time_window_list -- mapping of byte-string Unix timestamps to
                            byte-string integer counters.
        '''
        window_start = self.slidingTimeWindow
        stamps = [int(window.decode()) for window in time_window_list]

        print('Windows Included')
        for stamp in stamps:
            if stamp >= window_start:
                print(self._clock_time(stamp), end=" | ")

        print('\nWindows Not Included')
        for stamp in stamps:
            if stamp < window_start:
                print(self._clock_time(stamp), end=" | ")

        print('\n')
        print('Sliding Window Time : ', window_start)
        print('All windows \n')
        for window, counts in time_window_list.items():
            print(int(window.decode()), ':', int(counts.decode()), end=" | ")
        print('\n')

    def getRequestInsertionTimeStr(self):
        '''Return the request time in str format.

        Returns ``None`` when ``self.request_recieved_at`` is falsy.
        '''
        if not self.request_recieved_at:
            return None
        # NOTE(review): the trailing %M repeats the minutes field; it may have
        # been meant as %f (microseconds). Left unchanged because callers may
        # depend on the exact text -- confirm before altering.
        return self.request_recieved_at.strftime("%Y:%m:%d:%H:%M:%S:%M")

    def getCurrentUnixtimestamp(self):
        '''Return the current time as a Unix timestamp in whole seconds.'''
        return int(datetime.datetime.now().timestamp())

    def insertLogrequest(self):
        '''Insert each request into the log.

        When ``self.is_log_enabled`` is truthy, passes three commands to
        ``self.pipeline.execute_command`` for the key ``'log:' + clientid``:
        ``ZADD`` (``nx``, score 1, member = current Unix timestamp),
        ``ZRANGE 0 -1`` and ``EXPIRE``. Does nothing otherwise.
        '''
        if not self.is_log_enabled:
            return
        log_key = 'log:' + self.clientid
        pipeline = self.pipeline
        # 'nx' only adds new members, so a second request within the same
        # second leaves the existing entry untouched.
        pipeline.execute_command(
            'ZADD', log_key, 'nx', 1, self.getCurrentUnixtimestamp())
        pipeline.execute_command('ZRANGE', log_key, 0, -1)
        pipeline.execute_command(
            'EXPIRE', log_key, self.expiration_time_of_client_keys)