
Commit

Merge pull request #429 from tmaeno/master
for JEDI enhancement
tmaeno authored Oct 15, 2024
2 parents b0d1e25 + 436727e commit 60d5947
Showing 5 changed files with 53 additions and 12 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
@@ -1 +1 @@
release_version = "0.3.22"
release_version = "0.3.23"
9 changes: 6 additions & 3 deletions pandaserver/taskbuffer/OraDBProxy.py
@@ -17,7 +17,6 @@
import sys
import time
import traceback
import urllib
import uuid
import warnings

@@ -61,6 +60,8 @@
if panda_config.backend == "oracle":
import oracledb

from . import wrapped_oracle_conn

oracledb.init_oracle_client()
varNUMBER = oracledb.NUMBER

@@ -201,13 +202,15 @@ def connect(
# connect
try:
if self.backend == "oracle":
self.conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd)
conn = oracledb.connect(dsn=self.dbhost, user=self.dbuser, password=self.dbpasswd)

def OutputTypeHandler(cursor, name, defaultType, size, precision, scale):
if defaultType == oracledb.CLOB:
return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize)

self.conn.outputtypehandler = OutputTypeHandler
conn.outputtypehandler = OutputTypeHandler
self.conn = wrapped_oracle_conn.WrappedOracleConn(conn)

elif self.backend == "postgres":
dsn = {"dbname": self.dbname, "user": self.dbuser, "keepalives_idle": 30, "keepalives_interval": 30, "keepalives": 1}
if self.dbpasswd:
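For context on the OutputTypeHandler that now gets attached to the raw connection before it is wrapped: python-oracledb returns CLOB columns as LOB objects by default, and the handler switches them to plain strings at fetch time. A minimal standalone sketch of that behavior, with a hypothetical table, column, and credentials:

import oracledb

def output_type_handler(cursor, name, default_type, size, precision, scale):
    # fetch CLOB columns as plain Python strings instead of LOB objects
    if default_type == oracledb.CLOB:
        return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize)

conn = oracledb.connect(dsn="dbhost/service", user="panda", password="***")  # hypothetical DSN/credentials
conn.outputtypehandler = output_type_handler
cur = conn.cursor()
cur.execute("SELECT metadata FROM jobs_defined WHERE rownum <= 1")  # hypothetical table/column
row = cur.fetchone()  # the CLOB value arrives as str, not as an oracledb LOB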
34 changes: 27 additions & 7 deletions pandaserver/taskbuffer/TaskBuffer.py
@@ -36,6 +36,11 @@ def __init__(self):
# save the requester for monitoring/logging purposes
self.start_time = time.time()

# site mapper
self.site_mapper = None
# update time for site mapper
self.last_update_site_mapper = None

def __repr__(self):
return "TaskBuffer"

@@ -69,6 +74,16 @@ def cleanup(self, requester=None):
def get_num_connections(self):
return self.nDBConnection

# get SiteMapper
def get_site_mapper(self):
time_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
if self.last_update_site_mapper is None or time_now - self.last_update_site_mapper > datetime.timedelta(minutes=10):
self.site_mapper = SiteMapper(self)
self.last_update_site_mapper = time_now
return self.site_mapper

# check production role
def checkProdRole(self, fqans):
for fqan in fqans:
@@ -175,10 +190,11 @@ def storeJobs(
getEsJobsetMap=False,
unprocessedMap=None,
bulk_job_insert=False,
trust_user=False,
):
try:
tmpLog = LogWrapper(_logger, f"storeJobs <{CoreUtils.clean_user_id(user)}>")
tmpLog.debug(f"start nJobs={len(jobs)}")
tmpLog = LogWrapper(_logger, f"storeJobs <{CoreUtils.clean_user_id(user)} nJobs={len(jobs)}>")
tmpLog.debug(f"start toPending={toPending}")
# check quota for priority calculation
weight = 0.0
userJobID = -1
@@ -189,10 +205,10 @@
useExpress = False
nExpressJobs = 0
useDebugMode = False
siteMapper = SiteMapper(self)
siteMapper = self.get_site_mapper()

# check ban user
if len(jobs) > 0:
if not trust_user and len(jobs) > 0:
# get DB proxy
proxy = self.proxyPool.getProxy()
# check user status
@@ -206,6 +222,7 @@
return [], None, unprocessedMap
return []

tmpLog.debug(f"checked ban user")
# set parameters for user jobs
if (
len(jobs) > 0
@@ -230,9 +247,6 @@
# release proxy
self.proxyPool.putProxy(proxy)

# get site spec
tmpSiteSpec = siteMapper.getSite(jobs[0].computingSite)

# extract country group
for tmpFQAN in fqans:
match = re.search("^/atlas/([^/]+)/", tmpFQAN)
@@ -246,6 +260,7 @@
if tmpCountry in ["usatlas"]:
userCountry = "us"
break
tmpLog.debug(f"set user job parameters")

# return if DN is blocked
if not userStatus:
@@ -294,23 +309,28 @@
prio_reduction,
) = self.getPrioParameters(jobs, user, fqans, userDefinedWG, validWorkingGroup)
tmpLog.debug(f"workingGroup={jobs[0].workingGroup} serNum={serNum} weight={weight} pOffset={priorityOffset} reduction={prio_reduction}")
tmpLog.debug(f"got prio parameters")
# get DB proxy
proxy = self.proxyPool.getProxy()
tmpLog.debug(f"got proxy")
# get group job serial number
groupJobSerialNum = 0
if len(jobs) > 0 and (jobs[0].prodSourceLabel in JobUtils.analy_sources) and (not jobs[0].processingType in ["merge", "unmerge"]):
for tmpFile in jobs[-1].Files:
if tmpFile.type in ["output", "log"] and "$GROUPJOBSN" in tmpFile.lfn:
tmpLog.debug(f"getting group job serial number")
tmpSnRet = proxy.getSerialNumberForGroupJob(user)
if tmpSnRet["status"]:
groupJobSerialNum = tmpSnRet["sn"]
break
tmpLog.debug(f"got group job serial number")
# get total number of files
totalNumFiles = 0
for job in jobs:
totalNumFiles += len(job.Files)
# bulk fetch PandaIDs
new_panda_ids = proxy.bulk_fetch_panda_ids(len(jobs))
tmpLog.debug(f"got PandaIDs")
# bulk fetch fileIDs
fileIDPool = []
if totalNumFiles > 0:
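The new get_site_mapper() above caches the SiteMapper instance and rebuilds it only when the cached copy is more than 10 minutes old, so storeJobs no longer constructs a fresh SiteMapper on every call. A minimal standalone sketch of that time-based caching pattern, with a generic loader standing in for the SiteMapper construction (class and variable names here are illustrative, not part of the codebase):

import datetime

class TimedCache:
    def __init__(self, ttl_minutes=10):
        self.value = None
        self.last_update = None
        self.ttl = datetime.timedelta(minutes=ttl_minutes)

    def get(self, loader):
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        # rebuild only when nothing is cached yet or the cached copy is older than the TTL
        if self.last_update is None or now - self.last_update > self.ttl:
            self.value = loader()
            self.last_update = now
        return self.value

cache = TimedCache()
site_mapper = cache.get(lambda: "expensive object")  # stand-in for SiteMapper(self)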
2 changes: 1 addition & 1 deletion pandaserver/taskbuffer/WrappedPostgresConn.py
@@ -18,7 +18,7 @@ def ping(self):
raise RuntimeError("connection closed")

def __enter__(self):
self.orig_conn.__enter__()
return self.orig_conn.__enter__()

def __exit__(self, exc_type, exc_val, exc_tb):
self.orig_conn.__exit__(exc_type, exc_val, exc_tb)
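The one-line change matters because the value returned by __enter__ is what a with statement binds: without the return, the old code silently bound the target of "as" to None. A short illustration of the difference (hypothetical usage of the wrapper):

with wrapped_conn as conn:
    # before the fix: conn is None here, so conn.cursor() raises AttributeError
    # after the fix: conn is whatever the underlying connection's __enter__ returns
    cur = conn.cursor()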
18 changes: 18 additions & 0 deletions pandaserver/taskbuffer/wrapped_oracle_conn.py
@@ -0,0 +1,18 @@
# wrapper for Oracle Connection
class WrappedOracleConn(object):
    def __init__(self, conn):
        self.orig_conn = conn

    # forward attribute access to the wrapped connection, falling back to the wrapper itself;
    # orig_conn is looked up via object.__getattribute__ so this method does not recurse into itself
    def __getattribute__(self, item):
        orig_conn = object.__getattribute__(self, "orig_conn")
        try:
            return getattr(orig_conn, item)
        except AttributeError:
            return object.__getattribute__(self, item)

    # override context manager protocol not to close connection
    def __enter__(self):
        return self.orig_conn.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # commit on exit instead of closing, so the connection stays usable
        self.orig_conn.commit()

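A minimal usage sketch of the new wrapper (hypothetical DSN, table, and SQL): ordinary attribute access such as cursor() is forwarded to the underlying oracledb connection, while leaving a with block commits instead of closing, so the connection stays open for the next request.

import oracledb
from pandaserver.taskbuffer import wrapped_oracle_conn

raw_conn = oracledb.connect(dsn="dbhost/service", user="panda", password="***")  # hypothetical credentials
conn = wrapped_oracle_conn.WrappedOracleConn(raw_conn)
with conn:
    cur = conn.cursor()  # forwarded to the real connection
    cur.execute("UPDATE jobs_table SET jobStatus = :1 WHERE PandaID = :2", ["finished", 123])  # hypothetical table
# __exit__ committed the transaction; the connection itself is still open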