diff --git a/compute_horde/compute_horde/base/output_upload.py b/compute_horde/compute_horde/base/output_upload.py
index 4f33a70f9..0dc436400 100644
--- a/compute_horde/compute_horde/base/output_upload.py
+++ b/compute_horde/compute_horde/base/output_upload.py
@@ -83,6 +83,10 @@ class MultiUpload(pydantic.BaseModel):
 
 
 OutputUpload = Annotated[
-    ZipAndHttpPostUpload | ZipAndHttpPutUpload | MultiUpload,
+    SingleFilePostUpload
+    | SingleFilePutUpload
+    | ZipAndHttpPostUpload
+    | ZipAndHttpPutUpload
+    | MultiUpload,
     Field(discriminator="output_upload_type"),
 ]
diff --git a/compute_horde/compute_horde/executor_class.py b/compute_horde/compute_horde/executor_class.py
index 2e7410587..ae2d83d20 100644
--- a/compute_horde/compute_horde/executor_class.py
+++ b/compute_horde/compute_horde/executor_class.py
@@ -6,6 +6,7 @@ class ExecutorClass(StrEnum):
     spin_up_4min__gpu_24gb = "spin_up-4min.gpu-24gb"
     always_on__gpu_24gb = "always_on.gpu-24gb"
+    always_on__llm__a6000 = "always_on.llm.a6000"
     # always_on__cpu_16c__ram_64gb = "always_on.cpu-16c.ram-64gb"
     # always_on__gpu_80gb = "always_on.gpu-80gb"
     # always_on__gpu_24gb__docker_cached_facilitator = "always_on.gpu-24gb.docker_cached-facilitator"
@@ -39,6 +40,12 @@ class ExecutorClassSpec:
         gpu_vram_gb=24,
         spin_up_time=0,
     ),
+    ExecutorClass.always_on__llm__a6000: ExecutorClassSpec(
+        description="always on, NVIDIA RTX A6000 GPU machine for solving LLM prompts",
+        has_gpu=True,
+        gpu_vram_gb=48,
+        spin_up_time=int(timedelta(minutes=1).total_seconds()),
+    ),
     # ExecutorClass.always_on__cpu_16c__ram_64gb: ExecutorClassSpec(
     #     cpu_cores=16,
     #     ram_gb=64,
diff --git a/executor/app/src/compute_horde_executor/executor/management/commands/run_executor.py b/executor/app/src/compute_horde_executor/executor/management/commands/run_executor.py
index 842d71368..fbae57fb3 100644
--- a/executor/app/src/compute_horde_executor/executor/management/commands/run_executor.py
+++ b/executor/app/src/compute_horde_executor/executor/management/commands/run_executor.py
@@ -482,26 +482,26 @@ async def run_job(self, job_request: V0JobRequest):
 
         success = exit_status == 0
 
-        # upload the output if requested
-        if job_request.output_upload:
-            try:
-                output_uploader = OutputUploader.for_upload_output(job_request.output_upload)
-                await output_uploader.upload(self.output_volume_mount_dir)
-            except OutputUploadFailed as ex:
-                logger.warning(
-                    f"Uploading output failed for job {self.initial_job_request.job_uuid} with error: {ex!r}"
-                )
-                success = False
-                stdout = ex.description
-                stderr = ""
-
-        time_took = time.time() - t1
-
         if success:
+            # upload the output if requested and the job succeeded
+            if job_request.output_upload:
+                try:
+                    output_uploader = OutputUploader.for_upload_output(job_request.output_upload)
+                    await output_uploader.upload(self.output_volume_mount_dir)
+                except OutputUploadFailed as ex:
+                    logger.warning(
+                        f"Uploading output failed for job {self.initial_job_request.job_uuid} with error: {ex!r}"
+                    )
+                    success = False
+                    stdout = ex.description
+                    stderr = ""
+
+            time_took = time.time() - t1
             logger.info(
                 f'Job "{self.initial_job_request.job_uuid}" finished successfully in {time_took:0.2f} seconds'
             )
         else:
+            time_took = time.time() - t1
             logger.error(
                 f'"{" ".join(cmd)}" (job_uuid={self.initial_job_request.job_uuid})'
                 f' failed after {time_took:0.2f} seconds with status={process.returncode}'
diff --git a/validator/app/src/compute_horde_validator/celery.py b/validator/app/src/compute_horde_validator/celery.py
index cff46a714..fb0aa1406 100644
--- a/validator/app/src/compute_horde_validator/celery.py
+++ b/validator/app/src/compute_horde_validator/celery.py
@@ -19,6 +19,10 @@ def route_task(name, args, kwargs, options, task=None, **kw):
         "compute_horde_validator.validator.tasks.fetch_receipts_from_miner",
         "compute_horde_validator.validator.tasks.send_events_to_facilitator",
         "compute_horde_validator.validator.tasks.fetch_dynamic_config",
+        # TODO: llm tasks should have dedicated workers, but just move them off the default queue for now
+        "compute_horde_validator.validator.tasks.llm_prompt_generation",
+        "compute_horde_validator.validator.tasks.llm_prompt_sampling",
+        "compute_horde_validator.validator.tasks.llm_prompt_answering",
     }
     if name in worker_queue_names:
         return {"queue": "worker"}
diff --git a/validator/app/src/compute_horde_validator/settings.py b/validator/app/src/compute_horde_validator/settings.py
index cbfcb110f..523b412cd 100644
--- a/validator/app/src/compute_horde_validator/settings.py
+++ b/validator/app/src/compute_horde_validator/settings.py
@@ -43,6 +43,8 @@ def wrapped(*args, **kwargs):
 
 ENV = env("ENV", default="prod")
 
+PROMPT_GENERATION_MODEL = env("PROMPT_GENERATION_MODEL", default="phi3")
+
 DEFAULT_ADMIN_PASSWORD = env("DEFAULT_ADMIN_PASSWORD", default=None)
 DEFAULT_ADMIN_USERNAME = env("DEFAULT_ADMIN_USERNAME", default="admin")
 DEFAULT_ADMIN_EMAIL = env("DEFAULT_ADMIN_EMAIL", default="admin@admin.com")
@@ -216,31 +218,61 @@ def wrapped(*args, **kwargs):
         "in seconds",
         int,
     ),
-    "DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_FROM_SERIES": (
-        10,
-        "how many prompts to sample and validate from a series",
+    # llama params
+    "DYNAMIC_MAX_PROMPT_SERIES": (
+        3500,
+        "Maximum number of prompt series; once reached, the prompt generator will not be triggered",
         int,
     ),
-    "DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE": (
-        100,
-        "how many workloads are needed before running local inference",
+    "DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY": (
+        1536,  # 256 * 2 * 3 - we allow 2 executors per miner and want a queue for 3 synthetic job batches
+        "how many prompt samples to keep ready (should be larger than the number of prompt series used per synthetic run)",
         int,
     ),
-    "DYNAMIC_MAX_PROMPT_BATCHES": (
-        10000,
-        "Maximum number of prompt batches upon which the prompt generator will not be triggered",
+    "DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD": (
+        240,
+        "how many prompts to answer in a single workload",
         int,
     ),
-    "DYNAMIC_PROMPTS_BATCHES_IN_A_SINGLE_GO": (
-        5,
+    # prompt generation params
+    "DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION": (
+        25,
         "Number of batches that prompt generator will process in a single go",
         int,
     ),
-    "DYNAMIC_NUMBER_OF_PROMPTS_IN_BATCH": (
+    "DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES": (
         240,
-        "Number of prompts to generate in a single batch",
+        "Number of prompts to generate in a single series",
+        int,
+    ),
+    # prompts answering params
+    "DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES": (
+        1,
+        "how many prompts to sample and answer from a series",
         int,
     ),
+    "DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS": (
+        "always_on.llm.a6000=2",
+        (
+            "The maximum number of executors per executor class that miners are allowed to have. "
+            "Executor classes not mentioned here have no limits. "
+            "The format should be: 'key1=value1,key2=value2', "
+            "where the keys are executor class enum values, and the values are integers. "
+            "Setting 0 will disable an executor class."
+        ),
+        str,
+    ),
+    "DYNAMIC_EXECUTOR_CLASS_WEIGHTS": (
+        "spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1",
+        (
+            "Weights of executor classes that are used to normalize miners' scores. "
+            "Executor classes not mentioned here are not taken into account when scoring. "
+            "The format should be: 'key1=value1,key2=value2', "
+            "where the keys are executor class enum values, and the values are floats; "
+            "int values that sum up to 100 are encouraged"
+        ),
+        str,
+    ),
 }
 
 DYNAMIC_CONFIG_CACHE_TIMEOUT = 300
@@ -407,6 +439,21 @@ def wrapped(*args, **kwargs):
         "schedule": timedelta(minutes=5),
         "options": {},
     },
+    "llm_prompt_generation": {
+        "task": "compute_horde_validator.validator.tasks.llm_prompt_generation",
+        "schedule": timedelta(minutes=5),
+        "options": {},
+    },
+    "llm_prompt_sampling": {
+        "task": "compute_horde_validator.validator.tasks.llm_prompt_sampling",
+        "schedule": timedelta(minutes=30),
+        "options": {},
+    },
+    "llm_prompt_answering": {
+        "task": "compute_horde_validator.validator.tasks.llm_prompt_answering",
+        "schedule": timedelta(minutes=5),
+        "options": {},
+    },
 }
 if env.bool("DEBUG_RUN_BEAT_VERY_OFTEN", default=False):
     CELERY_BEAT_SCHEDULE["run_synthetic_jobs"]["schedule"] = crontab(minute="*")
@@ -508,17 +555,6 @@ def wrapped(*args, **kwargs):
 
 DYNAMIC_CONFIG_ENV = env.str("DYNAMIC_CONFIG_ENV", default="prod")
 
-# prompt gen sampling
-DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES = env.int(
-    "DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES", default=None
-)
-DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_FROM_SERIES = env.int(
-    "DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_IN_BATCH", default=None
-)
-DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE = env.int(
-    "DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE", default=None
-)
-
 # synthetic jobs are evenly distributed through the cycle, however
 # we start them from some offset because scheduling takes some time
 SYNTHETIC_JOBS_RUN_OFFSET = env.int("SYNTHETIC_JOBS_RUN_OFFSET", default=24)
@@ -542,9 +578,9 @@ def BITTENSOR_WALLET() -> bittensor.wallet:
 
 
 # Local miner generating prompts
-GENERATION_MINER_KEY = env.str("GENERATION_MINER_KEY", default="")
-GENERATION_MINER_ADDRESS = env.str("GENERATION_MINER_ADDRESS", default="")
-GENERATION_MINER_PORT = env.int("GENERATION_MINER_PORT", default=0)
+TRUSTED_MINER_KEY = env.str("TRUSTED_MINER_KEY", default="")
+TRUSTED_MINER_ADDRESS = env.str("TRUSTED_MINER_ADDRESS", default="")
+TRUSTED_MINER_PORT = env.int("TRUSTED_MINER_PORT", default=0)
 
 
 CHANNEL_LAYERS = {
diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py
index ff5421e58..16e9c58f9 100644
--- a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py
+++ b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py
@@ -2,6 +2,7 @@
 import uuid
 
 from compute_horde.base.volume import Volume
+from compute_horde.executor_class import ExecutorClass
 from compute_horde.miner_client.organic import OrganicJobDetails
 
 
@@ -28,8 +29,11 @@ def generator_version(self) -> int: ...
     @abc.abstractmethod
     def docker_image_name(self) -> str: ...
 
+    @abc.abstractmethod
+    def executor_class(self) -> ExecutorClass: ...
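+    # implementations return the executor class their job should run on,
+    # e.g. the v0 prompt generator below returns ExecutorClass.always_on__llm__a6000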
+
     def docker_run_options_preset(self) -> str:
-        return "none"
+        return "nvidia_all"
 
     def docker_run_cmd(self) -> list[str]:
         return []
@@ -49,6 +53,7 @@ def output(self) -> str | None:
     def get_job_details(self) -> OrganicJobDetails:
         return OrganicJobDetails(
             job_uuid=str(self._uuid),
+            executor_class=self.executor_class(),
             docker_image=self.docker_image_name(),
             raw_script=self.raw_script(),
             docker_run_options_preset=self.docker_run_options_preset(),
diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py
index f901e2005..af1bb907d 100644
--- a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py
+++ b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py
@@ -1,4 +1,6 @@
 from compute_horde.base.output_upload import MultiUpload, SingleFilePutUpload
+from compute_horde.executor_class import ExecutorClass
+from django.conf import settings
 
 from ..base import BasePromptJobGenerator
 
@@ -8,15 +10,22 @@ def generator_version(self) -> int:
         return 0
 
     def timeout_seconds(self) -> int:
-        return 3600
+        return 5 * 60
 
     def docker_image_name(self) -> str:
-        return "backenddevelopersltd/compute-horde-prompt-gen:v0-latest"
+        return f"backenddevelopersltd/compute-horde-prompt-gen-{settings.PROMPT_GENERATION_MODEL}:v0-latest"
+
+    def executor_class(self) -> ExecutorClass:
+        return ExecutorClass.always_on__llm__a6000
 
     def docker_run_cmd(self) -> list[str]:
         return [
+            "--quantize",
             "--model_name",
-            "phi3",
+            settings.PROMPT_GENERATION_MODEL,
+            "--batch_size=250",  # on an A6000 we want 240 prompts generated in a single file, but not all results are valid
+            "--num_return_sequences=1",
+            "--max_new_tokens=40",  # 40 new tokens is enough for a reasonable-length prompt - 30 caused too many cut-off prompts
             "--number_of_prompts_per_batch",
             str(self.num_prompts_per_batch),
             "--uuids",
diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_answering.py b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_answering.py
new file mode 100644
index 000000000..feb8f9d4b
--- /dev/null
+++ b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_answering.py
@@ -0,0 +1,118 @@
+import logging
+import uuid
+from datetime import datetime
+
+import bittensor
+from asgiref.sync import sync_to_async
+from compute_horde.executor_class import ExecutorClass
+from compute_horde.miner_client.organic import (
+    OrganicJobDetails,
+    OrganicMinerClient,
+    run_organic_job,
+)
+from django.conf import settings
+from django.db import transaction
+from django.utils.timezone import now
+
+from compute_horde_validator.validator.models import Prompt, SolveWorkload
+from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import (
+    LlmPromptsJobGenerator,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def _get_keypair() -> bittensor.Keypair:
+    return settings.BITTENSOR_WALLET().get_hotkey()
+
+
+async def answer_prompts(
+    workload: SolveWorkload,
+    create_miner_client=OrganicMinerClient,
+    job_uuid: uuid.UUID | None = None,
+    wait_timeout: int | None = None,
+) -> None:
+    if not all(
+        [
+            settings.TRUSTED_MINER_KEY,
+            settings.TRUSTED_MINER_ADDRESS,
+            settings.TRUSTED_MINER_PORT,
+        ]
+    ):
+        logger.warning("Trusted miner not configured, skipping prompt answering")
+        return
+
+    ts = datetime.now()
+    seed = workload.seed
+
+    job_generator = LlmPromptsJobGenerator(workload.s3_url, seed)
+    await job_generator.ainit()
+
+    # TODO: Should be generated for all the llm executor classes.
+    # SolveWorkload/PromptSample should have an executor_class field saying which
+    # executor_class this sample is for.
+    job_uuid = job_uuid or uuid.uuid4()
+    job_details = OrganicJobDetails(
+        job_uuid=str(job_uuid),
+        executor_class=ExecutorClass.always_on__llm__a6000,
+        docker_image=job_generator.docker_image_name(),
+        raw_script=job_generator.raw_script(),
+        docker_run_options_preset=job_generator.docker_run_options_preset(),
+        docker_run_cmd=job_generator.docker_run_cmd(),
+        total_job_timeout=job_generator.timeout_seconds(),
+        volume=await job_generator.volume(),
+        output=await job_generator.output_upload(),
+    )
+
+    wait_timeout = wait_timeout or job_generator.timeout_seconds()
+
+    miner_client = create_miner_client(
+        miner_hotkey=settings.TRUSTED_MINER_KEY,
+        miner_address=settings.TRUSTED_MINER_ADDRESS,
+        miner_port=settings.TRUSTED_MINER_PORT,
+        job_uuid=str(job_uuid),
+        my_keypair=_get_keypair(),
+    )
+
+    try:
+        await run_organic_job(miner_client, job_details, wait_timeout=wait_timeout)
+    except Exception:
+        logger.error("Failed to run organic job", exc_info=True)
+        return
+
+    try:
+        await job_generator._download_answers()
+        prompt_answers: dict[str, str] = job_generator.prompt_answers
+    except Exception:
+        logger.error("Failed to download prompt answers", exc_info=True)
+        return
+
+    await sync_to_async(save_workload_answers)(workload, prompt_answers)
+    duration_seconds = (datetime.now() - ts).total_seconds()
+    logger.info(f"Workload {workload} finished in {duration_seconds} seconds")
+
+
+def get_workload_prompts(workload: SolveWorkload) -> list[Prompt]:
+    return [
+        x
+        for x in Prompt.objects.select_related("sample").filter(
+            sample__workload_id=workload.id, answer__isnull=True
+        )
+    ]
+
+
+def save_workload_answers(workload, prompt_answers):
+    prompts = get_workload_prompts(workload)
+
+    with transaction.atomic():
+        # update the workload as finished
+        workload.finished_at = now()
+        workload.save()
+
+        # update the prompts with the answers
+        for prompt in prompts:
+            if prompt.content in prompt_answers:
+                prompt.answer = prompt_answers[prompt.content]
+            else:
+                logger.error(f"Prompt {prompt} was not found in the generated prompt answers")
+        Prompt.objects.bulk_update(prompts, ["answer"])
diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py
index 38b2b214d..29c51cdae 100644
--- a/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py
+++ b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py
@@ -26,27 +26,18 @@ async def generate_prompts(
 ) -> None:
     if not all(
         [
-            settings.GENERATION_MINER_KEY,
-            settings.GENERATION_MINER_ADDRESS,
-            settings.GENERATION_MINER_PORT,
+            settings.TRUSTED_MINER_KEY,
+            settings.TRUSTED_MINER_ADDRESS,
+            settings.TRUSTED_MINER_PORT,
         ]
     ):
-        logger.warning("Prompt generation miner not configured, skipping prompt generation")
-        return
-
-    limit = await aget_config("DYNAMIC_MAX_PROMPT_BATCHES")
-    if current_count := await PromptSeries.objects.acount() >= limit:
-        logger.warning(
-            "There are %s series in the db exceeding the limit of %s, skipping prompt generation",
-            current_count,
-            limit,
-        )
+        logger.warning("Trusted miner not configured, skipping prompt generation")
         return
 
     job_uuid = job_uuid or uuid.uuid4()
-    num_batches = await aget_config("DYNAMIC_PROMPTS_BATCHES_IN_A_SINGLE_GO")
-    num_prompts_per_batch = await aget_config("DYNAMIC_NUMBER_OF_PROMPTS_IN_BATCH")
+    num_batches = await aget_config("DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION")
+    num_prompts_per_batch = await aget_config("DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES")
 
     series_uuids, upload_urls, public_urls = _generate_uuids_and_urls(num_batches)
 
@@ -62,14 +53,18 @@ async def generate_prompts(
     wait_timeout = wait_timeout or job_generator.timeout_seconds()
 
     miner_client = create_miner_client(
-        miner_hotkey=settings.GENERATION_MINER_KEY,
-        miner_address=settings.GENERATION_MINER_ADDRESS,
-        miner_port=settings.GENERATION_MINER_PORT,
+        miner_hotkey=settings.TRUSTED_MINER_KEY,
+        miner_address=settings.TRUSTED_MINER_ADDRESS,
+        miner_port=settings.TRUSTED_MINER_PORT,
         job_uuid=str(job_uuid),
         my_keypair=_get_keypair(),
     )
 
-    await run_organic_job(miner_client, job_details, wait_timeout=wait_timeout)
+    try:
+        await run_organic_job(miner_client, job_details, wait_timeout=wait_timeout)
+    except Exception:
+        logger.error("Failed to run organic job", exc_info=True)
+        return
 
     await _persist_series_list(series_uuids, public_urls, job_generator.generator_version())
diff --git a/validator/app/src/compute_horde_validator/validator/dynamic_config.py b/validator/app/src/compute_horde_validator/validator/dynamic_config.py
index 07170935f..110947a18 100644
--- a/validator/app/src/compute_horde_validator/validator/dynamic_config.py
+++ b/validator/app/src/compute_horde_validator/validator/dynamic_config.py
@@ -1,8 +1,12 @@
 import asyncio
 import time
+from collections.abc import Callable
+from contextlib import suppress
+from typing import Any
 
 import constance.utils
 from asgiref.sync import sync_to_async
+from compute_horde.executor_class import ExecutorClass
 from constance import config
 from django.conf import settings
 
@@ -31,24 +35,6 @@ async def aget_config(key):
     return await dynamic_config_holder.get(key)
 
 
-def get_number_of_prompts_in_series():
-    if settings.DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES is not None:
-        return settings.DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES
-    return config.DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES
-
-
-def get_number_of_prompts_to_validate_from_series():
-    if settings.DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_FROM_SERIES is not None:
-        return settings.DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_FROM_SERIES
-    return config.DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_FROM_SERIES
-
-
-def get_number_of_workloads_to_trigger_local_inference():
-    if settings.DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE is not None:
-        return settings.DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE
-    return config.DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE
-
-
 async def aget_weights_version():
     if settings.DEBUG_OVERRIDE_WEIGHTS_VERSION is not None:
         return settings.DEBUG_OVERRIDE_WEIGHTS_VERSION
@@ -60,3 +46,39 @@ def get_synthetic_jobs_flow_version():
     if settings.DEBUG_OVERRIDE_SYNTHETIC_JOBS_FLOW_VERSION is not None:
         return settings.DEBUG_OVERRIDE_SYNTHETIC_JOBS_FLOW_VERSION
     return config.DYNAMIC_SYNTHETIC_JOBS_FLOW_VERSION
+
+
+def executor_class_value_map_parser(
+    value_map_str: str, value_parser: Callable[[str], Any] | None = None
+) -> dict[ExecutorClass, Any]:
+    result = {}
+    for pair in value_map_str.split(","):
+        # ignore errors for misconfiguration, i.e. non-existent executor classes,
+        # non-integer/negative counts etc.
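+        # e.g. "spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1" with value_parser=float
+        # parses to {ExecutorClass.spin_up_4min__gpu_24gb: 99.0, ExecutorClass.always_on__llm__a6000: 1.0};
+        # a malformed pair is skipped silently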
+        with suppress(ValueError):
+            executor_class_str, value_str = pair.split("=")
+            executor_class = ExecutorClass(executor_class_str)
+            if value_parser is not None:
+                parsed_value = value_parser(value_str)
+            else:
+                parsed_value = value_str
+            result[executor_class] = parsed_value
+    return result
+
+
+async def get_miner_max_executors_per_class() -> dict[ExecutorClass, int]:
+    miner_max_executors_per_class: str = await aget_config("DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS")
+    result = {
+        executor_class: count
+        for executor_class, count in executor_class_value_map_parser(
+            miner_max_executors_per_class, value_parser=int
+        ).items()
+        if count >= 0
+    }
+    return result
+
+
+def get_executor_class_weights() -> dict[ExecutorClass, float]:
+    return executor_class_value_map_parser(
+        config.DYNAMIC_EXECUTOR_CLASS_WEIGHTS, value_parser=float
+    )
diff --git a/validator/app/src/compute_horde_validator/validator/locks.py b/validator/app/src/compute_horde_validator/validator/locks.py
index 6b2519644..36216223a 100644
--- a/validator/app/src/compute_horde_validator/validator/locks.py
+++ b/validator/app/src/compute_horde_validator/validator/locks.py
@@ -4,6 +4,7 @@
 class LockType:
     WEIGHT_SETTING = 1
     VALIDATION_SCHEDULING = 2
+    TRUSTED_MINER_LOCK = 3
 
 
 class Locked(Exception):
diff --git a/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_llm_prompt_task.py b/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_llm_prompt_task.py
new file mode 100644
index 000000000..c733c81c5
--- /dev/null
+++ b/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_llm_prompt_task.py
@@ -0,0 +1,32 @@
+import logging
+import sys
+
+from django.core.management.base import BaseCommand
+
+from compute_horde_validator.validator.tasks import (
+    llm_prompt_answering,
+    llm_prompt_generation,
+    llm_prompt_sampling,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--action", type=str, help="generation | sampling | answering", required=True
+        )
+
+    def handle(self, *args, **options):
+        action = options["action"]
+        logger.info(f"Running LLM prompt task with action: {action}")
+        if action == "generation":
+            llm_prompt_generation()
+        elif action == "sampling":
+            llm_prompt_sampling()
+        elif action == "answering":
+            llm_prompt_answering()
+        else:
+            logger.warning("Invalid action")
+            sys.exit(1)
diff --git a/validator/app/src/compute_horde_validator/validator/models.py b/validator/app/src/compute_horde_validator/validator/models.py
index 33544cbee..f720fa6d2 100644
--- a/validator/app/src/compute_horde_validator/validator/models.py
+++ b/validator/app/src/compute_horde_validator/validator/models.py
@@ -325,7 +325,7 @@ class SolveWorkload(models.Model):
     finished_at = models.DateTimeField(null=True, default=None, db_index=True)
 
     def __str__(self):
-        return f"uuid: {self.batch_uuid} - synthetic_job_batch: {self.synthetic_job_batch} - seed: {self.seed}"
+        return f"uuid: {self.workload_uuid} - seed: {self.seed}"
 
 
 class PromptSample(models.Model):
diff --git a/validator/app/src/compute_horde_validator/validator/s3.py b/validator/app/src/compute_horde_validator/validator/s3.py
index 0bbbd3858..527c8c5b5 100644
--- a/validator/app/src/compute_horde_validator/validator/s3.py
+++ b/validator/app/src/compute_horde_validator/validator/s3.py
@@ -1,8 +1,8 @@
 import functools
 import logging
-from collections.abc import Generator
 
 import boto3
+import httpx
 import requests
 from django.conf import settings
@@ -45,9 +45,25 @@ def get_public_url(key: str, *, bucket_name: str, prefix: str = "") -> str:
     return f"{endpoint_url}/{bucket_name}/{prefix}{key}"
 
 
-def get_prompts_from_s3_url(s3_url: str) -> Generator[tuple[str, list[str]]]:
+# TODO: retries etc
+def upload_prompts_to_s3_url(s3_url: str, content: str) -> bool:
+    response = requests.put(s3_url, data=content)
+    if response.status_code != 200:
+        logger.warning(f"Failed to upload prompts to {s3_url}")
+        return False
+    return True
+
+
+def download_prompts_from_s3_url(s3_url: str) -> list[str]:
     response = requests.get(s3_url)
     if response.status_code != 200:
         logger.warning(f"Failed to download prompts from {s3_url}")
         return []
-    return response.text.split("\n")
+    return response.text.splitlines()
+
+
+async def download_file_content(s3_url: str) -> bytes:
+    async with httpx.AsyncClient() as client:
+        response = await client.get(s3_url, timeout=5)
+        response.raise_for_status()
+        return response.content
diff --git a/validator/app/src/compute_horde_validator/validator/scoring.py b/validator/app/src/compute_horde_validator/validator/scoring.py
index 5c3b6df56..627f2280b 100644
--- a/validator/app/src/compute_horde_validator/validator/scoring.py
+++ b/validator/app/src/compute_horde_validator/validator/scoring.py
@@ -6,12 +6,9 @@
 from compute_horde.executor_class import ExecutorClass
 from django.conf import settings
 
-logger = logging.getLogger(__name__)
-
+from .dynamic_config import get_executor_class_weights
 
-EXECUTOR_CLASS_WEIGHTS = {
-    ExecutorClass.spin_up_4min__gpu_24gb: 100,
-}
+logger = logging.getLogger(__name__)
 
 
 def normalize(scores, weight=1):
@@ -64,9 +61,10 @@ def score_jobs(jobs, score_aggregation=sum, normalization_weight=1):
 
 
 def score_batch(batch):
+    executor_class_weights = get_executor_class_weights()
     executor_class_jobs = defaultdict(list)
     for job in batch.synthetic_jobs.all():
-        if job.executor_class in EXECUTOR_CLASS_WEIGHTS:
+        if job.executor_class in executor_class_weights:
            executor_class_jobs[job.executor_class].append(job)
 
     parametriezed_horde_score = partial(
@@ -80,7 +78,7 @@ def score_batch(batch):
     )
     batch_scores = defaultdict(float)
     for executor_class, jobs in executor_class_jobs.items():
-        executor_class_weight = EXECUTOR_CLASS_WEIGHTS[executor_class]
+        executor_class_weight = executor_class_weights[executor_class]
         if executor_class == ExecutorClass.spin_up_4min__gpu_24gb:
             score_aggregation = parametriezed_horde_score
         else:
diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py
index 9eecba3a0..be87fad6e 100644
--- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py
+++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py
@@ -13,7 +13,8 @@
 import bittensor
 from asgiref.sync import sync_to_async
 from channels.layers import get_channel_layer
-from compute_horde.base.volume import InlineVolume
+from compute_horde.base.output_upload import OutputUpload
+from compute_horde.base.volume import Volume
 from compute_horde.base_requests import BaseRequest
 from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, EXECUTOR_CLASS, ExecutorClass
 from compute_horde.miner_client.base import (
@@ -52,11 +53,13 @@
 from django.db import transaction
 from pydantic import BaseModel
 
+from compute_horde_validator.validator.dynamic_config import get_miner_max_executors_per_class
 from compute_horde_validator.validator.models import (
     JobFinishedReceipt,
     JobStartedReceipt,
     Miner,
     MinerManifest,
+    PromptSample,
     SyntheticJob,
     SyntheticJobBatch,
     SystemEvent,
@@ -231,7 +234,8 @@ class Job:
     miner_hotkey: str
     executor_class: ExecutorClass
     job_generator: BaseSyntheticJobGenerator
-    volume_contents: str
+    volume: Volume | None
+    output_upload: OutputUpload | None
 
     # responses
@@ -348,7 +352,6 @@ def emit_telemetry_event(self) -> SystemEvent | None:
             docker_image_name=self.job_generator.docker_image_name(),
             docker_run_options_preset=self.job_generator.docker_run_options_preset(),
             timeout_seconds=self.job_generator.timeout_seconds(),
-            volume_contents_size=len(self.volume_contents),
             exception=repr(self.exception) if self.exception is not None else None,
             exception_time=_datetime_dump(self.exception_time),
             exception_stage=self.exception_stage,
@@ -649,7 +652,7 @@ def _init_context(
 
 
 def _get_max_spin_up_time(ctx: BatchContext) -> int:
-    max_spin_up_time = 0
+    max_spin_up_time = _MIN_SPIN_UP_TIME
     for executors in ctx.executors.values():
         for executor_class in executors.keys():
             spin_up_time = EXECUTOR_CLASS[executor_class].spin_up_time
@@ -761,16 +764,68 @@ async def _close_client(ctx: BatchContext, miner_hotkey: str) -> None:
         await client.close()
 
 
+async def get_llm_prompt_samples(ctx: BatchContext) -> list[PromptSample] | None:
+    # TODO: refactor into a nicer abstraction
+    llm_executor_count = sum(
+        count
+        for executors in ctx.executors.values()
+        for executor_class, count in executors.items()
+        if executor_class == ExecutorClass.always_on__llm__a6000
+    )
+    prompt_samples = (
+        PromptSample.objects.select_related("series", "workload")
+        .prefetch_related("prompts")
+        .filter(
+            synthetic_job__isnull=True,
+            workload__finished_at__isnull=False,
+        )[:llm_executor_count]
+    )
+    prompt_samples = [ps async for ps in prompt_samples]
+    if len(prompt_samples) < llm_executor_count:
+        logger.warning(
+            "Not enough prompt samples for llm executors: %d < %d - will NOT run llm synthetic prompt jobs",
+            len(prompt_samples),
+            llm_executor_count,
+        )
+        return None
+    return prompt_samples
+
+
 async def _generate_jobs(ctx: BatchContext) -> None:
     start_time = time.time()
     generated_job_count = 0
 
+    prompt_samples = await get_llm_prompt_samples(ctx)
+    prompt_samples_iter = iter(prompt_samples) if prompt_samples is not None else None
+
     for hotkey, executors in ctx.executors.items():
         miner_name = ctx.names[hotkey]
         for executor_class, count in executors.items():
             job_generators = []
             for _ in range(count):
-                job_generator = await current.synthetic_job_generator_factory.create(executor_class)
+                kwargs = {}
+                if executor_class == ExecutorClass.always_on__llm__a6000:
+                    if prompt_samples_iter is None:
+                        logger.warning("No llm prompt samples available, skipping llm job")
+                        continue
+                    prompt_sample = next(prompt_samples_iter, None)
+                    if prompt_sample is None:
+                        # this means there is some bug - we want to see it in sentry
+                        # and continue, so other executor classes are not affected
+                        logger.error(
+                            "Exhausted prompt_samples_iter, this should not happen, skipping llm job"
+                        )
+                        continue
+                    kwargs = {
+                        "prompt_sample": prompt_sample,
+                        "expected_prompts": list(prompt_sample.prompts.all()),
+                        "s3_url": prompt_sample.series.s3_url,
+                        "seed": prompt_sample.workload.seed,
+                    }
+
+                job_generator = await current.synthetic_job_generator_factory.create(
+                    executor_class, **kwargs
+                )
                 await job_generator.ainit()
                 job_uuid = str(job_generator.uuid())
                 ctx.jobs[job_uuid] = Job(
@@ -780,7 +835,8 @@ async def _generate_jobs(ctx: BatchContext) -> None:
                     miner_hotkey=hotkey,
                     executor_class=executor_class,
                     job_generator=job_generator,
-                    volume_contents=await job_generator.volume_contents(),
+                    volume=await job_generator.volume(),
+                    output_upload=await job_generator.output_upload(),
                 )
                 ctx.job_uuids.append(job_uuid)
                 job_generators.append(job_generator)
@@ -865,8 +921,8 @@ async def _send_job_request(
         docker_run_options_preset=job.job_generator.docker_run_options_preset(),
         docker_run_cmd=job.job_generator.docker_run_cmd(),
         raw_script=job.job_generator.raw_script(),
-        volume=InlineVolume(contents=job.volume_contents),
-        output_upload=None,
+        volume=job.volume,
+        output_upload=job.output_upload,
     )
 
     request_json = request.model_dump_json()
@@ -1009,6 +1065,24 @@ async def _multi_get_miner_manifest(ctx: BatchContext) -> None:
             assert result is None
 
 
+async def _adjust_miner_max_executors_per_class(ctx: BatchContext) -> None:
+    max_executors_per_class = await get_miner_max_executors_per_class()
+    for hotkey, executors in ctx.executors.items():
+        for executor_class, count in executors.items():
+            if executor_class not in max_executors_per_class:
+                continue
+            if count > max_executors_per_class[executor_class]:
+                logger.warning(
+                    "%s manifest for executor class %s declares more executors (%s) than the max limit (%s), capping at the limit",
+                    ctx.names[hotkey],
+                    executor_class,
+                    count,
+                    max_executors_per_class[executor_class],
+                )
+                ctx.executors[hotkey][executor_class] = max_executors_per_class[executor_class]
+                # TODO: add a system event?
+
+
 async def _multi_close_client(ctx: BatchContext) -> None:
     tasks = [
         asyncio.create_task(
@@ -1221,7 +1295,28 @@ async def _score_job(ctx: BatchContext, job: Job) -> None:
     )
 
 
+async def _download_llm_prompts_answers(ctx: BatchContext) -> None:
+    tasks = [
+        asyncio.create_task(job.job_generator._download_answers())
+        for job in ctx.jobs.values()
+        if job.executor_class == ExecutorClass.always_on__llm__a6000
+        and job.job_response is not None
+        and isinstance(job.job_response, V0JobFinishedRequest)
+    ]
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+    for i, result in enumerate(results):
+        if isinstance(result, BaseException):
+            hotkey = ctx.hotkeys[i]
+            name = ctx.names[hotkey]
+            logger.warning("%s failed to get llm prompt answers: %r", name, result)
+        else:
+            assert result is None
+
+
 async def _score_jobs(ctx: BatchContext) -> None:
+    # NOTE: download the answers for llm prompts jobs before scoring
+    await _download_llm_prompts_answers(ctx)
+
     for job in ctx.jobs.values():
         try:
             await _score_job(ctx, job)
@@ -1339,7 +1434,7 @@ def _db_persist(ctx: BatchContext) -> None:
             score=job.score,
         )
         synthetic_jobs.append(synthetic_job)
-    SyntheticJob.objects.bulk_create(synthetic_jobs)
+    synthetic_jobs = SyntheticJob.objects.bulk_create(synthetic_jobs)
 
     miner_manifests: list[MinerManifest] = []
     for miner in ctx.miners.values():
@@ -1355,6 +1450,20 @@ def _db_persist(ctx: BatchContext) -> None:
         )
     MinerManifest.objects.bulk_create(miner_manifests)
 
+    # TODO: refactor into a nicer abstraction
+    synthetic_jobs_map: dict[str, SyntheticJob] = {
+        synthetic_job.job_uuid: synthetic_job for synthetic_job in synthetic_jobs
+    }
+    prompt_samples: list[PromptSample] = []
+
+    for job in ctx.jobs.values():
+        if job.executor_class != ExecutorClass.always_on__llm__a6000:
+            continue
+        prompt_sample = job.job_generator.prompt_sample
+        prompt_sample.synthetic_job = synthetic_jobs_map.get(job.uuid)
+        prompt_samples.append(prompt_sample)
+    PromptSample.objects.bulk_update(prompt_samples, fields=["synthetic_job"])
+
     job_started_receipts: list[JobStartedReceipt] = []
     for job in ctx.jobs.values():
         if job.job_started_receipt is not None:
@@ -1416,6 +1525,7 @@ async def execute_synthetic_batch_run(
 
     await ctx.checkpoint_system_event("_multi_get_miner_manifest")
     await _multi_get_miner_manifest(ctx)
+    await _adjust_miner_max_executors_per_class(ctx)
 
     await ctx.checkpoint_system_event("_get_total_executor_count")
     total_executor_count = _get_total_executor_count(ctx)
diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py
index b67c3c1e1..6996b5454 100644
--- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py
+++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/base.py
@@ -1,12 +1,14 @@
 import abc
 import uuid
 
+from compute_horde.base.output_upload import OutputUpload
+from compute_horde.base.volume import Volume
 from compute_horde.executor_class import ExecutorClass
 from compute_horde.mv_protocol.miner_requests import V0JobFinishedRequest
 
 
 class BaseSyntheticJobGenerator(abc.ABC):
-    def __init__(self):
+    def __init__(self, **kwargs):
         self._uuid = uuid.uuid4()
 
     def __repr__(self):
@@ -37,7 +39,10 @@ def raw_script(self) -> str | None:
         return None
 
     @abc.abstractmethod
-    async def volume_contents(self) -> str: ...
+    async def volume(self) -> Volume | None: ...
+
+    async def output_upload(self) -> OutputUpload | None:
+        return None
 
     @abc.abstractmethod
     def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]: ...
@@ -48,4 +53,6 @@ def job_description(self) -> str: ...
 
 class BaseSyntheticJobGeneratorFactory(abc.ABC):
     @abc.abstractmethod
-    async def create(self, executor_class: ExecutorClass) -> BaseSyntheticJobGenerator: ...
+    async def create(
+        self, executor_class: ExecutorClass, **kwargs
+    ) -> BaseSyntheticJobGenerator: ...
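Note: with volume()/output_upload() replacing volume_contents(), a minimal concrete generator can be sketched as follows (illustrative only - EchoJobGenerator, its docker image, and the payload are hypothetical and not part of this patch; InlineVolume and single_file_zip are used the same way in gpu_hashcat.py below):

    class EchoJobGenerator(BaseSyntheticJobGenerator):
        def timeout_seconds(self) -> int:
            return 60

        def docker_image_name(self) -> str:
            return "example/echo-job:latest"  # hypothetical image

        async def volume(self) -> Volume | None:
            # single-file zip payload, mirroring GPUHashcatSyntheticJobGenerator
            return InlineVolume(contents=single_file_zip("payload.txt", "hello"))

        # output_upload() is inherited from the base class and returns None,
        # so no upload step is requested for this job

        def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]:
            return True, "", 1.0

        def job_description(self) -> str:
            return "echo job"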
diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py
index ac01f0fef..deb868629 100644
--- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py
+++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py
@@ -7,8 +7,13 @@
 from compute_horde_validator.validator.synthetic_jobs.generator.gpu_hashcat import (
     GPUHashcatSyntheticJobGenerator,
 )
+from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import (
+    LlmPromptsSyntheticJobGenerator,
+)
 
 
 class DefaultSyntheticJobGeneratorFactory(BaseSyntheticJobGeneratorFactory):
-    async def create(self, executor_class: ExecutorClass) -> BaseSyntheticJobGenerator:
-        return GPUHashcatSyntheticJobGenerator()
+    async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSyntheticJobGenerator:
+        if executor_class == ExecutorClass.always_on__llm__a6000:
+            return LlmPromptsSyntheticJobGenerator(**kwargs)
+        return GPUHashcatSyntheticJobGenerator(**kwargs)
diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py
index 54ef75586..a7ce318bd 100644
--- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py
+++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/gpu_hashcat.py
@@ -1,4 +1,5 @@
 from asgiref.sync import sync_to_async
+from compute_horde.base.volume import InlineVolume, Volume
 from compute_horde.mv_protocol.miner_requests import V0JobFinishedRequest
 
 from compute_horde_validator.validator.dynamic_config import aget_weights_version
@@ -17,8 +18,8 @@
 
 
 class GPUHashcatSyntheticJobGenerator(BaseSyntheticJobGenerator):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
         # set synthetic_jobs based on subnet weights_version
         self.weights_version = None
         self.hash_job = None
@@ -73,8 +74,8 @@ def raw_script(self) -> str | None:
         return self.hash_job.raw_script()
 
     @sync_to_async(thread_sensitive=False)
-    def volume_contents(self) -> str:
-        return single_file_zip("payload.txt", self.hash_job.payload)
+    def volume(self) -> Volume | None:
+        return InlineVolume(contents=single_file_zip("payload.txt", self.hash_job.payload))
 
     def score(self, time_took: float) -> float:
         if self.weights_version == 0:
diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/llm_prompts.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/llm_prompts.py
new file mode 100644
index 000000000..0a29c382c
--- /dev/null
+++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/llm_prompts.py
@@ -0,0 +1,134 @@
+import uuid
+
+import pydantic
+from compute_horde.base.output_upload import MultiUpload, OutputUpload, SingleFilePutUpload
+from compute_horde.base.volume import MultiVolume, SingleFileVolume, Volume
+from compute_horde.mv_protocol.miner_requests import V0JobFinishedRequest
+from django.conf import settings
+
+from compute_horde_validator.validator.models import Prompt, PromptSample
+from compute_horde_validator.validator.s3 import (
+    download_file_content,
+    generate_upload_url,
+    get_public_url,
+)
+
+from .base import BaseSyntheticJobGenerator
+
+
+class LlmPromptsJobGenerator(BaseSyntheticJobGenerator):
+    def __init__(
+        self,
+        s3_url: str,
+        seed: int,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.seed = seed
+        self.s3_url = s3_url
+        file_uuid = str(uuid.uuid4())
+        self.input_filename = file_uuid + ".txt"
+        self.s3_output_key = file_uuid + ".json"
+        self.s3_output_prefix = "solved/"
+        self.s3_output_bucket = settings.S3_BUCKET_NAME_ANSWERS
+
+        self.prompt_answers: dict[str, str] = {}
+
+    def _url_for_upload(self) -> str:
+        return generate_upload_url(
+            self.s3_output_key,
+            bucket_name=self.s3_output_bucket,
+            prefix=self.s3_output_prefix,
+        )
+
+    def _url_for_download(self) -> str:
+        return get_public_url(
+            key=self.s3_output_key,
+            bucket_name=self.s3_output_bucket,
+            prefix=self.s3_output_prefix,
+        )
+
+    def timeout_seconds(self) -> int:
+        return 48  # it takes around 42s - we add a ~15% buffer
+
+    def base_docker_image_name(self) -> str:
+        return "docker.io/backenddevelopersltd/compute-horde-prompt-solver:v0-latest"
+
+    def docker_image_name(self) -> str:
+        return "docker.io/backenddevelopersltd/compute-horde-prompt-solver:v0-latest"
+
+    def docker_run_options_preset(self) -> str:
+        return "nvidia_all"
+
+    def docker_run_cmd(self) -> list[str]:
+        return [
+            "--temperature=0.5",
+            "--top-p=0.8",
+            "--max-tokens=256",
+            "--seed",
+            str(self.seed),
+            f"/volume/{self.input_filename}",
+        ]
+
+    async def volume(self) -> Volume | None:
+        return MultiVolume(
+            volumes=[
+                SingleFileVolume(
+                    url=self.s3_url,
+                    relative_path=self.input_filename,
+                ),
+            ]
+        )
+
+    async def output_upload(self) -> OutputUpload | None:
+        return MultiUpload(
+            uploads=[
+                SingleFilePutUpload(
+                    url=self._url_for_upload(),
+                    relative_path=self.s3_output_key,
+                ),
+            ]
+        )
+
+    async def _download_answers(self):
+        response = await download_file_content(self._url_for_download())
+        self.prompt_answers = pydantic.TypeAdapter(dict[str, str]).validate_json(response)
+
+    def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]:
+        # just check if there are any answers
+        if self.prompt_answers == {}:
+            return False, "no answers", 0.0
+        return True, "answers exist", 1.0
+
+    def job_description(self) -> str:
+        return "LLM prompts job"
+
+
+class LlmPromptsSyntheticJobGenerator(LlmPromptsJobGenerator):
+    def __init__(
+        self,
+        prompt_sample: PromptSample,
+        expected_prompts: list[Prompt],
+        s3_url: str,
+        seed: int,
+        **kwargs,
+    ):
+        super().__init__(
+            s3_url=s3_url,
+            seed=seed,
+            **kwargs,
+        )
+        self.prompt_sample: PromptSample = prompt_sample
+        self.expected_prompts: list[Prompt] = expected_prompts
+
+    def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]:
+        for expected_prompt in self.expected_prompts:
+            if expected_prompt.content not in self.prompt_answers:
+                return False, "result does not contain all answers", 0.0
+            if expected_prompt.answer != self.prompt_answers[expected_prompt.content]:
+                return False, "results do not match expected answers", 0.0
+
+        return True, "", 1.0
+
+    def job_description(self) -> str:
+        return "LLM prompts synthetic job"
diff --git a/validator/app/src/compute_horde_validator/validator/tasks.py b/validator/app/src/compute_horde_validator/validator/tasks.py
index 16fca518d..ce5ca15c5 100644
--- a/validator/app/src/compute_horde_validator/validator/tasks.py
+++ b/validator/app/src/compute_horde_validator/validator/tasks.py
@@ -32,10 +32,8 @@
 from django.utils.timezone import now
 
 from compute_horde_validator.celery import app
-from compute_horde_validator.validator.dynamic_config import (
-    get_number_of_prompts_to_validate_from_series,
-    get_number_of_workloads_to_trigger_local_inference,
-)
+from compute_horde_validator.validator.cross_validation.prompt_answering import answer_prompts
+from compute_horde_validator.validator.cross_validation.prompt_generation import generate_prompts
 from compute_horde_validator.validator.locks import Locked, LockType, get_advisory_lock
 from compute_horde_validator.validator.metagraph_client import get_miner_axon_info
 from compute_horde_validator.validator.models import (
@@ -53,7 +51,12 @@
 )
 from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient
 from compute_horde_validator.validator.organic_jobs.miner_driver import execute_organic_job
-from compute_horde_validator.validator.s3 import generate_upload_url, get_prompts_from_s3_url
+from compute_horde_validator.validator.s3 import (
+    download_prompts_from_s3_url,
+    generate_upload_url,
+    get_public_url,
+    upload_prompts_to_s3_url,
+)
 from compute_horde_validator.validator.synthetic_jobs.batch_run import (
     SYNTHETIC_JOBS_HARD_LIMIT,
     SYNTHETIC_JOBS_SOFT_LIMIT,
@@ -68,6 +71,7 @@
 logger = get_task_logger(__name__)
 
 JOB_WINDOW = 2 * 60 * 60
+MAX_SEED = (1 << 32) - 1
 
 SCORING_ALGO_VERSION = 2
@@ -1129,63 +1133,181 @@ def fetch_dynamic_config() -> None:
     )
 
 
-def create_workload(seed: int):
-    # generate an s3 url to upload sample batch job result in
+@app.task(
+    soft_time_limit=4 * 60 + 50,
+    time_limit=5 * 60,
+)
+def llm_prompt_generation():
+    unprocessed_workloads = SolveWorkload.objects.filter(finished_at__isnull=True).count()
+    if unprocessed_workloads > 0:
+        # prevent any starvation issues
+        logger.info("Unprocessed workloads found - skipping prompt generation")
+        return
+
+    num_expected_prompt_series = config.DYNAMIC_MAX_PROMPT_SERIES
+    num_prompt_series = PromptSeries.objects.count()
+
+    if num_prompt_series >= num_expected_prompt_series:
+        logger.warning(
+            "There are %s series in the db - skipping prompt generation",
+            num_prompt_series,
+        )
+        return
+
+    logger.info("There are %s series in the db, generating prompts", num_prompt_series)
+
+    with transaction.atomic():
+        try:
+            get_advisory_lock(LockType.TRUSTED_MINER_LOCK)
+        except Locked:
+            logger.debug("Another thread is already using the trusted miner")
+            return
+
+        async_to_sync(generate_prompts)()
+
+
+@app.task(
+    soft_time_limit=4 * 60 + 50,
+    time_limit=5 * 60,
+)
+def llm_prompt_answering():
+    unprocessed_workloads = SolveWorkload.objects.filter(finished_at__isnull=True)
+
+    times = []
+    for workload in unprocessed_workloads:
+        start = time.time()
+        with transaction.atomic():
+            try:
+                get_advisory_lock(LockType.TRUSTED_MINER_LOCK)
+            except Locked:
+                logger.debug("Another thread is already using the trusted miner")
+                return
+
+            async_to_sync(answer_prompts)(workload)
+        times.append(time.time() - start)
+        total_time = sum(times)
+        avg_time = total_time / len(times)
+        if total_time + avg_time > 4 * 60 + 20:
+            return
+
+
+def init_workload(seed: int) -> tuple[SolveWorkload, str]:
     workload_uuid = uuid.uuid4()
-    s3_url = generate_upload_url(key=workload_uuid, bucket_name=settings.S3_BUCKET_NAME_ANSWERS)
-    return SolveWorkload.objects.create(workload_uuid=workload_uuid, seed=seed, s3_url=s3_url)
+    # generate an s3 url to upload workload prompts to
+    s3_upload_url = generate_upload_url(
+        key=str(workload_uuid), bucket_name=settings.S3_BUCKET_NAME_ANSWERS
+    )
+    # generate an s3 url to download the workload prompts to be answered from
+    s3_url = get_public_url(
+        key=str(workload_uuid),
+        bucket_name=settings.S3_BUCKET_NAME_ANSWERS,
+    )
+    return SolveWorkload(workload_uuid=workload_uuid, seed=seed, s3_url=s3_url), s3_upload_url
 
 
 @app.task()
-def create_sample_workloads():
-    total_workloads_needed = get_number_of_workloads_to_trigger_local_inference()
-    prompts_per_sample = get_number_of_prompts_to_validate_from_series()
+def llm_prompt_sampling():
+    # generate new prompt samples if needed
+
+    num_prompt_series = PromptSeries.objects.count()
+    required_series_to_start_sampling = min(
+        config.DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY * 2, config.DYNAMIC_MAX_PROMPT_SERIES
+    )
+    if num_prompt_series < required_series_to_start_sampling:
+        logger.warning(
+            "There are %s series in the db - expected %s to start sampling - skipping prompt sampling",
+            num_prompt_series,
+            required_series_to_start_sampling,
+        )
+        return
+
+    num_unused_prompt_samples = PromptSample.objects.filter(synthetic_job__isnull=True).count()
+    num_needed_prompt_samples = (
+        config.DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY - num_unused_prompt_samples
+    )
+
+    if num_needed_prompt_samples > 0:
+        logger.info(
+            "There are %s prompt samples in the db, generating more",
+            num_unused_prompt_samples,
+        )
+        create_sample_workloads(num_needed_prompt_samples)
+        return
+    else:
+        logger.warning(
+            "There are %s prompt samples - skipping prompt sampling",
+            num_unused_prompt_samples,
+        )
+
+
+def persist_workload(
+    workload: SolveWorkload, prompt_samples: list[PromptSample], prompts: list[Prompt]
+):
+    logger.info(f"Saving workload {workload}")
+    # save the sampled prompts as unanswered in the db
+    with transaction.atomic():
+        workload.save()
+        PromptSample.objects.bulk_create(prompt_samples)
+        Prompt.objects.bulk_create(prompts)
+
+
+def create_sample_workloads(num_needed_prompt_samples):
+    prompts_per_sample = config.DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES
+    prompts_per_workload = config.DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD
 
     # set seed for the current synthetic jobs run
-    seed = random.randint(0, 1000000)
+    seed = random.randint(0, MAX_SEED)
 
     # workload we are currently sampling for
-    current_workload = create_workload(seed)
+    try:
+        current_workload, current_upload_url = init_workload(seed)
+    except Exception as e:
+        logger.error(f"Failed to create new workload: {e} - aborting prompt sampling")
+        return
 
-    # how many prompts we have sampled for current_workload so far
-    current_workload_fill = 0
+    # how many prompt series we have sampled so far
+    # for each prompt series there is one prompt sample
+    num_prompt_series_sampled = 0
 
-    # how many workloads we have finished (have enough samples)
-    workloads_done = 0
+    current_prompt_samples = []
+    current_prompts = []
 
     # assume we have sufficient prompt series in the db to make all the prompt_samples needed
     # take a random order of prompt series to avoid using the same series at each synthetic jobs run
     for prompt_series in PromptSeries.objects.order_by("?").all():
-        if current_workload_fill >= prompts_per_sample:
-            current_workload = create_workload(seed)
-            current_workload_fill = 0
-            workloads_done += 1
-
-            if workloads_done >= total_workloads_needed:
-                break
-
         # get all prompts
-        lines = get_prompts_from_s3_url(prompt_series.uuid, prompt_series.s3_url)
+        lines = download_prompts_from_s3_url(prompt_series.s3_url)
 
         # should always have enough prompts
         if len(lines) <= prompts_per_sample:
-            logger.error("Skipping bucket %s, not enough prompts", prompt_series.s3_url)
+            logger.error(f"Skipping bucket {prompt_series.s3_url}, not enough prompts")
             continue
 
         # sample prompts
         sampled_lines = random.sample(lines, prompts_per_sample)
-        current_workload_fill += len(sampled_lines)
 
-        with transaction.atomic():
-            prompt_sample = PromptSample.objects.create(
-                series=prompt_series, workload=current_workload
-            )
+        prompt_sample = PromptSample(series=prompt_series, workload=current_workload)
+        current_prompt_samples += [prompt_sample]
+        current_prompts += [Prompt(sample=prompt_sample, content=line) for line in sampled_lines]
 
-            # save the sampled prompts as unanswered in the db
-            Prompt.objects.bulk_create(
-                [Prompt(sample=prompt_sample, content=line) for line in sampled_lines]
-            )
+        if len(current_prompts) >= prompts_per_workload:
+            content = "\n".join([p.content for p in current_prompts])
+            if upload_prompts_to_s3_url(current_upload_url, content):
+                # save the workload in the db
+                persist_workload(current_workload, current_prompt_samples, current_prompts)
+                num_prompt_series_sampled += len(current_prompt_samples)
+            else:
+                logger.error(f"Failed to create workload {current_workload} - skipping")
 
-    # delete remaining empty workload
-    if current_workload_fill == 0:
-        current_workload.delete()
+            # finished creating all the needed prompt samples, so exit after the last batch is filled
+            if num_prompt_series_sampled >= num_needed_prompt_samples:
+                logger.info(f"Created {num_prompt_series_sampled} new prompt samples")
+                break
+
+            # reset for the next workload
+            current_prompt_samples = []
+            current_prompts = []
+            try:
+                current_workload, current_upload_url = init_workload(seed)
+            except Exception as e:
+                logger.error(f"Failed to create new workload: {e} - aborting prompt sampling")
+                return
diff --git a/validator/app/src/compute_horde_validator/validator/tests/settings.py b/validator/app/src/compute_horde_validator/validator/tests/settings.py
index 6c25dd9b4..6fc34d852 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/settings.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/settings.py
@@ -45,6 +45,6 @@ def BITTENSOR_WALLET() -> bittensor.wallet:
 
 DYNAMIC_CONFIG_CACHE_TIMEOUT = 0
 
-GENERATION_MINER_KEY = "fake_generation_miner_key"
-GENERATION_MINER_ADDRESS = "fakehost"
-GENERATION_MINER_PORT = 1234
+TRUSTED_MINER_KEY = "fake_generation_miner_key"
+TRUSTED_MINER_ADDRESS = "fakehost"
+TRUSTED_MINER_PORT = 1234
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/conftest.py b/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/conftest.py
new file mode 100644
index 000000000..535676acb
--- /dev/null
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/conftest.py
@@ -0,0 +1,67 @@
+import uuid
+
+import pytest
+import pytest_asyncio
+from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS
+from compute_horde.miner_client.organic import OrganicMinerClient
+from compute_horde.mv_protocol import miner_requests
+
+from compute_horde_validator.validator.tests.transport import MinerSimulationTransport
+
+
+@pytest_asyncio.fixture
+async def transport():
+    return MinerSimulationTransport("miner_hotkey")
+
+
+@pytest.fixture
+def job_uuid():
+    return uuid.uuid4()
+
+
+@pytest.fixture
+def create_miner_client(transport: MinerSimulationTransport):
+    def _create(*args, **kwargs):
+        kwargs["transport"] = transport
+        return OrganicMinerClient(*args, **kwargs)
+
+    return _create
+
+
+@pytest.fixture
+def manifest_message():
+    return miner_requests.V0ExecutorManifestRequest(
+        manifest=miner_requests.ExecutorManifest(
+            executor_classes=[
miner_requests.ExecutorClassManifest(executor_class=DEFAULT_EXECUTOR_CLASS, count=1) + ] + ) + ).model_dump_json() + + +@pytest.fixture +def executor_ready_message(job_uuid: uuid.UUID): + return miner_requests.V0ExecutorReadyRequest(job_uuid=str(job_uuid)).model_dump_json() + + +@pytest.fixture +def accept_job_message(job_uuid: uuid.UUID): + return miner_requests.V0AcceptJobRequest(job_uuid=str(job_uuid)).model_dump_json() + + +@pytest.fixture +def job_finish_message(job_uuid: uuid.UUID): + return miner_requests.V0JobFinishedRequest( + job_uuid=str(job_uuid), + docker_process_stdout="", + docker_process_stderr="", + ).model_dump_json() + + +@pytest.fixture +def job_failed_message(job_uuid: uuid.UUID): + return miner_requests.V0JobFailedRequest( + job_uuid=str(job_uuid), + docker_process_stdout="", + docker_process_stderr="", + ).model_dump_json() diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/test_prompt_answering.py b/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/test_prompt_answering.py new file mode 100644 index 000000000..5c45ad6fb --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/test_prompt_answering.py @@ -0,0 +1,142 @@ +import json +import uuid +from collections.abc import Callable +from unittest.mock import patch + +import pytest + +from compute_horde_validator.validator.cross_validation.prompt_answering import answer_prompts +from compute_horde_validator.validator.models import ( + Prompt, + PromptSample, + PromptSeries, + SolveWorkload, +) +from compute_horde_validator.validator.tests.transport import MinerSimulationTransport + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.django_db(transaction=True), +] + + +async def db_setup(): + workload = await SolveWorkload.objects.acreate(seed=0, s3_url="s3://test") + prompt_series = await PromptSeries.objects.acreate( + s3_url="s3://test", + generator_version=1, + ) + prompt_sample = await PromptSample.objects.acreate( + series=prompt_series, + workload=workload, + ) + prompts = await Prompt.objects.abulk_create( + [ + Prompt(sample=prompt_sample, content="prompt1"), + Prompt(sample=prompt_sample, content="prompt2"), + Prompt(sample=prompt_sample, content="prompt3"), + ] + ) + return prompts, workload + + +async def mock_download_file_content(*args, **kwargs): + return json.dumps({f"prompt{i}": f"answer{i}" for i in range(1, 4)}) + + +async def mock_throw_error(*args, **kwargs): + raise Exception("Download failed") + + +@patch( + "compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts.download_file_content", + mock_download_file_content, +) +async def test_answer_prompts( + settings, + transport: MinerSimulationTransport, + create_miner_client: Callable, + manifest_message: str, + executor_ready_message: str, + accept_job_message: str, + job_finish_message: str, + job_uuid: uuid.UUID, +): + await transport.add_message(manifest_message, send_before=1) + await transport.add_message(accept_job_message, send_before=1) + await transport.add_message(executor_ready_message, send_before=0) + await transport.add_message(job_finish_message, send_before=2) + + prompts, workload = await db_setup() + + await answer_prompts( + workload, create_miner_client=create_miner_client, job_uuid=job_uuid, wait_timeout=2 + ) + + await workload.arefresh_from_db() + assert workload.finished_at is not None + + for i, prompt in enumerate(prompts): + await prompt.arefresh_from_db() + assert prompt.answer == 
f"answer{i + 1}" + + +async def test_answer_prompts_job_failed( + transport: MinerSimulationTransport, + create_miner_client: Callable, + manifest_message: str, + executor_ready_message: str, + accept_job_message: str, + job_failed_message: str, + job_uuid: uuid.UUID, +): + await transport.add_message(manifest_message, send_before=1) + await transport.add_message(accept_job_message, send_before=1) + await transport.add_message(executor_ready_message, send_before=0) + await transport.add_message(job_failed_message, send_before=2) + + prompts, workload = await db_setup() + + await answer_prompts( + workload, create_miner_client=create_miner_client, job_uuid=job_uuid, wait_timeout=2 + ) + + await workload.arefresh_from_db() + assert workload.finished_at is None + + for prompt in prompts: + await prompt.arefresh_from_db() + assert prompt.answer is None + + +@patch( + "compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts.download_file_content", + mock_throw_error, +) +async def test_answer_prompts_download_failed( + settings, + transport: MinerSimulationTransport, + create_miner_client: Callable, + manifest_message: str, + executor_ready_message: str, + accept_job_message: str, + job_finish_message: str, + job_uuid: uuid.UUID, +): + await transport.add_message(manifest_message, send_before=1) + await transport.add_message(accept_job_message, send_before=1) + await transport.add_message(executor_ready_message, send_before=0) + await transport.add_message(job_finish_message, send_before=2) + + prompts, workload = await db_setup() + + await answer_prompts( + workload, create_miner_client=create_miner_client, job_uuid=job_uuid, wait_timeout=2 + ) + + await workload.arefresh_from_db() + assert workload.finished_at is None + + for prompt in prompts: + await prompt.arefresh_from_db() + assert prompt.answer is None diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/test_prompt_generation.py b/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/test_prompt_generation.py index ce1259011..fc23b183d 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/test_prompt_generation.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_cross_validation/test_prompt_generation.py @@ -2,11 +2,7 @@ from collections.abc import Callable import pytest -import pytest_asyncio from compute_horde.base.output_upload import MultiUpload -from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS -from compute_horde.miner_client.organic import OrganicJobError, OrganicMinerClient -from compute_horde.mv_protocol import miner_requests from compute_horde.mv_protocol.validator_requests import BaseValidatorRequest from compute_horde_validator.validator.cross_validation.prompt_generation import generate_prompts @@ -17,71 +13,13 @@ pytest.mark.asyncio, pytest.mark.django_db(transaction=True), pytest.mark.override_config( - DYNAMIC_MAX_PROMPT_BATCHES=5, - DYNAMIC_PROMPTS_BATCHES_IN_A_SINGLE_GO=3, - DYNAMIC_NUMBER_OF_PROMPTS_IN_BATCH=99, + DYNAMIC_MAX_PROMPT_SERIES=5, + DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION=3, + DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES=99, ), ] -@pytest_asyncio.fixture -async def transport(): - return MinerSimulationTransport("miner_hotkey") - - -@pytest.fixture -def job_uuid(): - return uuid.uuid4() - - -@pytest.fixture -def create_miner_client(transport: MinerSimulationTransport): - def _create(*args, **kwargs): - kwargs["transport"] = transport - return 
-
-    return _create
-
-
-@pytest.fixture
-def manifest_message():
-    return miner_requests.V0ExecutorManifestRequest(
-        manifest=miner_requests.ExecutorManifest(
-            executor_classes=[
-                miner_requests.ExecutorClassManifest(executor_class=DEFAULT_EXECUTOR_CLASS, count=1)
-            ]
-        )
-    ).model_dump_json()
-
-
-@pytest.fixture
-def executor_ready_message(job_uuid: uuid.UUID):
-    return miner_requests.V0ExecutorReadyRequest(job_uuid=str(job_uuid)).model_dump_json()
-
-
-@pytest.fixture
-def accept_job_message(job_uuid: uuid.UUID):
-    return miner_requests.V0AcceptJobRequest(job_uuid=str(job_uuid)).model_dump_json()
-
-
-@pytest.fixture
-def job_finish_message(job_uuid: uuid.UUID):
-    return miner_requests.V0JobFinishedRequest(
-        job_uuid=str(job_uuid),
-        docker_process_stdout="",
-        docker_process_stderr="",
-    ).model_dump_json()
-
-
-@pytest.fixture
-def job_failed_message(job_uuid: uuid.UUID):
-    return miner_requests.V0JobFailedRequest(
-        job_uuid=str(job_uuid),
-        docker_process_stdout="",
-        docker_process_stderr="",
-    ).model_dump_json()
-
-
 async def test_generate_prompts(
     transport: MinerSimulationTransport,
     create_miner_client: Callable,
@@ -146,10 +84,9 @@ async def test_generate_prompts_job_failed(
     await transport.add_message(executor_ready_message, send_before=0)
     await transport.add_message(job_failed_message, send_before=2)
 
-    with pytest.raises(OrganicJobError):
-        await generate_prompts(
-            create_miner_client=create_miner_client, job_uuid=job_uuid, wait_timeout=2
-        )
+    await generate_prompts(
+        create_miner_client=create_miner_client, job_uuid=job_uuid, wait_timeout=2
+    )
 
     assert not await PromptSeries.objects.aexists()
 
@@ -162,25 +99,8 @@ async def test_generate_prompts_timeout(
 ):
     await transport.add_message(manifest_message, send_before=1)
 
-    with pytest.raises(OrganicJobError):
-        await generate_prompts(
-            create_miner_client=create_miner_client, job_uuid=job_uuid, wait_timeout=0.5
-        )
-
-    assert not await PromptSeries.objects.aexists()
-
-
-async def test_generate_prompts_max_batches_reached(
-    create_miner_client: Callable,
-    job_uuid: uuid.UUID,
-):
-    existing = []
-    for _ in range(5):
-        existing.append(PromptSeries(s3_url="", generator_version=1))
-    await PromptSeries.objects.abulk_create(existing)
-
     await generate_prompts(
         create_miner_client=create_miner_client, job_uuid=job_uuid, wait_timeout=0.5
     )
 
-    assert await PromptSeries.objects.acount() == 5
+    assert not await PromptSeries.objects.aexists()
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_llm_tasks.py b/validator/app/src/compute_horde_validator/validator/tests/test_llm_tasks.py
new file mode 100644
index 000000000..29d4a5e98
--- /dev/null
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_llm_tasks.py
@@ -0,0 +1,137 @@
+from unittest.mock import patch
+
+import pytest
+
+from compute_horde_validator.validator.models import (
+    Prompt,
+    PromptSample,
+    PromptSeries,
+    SolveWorkload,
+)
+from compute_horde_validator.validator.tasks import llm_prompt_generation, llm_prompt_sampling
+
+
+def create_prompt_series(num: int):
+    PromptSeries.objects.bulk_create(
+        [PromptSeries(s3_url="", generator_version=1) for _ in range(num)]
+    )
+
+
+@pytest.mark.override_config(DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5)
+@pytest.mark.django_db(transaction=True)
+def test_llm_prompt_sampling__will_not_trigger():
+    create_prompt_series(10)
+    prompt_series = PromptSeries.objects.create(s3_url="", generator_version=1)
+    for i in range(5):
+        workload = SolveWorkload.objects.create(seed=i, s3_url="s3://test")
+        PromptSample.objects.create(series=prompt_series, workload=workload)
+
+    with patch(
+        "compute_horde_validator.validator.tasks.create_sample_workloads"
+    ) as mock_create_sample_workloads:
+        llm_prompt_sampling()
+        assert not mock_create_sample_workloads.called
+
+
+@pytest.mark.override_config(DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5)
+@pytest.mark.django_db(transaction=True)
+@patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: False)
+@patch(
+    "compute_horde_validator.validator.tasks.download_prompts_from_s3_url",
+    lambda *args: ["test" for _ in range(10)],
+)
+def test_llm_prompt_sampling__fail_upload_to_s3():
+    create_prompt_series(4)
+    llm_prompt_sampling()
+    assert SolveWorkload.objects.count() == 0
+    assert PromptSample.objects.count() == 0
+    assert Prompt.objects.count() == 0
+
+
+@pytest.mark.override_config(DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5)
+@pytest.mark.django_db(transaction=True)
+@patch("compute_horde_validator.validator.tasks.download_prompts_from_s3_url", lambda *args: [])
+def test_llm_prompt_sampling__fail_download_from_s3():
+    create_prompt_series(4)
+    llm_prompt_sampling()
+    assert SolveWorkload.objects.count() == 0
+    assert PromptSample.objects.count() == 0
+    assert Prompt.objects.count() == 0
+
+
+@pytest.mark.override_config(
+    DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5,
+    DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=10,
+    DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD=20,
+)
+@pytest.mark.django_db(transaction=True)
+@patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True)
+@patch(
+    "compute_horde_validator.validator.tasks.download_prompts_from_s3_url",
+    lambda *args: ["test" for _ in range(240)],
+)
+def test_llm_prompt_sampling__success():
+    create_prompt_series(10)
+    llm_prompt_sampling()
+    assert SolveWorkload.objects.count() == 3
+    assert PromptSample.objects.count() == 6
+    assert Prompt.objects.count() == 60
+
+
+@pytest.mark.override_config(
+    DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=4,
+    DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=100,
+    DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD=80,
+)
+@pytest.mark.django_db(transaction=True)
+@patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True)
+@patch(
+    "compute_horde_validator.validator.tasks.download_prompts_from_s3_url",
+    lambda *args: ["test" for _ in range(240)],
+)
+def test_llm_prompt_sampling__one_sample_per_workload():
+    create_prompt_series(8)
+    llm_prompt_sampling()
+    assert SolveWorkload.objects.count() == 4
+    assert PromptSample.objects.count() == 4
+    assert Prompt.objects.count() == 400
+
+
+@pytest.mark.override_config(
+    DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=1,
+    DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=1,
+    DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD=5,
+)
+@pytest.mark.django_db(transaction=True)
+@patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True)
+@patch(
+    "compute_horde_validator.validator.tasks.download_prompts_from_s3_url",
+    lambda *args: ["test" for _ in range(240)],
+)
+def test_llm_prompt_sampling__not_enough_for_one_workload():
+    create_prompt_series(4)
+    llm_prompt_sampling()
+    assert SolveWorkload.objects.count() == 0
+    assert PromptSample.objects.count() == 0
+    assert Prompt.objects.count() == 0
+
+
+@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES=5)
+@pytest.mark.django_db(transaction=True)
+def test_llm_prompt_generation__will_trigger():
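+    # 4 existing series is below DYNAMIC_MAX_PROMPT_SERIES=5, so generation is expected to run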
+    create_prompt_series(4)
+    with patch("compute_horde_validator.validator.tasks.generate_prompts") as mock_generate_prompts:
+        llm_prompt_generation()
+        assert mock_generate_prompts.called
+
+
+@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES=5)
+@pytest.mark.django_db(transaction=True)
+def test_llm_prompt_generation__will_not_trigger():
+    create_prompt_series(10)
+    with patch("compute_horde_validator.validator.tasks.generate_prompts") as mock_generate_prompts:
+        llm_prompt_generation()
+        mock_generate_prompts.assert_not_called()
+        assert PromptSeries.objects.count() == 10
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_s3.py b/validator/app/src/compute_horde_validator/validator/tests/test_s3.py
index 532322e7a..8be2ea9a0 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/test_s3.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_s3.py
@@ -4,9 +4,9 @@
 from moto import mock_aws
 
 from compute_horde_validator.validator.s3 import (
+    download_prompts_from_s3_url,
     generate_download_url,
     generate_upload_url,
-    get_prompts_from_s3_url,
     get_public_url,
     get_s3_client,
 )
@@ -90,11 +90,11 @@ def test_get_public_url(
     [
         (200, "prompt1\nprompt2\nprompt3", ["prompt1", "prompt2", "prompt3"]),
         (200, "single_prompt", ["single_prompt"]),
-        (200, "", [""]),
+        (200, "", []),
         (404, "Not Found", []),
     ],
 )
-def test_get_prompts_from_s3_url(status_code, content, expected):
+def test_download_prompts_from_s3_url(status_code, content, expected):
     with patch("requests.get") as mock_get:
         # Mock the requests.get response
         mock_response = MagicMock()
@@ -102,7 +102,7 @@ def test_get_prompts_from_s3_url(status_code, content, expected):
         mock_response.text = content
         mock_get.return_value = mock_response
 
-        result = get_prompts_from_s3_url("https://fake-s3-url.com/prompts.txt")
+        result = download_prompts_from_s3_url("https://fake-s3-url.com/prompts.txt")
 
         assert result == expected
         mock_get.assert_called_once_with("https://fake-s3-url.com/prompts.txt")
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_scoring.py b/validator/app/src/compute_horde_validator/validator/tests/test_scoring.py
index 3867146dc..d11a68c18 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/test_scoring.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_scoring.py
@@ -1,5 +1,4 @@
 from datetime import timedelta
-from unittest.mock import patch
 
 import pytest
 from django.utils import timezone
@@ -7,6 +6,8 @@
 from compute_horde_validator.validator.models import Miner, SyntheticJob, SyntheticJobBatch
 from compute_horde_validator.validator.scoring import ExecutorClass, score_batches
 
+EXECUTOR_CLASS_WEIGHTS_OVERRIDE = "spin_up-4min.gpu-24gb=8,always_on.gpu-24gb=2"
+
 
 @pytest.fixture
 def setup_data():
@@ -72,18 +73,9 @@
     return batch
 
 
-@pytest.fixture
-def mocked_executor_class_weights():
-    mocked_weights = {
-        ExecutorClass.spin_up_4min__gpu_24gb: 8,
-        ExecutorClass.always_on__gpu_24gb: 2,
-    }
-    with patch("compute_horde_validator.validator.scoring.EXECUTOR_CLASS_WEIGHTS", mocked_weights):
-        yield mocked_weights
-
-
+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_basic(setup_data, mocked_executor_class_weights):
+def test_score_batches_basic(setup_data):
     batch = setup_data
 
     scores = score_batches([batch])
@@ -101,8 +93,10 @@ def test_score_batches_basic(setup_data, mocked_executor_class_weights):
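+    # the hotkey ordering asserted below follows from the 8:2 executor-class weight override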
 
     assert scores["hotkey1"] > scores["hotkey4"]
 
 
+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_with_changed_params_avg(setup_data, settings, mocked_executor_class_weights):
+def test_score_batches_with_changed_params_avg(setup_data, settings):
     batch = setup_data
 
     settings.HORDE_SCORE_AVG_PARAM = (
@@ -117,10 +111,9 @@
     assert changed_scores["hotkey2"] > changed_scores["hotkey4"]
 
 
+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_with_changed_params_horde_size(
-    setup_data, settings, mocked_executor_class_weights
-):
+def test_score_batches_with_changed_params_horde_size(setup_data, settings):
     batch = setup_data
 
     settings.HORDE_SCORE_SIZE_PARAM = 1.75
@@ -133,8 +126,9 @@
     assert changed_scores["hotkey4"] > changed_scores["hotkey3"]
 
 
+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_executor_classes_weights(mocked_executor_class_weights):
+def test_score_batches_executor_classes_weights():
     miner1 = Miner.objects.create(hotkey="hotkey1")
     miner2 = Miner.objects.create(hotkey="hotkey2")
 
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_set_scores.py b/validator/app/src/compute_horde_validator/validator/tests/test_set_scores.py
index a664953b2..d30d937e9 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/test_set_scores.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_set_scores.py
@@ -97,6 +97,7 @@ def test_set_scores__too_early(settings):
     assert SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).count() == 0
 
 
+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS="spin_up-4min.gpu-24gb=100")
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
 @patch_constance({"DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED": False})
 def test_set_scores__set_weight_success(settings):
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/conftest.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/conftest.py
index 05ba55809..8022fdf22 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/conftest.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/conftest.py
@@ -6,9 +6,19 @@
 from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS
 from compute_horde.miner_client.base import AbstractTransport
 from compute_horde.mv_protocol import miner_requests
-
-from compute_horde_validator.validator.models import Miner
+from django.utils.timezone import now
+
+from compute_horde_validator.validator.models import (
+    Miner,
+    Prompt,
+    PromptSample,
+    PromptSeries,
+    SolveWorkload,
+)
 from compute_horde_validator.validator.synthetic_jobs.batch_run import BatchContext, MinerClient
+from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import (
+    LlmPromptsSyntheticJobGenerator,
+)
 from compute_horde_validator.validator.tests.transport import MinerSimulationTransport
@@ -117,3 +127,62 @@ def job_failed_message(job_uuid: uuid.UUID, docker_process_stdout: str, docker_p
         docker_process_stdout=docker_process_stdout,
         docker_process_stderr=docker_process_stderr,
     ).model_dump_json()
+
+
+@pytest_asyncio.fixture
+async def prompt_series():
+    return await PromptSeries.objects.acreate(
+        series_uuid=uuid.uuid4(),
+        s3_url="http://localhost:9999/prompt-series-download-url",
+        generator_version=0,
+    )
+
+
+@pytest_asyncio.fixture
+async def solve_workload():
+    return await SolveWorkload.objects.acreate(
+        workload_uuid=uuid.uuid4(),
+        seed=42,
+        s3_url="http://localhost:9999/solve-workload-download-url",
+        finished_at=now(),
+    )
+
+
+@pytest_asyncio.fixture
+async def prompt_sample(prompt_series, solve_workload):
+    return await PromptSample.objects.acreate(
+        series=prompt_series,
+        workload=solve_workload,
+        synthetic_job=None,
+    )
+
+
+@pytest_asyncio.fixture
+async def prompts(prompt_sample):
+    return await Prompt.objects.abulk_create(
+        [
+            Prompt(
+                sample=prompt_sample,
+                content=str(i),
+                answer=str(i),
+            )
+            for i in range(10)
+        ]
+    )
+
+
+@pytest_asyncio.fixture
+async def llm_prompts_job_generator(
+    prompt_series: PromptSeries,
+    solve_workload: SolveWorkload,
+    prompt_sample: PromptSample,
+    prompts: list[Prompt],
+) -> LlmPromptsSyntheticJobGenerator:
+    job_generator = LlmPromptsSyntheticJobGenerator(
+        prompt_sample=prompt_sample,
+        expected_prompts=prompts,
+        s3_url=prompt_series.s3_url,
+        seed=solve_workload.seed,
+    )
+    await job_generator.ainit()
+    return job_generator
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py
index 40e6a7199..f1551744f 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py
@@ -1,5 +1,6 @@
 import uuid
 
+from compute_horde.base.volume import InlineVolume, Volume
 from compute_horde.executor_class import ExecutorClass
 from compute_horde.mv_protocol.miner_requests import (
     V0JobFinishedRequest,
@@ -15,7 +16,8 @@
 
 
 class MockSyntheticJobGenerator(BaseSyntheticJobGenerator):
-    def __init__(self, _uuid: uuid.UUID):
+    def __init__(self, _uuid: uuid.UUID, **kwargs):
+        super().__init__(**kwargs)
         self._uuid = _uuid
 
     async def ainit(self):
@@ -36,8 +38,8 @@ def docker_run_options_preset(self) -> str:
     def docker_run_cmd(self) -> list[str]:
         return ["mock"]
 
-    async def volume_contents(self) -> str:
-        return "mock"
+    async def volume(self) -> Volume | None:
+        return InlineVolume(contents="mock")
 
     def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]:
         return True, "mock", MOCK_SCORE
@@ -52,15 +54,17 @@ def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str
 
 
 class MockSyntheticJobGeneratorFactory(BaseSyntheticJobGeneratorFactory):
-    def __init__(self, uuids: list[uuid.UUID] = None):
+    def __init__(self, uuids: list[uuid.UUID] = None, **kwargs):
+        super().__init__(**kwargs)
         self._uuids = uuids or []
 
-    async def create(self, executor_class: ExecutorClass) -> BaseSyntheticJobGenerator:
+    async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSyntheticJobGenerator:
        _uuid = self._uuids.pop(0)
-        return MockSyntheticJobGenerator(_uuid)
+        return MockSyntheticJobGenerator(_uuid, **kwargs)
 
 
 class TimeTookScoreMockSyntheticJobGeneratorFactory(MockSyntheticJobGeneratorFactory):
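+    # kept keyword-compatible with the base factory's create() signature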
-    async def create(self, executor_class: ExecutorClass) -> BaseSyntheticJobGenerator:
+    async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSyntheticJobGenerator:
         _uuid = self._uuids.pop(0)
         return TimeTookScoreMockSyntheticJobGenerator(_uuid)
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_prompts_generator.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_prompts_generator.py
new file mode 100644
index 000000000..9ff622c76
--- /dev/null
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_prompts_generator.py
@@ -0,0 +1,72 @@
+import pytest
+from compute_horde.base.output_upload import MultiUpload, SingleFilePutUpload
+from compute_horde.base.volume import MultiVolume, SingleFileVolume
+from compute_horde.mv_protocol.miner_requests import V0JobFinishedRequest
+from pytest_httpx import HTTPXMock
+
+from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import (
+    LlmPromptsSyntheticJobGenerator,
+)
+
+_JOB_FINISHED_REQUEST = V0JobFinishedRequest(
+    job_uuid="CF8753B2-C86C-45A3-A01F-84295C3BAD8F",
+    docker_process_stdout="",
+    docker_process_stderr="",
+)
+
+
+@pytest.mark.asyncio
+@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
+async def test_llm_prompts_generator_basic(
+    httpx_mock: HTTPXMock,
+    llm_prompts_job_generator: LlmPromptsSyntheticJobGenerator,
+):
+    httpx_mock.add_response(json={str(i): str(i) for i in range(240)})
+
+    volume = await llm_prompts_job_generator.volume()
+    assert isinstance(volume, MultiVolume)
+    assert len(volume.volumes) == 1
+    assert isinstance(volume.volumes[0], SingleFileVolume)
+
+    output_upload = await llm_prompts_job_generator.output_upload()
+    assert isinstance(output_upload, MultiUpload)
+    assert len(output_upload.uploads) == 1
+    assert isinstance(output_upload.uploads[0], SingleFilePutUpload)
+
+    # before downloading answers
+    correct, _, score = llm_prompts_job_generator.verify(_JOB_FINISHED_REQUEST, 0)
+    assert not correct
+    assert score == 0.0
+
+    await llm_prompts_job_generator._download_answers()
+    correct, _, score = llm_prompts_job_generator.verify(_JOB_FINISHED_REQUEST, 0)
+    assert correct
+    assert score == 1.0
+
+
+@pytest.mark.asyncio
+@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
+async def test_llm_prompts_generator_missing_prompts(
+    httpx_mock: HTTPXMock,
+    llm_prompts_job_generator: LlmPromptsSyntheticJobGenerator,
+):
+    httpx_mock.add_response(json={str(i): str(i) for i in range(9, 249)})
+
+    await llm_prompts_job_generator._download_answers()
+    correct, _, score = llm_prompts_job_generator.verify(_JOB_FINISHED_REQUEST, 0)
+    assert not correct
+    assert score == 0.0
+
+
+@pytest.mark.asyncio
+@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
+async def test_llm_prompts_generator_wrong_answers(
+    httpx_mock: HTTPXMock,
+    llm_prompts_job_generator: LlmPromptsSyntheticJobGenerator,
+):
+    httpx_mock.add_response(json={str(i): "wrong" for i in range(240)})
+
+    await llm_prompts_job_generator._download_answers()
+    correct, _, score = llm_prompts_job_generator.verify(_JOB_FINISHED_REQUEST, 0)
+    assert not correct
+    assert score == 0.0
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_synthetic_job_flow.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_synthetic_job_flow.py
new file mode 100644
index 000000000..c432cb9e0
--- /dev/null
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_synthetic_job_flow.py
@@ -0,0 +1,123 @@
+import asyncio
+import re
+import uuid
+from collections.abc import Callable
+from unittest.mock import patch
+
+import bittensor
+import pytest
+from compute_horde.executor_class import ExecutorClass
+from compute_horde.mv_protocol import miner_requests
+from pytest_httpx import HTTPXMock
+
+from compute_horde_validator.validator.models import (
+    Miner,
+    Prompt,
+    PromptSample,
+    PromptSeries,
+    SolveWorkload,
+    SyntheticJob,
+)
+from compute_horde_validator.validator.s3 import get_public_url
+from compute_horde_validator.validator.synthetic_jobs.batch_run import execute_synthetic_batch_run
+from compute_horde_validator.validator.synthetic_jobs.generator.base import (
+    BaseSyntheticJobGenerator,
+)
+from compute_horde_validator.validator.synthetic_jobs.generator.factory import (
+    DefaultSyntheticJobGeneratorFactory,
+)
+from compute_horde_validator.validator.tests.transport import MinerSimulationTransport
+
+
+class JobGeneratorFactory(DefaultSyntheticJobGeneratorFactory):
+    async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSyntheticJobGenerator:
+        generator = await super().create(executor_class, **kwargs)
+        generator._uuid = self._uuid
+        return generator
+
+
+@pytest.fixture
+def mocked_job_generator_factory(prompts):
+    factory = JobGeneratorFactory()
+    with patch(
+        "compute_horde_validator.validator.synthetic_jobs.generator.current.synthetic_job_generator_factory",
+        factory,
+    ):
+        yield factory
+
+
+@pytest.mark.asyncio
+@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
+async def test_llm_synthetic_jobs_flow(
+    miner: Miner,
+    axon_dict: dict[str, bittensor.AxonInfo],
+    create_simulation_miner_client: Callable,
+    transport: MinerSimulationTransport,
+    override_weights_version_v2,
+    small_spin_up_times,
+    prompt_series: PromptSeries,
+    solve_workload: SolveWorkload,
+    prompt_sample: PromptSample,
+    prompts: list[Prompt],
+    mocked_job_generator_factory: JobGeneratorFactory,
+    httpx_mock: HTTPXMock,
+    settings,
+):
+    job_uuid = str(uuid.uuid4())
+    mocked_job_generator_factory._uuid = job_uuid
+    httpx_mock.add_response(
+        url=re.compile(
+            get_public_url(key=".*", bucket_name=settings.S3_BUCKET_NAME_ANSWERS, prefix="solved/")
+        ),
+        json={p.content: p.answer for p in prompts},
+    )
+
+    manifest_message = miner_requests.V0ExecutorManifestRequest(
+        manifest=miner_requests.ExecutorManifest(
+            executor_classes=[
+                miner_requests.ExecutorClassManifest(
+                    executor_class=ExecutorClass.always_on__llm__a6000,
+                    count=1,
+                )
+            ]
+        )
+    ).model_dump_json()
+    await transport.add_message(manifest_message, send_before=1)
+
+    await transport.add_message(
+        miner_requests.V0AcceptJobRequest(job_uuid=job_uuid).model_dump_json(),
+        send_before=1,
+        sleep_before=0.05,
+    )
+    await transport.add_message(
+        miner_requests.V0ExecutorReadyRequest(job_uuid=job_uuid).model_dump_json(),
+        send_before=0,
+    )
+    await transport.add_message(
+        miner_requests.V0JobFinishedRequest(
+            job_uuid=job_uuid,
+            docker_process_stdout="",
+            docker_process_stderr="",
+        ).model_dump_json(),
+        send_before=2,
+        sleep_before=0.05,
+    )
+
+    assert prompt_sample.synthetic_job_id is None
+
+    await asyncio.wait_for(
+        execute_synthetic_batch_run(
+            axon_dict,
+            [miner],
+            create_miner_client=create_simulation_miner_client,
+        ),
+        timeout=2,
+    )
+
+    job = await SyntheticJob.objects.aget(job_uuid=job_uuid)
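+    # the batch run should have completed the job and linked it back to the prompt sample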
+    assert job.status == SyntheticJob.Status.COMPLETED
+    assert job.score > 0
+
+    await prompt_sample.arefresh_from_db()
+    assert prompt_sample.synthetic_job_id == job.id
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_utils.py b/validator/app/src/compute_horde_validator/validator/tests/test_utils.py
index 68a83957c..2df8d30cb 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/test_utils.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_utils.py
@@ -7,6 +7,7 @@
 import bittensor
 import pytest
 from asgiref.sync import sync_to_async
+from compute_horde.base.volume import InlineVolume, Volume
 from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, ExecutorClass
 from compute_horde.mv_protocol.miner_requests import (
     ExecutorClassManifest,
@@ -75,8 +76,8 @@ def docker_run_options_preset(self) -> str:
     def docker_run_cmd(self) -> list[str]:
         return ["mock"]
 
-    async def volume_contents(self) -> str:
-        return "mock"
+    async def volume(self) -> Volume | None:
+        return InlineVolume(contents="mock")
 
     def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]:
         return True, "mock", MOCK_SCORE
@@ -231,8 +232,8 @@ async def as_coro(fun, *args, **kwargs):
 
 
 class MockSyntheticJobGeneratorFactory(BaseSyntheticJobGeneratorFactory):
-    async def create(self, executor_class: ExecutorClass) -> BaseSyntheticJobGenerator:
-        return MockSyntheticJobGenerator()
+    async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSyntheticJobGenerator:
+        return MockSyntheticJobGenerator(**kwargs)
 
 
 mock_synthetic_job_generator_factory = MagicMock(name="MockSyntheticJobGeneratorFactory")
@@ -366,8 +367,8 @@ async def create_mock_job_batches(miner):
 
 
 class TimeToookScoreMockSyntheticJobGeneratorFactory(BaseSyntheticJobGeneratorFactory):
-    async def create(self, executor_class: ExecutorClass) -> BaseSyntheticJobGenerator:
-        return TimeToookScoreMockSyntheticJobGenerator()
+    async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSyntheticJobGenerator:
+        return TimeToookScoreMockSyntheticJobGenerator(**kwargs)
 
 
 time_took_mock_synthetic_job_generator_factory = MagicMock(
diff --git a/validator/pdm.lock b/validator/pdm.lock
index 05c1224a6..3affaa8d2 100644
--- a/validator/pdm.lock
+++ b/validator/pdm.lock
@@ -5,7 +5,7 @@
 groups = ["default", "format", "lint", "security_check", "test", "type_check"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:eb2731ad148ba6d8433b4fcdf234edb1395d9584a14c0c59de4bd1e81b800b69"
+content_hash = "sha256:90d12954944851d55941ccbc6f6d7c6b4ab29839251b409672976d86a5341912"
 
 [[metadata.targets]]
 requires_python = "==3.11.*"
@@ -1288,7 +1288,7 @@ name = "httpcore"
 version = "1.0.5"
 requires_python = ">=3.8"
 summary = "A minimal low-level HTTP client."
-groups = ["test"]
+groups = ["default", "test"]
 dependencies = [
     "certifi",
     "h11<0.15,>=0.13",
@@ -1300,10 +1300,10 @@ files = [
 
 [[package]]
 name = "httpx"
-version = "0.27.0"
+version = "0.27.2"
 requires_python = ">=3.8"
 summary = "The next generation HTTP client."
-groups = ["test"] +groups = ["default", "test"] dependencies = [ "anyio", "certifi", @@ -1312,8 +1312,8 @@ dependencies = [ "sniffio", ] files = [ - {file = "httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5"}, - {file = "httpx-0.27.0.tar.gz", hash = "sha256:a0cb88a46f32dc874e04ee956e4c2764aba2aa228f650b06788ba6bda2962ab5"}, + {file = "httpx-0.27.2-py3-none-any.whl", hash = "sha256:7bb2708e112d8fdd7829cd4243970f0c223274051cb35ee80c03301ee29a3df0"}, + {file = "httpx-0.27.2.tar.gz", hash = "sha256:f7c2be1d2f3c3c3160d441802406b206c2b76f5947b11115e6df10c6c65e66c2"}, ] [[package]] diff --git a/validator/pyproject.toml b/validator/pyproject.toml index 3fcc69dca..551830f29 100644 --- a/validator/pyproject.toml +++ b/validator/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "django-admin-rangefilter==0.12.4", "uvloop>=0.19.0", "boto3>=1.35.11", + "httpx>=0.27.2", ] [build-system]