Skip to content

Commit

Permalink
Python 3.11, basic auth, update styles, change logging
Browse files Browse the repository at this point in the history
  • Loading branch information
brunneis committed Jan 10, 2023
1 parent 9df55a5 commit d4ddc92
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 71 deletions.
4 changes: 2 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.9-slim-buster as BUILD_IMAGE
FROM python:3.11-slim-buster as BUILD_IMAGE

ARG ROCKSDB_VERSION=6.10.2
ARG ROCKSDB_BASE_URL=https://github.com/facebook/rocksdb/archive
Expand Down Expand Up @@ -33,7 +33,7 @@ RUN \
apt-get -y install libev-dev && \
python setup.py install

FROM python:3.9-slim-buster
FROM python:3.11-slim-buster
COPY --from=BUILD_IMAGE /usr/lib /usr/lib
COPY --from=BUILD_IMAGE /usr/local/lib /usr/local/lib
COPY --from=BUILD_IMAGE /usr/include /usr/include
Expand Down
3 changes: 3 additions & 0 deletions docker/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
---
auth:
guest: guest

global:
port: 5704
data_dir: ./data
Expand Down
4 changes: 4 additions & 0 deletions src/.style.yapf
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ based_on_style=pep8
column_limit=79
split_before_arithmetic_operator=true
split_before_logical_operator=true
split_before_named_assigns=true
split_before_first_argument=true
allow_split_before_dict_value=false
dedent_closing_brackets=true
48 changes: 23 additions & 25 deletions src/stopover_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,33 @@
import bjoern

banner = f"""
███████████ ███████████
█████████████ ███████████████
███████████████ █████████████████
█████████████ ███████████████████
███████████ ███ █████████████████
███ ███████████████
███ ███████████
███ ███
███ ███
███ ███
█████████████████ ███
███████████████████████ ███
█████████████████████████ ███
█████████████████████████████
█████████████████████████████
█████████████████████████████
█████████████████████████████
███████████████████████████
███████████████████████
███████████████████
███████████
█████████
█████████ █████████████
█████████████ █████████████████
███████████████ ███████████████████
█████████████ █████████████████
█████████ ███ █████████████
███ █████████
███ ███
█████████████ ███
█████████████████ ███
█████████████████████ ███
█████████████████████████
███████████████████████████
█████████████████████████
█████████████████████
█████████████████
█████████████
Stopover v{__version__}
"""

CONFIG_PATH = './config.yaml'

logging.getLogger().setLevel(logging.INFO)
logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logging.basicConfig(
format='%(asctime)-15s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)


def main():
Expand All @@ -48,8 +46,8 @@ def main():
config = yaml.safe_load(input_file)

try:
open(f"{config['global']['data_dir']}/streams/.active")

file = open(f"{config['global']['data_dir']}/streams/.active")
file.close()
except FileNotFoundError:
logging.critical('the streams dir is not active')
sys.exit(1)
Expand Down
100 changes: 71 additions & 29 deletions src/stopover_server/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import time
import json
import logging
import base64


class STATUS:
Expand All @@ -22,6 +23,7 @@ class STATUS:


def handle_error(method):

def _try_except(self, *args, **kwargs):
try:
return method(self, *args, **kwargs)
Expand All @@ -39,9 +41,10 @@ def _try_except(self, *args, **kwargs):


class Broker:

def __init__(self, config):
self.config = config
logging.info(f'config: {json.dumps(config, indent=2)}')
utils.log_dict(self.config, prefix='⚙️ ')

self.partitions_by_stream_lock = Lock()
self.partitions_by_stream = {}
Expand All @@ -58,12 +61,44 @@ def __init__(self, config):
Thread(target=self._rebalance_loop, daemon=True).start()
Thread(target=self._prune_loop, daemon=True).start()

def check_authenticated(self, headers):
if 'authorization' in headers:
return True
return False

def check_authorized(self, headers):
token = headers['authorization'].split('Basic ')[1]
client_id, client_secret = base64.b64decode(token) \
.decode('ascii').split(':')
return (
client_id in self.config['auth']
and client_secret == self.config['auth'][client_id]
)

@staticmethod
def on_get(request, response):
response.content_type = 'text/html; charset=utf-8'
response.body = f'Labteral Stopover {__version__}'

def on_post(self, request, response):
headers = {
key.lower(): value
for (key, value) in request.headers.items()
}

if 'auth' in self.config:
is_authenticated = self.check_authenticated(headers)
if not is_authenticated:
response.status = falcon.status_codes.HTTP_401
return

is_authorized = self.check_authorized(
headers
) if is_authenticated else False
if not is_authorized:
response.status = falcon.status_codes.HTTP_403
return

bin_data = request.stream.read()

plain_response = False
Expand Down Expand Up @@ -290,7 +325,8 @@ def _get_partition(self, stream: str, partition_number: int):
self.partitions[stream][partition_number] = Partition(
stream=stream,
number=partition_number,
data_dir=self.config['global']['data_dir'])
data_dir=self.config['global']['data_dir']
)
return self.partitions[stream][partition_number]

def _get_receiver_partition_numbers(
Expand All @@ -306,12 +342,13 @@ def _get_receiver_partition_numbers(
if receiver_group not in self.partitions_by_group[stream]:
self.partitions_by_group[stream][receiver_group] = {}

if receiver not in self.partitions_by_group[stream][
receiver_group]:
if receiver not in self.partitions_by_group[stream][receiver_group
]:
self.partitions_by_group[stream][receiver_group][receiver] = []

return list(
self.partitions_by_group[stream][receiver_group][receiver])
self.partitions_by_group[stream][receiver_group][receiver]
)

def _get_stream_path(self, stream: str) -> str:
return f"{self.config['global']['data_dir']}/streams/{stream}/"
Expand All @@ -325,8 +362,8 @@ def _get_stream_partition_numbers(self, stream: str):
self.partitions_by_stream[stream] = partition_numbers

try:
partitions_target = self.config['streams'][stream][
'partitions']
partitions_target = self.config['streams'][stream]['partitions'
]
except KeyError:
partitions_target = self.config['global']['partitions']

Expand All @@ -344,12 +381,15 @@ def _get_stream_partition_numbers(self, stream: str):
partitions_target):
if partition_number in partition_numbers:
raise FileNotFoundError(
f'missing partitions among {partition_numbers}')

Partition(stream=stream,
number=partition_number,
data_dir=self.config['global']['data_dir'],
create_if_missing=True)
f'missing partitions among {partition_numbers}'
)

Partition(
stream=stream,
number=partition_number,
data_dir=self.config['global']['data_dir'],
create_if_missing=True
)
partition_numbers.append(partition_number)

return self.partitions_by_stream[stream]
Expand All @@ -359,16 +399,15 @@ def _rebalance_loop(self):
self._rebalance()
remaining_seconds = self.config['global']['rebalance_interval']
logging.debug(
f"next rebalance will hapen in {remaining_seconds} seconds")
f"next rebalance will hapen in {remaining_seconds} seconds"
)
time.sleep(self.config['global']['rebalance_interval'])

def _rebalance(self):
with self.partitions_by_group_lock:
logging.debug('rebalancing...')
if self.partitions_by_group:
logging.info(
'assignments: '
f'{json.dumps(self.partitions_by_group, indent=4)}')
utils.log_dict(self.partitions_by_group)

receivers_to_remove = []
for stream in self.partitions_by_group:
Expand All @@ -389,7 +428,8 @@ def _rebalance(self):

else:
receivers_to_remove.append(
(stream, receiver_group, receiver))
(stream, receiver_group, receiver)
)

stream_partition_numbers = \
self._get_stream_partition_numbers(stream)
Expand All @@ -410,22 +450,22 @@ def _rebalance(self):
step):
receiver_index = index // step
self.partitions_by_group[stream][receiver_group][
stream_receiver_group_receivers[
receiver_index]] = stream_partition_numbers[
index:index + step]
stream_receiver_group_receivers[receiver_index]
] = stream_partition_numbers[index:index + step]

for index in range(number_of_partitions - remainder,
number_of_partitions):
receiver_index = index - number_of_partitions + 1
self.partitions_by_group[stream][receiver_group][
stream_receiver_group_receivers[
receiver_index]].append(
stream_partition_numbers[index])
stream_receiver_group_receivers[receiver_index]
].append(stream_partition_numbers[index])

for stream, receiver_group, receiver in receivers_to_remove:
logging.info(f'receiver "{receiver}" kicked from the '
f'receiver_group "{receiver_group}" '
f'for the stream "{stream}"')
logging.info(
f'receiver "{receiver}" kicked from the '
f'receiver_group "{receiver_group}" '
f'for the stream "{stream}"'
)
del self.partitions_by_group[stream][receiver_group][receiver]
if receiver in self.last_seen_by_group[receiver_group]:
del self.last_seen_by_group[receiver_group][receiver]
Expand Down Expand Up @@ -473,8 +513,10 @@ def _prune_loop(self):

for partition_number in partition_numbers:
logging.info(
f'pruning stream {stream} ({partition_number})')
f'pruning stream {stream} ({partition_number})'
)

partition = self._get_partition(
stream, partition_number)
stream, partition_number
)
partition.prune(int(ttl))
36 changes: 22 additions & 14 deletions src/stopover_server/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@


class PartitionItem:
def __init__(self,
value: bytes = None,
timestamp: int = None,
item_dict: Dict = None):

def __init__(
self,
value: bytes = None,
timestamp: int = None,
item_dict: Dict = None
):
if item_dict is not None:
self._load_from_dict(item_dict)
else:
Expand Down Expand Up @@ -47,11 +50,13 @@ class Partition:
INDEX = b'\x01'
OFFSET = b'\x02'

def __init__(self,
stream: str,
number: int,
data_dir: str,
create_if_missing: bool = False):
def __init__(
self,
stream: str,
number: int,
data_dir: str,
create_if_missing: bool = False
):
self.lock = Lock()
self.stream = stream
self.number = number
Expand Down Expand Up @@ -103,7 +108,7 @@ def get(self, receiver_group: str, index=None) -> dict:
partition_item = self._get_by_index(receiver_index)

if partition_item is None:
return
return None

partition_item_dict = partition_item.dict
partition_item_dict['index'] = receiver_index
Expand All @@ -113,8 +118,10 @@ def commit(self, offset: int, receiver: str):
with self.lock:
expected_offset = self._get_offset(receiver) + 1
if offset != expected_offset:
raise ValueError(f'trying to commit offset {offset} '
f'but expecting {expected_offset}')
raise ValueError(
f'trying to commit offset {offset} '
f'but expecting {expected_offset}'
)
self._increase_offset(receiver)

def set_offset(self, receiver: str, offset: int):
Expand Down Expand Up @@ -147,7 +154,7 @@ def prune(self, ttl: int):
logging.debug(f'Deleting {key}')
self._store.delete(key)

def _get_by_index(self, index: int) -> bytes:
def _get_by_index(self, index: int) -> PartitionItem:
message_key = self._get_message_key(index)
value = self._store.get(message_key)
if value is None:
Expand Down Expand Up @@ -196,5 +203,6 @@ def _get_offset_key(receiver: str) -> bytes:
@staticmethod
def _get_message_key(index: int) -> bytes:
message_key = Partition.MESSAGE + int_to_padded_bytes(
index, UINT_BYTES)
index, UINT_BYTES
)
return message_key
Loading

0 comments on commit d4ddc92

Please sign in to comment.