From 64568a61590fae4715211b5befe9db8492adc7a4 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 20 Feb 2025 17:29:29 -0800 Subject: [PATCH 1/2] tool fixes --- backend/scripts/debugging/onyx_redis.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/backend/scripts/debugging/onyx_redis.py b/backend/scripts/debugging/onyx_redis.py index 7d3334cf2ad..152312580a6 100644 --- a/backend/scripts/debugging/onyx_redis.py +++ b/backend/scripts/debugging/onyx_redis.py @@ -58,6 +58,7 @@ def onyx_redis( command: str, batch: int, dry_run: bool, + ssl: bool, host: str, port: int, db: int, @@ -69,19 +70,23 @@ def onyx_redis( port=port, db=db, password=password if password else "", - ssl=REDIS_SSL, + ssl=ssl, ssl_cert_reqs="optional", ssl_ca_certs=None, ) r = Redis(connection_pool=pool) + logger.info("Redis ping starting. This may hang if your settings are incorrect.") + try: r.ping() except: logger.exception("Redis ping exceptioned") raise + logger.info("Redis ping succeeded.") + if command == "purge_connectorsync_taskset": """Purge connector tasksets. Used when the tasks represented in the tasksets have been purged.""" @@ -164,13 +169,15 @@ def purge_by_match_and_type( logger.info(f"Deleting item {count}: {key_str}") batch_keys.append(key) + + # flush if batch size has been reached if len(batch_keys) >= batch_size: flush_batch_delete(batch_keys, r) batch_keys.clear() - if len(batch_keys) >= batch_size: - flush_batch_delete(batch_keys, r) - batch_keys.clear() + # final flush + flush_batch_delete(batch_keys, r) + batch_keys.clear() logger.info(f"Deleted {count} matches.") @@ -281,6 +288,14 @@ def delete_user_token_from_redis( parser = argparse.ArgumentParser(description="Onyx Redis Manager") parser.add_argument("--command", type=str, help="Operation to run", required=True) + parser.add_argument( + "--ssl", + type=bool, + default=REDIS_SSL, + help="Use SSL when connecting to Redis. 
Usually True for prod and False for local testing", + required=False, + ) + parser.add_argument( "--host", type=str, @@ -368,6 +383,7 @@ def delete_user_token_from_redis( command=args.command, batch=args.batch, dry_run=args.dry_run, + ssl=args.ssl, host=args.host, port=args.port, db=args.db, From 5433f14babb82a7540e8f2919dbfb8270f464f2e Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 21 Feb 2025 11:33:56 -0800 Subject: [PATCH 2/2] support purging locks blocking deletion --- backend/scripts/debugging/onyx_redis.py | 96 ++++++++++++++++++++++--- 1 file changed, 88 insertions(+), 8 deletions(-) diff --git a/backend/scripts/debugging/onyx_redis.py b/backend/scripts/debugging/onyx_redis.py index 152312580a6..6b42a084641 100644 --- a/backend/scripts/debugging/onyx_redis.py +++ b/backend/scripts/debugging/onyx_redis.py @@ -3,6 +3,7 @@ import logging import sys import time +from enum import Enum from logging import getLogger from typing import cast from uuid import UUID @@ -20,10 +21,13 @@ from onyx.configs.app_configs import REDIS_SSL from onyx.db.engine import get_session_with_tenant from onyx.db.users import get_user_by_email +from onyx.redis.redis_connector import RedisConnector +from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_pool import RedisPool from shared_configs.configs import MULTI_TENANT from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR +from shared_configs.contextvars import get_current_tenant_id # Tool to run helpful operations on Redis in production # This is targeted for internal usage and may not have all the necessary parameters @@ -42,6 +46,19 @@ BATCH_DEFAULT = 1000 +class OnyxRedisCommand(Enum): + purge_connectorsync_taskset = "purge_connectorsync_taskset" + purge_documentset_taskset = "purge_documentset_taskset" + purge_usergroup_taskset = "purge_usergroup_taskset" + purge_locks_blocking_deletion =
"purge_locks_blocking_deletion" + purge_vespa_syncing = "purge_vespa_syncing" + get_user_token = "get_user_token" + delete_user_token = "delete_user_token" + + def __str__(self) -> str: + return self.value + + def get_user_id(user_email: str) -> tuple[UUID, str]: tenant_id = ( get_tenant_id_for_email(user_email) if MULTI_TENANT else POSTGRES_DEFAULT_SCHEMA @@ -55,7 +72,7 @@ def get_user_id(user_email: str) -> tuple[UUID, str]: def onyx_redis( - command: str, + command: OnyxRedisCommand, batch: int, dry_run: bool, ssl: bool, @@ -64,7 +81,9 @@ def onyx_redis( db: int, password: str | None, user_email: str | None = None, + cc_pair_id: int | None = None, ) -> int: + # this is global and not tenant aware pool = RedisPool.create_pool( host=host, port=port, @@ -87,23 +106,45 @@ def onyx_redis( logger.info("Redis ping succeeded.") - if command == "purge_connectorsync_taskset": + if command == OnyxRedisCommand.purge_connectorsync_taskset: """Purge connector tasksets. Used when the tasks represented in the tasksets have been purged.""" return purge_by_match_and_type( "*connectorsync_taskset*", "set", batch, dry_run, r ) - elif command == "purge_documentset_taskset": + elif command == OnyxRedisCommand.purge_documentset_taskset: return purge_by_match_and_type( "*documentset_taskset*", "set", batch, dry_run, r ) - elif command == "purge_usergroup_taskset": + elif command == OnyxRedisCommand.purge_usergroup_taskset: return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r) - elif command == "purge_vespa_syncing": + elif command == OnyxRedisCommand.purge_locks_blocking_deletion: + if cc_pair_id is None: + logger.error("You must specify --cc-pair with purge_locks_blocking_deletion") + return 1 + + tenant_id = get_current_tenant_id() + logger.info(f"Purging locks associated with deleting cc_pair={cc_pair_id}.") + redis_connector = RedisConnector(tenant_id, cc_pair_id) + + match_pattern = f"{tenant_id}:{RedisConnectorIndex.FENCE_PREFIX}_{cc_pair_id}/*" +
purge_by_match_and_type(match_pattern, "string", batch, dry_run, r) + + redis_delete_if_exists_helper( + f"{tenant_id}:{redis_connector.prune.fence_key}", dry_run, r + ) + redis_delete_if_exists_helper( + f"{tenant_id}:{redis_connector.permissions.fence_key}", dry_run, r + ) + redis_delete_if_exists_helper( + f"{tenant_id}:{redis_connector.external_group_sync.fence_key}", dry_run, r + ) + return 0 + elif command == OnyxRedisCommand.purge_vespa_syncing: return purge_by_match_and_type( "*connectorsync:vespa_syncing*", "string", batch, dry_run, r ) - elif command == "get_user_token": + elif command == OnyxRedisCommand.get_user_token: if not user_email: logger.error("You must specify --user-email with get_user_token") return 1 @@ -114,7 +155,7 @@ def onyx_redis( else: print(f"No token found for user {user_email}") return 2 - elif command == "delete_user_token": + elif command == OnyxRedisCommand.delete_user_token: if not user_email: logger.error("You must specify --user-email with delete_user_token") return 1 @@ -136,6 +177,25 @@ def flush_batch_delete(batch_keys: list[bytes], r: Redis) -> None: pipe.execute() +def redis_delete_if_exists_helper(key: str, dry_run: bool, r: Redis) -> bool: + """Returns True if the key was found, False if not. + This function exists for logging purposes as the delete operation itself + doesn't really need to check the existence of the key.
+ """ + + if not r.exists(key): + logger.info(f"Did not find {key}.") + return False + + if dry_run: + logger.info(f"(DRY-RUN) Deleting {key}.") + else: + logger.info(f"Deleting {key}.") + r.delete(key) + + return True + + def purge_by_match_and_type( match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis ) -> int: @@ -143,6 +203,12 @@ def purge_by_match_and_type( match_type: https://redis.io/docs/latest/commands/type/ """ + logger.info( + f"purge_by_match_and_type start: " + f"match_pattern={match_pattern} " + f"match_type={match_type}" + ) + # cursor = "0" # while cursor != 0: # cursor, data = self.scan( @@ -286,7 +352,13 @@ def delete_user_token_from_redis( if __name__ == "__main__": parser = argparse.ArgumentParser(description="Onyx Redis Manager") - parser.add_argument("--command", type=str, help="Operation to run", required=True) + parser.add_argument( + "--command", + type=OnyxRedisCommand, + help="The command to run", + choices=list(OnyxRedisCommand), + required=True, + ) parser.add_argument( "--ssl", @@ -357,6 +429,13 @@ def delete_user_token_from_redis( required=False, ) + parser.add_argument( + "--cc-pair", + type=int, + help="A connector credential pair id. Used with the purge_locks_blocking_deletion command.", + required=False, + ) + args = parser.parse_args() if args.tenant_id: @@ -389,5 +468,6 @@ def delete_user_token_from_redis( db=args.db, password=args.password, user_email=args.user_email, + cc_pair_id=args.cc_pair, ) sys.exit(exitcode)