-
Notifications
You must be signed in to change notification settings - Fork 1
/
master.py
145 lines (106 loc) · 4.29 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
from typing import Optional
import logging
import time
from fastapi import FastAPI, Response
from http import HTTPStatus
from pydantic import BaseModel
import time
import grequests
from threading import Thread
from requests import Session
import gevent
from gevent import monkey as curious_george
from gevent.pool import Pool
logger = logging.getLogger("master")
class MessageModel(BaseModel):
message: str
is_blocked: bool = False
app = FastAPI(debug=True)
SECONDARY_RESPONSE_TIMEOUT=30
message_number = -1
INMEMORY_MESSAGE_LIST = ["test"]
SECONDARIES = [f"http://{sec_name}:8000/__message" for sec_name in os.environ['SECONDARIES_NAMES'].split(sep=',')]
HEALTHCKECK_LOOP_TIMEOUT=10
UNHEALTHY_TIMEOUT=5
HEALTH_STATUSES = {sec_name:'Suspected' for sec_name in os.environ['SECONDARIES_NAMES'].split(sep=',')}
def imap(requests, stream=False, size=2, exception_handler=None):
pool = Pool(size)
def send(r):
return r.send(stream=stream)
ex_reqs = []
for request in pool.imap_unordered(send, requests):
if not request or not request.response or request.response.status_code == 500:
ex_reqs.append(request)
continue
if request.response is not None:
yield request.response
for req in ex_reqs:
yield exception_handler(req, stream)
pool.join()
def exception_handler(r, stream=False):
request = r.send(stream=stream)
retr_timeout = 3
while not request or not request.response or request.response.status_code == 500:
time.sleep(retr_timeout)
retr_timeout = retr_timeout*2
print(f'Retry func with timeout: {retr_timeout}')
request = r.send(stream=stream)
return request.response
def healthcheck_loop(sec_name):
secondary_ping_uri = f"http://{sec_name}:8000/ping"
print("sec_name", sec_name)
while True:
request = grequests.get(secondary_ping_uri)
response = grequests.map([request])[0]
if not response or response.status_code != 204:
if HEALTH_STATUSES[sec_name] == "Suspected":
HEALTH_STATUSES[sec_name] = "Unhealthy"
else:
HEALTH_STATUSES[sec_name] = "Suspected"
else:
HEALTH_STATUSES[sec_name] = "Healthy"
time.sleep(HEALTHCKECK_LOOP_TIMEOUT)
for sec_name in os.environ['SECONDARIES_NAMES'].split(sep=','):
Thread(target=healthcheck_loop, args=[sec_name]).start()
def is_quorum():
n_avaliable_nodes = len([s for s in HEALTH_STATUSES.values() if s != "Unhealthy"])
return n_avaliable_nodes > 0
def replicate_to_secondaries(message, write_concern, id_):
rs = [grequests.post(secondary, json={'message': message.message, 'id': id_, 'is_blocked': message.is_blocked}, timeout=SECONDARY_RESPONSE_TIMEOUT) for secondary in SECONDARIES]
results = imap(rs, exception_handler=exception_handler)
return results
def is_success_replication(results, write_concern):
for i in range(0, write_concern - 1):
if not next(results):
return False
return True
def threading_start(results, write_concern):
threadOfUpdates = Thread(target=is_success_replication, args=[results, 3])
threadOfUpdates.start()
@app.get("/health")
def health():
"""Reports secondaries' statuses"""
return HEALTH_STATUSES
@app.get("/message")
def get_message():
return INMEMORY_MESSAGE_LIST
@app.post("/message")
def post_message(message: MessageModel, write_concern: int):
global message_number
message_number += 1
INMEMORY_MESSAGE_LIST.append(message)
print(f'Input message in master: {message.message}')
print(f"With write concern: {write_concern}")
print(HEALTH_STATUSES)
if is_quorum():
replication_results = replicate_to_secondaries(message, write_concern, message_number)
if is_success_replication(replication_results, write_concern):
threading_start(replication_results, write_concern)
print('Successful replication!')
return Response('Successfull replication!', status_code=HTTPStatus.OK.value)
else:
return Response(status_code=HTTPStatus.SERVICE_UNAVAILABLE.value)
else:
error_message = f"Can't append message. No quorum. Secondaries' health: {HEALTH_STATUSES}"
return Response(error_message, status_code=418)