From 5d70f0aaca421ec8090a094a2d4ee80eb81603c7 Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Wed, 23 Oct 2024 22:10:17 +0300 Subject: [PATCH 1/8] added fv_protocol package --- .../compute_horde/fv_protocol/__init__.py | 0 .../fv_protocol/facilitator_requests.py | 96 +++++++++++++++++++ .../fv_protocol/validator_requests.py | 35 +++++++ 3 files changed, 131 insertions(+) create mode 100644 compute_horde/compute_horde/fv_protocol/__init__.py create mode 100644 compute_horde/compute_horde/fv_protocol/facilitator_requests.py create mode 100644 compute_horde/compute_horde/fv_protocol/validator_requests.py diff --git a/compute_horde/compute_horde/fv_protocol/__init__.py b/compute_horde/compute_horde/fv_protocol/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/compute_horde/compute_horde/fv_protocol/facilitator_requests.py b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py new file mode 100644 index 000000000..c99508cf8 --- /dev/null +++ b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py @@ -0,0 +1,96 @@ +from typing import Annotated, Literal, Self + +import pydantic +from pydantic import BaseModel, model_validator + +from compute_horde.base.output_upload import OutputUpload, ZipAndHttpPutUpload +from compute_horde.base.volume import Volume, ZipUrlVolume +from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, ExecutorClass + + +class Error(BaseModel, extra="allow"): + msg: str + type: str + help: str = "" + + +class Response(BaseModel, extra="forbid"): + """Message sent from facilitator to validator in response to AuthenticationRequest & JobStatusUpdate""" + + status: Literal["error", "success"] + errors: list[Error] = [] + + +class V0FacilitatorJobRequest(BaseModel, extra="forbid"): + """Message sent from facilitator to validator to request a job execution""" + + # this points to a `ValidatorConsumer.job_new` handler (fuck you django-channels!) + type: Literal["job.new"] = "job.new" + message_type: Literal["V0JobRequest"] = "V0JobRequest" + + uuid: str + miner_hotkey: str + # TODO: remove default after we add executor class support to facilitator + executor_class: ExecutorClass = DEFAULT_EXECUTOR_CLASS + docker_image: str + raw_script: str + args: list[str] + env: dict[str, str] + use_gpu: bool + input_url: str + output_url: str + + def get_args(self): + return self.args + + @model_validator(mode="after") + def validate_at_least_docker_image_or_raw_script(self) -> Self: + if not (bool(self.docker_image) or bool(self.raw_script)): + raise ValueError("Expected at least one of `docker_image` or `raw_script`") + return self + + @property + def volume(self) -> Volume | None: + if self.input_url: + return ZipUrlVolume(contents=self.input_url) + return None + + @property + def output_upload(self) -> OutputUpload | None: + if self.output_url: + return ZipAndHttpPutUpload(url=self.output_url) + return None + + +class V1FacilitatorJobRequest(BaseModel, extra="forbid"): + """Message sent from facilitator to validator to request a job execution""" + + # this points to a `ValidatorConsumer.job_new` handler (fuck you django-channels!) + type: Literal["job.new"] = "job.new" + message_type: Literal["V1JobRequest"] = "V1JobRequest" + uuid: str + miner_hotkey: str + # TODO: remove default after we add executor class support to facilitator + executor_class: ExecutorClass = DEFAULT_EXECUTOR_CLASS + docker_image: str + raw_script: str + args: list[str] + env: dict[str, str] + use_gpu: bool + volume: Volume | None = None + output_upload: OutputUpload | None = None + + def get_args(self): + return self.args + + @model_validator(mode="after") + def validate_at_least_docker_image_or_raw_script(self) -> Self: + if not (bool(self.docker_image) or bool(self.raw_script)): + raise ValueError("Expected at least one of `docker_image` or `raw_script`") + return self + + +JobRequest = Annotated[ + V0FacilitatorJobRequest | V1FacilitatorJobRequest, + pydantic.Field(discriminator="message_type"), +] diff --git a/compute_horde/compute_horde/fv_protocol/validator_requests.py b/compute_horde/compute_horde/fv_protocol/validator_requests.py new file mode 100644 index 000000000..5bad44219 --- /dev/null +++ b/compute_horde/compute_horde/fv_protocol/validator_requests.py @@ -0,0 +1,35 @@ +from typing import Any, Self + +import bittensor +from pydantic import BaseModel + + +class Heartbeat(BaseModel, extra="forbid"): + """Message sent from validator to facilitator to keep connection alive""" + + message_type: str = "V0Heartbeat" + + +class AuthenticationRequest(BaseModel, extra="forbid"): + """Message sent from validator to facilitator to authenticate itself""" + + message_type: str = "V0AuthenticationRequest" + public_key: str + signature: str + + @classmethod + def from_keypair(cls, keypair: bittensor.Keypair) -> Self: + return cls( + public_key=keypair.public_key.hex(), + signature=f"0x{keypair.sign(keypair.public_key).hex()}", + ) + + +class MachineSpecsUpdate(BaseModel, extra="forbid"): + """Message sent from validator to facilitator to update miner specs""" + + message_type: str = "V0MachineSpecsUpdate" + miner_hotkey: str + validator_hotkey: str + specs: dict[str, Any] + batch_id: str From f2d31f38db3cbbb81b772f45745eb4aa2d971026 Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Wed, 23 Oct 2024 22:10:53 +0300 Subject: [PATCH 2/8] replaced facilitator_api with fv_protocol --- .../validator/organic_jobs/facilitator_api.py | 123 ------------------ .../organic_jobs/facilitator_client.py | 14 +- .../validator/organic_jobs/miner_driver.py | 8 +- .../validator/tests/helpers.py | 8 +- .../tests/test_facilitator_client.py | 9 +- 5 files changed, 17 insertions(+), 145 deletions(-) delete mode 100644 validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_api.py diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_api.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_api.py deleted file mode 100644 index fa2028e07..000000000 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_api.py +++ /dev/null @@ -1,123 +0,0 @@ -from typing import Annotated, Any, Literal, Self - -import bittensor -import pydantic -from compute_horde.base.output_upload import OutputUpload, ZipAndHttpPutUpload -from compute_horde.base.volume import Volume, ZipUrlVolume -from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, ExecutorClass -from pydantic import BaseModel, model_validator - - -class Error(BaseModel, extra="allow"): - msg: str - type: str - help: str = "" - - -class Response(BaseModel, extra="forbid"): - """Message sent from facilitator to validator in response to AuthenticationRequest & JobStatusUpdate""" - - status: Literal["error", "success"] - errors: list[Error] = [] - - -class AuthenticationRequest(BaseModel, extra="forbid"): - """Message sent from validator to facilitator to authenticate itself""" - - message_type: str = "V0AuthenticationRequest" - public_key: str - signature: str - - @classmethod - def from_keypair(cls, keypair: bittensor.Keypair) -> Self: - return cls( - public_key=keypair.public_key.hex(), - signature=f"0x{keypair.sign(keypair.public_key).hex()}", - ) - - -class V0FacilitatorJobRequest(BaseModel, extra="forbid"): - """Message sent from facilitator to validator to request a job execution""" - - # this points to a `ValidatorConsumer.job_new` handler (fuck you django-channels!) - type: Literal["job.new"] = "job.new" - message_type: Literal["V0JobRequest"] = "V0JobRequest" - - uuid: str - miner_hotkey: str - # TODO: remove default after we add executor class support to facilitator - executor_class: ExecutorClass = DEFAULT_EXECUTOR_CLASS - docker_image: str - raw_script: str - args: list[str] - env: dict[str, str] - use_gpu: bool - input_url: str - output_url: str - - def get_args(self): - return self.args - - @model_validator(mode="after") - def validate_at_least_docker_image_or_raw_script(self) -> Self: - if not (bool(self.docker_image) or bool(self.raw_script)): - raise ValueError("Expected at least one of `docker_image` or `raw_script`") - return self - - @property - def volume(self) -> Volume | None: - if self.input_url: - return ZipUrlVolume(contents=self.input_url) - return None - - @property - def output_upload(self) -> OutputUpload | None: - if self.output_url: - return ZipAndHttpPutUpload(url=self.output_url) - return None - - -class V1FacilitatorJobRequest(BaseModel, extra="forbid"): - """Message sent from facilitator to validator to request a job execution""" - - # this points to a `ValidatorConsumer.job_new` handler (fuck you django-channels!) - type: Literal["job.new"] = "job.new" - message_type: Literal["V1JobRequest"] = "V1JobRequest" - uuid: str - miner_hotkey: str - # TODO: remove default after we add executor class support to facilitator - executor_class: ExecutorClass = DEFAULT_EXECUTOR_CLASS - docker_image: str - raw_script: str - args: list[str] - env: dict[str, str] - use_gpu: bool - volume: Volume | None = None - output_upload: OutputUpload | None = None - - def get_args(self): - return self.args - - @model_validator(mode="after") - def validate_at_least_docker_image_or_raw_script(self) -> Self: - if not (bool(self.docker_image) or bool(self.raw_script)): - raise ValueError("Expected at least one of `docker_image` or `raw_script`") - return self - - -JobRequest = Annotated[ - V0FacilitatorJobRequest | V1FacilitatorJobRequest, - pydantic.Field(discriminator="message_type"), -] - - -class Heartbeat(BaseModel, extra="forbid"): - message_type: str = "V0Heartbeat" - - -class MachineSpecsUpdate(BaseModel, extra="forbid"): - message_type: str = "V0MachineSpecsUpdate" - miner_hotkey: str - validator_hotkey: str - specs: dict[str, Any] - batch_id: str diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py index d620ed3b3..15ff6755d 100644 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py @@ -8,6 +8,12 @@ import tenacity import websockets from channels.layers import get_channel_layer +from compute_horde.fv_protocol.facilitator_requests import Error, JobRequest, Response +from compute_horde.fv_protocol.validator_requests import ( + AuthenticationRequest, + Heartbeat, + MachineSpecsUpdate, +) from django.conf import settings from pydantic import BaseModel @@ -16,14 +22,6 @@ get_miner_axon_info, ) from compute_horde_validator.validator.models import Miner, OrganicJob, SystemEvent -from compute_horde_validator.validator.organic_jobs.facilitator_api import ( - AuthenticationRequest, - Error, - Heartbeat, - JobRequest, - MachineSpecsUpdate, - Response, -) from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient from compute_horde_validator.validator.organic_jobs.miner_driver import execute_organic_job from compute_horde_validator.validator.utils import MACHINE_SPEC_CHANNEL diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py index a4d9057c0..475b61488 100644 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py @@ -4,6 +4,10 @@ from typing import Literal from compute_horde.executor_class import ExecutorClass +from compute_horde.fv_protocol.facilitator_requests import ( + V0FacilitatorJobRequest, + V1FacilitatorJobRequest, +) from compute_horde.miner_client.organic import ( FailureReason, OrganicJobDetails, @@ -20,10 +24,6 @@ OrganicJob, SystemEvent, ) -from compute_horde_validator.validator.organic_jobs.facilitator_api import ( - V0FacilitatorJobRequest, - V1FacilitatorJobRequest, -) from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient logger = logging.getLogger(__name__) diff --git a/validator/app/src/compute_horde_validator/validator/tests/helpers.py b/validator/app/src/compute_horde_validator/validator/tests/helpers.py index 6ec366dcd..179234d9f 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/helpers.py +++ b/validator/app/src/compute_horde_validator/validator/tests/helpers.py @@ -14,6 +14,10 @@ import numpy as np from bittensor import Balance from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS +from compute_horde.fv_protocol.facilitator_requests import ( + V0FacilitatorJobRequest, + V1FacilitatorJobRequest, +) from compute_horde.mv_protocol.miner_requests import ( V0AcceptJobRequest, V0ExecutorReadyRequest, @@ -24,10 +28,6 @@ from substrateinterface.exceptions import SubstrateRequestException from compute_horde_validator.validator.models import SystemEvent -from compute_horde_validator.validator.organic_jobs.facilitator_api import ( - V0FacilitatorJobRequest, - V1FacilitatorJobRequest, -) from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient from compute_horde_validator.validator.synthetic_jobs import batch_run diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py b/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py index 9b7175837..4e6bfdde8 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py @@ -6,14 +6,11 @@ import pytest import websockets from channels.layers import get_channel_layer +from compute_horde.fv_protocol.facilitator_requests import Response +from compute_horde.fv_protocol.validator_requests import AuthenticationRequest, MachineSpecsUpdate from compute_horde_validator.validator.models import OrganicJob -from compute_horde_validator.validator.organic_jobs.facilitator_api import MachineSpecsUpdate -from compute_horde_validator.validator.organic_jobs.facilitator_client import ( - AuthenticationRequest, - FacilitatorClient, - Response, -) +from compute_horde_validator.validator.organic_jobs.facilitator_client import FacilitatorClient from compute_horde_validator.validator.organic_jobs.miner_driver import JobStatusUpdate from compute_horde_validator.validator.utils import MACHINE_SPEC_CHANNEL From 9cd26601de9c3ddfb122d11434709963432bc397 Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Wed, 23 Oct 2024 22:28:56 +0300 Subject: [PATCH 3/8] renamed facilitator protocol message classes --- .../fv_protocol/facilitator_requests.py | 6 +++--- .../fv_protocol/validator_requests.py | 6 +++--- docs/facilitator-protocol.md | 2 +- .../validator/organic_jobs/facilitator_client.py | 16 ++++++++-------- .../validator/organic_jobs/miner_driver.py | 6 +++--- .../validator/tests/helpers.py | 12 ++++++------ .../validator/tests/test_facilitator_client.py | 11 +++++++---- 7 files changed, 31 insertions(+), 28 deletions(-) diff --git a/compute_horde/compute_horde/fv_protocol/facilitator_requests.py b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py index c99508cf8..8e800a4db 100644 --- a/compute_horde/compute_horde/fv_protocol/facilitator_requests.py +++ b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py @@ -21,7 +21,7 @@ class Response(BaseModel, extra="forbid"): errors: list[Error] = [] -class V0FacilitatorJobRequest(BaseModel, extra="forbid"): +class V0JobRequest(BaseModel, extra="forbid"): """Message sent from facilitator to validator to request a job execution""" # this points to a `ValidatorConsumer.job_new` handler (fuck you django-channels!) @@ -62,7 +62,7 @@ def output_upload(self) -> OutputUpload | None: return None -class V1FacilitatorJobRequest(BaseModel, extra="forbid"): +class V1JobRequest(BaseModel, extra="forbid"): """Message sent from facilitator to validator to request a job execution""" # this points to a `ValidatorConsumer.job_new` handler (fuck you django-channels!) @@ -91,6 +91,6 @@ def validate_at_least_docker_image_or_raw_script(self) -> Self: JobRequest = Annotated[ - V0FacilitatorJobRequest | V1FacilitatorJobRequest, + V0JobRequest | V1JobRequest, pydantic.Field(discriminator="message_type"), ] diff --git a/compute_horde/compute_horde/fv_protocol/validator_requests.py b/compute_horde/compute_horde/fv_protocol/validator_requests.py index 5bad44219..b5d4accda 100644 --- a/compute_horde/compute_horde/fv_protocol/validator_requests.py +++ b/compute_horde/compute_horde/fv_protocol/validator_requests.py @@ -4,13 +4,13 @@ from pydantic import BaseModel -class Heartbeat(BaseModel, extra="forbid"): +class V0Heartbeat(BaseModel, extra="forbid"): """Message sent from validator to facilitator to keep connection alive""" message_type: str = "V0Heartbeat" -class AuthenticationRequest(BaseModel, extra="forbid"): +class V0AuthenticationRequest(BaseModel, extra="forbid"): """Message sent from validator to facilitator to authenticate itself""" message_type: str = "V0AuthenticationRequest" @@ -25,7 +25,7 @@ def from_keypair(cls, keypair: bittensor.Keypair) -> Self: ) -class MachineSpecsUpdate(BaseModel, extra="forbid"): +class V0MachineSpecsUpdate(BaseModel, extra="forbid"): """Message sent from validator to facilitator to update miner specs""" message_type: str = "V0MachineSpecsUpdate" diff --git a/docs/facilitator-protocol.md b/docs/facilitator-protocol.md index ce0eea17b..ebf38a49d 100644 --- a/docs/facilitator-protocol.md +++ b/docs/facilitator-protocol.md @@ -43,7 +43,7 @@ sequenceDiagram end validator->>facilitator: V0Heartbeat - validator->>facilitator: MachineSpecsUpdate + validator->>facilitator: V0MachineSpecsUpdate ``` ## `V0AuthenticationRequest` message diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py index 15ff6755d..9b41e11c3 100644 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py @@ -10,9 +10,9 @@ from channels.layers import get_channel_layer from compute_horde.fv_protocol.facilitator_requests import Error, JobRequest, Response from compute_horde.fv_protocol.validator_requests import ( - AuthenticationRequest, - Heartbeat, - MachineSpecsUpdate, + V0AuthenticationRequest, + V0Heartbeat, + V0MachineSpecsUpdate, ) from django.conf import settings from pydantic import BaseModel @@ -128,14 +128,14 @@ async def run_forever(self): async def handle_connection(self, ws: websockets.WebSocketClientProtocol): """handle a single websocket connection""" - await ws.send(AuthenticationRequest.from_keypair(self.keypair).model_dump_json()) + await ws.send(V0AuthenticationRequest.from_keypair(self.keypair).model_dump_json()) raw_msg = await ws.recv() try: response = Response.model_validate_json(raw_msg) except pydantic.ValidationError as exc: raise AuthenticationError( - "did not receive Response for AuthenticationRequest", [] + "did not receive Response for V0AuthenticationRequest", [] ) from exc if response.status != "success": raise AuthenticationError("auth request received failed response", response.errors) @@ -146,7 +146,7 @@ async def handle_connection(self, ws: websockets.WebSocketClientProtocol): await self.handle_message(raw_msg) async def wait_for_specs(self): - specs_queue: deque[MachineSpecsUpdate] = deque() + specs_queue: deque[V0MachineSpecsUpdate] = deque() channel_layer = get_channel_layer() while True: @@ -156,7 +156,7 @@ async def wait_for_specs(self): channel_layer.receive(MACHINE_SPEC_CHANNEL), timeout=20 * 60 ) - specs = MachineSpecsUpdate( + specs = V0MachineSpecsUpdate( specs=msg["specs"], miner_hotkey=msg["miner_hotkey"], batch_id=msg["batch_id"], @@ -190,7 +190,7 @@ async def heartbeat(self): while True: if self.ws is not None: try: - await self.send_model(Heartbeat()) + await self.send_model(V0Heartbeat()) except Exception as exc: msg = f"Error occurred while sending heartbeat: {exc}" logger.warning(msg) diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py index 475b61488..6cb212eb6 100644 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py @@ -5,8 +5,8 @@ from compute_horde.executor_class import ExecutorClass from compute_horde.fv_protocol.facilitator_requests import ( - V0FacilitatorJobRequest, - V1FacilitatorJobRequest, + V0JobRequest, + V1JobRequest, ) from compute_horde.miner_client.organic import ( FailureReason, @@ -93,7 +93,7 @@ async def _dummy_notify_callback(_: JobStatusUpdate) -> None: async def execute_organic_job( miner_client: MinerClient, job: OrganicJob, - job_request: V0FacilitatorJobRequest | V1FacilitatorJobRequest | AdminJobRequest, + job_request: V0JobRequest | V1JobRequest | AdminJobRequest, total_job_timeout: int = 300, wait_timeout: int = 300, notify_callback: Callable[[JobStatusUpdate], Awaitable[None]] = _dummy_notify_callback, diff --git a/validator/app/src/compute_horde_validator/validator/tests/helpers.py b/validator/app/src/compute_horde_validator/validator/tests/helpers.py index 179234d9f..ed5588fe7 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/helpers.py +++ b/validator/app/src/compute_horde_validator/validator/tests/helpers.py @@ -15,8 +15,8 @@ from bittensor import Balance from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS from compute_horde.fv_protocol.facilitator_requests import ( - V0FacilitatorJobRequest, - V1FacilitatorJobRequest, + V0JobRequest, + V1JobRequest, ) from compute_horde.mv_protocol.miner_requests import ( V0AcceptJobRequest, @@ -135,8 +135,8 @@ def __init__(self, *args, **kwargs): ) -def get_dummy_job_request_v0(uuid: str) -> V0FacilitatorJobRequest: - return V0FacilitatorJobRequest( +def get_dummy_job_request_v0(uuid: str) -> V0JobRequest: + return V0JobRequest( type="job.new", uuid=uuid, miner_hotkey="miner_hotkey", @@ -151,8 +151,8 @@ def get_dummy_job_request_v0(uuid: str) -> V0FacilitatorJobRequest: ) -def get_dummy_job_request_v1(uuid: str) -> V1FacilitatorJobRequest: - return V1FacilitatorJobRequest( +def get_dummy_job_request_v1(uuid: str) -> V1JobRequest: + return V1JobRequest( type="job.new", uuid=uuid, miner_hotkey="miner_hotkey", diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py b/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py index 4e6bfdde8..d4a1bc1c5 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py @@ -7,7 +7,10 @@ import websockets from channels.layers import get_channel_layer from compute_horde.fv_protocol.facilitator_requests import Response -from compute_horde.fv_protocol.validator_requests import AuthenticationRequest, MachineSpecsUpdate +from compute_horde.fv_protocol.validator_requests import ( + V0AuthenticationRequest, + V0MachineSpecsUpdate, +) from compute_horde_validator.validator.models import OrganicJob from compute_horde_validator.validator.organic_jobs.facilitator_client import FacilitatorClient @@ -57,7 +60,7 @@ async def serve(self, ws): # auth response = await asyncio.wait_for(ws.recv(), timeout=5) try: - AuthenticationRequest.model_validate_json(response) + V0AuthenticationRequest.model_validate_json(response) except Exception as e: self.facilitator_error = e @@ -167,7 +170,7 @@ class FacilitatorExpectMachineSpecsWs(FacilitatorWs): async def serve(self, ws, path): response = await asyncio.wait_for(ws.recv(), timeout=5) try: - AuthenticationRequest.model_validate_json(response) + V0AuthenticationRequest.model_validate_json(response) except Exception as e: self.facilitator_error = e @@ -175,7 +178,7 @@ async def serve(self, ws, path): async for message in ws: try: - MachineSpecsUpdate.model_validate_json(message) + V0MachineSpecsUpdate.model_validate_json(message) except Exception: continue else: From eff12ab29fa42610234c5244e15dd9c34c5a2201 Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Wed, 23 Oct 2024 22:46:56 +0300 Subject: [PATCH 4/8] copied methods from facilitator --- .../fv_protocol/validator_requests.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/compute_horde/compute_horde/fv_protocol/validator_requests.py b/compute_horde/compute_horde/fv_protocol/validator_requests.py index b5d4accda..c57f8385c 100644 --- a/compute_horde/compute_horde/fv_protocol/validator_requests.py +++ b/compute_horde/compute_horde/fv_protocol/validator_requests.py @@ -24,6 +24,21 @@ def from_keypair(cls, keypair: bittensor.Keypair) -> Self: signature=f"0x{keypair.sign(keypair.public_key).hex()}", ) + def verify_signature(self) -> bool: + public_key_bytes = bytes.fromhex(self.public_key) + keypair = bittensor.Keypair(public_key=public_key_bytes, ss58_format=42) + # make mypy happy + valid: bool = keypair.verify(public_key_bytes, self.signature) + return valid + + @property + def ss58_address(self) -> str: + # make mypy happy + address: str = bittensor.Keypair( + public_key=bytes.fromhex(self.public_key), ss58_format=42 + ).ss58_address + return address + class V0MachineSpecsUpdate(BaseModel, extra="forbid"): """Message sent from validator to facilitator to update miner specs""" From 8e64470163cd732b2e334de5f49b17d3bee1abe4 Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Wed, 23 Oct 2024 23:09:20 +0300 Subject: [PATCH 5/8] removed obsolete defaults --- .../compute_horde/fv_protocol/facilitator_requests.py | 8 +++----- .../compute_horde_validator/validator/tests/helpers.py | 1 + 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/compute_horde/compute_horde/fv_protocol/facilitator_requests.py b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py index 8e800a4db..5ef911973 100644 --- a/compute_horde/compute_horde/fv_protocol/facilitator_requests.py +++ b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py @@ -5,7 +5,7 @@ from compute_horde.base.output_upload import OutputUpload, ZipAndHttpPutUpload from compute_horde.base.volume import Volume, ZipUrlVolume -from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, ExecutorClass +from compute_horde.executor_class import ExecutorClass class Error(BaseModel, extra="allow"): @@ -30,8 +30,7 @@ class V0JobRequest(BaseModel, extra="forbid"): uuid: str miner_hotkey: str - # TODO: remove default after we add executor class support to facilitator - executor_class: ExecutorClass = DEFAULT_EXECUTOR_CLASS + executor_class: ExecutorClass docker_image: str raw_script: str args: list[str] @@ -70,8 +69,7 @@ class V1JobRequest(BaseModel, extra="forbid"): message_type: Literal["V1JobRequest"] = "V1JobRequest" uuid: str miner_hotkey: str - # TODO: remove default after we add executor class support to facilitator - executor_class: ExecutorClass = DEFAULT_EXECUTOR_CLASS + executor_class: ExecutorClass docker_image: str raw_script: str args: list[str] diff --git a/validator/app/src/compute_horde_validator/validator/tests/helpers.py b/validator/app/src/compute_horde_validator/validator/tests/helpers.py index ed5588fe7..874b49f47 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/helpers.py +++ b/validator/app/src/compute_horde_validator/validator/tests/helpers.py @@ -156,6 +156,7 @@ def get_dummy_job_request_v1(uuid: str) -> V1JobRequest: type="job.new", uuid=uuid, miner_hotkey="miner_hotkey", + executor_class=DEFAULT_EXECUTOR_CLASS, docker_image="nvidia", raw_script="print('hello world')", args=[], From b7f01baf7bc0d75a0529968031225161bc1669ad Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Thu, 24 Oct 2024 10:39:28 +0300 Subject: [PATCH 6/8] added V2JobRequest --- .../fv_protocol/facilitator_requests.py | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/compute_horde/compute_horde/fv_protocol/facilitator_requests.py b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py index 5ef911973..d4a07eca0 100644 --- a/compute_horde/compute_horde/fv_protocol/facilitator_requests.py +++ b/compute_horde/compute_horde/fv_protocol/facilitator_requests.py @@ -1,7 +1,7 @@ from typing import Annotated, Literal, Self import pydantic -from pydantic import BaseModel, model_validator +from pydantic import BaseModel, JsonValue, model_validator from compute_horde.base.output_upload import OutputUpload, ZipAndHttpPutUpload from compute_horde.base.volume import Volume, ZipUrlVolume @@ -21,6 +21,14 @@ class Response(BaseModel, extra="forbid"): errors: list[Error] = [] +class SignedRequest(BaseModel, extra="forbid"): + signature_type: str + signatory: str + timestamp_ns: int + signature: str + signed_payload: JsonValue + + class V0JobRequest(BaseModel, extra="forbid"): """Message sent from facilitator to validator to request a job execution""" @@ -88,6 +96,34 @@ def validate_at_least_docker_image_or_raw_script(self) -> Self: return self +class V2JobRequest(BaseModel, extra="forbid"): + """Message sent from facilitator to validator to request a job execution""" + + # this points to a `ValidatorConsumer.job_new` handler (fuck you django-channels!) + type: Literal["job.new"] = "job.new" + message_type: Literal["V2JobRequest"] = "V2JobRequest" + uuid: str + miner_hotkey: str | None + executor_class: ExecutorClass + docker_image: str + raw_script: str + args: list[str] + env: dict[str, str] + use_gpu: bool + volume: Volume | None = None + output_upload: OutputUpload | None = None + signed_request: SignedRequest + + def get_args(self): + return self.args + + @model_validator(mode="after") + def validate_at_least_docker_image_or_raw_script(self) -> Self: + if not (bool(self.docker_image) or bool(self.raw_script)): + raise ValueError("Expected at least one of `docker_image` or `raw_script`") + return self + + JobRequest = Annotated[ V0JobRequest | V1JobRequest, pydantic.Field(discriminator="message_type"), From fb0dc141fa54c07c77077a3b2b6e869e14d0b963 Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Thu, 24 Oct 2024 10:46:06 +0300 Subject: [PATCH 7/8] use pydantic.JsonValue --- compute_horde/compute_horde/signature.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/compute_horde/compute_horde/signature.py b/compute_horde/compute_horde/signature.py index f3d7134d9..0a496d9f5 100644 --- a/compute_horde/compute_horde/signature.py +++ b/compute_horde/compute_horde/signature.py @@ -12,15 +12,11 @@ from typing import ClassVar, Protocol from class_registry import ClassRegistry, RegistryKeyError +from pydantic import JsonValue if typing.TYPE_CHECKING: import bittensor -JSONValue = str | int | float | bool | None -JSONDict = dict[str, "JSONType"] -JSONArray = list["JSONType"] -JSONType = JSONValue | JSONDict | JSONArray - SIGNERS_REGISTRY: ClassRegistry[Signer] = ClassRegistry("signature_type") VERIFIERS_REGISTRY: ClassRegistry[Verifier] = ClassRegistry("signature_type") @@ -34,7 +30,7 @@ class Signature: def verify_signature( - payload: JSONType | bytes, + payload: JsonValue | bytes, signature: Signature, *, newer_than: datetime.datetime | None = None, @@ -87,7 +83,7 @@ def verify_request( method: str, url: str, headers: dict[str, str], - json: JSONType | None = None, + json: JsonValue | None = None, *, newer_than: datetime.datetime | None = None, signature_extractor: SignatureExtractor = signature_from_headers, @@ -150,7 +146,7 @@ class SignatureTimeoutException(SignatureInvalidException): pass -def hash_message_signature(payload: bytes | JSONType, signature: Signature) -> bytes: +def hash_message_signature(payload: bytes | JsonValue, signature: Signature) -> bytes: """ Hashes the message to be signed with the signature parameters @@ -171,8 +167,8 @@ def hash_message_signature(payload: bytes | JSONType, signature: Signature) -> b def signature_payload( - method: str, url: str, headers: dict[str, str], json: JSONType | None = None -) -> JSONType: + method: str, url: str, headers: dict[str, str], json: JsonValue | None = None +) -> JsonValue: reduced_url = _REMOVE_URL_SCHEME_N_HOST_RE.sub("", url) return { "action": f"{method.upper()} {reduced_url}", @@ -188,7 +184,7 @@ def payload_from_request( method: str, url: str, headers: dict[str, str], - json: JSONType | None = None, + json: JsonValue | None = None, ): return signature_payload( method=method, @@ -199,7 +195,7 @@ def payload_from_request( class Signer(SignatureScheme): - def sign(self, payload: JSONType | bytes) -> Signature: + def sign(self, payload: JsonValue | bytes) -> Signature: signature = Signature( signature_type=self.signature_type, signatory=self.get_signatory(), @@ -211,7 +207,7 @@ def sign(self, payload: JSONType | bytes) -> Signature: return signature def signature_for_request( - self, method: str, url: str, headers: dict[str, str], json: JSONType | None = None + self, method: str, url: str, headers: dict[str, str], json: JsonValue | None = None ) -> Signature: return self.sign(self.payload_from_request(method, url, headers=headers, json=json)) @@ -227,7 +223,7 @@ def get_signatory(self) -> str: class Verifier(SignatureScheme): def verify( self, - payload: JSONType | bytes, + payload: JsonValue | bytes, signature: Signature, newer_than: datetime.datetime | None = None, ): From bde23eec2566c8bef70bb3241c4daebcfc448a26 Mon Sep 17 00:00:00 2001 From: Adal Chiriliuc Date: Thu, 24 Oct 2024 10:30:21 +0300 Subject: [PATCH 8/8] updated .gitignore --- .gitignore | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index b6a7e4a5c..7c2e879fc 100644 --- a/.gitignore +++ b/.gitignore @@ -4,13 +4,18 @@ *.egg-info/ .idea/ .env -venv +venv/ +.venv/ .hypothesis .envrc .nox/ -__pycache__ -build -dist +.mypy_cache/ +.pdm-build/ +.pytest_cache/ +.ruff_cache/ +__pycache__/ +build/ +dist/ .pdm-python /wallets/ /facilitator/