Skip to content

Commit

Permalink
Merge pull request #228 from tmaeno/master
Browse files Browse the repository at this point in the history
regex matching multiple files
  • Loading branch information
tmaeno authored Jun 30, 2023
2 parents 9375dc9 + d933160 commit b16e21b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 96 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.0.61"
release_version = "0.0.62"
41 changes: 16 additions & 25 deletions pandaserver/dataservice/AdderGen.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,13 +655,6 @@ def parseXML(self):
lfns_set = set(lfns)
fileList = []
for file in self.job.Files:
if file.lfn.startswith('regex|'):
target = re.sub(r'^[^|]+\|', '', file.lfn)
for tmp_lfn in lfns_set:
if re.search(target, tmp_lfn):
file.lfn = tmp_lfn
self.logger.debug(f'use new LFN {tmp_lfn} for {target}')
break
fileList.append(file.lfn)
if file.type == 'input':
if file.lfn in lfns_set:
Expand Down Expand Up @@ -742,34 +735,32 @@ def parseXML(self):
def copyFilesForVariableNumOutputs(self,lfns):
# get original output files
origOutputs = {}
updateOrig = {}
for tmpFile in self.job.Files:
if tmpFile.type in ['output','log']:
origOutputs[tmpFile.lfn] = tmpFile
if tmpFile.lfn in lfns:
# keep original
updateOrig[tmpFile.lfn] = False
else:
# overwrite original
updateOrig[tmpFile.lfn] = True
        # look for unknown files
addedNewFiles = False
orig_to_new_map = {}
for newLFN in lfns:
if newLFN not in origOutputs:
# look for corresponding original output
for origLFN in origOutputs:
tmpPatt = '^{0}\.*_\d+$'.format(origLFN)
if re.search(tmpPatt,newLFN) is not None:
# copy file record
tmpStat = self.taskBuffer.copyFileRecord(newLFN,origOutputs[origLFN],updateOrig[origLFN])
if not tmpStat:
return False
addedNewFiles = True
# disable further overwriting
updateOrig[origLFN] = False
tmpPatt = r'^{0}\.*_\d+$'.format(origLFN)
regPatt = re.sub(r'^[^|]+\|', '', origLFN)
if re.search(tmpPatt,newLFN) or \
(origLFN.startswith('regex|') and re.search(regPatt, newLFN)):
self.logger.debug(f'use new LFN {newLFN} for {origLFN}')
# collect new filenames
orig_to_new_map.setdefault(origLFN, [])
orig_to_new_map[origLFN].append(newLFN)
break
# copy file records
for origLFN in orig_to_new_map:
tmpStat = self.taskBuffer.copy_file_records(orig_to_new_map[origLFN],
origOutputs[origLFN])
if not tmpStat:
return False
# refresh job info
if addedNewFiles:
if orig_to_new_map:
self.job = self.taskBuffer.peekJobs([self.jobID],fromDefined=False,
fromWaiting=False,
forAnal=True)[0]
Expand Down
136 changes: 69 additions & 67 deletions pandaserver/taskbuffer/OraDBProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18259,79 +18259,81 @@ def recordRetryHistoryJEDI(self,jediTaskID,newPandaID,oldPandaIDs,relationType):



# copy file record
def copyFileRecord(self,newLFN,fileSpec,updateOrig):
comment = ' /* DBProxy.copyFileRecord */'
# copy file records
def copy_file_records(self, new_lfns, file_spec):
comment = ' /* DBProxy.copy_file_records */'
methodName = comment.split(' ')[-2].split('.')[-1]
methodName += " <PandaID={0} oldLFN={1} newLFN={2} updateOrig={3}>".format(fileSpec.PandaID, fileSpec.lfn, newLFN, updateOrig)
methodName += " <PandaID={0} oldLFN={1}>".format(file_spec.PandaID, file_spec.lfn)
tmpLog = LogWrapper(_logger,methodName)
tmpLog.debug("start")
tmpLog.debug("start with {} files".format(len(new_lfns)))
try:
# reset rowID
tmpFileSpec = copy.copy(fileSpec)
tmpFileSpec.lfn = newLFN
if not updateOrig:
tmpFileSpec.row_ID = None
# begin transaction
self.conn.begin()
# insert file in JEDI
if not updateOrig and tmpFileSpec.jediTaskID not in [None,'NULL'] and tmpFileSpec.fileID not in ['', 'NULL', None]:
# get fileID
sqlFileID = "SELECT ATLAS_PANDA.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM dual "
self.cur.execute(sqlFileID+comment)
newFileID, = self.cur.fetchone()
# read file in JEDI
varMap = {}
varMap[':jediTaskID'] = tmpFileSpec.jediTaskID
varMap[':datasetID'] = tmpFileSpec.datasetID
varMap[':fileID'] = tmpFileSpec.fileID
sqlGI = 'SELECT * FROM {0}.JEDI_Dataset_Contents '.format(panda_config.schemaJEDI)
sqlGI += 'WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID '
self.cur.execute(sqlGI+comment,varMap)
resGI = self.cur.fetchone()
tmpFileSpec.fileID = newFileID
if resGI is not None:
# make sql and map
sqlJI = "INSERT INTO {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI)
sqlJI += "VALUES ("
for idx_lfn, new_lfn in enumerate(new_lfns):
# reset rowID
tmpFileSpec = copy.copy(file_spec)
tmpFileSpec.lfn = new_lfn
if idx_lfn > 0:
tmpFileSpec.row_ID = None
# insert file in JEDI
if idx_lfn > 0 and tmpFileSpec.jediTaskID not in [None,'NULL'] and \
tmpFileSpec.fileID not in ['', 'NULL', None]:
# get fileID
sqlFileID = "SELECT ATLAS_PANDA.JEDI_DATASET_CONT_FILEID_SEQ.nextval FROM dual "
self.cur.execute(sqlFileID+comment)
newFileID, = self.cur.fetchone()
# read file in JEDI
varMap = {}
for columDesc,columVal in zip(self.cur.description,resGI):
columName = columDesc[0]
# overwrite fileID
if columName == 'FILEID':
columVal = tmpFileSpec.fileID
keyName = ':{0}'.format(columName)
varMap[keyName] = columVal
sqlJI += '{0},'.format(keyName)
sqlJI = sqlJI[:-1]
sqlJI += ") "
# insert file in JEDI
self.cur.execute(sqlJI+comment,varMap)
if not updateOrig:
# insert file in Panda
sqlFile = "INSERT INTO ATLAS_PANDA.filesTable4 ({0}) ".format(FileSpec.columnNames())
sqlFile+= FileSpec.bindValuesExpression(useSeq=True)
varMap = tmpFileSpec.valuesMap(useSeq=True)
self.cur.execute(sqlFile+comment, varMap)
else:
# update LFN
sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET lfn=:lfn "
sqlFSF += "WHERE row_ID=:row_ID "
varMap = {}
varMap[':lfn'] = tmpFileSpec.lfn
varMap[':row_ID'] = tmpFileSpec.row_ID
self.cur.execute(sqlFSF+comment,varMap)
# update LFN in JEDI
if tmpFileSpec.fileID not in ['', 'NULL', None]:
sqlJF = "UPDATE {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI)
sqlJF += "SET lfn=:lfn "
sqlJF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
varMap = {}
varMap[':lfn'] = tmpFileSpec.lfn
varMap[':jediTaskID'] = tmpFileSpec.jediTaskID
varMap[':datasetID'] = tmpFileSpec.datasetID
varMap[':fileID'] = tmpFileSpec.fileID
self.cur.execute(sqlJF+comment,varMap)
varMap[':jediTaskID'] = tmpFileSpec.jediTaskID
varMap[':datasetID'] = tmpFileSpec.datasetID
varMap[':fileID'] = tmpFileSpec.fileID
sqlGI = 'SELECT * FROM {0}.JEDI_Dataset_Contents '.format(panda_config.schemaJEDI)
sqlGI += 'WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID '
self.cur.execute(sqlGI+comment,varMap)
resGI = self.cur.fetchone()
tmpFileSpec.fileID = newFileID
if resGI is not None:
# make sql and map
sqlJI = "INSERT INTO {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI)
sqlJI += "VALUES ("
varMap = {}
for columDesc,columVal in zip(self.cur.description,resGI):
columName = columDesc[0]
# overwrite fileID
if columName == 'FILEID':
columVal = tmpFileSpec.fileID
keyName = ':{0}'.format(columName)
varMap[keyName] = columVal
sqlJI += '{0},'.format(keyName)
sqlJI = sqlJI[:-1]
sqlJI += ") "
# insert file in JEDI
self.cur.execute(sqlJI+comment,varMap)
if idx_lfn > 0:
# insert file in Panda
sqlFile = "INSERT INTO ATLAS_PANDA.filesTable4 ({0}) ".format(FileSpec.columnNames())
sqlFile+= FileSpec.bindValuesExpression(useSeq=True)
varMap = tmpFileSpec.valuesMap(useSeq=True)
self.cur.execute(sqlFile+comment, varMap)
else:
# update LFN
sqlFSF = "UPDATE ATLAS_PANDA.filesTable4 SET lfn=:lfn "
sqlFSF += "WHERE row_ID=:row_ID "
varMap = {}
varMap[':lfn'] = tmpFileSpec.lfn
varMap[':row_ID'] = tmpFileSpec.row_ID
self.cur.execute(sqlFSF+comment,varMap)
# update LFN in JEDI
if tmpFileSpec.fileID not in ['', 'NULL', None]:
sqlJF = "UPDATE {0}.JEDI_Dataset_Contents ".format(panda_config.schemaJEDI)
sqlJF += "SET lfn=:lfn "
sqlJF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND fileID=:fileID "
varMap = {}
varMap[':lfn'] = tmpFileSpec.lfn
varMap[':jediTaskID'] = tmpFileSpec.jediTaskID
varMap[':datasetID'] = tmpFileSpec.datasetID
varMap[':fileID'] = tmpFileSpec.fileID
self.cur.execute(sqlJF+comment,varMap)
# commit
if not self._commit():
raise RuntimeError('Commit error')
Expand Down
6 changes: 3 additions & 3 deletions pandaserver/taskbuffer/TaskBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3013,12 +3013,12 @@ def getInputDatasetsForOutputDatasetJEDI(self,datasetName):



# copy file record
def copyFileRecord(self,newLFN,fileSpec,updateOrig):
# copy file records
def copy_file_records(self, new_lfns, file_spec):
# get proxy
proxy = self.proxyPool.getProxy()
# exec
ret = proxy.copyFileRecord(newLFN,fileSpec,updateOrig)
ret = proxy.copy_file_records(new_lfns, file_spec)
# release proxy
self.proxyPool.putProxy(proxy)
# return
Expand Down

0 comments on commit b16e21b

Please sign in to comment.