From f0b77c0b774cb372b6023398f29dc9a457edc01e Mon Sep 17 00:00:00 2001 From: Trevor Bonas <45324987+trevorbonas@users.noreply.github.com> Date: Thu, 9 May 2024 13:20:56 -0700 Subject: [PATCH 1/3] Verify number of series migrated - The series count of buckets to be backed up is now logged when log level is set to `debug`. - The series count of buckets after restoration is now logged when log level is set to `debug`. - `bucket_exists` has been improved. - All logs now include a timestamp. - Documentation describing how series counts are logged has been added. - `scripts/temp` has been added to `.gitignore`. --- .gitignore | 1 + tools/python/influx-migration/README.md | 14 +++ .../influx-migration/influx_migration.py | 108 +++++++++++++++--- 3 files changed, 109 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 46306785..4fcc3f0b 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ node_modules # Ignore AWS Influx migration script backup and performance files and directories tools/python/influx-migration/**/performance.txt tools/python/influx-migration/**/*influxdb-backup-* +tools/python/influx-migration/**/scripts/temp/* diff --git a/tools/python/influx-migration/README.md b/tools/python/influx-migration/README.md index 8e411fdb..7589c455 100644 --- a/tools/python/influx-migration/README.md +++ b/tools/python/influx-migration/README.md @@ -651,3 +651,17 @@ Possible reasons for a restore failing include: - Invalid InfluxDB destination token. - A bucket existing in the destination instance with the same name as in the source instance. For individual bucket migrations use the `--dest-bucket` option to set a unique name for the migrated bucket. - Connectivity failure, either with the source or destination hosts or with an optional S3 bucket. + +### Determining Amount of Data Migrated + +By default, the number of shards migrated, as reported by the Influx CLI, and the +number of rows migrated when `--csv` is used, are logged. 
+When the log level is set to `debug`, with the option `--log-level debug`, the +number of [series](https://docs.influxdata.com/influxdb/v2/reference/key-concepts/data-elements/#series) as reported by +the [InfluxDB `/metrics` endpoint](https://docs.influxdata.com/influxdb/v2/api/#operation/GetMetrics), under [bucket series number](https://docs.influxdata.com/influxdb/v2/reference/internals/metrics/#bucket-series-number), +will be logged. + +If a bucket is empty or has not been migrated, it will not be listed under bucket series number and an error indicating as such will be logged. +This can help determine whether data is successfully being migrated. + +To manually verify migrated records, see the recommended queries listed in the [Migration Overview](#migration-overview) section, step 3. \ No newline at end of file diff --git a/tools/python/influx-migration/influx_migration.py b/tools/python/influx-migration/influx_migration.py index 0baa84b1..6e7b5501 100644 --- a/tools/python/influx-migration/influx_migration.py +++ b/tools/python/influx-migration/influx_migration.py @@ -40,6 +40,7 @@ from influxdb_client import BucketRetentionRules, InfluxDBClient from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.rest import ApiException +from influxdb_client.service.metrics_service import MetricsService # Maximum number of retries for attempting to mount an S3 bucket MAX_RETRIES = 20 @@ -49,6 +50,11 @@ # mount an S3 bucket MOUNT_POINT_NAME = "influxdb-backups" +# The number of seconds to wait before scraping from the /metrics endpoint +METRICS_SCRAPE_INTERVAL_SECONDS=10 + +BUCKET_PAGINATION_LIMIT=100 + script_duration = 0 # The user is already warned for using --skip-verify, this will de-clutter output @@ -80,6 +86,12 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip raise ValueError("bucket_name and full not provided, one must be provided") logging.info("Backing up bucket data and metadata using the InfluxDB 
CLI") + if logging.root.level >= logging.DEBUG: + time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS) + if full: + report_all_bucket_series_count(host=src_host, token=root_token) + else: + report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org) start_time = time.time() bucket_backup_command = ['influx', 'backup', backup_path, '-t', root_token, @@ -97,6 +109,46 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip duration = time.time() - start_time log_performance_metrics("backup", start_time, duration) +def report_all_bucket_series_count(host, token, org_name=None): + with InfluxDBClient(url=host, token=token) as client: + # CSV migration may use an all access token, meaning buckets will be scoped to an organization + if org_name is not None: + client.org = org_name + buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT) + offset = 0 + while len(buckets.buckets) > 0: + for bucket in buckets.buckets: + if not bucket.name.startswith("_"): + report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org_name=org_name) + offset += BUCKET_PAGINATION_LIMIT + buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT, + offset=offset) + +def report_bucket_series_count(bucket_name, host, token, org_name=None): + try: + with InfluxDBClient(url=host, token=token) as client: + if org_name is not None: + client.org = org_name + buckets = client.buckets_api().find_buckets(name=bucket_name) \ + if org_name is None else \ + client.buckets_api().find_buckets(name=bucket_name, org=org_name) + for bucket in buckets.buckets: + metrics_service = MetricsService(client.api_client) + metrics = metrics_service.get_metrics() + for line in metrics.split("\n"): + if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line: + line = line.split(" ") + if len(line) < 2: + raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are " + "tracked in 
storage_bucket_series_num but its series count is missing. " + f"Check the {host}/metrics endpoint for more details") + logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {line[1]} series") + return + raise ValueError(f"Bucket series count could not be found in {host}/metrics") + except (ApiException, ValueError) as error: + logging.error(repr(error)) + logging.error(f"Failed to get series count for bucket with name {bucket_name} in {host}") + def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False, skip_verify=False, src_org=None): """ Backups data and metadata stored in InfluxDB to a specified directory using csv for each @@ -118,6 +170,12 @@ def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False, :raises OSError: If writing bucket data to csv fails """ logging.info("Backing up bucket data and metadata using the InfluxDB v2 API") + if logging.root.level >= logging.DEBUG: + time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS) + if full: + report_all_bucket_series_count(host=src_host, token=root_token, org_name=src_org) + else: + report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org) start_time = time.time() try: @@ -170,30 +228,41 @@ def bucket_create_rollback(host, token, bucket_name, org, skip_verify): client.close() return True -def bucket_exists(host, token, bucket_name, skip_verify=False, org=None): +def bucket_exists(host, token, bucket_name, org_name=None, skip_verify=False): """ Checks for the existence of a bucket. :param str host: The host for the InfluxDB instance. :param str token: The token to use for verification. :param str bucket_name: The name of the bucket to verify. + :param org_name: The name of the org to use for bucket verification. + :type org_name: str or None :param bool skip_verify: Whether to skip TLS certificate verification. 
- :param org: The name of the org to use for bucket verification - :type org: str or None :returns: Whether the bucket exists in the instance. :rtype: bool """ try: - client = InfluxDBClient(url=host, - token=token, timeout=MILLISECOND_TIMEOUT, verify_ssl=not skip_verify, org=org) - if client.buckets_api().find_bucket_by_name(bucket_name) is None: - return False + with InfluxDBClient(url=host, token=token, timeout=MILLISECOND_TIMEOUT, + verify_ssl=not skip_verify) as client: + # If the org name is provided, set the client org before making + # any requests + if org_name is not None: + client.org = org_name + # Buckets may have the same name in multiple organizations + buckets = client.buckets_api().find_buckets(name=bucket_name) \ + if org_name is None else \ + client.buckets_api().find_buckets(name=bucket_name, org=org_name) + if len(buckets.buckets) <= 0: + logging.debug(f"Bucket with name {bucket_name} could not be found " + f"in host {host}") + return False + logging.debug(f"{len(buckets.buckets)} buckets found") + for bucket in buckets.buckets: + logging.debug(f"Bucket with name {bucket_name} found in " + f"host {host} in org with ID {bucket.org_id}") + return True except InfluxDBError as error: - logging.error(str(error)) return False - finally: - client.close() - return True def cleanup(mount_point=None, exec_s3_bucket_mount=None): """ @@ -611,7 +680,7 @@ def set_logging(log_level): logging.addLevelName(logging.WARNING, yellow + logging.getLevelName(logging.WARNING) + reset) logging.addLevelName(logging.ERROR, bold_red + logging.getLevelName(logging.ERROR) + reset) - log_format = '%(levelname)s: %(filename)s: %(message)s' + log_format = '%(levelname)s: %(asctime)s %(filename)s: %(message)s' log_level = log_level.lower() if log_level == "debug": @@ -785,7 +854,7 @@ def verify_instances(args, src_token, dest_token): if args.src_org is not None and not verify_org(args.src_host, src_token, args.src_org, args.skip_verify): raise InfluxDBError(message="The source 
org could not be verified") if args.src_bucket is not None and args.full is False and \ - not bucket_exists(args.src_host, src_token, args.src_bucket, args.skip_verify, args.src_org): + not bucket_exists(args.src_host, src_token, args.src_bucket, args.src_org, args.skip_verify): raise InfluxDBError(message="The source bucket could not be found") if not args.skip_verify and not verify_tls(args.src_host): raise InfluxDBError(message="TLS certificate could not be verified for source host") @@ -800,7 +869,7 @@ def verify_instances(args, src_token, dest_token): if args.dest_org is not None and not verify_org(args.dest_host, dest_token, args.dest_org, args.skip_verify): raise InfluxDBError(message="The destination org could not be verified") if args.dest_bucket is not None and args.full is False and \ - bucket_exists(args.dest_host, dest_token, args.dest_bucket, args.skip_verify, args.dest_org): + bucket_exists(args.dest_host, dest_token, args.dest_bucket, args.dest_org, args.skip_verify): message = (f"The destination bucket {args.dest_bucket} already exists in the " "destination instance") if args.dest_org is not None: @@ -1084,6 +1153,17 @@ def main(args): "--retry-restore-dir option and provide the previously-mentioned backup directory.") raise + # Report number of series in destination after migration + if logging.root.level >= logging.DEBUG: + time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS) + if args.full: + # For a full migration, destination instance will contain + # the source token after migration + report_all_bucket_series_count(host=args.dest_host, token=src_token) + else: + report_bucket_series_count(bucket_name=args.dest_bucket, host=args.dest_host, + token=dest_token, org_name=args.dest_org) + logging.info("Migration complete") log_performance_metrics("influx_migration.py", script_start_time, script_duration) except (ApiException, CalledProcessError, botocore.exceptions.ClientError, OSError, ValueError, From 3b040cd61492747e4a91566530495035720270b3 Mon Sep 17 
00:00:00 2001 From: Trevor Bonas Date: Thu, 9 May 2024 13:27:43 -0700 Subject: [PATCH 2/3] Add verify_ssl to series count functions --- tools/python/influx-migration/influx_migration.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tools/python/influx-migration/influx_migration.py b/tools/python/influx-migration/influx_migration.py index 6e7b5501..0e11f060 100644 --- a/tools/python/influx-migration/influx_migration.py +++ b/tools/python/influx-migration/influx_migration.py @@ -109,8 +109,8 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip duration = time.time() - start_time log_performance_metrics("backup", start_time, duration) -def report_all_bucket_series_count(host, token, org_name=None): - with InfluxDBClient(url=host, token=token) as client: +def report_all_bucket_series_count(host, token, org_name=None, skip_verify=False): + with InfluxDBClient(url=host, token=token, verify_ssl=not skip_verify) as client: # CSV migration may use an all access token, meaning buckets will be scoped to an organization if org_name is not None: client.org = org_name @@ -119,14 +119,15 @@ def report_all_bucket_series_count(host, token, org_name=None): while len(buckets.buckets) > 0: for bucket in buckets.buckets: if not bucket.name.startswith("_"): - report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org_name=org_name) + report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, + org_name=org_name, skip_verify=skip_verify) offset += BUCKET_PAGINATION_LIMIT buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT, offset=offset) -def report_bucket_series_count(bucket_name, host, token, org_name=None): +def report_bucket_series_count(bucket_name, host, token, org_name=None, skip_verify=False): try: - with InfluxDBClient(url=host, token=token) as client: + with InfluxDBClient(url=host, token=token, verify_ssl=not skip_verify) as client: if org_name is not 
None: client.org = org_name buckets = client.buckets_api().find_buckets(name=bucket_name) \ From 41764c2de88ed1ef82c161f216ae0c5f64d7c21e Mon Sep 17 00:00:00 2001 From: Trevor Bonas Date: Thu, 9 May 2024 23:17:27 -0700 Subject: [PATCH 3/3] Improve bucket count calls Now all calls to report_bucket_series_count and report_all_bucket_series_count include the skip_verify argument instead of defaulting to False. Calls to check bucket series counts after a full migration have been improved: - If a full migration without --csv has been done then the destination instance will have the source operator token, which will be used for checking all bucket series counts. - If --csv is used with a full migration then tokens will not be migrated and these calls will instead use the original destination token. --- .../influx-migration/influx_migration.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tools/python/influx-migration/influx_migration.py b/tools/python/influx-migration/influx_migration.py index 0e11f060..0ec9c9ca 100644 --- a/tools/python/influx-migration/influx_migration.py +++ b/tools/python/influx-migration/influx_migration.py @@ -89,9 +89,10 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip if logging.root.level >= logging.DEBUG: time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS) if full: - report_all_bucket_series_count(host=src_host, token=root_token) + report_all_bucket_series_count(host=src_host, token=root_token, skip_verify=skip_verify) else: - report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org) + report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, + org_name=src_org, skip_verify=skip_verify) start_time = time.time() bucket_backup_command = ['influx', 'backup', backup_path, '-t', root_token, @@ -174,9 +175,10 @@ def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False, if logging.root.level 
>= logging.DEBUG: time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS) if full: - report_all_bucket_series_count(host=src_host, token=root_token, org_name=src_org) + report_all_bucket_series_count(host=src_host, token=root_token, org_name=src_org, skip_verify=skip_verify) else: - report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org) + report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, + org_name=src_org, skip_verify=skip_verify) start_time = time.time() try: @@ -1158,12 +1160,15 @@ def main(args): if logging.root.level >= logging.DEBUG: time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS) if args.full: - # For a full migration, destination instance will contain - # the source token after migration - report_all_bucket_series_count(host=args.dest_host, token=src_token) + if args.csv: + report_all_bucket_series_count(host=args.dest_host, token=dest_token, skip_verify=args.skip_verify) + else: + # For a full migration without csv, destination instance will contain + # the source token after migration + report_all_bucket_series_count(host=args.dest_host, token=src_token, skip_verify=args.skip_verify) else: report_bucket_series_count(bucket_name=args.dest_bucket, host=args.dest_host, - token=dest_token, org_name=args.dest_org) + token=dest_token, org_name=args.dest_org, skip_verify=args.skip_verify) logging.info("Migration complete") log_performance_metrics("influx_migration.py", script_start_time, script_duration)