From 3134c29413f9e36192c69b7ce1dc083360dbd65e Mon Sep 17 00:00:00 2001 From: tmaeno Date: Tue, 15 Oct 2024 09:35:27 +0200 Subject: [PATCH] * connection wrapper for oracle to override context manager protocol * trust_user in storeJobs * get_site_mapper in TaskBuffer --- PandaPkgInfo.py | 2 +- pandaserver/taskbuffer/OraDBProxy.py | 9 +++-- pandaserver/taskbuffer/TaskBuffer.py | 34 +++++++++++++++---- pandaserver/taskbuffer/WrappedPostgresConn.py | 2 +- pandaserver/taskbuffer/wrapped_oracle_conn.py | 18 ++++++++++ 5 files changed, 53 insertions(+), 12 deletions(-) create mode 100644 pandaserver/taskbuffer/wrapped_oracle_conn.py diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py index 78fbc477e..f0f806dd3 100644 --- a/PandaPkgInfo.py +++ b/PandaPkgInfo.py @@ -1 +1 @@ -release_version = "0.3.22" +release_version = "0.3.23" diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 10f57394f..1936b24a7 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -17,7 +17,6 @@ import sys import time import traceback -import urllib import uuid import warnings @@ -61,6 +60,8 @@ if panda_config.backend == "oracle": import oracledb + from . import wrapped_oracle_conn + oracledb.init_oracle_client() varNUMBER = oracledb.NUMBER @@ -201,13 +202,15 @@ def connect( # connect try: if self.backend == "oracle": - self.conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd) + conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd) def OutputTypeHandler(cursor, name, defaultType, size, precision, scale): if defaultType == oracledb.CLOB: return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize) - self.conn.outputtypehandler = OutputTypeHandler + conn.outputtypehandler = OutputTypeHandler + self.conn = wrapped_oracle_conn.WrappedOracleConn(conn) + elif self.backend == "postgres": dsn = {"dbname": self.dbname, "user": self.dbuser, "keepalives_idle": 30, "keepalives_interval": 30, "keepalives": 1} if self.dbpasswd: diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index 07d697ee1..bd009e9e1 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -36,6 +36,11 @@ def __init__(self): # save the requester for monitoring/logging purposes self.start_time = time.time() + # site mapper + self.site_mapper = None + # update time for site mapper + self.last_update_site_mapper = None + def __repr__(self): return "TaskBuffer" @@ -69,6 +74,16 @@ def cleanup(self, requester=None): def get_num_connections(self): return self.nDBConnection + # get SiteMapper + def get_site_mapper(self): + time_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + if self.last_update_site_mapper is None or datetime.datetime.now(datetime.timezone.utc).replace( + tzinfo=None + ) - self.last_update_site_mapper > datetime.timedelta(minutes=10): + self.site_mapper = SiteMapper(self) + self.last_update_site_mapper = time_now + return self.site_mapper + # check production role def checkProdRole(self, fqans): for fqan in fqans: @@ -175,10 +190,11 @@ def storeJobs( getEsJobsetMap=False, unprocessedMap=None, bulk_job_insert=False, + trust_user=False, ): try: - tmpLog = LogWrapper(_logger, f"storeJobs <{CoreUtils.clean_user_id(user)}>") - tmpLog.debug(f"start nJobs={len(jobs)}") + tmpLog = LogWrapper(_logger, f"storeJobs <{CoreUtils.clean_user_id(user)} nJobs={len(jobs)}>") + tmpLog.debug(f"start toPending={toPending}") # check quota for priority calculation weight = 0.0 userJobID = -1 @@ -189,10 +205,10 @@ def storeJobs( useExpress = False nExpressJobs = 0 useDebugMode = False - siteMapper = SiteMapper(self) + siteMapper = self.get_site_mapper() # check ban user - if len(jobs) > 0: + if not trust_user and len(jobs) > 0: # get DB proxy proxy = self.proxyPool.getProxy() # check user status @@ -206,6 +222,7 @@ def storeJobs( return [], None, unprocessedMap return [] + tmpLog.debug(f"checked ban user") # set parameters for user jobs if ( len(jobs) > 0 @@ -230,9 +247,6 @@ def storeJobs( # release proxy self.proxyPool.putProxy(proxy) - # get site spec - tmpSiteSpec = siteMapper.getSite(jobs[0].computingSite) - # extract country group for tmpFQAN in fqans: match = re.search("^/atlas/([^/]+)/", tmpFQAN) @@ -246,6 +260,7 @@ def storeJobs( if tmpCountry in ["usatlas"]: userCountry = "us" break + tmpLog.debug(f"set user job parameters") # return if DN is blocked if not userStatus: @@ -294,23 +309,28 @@ def storeJobs( prio_reduction, ) = self.getPrioParameters(jobs, user, fqans, userDefinedWG, validWorkingGroup) tmpLog.debug(f"workingGroup={jobs[0].workingGroup} serNum={serNum} weight={weight} pOffset={priorityOffset} reduction={prio_reduction}") + tmpLog.debug(f"got prio parameters") # get DB proxy proxy = self.proxyPool.getProxy() + tmpLog.debug(f"got proxy") # get group job serial number groupJobSerialNum = 0 if len(jobs) > 0 and (jobs[0].prodSourceLabel in JobUtils.analy_sources) and (not jobs[0].processingType in ["merge", "unmerge"]): for tmpFile in jobs[-1].Files: if tmpFile.type in ["output", "log"] and "$GROUPJOBSN" in tmpFile.lfn: + tmpLog.debug(f"getting group job serial number") tmpSnRet = proxy.getSerialNumberForGroupJob(user) if tmpSnRet["status"]: groupJobSerialNum = tmpSnRet["sn"] break + tmpLog.debug(f"got group job serial number") # get total number of files totalNumFiles = 0 for job in jobs: totalNumFiles += len(job.Files) # bulk fetch PandaIDs new_panda_ids = proxy.bulk_fetch_panda_ids(len(jobs)) + tmpLog.debug(f"got PandaIDs") # bulk fetch fileIDs fileIDPool = [] if totalNumFiles > 0: diff --git a/pandaserver/taskbuffer/WrappedPostgresConn.py b/pandaserver/taskbuffer/WrappedPostgresConn.py index 45a2c99f0..7701d3043 100644 --- a/pandaserver/taskbuffer/WrappedPostgresConn.py +++ b/pandaserver/taskbuffer/WrappedPostgresConn.py @@ -18,7 +18,7 @@ def ping(self): raise RuntimeError("connection closed") def __enter__(self): - self.orig_conn.__enter__() + return self.orig_conn.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): self.orig_conn.__exit__(exc_type, exc_val, exc_tb) diff --git a/pandaserver/taskbuffer/wrapped_oracle_conn.py b/pandaserver/taskbuffer/wrapped_oracle_conn.py new file mode 100644 index 000000000..9efe30c6f --- /dev/null +++ b/pandaserver/taskbuffer/wrapped_oracle_conn.py @@ -0,0 +1,18 @@ +# wrapper for Oracle Connection +class WrappedOracleConn(object): + def __init__(self, conn): + self.orig_conn = conn + + def __getattribute__(self, item): + try: + return object.__getattribute__(self.orig_conn, item) + except Exception: + pass + return object.__getattribute__(self, item) + + # override context manager protocol not to close connection + def __enter__(self): + return self.orig_conn.__enter__() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.orig_conn.commit()