
Commit

Merge pull request #260 from backend-developers-ltd/llama-predeploy-params

Llama predeploy params
mzukowski-reef authored Sep 27, 2024
2 parents 8ff4b86 + dea90fe commit f12f260
Showing 8 changed files with 69 additions and 38 deletions.
15 changes: 13 additions & 2 deletions validator/app/src/compute_horde_validator/settings.py
@@ -220,12 +220,12 @@ def wrapped(*args, **kwargs):
     ),
     # llama params
     "DYNAMIC_MAX_PROMPT_SERIES": (
-        10000,
+        3500,
         "Maximum number of prompt series upon which the prompt generator will not be triggered",
         int,
     ),
     "DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY": (
-        250,
+        1536,  # 256 * 2 * 3 - we allow 2 executors per miner and want queue for 3 synthetic job batches
         "how many prompt samples to generate (should be larger than how many prompts series we use per synthetic run)",
         int,
     ),
@@ -262,6 +262,17 @@ def wrapped(*args, **kwargs):
         ),
         str,
     ),
+    "DYNAMIC_EXECUTOR_CLASS_WEIGHTS": (
+        "spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1",
+        (
+            "Weights of executor classes that are used to normalize miners' scores. "
+            "Executor classes not mentioned here are not taken into account when scoring. "
+            "The format should be: 'key1=value1,key2=value2', "
+            "where the keys are executor class enum values, and the values are floats, "
+            "but int values that sum up to 100 are encouraged"
+        ),
+        str,
+    ),
 }
 DYNAMIC_CONFIG_CACHE_TIMEOUT = 300

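Editor's note, to make the new setting concrete: it is a comma-separated string of executor-class=weight pairs. A minimal sketch of what the default decodes to; the parsing shown here is a simplification of the real parser added in the dynamic_config module below:

# Hypothetical illustration, not part of the diff:
raw = "spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1"
pairs = dict(pair.split("=") for pair in raw.split(","))
# -> {'spin_up-4min.gpu-24gb': '99', 'always_on.llm.a6000': '1'}
# The validator casts the values to float; classes missing from the string
# (and any malformed pair) are simply ignored during scoring.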
@@ -1,6 +1,8 @@
 import asyncio
 import time
+from collections.abc import Callable
 from contextlib import suppress
+from typing import Any

 import constance.utils
 from asgiref.sync import sync_to_async
@@ -46,17 +48,37 @@ def get_synthetic_jobs_flow_version():
     return config.DYNAMIC_SYNTHETIC_JOBS_FLOW_VERSION


-async def get_miner_max_executors_per_class() -> dict[ExecutorClass, int]:
-    miner_max_executors_per_class: str = await aget_config("DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS")
+def executor_class_value_map_parser(
+    value_map_str: str, value_parser: Callable[[str], Any] | None = None
+) -> dict[ExecutorClass, Any]:
     result = {}
-    for pair in miner_max_executors_per_class.split(","):
+    for pair in value_map_str.split(","):
         # ignore errors for misconfiguration, i.e. non-existent executor classes,
         # non-integer/negative counts etc.
         with suppress(ValueError):
-            executor_class_str, count_str = pair.split("=")
+            executor_class_str, value_str = pair.split("=")
             executor_class = ExecutorClass(executor_class_str)
-            count = int(count_str)
-            if count >= 0:
-                result[executor_class] = count
+            if value_parser is not None:
+                parsed_value = value_parser(value_str)
+            else:
+                parsed_value = value_str
+            result[executor_class] = parsed_value
     return result


+async def get_miner_max_executors_per_class() -> dict[ExecutorClass, int]:
+    miner_max_executors_per_class: str = await aget_config("DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS")
+    result = {
+        executor_class: count
+        for executor_class, count in executor_class_value_map_parser(
+            miner_max_executors_per_class, value_parser=int
+        ).items()
+        if count >= 0
+    }
+    return result
+
+
+def get_executor_class_weights() -> dict[ExecutorClass, float]:
+    return executor_class_value_map_parser(
+        config.DYNAMIC_EXECUTOR_CLASS_WEIGHTS, value_parser=float
+    )
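Editor's note, a minimal sketch of how the new parser behaves (enum member names assumed from the spin_up_4min__gpu_24gb pattern; the bogus entry is made up to show the suppress(ValueError) path):

# Hypothetical usage, not part of the diff:
weights = executor_class_value_map_parser(
    "spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1,bogus-class=5",
    value_parser=float,
)
# "bogus-class" is dropped silently: ExecutorClass("bogus-class") raises
# ValueError, which suppress() swallows, leaving
# {ExecutorClass.spin_up_4min__gpu_24gb: 99.0, ExecutorClass.always_on__llm__a6000: 1.0}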
12 changes: 5 additions & 7 deletions validator/app/src/compute_horde_validator/validator/scoring.py
@@ -6,12 +6,9 @@
 from compute_horde.executor_class import ExecutorClass
 from django.conf import settings

-logger = logging.getLogger(__name__)
-
+from .dynamic_config import get_executor_class_weights

-EXECUTOR_CLASS_WEIGHTS = {
-    ExecutorClass.spin_up_4min__gpu_24gb: 100,
-}
+logger = logging.getLogger(__name__)


 def normalize(scores, weight=1):
@@ -64,9 +61,10 @@ def score_jobs(jobs, score_aggregation=sum, normalization_weight=1):


 def score_batch(batch):
+    executor_class_weights = get_executor_class_weights()
     executor_class_jobs = defaultdict(list)
     for job in batch.synthetic_jobs.all():
-        if job.executor_class in EXECUTOR_CLASS_WEIGHTS:
+        if job.executor_class in executor_class_weights:
             executor_class_jobs[job.executor_class].append(job)

     parametriezed_horde_score = partial(
@@ -80,7 +78,7 @@ def score_batch(batch):
     )
     batch_scores = defaultdict(float)
     for executor_class, jobs in executor_class_jobs.items():
-        executor_class_weight = EXECUTOR_CLASS_WEIGHTS[executor_class]
+        executor_class_weight = executor_class_weights[executor_class]
         if executor_class == ExecutorClass.spin_up_4min__gpu_24gb:
             score_aggregation = parametriezed_horde_score
         else:
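Editor's note, for orientation: the weights scale each executor class's normalized scores. normalize() itself is untouched by this commit, and its body below is assumed rather than copied from the repository:

# Hypothetical sketch of weight normalization (body assumed):
def normalize(scores: dict[str, float], weight: float = 1) -> dict[str, float]:
    total = sum(scores.values())
    if total == 0:
        return {hotkey: 0.0 for hotkey in scores}
    return {hotkey: weight * score / total for hotkey, score in scores.items()}

# With the default "spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1", a miner
# holding half of each class's raw score would end up with 0.5 * 99 + 0.5 * 1 = 50.0.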
@@ -808,7 +808,14 @@ async def _generate_jobs(ctx: BatchContext) -> None:
         if prompt_samples_iter is None:
             logger.warning("No llm prompt samples available, skipping llm job")
             continue
-        prompt_sample = next(prompt_samples_iter)
+        prompt_sample = next(prompt_samples_iter, None)
+        if prompt_sample is None:
+            # it means that there is some bug - we want to see it in sentry
+            # and continue, so other executor classes are not affected
+            logger.error(
+                "Dried prompt_samples_iter, this should not happen, skipping llm job"
+            )
+            continue
         kwargs = {
             "prompt_sample": prompt_sample,
             "expected_prompts": list(prompt_sample.prompts.all()),
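Editor's note: the fix relies on the two-argument form of next(), which returns a default instead of raising StopIteration. A standalone illustration, not from the diff:

# Hypothetical illustration of the pattern:
samples = iter([])            # an exhausted iterator
sample = next(samples, None)  # returns None instead of raising StopIteration
if sample is None:
    print("log the anomaly and skip, instead of crashing the whole batch")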
@@ -49,8 +49,7 @@ def _url_for_download(self) -> str:
         )

     def timeout_seconds(self) -> int:
-        # TODO: ???
-        return 80
+        return 48  # it takes around 42s - we add 15% buffer

     def base_docker_image_name(self) -> str:
         return "docker.io/backenddevelopersltd/compute-horde-prompt-solver:v0-latest"
4 changes: 2 additions & 2 deletions validator/app/src/compute_horde_validator/validator/tasks.py
@@ -1134,7 +1134,7 @@ def fetch_dynamic_config() -> None:


 @app.task(
-    soft_time_limit=4 * 60 + 40,
+    soft_time_limit=4 * 60 + 50,
     time_limit=5 * 60,
 )
 def llm_prompt_generation():
@@ -1167,7 +1167,7 @@ def llm_prompt_generation():


 @app.task(
-    soft_time_limit=4 * 60 + 40,
+    soft_time_limit=4 * 60 + 50,
     time_limit=5 * 60,
 )
 def llm_prompt_answering():
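Editor's note, for context: in Celery, the soft limit raises SoftTimeLimitExceeded inside the task at 4 * 60 + 50 = 290 s, 10 s before the hard time_limit of 300 s terminates the worker process, leaving room for cleanup. A minimal sketch of the pattern; app, run_llm_work, and cleanup_partial_results are assumed names, not from the diff:

# Hypothetical sketch of soft vs. hard time limits:
from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=4 * 60 + 50, time_limit=5 * 60)
def llm_example_task():
    try:
        run_llm_work()  # assumed long-running helper
    except SoftTimeLimitExceeded:
        # ~10s of headroom remain before the hard limit kills the process
        cleanup_partial_results()  # assumed helper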
@@ -1,12 +1,13 @@
 from datetime import timedelta
-from unittest.mock import patch

 import pytest
 from django.utils import timezone

 from compute_horde_validator.validator.models import Miner, SyntheticJob, SyntheticJobBatch
 from compute_horde_validator.validator.scoring import ExecutorClass, score_batches

+EXECUTOR_CLASS_WEIGHTS_OVERRIDE = "spin_up-4min.gpu-24gb=8,always_on.gpu-24gb=2"


 @pytest.fixture
 def setup_data():
@@ -72,18 +73,9 @@ def setup_data():
     return batch


-@pytest.fixture
-def mocked_executor_class_weights():
-    mocked_weights = {
-        ExecutorClass.spin_up_4min__gpu_24gb: 8,
-        ExecutorClass.always_on__gpu_24gb: 2,
-    }
-    with patch("compute_horde_validator.validator.scoring.EXECUTOR_CLASS_WEIGHTS", mocked_weights):
-        yield mocked_weights
-
-
+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_basic(setup_data, mocked_executor_class_weights):
+def test_score_batches_basic(setup_data):
     batch = setup_data
     scores = score_batches([batch])

@@ -101,8 +93,9 @@ def test_score_batches_basic(setup_data, mocked_executor_class_weights):
     assert scores["hotkey1"] > scores["hotkey4"]


+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_with_changed_params_avg(setup_data, settings, mocked_executor_class_weights):
+def test_score_batches_with_changed_params_avg(setup_data, settings):
     batch = setup_data

     settings.HORDE_SCORE_AVG_PARAM = (
@@ -117,10 +110,9 @@ def test_score_batches_with_changed_params_avg(setup_data, settings, mocked_executor_class_weights):
     assert changed_scores["hotkey2"] > changed_scores["hotkey4"]


+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_with_changed_params_horde_size(
-    setup_data, settings, mocked_executor_class_weights
-):
+def test_score_batches_with_changed_params_horde_size(setup_data, settings):
     batch = setup_data

     settings.HORDE_SCORE_SIZE_PARAM = 1.75
@@ -133,8 +125,9 @@ def test_score_batches_with_changed_params_horde_size(
     assert changed_scores["hotkey4"] > changed_scores["hotkey3"]


+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS=EXECUTOR_CLASS_WEIGHTS_OVERRIDE)
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
-def test_score_batches_executor_classes_weights(mocked_executor_class_weights):
+def test_score_batches_executor_classes_weights():
     miner1 = Miner.objects.create(hotkey="hotkey1")
     miner2 = Miner.objects.create(hotkey="hotkey2")
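Editor's side note: the tests now rely on django-constance's pytest integration instead of patching a module-level dict; @pytest.mark.override_config swaps the dynamic setting for the duration of the test. A minimal sketch, assuming the constance pytest plugin is installed and Django is configured:

# Hypothetical sketch of the override_config marker:
import pytest

@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS="spin_up-4min.gpu-24gb=100")
def test_weights_are_overridden():
    from constance import config
    assert config.DYNAMIC_EXECUTOR_CLASS_WEIGHTS == "spin_up-4min.gpu-24gb=100"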
@@ -97,6 +97,7 @@ def test_set_scores__too_early(settings):
     assert SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).count() == 0


+@pytest.mark.override_config(DYNAMIC_EXECUTOR_CLASS_WEIGHTS="spin_up-4min.gpu-24gb=100")
 @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
 @patch_constance({"DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED": False})
 def test_set_scores__set_weight_success(settings):
