Merge pull request #185 from Bit-Quill/integ-record-verification
Verify number of InfluxDB series migrated
sethusrinivasan authored May 10, 2024
2 parents 98d68a1 + 41764c2 commit fd5edf5
Showing 3 changed files with 115 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ node_modules
# Ignore AWS Influx migration script backup and performance files and directories
tools/python/influx-migration/**/performance.txt
tools/python/influx-migration/**/*influxdb-backup-*
tools/python/influx-migration/**/scripts/temp/*
14 changes: 14 additions & 0 deletions tools/python/influx-migration/README.md
@@ -651,3 +651,17 @@ Possible reasons for a restore failing include:
- Invalid InfluxDB destination token.
- A bucket existing in the destination instance with the same name as in the source instance. For individual bucket migrations use the `--dest-bucket` option to set a unique name for the migrated bucket.
- Connectivity failure, either with the source or destination hosts or with an optional S3 bucket.

### Determining Amount of Data Migrated

By default, the script logs the number of shards migrated, as reported by the
Influx CLI, or the number of rows migrated when `--csv` is used.
When the log level is set to `debug` with the `--log-level debug` option, the
number of [series](https://docs.influxdata.com/influxdb/v2/reference/key-concepts/data-elements/#series) in each bucket, as reported by
the [InfluxDB `/metrics` endpoint](https://docs.influxdata.com/influxdb/v2/api/#operation/GetMetrics) under [bucket series number](https://docs.influxdata.com/influxdb/v2/reference/internals/metrics/#bucket-series-number),
is also logged.
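
The same check can be performed manually by scraping the `/metrics` endpoint. The
following is a minimal sketch using the `influxdb_client` package, mirroring what
the migration script does internally; the host, token, and bucket ID are
placeholder values:

```python
from influxdb_client import InfluxDBClient
from influxdb_client.service.metrics_service import MetricsService

# Placeholders: substitute your instance's URL, token, and bucket ID
HOST = "http://localhost:8086"
TOKEN = "example-token"
BUCKET_ID = "0123456789abcdef"

with InfluxDBClient(url=HOST, token=TOKEN) as client:
    # /metrics returns Prometheus-style lines; each tracked bucket has a
    # storage_bucket_series_num entry keyed by bucket ID
    metrics = MetricsService(client.api_client).get_metrics()
    for metric_line in metrics.split("\n"):
        if f'storage_bucket_series_num{{bucket="{BUCKET_ID}"}}' in metric_line:
            print(f"Series count: {metric_line.split(' ')[1]}")
```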

If a bucket is empty or has not been migrated, it will not be listed under bucket
series number, and an error saying so will be logged.
Comparing the counts reported for the source and the destination can help
determine whether data was migrated successfully.

To manually verify migrated records, see the recommended queries listed in step 3 of the [Migration Overview](#migration-overview) section.
114 changes: 100 additions & 14 deletions tools/python/influx-migration/influx_migration.py
@@ -40,6 +40,7 @@
from influxdb_client import BucketRetentionRules, InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.rest import ApiException
from influxdb_client.service.metrics_service import MetricsService

# Maximum number of retries for attempting to mount an S3 bucket
MAX_RETRIES = 20
@@ -49,6 +50,11 @@
# mount an S3 bucket
MOUNT_POINT_NAME = "influxdb-backups"

# The number of seconds to wait before scraping from the /metrics endpoint
METRICS_SCRAPE_INTERVAL_SECONDS = 10

BUCKET_PAGINATION_LIMIT = 100

script_duration = 0

# The user is already warned for using --skip-verify, this will de-clutter output
@@ -80,6 +86,13 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip
raise ValueError("bucket_name and full not provided, one must be provided")

logging.info("Backing up bucket data and metadata using the InfluxDB CLI")
    if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if full:
report_all_bucket_series_count(host=src_host, token=root_token, skip_verify=skip_verify)
else:
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token,
org_name=src_org, skip_verify=skip_verify)
start_time = time.time()

bucket_backup_command = ['influx', 'backup', backup_path, '-t', root_token,
@@ -97,6 +110,47 @@
duration = time.time() - start_time
log_performance_metrics("backup", start_time, duration)

def report_all_bucket_series_count(host, token, org_name=None, skip_verify=False):
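    """
    Logs the series count of every user bucket in an instance, as reported by
    the /metrics endpoint, paginating through buckets BUCKET_PAGINATION_LIMIT
    at a time.
    :param str host: The host for the InfluxDB instance.
    :param str token: The token to use for authentication.
    :param org_name: The name of the org to scope the bucket listing to.
    :type org_name: str or None
    :param bool skip_verify: Whether to skip TLS certificate verification.
    """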
with InfluxDBClient(url=host, token=token, verify_ssl=not skip_verify) as client:
        # CSV migration may use an all-access token, in which case buckets
        # are scoped to an organization
if org_name is not None:
client.org = org_name
buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT)
offset = 0
while len(buckets.buckets) > 0:
for bucket in buckets.buckets:
if not bucket.name.startswith("_"):
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token,
org_name=org_name, skip_verify=skip_verify)
offset += BUCKET_PAGINATION_LIMIT
buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT,
offset=offset)

def report_bucket_series_count(bucket_name, host, token, org_name=None, skip_verify=False):
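    """
    Logs the number of series in a bucket, as reported by the InfluxDB
    /metrics endpoint under storage_bucket_series_num. Logs an error if the
    bucket is not listed in the metrics output, e.g., if it is empty.
    :param str bucket_name: The name of the bucket to report on.
    :param str host: The host for the InfluxDB instance.
    :param str token: The token to use for authentication.
    :param org_name: The name of the org containing the bucket.
    :type org_name: str or None
    :param bool skip_verify: Whether to skip TLS certificate verification.
    """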
try:
with InfluxDBClient(url=host, token=token, verify_ssl=not skip_verify) as client:
if org_name is not None:
client.org = org_name
buckets = client.buckets_api().find_buckets(name=bucket_name) \
if org_name is None else \
client.buckets_api().find_buckets(name=bucket_name, org=org_name)
for bucket in buckets.buckets:
metrics_service = MetricsService(client.api_client)
metrics = metrics_service.get_metrics()
for line in metrics.split("\n"):
if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
                        parts = line.split(" ")
                        if len(parts) < 2:
                            raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are "
                                "tracked in storage_bucket_series_num but its series count is missing. "
                                f"Check the {host}/metrics endpoint for more details")
                        logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {parts[1]} series")
                        return
raise ValueError(f"Bucket series count could not be found in {host}/metrics")
except (ApiException, ValueError) as error:
logging.error(repr(error))
logging.error(f"Failed to get series count for bucket with name {bucket_name} in {host}")

def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False, skip_verify=False, src_org=None):
"""
    Backs up data and metadata stored in InfluxDB to a specified directory using csv for each
@@ -118,6 +172,13 @@ def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False,
:raises OSError: If writing bucket data to csv fails
"""
logging.info("Backing up bucket data and metadata using the InfluxDB v2 API")
    if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if full:
report_all_bucket_series_count(host=src_host, token=root_token, org_name=src_org, skip_verify=skip_verify)
else:
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token,
org_name=src_org, skip_verify=skip_verify)
start_time = time.time()

try:
@@ -170,30 +231,41 @@ def bucket_create_rollback(host, token, bucket_name, org, skip_verify):
client.close()
return True

-def bucket_exists(host, token, bucket_name, skip_verify=False, org=None):
def bucket_exists(host, token, bucket_name, org_name=None, skip_verify=False):
"""
Checks for the existence of a bucket.
:param str host: The host for the InfluxDB instance.
:param str token: The token to use for verification.
:param str bucket_name: The name of the bucket to verify.
:param org_name: The name of the org to use for bucket verification.
:type org_name: str or None
:param bool skip_verify: Whether to skip TLS certificate verification.
-    :param org: The name of the org to use for bucket verification
-    :type org: str or None
:returns: Whether the bucket exists in the instance.
:rtype: bool
"""
try:
-        client = InfluxDBClient(url=host,
-            token=token, timeout=MILLISECOND_TIMEOUT, verify_ssl=not skip_verify, org=org)
-        if client.buckets_api().find_bucket_by_name(bucket_name) is None:
-            return False
with InfluxDBClient(url=host, token=token, timeout=MILLISECOND_TIMEOUT,
verify_ssl=not skip_verify) as client:
# If the org name is provided, set the client org before making
# any requests
if org_name is not None:
client.org = org_name
# Buckets may have the same name in multiple organizations
buckets = client.buckets_api().find_buckets(name=bucket_name) \
if org_name is None else \
client.buckets_api().find_buckets(name=bucket_name, org=org_name)
if len(buckets.buckets) <= 0:
logging.debug(f"Bucket with name {bucket_name} could not be found "
f"in host {host}")
return False
logging.debug(f"{len(buckets.buckets)} buckets found")
for bucket in buckets.buckets:
logging.debug(f"Bucket with name {bucket_name} found in "
f"host {host} in org with ID {bucket.org_id}")
return True
except InfluxDBError as error:
logging.error(str(error))
return False
-    finally:
-        client.close()
-    return True

def cleanup(mount_point=None, exec_s3_bucket_mount=None):
"""
@@ -611,7 +683,7 @@ def set_logging(log_level):

logging.addLevelName(logging.WARNING, yellow + logging.getLevelName(logging.WARNING) + reset)
logging.addLevelName(logging.ERROR, bold_red + logging.getLevelName(logging.ERROR) + reset)
-    log_format = '%(levelname)s: %(filename)s: %(message)s'
log_format = '%(levelname)s: %(asctime)s %(filename)s: %(message)s'

log_level = log_level.lower()
if log_level == "debug":
@@ -785,7 +857,7 @@ def verify_instances(args, src_token, dest_token):
if args.src_org is not None and not verify_org(args.src_host, src_token, args.src_org, args.skip_verify):
raise InfluxDBError(message="The source org could not be verified")
if args.src_bucket is not None and args.full is False and \
-        not bucket_exists(args.src_host, src_token, args.src_bucket, args.skip_verify, args.src_org):
not bucket_exists(args.src_host, src_token, args.src_bucket, args.src_org, args.skip_verify):
raise InfluxDBError(message="The source bucket could not be found")
if not args.skip_verify and not verify_tls(args.src_host):
raise InfluxDBError(message="TLS certificate could not be verified for source host")
Expand All @@ -800,7 +872,7 @@ def verify_instances(args, src_token, dest_token):
if args.dest_org is not None and not verify_org(args.dest_host, dest_token, args.dest_org, args.skip_verify):
raise InfluxDBError(message="The destination org could not be verified")
if args.dest_bucket is not None and args.full is False and \
-        bucket_exists(args.dest_host, dest_token, args.dest_bucket, args.skip_verify, args.dest_org):
bucket_exists(args.dest_host, dest_token, args.dest_bucket, args.dest_org, args.skip_verify):
message = (f"The destination bucket {args.dest_bucket} already exists in the "
"destination instance")
if args.dest_org is not None:
@@ -1084,6 +1156,20 @@ def main(args):
"--retry-restore-dir option and provide the previously-mentioned backup directory.")
raise

# Report number of series in destination after migration
        if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if args.full:
if args.csv:
report_all_bucket_series_count(host=args.dest_host, token=dest_token, skip_verify=args.skip_verify)
else:
                    # For a full migration without csv, the destination
                    # instance will contain the source token after migration
report_all_bucket_series_count(host=args.dest_host, token=src_token, skip_verify=args.skip_verify)
else:
report_bucket_series_count(bucket_name=args.dest_bucket, host=args.dest_host,
token=dest_token, org_name=args.dest_org, skip_verify=args.skip_verify)

logging.info("Migration complete")
log_performance_metrics("influx_migration.py", script_start_time, script_duration)
except (ApiException, CalledProcessError, botocore.exceptions.ClientError, OSError, ValueError,
