Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Weves committed Feb 2, 2025
1 parent 993fda3 commit 583b234
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.template.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
"--loglevel=INFO",
"--hostname=light@%n",
"-Q",
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert",
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup",
],
"presentation": {
"group": "2",
Expand Down
9 changes: 7 additions & 2 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
Expand Down Expand Up @@ -726,8 +727,12 @@ def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str | None) -> None:
task_logger.info(
f"Cleaning up checkpoint for index attempt {attempt.id}"
)
cleanup_checkpoint_task.delay(
index_attempt_id=attempt.id, tenant_id=tenant_id
cleanup_checkpoint_task.apply_async(
kwargs={
"index_attempt_id": attempt.id,
"tenant_id": tenant_id,
},
queue=OnyxCeleryQueues.CHECKPOINT_CLEANUP,
)

except Exception:
Expand Down
2 changes: 1 addition & 1 deletion backend/onyx/background/indexing/checkpointing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_latest_valid_checkpoint(


def get_index_attempts_with_old_checkpoints(
db_session: Session, days_to_keep: int = 0
db_session: Session, days_to_keep: int = 7
) -> list[IndexAttempt]:
"""Get all index attempts with checkpoints older than the specified number of days.
Expand Down
1 change: 1 addition & 0 deletions backend/onyx/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class OnyxCeleryQueues:
DOC_PERMISSIONS_UPSERT = "doc_permissions_upsert"
CONNECTOR_DELETION = "connector_deletion"
LLM_MODEL_UPDATE = "llm_model_update"
CHECKPOINT_CLEANUP = "checkpoint_cleanup"

# Heavy queue
CONNECTOR_PRUNING = "connector_pruning"
Expand Down
2 changes: 1 addition & 1 deletion backend/scripts/dev_run_background_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def run_jobs() -> None:
"--loglevel=INFO",
"--hostname=light@%n",
"-Q",
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert",
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup",
]

cmd_worker_heavy = [
Expand Down
2 changes: 1 addition & 1 deletion backend/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ stopasgroup=true
command=celery -A onyx.background.celery.versioned_apps.light worker
--loglevel=INFO
--hostname=light@%%n
-Q vespa_metadata_sync,connector_deletion,doc_permissions_upsert
-Q vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup
stdout_logfile=/var/log/celery_worker_light.log
stdout_logfile_maxbytes=16MB
redirect_stderr=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,35 @@
from pydantic import BaseModel
from pydantic import Field

from onyx.connectors.mock_connector.connector import SingleConnectorYield
from onyx.connectors.models import ConnectorCheckpoint
# We would like to import these, but it makes building this so much harder/slower
# from onyx.connectors.mock_connector.connector import SingleConnectorYield
# from onyx.connectors.models import ConnectorCheckpoint

app = FastAPI()


# Global state to store connector behavior configuration
class ConnectorBehavior(BaseModel):
connector_yields: list[SingleConnectorYield] = Field(default_factory=list)
called_with_checkpoints: list[ConnectorCheckpoint] = Field(default_factory=list)
connector_yields: list[dict] = Field(
default_factory=list
) # really list[SingleConnectorYield]
called_with_checkpoints: list[dict] = Field(
default_factory=list
) # really list[ConnectorCheckpoint]


current_behavior: ConnectorBehavior = ConnectorBehavior()


@app.post("/set-behavior")
async def set_behavior(behavior: list[SingleConnectorYield]) -> None:
async def set_behavior(behavior: list[dict]) -> None:
"""Set the behavior for the next connector run"""
global current_behavior
current_behavior = ConnectorBehavior(connector_yields=behavior)


@app.get("/get-documents")
async def get_documents() -> list[SingleConnectorYield]:
async def get_documents() -> list[dict]:
"""Get the next batch of documents and update the checkpoint"""
global current_behavior

Expand All @@ -44,14 +49,14 @@ async def get_documents() -> list[SingleConnectorYield]:


@app.post("/add-checkpoint")
async def add_checkpoint(checkpoint: ConnectorCheckpoint) -> None:
async def add_checkpoint(checkpoint: dict) -> None:
"""Add a checkpoint to the list of checkpoints. Called by the MockConnector."""
global current_behavior
current_behavior.called_with_checkpoints.append(checkpoint)


@app.get("/get-checkpoints")
async def get_checkpoints() -> list[ConnectorCheckpoint]:
async def get_checkpoints() -> list[dict]:
"""Get the list of checkpoints. Used by the test to verify the
proper checkpoint ordering."""
global current_behavior
Expand Down

0 comments on commit 583b234

Please sign in to comment.