From 583b234290bef98bbef9cab8b973ab1932655c19 Mon Sep 17 00:00:00 2001 From: Weves Date: Sat, 1 Feb 2025 17:36:38 -0800 Subject: [PATCH] cleanup --- .vscode/launch.template.jsonc | 2 +- .../background/celery/tasks/indexing/tasks.py | 9 ++++++-- .../indexing/checkpointing_utils.py | 2 +- backend/onyx/configs/constants.py | 1 + backend/scripts/dev_run_background_jobs.py | 2 +- backend/supervisord.conf | 2 +- .../mock_connector_server/main.py | 21 ++++++++++++------- 7 files changed, 25 insertions(+), 14 deletions(-) diff --git a/.vscode/launch.template.jsonc b/.vscode/launch.template.jsonc index 8c965d36e802..f1454ca8e9cb 100644 --- a/.vscode/launch.template.jsonc +++ b/.vscode/launch.template.jsonc @@ -205,7 +205,7 @@ "--loglevel=INFO", "--hostname=light@%n", "-Q", - "vespa_metadata_sync,connector_deletion,doc_permissions_upsert", + "vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup", ], "presentation": { "group": "2", diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 22ffad7bc833..552c379423fd 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -33,6 +33,7 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT +from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import OnyxRedisSignals @@ -726,8 +727,12 @@ def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str | None) -> None: task_logger.info( f"Cleaning up checkpoint for index attempt {attempt.id}" ) - cleanup_checkpoint_task.delay( - index_attempt_id=attempt.id, tenant_id=tenant_id + cleanup_checkpoint_task.apply_async( + kwargs={ + "index_attempt_id": attempt.id, + "tenant_id": tenant_id, + }, + queue=OnyxCeleryQueues.CHECKPOINT_CLEANUP, ) except Exception: diff --git a/backend/onyx/background/indexing/checkpointing_utils.py b/backend/onyx/background/indexing/checkpointing_utils.py index 72180c3c3823..aa1987a319c1 100644 --- a/backend/onyx/background/indexing/checkpointing_utils.py +++ b/backend/onyx/background/indexing/checkpointing_utils.py @@ -130,7 +130,7 @@ def get_latest_valid_checkpoint( def get_index_attempts_with_old_checkpoints( - db_session: Session, days_to_keep: int = 0 + db_session: Session, days_to_keep: int = 7 ) -> list[IndexAttempt]: """Get all index attempts with checkpoints older than the specified number of days. diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index e05cc92233cc..48403737b173 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -259,6 +259,7 @@ class OnyxCeleryQueues: DOC_PERMISSIONS_UPSERT = "doc_permissions_upsert" CONNECTOR_DELETION = "connector_deletion" LLM_MODEL_UPDATE = "llm_model_update" + CHECKPOINT_CLEANUP = "checkpoint_cleanup" # Heavy queue CONNECTOR_PRUNING = "connector_pruning" diff --git a/backend/scripts/dev_run_background_jobs.py b/backend/scripts/dev_run_background_jobs.py index 3370034c3ce6..ef638aebae41 100644 --- a/backend/scripts/dev_run_background_jobs.py +++ b/backend/scripts/dev_run_background_jobs.py @@ -42,7 +42,7 @@ def run_jobs() -> None: "--loglevel=INFO", "--hostname=light@%n", "-Q", - "vespa_metadata_sync,connector_deletion,doc_permissions_upsert", + "vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup", ] cmd_worker_heavy = [ diff --git a/backend/supervisord.conf b/backend/supervisord.conf index 78d5679bae7c..d42672096a51 100644 --- a/backend/supervisord.conf +++ b/backend/supervisord.conf @@ -33,7 +33,7 @@ stopasgroup=true command=celery -A onyx.background.celery.versioned_apps.light worker --loglevel=INFO --hostname=light@%%n - -Q vespa_metadata_sync,connector_deletion,doc_permissions_upsert + -Q vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup stdout_logfile=/var/log/celery_worker_light.log stdout_logfile_maxbytes=16MB redirect_stderr=true diff --git a/backend/tests/integration/common_utils/mock_connector_server/main.py b/backend/tests/integration/common_utils/mock_connector_server/main.py index 311f5726e484..a6ffbf57b337 100644 --- a/backend/tests/integration/common_utils/mock_connector_server/main.py +++ b/backend/tests/integration/common_utils/mock_connector_server/main.py @@ -3,30 +3,35 @@ from pydantic import BaseModel from pydantic import Field -from onyx.connectors.mock_connector.connector import SingleConnectorYield -from onyx.connectors.models import ConnectorCheckpoint +# We would like to import these, but it makes building this so much harder/slower +# from onyx.connectors.mock_connector.connector import SingleConnectorYield +# from onyx.connectors.models import ConnectorCheckpoint app = FastAPI() # Global state to store connector behavior configuration class ConnectorBehavior(BaseModel): - connector_yields: list[SingleConnectorYield] = Field(default_factory=list) - called_with_checkpoints: list[ConnectorCheckpoint] = Field(default_factory=list) + connector_yields: list[dict] = Field( + default_factory=list + ) # really list[SingleConnectorYield] + called_with_checkpoints: list[dict] = Field( + default_factory=list + ) # really list[ConnectorCheckpoint] current_behavior: ConnectorBehavior = ConnectorBehavior() @app.post("/set-behavior") -async def set_behavior(behavior: list[SingleConnectorYield]) -> None: +async def set_behavior(behavior: list[dict]) -> None: """Set the behavior for the next connector run""" global current_behavior current_behavior = ConnectorBehavior(connector_yields=behavior) @app.get("/get-documents") -async def get_documents() -> list[SingleConnectorYield]: +async def get_documents() -> list[dict]: """Get the next batch of documents and update the checkpoint""" global current_behavior @@ -44,14 +49,14 @@ async def get_documents() -> list[SingleConnectorYield]: @app.post("/add-checkpoint") -async def add_checkpoint(checkpoint: ConnectorCheckpoint) -> None: +async def add_checkpoint(checkpoint: dict) -> None: """Add a checkpoint to the list of checkpoints. Called by the MockConnector.""" global current_behavior current_behavior.called_with_checkpoints.append(checkpoint) @app.get("/get-checkpoints") -async def get_checkpoints() -> list[ConnectorCheckpoint]: +async def get_checkpoints() -> list[dict]: """Get the list of checkpoints. Used by the test to verify the proper checkpoint ordering.""" global current_behavior