Skip to content

Commit

Permalink
refactor(cluster): add bots back to ClusterConfiguration
Browse files Browse the repository at this point in the history
paritally undoes #193 (which is unreleased upstream)
  • Loading branch information
fubuloubu committed Feb 13, 2025
1 parent 11af509 commit 25ef110
Showing 1 changed file with 35 additions and 61 deletions.
96 changes: 35 additions & 61 deletions silverback/cluster/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import enum
import math
import re
import uuid
from datetime import datetime
Expand Down Expand Up @@ -35,90 +34,61 @@ class ClusterConfiguration(BaseModel):
# processing through ApePay
# NOTE: All defaults should be the minimal end of the scale, so that `__or__` works right

# Version byte (Byte 0)
# NOTE: Update this to revise new models for every configuration change
version: int = 1
"""Version of this configuration (used for encoding/decoding)"""
# NOTE: Update this to revise new models for every configuration change

# Cluster-wide Configuration, priced per maximum usage (Bytes 1-2)
cpu: Annotated[int, Field(ge=0, le=6)] = 0 # defaults to 0.25 vCPU
"""Max vCPUs for entire cluster:
- 0.25 vCPU (0)
- 0.50 vCPU (1)
- 1.00 vCPU (2)
- 2.00 vCPU (3)
- 4.00 vCPU (4)
- 8.00 vCPU (5)
- 16.0 vCPU (6)"""
# Cluster-wide limits on maximum usage (Bytes 1-2)
cpu: Annotated[int, Field(ge=1, le=64)] = 1 # defaults to 1 vCPU
"""Max vCPUs shared by all bots in cluster"""

memory: Annotated[int, Field(ge=0, le=120)] = 0 # defaults to 512 MiB
"""Max memory for entire cluster (in GB, 0 means '512 MiB')"""
memory: Annotated[int, Field(ge=1, le=128)] = 1
"""Max memory (in GiB) shared by all bots in cluster"""

# NOTE: # of workers configured based on cpu & memory settings
# NOTE: # of workers in each bot configured based on above cpu & memory settings

# Runner configuration (Bytes 3-4)
networks: Annotated[int, Field(ge=1, le=20)] = 1
"""Maximum number of concurrent network runners"""
"""Maximum number of concurrent networks all bots can use"""

# NOTE: Byte 4 unused
bots: Annotated[int, Field(ge=1, le=250)] = 1
"""Maximum number of guaranteed concurrently running bots"""
# NOTE: Some amount of spare capacity allowed over this limit, depending on cluster load

# NOTE: Byte 5 unused
# NOTE: Byte 5 is reserved for future use (non-breaking)

# Recorder configuration (Bytes 6-7)
bandwidth: Annotated[int, Field(ge=0, le=250)] = 0 # 512 kB/sec
"""Rate at which data should be emitted by cluster (in MB/sec, 0 means '512 kB/sec')"""
# NOTE: This rate is only estimated average, and will serve as a throttling threshold
bandwidth: Annotated[int, Field(ge=1, le=250)] = 1 # 1 KiB/sec (~2.5 GiB/month)
"""Rate at which data should be emitted by cluster (in KiB/sec)"""
# NOTE: This rate will serve as a throttling threshold on results processing / data streaming

duration: Annotated[int, Field(ge=1, le=120)] = 1
"""Time to keep data recording duration (in months)"""
# NOTE: The storage space alloted for your recordings will be `bandwidth x duration`.
# If the storage space is exceeded, it will be aggressively pruned to maintain that size.
# We will also prune duration past that point less aggressively, if there is unused space.

@field_validator("cpu", mode="before")
def parse_cpu_value(cls, value: str | int) -> int:
if not isinstance(value, str):
return value

return round(math.log2(float(value.split(" ")[0]) * 1024 / 256))

@field_validator("memory", mode="before")
def parse_memory_value(cls, value: str | int) -> int:
if not isinstance(value, str):
return value

mem, units = value.split(" ")
if units.lower() in ("mib", "mb"):
assert mem == "512"
return 0

assert units.lower() == "gb"
return int(mem)
# We will also prune data with duration past this point, if there is unused space.

@field_validator("bandwidth", mode="before")
def parse_bandwidth_value(cls, value: str | int) -> int:
@field_validator("cpu", "memory", "bandwidth", mode="before")
def parse_units(cls, value: str | int) -> int:
if not isinstance(value, str):
return value

bandwidth, units = value.split(" ")
if units.lower() == "b/sec":
assert bandwidth == "512"
return 0

assert units.lower() == "kb/sec"
return int(bandwidth)
amount, _ = value.split(" ")
return int(amount)

def settings_display_dict(self) -> dict:
return dict(
version=self.version,
runner=dict(
bots=self.bots,
networks=self.networks,
),
bots=dict(
cpu=f"{256 * 2**self.cpu / 1024} vCPU",
memory=f"{self.memory} GB" if self.memory > 0 else "512 MiB",
cpu=f"{self.cpu} vCPU",
memory=f"{self.memory} GiB",
),
recorder=dict(
bandwidth=f"{self.bandwidth} MB/sec" if self.bandwidth > 0 else "512 kB/sec",
bandwidth=f"{self.bandwidth} KiB/sec",
duration=f"{self.duration} months",
),
)
Expand Down Expand Up @@ -147,6 +117,7 @@ def decode(cls, value: Any) -> "ClusterConfiguration":
cpu=cls._decode_byte(value, 1),
memory=cls._decode_byte(value, 2),
networks=cls._decode_byte(value, 3),
botss=cls._decode_byte(value, 4),
bandwidth=cls._decode_byte(value, 6),
duration=cls._decode_byte(value, 7),
)
Expand All @@ -167,6 +138,7 @@ def encode(self) -> int:
+ self._encode_byte(self.cpu, 1)
+ self._encode_byte(self.memory, 2)
+ self._encode_byte(self.networks, 3)
+ self._encode_byte(self.bots, 4)
+ self._encode_byte(self.bandwidth, 6)
+ self._encode_byte(self.duration, 7)
)
Expand Down Expand Up @@ -209,17 +181,19 @@ class ClusterTier(enum.IntEnum):
"""Suggestions for different tier configurations"""

STANDARD = ClusterConfiguration(
cpu="0.25 vCPU",
memory="512 MiB",
cpu="1 vCPU",
memory="2 GiB",
networks=3,
bandwidth="512 B/sec", # 1.236 GB/mo
bots=5,
bandwidth="1 KiB/sec", # 2.47 GB/mo
duration=3, # months
).encode()
PREMIUM = ClusterConfiguration(
cpu="1 vCPU",
memory="2 GB",
cpu="4 vCPU",
memory="8 GiB",
networks=10,
bandwidth="5 kB/sec", # 12.36 GB/mo
bots=20,
bandwidth="5 KiB/sec", # 12.36 GB/mo
duration=12, # 1 year = ~148GB
).encode()

Expand Down

0 comments on commit 25ef110

Please sign in to comment.