Skip to content

Commit

Permalink
Merge pull request #26 from zavarin-michael/allocated-ports-redis
Browse files Browse the repository at this point in the history
Перевезти подбор портов с базы на redis
  • Loading branch information
artamaney authored Jan 9, 2024
2 parents b8d09c4 + 15f4624 commit 7fc75bc
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 28 deletions.
1 change: 1 addition & 0 deletions demo/scripts/example_script.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
echo 'Hello world!'
2 changes: 1 addition & 1 deletion overhave/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def get_test_run_storage() -> ITestRunStorage:

@cache
def get_emulation_storage() -> IEmulationStorage:
return EmulationStorage(OverhaveEmulationSettings())
return EmulationStorage(settings=OverhaveEmulationSettings(), redis=make_redis(get_redis_settings()))


@cache
Expand Down
1 change: 1 addition & 0 deletions overhave/entities/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class OverhaveEmulationSettings(BaseOverhavePrefix):
emulation_bind_ip: str = "0.0.0.0" # noqa: S104
# Ports for emulation binding. Expects as string with format `["port1", "port2", ...]`
emulation_ports: list[int] = [8080]
redis_ports_key: str = "allocated_ports"

# As a real service, should be used follow path: `http://my-service.domain/mount`
# where `emulation_service_url` = `http://my-service.domain` - URL for service,
Expand Down
3 changes: 2 additions & 1 deletion overhave/factory/base_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from overhave.test_execution import PytestRunner, StepCollector
from overhave.transport import S3Manager
from overhave.transport.redis.deps import get_redis_settings, make_redis


class IOverhaveFactory(Generic[TApplicationContext], abc.ABC):
Expand Down Expand Up @@ -120,7 +121,7 @@ def draft_storage(self) -> IDraftStorage:

@cached_property
def _emulation_storage(self) -> EmulationStorage:
return EmulationStorage(settings=self.context.emulation_settings)
return EmulationStorage(settings=self.context.emulation_settings, redis=make_redis(get_redis_settings()))

@property
def emulation_storage(self) -> IEmulationStorage:
Expand Down
43 changes: 22 additions & 21 deletions overhave/storage/emulation_storage.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import abc
import logging
import socket
from typing import cast
from typing import Any, List, cast

import orjson
import sqlalchemy as sa
import sqlalchemy.orm as so
from redis import Redis

from overhave import db
from overhave.entities.settings import OverhaveEmulationSettings
Expand Down Expand Up @@ -51,8 +52,10 @@ def get_emulation_runs_by_test_user_id(test_user_id: int) -> list[EmulationRunMo
class EmulationStorage(IEmulationStorage):
"""Class for emulation runs storage."""

def __init__(self, settings: OverhaveEmulationSettings):
def __init__(self, settings: OverhaveEmulationSettings, redis: "Redis[Any]"):
self._redis = redis
self._settings = settings
self._redis.set(self._settings.redis_ports_key, orjson.dumps([]))
self._emulation_ports_len = len(self._settings.emulation_ports)

@staticmethod
Expand All @@ -65,20 +68,9 @@ def create_emulation_run(emulation_id: int, initiated_by: str) -> int:
session.flush()
return emulation_run.id

def _get_next_port(self, session: so.Session) -> int:
runs_with_allocated_ports = ( # noqa: ECE001
session.query(db.EmulationRun)
.filter(db.EmulationRun.port.isnot(None))
.order_by(db.EmulationRun.id.desc())
.limit(self._emulation_ports_len)
.all()
)
allocated_sorted_runs = sorted(
runs_with_allocated_ports,
key=lambda t: t.changed_at,
)

allocated_ports = {run.port for run in allocated_sorted_runs}
def _get_next_port(self) -> int:
allocated_ports = self.get_allocated_ports()

logger.debug("Allocated ports: %s", allocated_ports)
not_allocated_ports = set(self._settings.emulation_ports).difference(allocated_ports)
logger.debug("Not allocated ports: %s", not_allocated_ports)
Expand All @@ -88,12 +80,20 @@ def _get_next_port(self, session: so.Session) -> int:
continue
return port
logger.debug("All not allocated ports are busy!")
for run in allocated_sorted_runs:
if self._is_port_in_use(cast(int, run.port)):
for port in allocated_ports:
if self._is_port_in_use(port):
continue
return cast(int, run.port)
return port
raise AllPortsAreBusyError("All ports are busy - could not find free port!")

def get_allocated_ports(self) -> List[int]:
return cast(List[int], orjson.loads(cast(bytes, self._redis.get(self._settings.redis_ports_key))))

def allocate_port(self, port: int) -> None:
new_allocated_ports = self.get_allocated_ports()
new_allocated_ports.append(port)
self._redis.set(self._settings.redis_ports_key, orjson.dumps(sorted(new_allocated_ports)))

def _is_port_in_use(self, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex((self._settings.emulation_bind_ip, port)) == 0
Expand All @@ -102,7 +102,8 @@ def get_requested_emulation_run(self, emulation_run_id: int) -> EmulationRunMode
with db.create_session() as session:
emulation_run = session.query(db.EmulationRun).filter(db.EmulationRun.id == emulation_run_id).one()
emulation_run.status = db.EmulationStatus.REQUESTED
emulation_run.port = self._get_next_port(session)
emulation_run.port = self._get_next_port()
self.allocate_port(emulation_run.port)
emulation_run.changed_at = get_current_time()
return EmulationRunModel.model_validate(emulation_run)

Expand Down
Loading

0 comments on commit 7fc75bc

Please sign in to comment.