Skip to content

Commit

Permalink
Merge pull request #223 from tmaeno/master
Browse files Browse the repository at this point in the history
added max_worker_id
  • Loading branch information
tmaeno authored Jun 5, 2023
2 parents 22dd36a + 4854bf1 commit 87a1b36
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 3 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.0.58"
release_version = "0.0.59"
8 changes: 8 additions & 0 deletions pandaserver/jobdispatcher/JobDispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,11 @@ def updateWorkerPilotStatus(self, workerID, harvesterID, status, timeout, accept
.format(workerID, harvesterID, status, response.encode(accept_json)))
return response.encode(accept_json)

# get max workerID
def get_max_worker_id(self, harvester_id):
id = self.taskBuffer.get_max_worker_id(harvester_id)
return json.dumps(id)


# Singleton
jobDispatcher = JobDispatcher()
Expand Down Expand Up @@ -1402,3 +1407,6 @@ def updateWorkerPilotStatus(req, site, workerID, harvesterID, status, timeout=60
return jobDispatcher.updateWorkerPilotStatus(workerID, harvesterID, status, timeout, accept_json)


# get max workerID
def get_max_worker_id(req, harvester_id):
return jobDispatcher.get_max_worker_id(harvester_id)
5 changes: 3 additions & 2 deletions pandaserver/server/panda.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from pandaserver.dataservice.DataService import datasetCompleted, updateFileStatusInDisp
from pandaserver.jobdispatcher.JobDispatcher import getJob, updateJob, getStatus, genPilotToken,\
getEventRanges, updateEventRange, getKeyPair, updateEventRanges, getDNsForS3, getProxy, getCommands, ackCommands,\
checkJobStatus, checkEventsAvailability, updateJobsInBulk, getResourceTypes, updateWorkerPilotStatus
checkJobStatus, checkEventsAvailability, updateJobsInBulk, getResourceTypes, updateWorkerPilotStatus,\
get_max_worker_id
from pandaserver.userinterface.UserIF import submitJobs, getJobStatus, queryPandaIDs, killJobs, reassignJobs,\
getJobStatistics, getJobStatisticsPerSite, resubmitJobs, queryLastFilesInDataset, getPandaIDsSite,\
getJobsToBeUpdated, updateProdDBUpdateTimes, runTaskAssignment, getAssigningTask, getSiteSpecs,\
Expand Down Expand Up @@ -91,7 +92,7 @@
'getEventRanges', 'updateEventRange', 'getKeyPair',
'updateEventRanges', 'getDNsForS3', 'getProxy', 'getCommands', 'ackCommands',
'checkJobStatus', 'checkEventsAvailability', 'updateJobsInBulk', 'getResourceTypes',
'updateWorkerPilotStatus']
'updateWorkerPilotStatus', 'get_max_worker_id']

allowedMethods += ['submitJobs', 'getJobStatus', 'queryPandaIDs', 'killJobs', 'reassignJobs', 'getJobStatistics',
'getJobStatisticsPerSite', 'resubmitJobs', 'queryLastFilesInDataset', 'getPandaIDsSite',
Expand Down
31 changes: 31 additions & 0 deletions pandaserver/taskbuffer/OraDBProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25407,3 +25407,34 @@ def get_files_in_datasets(self, task_id, dataset_types):
# error
self.dumpErrorMessage(_logger, method_name)
return None

# get the max workerID
def get_max_worker_id(self, harvester_id):
comment = ' /* DBProxy.get_max_worker_id */'
methodName = comment.split(' ')[-2].split('.')[-1]
methodName += " < HarvesterID={} >".format(harvester_id)
tmpLog = LogWrapper(_logger, methodName)
tmpLog.debug("start")
try:
# sql to get workers
sqlC = "SELECT MAX(workerID) FROM ATLAS_PANDA.Harvester_Workers "
sqlC += "WHERE harvesterID=:harvesterID "
varMap = {':harvesterID': harvester_id}
# start transaction
self.conn.begin()
self.cur.execute(sqlC+comment, varMap)
res = self.cur.fetchone()
if res:
max_id, = res
else:
max_id = None
if not self._commit():
raise RuntimeError('Commit error')
tmpLog.debug("got max workerID={}".format(max_id))
return max_id
except Exception:
# roll back
self._rollback()
# error
self.dumpErrorMessage(_logger, methodName)
return None
7 changes: 7 additions & 0 deletions pandaserver/taskbuffer/TaskBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4183,5 +4183,12 @@ def get_files_in_datasets(self, task_id, dataset_types):
self.proxyPool.putProxy(proxy)
return ret

def get_max_worker_id(self, harvester_id):
proxy = self.proxyPool.getProxy()
ret = proxy.get_max_worker_id(harvester_id)
self.proxyPool.putProxy(proxy)
return ret


# Singleton
taskBuffer = TaskBuffer()

0 comments on commit 87a1b36

Please sign in to comment.