Skip to content

Commit

Permalink
Feature/val 1404 eip 7251 head watcher alerts for new el requests (#58)
Browse files Browse the repository at this point in the history
* feat(val-1404): alerts for new EL requests

* fix: pin alertmanager to compatible version because headwatcher use the deprecated /api/v1/alerts endpoint

* chore: .gitignore Docker volumes

* docs: update documentation to align with code

* fix: correct keyword casing to prevent Docker warnings; fix LegacyKeyValueFormat for ENV

* feat(val-1404): group similar alerts

* feat(val-1404): lazy VALID_WITHDRAWAL_ADDRESSES

---------

Co-authored-by: AlexanderLukin <alexanderlukin9@gmail.com>
  • Loading branch information
dputko and AlexanderLukin authored Feb 4, 2025
1 parent dc36bc7 commit 6b973ff
Show file tree
Hide file tree
Showing 20 changed files with 674 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ KEYS_API_URI=URL_TO_KEYS_API
LIDO_LOCATOR_ADDRESS=ETHEREUM_ADDRESS
EXECUTION_CLIENT_URI=URL_TO_EL_API

# For option when KEYS_SOURCE is 'keys_file'
# For option when KEYS_SOURCE is 'file'
# CONSENSUS_CLIENT_URI: URL_TO_CL_API
# KEYS_SOURCE: keys_file
# KEYS_SOURCE: file
# KEYS_FILE_PATH: path/to/keys.yml
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,6 @@ dmypy.json

# IDE
.idea/

# Docker
.volumes
12 changes: 6 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM python:3.11.3-slim as base
FROM python:3.11.3-slim AS base

RUN apt-get update && apt-get install -y --no-install-recommends -qq \
gcc=4:10.2.1-1 \
libffi-dev=3.3-6 \
g++=4:10.2.1-1 \
git=1:2.30.2-1+deb11u2 \
curl=7.74.0-1.3+deb11u12 \
curl=7.74.0-1.3+deb11u14 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

Expand All @@ -20,7 +20,7 @@ ENV PYTHONUNBUFFERED=1 \

ENV PATH="$VENV_PATH/bin:$PATH"

FROM base as builder
FROM base AS builder

ENV POETRY_VERSION=1.3.2
RUN pip install --no-cache-dir poetry==$POETRY_VERSION
Expand All @@ -30,16 +30,16 @@ COPY pyproject.toml poetry.lock ./
RUN poetry install --only main --no-root


FROM base as production
FROM base AS production

COPY --from=builder $VENV_PATH $VENV_PATH
WORKDIR /app
COPY . .

RUN apt-get clean && find /var/lib/apt/lists/ -type f -delete && chown -R www-data /app/

ENV PROMETHEUS_PORT 9000
ENV HEALTHCHECK_SERVER_PORT 9010
ENV PROMETHEUS_PORT=9000
ENV HEALTHCHECK_SERVER_POR=9010

EXPOSE $PROMETHEUS_PORT
USER www-data
Expand Down
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Currently it supports:
> All exits will be handled as unexpected for specified keys
1. Fill `docker/validators/keys.yml` with your values
2. Set `KEYS_SOURCE=keys_file` in `.env`
2. Set `KEYS_SOURCE=file` in `.env`

> If you want to use another path, specify it in `KEYS_FILE_PATH` env variable
Expand All @@ -42,12 +42,12 @@ Currently it supports:
* **Required:** false
* **Default:** false
---
`KEYS_SOURCE` - Keys source. If `keys_api` - application will fetch keys from Keys API, if `keys_file` - application will fetch keys from `KEYS_FILE_PATH`
`KEYS_SOURCE` - Keys source. If `keys_api` - application will fetch keys from Keys API, if `file` - application will fetch keys from `KEYS_FILE_PATH`
* **Required:** false
* **Default:** keys_api
---
`KEYS_FILE_PATH` - Path to file with keys
* **Required:** if `KEYS_SOURCE` is `keys_file`
* **Required:** if `KEYS_SOURCE` is `file`
* **Default:** ./docker/validators/keys.yml
---
`CONSENSUS_CLIENT_URI` - Ethereum consensus layer comma separated API urls
Expand Down Expand Up @@ -139,6 +139,10 @@ Currently it supports:
`ALERTMANAGER_REQUEST_SLEEP_BEFORE_RETRY_IN_SECONDS` - Alertmanager request retry timeout in seconds
* **Required:** false
* **Default:** 1
---
`VALID_WITHDRAWAL_ADDRESSES` - A comma-separated list of addresses. Triggers a critical alert if a monitored execution_request contains a source_address matching any of these addresses
* **Required:** false
* **Default:** []

## Application metrics

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ services:
- 9090

alertmanager:
image: prom/alertmanager:latest
image: prom/alertmanager:v0.25.0
container_name: alertmanager
restart: unless-stopped
networks:
Expand Down
68 changes: 68 additions & 0 deletions src/handlers/consolidation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import logging

from unsync import unsync

from src.alerts.common import CommonAlert
from src.handlers.handler import WatcherHandler
from src.handlers.helpers import beaconchain, validator_pubkey_link
from src.metrics.prometheus.duration_meter import duration_meter
from src.providers.consensus.typings import ConsolidationRequest, FullBlockInfo

logger = logging.getLogger()


class ConsolidationHandler(WatcherHandler):
@unsync
@duration_meter()
def handle(self, watcher, head: FullBlockInfo):
if not head.message.body.execution_requests or not head.message.body.execution_requests.consolidations:
logger.info({"msg": f"No consolidation requests in block [{head.message.slot}]"})
return

slot = head.message.slot
withdrawal_address, source_pubkey, target_pubkey = [], [], []
for consolidation in head.message.body.execution_requests.consolidations:
if consolidation.source_address in watcher.valid_withdrawal_addresses:
withdrawal_address.append(consolidation)
elif consolidation.source_pubkey in watcher.user_keys:
source_pubkey.append(consolidation)
elif consolidation.target_pubkey in watcher.user_keys:
target_pubkey.append(consolidation)
# in the future we should check the type of validator WC:
# if it is 0x02 and source_address == WCs of source validator - It's donation!

if withdrawal_address:
self._send_withdrawals_address(watcher, slot, withdrawal_address)
if source_pubkey:
self._send_source_pubkey(watcher, slot, source_pubkey)
if target_pubkey:
self._send_target_pubkey(watcher, slot, target_pubkey)

def _send_withdrawals_address(self, watcher, slot, consolidations: list[ConsolidationRequest]):
alert = CommonAlert(name="HeadWatcherConsolidationSourceWithdrawalAddress", severity="critical")
summary = "🚨🚨🚨 Validator consolidation was requested from Withdrawal Vault source address"
self._send_alert(watcher, slot, alert, summary, consolidations)

def _send_source_pubkey(self, watcher, slot, consolidations: list[ConsolidationRequest]):
alert = CommonAlert(name="HeadWatcherConsolidationUserSourcePubkey", severity="info")
summary = "⚠️⚠️⚠️ Consolidation was requested for our validators"
self._send_alert(watcher, slot, alert, summary, consolidations)

def _send_target_pubkey(self, watcher, slot, consolidations: list[ConsolidationRequest]):
alert = CommonAlert(name="HeadWatcherConsolidationUserTargetPubkey", severity="info")
summary = "⚠️⚠️⚠️ Someone attempts to consolidate their validators to our validators"
self._send_alert(watcher, slot, alert, summary, consolidations)

def _send_alert(self, watcher, slot: str, alert: CommonAlert, summary: str,
consolidations: list[ConsolidationRequest]):
description = '\n\n'.join(self._describe_consolidation(c, watcher.user_keys) for c in consolidations)
description += f'\n\nSlot: {beaconchain(slot)}'
self.send_alert(watcher, alert.build_body(summary, description))

@staticmethod
def _describe_consolidation(consolidation: ConsolidationRequest, keys):
return '\n'.join([
f'Request source address: {consolidation.source_address}',
f'Source: {validator_pubkey_link(consolidation.source_pubkey, keys)}',
f'Target: {validator_pubkey_link(consolidation.target_pubkey, keys)}',
])
58 changes: 58 additions & 0 deletions src/handlers/el_triggered_exit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging

from unsync import unsync

from src.alerts.common import CommonAlert
from src.handlers.handler import WatcherHandler
from src.handlers.helpers import beaconchain, validator_pubkey_link
from src.keys_source.base_source import NamedKey
from src.metrics.prometheus.duration_meter import duration_meter
from src.providers.consensus.typings import FullBlockInfo, WithdrawalRequest

logger = logging.getLogger()


class ElTriggeredExitHandler(WatcherHandler):
@unsync
@duration_meter()
def handle(self, watcher, head: FullBlockInfo):
if not head.message.body.execution_requests or not head.message.body.execution_requests.withdrawals:
logger.debug({"msg": f"No withdrawals requests in block [{head.message.slot}]"})
return

slot = head.message.slot
our_withdrawal_address, our_validators = [], []
for withdrawal in head.message.body.execution_requests.withdrawals:
if withdrawal.source_address in watcher.valid_withdrawal_addresses:
our_withdrawal_address.append(withdrawal)
elif withdrawal.validator_pubkey in watcher.user_keys:
our_validators.append(withdrawal)

if our_withdrawal_address:
self._send_withdrawal_address_alerts(watcher, slot, our_withdrawal_address)
if our_validators:
self._send_our_validators_alert(watcher, slot, our_validators)

def _send_withdrawal_address_alerts(self, watcher, slot: str, withdrawals: list[WithdrawalRequest]):
alert = CommonAlert(name="HeadWatcherELWithdrawalFromUserWithdrawalAddress", severity="critical")
summary = "🚨🚨🚨 Our validator triggered withdrawal was requested from our Withdrawal Vault address"
description = '\n\n'.join(map(lambda w: self._describe_withdrawal(w, watcher.user_keys), withdrawals))
self._send_alert(watcher, alert, summary, description, slot)

def _send_our_validators_alert(self, watcher, slot: str, withdrawals: list[WithdrawalRequest]):
alert = CommonAlert(name="HeadWatcherUserELWithdrawal", severity="info")
summary = "⚠️⚠️⚠️ Our validator triggered withdrawal was requested"
description = '\n\n'.join(map(lambda w: self._describe_withdrawal(w, watcher.user_keys), withdrawals))
self._send_alert(watcher, alert, summary, description, slot)

def _send_alert(self, watcher, alert: CommonAlert, summary: str, description: str, slot: str):
description += f'\n\nSlot: {beaconchain(slot)}'
self.send_alert(watcher, alert.build_body(summary, description))

@staticmethod
def _describe_withdrawal(withdrawal: WithdrawalRequest, user_keys: dict[str, NamedKey]) -> str:
return '\n'.join([
f'Source address: {withdrawal.source_address}',
f'Validator: {validator_pubkey_link(withdrawal.validator_pubkey, user_keys)}',
f'Amount: {withdrawal.amount}',
])
8 changes: 3 additions & 5 deletions src/handlers/fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

from src.alerts.common import CommonAlert
from src.handlers.handler import WatcherHandler
from src.handlers.helpers import beaconchain
from src.metrics.prometheus.duration_meter import duration_meter
from src.providers.consensus.typings import BlockHeaderResponseData, ChainReorgEvent
from src.variables import NETWORK_NAME

BEACONCHAIN_URL_TEMPLATE = "[{0}](https://{1}.beaconcha.in/slot/{0})"


class ForkHandler(WatcherHandler):
Expand Down Expand Up @@ -43,7 +41,7 @@ def _send_reorg_alert(self, watcher, chain_reorg: ChainReorgEvent):
alert = CommonAlert(name="UnhandledChainReorg", severity="info")
links = "\n".join(
[
BEACONCHAIN_URL_TEMPLATE.format(s, NETWORK_NAME)
beaconchain(s)
for s in range(int(chain_reorg.slot) - int(chain_reorg.depth), int(chain_reorg.slot) + 1)
]
)
Expand All @@ -59,5 +57,5 @@ def _send_unhandled_head_alert(self, watcher, head: BlockHeaderResponseData):
if diff > 0:
additional_msg = f"\nAnd {diff} slot(s) before it"
parent_root = head.header.message.parent_root
description = f"Please, check unhandled slot: {BEACONCHAIN_URL_TEMPLATE.format(parent_root, NETWORK_NAME)}{additional_msg}"
description = f"Please, check unhandled slot: {beaconchain(parent_root)}{additional_msg}"
self.send_alert(watcher, alert.build_body(summary, description))
19 changes: 19 additions & 0 deletions src/handlers/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from src.keys_source.base_source import NamedKey
from src.variables import NETWORK_NAME

BEACONCHAIN_URL_TEMPLATE = "[{0}](https://{1}.beaconcha.in/slot/{0})"
BEACONCHAIN_VALIDATOR_URL_TEMPLATE = "[{0}](https://{1}.beaconcha.in/validator/{2})"


def beaconchain(slot) -> str:
return BEACONCHAIN_URL_TEMPLATE.format(slot, NETWORK_NAME)


def validator_link(title: str, pubkey: str) -> str:
return BEACONCHAIN_VALIDATOR_URL_TEMPLATE.format(title, NETWORK_NAME, pubkey)

def validator_pubkey_link(pubkey: str, keys: dict[str, NamedKey]) -> str:
operator = keys[pubkey].operatorName if pubkey in keys else ''
spacer = ' ' if operator else ''
title = f'{operator}{spacer}{pubkey}'
return validator_link(title, pubkey)
4 changes: 4 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from web3.middleware import simple_cache_middleware

from src import variables
from src.handlers.consolidation import ConsolidationHandler
from src.handlers.el_triggered_exit import ElTriggeredExitHandler
from src.handlers.exit import ExitsHandler
from src.handlers.fork import ForkHandler
from src.handlers.slashing import SlashingHandler
Expand Down Expand Up @@ -60,6 +62,8 @@ def main():
ForkHandler(),
ExitsHandler(),
# FinalityHandler(), ???
ConsolidationHandler(),
ElTriggeredExitHandler()
]
Watcher(handlers, keys_source, web3).run()

Expand Down
27 changes: 27 additions & 0 deletions src/providers/consensus/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,39 @@ class BlockVoluntaryExit(Nested, FromResponse):
signature: str


@dataclass
class ConsolidationRequest(FromResponse):
source_address: str
source_pubkey: str
target_pubkey: str

@dataclass
class WithdrawalRequest(FromResponse):
source_address: str
validator_pubkey: str
amount: str

@dataclass
class DepositRequest(FromResponse):
pubkey: str
withdrawal_credentials: str
amount: str
signature: str
index: int

@dataclass
class ExecutionRequests(Nested, FromResponse):
deposits: list[DepositRequest]
withdrawals: list[WithdrawalRequest]
consolidations: list[ConsolidationRequest]

@dataclass
class BlockBody(Nested, FromResponse):
execution_payload: BlockExecutionPayload
voluntary_exits: list[BlockVoluntaryExit]
proposer_slashings: list
attester_slashings: list
execution_requests: Optional[ExecutionRequests] = None


@dataclass
Expand Down
13 changes: 12 additions & 1 deletion src/utils/dataclass.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import functools
from dataclasses import dataclass, fields, is_dataclass
from types import GenericAlias
from typing import Callable, Self, Sequence, TypeVar
from typing import Callable, Self, Sequence, TypeVar, Union, get_args, get_origin


class DecodeToDataclassException(Exception):
pass


def try_extract_underlying_type_from_optional(field):
args = get_args(field)
types = [x for x in args if x != type(None)]
if get_origin(field) is Union and type(None) in args and len(types) == 1:
return types[0]
return None


@dataclass
class Nested:
"""
Expand All @@ -31,6 +39,9 @@ def __post_init__(self):
elif is_dataclass(field.type) and not is_dataclass(getattr(self, field.name)):
factory = self.__get_dataclass_factory(field.type)
setattr(self, field.name, factory(**getattr(self, field.name)))
elif getattr(self, field.name) and (underlying := try_extract_underlying_type_from_optional(field.type)):
factory = self.__get_dataclass_factory(underlying)
setattr(self, field.name, factory(**getattr(self, field.name)))

@staticmethod
def __get_dataclass_factory(field_type):
Expand Down
2 changes: 2 additions & 0 deletions src/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

LIDO_LOCATOR_ADDRESS = os.getenv('LIDO_LOCATOR_ADDRESS', '')

VALID_WITHDRAWAL_ADDRESSES = [x.lower() for x in os.getenv('VALID_WITHDRAWAL_ADDRESSES', '').split(',') if x]

# - Metrics -
PROMETHEUS_PORT = int(os.getenv('PROMETHEUS_PORT', 9000))
PROMETHEUS_PREFIX = os.getenv("PROMETHEUS_PREFIX", "ethereum_head_watcher")
Expand Down
Loading

0 comments on commit 6b973ff

Please sign in to comment.