diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 4e713d73..3774b8ac 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -6,7 +6,7 @@ from eth_utils.conversions import to_hex from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult -from silverback.persistence import HandlerResult +from silverback.recorder import HandlerResult from silverback.types import SilverbackID, handler_id_block, handler_id_event from silverback.utils import hexbytes_dict @@ -43,7 +43,7 @@ def compute_block_time() -> int: self.block_time = self.chain_manager.provider.network.block_time or compute_block_time() self.ident = SilverbackID.from_settings(settings) - self.persistence = settings.get_persistent_store() + self.recorder = settings.get_recorder() def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: # TODO: Necessary because bytes/HexBytes doesn't encode/deocde well for some reason @@ -99,7 +99,7 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult): ) async def post_save(self, message: TaskiqMessage, result: TaskiqResult): - if not self.persistence: + if not self.recorder: return handler_id, block_number, log_index = resolve_task(message) @@ -109,7 +109,7 @@ async def post_save(self, message: TaskiqMessage, result: TaskiqResult): ) try: - await self.persistence.add_result(handler_result) + await self.recorder.add_result(handler_result) except Exception as err: logger.error(f"Error storing result: {err}") diff --git a/silverback/persistence.py b/silverback/recorder.py similarity index 96% rename from silverback/persistence.py rename to silverback/recorder.py index 2c2b4e80..dec03028 100644 --- a/silverback/persistence.py +++ b/silverback/recorder.py @@ -52,7 +52,7 @@ def from_taskiq( ) -class BasePersistentStore(ABC): +class BaseRecorder(ABC): @abstractmethod async def init(self): """Handle any async initialization from Silverback settings (e.g. migrations).""" @@ -83,16 +83,16 @@ async def add_result(self, v: HandlerResult): ... -class SQLitePersistentStore(BasePersistentStore): +class SQLiteRecorder(BaseRecorder): """ - SQLite implementation of BasePersistentStore used to store application state and handler + SQLite implementation of BaseRecorder used to store application state and handler result data. Usage: - To use SQLite persistent store, you must configure the following env vars: + To use SQLite recorder, you must configure the following env vars: - - `PERSISTENCE_CLASS`: `silverback.persistence.SQLitePersistentStore` + - `RECORDER_CLASS`: `silverback.recorder.SQLiteRecorder` - `SQLITE_PATH` (optional): A system file path or if blank it will be stored in-memory. """ diff --git a/silverback/runner.py b/silverback/runner.py index 49580586..39968f04 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -11,7 +11,7 @@ from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError -from .persistence import BasePersistentStore +from .recorder import BaseRecorder from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import SilverbackID, SilverbackStartupState @@ -28,7 +28,7 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.exceptions = 0 self.last_block_seen = 0 self.last_block_processed = 0 - self.persistence: Optional[BasePersistentStore] = None + self.recorder: Optional[BaseRecorder] = None self.ident = SilverbackID.from_settings(settings) def _handle_result(self, result: TaskiqResult): @@ -58,9 +58,9 @@ async def _checkpoint( self.last_block_seen = max(last_block_seen, self.last_block_seen) self.last_block_processed = max(last_block_processed, self.last_block_processed) - if self.persistence: + if self.recorder: try: - await self.persistence.set_state( + await self.recorder.set_state( self.ident, self.last_block_seen, self.last_block_processed ) except Exception as err: @@ -92,10 +92,10 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ - self.persistence = settings.get_persistent_store() + self.recorder = settings.get_recorder() - if self.persistence: - boot_state = await self.persistence.get_state(self.ident) + if self.recorder: + boot_state = await self.recorder.get_state(self.ident) if boot_state: self.last_block_seen = boot_state.last_block_seen self.last_block_processed = boot_state.last_block_processed diff --git a/silverback/settings.py b/silverback/settings.py index 02ecc888..f1ab3cbc 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -7,7 +7,7 @@ from ._importer import import_from_string from .middlewares import SilverbackMiddleware -from .persistence import BasePersistentStore +from .recorder import BaseRecorder class Settings(BaseSettings, ManagerAccessMixin): @@ -35,8 +35,8 @@ class Settings(BaseSettings, ManagerAccessMixin): NEW_BLOCK_TIMEOUT: Optional[int] = None START_BLOCK: Optional[int] = None - # Used for persistent store - PERSISTENCE_CLASS: Optional[str] = None + # Used for recorder + RECORDER_CLASS: Optional[str] = None model_config = SettingsConfigDict(env_prefix="SILVERBACK_", case_sensitive=True) @@ -68,12 +68,12 @@ def get_broker(self) -> AsyncBroker: def get_network_choice(self) -> str: return self.NETWORK_CHOICE or self.network_manager.network.choice - def get_persistent_store(self) -> Optional[BasePersistentStore]: - if not self.PERSISTENCE_CLASS: + def get_recorder(self) -> Optional[BaseRecorder]: + if not self.RECORDER_CLASS: return None - persistence_class = import_from_string(self.PERSISTENCE_CLASS) - return persistence_class() + recorder_class = import_from_string(self.RECORDER_CLASS) + return recorder_class() def get_provider_context(self) -> ProviderContextManager: # NOTE: Bit of a workaround for adhoc connections: