Skip to content

Commit

Permalink
Merge pull request #157 from backend-developers-ltd/startreceipt
Browse files Browse the repository at this point in the history
add job started receipt
  • Loading branch information
adal-chiriliuc-reef authored Jul 11, 2024
2 parents 369cbb6 + b55a9ee commit dd2e1ae
Show file tree
Hide file tree
Showing 23 changed files with 670 additions and 202 deletions.
45 changes: 36 additions & 9 deletions compute_horde/compute_horde/mv_protocol/validator_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from typing import Self

import pydantic
from pydantic import model_validator
from pydantic import field_serializer, model_validator

from ..base.output_upload import OutputUpload, OutputUploadType # noqa
from ..base.output_upload import OutputUpload # noqa
from ..base.volume import Volume, VolumeType
from ..base_requests import BaseRequest, JobMixin
from ..executor_class import ExecutorClass
Expand All @@ -21,7 +21,8 @@ class RequestType(enum.Enum):
V0InitialJobRequest = "V0InitialJobRequest"
V0MachineSpecsRequest = "V0MachineSpecsRequest"
V0JobRequest = "V0JobRequest"
V0ReceiptRequest = "V0ReceiptRequest"
V0JobFinishedReceiptRequest = "V0JobFinishedReceiptRequest"
V0JobStartedReceiptRequest = "V0JobStartedReceiptRequest"
GenericError = "GenericError"


Expand Down Expand Up @@ -87,14 +88,17 @@ class ReceiptPayload(pydantic.BaseModel):
job_uuid: str
miner_hotkey: str
validator_hotkey: str
time_started: datetime.datetime
time_took_us: int # micro-seconds
score_str: str

def blob_for_signing(self):
# pydantic v2 does not support sort_keys anymore.
return json.dumps(self.model_dump(), sort_keys=True, default=_json_dumps_default)


class JobFinishedReceiptPayload(ReceiptPayload):
time_started: datetime.datetime
time_took_us: int # micro-seconds
score_str: str

@property
def time_took(self):
return datetime.timedelta(microseconds=self.time_took_us)
Expand All @@ -103,10 +107,33 @@ def time_took(self):
def score(self):
return float(self.score_str)

@field_serializer("time_started")
def serialize_dt(self, dt: datetime.datetime, _info):
return dt.isoformat()


class V0JobFinishedReceiptRequest(BaseValidatorRequest):
message_type: RequestType = RequestType.V0JobFinishedReceiptRequest
payload: JobFinishedReceiptPayload
signature: str

def blob_for_signing(self):
return self.payload.blob_for_signing()


class JobStartedReceiptPayload(ReceiptPayload):
executor_class: ExecutorClass
time_accepted: datetime.datetime
max_timeout: int # seconds

@field_serializer("time_accepted")
def serialize_dt(self, dt: datetime.datetime, _info):
return dt.isoformat()


class V0ReceiptRequest(BaseValidatorRequest):
message_type: RequestType = RequestType.V0ReceiptRequest
payload: ReceiptPayload
class V0JobStartedReceiptRequest(BaseValidatorRequest):
message_type: RequestType = RequestType.V0JobStartedReceiptRequest
payload: JobStartedReceiptPayload
signature: str

def blob_for_signing(self):
Expand Down
49 changes: 38 additions & 11 deletions compute_horde/compute_horde/receipts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import csv
import datetime
import enum
import io
import logging
import shutil
Expand All @@ -10,13 +11,19 @@
import pydantic
import requests

from .mv_protocol.validator_requests import ReceiptPayload
from .executor_class import ExecutorClass
from .mv_protocol.validator_requests import JobFinishedReceiptPayload, JobStartedReceiptPayload

logger = logging.getLogger(__name__)


class ReceiptType(enum.Enum):
JobStartedReceipt = "JobStartedReceipt"
JobFinishedReceipt = "JobFinishedReceipt"


class Receipt(pydantic.BaseModel):
payload: ReceiptPayload
payload: JobStartedReceiptPayload | JobFinishedReceiptPayload
validator_signature: str
miner_signature: str

Expand Down Expand Up @@ -52,19 +59,39 @@ def get_miner_receipts(hotkey: str, ip: str, port: int) -> list[Receipt]:
csv_reader = csv.DictReader(wrapper)
for raw_receipt in csv_reader:
try:
receipt_type = ReceiptType(raw_receipt["type"])
match receipt_type:
case ReceiptType.JobStartedReceipt:
payload = JobStartedReceiptPayload(
job_uuid=raw_receipt["job_uuid"],
miner_hotkey=raw_receipt["miner_hotkey"],
validator_hotkey=raw_receipt["validator_hotkey"],
executor_class=ExecutorClass(raw_receipt["executor_class"]),
time_accepted=datetime.datetime.fromisoformat(
raw_receipt["time_accepted"]
),
max_timeout=int(raw_receipt["max_timeout"]),
)

case ReceiptType.JobFinishedReceipt:
payload = JobFinishedReceiptPayload(
job_uuid=raw_receipt["job_uuid"],
miner_hotkey=raw_receipt["miner_hotkey"],
validator_hotkey=raw_receipt["validator_hotkey"],
time_started=datetime.datetime.fromisoformat(
raw_receipt["time_started"]
),
time_took_us=int(raw_receipt["time_took_us"]),
score_str=raw_receipt["score_str"],
)

receipt = Receipt(
payload=ReceiptPayload(
job_uuid=raw_receipt["job_uuid"],
miner_hotkey=raw_receipt["miner_hotkey"],
validator_hotkey=raw_receipt["validator_hotkey"],
time_started=datetime.datetime.fromisoformat(raw_receipt["time_started"]),
time_took_us=int(raw_receipt["time_took_us"]),
score_str=raw_receipt["score_str"],
),
payload=payload,
validator_signature=raw_receipt["validator_signature"],
miner_signature=raw_receipt["miner_signature"],
)
except (ValueError, pydantic.ValidationError):

except (KeyError, ValueError, pydantic.ValidationError):
logger.warning(f"Miner sent invalid receipt {raw_receipt=}")
continue

Expand Down
20 changes: 12 additions & 8 deletions compute_horde/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import responses
from bittensor import Keypair

from compute_horde.mv_protocol.validator_requests import ReceiptPayload
from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS
from compute_horde.mv_protocol.validator_requests import (
JobFinishedReceiptPayload,
JobStartedReceiptPayload,
)
from compute_horde.receipts import Receipt


Expand All @@ -31,25 +35,25 @@ def miner_keypair():

@pytest.fixture
def receipts(validator_keypair, miner_keypair):
payload1 = ReceiptPayload(
job_uuid="0d89161e-65e4-46ad-bed8-ecfec1cc3c6b",
payload1 = JobStartedReceiptPayload(
job_uuid="3342460e-4a99-438b-8757-795f4cb348dd",
miner_hotkey=miner_keypair.ss58_address,
validator_hotkey=validator_keypair.ss58_address,
time_started=datetime.datetime(2024, 1, 1, tzinfo=datetime.UTC),
time_took_us=1_000_000,
score_str="1.00",
executor_class=DEFAULT_EXECUTOR_CLASS,
time_accepted=datetime.datetime(2024, 1, 2, 1, 55, 0, tzinfo=datetime.UTC),
max_timeout=30,
)
receipt1 = Receipt(
payload=payload1,
validator_signature=f"0x{validator_keypair.sign(payload1.blob_for_signing()).hex()}",
miner_signature=f"0x{miner_keypair.sign(payload1.blob_for_signing()).hex()}",
)

payload2 = ReceiptPayload(
payload2 = JobFinishedReceiptPayload(
job_uuid="3342460e-4a99-438b-8757-795f4cb348dd",
miner_hotkey=miner_keypair.ss58_address,
validator_hotkey=validator_keypair.ss58_address,
time_started=datetime.datetime(2024, 1, 2, tzinfo=datetime.UTC),
time_started=datetime.datetime(2024, 1, 2, 1, 57, 0, tzinfo=datetime.UTC),
time_took_us=2_000_000,
score_str="2.00",
)
Expand Down
75 changes: 39 additions & 36 deletions compute_horde/tests/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,46 @@

import pytest

from compute_horde.receipts import ReceiptFetchError, get_miner_receipts
from compute_horde.mv_protocol.validator_requests import (
JobFinishedReceiptPayload,
JobStartedReceiptPayload,
)
from compute_horde.receipts import Receipt, ReceiptFetchError, ReceiptType, get_miner_receipts


def receipts_helper(mocked_responses, receipts, miner_keypair):
def receipts_helper(mocked_responses, receipts: list[Receipt], miner_keypair):
payload_fields = set()
for payload_cls in [JobStartedReceiptPayload, JobFinishedReceiptPayload]:
payload_fields |= set(payload_cls.model_fields.keys())

buf = io.StringIO()
csv_writer = csv.writer(buf)
csv_writer.writerow(
csv_writer = csv.DictWriter(
buf,
[
"job_uuid",
"miner_hotkey",
"validator_hotkey",
"time_started",
"time_took_us",
"score_str",
"type",
"validator_signature",
"miner_signature",
]
*payload_fields,
],
)
csv_writer.writeheader()
for receipt in receipts:
csv_writer.writerow(
[
receipt.payload.job_uuid,
receipt.payload.miner_hotkey,
receipt.payload.validator_hotkey,
receipt.payload.time_started.isoformat(),
receipt.payload.time_took_us,
receipt.payload.score_str,
receipt.validator_signature,
receipt.miner_signature,
]
match receipt.payload:
case JobStartedReceiptPayload():
receipt_type = ReceiptType.JobStartedReceipt
case JobFinishedReceiptPayload():
receipt_type = ReceiptType.JobFinishedReceipt
row = (
dict(
type=receipt_type.value,
validator_signature=receipt.validator_signature,
miner_signature=receipt.miner_signature,
)
| receipt.payload.model_dump()
)
csv_writer.writerow(row)

buf.seek(0)
mocked_responses.get("http://127.0.0.1:8000/receipts/receipts.csv", body=buf.read())
mocked_responses.get("http://127.0.0.1:8000/receipts/receipts.csv", body=buf.getvalue())
return get_miner_receipts(miner_keypair.ss58_address, "127.0.0.1", 8000)


Expand All @@ -51,25 +57,22 @@ def test__get_miner_receipts__happy_path(mocked_responses, receipts, miner_keypa
got_receipts = receipts_helper(mocked_responses, receipts, miner_keypair)
assert len(got_receipts) == 2
for receipt in receipts:
got_receipt = [x for x in got_receipts if x.payload.job_uuid == receipt.payload.job_uuid][0]
got_receipt = [
x
for x in got_receipts
if x.payload.job_uuid == receipt.payload.job_uuid
and x.payload.__class__ is receipt.payload.__class__
][0]
assert got_receipt == receipt


def test__get_miner_receipts__invalid_receipt_skipped(mocked_responses, receipts, miner_keypair):
"""
Populate all the fields of one csv row with "invalid" :D
Invalidate one receipt payload fields to make it invalid
"""

class Mock:
def __str__(self):
return "invalid"

isoformat = __str__

def __getattr__(self, item):
return Mock()

receipts[1] = Mock()
receipts[1].payload.miner_hotkey = 0
receipts[1].payload.validator_hotkey = None
receipts_one_skipped_helper(mocked_responses, receipts, miner_keypair)


Expand Down
24 changes: 12 additions & 12 deletions docs/sequence-diagram-organic-jobs-loops.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ sequenceDiagram
validator-->>facilitator: connect
validator->>facilitator: V0AuthenticationRequest
facilitator->>validator: Response
loop
Note over facilitator,validator: wait for a job from facilitator
user->>facilitator: submit job
facilitator->>validator: V0JobRequest
validator-->>miner: connect
validator->>miner: V0AuthenticateRequest
miner->>validator: V0ExecutorManifestRequest
validator->>miner: V0InitialJobRequest
miner-->>executor: reserve/start executor
activate executor
miner->>validator: V0AcceptJobRequest
Note over miner,executor: wait for executor to spin up
executor-->>miner: connect
miner->>executor: V0InitialJobRequest
Expand All @@ -35,28 +35,28 @@ sequenceDiagram
validator->>facilitator: JobStatusUpdate
facilitator->>user: update job status
facilitator->>validator: Response
validator->>miner: V0JobRequest
miner->>executor: V0JobRequest
Note over miner,executor: wait for executor to complete the job
executor->>miner: V0MachineSpecsRequest
executor->>miner: V0FinishedRequest
executor-->>miner: disconnect / stop
deactivate executor
miner->>validator: V0MachineSpecsRequest
miner->>validator: V0JobFinishedRequest
validator->>miner: V0ReceiptRequest
validator->>miner: V0JobFinishedReceiptRequest
validator->>facilitator: JobStatusUpdate
facilitator->>user: job results
facilitator->>validator: Response
end
loop every 30 minutes
validator->>miner: scrape receipts
end
```
Loading

0 comments on commit dd2e1ae

Please sign in to comment.