Skip to content

Commit

Permalink
feat!: stateless runner (#82)
Browse files Browse the repository at this point in the history
* chore: add packaging as peer dep w/ eth-ape

* refactor: add taskiq utility function

* feat!: add system configuration task

* refactor: reorder where broker startup occurs

also added a note on how to use system startup tasks

* feat: add system task to fetch task data from worker; refactor Runner

* refactor!: remove start block from polling runner

* refactor!: rename user tasks, change NEW_BLOCKS to NEW_BLOCK

* fix: allow either string or set of exceptions for startup failure

* refactor: move system task results to debug logger in middleware

* fix: typing didn't match use in runner

* refactor: remove storing full EventABI in favor of rebuilding from sig

* fix: unused import

* refactor!: update lower pin on system task to next reelase
  • Loading branch information
fubuloubu authored May 11, 2024
1 parent 0211cfe commit f6d02b5
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 110 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"eth-ape>=0.7,<1.0",
"ethpm-types>=0.6.10", # lower pin only, `eth-ape` governs upper pin
"eth-pydantic-types", # Use same version as eth-ape
"packaging", # Use same version as eth-ape
"pydantic_settings", # Use same version as eth-ape
"taskiq[metrics]>=0.11.3,<0.12",
],
Expand Down
95 changes: 73 additions & 22 deletions silverback/application.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
import atexit
from collections import defaultdict
from dataclasses import dataclass
from datetime import timedelta
from typing import Callable
from typing import Any, Callable

from ape.api.networks import LOCAL_NETWORK_NAME
from ape.contracts import ContractEvent, ContractInstance
from ape.logging import logger
from ape.managers.chain import BlockContainer
from ape.utils import ManagerAccessMixin
from packaging.version import Version
from pydantic import BaseModel
from taskiq import AsyncTaskiqDecoratedTask, TaskiqEvents

from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError
from .settings import Settings
from .types import SilverbackID, TaskType


@dataclass
class TaskData:
container: BlockContainer | ContractEvent | None
handler: AsyncTaskiqDecoratedTask
class SystemConfig(BaseModel):
# NOTE: Do not change this datatype unless major breaking

# NOTE: Useful for determining if Runner can handle this app
sdk_version: str
# NOTE: Useful for specifying what task types can be specified by app
task_types: list[str]


class TaskData(BaseModel):
# NOTE: Data we need to know how to call a task via kicker
name: str # Name of user function
labels: dict[str, Any]

# NOTE: Any other items here must have a default value


class SilverbackApp(ManagerAccessMixin):
Expand Down Expand Up @@ -66,28 +77,65 @@ def __init__(self, settings: Settings | None = None):
logger.info(f"Loading Silverback App with settings:\n {settings_str}")

self.broker = settings.get_broker()
# NOTE: If no tasks registered yet, defaults to empty list instead of raising KeyError
self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list)
self.tasks: dict[TaskType, list[TaskData]] = {
task_type: []
for task_type in TaskType
# NOTE: Dont track system tasks
if not str(task_type).startswith("system:")
}
self.poll_settings: dict[str, dict] = {}

atexit.register(provider_context.__exit__, None, None, None)

self.signer = settings.get_signer()
self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT
self.start_block = settings.START_BLOCK

signer_str = f"\n SIGNER={repr(self.signer)}"
start_block_str = f"\n START_BLOCK={self.start_block}" if self.start_block else ""
new_block_timeout_str = (
f"\n NEW_BLOCK_TIMEOUT={self.new_block_timeout}" if self.new_block_timeout else ""
)

network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}"
logger.success(
f'Loaded Silverback App:\n NETWORK="{network_choice}"'
f"{signer_str}{start_block_str}{new_block_timeout_str}"
f"{signer_str}{new_block_timeout_str}"
)

# NOTE: Runner must call this to configure itself for all SDK hooks
self._get_system_config = self.__register_system_task(
TaskType.SYSTEM_CONFIG, self.__get_system_config_handler
)
# NOTE: Register other system tasks here
self._get_user_taskdata = self.__register_system_task(
TaskType.SYSTEM_USER_TASKDATA, self.__get_user_taskdata_handler
)

def __register_system_task(
self, task_type: TaskType, task_handler: Callable
) -> AsyncTaskiqDecoratedTask:
assert str(task_type).startswith("system:"), "Can only add system tasks"
# NOTE: This has to be registered with the broker in the worker
return self.broker.register_task(
task_handler,
# NOTE: Name makes it impossible to conflict with user's handler fn names
task_name=str(task_type),
task_type=str(task_type),
)

def __get_system_config_handler(self) -> SystemConfig:
# NOTE: This is actually executed on the worker side
from silverback.version import __version__

return SystemConfig(
sdk_version=Version(__version__).base_version,
task_types=[str(t) for t in TaskType],
)

def __get_user_taskdata_handler(self, task_type: TaskType) -> list[TaskData]:
# NOTE: This is actually executed on the worker side
assert str(task_type).startswith("user:"), "Can only fetch user task data"
return self.tasks.get(task_type, [])

def broker_task_decorator(
self,
task_type: TaskType,
Expand All @@ -110,12 +158,12 @@ def broker_task_decorator(
type it should handle.
"""
if (
(task_type is TaskType.NEW_BLOCKS and not isinstance(container, BlockContainer))
(task_type is TaskType.NEW_BLOCK and not isinstance(container, BlockContainer))
or (task_type is TaskType.EVENT_LOG and not isinstance(container, ContractEvent))
or (
task_type
not in (
TaskType.NEW_BLOCKS,
TaskType.NEW_BLOCK,
TaskType.EVENT_LOG,
)
and container is not None
Expand All @@ -132,19 +180,22 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
# we only want to determine if it is not None
if container is not None and isinstance(container, ContractEvent):
# Address is almost a certainty if the container is being used as a filter here.
if contract_address := getattr(container.contract, "address", None):
labels["contract_address"] = contract_address
labels["event_signature"] = f"{container.abi.signature}"
if not (contract_address := getattr(container.contract, "address", None)):
raise InvalidContainerTypeError(
"Please provider a contract event from a valid contract instance."
)

labels["contract_address"] = contract_address
labels["event_signature"] = container.abi.signature

broker_task = self.broker.register_task(
self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels))

return self.broker.register_task(
handler,
task_name=handler.__name__,
**labels,
)

self.tasks[task_type].append(TaskData(container=container, handler=broker_task))
return broker_task

return add_taskiq_task

def on_startup(self) -> Callable:
Expand Down Expand Up @@ -228,7 +279,7 @@ def on_(
else:
self.poll_settings["_blocks_"] = {"start_block": start_block}

return self.broker_task_decorator(TaskType.NEW_BLOCKS, container=container)
return self.broker_task_decorator(TaskType.NEW_BLOCK, container=container)

elif isinstance(container, ContractEvent) and isinstance(
container.contract, ContractInstance
Expand Down
8 changes: 5 additions & 3 deletions silverback/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Sequence
from typing import Any

from ape.exceptions import ApeException

Expand Down Expand Up @@ -32,8 +32,10 @@ class SilverbackException(ApeException):

# TODO: `ExceptionGroup` added in Python 3.11
class StartupFailure(SilverbackException):
def __init__(self, *exceptions: Sequence[Exception]):
if error_str := "\n".join(str(e) for e in exceptions):
def __init__(self, *exceptions: Exception | str):
if len(exceptions) == 1 and isinstance(exceptions[0], str):
super().__init__(exceptions[0])
elif error_str := "\n".join(str(e) for e in exceptions):
super().__init__(f"Startup failure(s):\n{error_str}")
else:
super().__init__("Startup failure(s) detected. See logs for details.")
Expand Down
12 changes: 8 additions & 4 deletions silverback/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
return message # Not a silverback task

# Add extra labels for our task to see what their source was
if task_type is TaskType.NEW_BLOCKS:
if task_type is TaskType.NEW_BLOCK:
# NOTE: Necessary because we don't know the exact block class
block = message.args[0] = self.provider.network.ecosystem.decode_block(
hexbytes_dict(message.args[0])
Expand All @@ -96,8 +96,12 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult):
else:
percent_display = ""

(logger.error if result.error else logger.success)(
f"{self._create_label(message)} " f"- {result.execution_time:.3f}s{percent_display}"
)
msg = f"{self._create_label(message)} " f"- {result.execution_time:.3f}s{percent_display}"
if result.is_err:
logger.error(msg)
elif message.task_name.startswith("system:"):
logger.debug(msg)
else:
logger.success(msg)

# NOTE: Unless stdout is ignored, error traceback appears in stdout, no need for `on_error`
Loading

0 comments on commit f6d02b5

Please sign in to comment.