Skip to content

Commit

Permalink
Merge pull request #207 from tmaeno/master
Browse files Browse the repository at this point in the history
lock_pool in adder
  • Loading branch information
tmaeno authored Apr 25, 2023
2 parents 7d7e515 + f8392dd commit 9b21a68
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ RUN mkdir -p /var/log/panda/wsgisocks
RUN mkdir -p /var/log/panda/pandacache
RUN mkdir -p /run/httpd/wsgisocks
RUN mkdir -p /var/log/panda/pandacache/jedilog
RUN mkdir -p /var/cache/pandaserver/schedconfig
RUN mkdir -p /var/run/panda
RUN mkdir -p /var/cric

Expand Down Expand Up @@ -75,6 +76,7 @@ RUN chmod -R 777 /var/log/panda/pandacache
RUN chmod -R 777 /var/run/panda
RUN chmod -R 777 /var/lib/logrotate
RUN chmod -R 777 /var/cric
RUN chmod -R 777 /var/cache/pandaserver

ENV PANDA_LOCK_DIR /var/run/panda
RUN mkdir -p ${PANDA_LOCK_DIR} && chmod 777 ${PANDA_LOCK_DIR}
Expand Down
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.0.52"
release_version = "0.0.53"
12 changes: 7 additions & 5 deletions pandaserver/daemons/scripts/add_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


# main
def main(argv=tuple(), tbuf=None, **kwargs):
def main(argv=tuple(), tbuf=None, lock_pool=None, **kwargs):

try:
long
Expand Down Expand Up @@ -61,11 +61,12 @@ def main(argv=tuple(), tbuf=None, **kwargs):
class AdderThread(GenericThread):

def __init__(self, taskBuffer, aSiteMapper,
job_output_reports):
job_output_reports, lock_pool):
GenericThread.__init__(self)
self.taskBuffer = taskBuffer
self.aSiteMapper = aSiteMapper
self.job_output_reports = job_output_reports
self.lock_pool = lock_pool

# main loop
def run(self):
Expand Down Expand Up @@ -111,7 +112,8 @@ def run(self):
# get adder
adder_gen = AdderGen(taskBuffer, panda_id, job_status, attempt_nr,
ignoreTmpError=ignoreTmpError, siteMapper=aSiteMapper, pid=uniq_pid,
prelock_pid=uniq_pid, lock_offset=lock_interval-retry_interval)
prelock_pid=uniq_pid, lock_offset=lock_interval-retry_interval,
lock_pool=lock_pool)
n_processed += 1
# execute
adder_gen.run()
Expand Down Expand Up @@ -176,9 +178,9 @@ def proc_join(self):
tbuf = TaskBuffer()
tbuf_list.append(tbuf)
tbuf.init(panda_config.dbhost, panda_config.dbpasswd, nDBConnection=1, useTimeout=True)
thr = AdderThread(tbuf, aSiteMapper, jor_lists)
thr = AdderThread(tbuf, aSiteMapper, jor_lists, lock_pool)
else:
thr = AdderThread(taskBufferIF.getInterface(), aSiteMapper, jor_lists)
thr = AdderThread(taskBufferIF.getInterface(), aSiteMapper, jor_lists, lock_pool)
adderThrList.append(thr)
# start all threads
for thr in adderThrList:
Expand Down
21 changes: 13 additions & 8 deletions pandaserver/daemons/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import psutil

from pandacommon.pandalogger import logger_utils
from pandacommon.pandautils.thread_utils import LockPool
from pandaserver.config import panda_config, daemon_config


Expand Down Expand Up @@ -63,7 +64,7 @@ def kill_proc_tree(pid, sig=signal.SIGKILL, include_parent=True,


# worker process loop of daemon
def daemon_loop(dem_config, msg_queue, pipe_conn, worker_lifetime, tbuf=None):
def daemon_loop(dem_config, msg_queue, pipe_conn, worker_lifetime, tbuf=None, lock_pool=None):
# pid of the worker
my_pid = os.getpid()
my_full_pid = '{0}-{1}-{2}'.format(socket.getfqdn().split('.')[0], os.getpgrp(), my_pid)
Expand Down Expand Up @@ -202,7 +203,7 @@ def got_end_sig(sig, frame):
tmp_log.info('{dem} start looping'.format(dem=dem_name))
start_ts = time.time()
while True:
ret_val = the_module.main(argv=mod_argv, tbuf=tbuf)
ret_val = the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
now_ts = time.time()
if not ret_val:
# daemon main function says stop the loop
Expand All @@ -214,7 +215,7 @@ def got_end_sig(sig, frame):
else:
# execute the module script with arguments
tmp_log.info('{dem} start'.format(dem=dem_name))
the_module.main(argv=mod_argv, tbuf=tbuf)
the_module.main(argv=mod_argv, tbuf=tbuf, lock_pool=lock_pool)
tmp_log.info('{dem} finish'.format(dem=dem_name))
except Exception as e:
# with error
Expand Down Expand Up @@ -262,14 +263,15 @@ class DaemonWorker(object):
_lock = threading.Lock()

# constructor
def __init__(self, dem_config, msg_queue, worker_lifetime, tbuf=None):
def __init__(self, dem_config, msg_queue, worker_lifetime, tbuf=None, lock_pool=None):
# synchronized with lock
with self._lock:
self._make_pipe()
self._make_process( dem_config=dem_config,
msg_queue=msg_queue,
worker_lifetime=worker_lifetime,
tbuf=tbuf)
tbuf=tbuf,
lock_pool=lock_pool)

# make pipe connection pairs for the worker
def _make_pipe(self):
Expand All @@ -281,8 +283,8 @@ def _close_pipe(self):
self.child_conn.close()

# make associated process
def _make_process(self, dem_config, msg_queue, worker_lifetime, tbuf):
args = (dem_config, msg_queue, self.child_conn, worker_lifetime, tbuf)
def _make_process(self, dem_config, msg_queue, worker_lifetime, tbuf, lock_pool):
args = (dem_config, msg_queue, self.child_conn, worker_lifetime, tbuf, lock_pool)
self.process = multiprocessing.Process(target=daemon_loop, args=args)

# start worker process
Expand Down Expand Up @@ -351,6 +353,8 @@ def __init__(self, logger, n_workers=1, n_dbconn=1, worker_lifetime=28800, use_t
# shared taskBufferIF
self.tbif = None
self._make_tbif()
# shared lock pool
self.lock_pool = LockPool()
# spawn workers
self._spawn_workers(self.n_workers)

Expand Down Expand Up @@ -386,7 +390,8 @@ def _spawn_workers(self, n_workers=1, auto_start=False):
worker = DaemonWorker( dem_config=self.dem_config,
msg_queue=self.msg_queue,
worker_lifetime=self.worker_lifetime,
tbuf=tbuf)
tbuf=tbuf,
lock_pool=self.lock_pool)
self.worker_pool.add(worker)
if auto_start:
worker.start()
Expand Down
17 changes: 15 additions & 2 deletions pandaserver/dataservice/AdderGen.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
class AdderGen(object):
# constructor
def __init__(self, taskBuffer, jobID, jobStatus, attemptNr, ignoreTmpError=True, siteMapper=None,
pid=None, prelock_pid=None, lock_offset=10):
pid=None, prelock_pid=None, lock_offset=10, lock_pool=None):
self.job = None
self.jobID = jobID
self.jobStatus = jobStatus
Expand All @@ -52,6 +52,7 @@ def __init__(self, taskBuffer, jobID, jobStatus, attemptNr, ignoreTmpError=True,
self.pid = pid
self.prelock_pid = prelock_pid
self.data = None
self.lock_pool = lock_pool
# logger
self.logger = LogWrapper(_logger,str(self.jobID))

Expand Down Expand Up @@ -304,13 +305,25 @@ def run(self):
if self.job.commandToPilot == 'tobekilled' and self.job.jobStatus == 'failed':
self.job.jobStatus = 'cancelled'
# update job
if oldJobStatus in ['cancelled','closed']:
if oldJobStatus in ['cancelled', 'closed']:
pass
else:
db_lock = None
if self.job.jediTaskID not in [0, None, 'NULL'] and self.lock_pool:
db_lock = self.lock_pool.get(self.job.jediTaskID)
if db_lock:
db_lock.acquire()
self.logger.debug("got DB lock for jediTaskID={}".format(self.job.jediTaskID))
else:
self.logger.debug("couldn't get DB lock for jediTaskID={}".format(self.job.jediTaskID))
self.logger.debug("updating DB")
retU = self.taskBuffer.updateJobs([self.job],False,oldJobStatusList=[oldJobStatus],
extraInfo=self.extraInfo)
self.logger.debug("retU: %s" % retU)
if db_lock:
self.logger.debug("release DB lock for jediTaskID={}".format(self.job.jediTaskID))
db_lock.release()
self.lock_pool.release(self.job.jediTaskID)
# failed
if not retU[0]:
self.logger.error('failed to update DB for pandaid={0}'.format(self.job.PandaID))
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def run (self):
author_email='atlas-adc-panda@cern.ch',
url='https://twiki.cern.ch/twiki/bin/view/Atlas/PanDA',
zip_safe=False,
install_requires=['panda-common>=0.0.31',
install_requires=['panda-common>=0.0.34',
'panda-client',
'pyOpenSSL',
'python-daemon',
Expand Down

0 comments on commit 9b21a68

Please sign in to comment.