From ce7982741a7ba6baf5362545bbfb853def93f6fa Mon Sep 17 00:00:00 2001 From: claravox Date: Thu, 23 Nov 2023 14:19:38 +0100 Subject: [PATCH] YDA-5533: Revisions, replications specify max memory, dry run option Conflicts: iiRevisions.r replication.py revisions.py tools/async-job.py uuFunctions.r --- iiRevisions.r | 4 +-- replication.py | 66 +++++++++++++++++++++++++++++++++++------ revisions.py | 68 ++++++++++++++++++++++++++++++++++++------- rules_uu.cfg.template | 2 ++ tools/async-job.py | 20 +++++++------ util/config.py | 2 ++ uuFunctions.r | 7 +++-- 7 files changed, 135 insertions(+), 34 deletions(-) diff --git a/iiRevisions.r b/iiRevisions.r index 6e6cee8ff..a8b5676e8 100644 --- a/iiRevisions.r +++ b/iiRevisions.r @@ -13,8 +13,8 @@ # \param[in] balance_id_min Minimum balance id for batch jobs (value 1-64) # \param[in] balance_id_max Maximum balance id for batch jobs (value 1-64) # \param[in] batch_size_limit Maximum number of items to be processed in a batch job -uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit) { - rule_revision_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit); +uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dryRun) { + rule_revision_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dryRun); } diff --git a/replication.py b/replication.py index fce9f032c..6193a93b3 100644 --- a/replication.py +++ b/replication.py @@ -11,6 +11,7 @@ import genquery import irods_types +import psutil from util import * @@ -44,7 +45,7 @@ def replicate_asynchronously(ctx, path, source_resource, target_resource): @rule.make() -def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit): +def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit, dry_run): """Scheduled replication batch job. Performs replication for all data objects marked with 'org_replication_scheduled' metadata. @@ -54,16 +55,18 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz To enable efficient parallel batch processing, each batch job gets assigned a range of numbers. For instance 1-32. The corresponding job will only process data objects with a balance id within the range. - :param ctx: Combined type of a callback and rei struct - :param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no) - :param balance_id_min: Minimum balance id for batch jobs (value 1-64) - :param balance_id_max: Maximum balance id for batch jobs (value 1-64) + :param ctx: Combined type of a callback and rei struct + :param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no) + :param balance_id_min: Minimum balance id for batch jobs (value 1-64) + :param balance_id_max: Maximum balance id for batch jobs (value 1-64) :param batch_size_limit: Maximum number of items to be processed within one batch + :param dry_run: When '1' do not actually replicate, only log what would have replicated """ count = 0 count_ok = 0 print_verbose = (verbose == '1') + no_action = (dry_run == '1') attr = constants.UUORGMETADATAPREFIX + "replication_scheduled" errorattr = constants.UUORGMETADATAPREFIX + "replication_failed" @@ -76,7 +79,14 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz minimum_timestamp = int(time.time() - config.async_replication_delay_time) - # Get list of up to 1000 data objects scheduled for replication, taking into account their modification time. + log.write(ctx, "verbose = {}".format(verbose)) + if verbose: + log.write(ctx, "async_replication_delay_time = {} seconds".format(config.async_replication_delay_time)) + log.write(ctx, "max_rss = {} bytes".format(config.async_replication_max_rss)) + log.write(ctx, "dry_run = {}".format(dry_run)) + show_memory_usage(ctx) + + # Get list of up to batch size limit of data objects scheduled for replication, taking into account their modification time. iter = list(genquery.Query(ctx, ['ORDER(DATA_ID)', 'COLL_NAME', 'DATA_NAME', 'META_DATA_ATTR_VALUE', 'DATA_RESC_NAME'], "META_DATA_ATTR_NAME = '{}' AND DATA_MODIFY_TIME n<= '{}'".format(attr, minimum_timestamp), @@ -87,6 +97,13 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz log.write(ctx, "Batch replication job is stopped") break + # Check current memory usage and stop if it is above the limit. + if memory_limit_exceeded(config.async_replication_max_rss): + show_memory_usage(ctx) + log.write(ctx, "Memory used is now above specified limit of {} bytes, stopping further processing".format(config.async_replication_max_rss)) + break + + count += 1 path = row[1] + "/" + row[2] # Metadata value contains from_path, to_path and balace id for load balancing purposes. @@ -124,6 +141,12 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz count += 1 data_resc_name = row[4] + # "No action" is meant for easier memory usage debugging. + if no_action: + show_memory_usage(ctx) + log.write(ctx, "Skipping batch replication (dry_run): would have replicated \"{}\" from {} to {}".format(path, from_path, to_path)) + continue + if print_verbose: log.write(ctx, "Batch replication: copying {} from {} to {}".format(path, from_path, to_path)) @@ -184,6 +207,9 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz # error => report it but still continue log.write(ctx, "ERROR - Scheduled replication of <{}>: could not remove schedule flag".format(path)) + if print_verbose: + show_memory_usage(ctx) + # Total replication process completed log.write(ctx, "Batch replication job finished. {}/{} objects replicated successfully.".format(count_ok, count)) @@ -195,6 +221,28 @@ def is_replication_blocked_by_admin(ctx): :returns: Boolean indicating if admin put replication on hold. """ - zone = user.zone(ctx) - path = "/{}/yoda/flags/stop_replication".format(zone) - return collection.exists(ctx, path) + return data_object.exists(ctx, "/{}{}".format(user.zone(ctx), "/yoda/flags/stop_replication")) + + +def memory_rss_usage(): + """ + The RSS (resident) memory size in bytes for the current process. + """ + p = psutil.Process() + return p.memory_info().rss + + +def show_memory_usage(ctx): + """ + For debug purposes show the current RSS usage. + """ + log.write(ctx, "current RSS usage: {} bytes".format(memory_rss_usage())) + + +def memory_limit_exceeded(rss_limit): + """ + True when a limit other than 0 was specified and memory usage is currently + above this limit. Otherwise False. + """ + rss_limit = int(rss_limit) + return rss_limit and memory_rss_usage() > rss_limit diff --git a/revisions.py b/revisions.py index 9cb34e882..fb8906d00 100644 --- a/revisions.py +++ b/revisions.py @@ -13,6 +13,7 @@ import genquery import irods_types +import psutil import folder import groups @@ -278,7 +279,7 @@ def resource_modified_post_revision(ctx, resource, zone, path): @rule.make() -def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit): +def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit, dry_run='0'): """Scheduled revision creation batch job. Creates revisions for all data objects (in research space) marked with 'org_revision_scheduled' metadata. @@ -287,17 +288,18 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size To enable efficient parallel batch processing, each batch job gets assigned a range of numbers. For instance 1-32. The corresponding job will only process data objects with a balance id within the range. - :param ctx: Combined type of a callback and rei struct - :param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no) - :param balance_id_min: Minimum balance id for batch jobs (value 1-64) - :param balance_id_max: Maximum balance id for batch jobs (value 1-64) + :param ctx: Combined type of a callback and rei struct + :param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no) + :param balance_id_min: Minimum balance id for batch jobs (value 1-64) + :param balance_id_max: Maximum balance id for batch jobs (value 1-64) :param batch_size_limit: Maximum number of items to be processed within one batch - + :param dry_run: When '1' do not actually create revisions, only log what would have been created """ count = 0 count_ok = 0 count_ignored = 0 print_verbose = (verbose == '1') + no_action = (dry_run == '1') attr = constants.UUORGMETADATAPREFIX + "revision_scheduled" errorattr = constants.UUORGMETADATAPREFIX + "revision_failed" @@ -310,8 +312,15 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size minimum_timestamp = int(time.time() - config.async_revision_delay_time) - # Get list up to 1000 data objects (in research space) scheduled for revision, taking into account + # Get list of up to batch size limit of data objects (in research space) scheduled for revision, taking into account # modification time. + log.write(ctx, "verbose = {}".format(verbose)) + if verbose: + log.write(ctx, "async_revision_delay_time = {} seconds".format(config.async_revision_delay_time)) + log.write(ctx, "max_rss = {} bytes".format(config.async_revision_max_rss)) + log.write(ctx, "dry_run = {}".format(dry_run)) + show_memory_usage(ctx) + iter = list(genquery.Query(ctx, ['ORDER(DATA_ID)', 'COLL_NAME', 'DATA_NAME', 'META_DATA_ATTR_VALUE'], "META_DATA_ATTR_NAME = '{}' AND COLL_NAME like '/{}/home/{}%' AND DATA_MODIFY_TIME n<= '{}'".format( @@ -326,6 +335,12 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size log.write(ctx, "Batch revision job is stopped") break + # Check current memory usage and stop if it is above the limit. + if memory_limit_exceeded(config.async_revision_max_rss): + show_memory_usage(ctx) + log.write(ctx, "Memory used is now above specified limit of {} bytes, stopping further processing".format(config.async_revision_max_rss)) + break + # Perform scheduled revision creation for one data object. data_id = row[0] path = row[1] + "/" + row[2] @@ -350,6 +365,12 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size # For getting the total count only the data objects within the wanted range count += 1 + # "No action" is meant for easier memory usage debugging. + if no_action: + show_memory_usage(ctx) + log.write(ctx, "Skipping creating revision (dry_run): would have created revision for {} on resc {}".format(path, resc)) + continue + if print_verbose: log.write(ctx, "Batch revision: creating revision for {} on resc {}".format(path, resc)) @@ -400,6 +421,9 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size log.write(ctx, "ERROR - Scheduled revision creation of <{}> failed".format(path)) avu.set_on_data(ctx, path, errorattr, "true") + if print_verbose: + show_memory_usage(ctx) + # Total revision process completed log.write(ctx, "Batch revision job finished. {}/{} objects processed successfully. ".format(count_ok, count)) log.write(ctx, "Batch revision job ignored {} data objects in research area, excluding data objects postponed because of delay time.".format(count_ignored)) @@ -410,11 +434,9 @@ def is_revision_blocked_by_admin(ctx): :param ctx: Combined type of a callback and rei struct - :returns: Boolean indicating if admin put replication on hold. + :returns: Boolean indicating if admin put revisions on hold. """ - zone = user.zone(ctx) - path = "/{}/yoda/flags/stop_revisions".format(zone) - return collection.exists(ctx, path) + return data_object.exists(ctx, "/{}{}".format(user.zone(ctx), "/yoda/flags/stop_revisions")) def revision_create(ctx, resource, data_id, max_size, verbose): @@ -860,3 +882,27 @@ def revision_remove(ctx, revision_id, revision_path): log.write(ctx, "ERROR - Revision ID <{}> not found or permission denied.".format(revision_id)) return False + + +def memory_rss_usage(): + """ + The RSS (resident) memory size in bytes for the current process. + """ + p = psutil.Process() + return p.memory_info().rss + + +def show_memory_usage(ctx): + """ + For debug purposes show the current RSS usage. + """ + log.write(ctx, "current RSS usage: {} bytes".format(memory_rss_usage())) + + +def memory_limit_exceeded(rss_limit): + """ + True when a limit other than 0 was specified and memory usage is currently + above this limit. Otherwise False. + """ + rss_limit = int(rss_limit) + return rss_limit and memory_rss_usage() > rss_limit diff --git a/rules_uu.cfg.template b/rules_uu.cfg.template index 76f1e8edb..37dd3e5b5 100644 --- a/rules_uu.cfg.template +++ b/rules_uu.cfg.template @@ -53,7 +53,9 @@ enable_inactivity_notification = inactivity_cutoff_months = async_replication_delay_time = +async_replication_max_rss = async_revision_delay_time = +async_revision_max_rss = enable_tape_archive = diff --git a/tools/async-job.py b/tools/async-job.py index 579c67cf0..ac5db51eb 100755 --- a/tools/async-job.py +++ b/tools/async-job.py @@ -19,18 +19,20 @@ NAME = os.path.basename(sys.argv[0]) + def get_args(): """Parse command line arguments""" parser = argparse.ArgumentParser(description='Yoda replication and revision job') parser.add_argument('--verbose', '-v', action='store_const', default="0", const="1", - help='Log more information in rodsLog for troubleshooting purposes') + help='Log more information in rodsLog for troubleshooting purposes') parser.add_argument('--balance_id_min', type=int, default=1, - help='Minimal balance id to be handled by this job (range 1-64) for load balancing purposes') + help='Minimal balance id to be handled by this job (range 1-64) for load balancing purposes') parser.add_argument('--balance_id_max', type=int, default=64, - help='Maximum balance id to be handled by this job (range 1-64) for load balancing purposes') + help='Maximum balance id to be handled by this job (range 1-64) for load balancing purposes') parser.add_argument('--batch_size_limit', type=int, default=1000, - help='Maximum number of items to be processed per batch job') - + help='Maximum number of items to be processed per batch job') + parser.add_argument('--dry-run', '-n', action='store_const', default="0", const="1", + help='Perform a trial run for troubleshooting purposes') return parser.parse_args() @@ -57,15 +59,15 @@ def lock_or_die(balance_id_min, balance_id_max): if 'replicate' in NAME: - rule_name = 'uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit)' + rule_name = 'uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run)' elif 'revision' in NAME: - rule_name = 'uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit)' + rule_name = 'uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run)' else: print('bad command "{}"'.format(NAME), file=sys.stderr) exit(1) args = get_args() lock_or_die(args.balance_id_min, args.balance_id_max) -rule_options = "*verbose={}%*balance_id_min={}%*balance_id_max={}%*batch_size_limit={}".format(args.verbose, args.balance_id_min, args.balance_id_max, args.batch_size_limit) +rule_options = "*verbose={}%*balance_id_min={}%*balance_id_max={}%*batch_size_limit={}%*dry_run={}".format(args.verbose, args.balance_id_min, args.balance_id_max, args.batch_size_limit, args.dry_run) subprocess.call(['irule', '-r', 'irods_rule_engine_plugin-irods_rule_language-instance', - rule_name, rule_options, 'ruleExecOut']) + rule_name, rule_options, 'ruleExecOut']) diff --git a/util/config.py b/util/config.py index 4cc9c513d..09834f1ee 100644 --- a/util/config.py +++ b/util/config.py @@ -117,7 +117,9 @@ def __repr__(self): token_lifetime=0, token_expiration_notification=0, async_replication_delay_time=0, + async_replication_max_rss=1000000000, async_revision_delay_time=0, + async_revision_max_rss=1000000000, yoda_portal_fqdn=None, epic_pid_enabled=False, epic_url=None, diff --git a/uuFunctions.r b/uuFunctions.r index 7fd826129..4416d58ae 100644 --- a/uuFunctions.r +++ b/uuFunctions.r @@ -79,10 +79,11 @@ uuObjectMetadataKvp(*data_id, *prefix, *kvp) { # Performs replication for all data objects marked with 'org_replication_scheduled' metadata. # The metadata value indicates the source and destination resource. # -# \param[in] verbose whether to log verbose messages for troubleshooting (1: yes, 0: no) +# \param[in] verbose Whether to log verbose messages for troubleshooting (1: yes, 0: no) # \param[in] balance_id_min Minimum balance id for batch jobs (value 1-64) # \param[in] balance_id_max Maximum balance id for batch jobs (value 1-64) # \param[in] batch_size_limit Maximum number of items to be processed within one batch -uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit) { - rule_replicate_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit); +# \param[in] dry_run Whether to do a trial run (1: yes, 0: no) +uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run) { + rule_replicate_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run); }