YDA-5533: Revisions, replications specify max memory, dry run option
Conflicts:
	iiRevisions.r
	replication.py
	revisions.py
	tools/async-job.py
	uuFunctions.r
claravox committed Nov 27, 2023
1 parent 10eebe6 commit ce79827
Showing 7 changed files with 135 additions and 34 deletions.
4 changes: 2 additions & 2 deletions iiRevisions.r
@@ -13,8 +13,8 @@
# \param[in] balance_id_min Minimum balance id for batch jobs (value 1-64)
# \param[in] balance_id_max Maximum balance id for batch jobs (value 1-64)
# \param[in] batch_size_limit Maximum number of items to be processed in a batch job
uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit) {
rule_revision_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit);
uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dryRun) {
rule_revision_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dryRun);
}


66 changes: 57 additions & 9 deletions replication.py
@@ -11,6 +11,7 @@

import genquery
import irods_types
import psutil

from util import *

@@ -44,7 +45,7 @@ def replicate_asynchronously(ctx, path, source_resource, target_resource):


@rule.make()
def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit):
def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit, dry_run):
"""Scheduled replication batch job.
Performs replication for all data objects marked with 'org_replication_scheduled' metadata.
@@ -54,16 +55,18 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz
To enable efficient parallel batch processing, each batch job gets assigned a range of numbers. For instance 1-32.
The corresponding job will only process data objects with a balance id within the range.
:param ctx: Combined type of a callback and rei struct
:param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no)
:param balance_id_min: Minimum balance id for batch jobs (value 1-64)
:param balance_id_max: Maximum balance id for batch jobs (value 1-64)
:param ctx: Combined type of a callback and rei struct
:param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no)
:param balance_id_min: Minimum balance id for batch jobs (value 1-64)
:param balance_id_max: Maximum balance id for batch jobs (value 1-64)
:param batch_size_limit: Maximum number of items to be processed within one batch
:param dry_run: When '1' do not actually replicate, only log what would have replicated
"""
count = 0
count_ok = 0
print_verbose = (verbose == '1')
no_action = (dry_run == '1')

attr = constants.UUORGMETADATAPREFIX + "replication_scheduled"
errorattr = constants.UUORGMETADATAPREFIX + "replication_failed"
@@ -76,7 +79,14 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz

minimum_timestamp = int(time.time() - config.async_replication_delay_time)

# Get list of up to 1000 data objects scheduled for replication, taking into account their modification time.
log.write(ctx, "verbose = {}".format(verbose))
if verbose:
log.write(ctx, "async_replication_delay_time = {} seconds".format(config.async_replication_delay_time))
log.write(ctx, "max_rss = {} bytes".format(config.async_replication_max_rss))
log.write(ctx, "dry_run = {}".format(dry_run))
show_memory_usage(ctx)

# Get list of up to batch size limit of data objects scheduled for replication, taking into account their modification time.
iter = list(genquery.Query(ctx,
['ORDER(DATA_ID)', 'COLL_NAME', 'DATA_NAME', 'META_DATA_ATTR_VALUE', 'DATA_RESC_NAME'],
"META_DATA_ATTR_NAME = '{}' AND DATA_MODIFY_TIME n<= '{}'".format(attr, minimum_timestamp),
@@ -87,6 +97,13 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz
log.write(ctx, "Batch replication job is stopped")
break

# Check current memory usage and stop if it is above the limit.
if memory_limit_exceeded(config.async_replication_max_rss):
show_memory_usage(ctx)
log.write(ctx, "Memory used is now above specified limit of {} bytes, stopping further processing".format(config.async_replication_max_rss))
break

count += 1
path = row[1] + "/" + row[2]

# Metadata value contains from_path, to_path and balance id for load balancing purposes.
@@ -124,6 +141,12 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz
count += 1
data_resc_name = row[4]

# "No action" is meant for easier memory usage debugging.
if no_action:
show_memory_usage(ctx)
log.write(ctx, "Skipping batch replication (dry_run): would have replicated \"{}\" from {} to {}".format(path, from_path, to_path))
continue

if print_verbose:
log.write(ctx, "Batch replication: copying {} from {} to {}".format(path, from_path, to_path))

@@ -184,6 +207,9 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz
# error => report it but still continue
log.write(ctx, "ERROR - Scheduled replication of <{}>: could not remove schedule flag".format(path))

if print_verbose:
show_memory_usage(ctx)

# Total replication process completed
log.write(ctx, "Batch replication job finished. {}/{} objects replicated successfully.".format(count_ok, count))

@@ -195,6 +221,28 @@ def is_replication_blocked_by_admin(ctx):
:returns: Boolean indicating if admin put replication on hold.
"""
zone = user.zone(ctx)
path = "/{}/yoda/flags/stop_replication".format(zone)
return collection.exists(ctx, path)
return data_object.exists(ctx, "/{}{}".format(user.zone(ctx), "/yoda/flags/stop_replication"))


def memory_rss_usage():
"""
The RSS (resident) memory size in bytes for the current process.
"""
p = psutil.Process()
return p.memory_info().rss


def show_memory_usage(ctx):
"""
For debug purposes show the current RSS usage.
"""
log.write(ctx, "current RSS usage: {} bytes".format(memory_rss_usage()))


def memory_limit_exceeded(rss_limit):
"""
True when a limit other than 0 was specified and memory usage is currently
above this limit. Otherwise False.
"""
rss_limit = int(rss_limit)
return rss_limit and memory_rss_usage() > rss_limit
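
Below is a standalone sketch of the memory-guard pattern this commit adds (identically) to both replication.py and revisions.py. The helper names mirror the ones above; the limit value and the loop body are illustrative assumptions, not part of the commit:

    import psutil

    def memory_rss_usage():
        # RSS (resident set size) of the current process, in bytes.
        return psutil.Process().memory_info().rss

    def memory_limit_exceeded(rss_limit):
        # A limit of 0 disables the check; otherwise compare against current RSS.
        rss_limit = int(rss_limit)
        return bool(rss_limit) and memory_rss_usage() > rss_limit

    if __name__ == '__main__':
        max_rss = 1000000000  # 1 GB, the default used for async_*_max_rss in util/config.py
        for item in range(100000):  # stand-in for the list of scheduled data objects
            if memory_limit_exceeded(max_rss):
                print("RSS above {} bytes, stopping further processing".format(max_rss))
                break
            # ... process one data object here ...

Because the batch loop breaks before an object's schedule flag is removed, anything left unprocessed when the limit is hit simply stays scheduled and is picked up by a later run.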
68 changes: 57 additions & 11 deletions revisions.py
@@ -13,6 +13,7 @@

import genquery
import irods_types
import psutil

import folder
import groups
@@ -278,7 +279,7 @@ def resource_modified_post_revision(ctx, resource, zone, path):


@rule.make()
def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit):
def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size_limit, dry_run='0'):
"""Scheduled revision creation batch job.
Creates revisions for all data objects (in research space) marked with 'org_revision_scheduled' metadata.
@@ -287,17 +288,18 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size
To enable efficient parallel batch processing, each batch job gets assigned a range of numbers. For instance 1-32.
The corresponding job will only process data objects with a balance id within the range.
:param ctx: Combined type of a callback and rei struct
:param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no)
:param balance_id_min: Minimum balance id for batch jobs (value 1-64)
:param balance_id_max: Maximum balance id for batch jobs (value 1-64)
:param ctx: Combined type of a callback and rei struct
:param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no)
:param balance_id_min: Minimum balance id for batch jobs (value 1-64)
:param balance_id_max: Maximum balance id for batch jobs (value 1-64)
:param batch_size_limit: Maximum number of items to be processed within one batch
:param dry_run: When '1' do not actually create revisions, only log what would have been created
"""
count = 0
count_ok = 0
count_ignored = 0
print_verbose = (verbose == '1')
no_action = (dry_run == '1')

attr = constants.UUORGMETADATAPREFIX + "revision_scheduled"
errorattr = constants.UUORGMETADATAPREFIX + "revision_failed"
@@ -310,8 +312,15 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size

minimum_timestamp = int(time.time() - config.async_revision_delay_time)

# Get list up to 1000 data objects (in research space) scheduled for revision, taking into account
# Get list of up to batch size limit of data objects (in research space) scheduled for revision, taking into account
# modification time.
log.write(ctx, "verbose = {}".format(verbose))
if verbose:
log.write(ctx, "async_revision_delay_time = {} seconds".format(config.async_revision_delay_time))
log.write(ctx, "max_rss = {} bytes".format(config.async_revision_max_rss))
log.write(ctx, "dry_run = {}".format(dry_run))
show_memory_usage(ctx)

iter = list(genquery.Query(ctx,
['ORDER(DATA_ID)', 'COLL_NAME', 'DATA_NAME', 'META_DATA_ATTR_VALUE'],
"META_DATA_ATTR_NAME = '{}' AND COLL_NAME like '/{}/home/{}%' AND DATA_MODIFY_TIME n<= '{}'".format(
@@ -326,6 +335,12 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size
log.write(ctx, "Batch revision job is stopped")
break

# Check current memory usage and stop if it is above the limit.
if memory_limit_exceeded(config.async_revision_max_rss):
show_memory_usage(ctx)
log.write(ctx, "Memory used is now above specified limit of {} bytes, stopping further processing".format(config.async_revision_max_rss))
break

# Perform scheduled revision creation for one data object.
data_id = row[0]
path = row[1] + "/" + row[2]
@@ -350,6 +365,12 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size
# For getting the total count only the data objects within the wanted range
count += 1

# "No action" is meant for easier memory usage debugging.
if no_action:
show_memory_usage(ctx)
log.write(ctx, "Skipping creating revision (dry_run): would have created revision for {} on resc {}".format(path, resc))
continue

if print_verbose:
log.write(ctx, "Batch revision: creating revision for {} on resc {}".format(path, resc))

@@ -400,6 +421,9 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size
log.write(ctx, "ERROR - Scheduled revision creation of <{}> failed".format(path))
avu.set_on_data(ctx, path, errorattr, "true")

if print_verbose:
show_memory_usage(ctx)

# Total revision process completed
log.write(ctx, "Batch revision job finished. {}/{} objects processed successfully. ".format(count_ok, count))
log.write(ctx, "Batch revision job ignored {} data objects in research area, excluding data objects postponed because of delay time.".format(count_ignored))
@@ -410,11 +434,9 @@ def is_revision_blocked_by_admin(ctx):
:param ctx: Combined type of a callback and rei struct
:returns: Boolean indicating if admin put replication on hold.
:returns: Boolean indicating if admin put revisions on hold.
"""
zone = user.zone(ctx)
path = "/{}/yoda/flags/stop_revisions".format(zone)
return collection.exists(ctx, path)
return data_object.exists(ctx, "/{}{}".format(user.zone(ctx), "/yoda/flags/stop_revisions"))


def revision_create(ctx, resource, data_id, max_size, verbose):
@@ -860,3 +882,27 @@ def revision_remove(ctx, revision_id, revision_path):

log.write(ctx, "ERROR - Revision ID <{}> not found or permission denied.".format(revision_id))
return False


def memory_rss_usage():
"""
The RSS (resident) memory size in bytes for the current process.
"""
p = psutil.Process()
return p.memory_info().rss


def show_memory_usage(ctx):
"""
For debug purposes show the current RSS usage.
"""
log.write(ctx, "current RSS usage: {} bytes".format(memory_rss_usage()))


def memory_limit_exceeded(rss_limit):
"""
True when a limit other than 0 was specified and memory usage is currently
above this limit. Otherwise False.
"""
rss_limit = int(rss_limit)
return rss_limit and memory_rss_usage() > rss_limit
2 changes: 2 additions & 0 deletions rules_uu.cfg.template
@@ -53,7 +53,9 @@ enable_inactivity_notification =
inactivity_cutoff_months =

async_replication_delay_time =
async_replication_max_rss =
async_revision_delay_time =
async_revision_max_rss =

enable_tape_archive =

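
The two new *_max_rss settings cap the resident memory a batch job may use before it stops early. A possible way to fill them in (the values below are illustrative; util/config.py further down defaults both to 1000000000 bytes, roughly 1 GB, and a value of 0 disables the check):

    async_replication_max_rss = 1000000000
    async_revision_max_rss = 1000000000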
20 changes: 11 additions & 9 deletions tools/async-job.py
@@ -19,18 +19,20 @@

NAME = os.path.basename(sys.argv[0])


def get_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description='Yoda replication and revision job')
parser.add_argument('--verbose', '-v', action='store_const', default="0", const="1",
help='Log more information in rodsLog for troubleshooting purposes')
help='Log more information in rodsLog for troubleshooting purposes')
parser.add_argument('--balance_id_min', type=int, default=1,
help='Minimal balance id to be handled by this job (range 1-64) for load balancing purposes')
help='Minimal balance id to be handled by this job (range 1-64) for load balancing purposes')
parser.add_argument('--balance_id_max', type=int, default=64,
help='Maximum balance id to be handled by this job (range 1-64) for load balancing purposes')
help='Maximum balance id to be handled by this job (range 1-64) for load balancing purposes')
parser.add_argument('--batch_size_limit', type=int, default=1000,
help='Maximum number of items to be processed per batch job')

help='Maximum number of items to be processed per batch job')
parser.add_argument('--dry-run', '-n', action='store_const', default="0", const="1",
help='Perform a trial run for troubleshooting purposes')
return parser.parse_args()


Expand All @@ -57,15 +59,15 @@ def lock_or_die(balance_id_min, balance_id_max):


if 'replicate' in NAME:
rule_name = 'uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit)'
rule_name = 'uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run)'
elif 'revision' in NAME:
rule_name = 'uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit)'
rule_name = 'uuRevisionBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run)'
else:
print('bad command "{}"'.format(NAME), file=sys.stderr)
exit(1)

args = get_args()
lock_or_die(args.balance_id_min, args.balance_id_max)
rule_options = "*verbose={}%*balance_id_min={}%*balance_id_max={}%*batch_size_limit={}".format(args.verbose, args.balance_id_min, args.balance_id_max, args.batch_size_limit)
rule_options = "*verbose={}%*balance_id_min={}%*balance_id_max={}%*batch_size_limit={}%*dry_run={}".format(args.verbose, args.balance_id_min, args.balance_id_max, args.batch_size_limit, args.dry_run)
subprocess.call(['irule', '-r', 'irods_rule_engine_plugin-irods_rule_language-instance',
rule_name, rule_options, 'ruleExecOut'])
rule_name, rule_options, 'ruleExecOut'])
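
The wrapper selects which rule to run from its own filename, so a single module serves both jobs. A hypothetical dry-run invocation (the script name is an assumption; any name containing "replicate" or "revision" picks the matching rule):

    ./async-data-replicate.py --verbose --dry-run --balance_id_min 1 --balance_id_max 32 --batch_size_limit 1000

Given the code above, this builds an irule call along these lines:

    irule -r irods_rule_engine_plugin-irods_rule_language-instance \
        'uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run)' \
        '*verbose=1%*balance_id_min=1%*balance_id_max=32%*batch_size_limit=1000%*dry_run=1' ruleExecOut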
2 changes: 2 additions & 0 deletions util/config.py
@@ -117,7 +117,9 @@ def __repr__(self):
token_lifetime=0,
token_expiration_notification=0,
async_replication_delay_time=0,
async_replication_max_rss=1000000000,
async_revision_delay_time=0,
async_revision_max_rss=1000000000,
yoda_portal_fqdn=None,
epic_pid_enabled=False,
epic_url=None,
7 changes: 4 additions & 3 deletions uuFunctions.r
@@ -79,10 +79,11 @@ uuObjectMetadataKvp(*data_id, *prefix, *kvp) {
# Performs replication for all data objects marked with 'org_replication_scheduled' metadata.
# The metadata value indicates the source and destination resource.
#
# \param[in] verbose whether to log verbose messages for troubleshooting (1: yes, 0: no)
# \param[in] verbose Whether to log verbose messages for troubleshooting (1: yes, 0: no)
# \param[in] balance_id_min Minimum balance id for batch jobs (value 1-64)
# \param[in] balance_id_max Maximum balance id for batch jobs (value 1-64)
# \param[in] batch_size_limit Maximum number of items to be processed within one batch
uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit) {
rule_replicate_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit);
# \param[in] dry_run Whether to do a trial run (1: yes, 0: no)
uuReplicateBatch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run) {
rule_replicate_batch(*verbose, *balance_id_min, *balance_id_max, *batch_size_limit, *dry_run);
}
