Implementing logic #7

Merged
Changes from 1 commit
Commits
32 commits
0dbf21f
feat: implement enabling shield
Oct 30, 2024
9648ecb
fix: fixes after code review
Nov 5, 2024
6f321c7
feat: shield logic finished
Nov 7, 2024
23fccc4
fix: code review fixes
Nov 8, 2024
290fc3d
feat: add serialization logic
Nov 9, 2024
1c69b99
feat: logic finished
Nov 11, 2024
6b4abe1
feat: encryption manager refactored
Nov 11, 2024
34a9cf7
feat: add memory implementation for some managers
Nov 11, 2024
168de86
feat: finish memory implementations for all managers along with full …
Nov 12, 2024
8f975a7
fix: finish tests for shield with needed fixes (there is still proble…
Nov 12, 2024
7f1873c
feat: back to EncryptionManager using ECIES library
Nov 13, 2024
4d7fba3
fix: added hash to manifest file to verify if content is the same
Nov 13, 2024
9802e2d
feat: implemented Route53AddressManager
Nov 14, 2024
16603b6
feat: implemented SQLAlchemyMinerShieldStateManager
Nov 14, 2024
6b24602
feat: implemented S3ManifestManager
Nov 15, 2024
82e330c
feat: added integration test
Nov 15, 2024
968f045
feat: filling AwsAddressManager, work in progress
Nov 22, 2024
935c2e6
feat: added possibility of specyfing IP in AwsAddressManager
Nov 22, 2024
6ba1675
feat: filling AwsAddressManager - creating ELB is working
Nov 22, 2024
2ac5dd7
feat: filling AwsAddressManager - creating HostedZone during ELB crea…
Nov 24, 2024
097b2de
feat: filling AwsAddressManager - creating alias DNS entry in Route53
Nov 24, 2024
dc62a30
feat: finished AwsAddressManager - added creating firewall
Nov 25, 2024
c9d3b75
feat: back to passing hosted_zone_id to AwsAddressManager
Dec 5, 2024
845ec8a
fix: tiny code review fixes
Dec 6, 2024
44ebe33
Add AWSClientFactory
grzegorz-leszczynski-reef Dec 25, 2024
840941e
Use pytest fixture in TestAddressManager
grzegorz-leszczynski-reef Dec 25, 2024
54de75f
Simplify code after CR
grzegorz-leszczynski-reef Dec 25, 2024
d2bc4a4
Upgrade handling AWS API errors
grzegorz-leszczynski-reef Dec 26, 2024
030cba1
Add project dependencies
grzegorz-leszczynski-reef Dec 26, 2024
5e27248
Apply ruff linter fixes
grzegorz-leszczynski-reef Dec 26, 2024
78ad8f2
Add MinerShieldFactory
grzegorz-leszczynski-reef Dec 27, 2024
4af7d75
Implement Validator
grzegorz-leszczynski-reef Dec 31, 2024

2 changes: 1 addition & 1 deletion bt_ddos_shield/address.py
@@ -41,7 +41,7 @@ def encrypt(self) -> bytes:

@classmethod
@abstractmethod
def decrypt(cls, encrypted_data: bytes) -> Address:
def decrypt(cls, encrypted_data: bytes):
"""
Create address from encrypted address data.

20 changes: 17 additions & 3 deletions bt_ddos_shield/event_processor.py
@@ -1,9 +1,10 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
import traceback


@dataclass
class Event:
class MinerShieldEvent:
"""
Class describing event, which happened in the shield.
"""
@@ -12,17 +13,30 @@ class Event:
exception: Exception = None # Exception which caused the event.


class AbstractEventProcessor(ABC):
class AbstractMinerShieldEventProcessor(ABC):
"""
Abstract base class for processor handling events generated by shield.
"""

@abstractmethod
def add_event(self, event: Event):
def add_event(self, event: MinerShieldEvent):
"""
Add new event to be handled by processor.

Args:
event: Event to add.
"""
pass


class LoggingMinerShieldEventProcessor(AbstractMinerShieldEventProcessor):
"""
Event processor which logs events to console.
"""

def add_event(self, event: MinerShieldEvent):
if event.exception is not None:
print(f"MinerShieldEvent: {event.event_description}\nException happened:")
print(traceback.format_exc())
else:
print(f"MinerShieldEvent: {event.event_description}")
Contributor

This prints, not logs, but I also think it is generally assembled wrong and won't work. No worries, you'll fix it when you start running it.

Contributor Author

This class is mostly (and maybe only) for unit tests. I've renamed it to PrintingMinerShieldEventProcessor, because there will probably be a LoggingMinerShieldEventProcessor in the future. Also, it works: just run the unit test.

And if we ever print that, we'd rather use stderr.
traceback.print_exc() can be a sufficient routine for that.

Contributor Author

I've already changed it in later commits. But now I'm printing the exception captured inside MinerShieldEvent. It's unlikely, but it's possible that the captured exception wouldn't be the last one thrown, and print_exc calls sys.exception() internally to get the last one.
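
To illustrate the point about printing the exception captured in the event rather than the most recently raised one, here is a minimal sketch. It assumes MinerShieldEvent has the event_description and exception fields seen in the diff; format_event and print_event are hypothetical helper names, not part of the PR.

```python
import sys
import traceback
from dataclasses import dataclass
from typing import Optional


@dataclass
class MinerShieldEvent:
    event_description: str
    exception: Optional[Exception] = None


def format_event(event: MinerShieldEvent) -> str:
    """Render the event using the exception stored on the event itself."""
    if event.exception is None:
        return f"MinerShieldEvent: {event.event_description}"
    exc = event.exception
    # format_exception() receives the exception explicitly, so the result does
    # not depend on which exception (if any) is currently being handled.
    details = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return f"MinerShieldEvent: {event.event_description}\nException happened:\n{details}"


def print_event(event: MinerShieldEvent) -> None:
    # Hypothetical helper: write to stderr, as suggested in the review.
    print(format_event(event), file=sys.stderr)
```

Because the exception is passed in explicitly, the helper can be called outside of any except block.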

2 changes: 1 addition & 1 deletion bt_ddos_shield/manifest_manager.py
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod

from bt_ddos_shield.address import Address
from bt_ddos_shield.miner_shield import Hotkey
from bt_ddos_shield.utils import Hotkey


class AbstractManifestManager(ABC):
andreea-popescu-reef marked this conversation as resolved.
226 changes: 211 additions & 15 deletions bt_ddos_shield/miner_shield.py
@@ -1,20 +1,21 @@
import threading
from queue import Queue
from dataclasses import dataclass
from time import sleep

from bt_ddos_shield.blockchain_manager import AbstractBlockchainManager
from bt_ddos_shield.event_processor import AbstractEventProcessor
from bt_ddos_shield.event_processor import AbstractMinerShieldEventProcessor, MinerShieldEvent
from bt_ddos_shield.address_manager import AbstractAddressManager
from bt_ddos_shield.utils import Hotkey
from bt_ddos_shield.validators_manager import AbstractValidatorsManager
from bt_ddos_shield.manifest_manager import AbstractManifestManager
from bt_ddos_shield.state_manager import AbstractMinerShieldStateManager


Hotkey = str # type of Hotkey


@dataclass
class MinerShieldOptions:
"""
A class to represent the configuration options for the MinerShield.
A class to represent the configuration options for the MinerShield.
"""

auto_hide_original_server: bool = False # If True, the original server will be hidden after some time after shield
@@ -23,15 +24,19 @@ class MinerShieldOptions:
auto_hide_delay_sec: int = 600 # Time in seconds after which the original server will be hidden if
# auto_hide_original_server is set to True.

retry_delay: int = 5 # Time in seconds to wait before retrying failed task.


class MinerShield:
"""
Main class to be used by Miner to shield himself from DDoS. Call enable() to start the shield.
Main class to be used by Miner to shield himself from DDoS. Call enable() to start the shield. No methods in
managers should be called directly. All operations are done by worker thread. After starting shield user can
schedule tasks to be executed asynchronously.
"""

def __init__(self, validators_manager: AbstractValidatorsManager, address_manager: AbstractAddressManager,
manifest_manager: AbstractManifestManager, blockchain_manager: AbstractBlockchainManager,
state_manager: AbstractMinerShieldStateManager, event_processor: AbstractEventProcessor,
state_manager: AbstractMinerShieldStateManager, event_processor: AbstractMinerShieldEventProcessor,
options: MinerShieldOptions):
"""
Initialize the MinerShield class.
@@ -45,30 +50,221 @@ def __init__(self, validators_manager: AbstractValidatorsManager, address_manage
event_processor: Instance of AbstractEventProcessor to handle events generated by the shield.
options: Instance of MinerShieldOptions.
"""
pass
self.validators_manager = validators_manager
self.address_manager = address_manager
self.manifest_manager = manifest_manager
self.blockchain_manager = blockchain_manager
self.state_manager = state_manager
self.event_processor = event_processor
self.options = options

self.worker_thread = None
self.task_queue = Queue()
self.run = False
self.finishing = False

def enable(self):
"""
Enable shield. It asynchronously starts the shield, which consists of such steps:
Enable shield. It starts worker thread, which will do such steps if run for the first time:
1. Fetch validators keys.
2. Creates addresses for all validators.
3. Save manifest file.
4. Publish link to manifest file to blockchain.
5. Eventually close public access to original IP after some time.

It puts events to event_manager after each step. Current state is managed by state_manager. If shielding
process had been interrupted it is continued from the last step.
It puts events to event_manager after each finished operation. Current state is managed by state_manager.
If any error occurs it is retried forever until shield is disabled.

When shield is running, changing validators set triggers shield reconfiguration.
When shield is running, user can schedule tasks to be processed by worker.
"""
pass
if self.worker_thread is not None:
raise Exception("Shield is already enabled")

self.finishing = False
self.run = True
self._add_task(MinerShieldInitializeTask())
self.worker_thread = threading.Thread(target=self._worker_function)
self.worker_thread.start()

def disable(self):
"""
Disable shield. It stops worker thread after finishing current task. Function blocks until worker is stopped.
"""
self._add_task(MinerShieldDisableTask())
self.finishing = True
self.worker_thread.join()
self.worker_thread = None
self.task_queue = Queue() # clear task queue

def ban_validator(self, validator_hotkey: Hotkey):
"""
Ban a validator by its hotkey. Function blocks execution until manifest file is updated and info about file
is published to Bittensor.
Ban a validator by its hotkey. Task will be executed by worker. It will update manifest file and publish info
about new file version to blockchain.

Args:
validator_hotkey: The hotkey of the validator.
"""
self._add_task(MinerShieldBanValidatorTask(validator_hotkey))
pass
Contributor

Suggested change
pass


def _add_task(self, task):
"""
Add task to task queue. It will be handled by _worker_function.
"""
if not isinstance(task, MinerShieldTask):
raise Exception("Task is not instance of MinerShieldTask")
Contributor

Suggested change
def _add_task(self, task):
"""
Add task to task queue. It will be handled by _worker_function.
"""
if not isinstance(task, MinerShieldTask):
raise Exception("Task is not instance of MinerShieldTask")
def _add_task(self, task: MinerShieldTask):
"""
Add task to task queue. It will be handled by _worker_function.
"""

Traditionally we don't implement type enforcement in the first lines of every method and function. Rather than checking types at runtime, we rely on external tools that perform static analysis (whenever possible).

Contributor Author

EncryptionManager was written with this check in the code, so I did the same here. I also forgot to specify the parameter type. Fixed, using the string-literal trick, because Python has no forward declarations.
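
The string-literal trick mentioned here can be sketched as follows. This is a simplified stand-in for the PR's classes, assuming the task base class is defined later in the module than MinerShield; the stripped-down bodies are illustrative only.

```python
from abc import ABC, abstractmethod
from queue import Queue


class MinerShield:
    def __init__(self) -> None:
        self.task_queue = Queue()

    # AbstractMinerShieldTask is defined further down in the module, so the
    # annotation is written as a string and resolved only by type checkers.
    def _add_task(self, task: "AbstractMinerShieldTask") -> None:
        self.task_queue.put(task)


class AbstractMinerShieldTask(ABC):
    @abstractmethod
    def run(self, miner_shield: MinerShield) -> None:
        ...
```

A static analyzer such as mypy resolves the quoted name, so no isinstance check is needed at runtime.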

if not self.run:
raise Exception("Shield is disabled")
Contributor

In order to raise any exceptions you'll need to

class ShieldException(Exception):
    pass


class ShieldIsDisabled(ShieldException):
    pass

and raise that - this way the user of the module will be able to catch the intended exceptions without catching every exception, which includes KeyError, ValueError etc

However, in this particular case you should raise a built-in ProgrammingError, which is a builtin for cases like that.

Contributor Author

Exceptions added. I also added MinerShieldDisabledException, because _add_task is called at least from ban_validator, and the user may call it after stopping or before starting the shield for some reason. That's why I think MinerShieldDisabledException should be specified.

Apart from that, I couldn't find anything like ProgrammingError (or ProgrammingException). There is such a class, but it lives in the sqlite3 module.
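
A minimal sketch of the exception hierarchy discussed in this thread. MinerShieldDisabledException is the name given in the reply above; the base class name MinerShieldException, the simplified MinerShield, and the try_ban helper are assumptions made for illustration.

```python
class MinerShieldException(Exception):
    """Base class for errors raised by the shield (assumed name)."""


class MinerShieldDisabledException(MinerShieldException):
    """Raised when a task is scheduled while the shield is not running."""


class MinerShield:
    """Simplified stand-in for the real class, keeping only the run flag."""

    def __init__(self) -> None:
        self.run = False
        self.tasks = []

    def ban_validator(self, validator_hotkey: str) -> None:
        if not self.run:
            raise MinerShieldDisabledException("Shield is disabled")
        self.tasks.append(("ban", validator_hotkey))


def try_ban(shield: MinerShield, validator_hotkey: str) -> bool:
    """Hypothetical caller: handles only the shield's own error type."""
    try:
        shield.ban_validator(validator_hotkey)
    except MinerShieldDisabledException:
        return False
    return True
```

Catching the specific subclass leaves unrelated errors such as KeyError or ValueError free to propagate, which is the motivation given in the review.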


self.task_queue.put(task)

def _worker_function(self):

Why do we need threads? Can't this be done with asyncio tasks?

For example, def _add_task(self, task: 'AbstractMinerShieldTask'): could mostly be replaced with await task.

Contributor Author

I'm not sure it can be done with asyncio tasks, because I don't wait for the result here, and the worker retries a task forever until the shield is stopped (this behaviour is by design). My knowledge of asyncio is very limited right now, so I will learn more about it and then consider whether this can be changed. I've added it to my notes as a future upgrade, but for now I think this can be left as it is.

Contributor Author

I read up on it, thought about it, and consulted AI, and this can probably be refactored. But if the current behavior is to be maintained, using asyncio won't change much: I can use asyncio.to_thread to run _worker_function, use asyncio.Queue() instead of plain Queue, and make the rest of the needed changes, but the logic and code structure will look nearly the same (if I'm thinking correctly). So my question is whether it is worth the time to change it if it already works. I don't know the answer. Next time I do a similar job I'll try to use asyncio, but should I spend time changing it here?

I'd say this is worth refactoring - there's no point in reimplementing the logic of a lib as common as asyncio. (https://docs.python.org/3/library/asyncio-task.html#awaitables)
The refactor wouldn't take long and it'll make the code much easier to follow for any py dev.

Contributor Author

OK, I'll do it. Does my idea of how to do it sound good?

See the note in https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread about the GIL - I'd rather use asyncio.create_task instead of to_thread since there's no need for multiple threads. For the "io heavy" db work there is optimized native asyncio support in sqlalchemy https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html.
The Queue is actually the event loop from asyncio - it shouldn't be needed.

Although you might want to keep the queue just to prevent gc of the task... see the "Important" note: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
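
One possible shape for the asyncio variant discussed in this thread, as a rough sketch rather than the refactor that was eventually made. AsyncMinerShield, schedule, and coro_factory are hypothetical names; the sketch keeps the retry-forever behaviour and holds strong references to tasks in a set so they are not garbage collected.

```python
import asyncio


class AsyncMinerShield:
    """Hypothetical asyncio-based worker; not the PR's implementation."""

    def __init__(self, retry_delay: float = 5.0) -> None:
        self.retry_delay = retry_delay
        # Strong references keep pending tasks from being garbage collected.
        self._tasks = set()

    def schedule(self, coro_factory) -> asyncio.Task:
        """Schedule a coroutine factory that is retried until it succeeds."""
        task = asyncio.create_task(self._retry_forever(coro_factory))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _retry_forever(self, coro_factory):
        while True:
            try:
                return await coro_factory()
            except Exception:
                # CancelledError is not an Exception subclass, so cancelling
                # the task still interrupts this loop.
                await asyncio.sleep(self.retry_delay)

    async def disable(self) -> None:
        """Cancel all pending tasks and wait for them to finish."""
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
```

A factory is passed instead of a coroutine object because a coroutine can be awaited only once, while each retry needs a fresh one.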

"""
Function called in separate thread by enable() to start the shield. It is handling events put to task_queue.
"""

self.event_processor.add_event(MinerShieldEvent(f"Starting shield"))

while self.run:
task = self.task_queue.get()
try_count = 1

while self.run:
Contributor

Suggested change
try_count = 1
while self.run:
try_count: int = 0
while try_count <= self.options.max_retries:

Contributor Author

I want tasks to be retried forever, and I think this should be the default behaviour; we can discuss this if needed. But I added a new option to allow limited retrying.
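
A sketch of how retry-forever by default and an optional limit can coexist. The max_retries name comes from the suggested change above; treating None as "retry forever", the run_with_retries helper, and the injectable sleep parameter are assumptions for illustration.

```python
import time
from typing import Callable, Optional


def run_with_retries(task: Callable[[], None], retry_delay: float = 5.0,
                     max_retries: Optional[int] = None,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Run task until it succeeds; max_retries=None means retry forever.

    Returns True on success and False once the retry limit is exhausted.
    """
    try_count = 0
    while True:
        try:
            task()
            return True
        except Exception:
            if max_retries is not None and try_count >= max_retries:
                return False
            try_count += 1
            # Wait before the next attempt, as retry_delay does in the PR.
            sleep(retry_delay)
```

With max_retries=2 the task is attempted at most three times: the first attempt plus two retries.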

self.event_processor.add_event(MinerShieldEvent(f"Handling task {task}, try {try_count}"))

try:
task.handle(self)
self.event_processor.add_event(MinerShieldEvent(f"Task {task} finished successfully"))
break
except Exception as e:
self.event_processor.add_event(MinerShieldEvent(f"Error during handling task {task}", e))

if self.finishing:
break

try_count += 1
sleep(self.options.retry_delay)

self.event_processor.add_event(MinerShieldEvent(f"Stopping shield"))

def _handle_initialize(self):
"""
Initialize shield. Load state and initial validators set.
"""
self.state_manager.get_state()
self.event_processor.add_event(MinerShieldEvent("State loaded"))
Contributor

Rather than seeing

self.event_processor.add_event(MinerShieldEvent("State loaded"))

everywhere, I'd prefer to see

self._event("State loaded")

with

def _event(self, description: str, *args, **kwargs):
    return self.event_processor.add_event(self.event_class(description, *args, **kwargs))

and event_class=MinerShieldEvent in the constructor

Contributor Author

Changed. For now I haven't added self.event_class, because it needs more changes, at least in AbstractMinerShieldEventProcessor, where the event type is currently hardcoded. I think we can leave it for the future.


self.validators_manager.refresh_validators()
validators: dict[Hotkey, str] = self.validators_manager.get_validators()
self.event_processor.add_event(MinerShieldEvent(f"Validators refreshed, got {len(validators)} validators"))
Contributor

Suggested change
self.event_processor.add_event(MinerShieldEvent(f"Validators refreshed, got {len(validators)} validators"))
self._event("Validators refreshed", validator_count=len(validators))

Contributor

the MinerShieldEvent class will need to get a little bit more complicated, but we'll be able to emit smart objects that can then be filtered, sorted etc

Contributor Author

I will change this in future commits, because it needs more work, but I've added a TODO to my notes. I also think I need to discuss how best to do this, because there is PrintingMinerShieldEventProcessor, for example.
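
One way the "smart objects" idea could look, as a sketch under assumptions: the metadata field and CollectingEventProcessor are hypothetical additions, while _event follows the helper proposed earlier in the review.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class MinerShieldEvent:
    event_description: str
    exception: Optional[Exception] = None
    # Hypothetical extra field: structured data attached to the event.
    metadata: dict[str, Any] = field(default_factory=dict)


class CollectingEventProcessor:
    """Hypothetical processor that stores events so they can be filtered."""

    def __init__(self) -> None:
        self.events = []

    def add_event(self, event: MinerShieldEvent) -> None:
        self.events.append(event)


class MinerShield:
    """Simplified stand-in keeping only the event-related parts."""

    def __init__(self, event_processor) -> None:
        self.event_processor = event_processor

    def _event(self, description: str, exception: Optional[Exception] = None,
               **metadata: Any) -> None:
        # Keyword arguments become structured metadata on the event.
        self.event_processor.add_event(MinerShieldEvent(description, exception, metadata))
```

A consumer can then select events by field, for example `[e for e in processor.events if "validator_count" in e.metadata]`, instead of parsing formatted strings.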


self._add_task(MinerShieldValidatorsChangedTask())

def _handle_disable(self):
self.run = False

def _handle_validators_changed(self):
"""
Calculates difference between newly fetched validators set and one saved in state and run logic for any changes.
"""

# get current state and recently fetched validators
current_state = self.state_manager.get_state()
fetched_validators: dict[Hotkey, str] = self.validators_manager.get_validators()

# remove banned validators from fetched validators
for banned_validator in current_state.banned_validators.keys():
fetched_validators.pop(banned_validator, None)

# calculate difference between current state and fetched validators
deprecated_validators = current_state.known_validators.keys() - fetched_validators.keys()
new_validators = fetched_validators.keys() - current_state.known_validators.keys()
changed_validators = {
k: fetched_validators[k] for k in fetched_validators.keys() & current_state.known_validators.keys()
if fetched_validators[k] != current_state.known_validators[k]
}

# handle changes in validators

self.event_processor.add_event(MinerShieldEvent(
f"Handling validators change, deprecated_validators count={len(deprecated_validators)}"
f", new_validators count={len(new_validators)}, changed_validators count={len(changed_validators)}")
)

for validator in deprecated_validators:
self.event_processor.add_event(MinerShieldEvent(f"Removing validator {validator}"))

if validator in current_state.active_addresses:
self.address_manager.remove_address(current_state.active_addresses[validator])

self.state_manager.remove_validator(validator)

# TODO handle new_validators and changed_validators

if deprecated_validators or new_validators or changed_validators:
# if anything changed update manifest file and publish new version to blockchain
# TODO also check state of shield if manifest was published at all
pass

def _handle_ban_validator(self, validator_hotkey: Hotkey):
"""
Ban validator by its hotkey. It will update manifest file and publish info about new file version to blockchain.
"""
# TODO
pass


class MinerShieldTask:
Contributor

Suggested change
class MinerShieldTask:
class AbstractMinerShieldTask(ABC):

"""
Task to be executed by shield worker.
"""

def __init__(self, task_name: str):
"""
Initialize task.

Args:
task_name: Short name of the task.
"""
self.task_name = task_name

def handle(self, miner_shield: MinerShield):
"""
Run task logic.

Args
miner_shield: Instance of MinerShield in which task is executed.
"""
pass

def __repr__(self):
return self.task_name
Contributor

Suggested change
def __init__(self, task_name: str):
"""
Initialize task.
Args:
task_name: Short name of the task.
"""
self.task_name = task_name
def handle(self, miner_shield: MinerShield):
"""
Run task logic.
Args
miner_shield: Instance of MinerShield in which task is executed.
"""
pass
def __repr__(self):
return self.task_name
NAME_DELETER = re.compile(r'^MinerShield(.*)Task$')
@abstractmethod
def run(self, miner_shield: MinerShield):
pass
def __repr__(self):
return self.NAME_DELETER.sub(r'\1', self.__class__.__name__)
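
For reference, the suggested change can be assembled into a runnable sketch like the one below. The subclass is a simplified version of the PR's MinerShieldBanValidatorTask, and the untyped miner_shield parameter is a shortcut taken to keep the example self-contained.

```python
import re
from abc import ABC, abstractmethod


class AbstractMinerShieldTask(ABC):
    # Strips the "MinerShield" prefix and the "Task" suffix from a class name.
    NAME_DELETER = re.compile(r'^MinerShield(.*)Task$')

    @abstractmethod
    def run(self, miner_shield) -> None:
        ...

    def __repr__(self) -> str:
        return self.NAME_DELETER.sub(r'\1', self.__class__.__name__)


class MinerShieldBanValidatorTask(AbstractMinerShieldTask):
    def __init__(self, validator_hotkey: str) -> None:
        self.validator_hotkey = validator_hotkey

    def run(self, miner_shield) -> None:
        miner_shield._handle_ban_validator(self.validator_hotkey)


print(repr(MinerShieldBanValidatorTask("hotkey")))  # prints: BanValidator
```

Deriving the name from the class removes the need to pass a task_name string to every constructor, and marking run as abstract prevents instantiating the base class directly.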


class MinerShieldInitializeTask(MinerShieldTask):
def __init__(self):
super().__init__("INITIALIZE_SHIELD")

def handle(self, miner_shield: MinerShield):
miner_shield._handle_initialize()
Contributor

This could be refactored away into the abstract class too, but it's not worth it - we wouldn't save much anyway and it'd confuse static code analyzers


Contributor

Suggested change

class MinerShieldDisableTask(MinerShieldTask):
def __init__(self):
super().__init__("DISABLE_SHIELD")

def handle(self, miner_shield: MinerShield):
miner_shield._handle_disable()

class MinerShieldValidatorsChangedTask(MinerShieldTask):
def __init__(self):
super().__init__("VALIDATORS_CHANGED")

def handle(self, miner_shield: MinerShield):
miner_shield._handle_validators_changed()

class MinerShieldBanValidatorTask(MinerShieldTask):
def __init__(self, validator_hotkey: Hotkey):
super().__init__("BAN_VALIDATOR")
self.validator_hotkey = validator_hotkey

def handle(self, miner_shield: MinerShield):
miner_shield._handle_ban_validator(self.validator_hotkey)