Skip to content

Commit

Permalink
Merge pull request #284 from backend-developers-ltd/change-receipt-flow
Browse files Browse the repository at this point in the history
Change receipt flow
  • Loading branch information
emnoor-reef authored Oct 28, 2024
2 parents 4d5d35a + be1f0b8 commit 6342021
Show file tree
Hide file tree
Showing 24 changed files with 823 additions and 328 deletions.
177 changes: 108 additions & 69 deletions compute_horde/compute_horde/miner_client/organic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from compute_horde.base.output_upload import OutputUpload
from compute_horde.base.volume import Volume
from compute_horde.base_requests import BaseRequest
from compute_horde.executor_class import ExecutorClass
from compute_horde.executor_class import EXECUTOR_CLASS, ExecutorClass
from compute_horde.miner_client.base import (
AbstractMinerClient,
ErrorCallback,
Expand All @@ -34,19 +34,24 @@
)
from compute_horde.mv_protocol.validator_requests import (
AuthenticationPayload,
JobFinishedReceiptPayload,
JobStartedReceiptPayload,
V0AuthenticateRequest,
V0InitialJobRequest,
V0JobAcceptedReceiptRequest,
V0JobFinishedReceiptRequest,
V0JobRequest,
V0JobStartedReceiptRequest,
)
from compute_horde.receipts.schemas import (
JobAcceptedReceiptPayload,
JobFinishedReceiptPayload,
JobStartedReceiptPayload,
)
from compute_horde.transport import AbstractTransport, TransportConnectionError, WSTransport
from compute_horde.utils import MachineSpecs, Timer

logger = logging.getLogger(__name__)

# Lower bound for the `ttl` of the job-started receipt sent with the initial
# job request: the organic job flow uses
# max(JOB_STARTED_RECEIPT_MIN_TTL, <executor class spin_up_time or 0>).
# NOTE(review): presumably expressed in seconds, like the timeouts — confirm.
JOB_STARTED_RECEIPT_MIN_TTL = 30


class OrganicMinerClient(AbstractMinerClient):
"""
Expand Down Expand Up @@ -217,40 +222,55 @@ def generate_authentication_message(self) -> V0AuthenticateRequest:
def generate_job_started_receipt_message(
self,
executor_class: ExecutorClass,
accepted_timestamp: float,
max_timeout: int,
) -> V0JobStartedReceiptRequest:
time_accepted = datetime.datetime.fromtimestamp(accepted_timestamp, datetime.UTC)
ttl: int,
) -> tuple[JobStartedReceiptPayload, str]:
receipt_payload = JobStartedReceiptPayload(
job_uuid=self.job_uuid,
miner_hotkey=self.miner_hotkey,
validator_hotkey=self.my_hotkey,
timestamp=datetime.datetime.now(datetime.UTC),
executor_class=executor_class,
time_accepted=time_accepted,
max_timeout=max_timeout,
is_organic=True,
ttl=ttl,
)
return V0JobStartedReceiptRequest(
signature = f"0x{self.my_keypair.sign(receipt_payload.blob_for_signing()).hex()}"
return receipt_payload, signature

def generate_job_accepted_receipt_message(
    self,
    accepted_timestamp: float,
    ttl: int,
) -> V0JobAcceptedReceiptRequest:
    """Build a signed job-accepted receipt request for this client's job.

    Args:
        accepted_timestamp: Timestamp of the acceptance; converted to a
            UTC-aware datetime via ``datetime.fromtimestamp``.
        ttl: Value stored in the receipt payload's ``ttl`` field.

    Returns:
        A ``V0JobAcceptedReceiptRequest`` carrying the payload and a
        ``0x``-prefixed hex signature of ``payload.blob_for_signing()``
        produced with ``self.my_keypair``.
    """
    accepted_at = datetime.datetime.fromtimestamp(accepted_timestamp, datetime.UTC)
    payload = JobAcceptedReceiptPayload(
        job_uuid=self.job_uuid,
        miner_hotkey=self.miner_hotkey,
        validator_hotkey=self.my_hotkey,
        timestamp=datetime.datetime.now(datetime.UTC),
        time_accepted=accepted_at,
        ttl=ttl,
    )
    # Sign the canonical blob of the payload and hex-encode the raw signature.
    raw_signature = self.my_keypair.sign(payload.blob_for_signing())
    return V0JobAcceptedReceiptRequest(
        payload=payload,
        signature=f"0x{raw_signature.hex()}",
    )

async def send_job_started_receipt_message(
async def send_job_accepted_receipt_message(
self,
executor_class: ExecutorClass,
accepted_timestamp: float,
max_timeout: int,
ttl: int,
) -> None:
try:
receipt_message = self.generate_job_started_receipt_message(
executor_class,
receipt_message = self.generate_job_accepted_receipt_message(
accepted_timestamp,
max_timeout,
ttl,
)
await self.send_model(receipt_message)
logger.debug(f"Sent job started receipt for {self.job_uuid}")
logger.debug(f"Sent job accepted receipt for {self.job_uuid}")
except Exception as e:
comment = f"Failed to send job started receipt to miner {self.miner_name} for job {self.job_uuid}: {e}"
comment = f"Failed to send job accepted receipt to miner {self.miner_name} for job {self.job_uuid}: {e}"
logger.warning(comment)
await self.notify_receipt_failure(comment)

Expand All @@ -265,6 +285,7 @@ def generate_job_finished_receipt_message(
job_uuid=self.job_uuid,
miner_hotkey=self.miner_hotkey,
validator_hotkey=self.my_hotkey,
timestamp=datetime.datetime.now(datetime.UTC),
time_started=time_started,
time_took_us=int(time_took_seconds * 1_000_000),
score_str=f"{score:.6f}",
Expand Down Expand Up @@ -320,9 +341,9 @@ def __init__(self, reason: FailureReason, received: BaseRequest | None = None):
self.received = received

def __str__(self):
s = f"Organic job failed, received: {self.received_str()}"
s = f"Organic job failed, {self.reason=}"
if self.received:
s += f", {self.received=}"
s += f", received: {self.received_str()}"
return s

def __repr__(self):
Expand Down Expand Up @@ -374,72 +395,90 @@ async def run_organic_job(

job_timer = Timer(timeout=job_details.total_job_timeout)

receipt_payload, receipt_signature = client.generate_job_started_receipt_message(
executor_class=job_details.executor_class,
max_timeout=int(job_timer.time_left()),
ttl=max(
JOB_STARTED_RECEIPT_MIN_TTL,
EXECUTOR_CLASS[job_details.executor_class].spin_up_time or 0,
),
)
await client.send_model(
V0InitialJobRequest(
job_uuid=job_details.job_uuid,
executor_class=job_details.executor_class,
base_docker_image_name=job_details.docker_image,
timeout_seconds=job_details.total_job_timeout,
volume_type=job_details.volume.volume_type if job_details.volume else None,
job_started_receipt_payload=receipt_payload,
job_started_receipt_signature=receipt_signature,
),
)

try:
initial_response = await asyncio.wait_for(
client.miner_accepting_or_declining_future,
timeout=min(job_timer.time_left(), wait_timeout),
)
except TimeoutError as exc:
raise OrganicJobError(FailureReason.INITIAL_RESPONSE_TIMED_OUT) from exc
if isinstance(initial_response, V0DeclineJobRequest):
raise OrganicJobError(FailureReason.JOB_DECLINED, initial_response)

await client.notify_job_accepted(initial_response)

try:
executor_readiness_response = await asyncio.wait_for(
client.executor_ready_or_failed_future,
timeout=min(job_timer.time_left(), wait_timeout),
try:
initial_response = await asyncio.wait_for(
client.miner_accepting_or_declining_future,
timeout=min(job_timer.time_left(), wait_timeout),
)
except TimeoutError as exc:
raise OrganicJobError(FailureReason.INITIAL_RESPONSE_TIMED_OUT) from exc
if isinstance(initial_response, V0DeclineJobRequest):
raise OrganicJobError(FailureReason.JOB_DECLINED, initial_response)

await client.notify_job_accepted(initial_response)

await client.send_job_accepted_receipt_message(
accepted_timestamp=time.time(),
ttl=int(job_timer.time_left()),
)
except TimeoutError as exc:
raise OrganicJobError(FailureReason.EXECUTOR_READINESS_RESPONSE_TIMED_OUT) from exc
if isinstance(executor_readiness_response, V0ExecutorFailedRequest):
raise OrganicJobError(FailureReason.EXECUTOR_FAILED, executor_readiness_response)

await client.notify_executor_ready(executor_readiness_response)

await client.send_job_started_receipt_message(
executor_class=job_details.executor_class,
accepted_timestamp=time.time(),
max_timeout=int(job_timer.time_left()),
)

await client.send_model(
V0JobRequest(
job_uuid=job_details.job_uuid,
executor_class=job_details.executor_class,
docker_image_name=job_details.docker_image,
raw_script=job_details.raw_script,
docker_run_options_preset=job_details.docker_run_options_preset,
docker_run_cmd=job_details.docker_run_cmd,
volume=job_details.volume,
output_upload=job_details.output,
try:
executor_readiness_response = await asyncio.wait_for(
client.executor_ready_or_failed_future,
timeout=min(job_timer.time_left(), wait_timeout),
)
except TimeoutError as exc:
raise OrganicJobError(FailureReason.EXECUTOR_READINESS_RESPONSE_TIMED_OUT) from exc
if isinstance(executor_readiness_response, V0ExecutorFailedRequest):
raise OrganicJobError(FailureReason.EXECUTOR_FAILED, executor_readiness_response)

await client.notify_executor_ready(executor_readiness_response)

await client.send_model(
V0JobRequest(
job_uuid=job_details.job_uuid,
executor_class=job_details.executor_class,
docker_image_name=job_details.docker_image,
raw_script=job_details.raw_script,
docker_run_options_preset=job_details.docker_run_options_preset,
docker_run_cmd=job_details.docker_run_cmd,
volume=job_details.volume,
output_upload=job_details.output,
)
)
)

try:
final_response = await asyncio.wait_for(
client.miner_finished_or_failed_future,
timeout=job_timer.time_left(),
)
if isinstance(final_response, V0JobFailedRequest):
raise OrganicJobError(FailureReason.JOB_FAILED, final_response)
return final_response.docker_process_stdout, final_response.docker_process_stderr
except TimeoutError as exc:
raise OrganicJobError(FailureReason.FINAL_RESPONSE_TIMED_OUT) from exc
finally:
try:
final_response = await asyncio.wait_for(
client.miner_finished_or_failed_future,
timeout=job_timer.time_left(),
)
if isinstance(final_response, V0JobFailedRequest):
raise OrganicJobError(FailureReason.JOB_FAILED, final_response)

await client.send_job_finished_receipt_message(
started_timestamp=job_timer.start_time.timestamp(),
time_took_seconds=job_timer.passed_time(),
score=0, # no score for organic jobs (at least right now)
)

return final_response.docker_process_stdout, final_response.docker_process_stderr
except TimeoutError as exc:
raise OrganicJobError(FailureReason.FINAL_RESPONSE_TIMED_OUT) from exc
except Exception:
await client.send_job_finished_receipt_message(
started_timestamp=job_timer.start_time.timestamp(),
time_took_seconds=job_timer.passed_time(),
score=0, # no score for organic jobs (at least right now)
score=0,
)
raise
59 changes: 13 additions & 46 deletions compute_horde/compute_horde/mv_protocol/validator_requests.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import datetime
import enum
import json
import re
from typing import Self

import pydantic
from pydantic import field_serializer, model_validator
from pydantic import model_validator

from ..base.docker import DockerRunOptionsPreset
from ..base.output_upload import OutputUpload # noqa
from ..base.volume import Volume, VolumeType
from ..base_requests import BaseRequest, JobMixin
from ..executor_class import ExecutorClass
from ..utils import MachineSpecs, json_dumps_default
from ..receipts.schemas import (
JobAcceptedReceiptPayload,
JobFinishedReceiptPayload,
JobStartedReceiptPayload,
)
from ..utils import MachineSpecs

SAFE_DOMAIN_REGEX = re.compile(r".*")

Expand All @@ -22,8 +26,8 @@ class RequestType(enum.Enum):
V0InitialJobRequest = "V0InitialJobRequest"
V0MachineSpecsRequest = "V0MachineSpecsRequest"
V0JobRequest = "V0JobRequest"
V0JobAcceptedReceiptRequest = "V0JobAcceptedReceiptRequest"
V0JobFinishedReceiptRequest = "V0JobFinishedReceiptRequest"
V0JobStartedReceiptRequest = "V0JobStartedReceiptRequest"
GenericError = "GenericError"


Expand Down Expand Up @@ -57,6 +61,8 @@ class V0InitialJobRequest(BaseValidatorRequest, JobMixin):
timeout_seconds: int | None = None
volume: Volume | None = None
volume_type: VolumeType | None = None
job_started_receipt_payload: JobStartedReceiptPayload
job_started_receipt_signature: str

@model_validator(mode="after")
def validate_volume_or_volume_type(self) -> Self:
Expand Down Expand Up @@ -92,34 +98,6 @@ class GenericError(BaseValidatorRequest):
details: str | None = None


class ReceiptPayload(pydantic.BaseModel):
    # Common fields shared by receipt payloads: the job identifier and the
    # hotkeys of the miner and the validator involved.
    job_uuid: str
    miner_hotkey: str
    validator_hotkey: str

    def blob_for_signing(self):
        """Return the JSON string that is signed for this payload.

        The model is dumped to a dict and serialized with sorted keys, so the
        output does not depend on field order.
        """
        # pydantic v2 does not support sort_keys anymore.
        dumped = self.model_dump()
        return json.dumps(dumped, default=json_dumps_default, sort_keys=True)


class JobFinishedReceiptPayload(ReceiptPayload):
    # Receipt payload for a finished job: start time, duration in
    # microseconds and the score kept as a string.
    time_started: datetime.datetime
    time_took_us: int  # micro-seconds
    score_str: str

    @field_serializer("time_started")
    def serialize_dt(self, dt: datetime.datetime, _info):
        """Serialize ``time_started`` using ``datetime.isoformat``."""
        iso_text = dt.isoformat()
        return iso_text

    @property
    def score(self):
        """The score parsed from ``score_str`` as a float."""
        return float(self.score_str)

    @property
    def time_took(self):
        """The job duration as a ``timedelta`` built from ``time_took_us``."""
        elapsed = datetime.timedelta(microseconds=self.time_took_us)
        return elapsed


class V0JobFinishedReceiptRequest(BaseValidatorRequest):
message_type: RequestType = RequestType.V0JobFinishedReceiptRequest
payload: JobFinishedReceiptPayload
Expand All @@ -129,20 +107,9 @@ def blob_for_signing(self):
return self.payload.blob_for_signing()


class JobStartedReceiptPayload(ReceiptPayload):
    # Receipt payload for a started job: executor class, acceptance time,
    # maximum timeout and whether the job is organic.
    executor_class: ExecutorClass
    time_accepted: datetime.datetime
    max_timeout: int  # seconds
    is_organic: bool

    @field_serializer("time_accepted")
    def serialize_dt(self, dt: datetime.datetime, _info):
        """Serialize ``time_accepted`` using ``datetime.isoformat``."""
        iso_text = dt.isoformat()
        return iso_text


class V0JobStartedReceiptRequest(BaseValidatorRequest):
message_type: RequestType = RequestType.V0JobStartedReceiptRequest
payload: JobStartedReceiptPayload
class V0JobAcceptedReceiptRequest(BaseValidatorRequest):
message_type: RequestType = RequestType.V0JobAcceptedReceiptRequest
payload: JobAcceptedReceiptPayload
signature: str

def blob_for_signing(self):
Expand Down
Loading

0 comments on commit 6342021

Please sign in to comment.