Skip to content

Commit

Permalink
Merge pull request #193 from HSF/flin
Browse files Browse the repository at this point in the history
Add checkInterval for qconf update
  • Loading branch information
mightqxc authored Jul 20, 2023
2 parents a17e68d + 620a4b5 commit b0fd26c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "18-07-2023 09:28:39 on flin (by mightqxc)"
timestamp = "18-07-2023 14:53:35 on flin (by mightqxc)"
50 changes: 38 additions & 12 deletions pandaharvester/harvestercore/queue_config_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,24 @@ class QueueConfigMapper(six.with_metaclass(SingletonWithID, object)):
def __init__(self, update_db=True):
self.lock = threading.Lock()
self.lastUpdate = None
self.lastReload = None
self.lastCheck = None
self.dbProxy = DBProxy()
self.toUpdateDB = update_db
try:
self.configFromCacher = harvester_config.qconf.configFromCacher
except AttributeError:
self.configFromCacher = False
self.updateInterval = 600
try:
self.updateInterval = harvester_config.qconf.updateInterval
except AttributeError:
self.updateInterval = 600
try:
self.checkInterval = harvester_config.qconf.checkInterval
except AttributeError:
self.checkInterval = 5
finally:
self.checkInterval = min(self.checkInterval, self.updateInterval)

# load config from DB cache of URL with validation
def _load_config_from_cache(self):
Expand Down Expand Up @@ -258,8 +269,10 @@ def _get_resolver():
return resolver

# update last reload time
def _update_last_reload_time(self):
new_info = '{0:.3f}'.format(time.time())
def _update_last_reload_time(self, ts=None):
if ts is None:
ts = time.time()
new_info = '{0:.3f}'.format(ts)
return self.dbProxy.refresh_cache('_qconf_last_reload', '_universal', new_info)

# get last reload time
Expand All @@ -273,21 +286,32 @@ def _get_last_reload_time(self):
# load data
def load_data(self, refill_table=False):
mainLog = _make_logger(method_name='QueueConfigMapper.load_data')
# check if to update
with self.lock:
# check if to update
timeNow_timestamp = time.time()
if self.lastUpdate is not None:
last_reload_timestamp = self._get_last_reload_time()
if (last_reload_timestamp is not None and self.lastUpdate is not None
and datetime.datetime.utcfromtimestamp(last_reload_timestamp) < self.lastUpdate
and timeNow_timestamp - last_reload_timestamp < self.updateInterval):
return
time_now = datetime.datetime.utcnow()
updateInterval_td = datetime.timedelta(seconds=self.updateInterval)
checkInterval_td = datetime.timedelta(seconds=self.checkInterval)
# skip if lastCheck is fresh (within checkInterval)
if (self.lastCheck is not None
and time_now - self.lastCheck < checkInterval_td):
return
self.lastCheck = time_now
# get last_reload_timestamp from DB
last_reload_timestamp = self._get_last_reload_time()
self.lastReload = None if last_reload_timestamp is None else datetime.datetime.utcfromtimestamp(last_reload_timestamp)
# skip if lastReload is fresh and lastUpdate fresher than lastReload (within updateInterval)
if (self.lastReload is not None and self.lastUpdate is not None
and self.lastReload < self.lastUpdate
and time_now - self.lastReload < updateInterval_td):
return
# start
with self.lock:
# update timesatmp of last reload, lock with check interval
got_timesatmp_update_lock = self.dbProxy.get_process_lock('qconf_reload', 'qconf_universal', self.updateInterval)
if got_timesatmp_update_lock:
retVal = self._update_last_reload_time()
now_ts = time.time()
retVal = self._update_last_reload_time(now_ts)
self.lastReload = datetime.datetime.utcfromtimestamp(now_ts)
if retVal:
mainLog.debug('updated last reload timestamp')
else:
Expand Down Expand Up @@ -606,7 +630,9 @@ def load_data(self, refill_table=False):
queueConfig.configID = dumpSpec.configID
newQueueConfigWithID[dumpSpec.configID] = queueConfig
self.queueConfigWithID = newQueueConfigWithID
# update lastUpdate and lastCheck
self.lastUpdate = datetime.datetime.utcnow()
self.lastCheck = self.lastUpdate
# update database
if self.toUpdateDB:
self.dbProxy.fill_panda_queue_table(self.activeQueues.keys(), self, refill_table=refill_table)
Expand Down
5 changes: 5 additions & 0 deletions templates/panda_harvester.cfg.rpmnew.template
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ autoBlacklist = False
# restrict to a certain pilot version (optional)
#pilotVersion = 2

# update interval in sec (default: 600) - period to update qconf
updateInterval = 600

# check interval in sec (default: 5) - period for other agent threads to check last qconf update
checkInterval = 5


##########################
Expand Down

0 comments on commit b0fd26c

Please sign in to comment.