Skip to content

Commit

Permalink
refactor: use Python 3.9+ typing style
Browse files Browse the repository at this point in the history
  • Loading branch information
fubuloubu committed May 2, 2024
1 parent e729115 commit e087853
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 72 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Read the [development userguide](https://docs.apeworx.io/silverback/stable/userg

## Dependencies

- [python3](https://www.python.org/downloads) version 3.8 or greater, python3-dev
- [python3](https://www.python.org/downloads) version 3.9 or greater, python3-dev

## Installation

Expand Down
5 changes: 2 additions & 3 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import sys
from functools import lru_cache
from pathlib import Path
from typing import List

import requests
from semantic_version import Version # type: ignore
Expand Down Expand Up @@ -43,7 +42,7 @@
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns: List[str] = ["_build", ".DS_Store"]
exclude_patterns: list[str] = ["_build", ".DS_Store"]


# The suffix(es) of source filenames.
Expand Down Expand Up @@ -94,7 +93,7 @@ def fixpath(path: str) -> str:


@lru_cache(maxsize=None)
def get_versions() -> List[str]:
def get_versions() -> list[str]:
"""
Get all the versions from the Web.
"""
Expand Down
2 changes: 1 addition & 1 deletion example.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Annotated # NOTE: Only Python 3.9+
from typing import Annotated

from ape import chain
from ape.api import BlockAPI
Expand Down
26 changes: 13 additions & 13 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections import defaultdict
from dataclasses import dataclass
from datetime import timedelta
from typing import Callable, Dict, Optional, Union
from typing import Callable

from ape.api.networks import LOCAL_NETWORK_NAME
from ape.contracts import ContractEvent, ContractInstance
Expand All @@ -18,7 +18,7 @@

@dataclass
class TaskData:
container: Union[BlockContainer, ContractEvent, None]
container: BlockContainer | ContractEvent | None
handler: AsyncTaskiqDecoratedTask


Expand All @@ -35,12 +35,12 @@ class SilverbackApp(ManagerAccessMixin):
... # Connection has been initialized, can call broker methods e.g. `app.on_(...)`
"""

def __init__(self, settings: Optional[Settings] = None):
def __init__(self, settings: Settings | None = None):
"""
Create app
Args:
settings (Optional[~:class:`silverback.settings.Settings`]): Settings override.
settings (~:class:`silverback.settings.Settings` | None): Settings override.
Defaults to environment settings.
"""
if not settings:
Expand All @@ -62,7 +62,7 @@ def __init__(self, settings: Optional[Settings] = None):
self.broker = settings.get_broker()
# NOTE: If no tasks registered yet, defaults to empty list instead of raising KeyError
self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list)
self.poll_settings: Dict[str, Dict] = {}
self.poll_settings: dict[str, dict] = {}

atexit.register(self.network.__exit__, None, None, None)

Expand All @@ -84,14 +84,14 @@ def __init__(self, settings: Optional[Settings] = None):
def broker_task_decorator(
self,
task_type: TaskType,
container: Union[BlockContainer, ContractEvent, None] = None,
container: BlockContainer | ContractEvent | None = None,
) -> Callable[[Callable], AsyncTaskiqDecoratedTask]:
"""
Dynamically create a new broker task that handles tasks of ``task_type``.
Args:
task_type: :class:`~silverback.types.TaskType`: The type of task to create.
container: (Union[BlockContainer, ContractEvent]): The event source to watch.
container: (BlockContainer | ContractEvent): The event source to watch.
Returns:
Callable[[Callable], :class:`~taskiq.AsyncTaskiqDecoratedTask`]:
Expand Down Expand Up @@ -187,18 +187,18 @@ def do_something_on_shutdown(state):

def on_(
self,
container: Union[BlockContainer, ContractEvent],
new_block_timeout: Optional[int] = None,
start_block: Optional[int] = None,
container: BlockContainer | ContractEvent,
new_block_timeout: int | None = None,
start_block: int | None = None,
):
"""
Create task to handle events created by `container`.
Args:
container: (Union[BlockContainer, ContractEvent]): The event source to watch.
new_block_timeout: (Optional[int]): Override for block timeout that is acceptable.
container: (BlockContainer | ContractEvent): The event source to watch.
new_block_timeout: (int | None): Override for block timeout that is acceptable.
Defaults to whatever the app's settings are for default polling timeout are.
start_block (Optional[int]): block number to start processing events from.
start_block (int | None): block number to start processing events from.
Defaults to whatever the latest block is.
Raises:
Expand Down
28 changes: 14 additions & 14 deletions silverback/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sqlite3
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Optional, TypeVar
from typing import TypeVar

from pydantic import BaseModel
from taskiq import TaskiqResult
Expand All @@ -28,17 +28,17 @@ class HandlerResult(TaskiqResult):
instance: str
network: str
handler_id: str
block_number: Optional[int]
log_index: Optional[int]
block_number: int | None
log_index: int | None
created: datetime

@classmethod
def from_taskiq(
cls,
ident: SilverbackID,
handler_id: str,
block_number: Optional[int],
log_index: Optional[int],
block_number: int | None,
log_index: int | None,
result: TaskiqResult,
) -> Self:
return cls(
Expand All @@ -59,21 +59,21 @@ async def init(self):
...

@abstractmethod
async def get_state(self, ident: SilverbackID) -> Optional[SilverbackState]:
async def get_state(self, ident: SilverbackID) -> SilverbackState |None:
"""Return the stored state for a Silverback instance"""
...

@abstractmethod
async def set_state(
self, ident: SilverbackID, last_block_seen: int, last_block_processed: int
) -> Optional[SilverbackState]:
) -> SilverbackState | None:
"""Set the stored state for a Silverback instance"""
...

@abstractmethod
async def get_latest_result(
self, ident: SilverbackID, handler: Optional[str] = None
) -> Optional[HandlerResult]:
self, ident: SilverbackID, handler: str |None = None
) -> HandlerResult | None:
"""Return the latest result for a Silverback instance's handler"""
...

Expand Down Expand Up @@ -136,7 +136,7 @@ class SQLiteRecorder(BaseRecorder):
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
"""

con: Optional[sqlite3.Connection]
con: sqlite3.Connection | None
initialized: bool = False

async def init(self):
Expand Down Expand Up @@ -182,7 +182,7 @@ async def init(self):

self.initialized = True

async def get_state(self, ident: SilverbackID) -> Optional[SilverbackState]:
async def get_state(self, ident: SilverbackID) -> SilverbackState | None:
if not self.initialized:
await self.init()

Expand Down Expand Up @@ -210,7 +210,7 @@ async def get_state(self, ident: SilverbackID) -> Optional[SilverbackState]:

async def set_state(
self, ident: SilverbackID, last_block_seen: int, last_block_processed: int
) -> Optional[SilverbackState]:
) -> SilverbackState | None:
if not self.initialized:
await self.init()

Expand Down Expand Up @@ -261,8 +261,8 @@ async def set_state(
)

async def get_latest_result(
self, ident: SilverbackID, handler: Optional[str] = None
) -> Optional[HandlerResult]:
self, ident: SilverbackID, handler: str | None = None
) -> HandlerResult | None:
if not self.initialized:
await self.init()

Expand Down
5 changes: 2 additions & 3 deletions silverback/runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
from abc import ABC, abstractmethod
from typing import Optional, Tuple

from ape import chain
from ape.contracts import ContractEvent, ContractInstance
Expand Down Expand Up @@ -28,7 +27,7 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs)
self.exceptions = 0
self.last_block_seen = 0
self.last_block_processed = 0
self.recorder: Optional[BaseRecorder] = None
self.recorder: BaseRecorder | None = None
self.ident = SilverbackID.from_settings(settings)

def _handle_result(self, result: TaskiqResult):
Expand All @@ -43,7 +42,7 @@ def _handle_result(self, result: TaskiqResult):

async def _checkpoint(
self, last_block_seen: int = 0, last_block_processed: int = 0
) -> Tuple[int, int]:
) -> tuple[int, int]:
"""Set latest checkpoint block number"""
if (
last_block_seen > self.last_block_seen
Expand Down
78 changes: 47 additions & 31 deletions silverback/settings.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from typing import List, Optional

from ape.api import AccountAPI, ProviderContextManager
from ape.utils import ManagerAccessMixin
from pydantic_settings import BaseSettings, SettingsConfigDict
from taskiq import AsyncBroker, InMemoryBroker, PrometheusMiddleware, TaskiqMiddleware
from taskiq import (
AsyncBroker,
AsyncResultBackend,
InMemoryBroker,
PrometheusMiddleware,
TaskiqMiddleware,
)

from ._importer import import_from_string
from .middlewares import SilverbackMiddleware
Expand Down Expand Up @@ -32,14 +36,34 @@ class Settings(BaseSettings, ManagerAccessMixin):
NETWORK_CHOICE: str = ""
SIGNER_ALIAS: str = ""

NEW_BLOCK_TIMEOUT: Optional[int] = None
START_BLOCK: Optional[int] = None
NEW_BLOCK_TIMEOUT: int | None = None
START_BLOCK: int | None = None

# Used for recorder
RECORDER_CLASS: Optional[str] = None
RECORDER_CLASS: str | None = None

model_config = SettingsConfigDict(env_prefix="SILVERBACK_", case_sensitive=True)

def get_middlewares(self) -> list[TaskiqMiddleware]:
middlewares: list[TaskiqMiddleware] = [
# Built-in middlewares (required)
SilverbackMiddleware(silverback_settings=self),
]

if self.ENABLE_METRICS:
middlewares.append(
PrometheusMiddleware(server_addr="0.0.0.0", server_port=9000),
)

return middlewares

def get_result_backend(self) -> AsyncResultBackend | None:
if not (backend_cls_str := self.RESULT_BACKEND_CLASS):
return None

result_backend_cls = import_from_string(backend_cls_str)
return result_backend_cls(self.RESULT_BACKEND_URI)

def get_broker(self) -> AsyncBroker:
broker_class = import_from_string(self.BROKER_CLASS)
if broker_class == InMemoryBroker:
Expand All @@ -49,47 +73,39 @@ def get_broker(self) -> AsyncBroker:
# TODO: Not all brokers share a common arg signature.
broker = broker_class(self.BROKER_URI or None)

middlewares: List[TaskiqMiddleware] = [SilverbackMiddleware(silverback_settings=self)]
if middlewares := self.get_middlewares():
broker = broker.with_middlewares(*middlewares)

if self.ENABLE_METRICS:
middlewares.append(
PrometheusMiddleware(server_addr="0.0.0.0", server_port=9000),
)

broker = broker.with_middlewares(*middlewares)

if self.RESULT_BACKEND_CLASS:
result_backend_class = import_from_string(self.RESULT_BACKEND_CLASS)
result_backend = result_backend_class(self.RESULT_BACKEND_URI)
if result_backend := self.get_result_backend():
broker = broker.with_result_backend(result_backend)

return broker

def get_network_choice(self) -> str:
return self.NETWORK_CHOICE or self.network_manager.network.choice

def get_recorder(self) -> Optional[BaseRecorder]:
if not self.RECORDER_CLASS:
def get_recorder(self) -> BaseRecorder | None:
if not (recorder_cls_str := self.RECORDER_CLASS):
return None

recorder_class = import_from_string(self.RECORDER_CLASS)
recorder_class = import_from_string(recorder_cls_str)
return recorder_class()

def get_provider_context(self) -> ProviderContextManager:
# NOTE: Bit of a workaround for adhoc connections:
# https://github.com/ApeWorX/ape/issues/1762
if "adhoc" in self.get_network_choice():
if "adhoc" in (network_choice := self.get_network_choice()):
return ProviderContextManager(provider=self.provider)
return self.network_manager.parse_network_choice(self.get_network_choice())
return self.network_manager.parse_network_choice(network_choice)

def get_signer(self) -> Optional[AccountAPI]:
if self.SIGNER_ALIAS:
if self.SIGNER_ALIAS.startswith("TEST::"):
acct_idx = int(self.SIGNER_ALIAS.replace("TEST::", ""))
return self.account_manager.test_accounts[acct_idx]
def get_signer(self) -> AccountAPI | None:
if not (alias := self.SIGNER_ALIAS):
# NOTE: Useful if user wants to add a "paper trading" mode
return None

# NOTE: Will only have a signer if assigned one here (or in app)
return self.account_manager.load(self.SIGNER_ALIAS)
if alias.startswith("TEST::"):
acct_idx = int(alias.replace("TEST::", ""))
return self.account_manager.test_accounts[acct_idx]

# NOTE: Useful if user wants to add a "paper trading" mode
return None
# NOTE: Will only have a signer if assigned one here (or in app)
return self.account_manager.load(alias)
8 changes: 4 additions & 4 deletions silverback/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
from enum import Enum
from typing import AsyncGenerator, Dict, List, Optional
from typing import AsyncGenerator

from ape.logging import logger
from websockets import ConnectionClosedError
Expand All @@ -26,10 +26,10 @@ def __init__(self, ws_provider_uri: str):
self._ws_provider_uri = ws_provider_uri

# Stateful
self._connection: Optional[ws_client.WebSocketClientProtocol] = None
self._connection: ws_client.WebSocketClientProtocol | None = None
self._last_request: int = 0
self._subscriptions: Dict[str, asyncio.Queue] = {}
self._rpc_msg_buffer: List[dict] = []
self._subscriptions: dict[str, asyncio.Queue] = {}
self._rpc_msg_buffer: list[dict] = []
self._ws_lock = asyncio.Lock()

def __repr__(self) -> str:
Expand Down
Loading

0 comments on commit e087853

Please sign in to comment.