diff --git a/compute_horde/compute_horde/miner_client/organic.py b/compute_horde/compute_horde/miner_client/organic.py index 08ab54464..51c0bc050 100644 --- a/compute_horde/compute_horde/miner_client/organic.py +++ b/compute_horde/compute_horde/miner_client/organic.py @@ -13,7 +13,7 @@ from compute_horde.base.output_upload import OutputUpload from compute_horde.base.volume import Volume from compute_horde.base_requests import BaseRequest -from compute_horde.executor_class import ExecutorClass +from compute_horde.executor_class import EXECUTOR_CLASS, ExecutorClass from compute_horde.miner_client.base import ( AbstractMinerClient, ErrorCallback, @@ -34,19 +34,24 @@ ) from compute_horde.mv_protocol.validator_requests import ( AuthenticationPayload, - JobFinishedReceiptPayload, - JobStartedReceiptPayload, V0AuthenticateRequest, V0InitialJobRequest, + V0JobAcceptedReceiptRequest, V0JobFinishedReceiptRequest, V0JobRequest, - V0JobStartedReceiptRequest, +) +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, + JobFinishedReceiptPayload, + JobStartedReceiptPayload, ) from compute_horde.transport import AbstractTransport, TransportConnectionError, WSTransport from compute_horde.utils import MachineSpecs, Timer logger = logging.getLogger(__name__) +JOB_STARTED_RECEIPT_MIN_TTL = 30 + class OrganicMinerClient(AbstractMinerClient): """ @@ -217,40 +222,55 @@ def generate_authentication_message(self) -> V0AuthenticateRequest: def generate_job_started_receipt_message( self, executor_class: ExecutorClass, - accepted_timestamp: float, max_timeout: int, - ) -> V0JobStartedReceiptRequest: - time_accepted = datetime.datetime.fromtimestamp(accepted_timestamp, datetime.UTC) + ttl: int, + ) -> tuple[JobStartedReceiptPayload, str]: receipt_payload = JobStartedReceiptPayload( job_uuid=self.job_uuid, miner_hotkey=self.miner_hotkey, validator_hotkey=self.my_hotkey, + timestamp=datetime.datetime.now(datetime.UTC), executor_class=executor_class, - time_accepted=time_accepted, max_timeout=max_timeout, is_organic=True, + ttl=ttl, ) - return V0JobStartedReceiptRequest( + signature = f"0x{self.my_keypair.sign(receipt_payload.blob_for_signing()).hex()}" + return receipt_payload, signature + + def generate_job_accepted_receipt_message( + self, + accepted_timestamp: float, + ttl: int, + ) -> V0JobAcceptedReceiptRequest: + time_accepted = datetime.datetime.fromtimestamp(accepted_timestamp, datetime.UTC) + receipt_payload = JobAcceptedReceiptPayload( + job_uuid=self.job_uuid, + miner_hotkey=self.miner_hotkey, + validator_hotkey=self.my_hotkey, + timestamp=datetime.datetime.now(datetime.UTC), + time_accepted=time_accepted, + ttl=ttl, + ) + return V0JobAcceptedReceiptRequest( payload=receipt_payload, signature=f"0x{self.my_keypair.sign(receipt_payload.blob_for_signing()).hex()}", ) - async def send_job_started_receipt_message( + async def send_job_accepted_receipt_message( self, - executor_class: ExecutorClass, accepted_timestamp: float, - max_timeout: int, + ttl: int, ) -> None: try: - receipt_message = self.generate_job_started_receipt_message( - executor_class, + receipt_message = self.generate_job_accepted_receipt_message( accepted_timestamp, - max_timeout, + ttl, ) await self.send_model(receipt_message) - logger.debug(f"Sent job started receipt for {self.job_uuid}") + logger.debug(f"Sent job accepted receipt for {self.job_uuid}") except Exception as e: - comment = f"Failed to send job started receipt to miner {self.miner_name} for job {self.job_uuid}: {e}" + comment = f"Failed to send job accepted receipt to miner {self.miner_name} for job {self.job_uuid}: {e}" logger.warning(comment) await self.notify_receipt_failure(comment) @@ -265,6 +285,7 @@ def generate_job_finished_receipt_message( job_uuid=self.job_uuid, miner_hotkey=self.miner_hotkey, validator_hotkey=self.my_hotkey, + timestamp=datetime.datetime.now(datetime.UTC), time_started=time_started, time_took_us=int(time_took_seconds * 1_000_000), score_str=f"{score:.6f}", @@ -320,9 +341,9 @@ def __init__(self, reason: FailureReason, received: BaseRequest | None = None): self.received = received def __str__(self): - s = f"Organic job failed, received: {self.received_str()}" + s = f"Organic job failed, {self.reason=}" if self.received: - s += f", {self.received=}" + s += f", received: {self.received_str()}" return s def __repr__(self): @@ -374,6 +395,14 @@ async def run_organic_job( job_timer = Timer(timeout=job_details.total_job_timeout) + receipt_payload, receipt_signature = client.generate_job_started_receipt_message( + executor_class=job_details.executor_class, + max_timeout=int(job_timer.time_left()), + ttl=max( + JOB_STARTED_RECEIPT_MIN_TTL, + EXECUTOR_CLASS[job_details.executor_class].spin_up_time or 0, + ), + ) await client.send_model( V0InitialJobRequest( job_uuid=job_details.job_uuid, @@ -381,65 +410,75 @@ async def run_organic_job( base_docker_image_name=job_details.docker_image, timeout_seconds=job_details.total_job_timeout, volume_type=job_details.volume.volume_type if job_details.volume else None, + job_started_receipt_payload=receipt_payload, + job_started_receipt_signature=receipt_signature, ), ) try: - initial_response = await asyncio.wait_for( - client.miner_accepting_or_declining_future, - timeout=min(job_timer.time_left(), wait_timeout), - ) - except TimeoutError as exc: - raise OrganicJobError(FailureReason.INITIAL_RESPONSE_TIMED_OUT) from exc - if isinstance(initial_response, V0DeclineJobRequest): - raise OrganicJobError(FailureReason.JOB_DECLINED, initial_response) - - await client.notify_job_accepted(initial_response) - - try: - executor_readiness_response = await asyncio.wait_for( - client.executor_ready_or_failed_future, - timeout=min(job_timer.time_left(), wait_timeout), + try: + initial_response = await asyncio.wait_for( + client.miner_accepting_or_declining_future, + timeout=min(job_timer.time_left(), wait_timeout), + ) + except TimeoutError as exc: + raise OrganicJobError(FailureReason.INITIAL_RESPONSE_TIMED_OUT) from exc + if isinstance(initial_response, V0DeclineJobRequest): + raise OrganicJobError(FailureReason.JOB_DECLINED, initial_response) + + await client.notify_job_accepted(initial_response) + + await client.send_job_accepted_receipt_message( + accepted_timestamp=time.time(), + ttl=int(job_timer.time_left()), ) - except TimeoutError as exc: - raise OrganicJobError(FailureReason.EXECUTOR_READINESS_RESPONSE_TIMED_OUT) from exc - if isinstance(executor_readiness_response, V0ExecutorFailedRequest): - raise OrganicJobError(FailureReason.EXECUTOR_FAILED, executor_readiness_response) - - await client.notify_executor_ready(executor_readiness_response) - await client.send_job_started_receipt_message( - executor_class=job_details.executor_class, - accepted_timestamp=time.time(), - max_timeout=int(job_timer.time_left()), - ) - - await client.send_model( - V0JobRequest( - job_uuid=job_details.job_uuid, - executor_class=job_details.executor_class, - docker_image_name=job_details.docker_image, - raw_script=job_details.raw_script, - docker_run_options_preset=job_details.docker_run_options_preset, - docker_run_cmd=job_details.docker_run_cmd, - volume=job_details.volume, - output_upload=job_details.output, + try: + executor_readiness_response = await asyncio.wait_for( + client.executor_ready_or_failed_future, + timeout=min(job_timer.time_left(), wait_timeout), + ) + except TimeoutError as exc: + raise OrganicJobError(FailureReason.EXECUTOR_READINESS_RESPONSE_TIMED_OUT) from exc + if isinstance(executor_readiness_response, V0ExecutorFailedRequest): + raise OrganicJobError(FailureReason.EXECUTOR_FAILED, executor_readiness_response) + + await client.notify_executor_ready(executor_readiness_response) + + await client.send_model( + V0JobRequest( + job_uuid=job_details.job_uuid, + executor_class=job_details.executor_class, + docker_image_name=job_details.docker_image, + raw_script=job_details.raw_script, + docker_run_options_preset=job_details.docker_run_options_preset, + docker_run_cmd=job_details.docker_run_cmd, + volume=job_details.volume, + output_upload=job_details.output, + ) ) - ) - try: - final_response = await asyncio.wait_for( - client.miner_finished_or_failed_future, - timeout=job_timer.time_left(), - ) - if isinstance(final_response, V0JobFailedRequest): - raise OrganicJobError(FailureReason.JOB_FAILED, final_response) - return final_response.docker_process_stdout, final_response.docker_process_stderr - except TimeoutError as exc: - raise OrganicJobError(FailureReason.FINAL_RESPONSE_TIMED_OUT) from exc - finally: + try: + final_response = await asyncio.wait_for( + client.miner_finished_or_failed_future, + timeout=job_timer.time_left(), + ) + if isinstance(final_response, V0JobFailedRequest): + raise OrganicJobError(FailureReason.JOB_FAILED, final_response) + + await client.send_job_finished_receipt_message( + started_timestamp=job_timer.start_time.timestamp(), + time_took_seconds=job_timer.passed_time(), + score=0, # no score for organic jobs (at least right now) + ) + + return final_response.docker_process_stdout, final_response.docker_process_stderr + except TimeoutError as exc: + raise OrganicJobError(FailureReason.FINAL_RESPONSE_TIMED_OUT) from exc + except Exception: await client.send_job_finished_receipt_message( started_timestamp=job_timer.start_time.timestamp(), time_took_seconds=job_timer.passed_time(), - score=0, # no score for organic jobs (at least right now) + score=0, ) + raise diff --git a/compute_horde/compute_horde/mv_protocol/validator_requests.py b/compute_horde/compute_horde/mv_protocol/validator_requests.py index a3a76d0bb..326422c15 100644 --- a/compute_horde/compute_horde/mv_protocol/validator_requests.py +++ b/compute_horde/compute_horde/mv_protocol/validator_requests.py @@ -1,18 +1,22 @@ -import datetime import enum import json import re from typing import Self import pydantic -from pydantic import field_serializer, model_validator +from pydantic import model_validator from ..base.docker import DockerRunOptionsPreset from ..base.output_upload import OutputUpload # noqa from ..base.volume import Volume, VolumeType from ..base_requests import BaseRequest, JobMixin from ..executor_class import ExecutorClass -from ..utils import MachineSpecs, json_dumps_default +from ..receipts.schemas import ( + JobAcceptedReceiptPayload, + JobFinishedReceiptPayload, + JobStartedReceiptPayload, +) +from ..utils import MachineSpecs SAFE_DOMAIN_REGEX = re.compile(r".*") @@ -22,8 +26,8 @@ class RequestType(enum.Enum): V0InitialJobRequest = "V0InitialJobRequest" V0MachineSpecsRequest = "V0MachineSpecsRequest" V0JobRequest = "V0JobRequest" + V0JobAcceptedReceiptRequest = "V0JobAcceptedReceiptRequest" V0JobFinishedReceiptRequest = "V0JobFinishedReceiptRequest" - V0JobStartedReceiptRequest = "V0JobStartedReceiptRequest" GenericError = "GenericError" @@ -57,6 +61,8 @@ class V0InitialJobRequest(BaseValidatorRequest, JobMixin): timeout_seconds: int | None = None volume: Volume | None = None volume_type: VolumeType | None = None + job_started_receipt_payload: JobStartedReceiptPayload + job_started_receipt_signature: str @model_validator(mode="after") def validate_volume_or_volume_type(self) -> Self: @@ -92,34 +98,6 @@ class GenericError(BaseValidatorRequest): details: str | None = None -class ReceiptPayload(pydantic.BaseModel): - job_uuid: str - miner_hotkey: str - validator_hotkey: str - - def blob_for_signing(self): - # pydantic v2 does not support sort_keys anymore. - return json.dumps(self.model_dump(), sort_keys=True, default=json_dumps_default) - - -class JobFinishedReceiptPayload(ReceiptPayload): - time_started: datetime.datetime - time_took_us: int # micro-seconds - score_str: str - - @property - def time_took(self): - return datetime.timedelta(microseconds=self.time_took_us) - - @property - def score(self): - return float(self.score_str) - - @field_serializer("time_started") - def serialize_dt(self, dt: datetime.datetime, _info): - return dt.isoformat() - - class V0JobFinishedReceiptRequest(BaseValidatorRequest): message_type: RequestType = RequestType.V0JobFinishedReceiptRequest payload: JobFinishedReceiptPayload @@ -129,20 +107,9 @@ def blob_for_signing(self): return self.payload.blob_for_signing() -class JobStartedReceiptPayload(ReceiptPayload): - executor_class: ExecutorClass - time_accepted: datetime.datetime - max_timeout: int # seconds - is_organic: bool - - @field_serializer("time_accepted") - def serialize_dt(self, dt: datetime.datetime, _info): - return dt.isoformat() - - -class V0JobStartedReceiptRequest(BaseValidatorRequest): - message_type: RequestType = RequestType.V0JobStartedReceiptRequest - payload: JobStartedReceiptPayload +class V0JobAcceptedReceiptRequest(BaseValidatorRequest): + message_type: RequestType = RequestType.V0JobAcceptedReceiptRequest + payload: JobAcceptedReceiptPayload signature: str def blob_for_signing(self): diff --git a/compute_horde/compute_horde/receipts/admin.py b/compute_horde/compute_horde/receipts/admin.py index 712a15f79..411ac4641 100644 --- a/compute_horde/compute_horde/receipts/admin.py +++ b/compute_horde/compute_horde/receipts/admin.py @@ -1,7 +1,7 @@ from django.contrib import admin # noqa from compute_horde.base.admin import ReadOnlyAdminMixin -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt, JobAcceptedReceipt class JobStartedReceiptsReadOnlyAdmin(admin.ModelAdmin, ReadOnlyAdminMixin): @@ -9,11 +9,24 @@ class JobStartedReceiptsReadOnlyAdmin(admin.ModelAdmin, ReadOnlyAdminMixin): "job_uuid", "miner_hotkey", "validator_hotkey", + "timestamp", "executor_class", - "time_accepted", "max_timeout", + "ttl", + ] + ordering = ["-timestamp"] + + +class JobAcceptedReceiptsReadOnlyAdmin(admin.ModelAdmin, ReadOnlyAdminMixin): + list_display = [ + "job_uuid", + "miner_hotkey", + "validator_hotkey", + "timestamp", + "time_accepted", + "ttl", ] - ordering = ["-time_accepted"] + ordering = ["-timestamp"] class JobFinishedReceiptsReadOnlyAdmin(admin.ModelAdmin, ReadOnlyAdminMixin): @@ -21,12 +34,14 @@ class JobFinishedReceiptsReadOnlyAdmin(admin.ModelAdmin, ReadOnlyAdminMixin): "job_uuid", "miner_hotkey", "validator_hotkey", + "timestamp", "score", "time_started", "time_took", ] - ordering = ["-time_started"] + ordering = ["-timestamp"] admin.site.register(JobStartedReceipt, admin_class=JobStartedReceiptsReadOnlyAdmin) +admin.site.register(JobAcceptedReceipt, admin_class=JobAcceptedReceiptsReadOnlyAdmin) admin.site.register(JobFinishedReceipt, admin_class=JobFinishedReceiptsReadOnlyAdmin) diff --git a/compute_horde/compute_horde/receipts/migrations/0003_jobacceptedreceipt_and_more.py b/compute_horde/compute_horde/receipts/migrations/0003_jobacceptedreceipt_and_more.py new file mode 100644 index 000000000..1ec5d3df6 --- /dev/null +++ b/compute_horde/compute_horde/receipts/migrations/0003_jobacceptedreceipt_and_more.py @@ -0,0 +1,80 @@ +# Generated by Django 5.1.1 on 2024-10-28 12:53 + +import datetime + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("receipts", "0002_jobstartedreceipt_is_organic"), + ] + + operations = [ + migrations.CreateModel( + name="JobAcceptedReceipt", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ("job_uuid", models.UUIDField()), + ("validator_hotkey", models.CharField(max_length=256)), + ("miner_hotkey", models.CharField(max_length=256)), + ("validator_signature", models.CharField(max_length=256)), + ("miner_signature", models.CharField(blank=True, max_length=256, null=True)), + ("timestamp", models.DateTimeField()), + ("time_accepted", models.DateTimeField()), + ("ttl", models.IntegerField()), + ], + options={ + "abstract": False, + }, + ), + migrations.RemoveField( + model_name="jobstartedreceipt", + name="time_accepted", + ), + migrations.AddField( + model_name="jobfinishedreceipt", + name="timestamp", + field=models.DateTimeField( + default=datetime.datetime(2020, 1, 1, 0, 0, tzinfo=datetime.UTC) + ), + preserve_default=False, + ), + migrations.AddField( + model_name="jobstartedreceipt", + name="timestamp", + field=models.DateTimeField( + default=datetime.datetime(2020, 1, 1, 0, 0, tzinfo=datetime.UTC) + ), + preserve_default=False, + ), + migrations.AddField( + model_name="jobstartedreceipt", + name="ttl", + field=models.IntegerField(default=0), + preserve_default=False, + ), + migrations.AddIndex( + model_name="jobfinishedreceipt", + index=models.Index(fields=["timestamp"], name="jobfinishedreceipt_ts_idx"), + ), + migrations.AddIndex( + model_name="jobstartedreceipt", + index=models.Index(fields=["timestamp"], name="jobstartedreceipt_ts_idx"), + ), + migrations.AddIndex( + model_name="jobacceptedreceipt", + index=models.Index(fields=["timestamp"], name="jobacceptedreceipt_ts_idx"), + ), + migrations.AddConstraint( + model_name="jobacceptedreceipt", + constraint=models.UniqueConstraint( + fields=("job_uuid",), name="receipts_unique_jobacceptedreceipt_job_uuid" + ), + ), + ] diff --git a/compute_horde/compute_horde/receipts/models.py b/compute_horde/compute_horde/receipts/models.py index 9d6db2cb8..04c45f662 100644 --- a/compute_horde/compute_horde/receipts/models.py +++ b/compute_horde/compute_horde/receipts/models.py @@ -3,11 +3,12 @@ from django.db import models from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, ExecutorClass -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, + Receipt, ) -from compute_horde.receipts.schemas import Receipt class ReceiptNotSigned(Exception): @@ -20,12 +21,16 @@ class AbstractReceipt(models.Model): miner_hotkey = models.CharField(max_length=256) validator_signature = models.CharField(max_length=256) miner_signature = models.CharField(max_length=256, null=True, blank=True) + timestamp = models.DateTimeField() class Meta: abstract = True constraints = [ models.UniqueConstraint(fields=["job_uuid"], name="receipts_unique_%(class)s_job_uuid"), ] + indexes = [ + models.Index(fields=["timestamp"], name="%(class)s_ts_idx"), + ] def __str__(self): return f"job_uuid: {self.job_uuid}" @@ -33,9 +38,9 @@ def __str__(self): class JobStartedReceipt(AbstractReceipt): executor_class = models.CharField(max_length=255, default=DEFAULT_EXECUTOR_CLASS) - time_accepted = models.DateTimeField() max_timeout = models.IntegerField() is_organic = models.BooleanField() + ttl = models.IntegerField() # https://github.com/typeddjango/django-stubs/issues/1684#issuecomment-1706446344 objects: models.Manager["JobStartedReceipt"] @@ -49,10 +54,36 @@ def to_receipt(self) -> Receipt: job_uuid=str(self.job_uuid), miner_hotkey=self.miner_hotkey, validator_hotkey=self.validator_hotkey, + timestamp=self.timestamp, executor_class=ExecutorClass(self.executor_class), - time_accepted=self.time_accepted, max_timeout=self.max_timeout, is_organic=self.is_organic, + ttl=self.ttl, + ), + validator_signature=self.validator_signature, + miner_signature=self.miner_signature, + ) + + +class JobAcceptedReceipt(AbstractReceipt): + time_accepted = models.DateTimeField() + ttl = models.IntegerField() + + # https://github.com/typeddjango/django-stubs/issues/1684#issuecomment-1706446344 + objects: models.Manager["JobAcceptedReceipt"] + + def to_receipt(self) -> Receipt: + if self.miner_signature is None: + raise ReceiptNotSigned("Miner signature is required") + + return Receipt( + payload=JobAcceptedReceiptPayload( + job_uuid=str(self.job_uuid), + miner_hotkey=self.miner_hotkey, + validator_hotkey=self.validator_hotkey, + timestamp=self.timestamp, + time_accepted=self.time_accepted, + ttl=self.ttl, ), validator_signature=self.validator_signature, miner_signature=self.miner_signature, @@ -82,6 +113,7 @@ def to_receipt(self) -> Receipt: job_uuid=str(self.job_uuid), miner_hotkey=self.miner_hotkey, validator_hotkey=self.validator_hotkey, + timestamp=self.timestamp, time_started=self.time_started, time_took_us=self.time_took_us, score_str=self.score_str, diff --git a/compute_horde/compute_horde/receipts/schemas.py b/compute_horde/compute_horde/receipts/schemas.py index 7d8cfc94d..7cfc875d9 100644 --- a/compute_horde/compute_horde/receipts/schemas.py +++ b/compute_horde/compute_horde/receipts/schemas.py @@ -1,24 +1,68 @@ +import datetime import enum -import logging +import json +from typing import Annotated, Literal import bittensor -import pydantic +from pydantic import BaseModel, Field -from compute_horde.mv_protocol.validator_requests import ( - JobFinishedReceiptPayload, - JobStartedReceiptPayload, -) +from compute_horde.executor_class import ExecutorClass -logger = logging.getLogger(__name__) - -class ReceiptType(enum.Enum): +class ReceiptType(enum.StrEnum): JobStartedReceipt = "JobStartedReceipt" + JobAcceptedReceipt = "JobAcceptedReceipt" JobFinishedReceipt = "JobFinishedReceipt" -class Receipt(pydantic.BaseModel): - payload: JobStartedReceiptPayload | JobFinishedReceiptPayload +class BaseReceiptPayload(BaseModel): + job_uuid: str + miner_hotkey: str + validator_hotkey: str + timestamp: datetime.datetime # when the receipt was generated + + def blob_for_signing(self): + # pydantic v2 does not support sort_keys anymore. + return json.dumps(self.model_dump(mode="json"), sort_keys=True) + + +class JobStartedReceiptPayload(BaseReceiptPayload): + receipt_type: Literal[ReceiptType.JobStartedReceipt] = ReceiptType.JobStartedReceipt + executor_class: ExecutorClass + max_timeout: int # seconds + is_organic: bool + ttl: int + + +class JobAcceptedReceiptPayload(BaseReceiptPayload): + receipt_type: Literal[ReceiptType.JobAcceptedReceipt] = ReceiptType.JobAcceptedReceipt + time_accepted: datetime.datetime + ttl: int + + +class JobFinishedReceiptPayload(BaseReceiptPayload): + receipt_type: Literal[ReceiptType.JobFinishedReceipt] = ReceiptType.JobFinishedReceipt + time_started: datetime.datetime + time_took_us: int # micro-seconds + score_str: str + + @property + def time_took(self): + return datetime.timedelta(microseconds=self.time_took_us) + + @property + def score(self): + return float(self.score_str) + + +ReceiptPayload = Annotated[ + JobStartedReceiptPayload | JobAcceptedReceiptPayload | JobFinishedReceiptPayload, + Field(discriminator="receipt_type"), +] + + +class Receipt(BaseModel): + payload: ReceiptPayload validator_signature: str miner_signature: str diff --git a/compute_horde/compute_horde/receipts/transfer.py b/compute_horde/compute_horde/receipts/transfer.py index 825343ffe..b74c06d1f 100644 --- a/compute_horde/compute_horde/receipts/transfer.py +++ b/compute_horde/compute_horde/receipts/transfer.py @@ -1,6 +1,5 @@ import contextlib import csv -import datetime import io import logging import shutil @@ -10,11 +9,14 @@ import requests from compute_horde.executor_class import ExecutorClass -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, + Receipt, + ReceiptPayload, + ReceiptType, ) -from compute_horde.receipts.schemas import Receipt, ReceiptType logger = logging.getLogger(__name__) @@ -43,7 +45,7 @@ def get_miner_receipts(hotkey: str, ip: str, port: int) -> list[Receipt]: for raw_receipt in csv_reader: try: receipt_type = ReceiptType(raw_receipt["type"]) - receipt_payload: JobStartedReceiptPayload | JobFinishedReceiptPayload + receipt_payload: ReceiptPayload match receipt_type: case ReceiptType.JobStartedReceipt: @@ -51,12 +53,11 @@ def get_miner_receipts(hotkey: str, ip: str, port: int) -> list[Receipt]: job_uuid=raw_receipt["job_uuid"], miner_hotkey=raw_receipt["miner_hotkey"], validator_hotkey=raw_receipt["validator_hotkey"], + timestamp=raw_receipt["timestamp"], # type: ignore[arg-type] executor_class=ExecutorClass(raw_receipt["executor_class"]), - time_accepted=datetime.datetime.fromisoformat( - raw_receipt["time_accepted"] - ), max_timeout=int(raw_receipt["max_timeout"]), is_organic=raw_receipt.get("is_organic") == "True", + ttl=int(raw_receipt["ttl"]), ) case ReceiptType.JobFinishedReceipt: @@ -64,13 +65,22 @@ def get_miner_receipts(hotkey: str, ip: str, port: int) -> list[Receipt]: job_uuid=raw_receipt["job_uuid"], miner_hotkey=raw_receipt["miner_hotkey"], validator_hotkey=raw_receipt["validator_hotkey"], - time_started=datetime.datetime.fromisoformat( - raw_receipt["time_started"] - ), + timestamp=raw_receipt["timestamp"], # type: ignore[arg-type] + time_started=raw_receipt["time_started"], # type: ignore[arg-type] time_took_us=int(raw_receipt["time_took_us"]), score_str=raw_receipt["score_str"], ) + case ReceiptType.JobAcceptedReceipt: + receipt_payload = JobAcceptedReceiptPayload( + job_uuid=raw_receipt["job_uuid"], + miner_hotkey=raw_receipt["miner_hotkey"], + validator_hotkey=raw_receipt["validator_hotkey"], + timestamp=raw_receipt["timestamp"], # type: ignore[arg-type] + time_accepted=raw_receipt["time_accepted"], # type: ignore[arg-type] + ttl=int(raw_receipt["ttl"]), + ) + receipt = Receipt( payload=receipt_payload, validator_signature=raw_receipt["validator_signature"], diff --git a/compute_horde/tests/conftest.py b/compute_horde/tests/conftest.py index ffd8c3c8b..52ca9cdac 100644 --- a/compute_horde/tests/conftest.py +++ b/compute_horde/tests/conftest.py @@ -6,11 +6,12 @@ from bittensor import Keypair from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, + Receipt, ) -from compute_horde.receipts.schemas import Receipt @pytest.fixture @@ -49,10 +50,11 @@ def receipts(validator_keypair, miner_keypair): job_uuid="3342460e-4a99-438b-8757-795f4cb348dd", miner_hotkey=miner_keypair.ss58_address, validator_hotkey=validator_keypair.ss58_address, + timestamp=datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), executor_class=DEFAULT_EXECUTOR_CLASS, - time_accepted=datetime.datetime(2024, 1, 2, 1, 55, 0, tzinfo=datetime.UTC), max_timeout=30, is_organic=False, + ttl=30, ) receipt1 = Receipt( payload=payload1, @@ -60,13 +62,13 @@ def receipts(validator_keypair, miner_keypair): miner_signature=f"0x{miner_keypair.sign(payload1.blob_for_signing()).hex()}", ) - payload2 = JobFinishedReceiptPayload( + payload2 = JobAcceptedReceiptPayload( job_uuid="3342460e-4a99-438b-8757-795f4cb348dd", miner_hotkey=miner_keypair.ss58_address, validator_hotkey=validator_keypair.ss58_address, - time_started=datetime.datetime(2024, 1, 2, 1, 57, 0, tzinfo=datetime.UTC), - time_took_us=2_000_000, - score_str="2.00", + timestamp=datetime.datetime(2020, 1, 1, 0, 5, 0, tzinfo=datetime.UTC), + time_accepted=datetime.datetime(2020, 1, 1, 0, 4, 0, tzinfo=datetime.UTC), + ttl=300, ) receipt2 = Receipt( payload=payload2, @@ -74,7 +76,22 @@ def receipts(validator_keypair, miner_keypair): miner_signature=f"0x{miner_keypair.sign(payload2.blob_for_signing()).hex()}", ) - return [receipt1, receipt2] + payload3 = JobFinishedReceiptPayload( + job_uuid="3342460e-4a99-438b-8757-795f4cb348dd", + miner_hotkey=miner_keypair.ss58_address, + validator_hotkey=validator_keypair.ss58_address, + timestamp=datetime.datetime(2020, 1, 1, 0, 10, 0, tzinfo=datetime.UTC), + time_started=datetime.datetime(2020, 1, 1, 0, 9, 0, tzinfo=datetime.UTC), + time_took_us=60_000_000, + score_str="2.00", + ) + receipt3 = Receipt( + payload=payload3, + validator_signature=f"0x{validator_keypair.sign(payload3.blob_for_signing()).hex()}", + miner_signature=f"0x{miner_keypair.sign(payload3.blob_for_signing()).hex()}", + ) + + return [receipt1, receipt2, receipt3] @pytest.fixture diff --git a/compute_horde/tests/test_receipts.py b/compute_horde/tests/test_receipts.py index 545f16a47..a4877a6bd 100644 --- a/compute_horde/tests/test_receipts.py +++ b/compute_horde/tests/test_receipts.py @@ -3,20 +3,22 @@ import pytest -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, -) -from compute_horde.receipts.schemas import ( Receipt, - ReceiptType, ) from compute_horde.receipts.transfer import ReceiptFetchError, get_miner_receipts def receipts_helper(mocked_responses, receipts: list[Receipt], miner_keypair): payload_fields = set() - for payload_cls in [JobStartedReceiptPayload, JobFinishedReceiptPayload]: + for payload_cls in [ + JobStartedReceiptPayload, + JobAcceptedReceiptPayload, + JobFinishedReceiptPayload, + ]: payload_fields |= set(payload_cls.model_fields.keys()) buf = io.StringIO() @@ -31,11 +33,7 @@ def receipts_helper(mocked_responses, receipts: list[Receipt], miner_keypair): ) csv_writer.writeheader() for receipt in receipts: - match receipt.payload: - case JobStartedReceiptPayload(): - receipt_type = ReceiptType.JobStartedReceipt - case JobFinishedReceiptPayload(): - receipt_type = ReceiptType.JobFinishedReceipt + receipt_type = receipt.payload.receipt_type row = ( dict( type=receipt_type.value, @@ -53,13 +51,20 @@ def receipts_helper(mocked_responses, receipts: list[Receipt], miner_keypair): def receipts_one_skipped_helper(mocked_responses, receipts, miner_keypair): got_receipts = receipts_helper(mocked_responses, receipts, miner_keypair) # only the valid receipt should be stored - assert len(got_receipts) == 1 - assert got_receipts[0] == receipts[0] + assert len(got_receipts) == len(receipts) - 1 + for receipt in receipts[1:]: + got_receipt = [ + x + for x in got_receipts + if x.payload.job_uuid == receipt.payload.job_uuid + and x.payload.__class__ is receipt.payload.__class__ + ][0] + assert got_receipt == receipt def test__get_miner_receipts__happy_path(mocked_responses, receipts, miner_keypair): got_receipts = receipts_helper(mocked_responses, receipts, miner_keypair) - assert len(got_receipts) == 2 + assert len(got_receipts) == len(receipts) for receipt in receipts: got_receipt = [ x @@ -76,29 +81,29 @@ def test__get_miner_receipts__invalid_receipt_skipped(mocked_responses, receipts Invalidate one receipt payload fields to make it invalid """ - receipts[1].payload.miner_hotkey = 0 - receipts[1].payload.validator_hotkey = None + receipts[0].payload.miner_hotkey = 0 + receipts[0].payload.validator_hotkey = None receipts_one_skipped_helper(mocked_responses, receipts, miner_keypair) def test__get_miner_receipts__miner_hotkey_mismatch_skipped( mocked_responses, receipts, miner_keypair, keypair ): - receipts[1].payload.miner_hotkey = keypair.ss58_address + receipts[0].payload.miner_hotkey = keypair.ss58_address receipts_one_skipped_helper(mocked_responses, receipts, miner_keypair) def test__get_miner_receipts__invalid_miner_signature_skipped( mocked_responses, receipts, miner_keypair ): - receipts[1].miner_signature = f"0x{miner_keypair.sign('bla').hex()}" + receipts[0].miner_signature = f"0x{miner_keypair.sign('bla').hex()}" receipts_one_skipped_helper(mocked_responses, receipts, miner_keypair) def test__get_miner_receipts__invalid_validator_signature_skipped( mocked_responses, receipts, miner_keypair ): - receipts[1].validator_signature = f"0x{miner_keypair.sign('bla').hex()}" + receipts[0].validator_signature = f"0x{miner_keypair.sign('bla').hex()}" receipts_one_skipped_helper(mocked_responses, receipts, miner_keypair) diff --git a/compute_horde/tests/test_run_organic_job.py b/compute_horde/tests/test_run_organic_job.py index ad43f8555..f28972590 100644 --- a/compute_horde/tests/test_run_organic_job.py +++ b/compute_horde/tests/test_run_organic_job.py @@ -14,9 +14,9 @@ BaseValidatorRequest, V0AuthenticateRequest, V0InitialJobRequest, + V0JobAcceptedReceiptRequest, V0JobFinishedReceiptRequest, V0JobRequest, - V0JobStartedReceiptRequest, ) from compute_horde.transport import StubTransport @@ -65,7 +65,7 @@ async def test_run_organic_job__success(keypair): assert sent_models_types == [ V0AuthenticateRequest, V0InitialJobRequest, - V0JobStartedReceiptRequest, + V0JobAcceptedReceiptRequest, V0JobRequest, V0JobFinishedReceiptRequest, ] diff --git a/miner/app/src/compute_horde_miner/miner/liveness_check.py b/miner/app/src/compute_horde_miner/miner/liveness_check.py index 486d1267d..f372ff305 100644 --- a/miner/app/src/compute_horde_miner/miner/liveness_check.py +++ b/miner/app/src/compute_horde_miner/miner/liveness_check.py @@ -1,5 +1,6 @@ import asyncio import base64 +import datetime import io import json import os @@ -14,6 +15,7 @@ from compute_horde.base.volume import InlineVolume, VolumeType from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS from compute_horde.mv_protocol import validator_requests +from compute_horde.receipts.schemas import JobStartedReceiptPayload from django.conf import settings from compute_horde_miner.channel_layer.channel_layer import ECRedisChannelLayer @@ -148,12 +150,26 @@ async def drive_executor() -> float: job_uuid = str(uuid.uuid4()) executor_token = f"{job_uuid}-{uuid.uuid4()}" + keypair = settings.BITTENSOR_WALLET().get_hotkey() + receipt_payload = JobStartedReceiptPayload( + job_uuid=job_uuid, + miner_hotkey=keypair.ss58_address, + validator_hotkey=keypair.ss58_address, + timestamp=datetime.datetime.now(datetime.UTC), + executor_class=DEFAULT_EXECUTOR_CLASS, + max_timeout=JOB_TIMEOUT_SECONDS, + is_organic=False, + ttl=30, + ) + receipt_signature = f"0x{keypair.sign(receipt_payload.blob_for_signing()).hex()}" initial_job_request = validator_requests.V0InitialJobRequest( job_uuid=job_uuid, executor_class=DEFAULT_EXECUTOR_CLASS, base_docker_image_name=JOB_IMAGE_NAME, timeout_seconds=JOB_TIMEOUT_SECONDS, volume_type=VolumeType.inline, + job_started_receipt_payload=receipt_payload, + job_started_receipt_signature=receipt_signature, ) job_request = validator_requests.V0JobRequest( job_uuid=job_uuid, diff --git a/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py b/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py index 2b2dfb0de..d57d6cbe3 100644 --- a/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py +++ b/miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py @@ -10,7 +10,8 @@ from compute_horde.mv_protocol.validator_requests import ( BaseValidatorRequest, ) -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import JobStartedReceiptPayload, ReceiptPayload from django.conf import settings from django.utils import timezone @@ -156,32 +157,26 @@ def verify_auth_msg(self, msg: validator_requests.V0AuthenticateRequest) -> tupl return False, "Signature mismatches" - def verify_receipt_msg( - self, - msg: validator_requests.V0JobStartedReceiptRequest - | validator_requests.V0JobFinishedReceiptRequest, - ) -> bool: + def verify_receipt_payload(self, payload: ReceiptPayload, signature: str) -> bool: if settings.IS_LOCAL_MINER: return True - if self.my_hotkey != DONT_CHECK and msg.payload.miner_hotkey != self.my_hotkey: + if self.my_hotkey != DONT_CHECK and payload.miner_hotkey != self.my_hotkey: logger.warning( - f"Miner hotkey mismatch in receipt for job_uuid {msg.payload.job_uuid} ({msg.payload.miner_hotkey!r} != {self.my_hotkey!r})" + f"Miner hotkey mismatch in receipt for job_uuid {payload.job_uuid} ({payload.miner_hotkey!r} != {self.my_hotkey!r})" ) return False - if msg.payload.validator_hotkey != self.validator_key: + if payload.validator_hotkey != self.validator_key: logger.warning( - f"Validator hotkey mismatch in receipt for job_uuid {msg.payload.job_uuid} ({msg.payload.validator_hotkey!r} != {self.validator_key!r})" + f"Validator hotkey mismatch in receipt for job_uuid {payload.job_uuid} ({payload.validator_hotkey!r} != {self.validator_key!r})" ) return False keypair = bittensor.Keypair(ss58_address=self.validator_key) - if keypair.verify(msg.blob_for_signing(), msg.signature): + if keypair.verify(payload.blob_for_signing(), signature): return True - logger.warning( - f"Validator signature mismatch in receipt for job_uuid {msg.payload.job_uuid}" - ) + logger.warning(f"Validator signature mismatch in receipt for job_uuid {payload.job_uuid}") return False async def handle_authentication(self, msg: validator_requests.V0AuthenticateRequest): @@ -299,13 +294,13 @@ async def handle(self, msg: BaseValidatorRequest): await self.handle_job_request(msg) if isinstance( - msg, validator_requests.V0JobStartedReceiptRequest - ) and self.verify_receipt_msg(msg): - await self.handle_job_started_receipt(msg) + msg, validator_requests.V0JobAcceptedReceiptRequest + ) and self.verify_receipt_payload(msg.payload, msg.signature): + await self.handle_job_accepted_receipt(msg) if isinstance( msg, validator_requests.V0JobFinishedReceiptRequest - ) and self.verify_receipt_msg(msg): + ) and self.verify_receipt_payload(msg.payload, msg.signature): await self.handle_job_finished_receipt(msg) async def handle_initial_job_request(self, msg: validator_requests.V0InitialJobRequest): @@ -320,6 +315,11 @@ async def handle_initial_job_request(self, msg: validator_requests.V0InitialJobR miner_requests.V0DeclineJobRequest(job_uuid=msg.job_uuid).model_dump_json() ) return + + await self.handle_job_started_receipt( + msg.job_started_receipt_payload, msg.job_started_receipt_signature + ) + # TODO add rate limiting per validator key here token = f"{msg.job_uuid}-{uuid.uuid4()}" await self.group_add(token) @@ -378,27 +378,54 @@ async def handle_job_request(self, msg: validator_requests.V0JobRequest): job.full_job_details = msg.model_dump() await job.asave() - async def handle_job_started_receipt(self, msg: validator_requests.V0JobStartedReceiptRequest): + async def handle_job_started_receipt(self, payload: JobStartedReceiptPayload, signature: str): logger.info( f"Received job started receipt for" - f" job_uuid={msg.payload.job_uuid} validator_hotkey={msg.payload.validator_hotkey}" - f" max_timeout={msg.payload.max_timeout}" + f" job_uuid={payload.job_uuid} validator_hotkey={payload.validator_hotkey}" + f" max_timeout={payload.max_timeout}" ) if settings.IS_LOCAL_MINER: return + if not self.verify_receipt_payload(payload, signature): + return + await JobStartedReceipt.objects.acreate( - job_uuid=msg.payload.job_uuid, - validator_hotkey=msg.payload.validator_hotkey, - miner_hotkey=msg.payload.miner_hotkey, + job_uuid=payload.job_uuid, + validator_hotkey=payload.validator_hotkey, + miner_hotkey=payload.miner_hotkey, + validator_signature=signature, + miner_signature=get_miner_signature(payload), + timestamp=payload.timestamp, + executor_class=payload.executor_class, + max_timeout=payload.max_timeout, + is_organic=payload.is_organic, + ttl=payload.ttl, + ) + + async def handle_job_accepted_receipt( + self, msg: validator_requests.V0JobAcceptedReceiptRequest + ): + logger.info( + f"Received job accepted receipt for" + f" job_uuid={msg.payload.job_uuid} validator_hotkey={msg.payload.validator_hotkey}" + ) + + if settings.IS_LOCAL_MINER: + return + + await JobAcceptedReceipt.objects.acreate( validator_signature=msg.signature, miner_signature=get_miner_signature(msg), - executor_class=msg.payload.executor_class, + job_uuid=msg.payload.job_uuid, + miner_hotkey=msg.payload.miner_hotkey, + validator_hotkey=msg.payload.validator_hotkey, + timestamp=msg.payload.timestamp, time_accepted=msg.payload.time_accepted, - max_timeout=msg.payload.max_timeout, - is_organic=msg.payload.is_organic, + ttl=msg.payload.ttl, ) + prepare_receipts.delay() async def handle_job_finished_receipt( self, msg: validator_requests.V0JobFinishedReceiptRequest @@ -422,6 +449,7 @@ async def handle_job_finished_receipt( job_uuid=msg.payload.job_uuid, miner_hotkey=msg.payload.miner_hotkey, validator_hotkey=msg.payload.validator_hotkey, + timestamp=msg.payload.timestamp, time_started=msg.payload.time_started, time_took_us=msg.payload.time_took_us, score_str=msg.payload.score_str, diff --git a/miner/app/src/compute_horde_miner/miner/receipt_store/local.py b/miner/app/src/compute_horde_miner/miner/receipt_store/local.py index 2aad9143e..c819ccd2f 100644 --- a/miner/app/src/compute_horde_miner/miner/receipt_store/local.py +++ b/miner/app/src/compute_horde_miner/miner/receipt_store/local.py @@ -4,11 +4,12 @@ import shutil import tempfile -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, + Receipt, ) -from compute_horde.receipts.schemas import Receipt, ReceiptType from django.conf import settings from compute_horde_miner.miner.receipt_store.base import BaseReceiptStore @@ -23,6 +24,7 @@ def store(self, receipts: list[Receipt]) -> None: payload_fields = set() payload_fields |= set(JobStartedReceiptPayload.model_fields.keys()) + payload_fields |= set(JobAcceptedReceiptPayload.model_fields.keys()) payload_fields |= set(JobFinishedReceiptPayload.model_fields.keys()) buf = io.StringIO() @@ -37,14 +39,9 @@ def store(self, receipts: list[Receipt]) -> None: ) csv_writer.writeheader() for receipt in receipts: - match receipt.payload: - case JobStartedReceiptPayload(): - receipt_type = ReceiptType.JobStartedReceipt - case JobFinishedReceiptPayload(): - receipt_type = ReceiptType.JobFinishedReceipt row = ( dict( - type=receipt_type.value, + type=receipt.payload.receipt_type.value, validator_signature=receipt.validator_signature, miner_signature=receipt.miner_signature, ) diff --git a/miner/app/src/compute_horde_miner/miner/tasks.py b/miner/app/src/compute_horde_miner/miner/tasks.py index b67639065..953d52fef 100644 --- a/miner/app/src/compute_horde_miner/miner/tasks.py +++ b/miner/app/src/compute_horde_miner/miner/tasks.py @@ -2,11 +2,12 @@ from celery.utils.log import get_task_logger from compute_horde.dynamic_config import sync_dynamic_config -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, ) -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt from compute_horde.receipts.transfer import get_miner_receipts from compute_horde.utils import get_validators from constance import config @@ -70,27 +71,15 @@ def fetch_validators(): def prepare_receipts(): receipts = [] - job_started_receipts = JobStartedReceipt.objects.order_by("time_accepted").filter( - time_accepted__gt=now() - RECEIPTS_MAX_SERVED_PERIOD - ) - for job_started_receipt in job_started_receipts: - try: - receipts.append(job_started_receipt.to_receipt()) - except Exception as e: - logger.error( - f"Skipping job started receipt for job {job_started_receipt.job_uuid}: {e}" - ) - - job_finished_receipts = JobFinishedReceipt.objects.order_by("time_started").filter( - time_started__gt=now() - RECEIPTS_MAX_SERVED_PERIOD - ) - for job_finished_receipt in job_finished_receipts: - try: - receipts.append(job_finished_receipt.to_receipt()) - except Exception as e: - logger.error( - f"Skipping job finished receipt for job {job_finished_receipt.job_uuid}: {e}" - ) + for model in [JobStartedReceipt, JobAcceptedReceipt, JobFinishedReceipt]: + db_objects = model.objects.order_by("timestamp").filter( # type: ignore[attr-defined] + timestamp__gt=now() - RECEIPTS_MAX_SERVED_PERIOD + ) + for db_object in db_objects: + try: + receipts.append(db_object.to_receipt()) + except Exception as e: + logger.error(f"Skipping job started receipt for job {db_object.job_uuid}: {e}") logger.info(f"Stored receipts: {len(receipts)}") @@ -99,12 +88,8 @@ def prepare_receipts(): @app.task def clear_old_receipts(): - JobFinishedReceipt.objects.filter( - time_started__lt=now() - RECEIPTS_MAX_RETENTION_PERIOD - ).delete() - JobStartedReceipt.objects.filter( - time_accepted__lt=now() - RECEIPTS_MAX_RETENTION_PERIOD - ).delete() + for model in [JobStartedReceipt, JobAcceptedReceipt, JobFinishedReceipt]: + model.objects.filter(timestamp__lt=now() - RECEIPTS_MAX_RETENTION_PERIOD).delete() # type: ignore[attr-defined] @app.task @@ -124,44 +109,77 @@ def get_receipts_from_old_miner(): tolerance = datetime.timedelta(hours=1) latest_job_started_receipt = ( - JobStartedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-time_accepted").first() + JobStartedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-timestamp").first() ) job_started_receipt_cutoff_time = ( - latest_job_started_receipt.time_accepted - tolerance if latest_job_started_receipt else None + latest_job_started_receipt.timestamp - tolerance if latest_job_started_receipt else None ) job_started_receipt_to_create = [ JobStartedReceipt( job_uuid=receipt.payload.job_uuid, miner_hotkey=receipt.payload.miner_hotkey, validator_hotkey=receipt.payload.validator_hotkey, + validator_signature=receipt.validator_signature, + miner_signature=receipt.miner_signature, + timestamp=receipt.payload.timestamp, executor_class=receipt.payload.executor_class, - time_accepted=receipt.payload.time_accepted, max_timeout=receipt.payload.max_timeout, is_organic=receipt.payload.is_organic, + ttl=receipt.payload.ttl, ) for receipt in receipts if isinstance(receipt.payload, JobStartedReceiptPayload) and ( job_started_receipt_cutoff_time is None - or receipt.payload.time_accepted > job_started_receipt_cutoff_time + or receipt.payload.timestamp > job_started_receipt_cutoff_time ) ] if job_started_receipt_to_create: JobStartedReceipt.objects.bulk_create(job_started_receipt_to_create, ignore_conflicts=True) + latest_job_accepted_receipt = ( + JobAcceptedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-timestamp").first() + ) + job_accepted_receipt_cutoff_time = ( + latest_job_accepted_receipt.timestamp - tolerance if latest_job_accepted_receipt else None + ) + job_accepted_receipt_to_create = [ + JobAcceptedReceipt( + job_uuid=receipt.payload.job_uuid, + miner_hotkey=receipt.payload.miner_hotkey, + validator_hotkey=receipt.payload.validator_hotkey, + validator_signature=receipt.validator_signature, + miner_signature=receipt.miner_signature, + timestamp=receipt.payload.timestamp, + time_accepted=receipt.payload.time_accepted, + ttl=receipt.payload.ttl, + ) + for receipt in receipts + if isinstance(receipt.payload, JobAcceptedReceiptPayload) + and ( + job_accepted_receipt_cutoff_time is None + or receipt.payload.timestamp > job_accepted_receipt_cutoff_time + ) + ] + if job_accepted_receipt_to_create: + JobAcceptedReceipt.objects.bulk_create( + job_accepted_receipt_to_create, ignore_conflicts=True + ) + latest_job_finished_receipt = ( - JobFinishedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-time_started").first() + JobFinishedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-timestamp").first() ) job_finished_receipt_cutoff_time = ( - latest_job_finished_receipt.time_started - tolerance - if latest_job_finished_receipt - else None + latest_job_finished_receipt.timestamp - tolerance if latest_job_finished_receipt else None ) job_finished_receipt_to_create = [ JobFinishedReceipt( job_uuid=receipt.payload.job_uuid, miner_hotkey=receipt.payload.miner_hotkey, validator_hotkey=receipt.payload.validator_hotkey, + validator_signature=receipt.validator_signature, + miner_signature=receipt.miner_signature, + timestamp=receipt.payload.timestamp, time_started=receipt.payload.time_started, time_took_us=receipt.payload.time_took_us, score_str=receipt.payload.score_str, @@ -170,7 +188,7 @@ def get_receipts_from_old_miner(): if isinstance(receipt.payload, JobFinishedReceiptPayload) and ( job_finished_receipt_cutoff_time is None - or receipt.payload.time_started > job_finished_receipt_cutoff_time + or receipt.payload.timestamp > job_finished_receipt_cutoff_time ) ] if job_finished_receipt_to_create: diff --git a/miner/app/src/compute_horde_miner/miner/tests/conftest.py b/miner/app/src/compute_horde_miner/miner/tests/conftest.py index 6e21e1b1a..ff1dda9dc 100644 --- a/miner/app/src/compute_horde_miner/miner/tests/conftest.py +++ b/miner/app/src/compute_horde_miner/miner/tests/conftest.py @@ -1,8 +1,11 @@ +import logging from collections.abc import Generator import bittensor import pytest +logger = logging.getLogger(__name__) + @pytest.fixture(scope="session") def validator_wallet(): @@ -27,3 +30,14 @@ def some() -> Generator[int, None, None]: # setup code yield 1 # teardown code + + +@pytest.fixture(scope="session", autouse=True) +def wallet(): + wallet = bittensor.wallet(name="test_miner") + try: + # workaround the overwrite flag + wallet.regenerate_coldkey(seed="0" * 64, use_password=False, overwrite=True) + wallet.regenerate_hotkey(seed="1" * 64, use_password=False, overwrite=True) + except Exception as e: + logger.error(f"Failed to create wallet: {e}") diff --git a/miner/app/src/compute_horde_miner/miner/tests/integration/test_mocked_executor_manager.py b/miner/app/src/compute_horde_miner/miner/tests/integration/test_mocked_executor_manager.py index cd099ea5f..ff7646e8d 100644 --- a/miner/app/src/compute_horde_miner/miner/tests/integration/test_mocked_executor_manager.py +++ b/miner/app/src/compute_horde_miner/miner/tests/integration/test_mocked_executor_manager.py @@ -92,6 +92,18 @@ async def run_regular_flow_test(validator_key: str, job_uuid: str): "base_docker_image_name": "it's teeeeests", "timeout_seconds": 60, "volume_type": "inline", + "job_started_receipt_payload": { + "receipt_type": "JobStartedReceipt", + "job_uuid": job_uuid, + "miner_hotkey": "miner_hotkey", + "validator_hotkey": validator_key, + "timestamp": "2020-01-01T00:00Z", + "executor_class": DEFAULT_EXECUTOR_CLASS, + "max_timeout": 60, + "is_organic": True, + "ttl": 5, + }, + "job_started_receipt_signature": "gibberish", } ) response = await communicator.receive_json_from(timeout=WEBSOCKET_TIMEOUT) @@ -125,7 +137,7 @@ async def run_regular_flow_test(validator_key: str, job_uuid: str): } -async def test_main_loop(validator: Validator, job_uuid: str): +async def test_main_loop(validator: Validator, job_uuid: str, mock_keypair: MagicMock): await run_regular_flow_test(validator.public_key, job_uuid) diff --git a/miner/app/src/compute_horde_miner/miner/tests/settings.py b/miner/app/src/compute_horde_miner/miner/tests/settings.py index c3f29f844..e52a0333e 100644 --- a/miner/app/src/compute_horde_miner/miner/tests/settings.py +++ b/miner/app/src/compute_horde_miner/miner/tests/settings.py @@ -1,4 +1,7 @@ import os +import pathlib + +import bittensor os.environ.update( { @@ -13,3 +16,19 @@ EXECUTOR_MANAGER_CLASS_PATH = "compute_horde_miner.miner.tests.executor_manager:StubExecutorManager" DEBUG_TURN_AUTHENTICATION_OFF = True + +BITTENSOR_WALLET_DIRECTORY = pathlib.Path("~").expanduser() / ".bittensor" / "wallets" +BITTENSOR_WALLET_NAME = "test_miner" +BITTENSOR_WALLET_HOTKEY_NAME = "default" + + +def BITTENSOR_WALLET() -> bittensor.wallet: # type: ignore + if not BITTENSOR_WALLET_NAME or not BITTENSOR_WALLET_HOTKEY_NAME: + raise RuntimeError("Wallet not configured") + wallet = bittensor.wallet( + name=BITTENSOR_WALLET_NAME, + hotkey=BITTENSOR_WALLET_HOTKEY_NAME, + path=str(BITTENSOR_WALLET_DIRECTORY), + ) + wallet.hotkey_file.get_keypair() # this raises errors if the keys are inaccessible + return wallet diff --git a/miner/app/src/compute_horde_miner/miner/tests/test_migration.py b/miner/app/src/compute_horde_miner/miner/tests/test_migration.py index b478fb433..573423919 100644 --- a/miner/app/src/compute_horde_miner/miner/tests/test_migration.py +++ b/miner/app/src/compute_horde_miner/miner/tests/test_migration.py @@ -1,4 +1,5 @@ import uuid +from datetime import UTC, datetime import pytest from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS @@ -6,8 +7,8 @@ JobFinishedReceiptPayload, JobStartedReceiptPayload, ) -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt -from compute_horde.receipts.schemas import Receipt +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import JobAcceptedReceiptPayload, Receipt from django.utils.timezone import now from pytest_mock import MockerFixture @@ -47,10 +48,23 @@ def test_get_receipts_from_old_miner(mocker: MockerFixture): job_uuid=str(uuid.uuid4()), miner_hotkey="m1", validator_hotkey="v1", + timestamp=datetime(2020, 1, 1, tzinfo=UTC), executor_class=DEFAULT_EXECUTOR_CLASS, - time_accepted=now(), max_timeout=30, is_organic=False, + ttl=5, + ), + validator_signature="0xv1", + miner_signature="0xm1", + ), + Receipt( + payload=JobAcceptedReceiptPayload( + job_uuid=str(uuid.uuid4()), + miner_hotkey="m1", + validator_hotkey="v1", + timestamp=datetime(2020, 1, 1, tzinfo=UTC), + time_accepted=datetime(2020, 1, 1, tzinfo=UTC), + ttl=5, ), validator_signature="0xv1", miner_signature="0xm1", @@ -59,13 +73,14 @@ def test_get_receipts_from_old_miner(mocker: MockerFixture): payload=JobFinishedReceiptPayload( job_uuid=str(uuid.uuid4()), miner_hotkey="m1", - validator_hotkey="v2", + validator_hotkey="v3", + timestamp=datetime(2020, 1, 1, tzinfo=UTC), time_started=now(), time_took_us=35_000_000, score_str="103.45", ), - validator_signature="0xv2", - miner_signature="0xm2", + validator_signature="0xv3", + miner_signature="0xm3", ), ] mocker.patch("compute_horde_miner.miner.tasks.get_miner_receipts", return_value=receipts) @@ -74,4 +89,5 @@ def test_get_receipts_from_old_miner(mocker: MockerFixture): get_receipts_from_old_miner() assert JobStartedReceipt.objects.count() == 1 + assert JobAcceptedReceipt.objects.count() == 1 assert JobFinishedReceipt.objects.count() == 1 diff --git a/miner/app/src/compute_horde_miner/miner/tests/test_receipts.py b/miner/app/src/compute_horde_miner/miner/tests/test_receipts.py index 6376b6fce..b5b3bac63 100644 --- a/miner/app/src/compute_horde_miner/miner/tests/test_receipts.py +++ b/miner/app/src/compute_horde_miner/miner/tests/test_receipts.py @@ -3,20 +3,24 @@ import bittensor import pytest +from compute_horde.base.volume import VolumeType from compute_horde.executor_class import ExecutorClass from compute_horde.mv_protocol.validator_requests import ( AuthenticationPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, V0AuthenticateRequest, + V0InitialJobRequest, + V0JobAcceptedReceiptRequest, V0JobFinishedReceiptRequest, - V0JobStartedReceiptRequest, ) -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import JobAcceptedReceiptPayload from django.utils import timezone from pytest_mock import MockerFixture -from compute_horde_miner.miner.models import AcceptedJob, Validator +from compute_horde_miner.miner.models import Validator +from compute_horde_miner.miner.tests.executor_manager import fake_executor from compute_horde_miner.miner.tests.validator import fake_validator @@ -24,6 +28,14 @@ def _sign(msg, key: bittensor.Keypair): return f"0x{key.sign(msg.blob_for_signing()).hex()}" +@pytest.fixture +def job_uuid(): + _job_uuid = str(uuid4()) + fake_executor.job_uuid = _job_uuid + yield _job_uuid + fake_executor.job_uuid = None + + @pytest.mark.parametrize( "organic_job", (True, False), @@ -35,13 +47,13 @@ async def test_receipt_is_saved( miner_wallet: bittensor.wallet, mocker: MockerFixture, organic_job: bool, + job_uuid: str, settings, ) -> None: mocker.patch("compute_horde_miner.miner.miner_consumer.validator_interface.prepare_receipts") settings.DEBUG_TURN_AUTHENTICATION_OFF = True settings.BITTENSOR_WALLET = lambda: miner_wallet - job_uuid = str(uuid4()) - validator = await Validator.objects.acreate( + await Validator.objects.acreate( public_key=validator_wallet.hotkey.ss58_address, active=True, ) @@ -69,28 +81,45 @@ async def test_receipt_is_saved( }, } - # Skip doing the job - await AcceptedJob.objects.acreate( - job_uuid=job_uuid, - validator=validator, - initial_job_details={}, - ) - # Send the receipts job_started_receipt_payload = JobStartedReceiptPayload( job_uuid=job_uuid, is_organic=organic_job, miner_hotkey=miner_wallet.hotkey.ss58_address, validator_hotkey=validator_wallet.hotkey.ss58_address, + timestamp=timezone.now(), executor_class=ExecutorClass.spin_up_4min__gpu_24gb, + max_timeout=60, + ttl=5, + ) + job_started_receipt_signature = _sign(job_started_receipt_payload, validator_wallet.hotkey) + await fake_validator_channel.send_to( + V0InitialJobRequest( + job_uuid=job_uuid, + executor_class=ExecutorClass.spin_up_4min__gpu_24gb, + base_docker_image_name="it's teeeeests", + timeout_seconds=60, + volume_type=VolumeType.inline, + job_started_receipt_payload=job_started_receipt_payload, + job_started_receipt_signature=job_started_receipt_signature, + ).model_dump_json() + ) + + # skip doing the job, and only send receipts + + job_accepted_receipt_payload = JobAcceptedReceiptPayload( + job_uuid=job_uuid, + miner_hotkey=miner_wallet.hotkey.ss58_address, + validator_hotkey=validator_wallet.hotkey.ss58_address, + timestamp=timezone.now(), time_accepted=timezone.now(), - max_timeout=123, + ttl=5, ) await fake_validator_channel.send_to( - V0JobStartedReceiptRequest( + V0JobAcceptedReceiptRequest( job_uuid=job_uuid, - payload=job_started_receipt_payload, - signature=_sign(job_started_receipt_payload, validator_wallet.hotkey), + payload=job_accepted_receipt_payload, + signature=_sign(job_accepted_receipt_payload, validator_wallet.hotkey), ).model_dump_json() ) @@ -98,6 +127,7 @@ async def test_receipt_is_saved( job_uuid=job_uuid, miner_hotkey=miner_wallet.hotkey.ss58_address, validator_hotkey=validator_wallet.hotkey.ss58_address, + timestamp=timezone.now(), time_started=timezone.now(), time_took_us=123, score_str="123.45", @@ -113,4 +143,5 @@ async def test_receipt_is_saved( assert await JobStartedReceipt.objects.filter( job_uuid=job_uuid, is_organic=organic_job ).aexists() + assert await JobAcceptedReceipt.objects.filter(job_uuid=job_uuid).aexists() assert await JobFinishedReceipt.objects.filter(job_uuid=job_uuid).aexists() diff --git a/tests/integration_tests/test_miner_on_dev_executor_manager.py b/tests/integration_tests/test_miner_on_dev_executor_manager.py index 591970e7d..2610858e2 100644 --- a/tests/integration_tests/test_miner_on_dev_executor_manager.py +++ b/tests/integration_tests/test_miner_on_dev_executor_manager.py @@ -1,3 +1,4 @@ +from datetime import datetime, UTC import asyncio import base64 import io @@ -11,6 +12,8 @@ import uuid import zipfile from unittest import mock +import bittensor +import logging import pytest import requests @@ -21,7 +24,29 @@ MINER_PORT = 8045 WEBSOCKET_TIMEOUT = 10 -validator_key = str(uuid.uuid4()) +logger = logging.getLogger(__name__) + + +def get_miner_wallet(): + wallet = bittensor.wallet(name="test_miner") + try: + # workaround the overwrite flag + wallet.regenerate_coldkey(seed="0" * 64, use_password=False, overwrite=True) + wallet.regenerate_hotkey(seed="1" * 64, use_password=False, overwrite=True) + except Exception as e: + logger.error(f"Failed to create wallet: {e}") + return wallet + + +def get_validator_wallet(): + wallet = bittensor.wallet(name="test_validator") + try: + # workaround the overwrite flag + wallet.regenerate_coldkey(seed="2" * 64, use_password=False, overwrite=True) + wallet.regenerate_hotkey(seed="3" * 64, use_password=False, overwrite=True) + except Exception as e: + logger.error(f"Failed to create wallet: {e}") + return wallet class Test(ActiveSubnetworkBaseTest): @@ -43,6 +68,7 @@ def miner_path_and_args(cls) -> list[str]: @classmethod def miner_preparation_tasks(cls): + validator_key = get_validator_wallet().get_hotkey().ss58_address db_shell_cmd = f"{sys.executable} miner/app/src/manage.py dbshell" for cmd in [ f'echo "DROP DATABASE IF EXISTS compute_horde_miner_integration_test" | {db_shell_cmd}', @@ -69,6 +95,7 @@ def miner_environ(cls) -> dict[str, str]: "PORT_FOR_EXECUTORS": str(MINER_PORT), "DATABASE_SUFFIX": "_integration_test", "DEBUG_TURN_AUTHENTICATION_OFF": "1", + "BITTENSOR_WALLET_NAME": "test_miner", } @classmethod @@ -77,11 +104,16 @@ def validator_path_and_args(cls) -> list[str]: @classmethod def validator_environ(cls) -> dict[str, str]: - return {} + return { + "BITTENSOR_WALLET_NAME": "test_validator", + } @pytest.mark.asyncio async def test_echo_image(self): job_uuid = str(uuid.uuid4()) + miner_key = get_miner_wallet().get_hotkey().ss58_address + validator_wallet = get_validator_wallet() + validator_key = validator_wallet.get_hotkey().ss58_address payload = "".join( random.choice(string.ascii_uppercase + string.digits) for _ in range(32) @@ -121,6 +153,19 @@ async def test_echo_image(self): ] }, } + + receipt_payload = { + "job_uuid": job_uuid, + "miner_hotkey": miner_key, + "validator_hotkey": validator_key, + "timestamp": datetime.now(tz=UTC).isoformat(), + "executor_class": DEFAULT_EXECUTOR_CLASS, + "max_timeout": 60, + "is_organic": True, + "ttl": 30, + } + blob = json.dumps(receipt_payload, sort_keys=True) + signature = "0x" + validator_wallet.get_hotkey().sign(blob).hex() await ws.send( json.dumps( { @@ -130,6 +175,8 @@ async def test_echo_image(self): "base_docker_image_name": "backenddevelopersltd/compute-horde-job-echo:v0-latest", "timeout_seconds": 60, "volume_type": "inline", + "job_started_receipt_payload": receipt_payload, + "job_started_receipt_signature": signature, } ) ) diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py index 211edac84..991db7063 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py @@ -37,15 +37,18 @@ ) from compute_horde.mv_protocol.validator_requests import ( AuthenticationPayload, - JobFinishedReceiptPayload, - JobStartedReceiptPayload, V0AuthenticateRequest, V0InitialJobRequest, + V0JobAcceptedReceiptRequest, V0JobFinishedReceiptRequest, V0JobRequest, - V0JobStartedReceiptRequest, ) -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, + JobFinishedReceiptPayload, + JobStartedReceiptPayload, +) from compute_horde.transport import AbstractTransport, WSTransport from compute_horde.transport.base import TransportConnectionError from django.conf import settings @@ -268,7 +271,9 @@ class Job: machine_specs: V0MachineSpecsRequest | None = None # receipts - job_started_receipt: V0JobStartedReceiptRequest | None = None + job_started_receipt_payload: JobStartedReceiptPayload | None = None + job_started_receipt_signature: str | None = None + job_accepted_receipt: V0JobAcceptedReceiptRequest | None = None job_finished_receipt: V0JobFinishedReceiptRequest | None = None # scoring @@ -386,6 +391,12 @@ def emit_telemetry_event(self) -> SystemEvent | None: data=data, ) + def get_spin_up_time(self) -> int: + spin_up_time = EXECUTOR_CLASS[self.executor_class].spin_up_time + assert spin_up_time is not None + spin_up_time = max(spin_up_time, _MIN_SPIN_UP_TIME) + return spin_up_time + @dataclass class BatchContext: @@ -686,21 +697,38 @@ def _get_total_executor_count(ctx: BatchContext) -> int: def _generate_job_started_receipt(ctx: BatchContext, job: Job) -> None: - assert job.job_started_receipt is None - - assert job.executor_response_time is not None + assert job.job_started_receipt_payload is None + assert job.job_started_receipt_signature is None max_timeout = job.job_generator.timeout_seconds() payload = JobStartedReceiptPayload( job_uuid=job.uuid, miner_hotkey=job.miner_hotkey, validator_hotkey=ctx.own_keypair.ss58_address, + timestamp=datetime.now(tz=UTC), executor_class=ExecutorClass(job.executor_class), - time_accepted=job.executor_response_time, max_timeout=max_timeout, is_organic=False, + ttl=job.get_spin_up_time(), ) - job.job_started_receipt = V0JobStartedReceiptRequest( + signature = f"0x{ctx.own_keypair.sign(payload.blob_for_signing()).hex()}" + job.job_started_receipt_payload = payload + job.job_started_receipt_signature = signature + + +def _generate_job_accepted_receipt(ctx: BatchContext, job: Job) -> None: + assert job.job_accepted_receipt is None + assert job.accept_response_time is not None + + payload = JobAcceptedReceiptPayload( + job_uuid=job.uuid, + miner_hotkey=job.miner_hotkey, + validator_hotkey=ctx.own_keypair.ss58_address, + timestamp=datetime.now(tz=UTC), + time_accepted=job.accept_response_time, + ttl=6 * 60, # FIXME: max time allowed to run the job + ) + job.job_accepted_receipt = V0JobAcceptedReceiptRequest( payload=payload, signature=f"0x{ctx.own_keypair.sign(payload.blob_for_signing()).hex()}", ) @@ -722,6 +750,7 @@ def _generate_job_finished_receipt(ctx: BatchContext, job: Job) -> None: job_uuid=job.uuid, miner_hotkey=job.miner_hotkey, validator_hotkey=ctx.own_keypair.ss58_address, + timestamp=datetime.now(tz=UTC), time_started=job.job_before_sent_time, time_took_us=int(time_took_sec * 1_000_000), score_str=f"{job.score:.6g}", @@ -880,10 +909,11 @@ async def _send_initial_job_request( job.accept_barrier_time = barrier_time client = ctx.clients[job.miner_hotkey] - spin_up_time = EXECUTOR_CLASS[job.executor_class].spin_up_time - assert spin_up_time is not None - spin_up_time = max(spin_up_time, _MIN_SPIN_UP_TIME) - stagger_wait_interval = max_spin_up_time - spin_up_time + _generate_job_started_receipt(ctx, job) + assert job.job_started_receipt_payload is not None + assert job.job_started_receipt_signature is not None + + stagger_wait_interval = max_spin_up_time - job.get_spin_up_time() assert stagger_wait_interval >= 0 request = V0InitialJobRequest( @@ -892,6 +922,8 @@ async def _send_initial_job_request( base_docker_image_name=job.job_generator.base_docker_image_name(), timeout_seconds=job.job_generator.timeout_seconds(), volume=job.volume if job.job_generator.volume_in_initial_req() else None, + job_started_receipt_payload=job.job_started_receipt_payload, + job_started_receipt_signature=job.job_started_receipt_signature, ) request_json = request.model_dump_json() @@ -907,24 +939,22 @@ async def _send_initial_job_request( await job.accept_response_event.wait() if isinstance(job.accept_response, V0AcceptJobRequest): - await job.executor_response_event.wait() + _generate_job_accepted_receipt(ctx, job) + assert job.job_accepted_receipt is not None + try: + receipt_json = job.job_accepted_receipt.model_dump_json() + async with asyncio.timeout(_SEND_RECEIPT_TIMEOUT): + await client.send_check(receipt_json) + except (Exception, asyncio.CancelledError) as exc: + logger.warning("%s failed to send job accepted receipt: %r", job.name, exc) + job.system_event( + type=SystemEvent.EventType.RECEIPT_FAILURE, + subtype=SystemEvent.EventSubType.RECEIPT_SEND_ERROR, + description=repr(exc), + func="_send_initial_job_request", + ) - # send the receipt from outside the timeout - if isinstance(job.executor_response, V0ExecutorReadyRequest): - _generate_job_started_receipt(ctx, job) - assert job.job_started_receipt is not None - try: - receipt_json = job.job_started_receipt.model_dump_json() - async with asyncio.timeout(_SEND_RECEIPT_TIMEOUT): - await client.send_check(receipt_json) - except (Exception, asyncio.CancelledError) as exc: - logger.warning("%s failed to send job started receipt: %r", job.name, exc) - job.system_event( - type=SystemEvent.EventType.RECEIPT_FAILURE, - subtype=SystemEvent.EventSubType.RECEIPT_SEND_ERROR, - description=repr(exc), - func="_send_initial_job_request", - ) + await job.executor_response_event.wait() async def _send_job_request( @@ -1511,22 +1541,43 @@ def _db_persist(ctx: BatchContext) -> None: job_started_receipts: list[JobStartedReceipt] = [] for job in ctx.jobs.values(): - if job.job_started_receipt is not None: - started_payload = job.job_started_receipt.payload + if ( + job.job_started_receipt_payload is not None + and job.job_started_receipt_signature is not None + ): + started_payload = job.job_started_receipt_payload job_started_receipts.append( JobStartedReceipt( job_uuid=started_payload.job_uuid, miner_hotkey=started_payload.miner_hotkey, validator_hotkey=started_payload.validator_hotkey, - validator_signature=job.job_started_receipt.signature, + validator_signature=job.job_started_receipt_signature, + timestamp=started_payload.timestamp, executor_class=started_payload.executor_class, - time_accepted=started_payload.time_accepted, max_timeout=started_payload.max_timeout, is_organic=False, + ttl=started_payload.ttl, ) ) JobStartedReceipt.objects.bulk_create(job_started_receipts) + job_accepted_receipts: list[JobAcceptedReceipt] = [] + for job in ctx.jobs.values(): + if job.job_accepted_receipt is not None: + accepted_payload = job.job_accepted_receipt.payload + job_accepted_receipts.append( + JobAcceptedReceipt( + job_uuid=accepted_payload.job_uuid, + miner_hotkey=accepted_payload.miner_hotkey, + validator_hotkey=accepted_payload.validator_hotkey, + validator_signature=job.job_accepted_receipt.signature, + timestamp=accepted_payload.timestamp, + time_accepted=accepted_payload.time_accepted, + ttl=accepted_payload.ttl, + ) + ) + JobAcceptedReceipt.objects.bulk_create(job_accepted_receipts) + job_finished_receipts: list[JobFinishedReceipt] = [] for job in ctx.jobs.values(): if job.job_finished_receipt is not None: @@ -1537,6 +1588,7 @@ def _db_persist(ctx: BatchContext) -> None: miner_hotkey=finished_payload.miner_hotkey, validator_hotkey=finished_payload.validator_hotkey, validator_signature=job.job_finished_receipt.signature, + timestamp=finished_payload.timestamp, time_started=finished_payload.time_started, time_took_us=finished_payload.time_took_us, score_str=finished_payload.score_str, diff --git a/validator/app/src/compute_horde_validator/validator/tasks.py b/validator/app/src/compute_horde_validator/validator/tasks.py index b39c31893..3cb47b89e 100644 --- a/validator/app/src/compute_horde_validator/validator/tasks.py +++ b/validator/app/src/compute_horde_validator/validator/tasks.py @@ -19,11 +19,12 @@ from celery.result import allow_join_result from celery.utils.log import get_task_logger from compute_horde.dynamic_config import sync_dynamic_config -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, JobFinishedReceiptPayload, JobStartedReceiptPayload, ) -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt from compute_horde.receipts.transfer import get_miner_receipts from compute_horde.utils import ValidatorListError, get_validators from constance import config @@ -1023,44 +1024,75 @@ def fetch_receipts_from_miner(hotkey: str, ip: str, port: int): tolerance = timedelta(hours=1) latest_job_started_receipt = ( - JobStartedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-time_accepted").first() + JobStartedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-timestamp").first() ) job_started_receipt_cutoff_time = ( - latest_job_started_receipt.time_accepted - tolerance if latest_job_started_receipt else None + latest_job_started_receipt.timestamp - tolerance if latest_job_started_receipt else None ) job_started_receipt_to_create = [ JobStartedReceipt( job_uuid=receipt.payload.job_uuid, miner_hotkey=receipt.payload.miner_hotkey, validator_hotkey=receipt.payload.validator_hotkey, + miner_signature=receipt.miner_signature, + validator_signature=receipt.validator_signature, + timestamp=receipt.payload.timestamp, executor_class=receipt.payload.executor_class, - time_accepted=receipt.payload.time_accepted, max_timeout=receipt.payload.max_timeout, is_organic=receipt.payload.is_organic, + ttl=receipt.payload.ttl, ) for receipt in receipts if isinstance(receipt.payload, JobStartedReceiptPayload) and ( job_started_receipt_cutoff_time is None - or receipt.payload.time_accepted > job_started_receipt_cutoff_time + or receipt.payload.timestamp > job_started_receipt_cutoff_time ) ] logger.debug(f"Creating {len(job_started_receipt_to_create)} JobStartedReceipt. {hotkey=}") JobStartedReceipt.objects.bulk_create(job_started_receipt_to_create, ignore_conflicts=True) + latest_job_accepted_receipt = ( + JobAcceptedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-timestamp").first() + ) + job_accepted_receipt_cutoff_time = ( + latest_job_accepted_receipt.timestamp - tolerance if latest_job_accepted_receipt else None + ) + job_accepted_receipt_to_create = [ + JobAcceptedReceipt( + job_uuid=receipt.payload.job_uuid, + miner_hotkey=receipt.payload.miner_hotkey, + validator_hotkey=receipt.payload.validator_hotkey, + miner_signature=receipt.miner_signature, + validator_signature=receipt.validator_signature, + timestamp=receipt.payload.timestamp, + time_accepted=receipt.payload.time_accepted, + ttl=receipt.payload.ttl, + ) + for receipt in receipts + if isinstance(receipt.payload, JobAcceptedReceiptPayload) + and ( + job_accepted_receipt_cutoff_time is None + or receipt.payload.timestamp > job_accepted_receipt_cutoff_time + ) + ] + logger.debug(f"Creating {len(job_accepted_receipt_to_create)} JobAcceptedReceipt. {hotkey=}") + JobAcceptedReceipt.objects.bulk_create(job_accepted_receipt_to_create, ignore_conflicts=True) + latest_job_finished_receipt = ( - JobFinishedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-time_started").first() + JobFinishedReceipt.objects.filter(miner_hotkey=hotkey).order_by("-timestamp").first() ) job_finished_receipt_cutoff_time = ( - latest_job_finished_receipt.time_started - tolerance - if latest_job_finished_receipt - else None + latest_job_finished_receipt.timestamp - tolerance if latest_job_finished_receipt else None ) job_finished_receipt_to_create = [ JobFinishedReceipt( job_uuid=receipt.payload.job_uuid, miner_hotkey=receipt.payload.miner_hotkey, validator_hotkey=receipt.payload.validator_hotkey, + miner_signature=receipt.miner_signature, + validator_signature=receipt.validator_signature, + timestamp=receipt.payload.timestamp, time_started=receipt.payload.time_started, time_took_us=receipt.payload.time_took_us, score_str=receipt.payload.score_str, @@ -1069,7 +1101,7 @@ def fetch_receipts_from_miner(hotkey: str, ip: str, port: int): if isinstance(receipt.payload, JobFinishedReceiptPayload) and ( job_finished_receipt_cutoff_time is None - or receipt.payload.time_started > job_finished_receipt_cutoff_time + or receipt.payload.timestamp > job_finished_receipt_cutoff_time ) ] logger.debug(f"Creating {len(job_finished_receipt_to_create)} JobFinishedReceipt. {hotkey=}") @@ -1080,8 +1112,9 @@ def fetch_receipts_from_miner(hotkey: str, ip: str, port: int): def fetch_receipts(): """Fetch job receipts from the miners.""" # Delete old receipts before fetching new ones - JobStartedReceipt.objects.filter(time_accepted__lt=now() - timedelta(days=7)).delete() - JobFinishedReceipt.objects.filter(time_started__lt=now() - timedelta(days=7)).delete() + JobStartedReceipt.objects.filter(timestamp__lt=now() - timedelta(days=7)).delete() + JobAcceptedReceipt.objects.filter(timestamp__lt=now() - timedelta(days=7)).delete() + JobFinishedReceipt.objects.filter(timestamp__lt=now() - timedelta(days=7)).delete() metagraph = bittensor.metagraph( netuid=settings.BITTENSOR_NETUID, network=settings.BITTENSOR_NETWORK diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_miner_driver.py b/validator/app/src/compute_horde_validator/validator/tests/test_miner_driver.py index 6120d4d95..f92c7cb47 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_miner_driver.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_miner_driver.py @@ -11,8 +11,8 @@ V0JobFinishedRequest, ) from compute_horde.mv_protocol.validator_requests import ( + V0JobAcceptedReceiptRequest, V0JobFinishedReceiptRequest, - V0JobStartedReceiptRequest, ) from compute_horde_validator.validator.models import Miner @@ -37,7 +37,7 @@ "expected_job_status_updates", "organic_job_status", "dummy_job_factory", - "expected_job_started_receipt", + "expected_job_accepted_receipt", "expected_job_finished_receipt", ), [ @@ -62,7 +62,7 @@ ["accepted", "failed"], OrganicJob.Status.FAILED, get_dummy_job_request_v0, - False, + True, False, ), ( @@ -104,7 +104,7 @@ async def test_miner_driver( expected_job_status_updates, organic_job_status, dummy_job_factory, - expected_job_started_receipt, + expected_job_accepted_receipt, expected_job_finished_receipt, ): miner, _ = await Miner.objects.aget_or_create(hotkey="miner_client") @@ -163,7 +163,7 @@ async def track_job_status_updates(x): def condition(_): return True - if expected_job_started_receipt: - assert miner_client._query_sent_models(condition, V0JobStartedReceiptRequest) + if expected_job_accepted_receipt: + assert miner_client._query_sent_models(condition, V0JobAcceptedReceiptRequest) if expected_job_finished_receipt: assert miner_client._query_sent_models(condition, V0JobFinishedReceiptRequest) diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_receipts.py b/validator/app/src/compute_horde_validator/validator/tests/test_receipts.py index c1d6bb53d..9c0af975d 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_receipts.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_receipts.py @@ -1,14 +1,15 @@ import uuid +from datetime import UTC, datetime from typing import NamedTuple import pytest from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS -from compute_horde.mv_protocol.validator_requests import ( +from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import ( JobFinishedReceiptPayload, JobStartedReceiptPayload, + Receipt, ) -from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt -from compute_horde.receipts.schemas import Receipt from django.utils.timezone import now from compute_horde_validator.validator.models import ( @@ -46,10 +47,11 @@ def mocked_get_miner_receipts(hotkey: str, ip: str, port: int) -> list[Receipt]: job_uuid=str(uuid.uuid4()), miner_hotkey="5G9qWBzLPVVu2fCPPvg3QgPPK5JaJmJKaJha95TPHH9NZWuL", validator_hotkey="v1", + timestamp=datetime(2020, 1, 1, 0, 0, tzinfo=UTC), executor_class=DEFAULT_EXECUTOR_CLASS, - time_accepted=now(), max_timeout=30, is_organic=False, + ttl=5, ), validator_signature="0xv1", miner_signature="0xm1", @@ -62,6 +64,7 @@ def mocked_get_miner_receipts(hotkey: str, ip: str, port: int) -> list[Receipt]: job_uuid=str(uuid.uuid4()), miner_hotkey="5CPhGRp4cdEG4KSui7VQixHhvN5eBUSnMYeUF5thdxm4sKtz", validator_hotkey="v1", + timestamp=datetime(2020, 1, 1, 1, 0, tzinfo=UTC), time_started=now(), time_took_us=30_000_000, score_str="123.45",