Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tool fixes #4075

Merged
merged 2 commits into from
Feb 21, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 108 additions & 12 deletions backend/scripts/debugging/onyx_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import sys
import time
from enum import Enum
from logging import getLogger
from typing import cast
from uuid import UUID
Expand All @@ -20,10 +21,13 @@
from onyx.configs.app_configs import REDIS_SSL
from onyx.db.engine import get_session_with_tenant
from onyx.db.users import get_user_by_email
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_pool import RedisPool
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.contextvars import get_current_tenant_id

# Tool to run helpful operations on Redis in production
# This is targeted for internal usage and may not have all the necessary parameters
Expand All @@ -42,6 +46,19 @@
BATCH_DEFAULT = 1000


class OnyxRedisCommand(Enum):
purge_connectorsync_taskset = "purge_connectorsync_taskset"
purge_documentset_taskset = "purge_documentset_taskset"
purge_usergroup_taskset = "purge_usergroup_taskset"
purge_locks_blocking_deletion = "purge_locks_blocking_deletion"
purge_vespa_syncing = "purge_vespa_syncing"
get_user_token = "get_user_token"
delete_user_token = "delete_user_token"

def __str__(self) -> str:
return self.value


def get_user_id(user_email: str) -> tuple[UUID, str]:
tenant_id = (
get_tenant_id_for_email(user_email) if MULTI_TENANT else POSTGRES_DEFAULT_SCHEMA
Expand All @@ -55,50 +72,79 @@ def get_user_id(user_email: str) -> tuple[UUID, str]:


def onyx_redis(
command: str,
command: OnyxRedisCommand,
batch: int,
dry_run: bool,
ssl: bool,
host: str,
port: int,
db: int,
password: str | None,
user_email: str | None = None,
cc_pair_id: int | None = None,
) -> int:
# this is global and not tenant aware
pool = RedisPool.create_pool(
host=host,
port=port,
db=db,
password=password if password else "",
ssl=REDIS_SSL,
ssl=ssl,
ssl_cert_reqs="optional",
ssl_ca_certs=None,
)

r = Redis(connection_pool=pool)

logger.info("Redis ping starting. This may hang if your settings are incorrect.")

try:
r.ping()
except:
logger.exception("Redis ping exceptioned")
raise

if command == "purge_connectorsync_taskset":
logger.info("Redis ping succeeded.")

if command == OnyxRedisCommand.purge_connectorsync_taskset:
"""Purge connector tasksets. Used when the tasks represented in the tasksets
have been purged."""
return purge_by_match_and_type(
"*connectorsync_taskset*", "set", batch, dry_run, r
)
elif command == "purge_documentset_taskset":
elif command == OnyxRedisCommand.purge_documentset_taskset:
return purge_by_match_and_type(
"*documentset_taskset*", "set", batch, dry_run, r
)
elif command == "purge_usergroup_taskset":
elif command == OnyxRedisCommand.purge_usergroup_taskset:
return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r)
elif command == "purge_vespa_syncing":
elif command == OnyxRedisCommand.purge_locks_blocking_deletion:
if cc_pair_id is None:
logger.error("You must specify --cc-pair with purge_deletion_locks")
return 1

tenant_id = get_current_tenant_id()
logger.info(f"Purging locks associated with deleting cc_pair={cc_pair_id}.")
redis_connector = RedisConnector(tenant_id, cc_pair_id)

match_pattern = f"{tenant_id}:{RedisConnectorIndex.FENCE_PREFIX}_{cc_pair_id}/*"
purge_by_match_and_type(match_pattern, "string", batch, dry_run, r)

redis_delete_if_exists_helper(
f"{tenant_id}:{redis_connector.prune.fence_key}", dry_run, r
)
redis_delete_if_exists_helper(
f"{tenant_id}:{redis_connector.permissions.fence_key}", dry_run, r
)
redis_delete_if_exists_helper(
f"{tenant_id}:{redis_connector.external_group_sync.fence_key}", dry_run, r
)
return 0
elif command == OnyxRedisCommand.purge_vespa_syncing:
return purge_by_match_and_type(
"*connectorsync:vespa_syncing*", "string", batch, dry_run, r
)
elif command == "get_user_token":
elif command == OnyxRedisCommand.get_user_token:
if not user_email:
logger.error("You must specify --user-email with get_user_token")
return 1
Expand All @@ -109,7 +155,7 @@ def onyx_redis(
else:
print(f"No token found for user {user_email}")
return 2
elif command == "delete_user_token":
elif command == OnyxRedisCommand.delete_user_token:
if not user_email:
logger.error("You must specify --user-email with delete_user_token")
return 1
Expand All @@ -131,13 +177,38 @@ def flush_batch_delete(batch_keys: list[bytes], r: Redis) -> None:
pipe.execute()


def redis_delete_if_exists_helper(key: str, dry_run: bool, r: Redis) -> bool:
"""Returns True if the key was found, False if not.
This function exists for logging purposes as the delete operation itself
doesn't really need to check the existence of the key.
"""

if not r.exists(key):
logger.info(f"Did not find {key}.")
return False

if dry_run:
logger.info(f"(DRY-RUN) Deleting {key}.")
else:
logger.info(f"Deleting {key}.")
r.delete(key)

return True


def purge_by_match_and_type(
match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis
) -> int:
"""match_pattern: glob style expression
match_type: https://redis.io/docs/latest/commands/type/
"""

logger.info(
f"purge_by_match_and_type start: "
f"match_pattern={match_pattern} "
f"match_type={match_type}"
)

# cursor = "0"
# while cursor != 0:
# cursor, data = self.scan(
Expand All @@ -164,13 +235,15 @@ def purge_by_match_and_type(
logger.info(f"Deleting item {count}: {key_str}")

batch_keys.append(key)

# flush if batch size has been reached
if len(batch_keys) >= batch_size:
flush_batch_delete(batch_keys, r)
batch_keys.clear()

if len(batch_keys) >= batch_size:
flush_batch_delete(batch_keys, r)
batch_keys.clear()
# final flush
flush_batch_delete(batch_keys, r)
batch_keys.clear()

logger.info(f"Deleted {count} matches.")

Expand Down Expand Up @@ -279,7 +352,21 @@ def delete_user_token_from_redis(

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Onyx Redis Manager")
parser.add_argument("--command", type=str, help="Operation to run", required=True)
parser.add_argument(
"--command",
type=OnyxRedisCommand,
help="The command to run",
choices=list(OnyxRedisCommand),
required=True,
)

parser.add_argument(
"--ssl",
type=bool,
default=REDIS_SSL,
help="Use SSL when connecting to Redis. Usually True for prod and False for local testing",
required=False,
)

parser.add_argument(
"--host",
Expand Down Expand Up @@ -342,6 +429,13 @@ def delete_user_token_from_redis(
required=False,
)

parser.add_argument(
"--cc-pair",
type=int,
help="A connector credential pair id. Used with the purge_deletion_locks command.",
required=False,
)

args = parser.parse_args()

if args.tenant_id:
Expand All @@ -368,10 +462,12 @@ def delete_user_token_from_redis(
command=args.command,
batch=args.batch,
dry_run=args.dry_run,
ssl=args.ssl,
host=args.host,
port=args.port,
db=args.db,
password=args.password,
user_email=args.user_email,
cc_pair_id=args.cc_pair,
)
sys.exit(exitcode)
Loading