Skip to content

Commit

Permalink
Merge pull request #289 from backend-developers-ltd/prioritize-storing-jobs
Browse files Browse the repository at this point in the history

Prioritize storing jobs after synthetic batch
  • Loading branch information
mzukowski-reef authored Oct 29, 2024
2 parents 6342021 + 1001940 commit e417158
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ def _db_persist_system_events(ctx: BatchContext) -> None:

# sync_to_async is needed since we use the sync Django ORM
@sync_to_async
def _db_persist(ctx: BatchContext) -> None:
def _db_persist_critical(ctx: BatchContext) -> None:
start_time = time.time()

# persist the batch and the jobs in the same transaction, to
Expand Down Expand Up @@ -1505,6 +1505,19 @@ def _db_persist(ctx: BatchContext) -> None:
)
synthetic_jobs.append(synthetic_job)
synthetic_jobs = SyntheticJob.objects.bulk_create(synthetic_jobs)
duration = time.time() - start_time
logger.info("Persisted to database in %.2f seconds", duration)


# sync_to_async is needed since we use the sync Django ORM
@sync_to_async
def _db_persist(ctx: BatchContext) -> None:
start_time = time.time()

if ctx.batch_id is not None:
batch = SyntheticJobBatch.objects.get(id=ctx.batch_id)
else:
batch = SyntheticJobBatch.objects.get(started_at=ctx.stage_start_time["BATCH_BEGIN"])

miner_manifests: list[MinerManifest] = []
for miner in ctx.miners.values():
Expand All @@ -1523,7 +1536,7 @@ def _db_persist(ctx: BatchContext) -> None:

# TODO: refactor into nicer abstraction
synthetic_jobs_map: dict[str, SyntheticJob] = {
str(synthetic_job.job_uuid): synthetic_job for synthetic_job in synthetic_jobs
str(synthetic_job.job_uuid): synthetic_job for synthetic_job in batch.synthetic_jobs.all()
}
prompt_samples: list[PromptSample] = []

Expand Down Expand Up @@ -1700,6 +1713,9 @@ async def execute_synthetic_batch_run(
func="_multi_close_client",
)

await ctx.checkpoint_system_event("_db_persist_critical")
await _db_persist_critical(ctx)

await ctx.checkpoint_system_event("_emit_telemetry_events")
try:
_emit_telemetry_events(ctx)
Expand Down

0 comments on commit e417158

Please sign in to comment.