diff --git a/Dockerfile b/Dockerfile
index a41a74348..92031ed0b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -47,7 +47,7 @@
 RUN mkdir -p /data/atlpan
 RUN mkdir -p /var/log/panda/wsgisocks
 RUN mkdir -p /var/log/panda/pandacache
 RUN mkdir -p /run/httpd/wsgisocks
-RUN mkdir -p /var/cache/pandaserver/jedilog
+RUN mkdir -p /var/log/panda/pandacache/jedilog
 RUN mkdir -p /var/run/panda
 RUN mkdir -p /var/cric
@@ -71,7 +71,6 @@ RUN chmod -R 777 /var/log/panda
 RUN chmod -R 777 /run/httpd
 RUN chmod -R 777 /home/atlpan
 RUN chmod -R 777 /var/lock
-RUN chmod -R 777 /var/cache/pandaserver
 RUN chmod -R 777 /var/log/panda/pandacache
 RUN chmod -R 777 /var/run/panda
 RUN chmod -R 777 /var/lib/logrotate
diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py
index 424a8c83d..fa035dc9c 100644
--- a/PandaPkgInfo.py
+++ b/PandaPkgInfo.py
@@ -1 +1 @@
-release_version = "0.0.51"
+release_version = "0.0.52"
diff --git a/pandaserver/config/panda_config.py b/pandaserver/config/panda_config.py
index 134350f88..b6a4589e6 100755
--- a/pandaserver/config/panda_config.py
+++ b/pandaserver/config/panda_config.py
@@ -35,6 +35,10 @@
 if 'pserverhosthttp' not in tmpSelf.__dict__:
     tmpSelf.__dict__['pserverhosthttp'] = tmpSelf.__dict__['pserverhost']
 
+# disable http
+if 'disableHTTP' not in tmpSelf.__dict__:
+    tmpSelf.__dict__['disableHTTP'] = False
+
 # change the number of database connections for FastCGI/WSGI
 if tmpSelf.__dict__['useFastCGI'] or tmpSelf.__dict__['useWSGI']:
     tmpSelf.__dict__['nDBConnection'] = tmpSelf.__dict__['nDBConForFastCGIWSGI']
diff --git a/pandaserver/daemons/scripts/add_main.py b/pandaserver/daemons/scripts/add_main.py
index 2d09ae72a..588cec9b7 100644
--- a/pandaserver/daemons/scripts/add_main.py
+++ b/pandaserver/daemons/scripts/add_main.py
@@ -44,7 +44,7 @@ def main(argv=tuple(), tbuf=None, **kwargs):
     lock_interval = 10
 
     # retry interval in minutes
-    retry_interval = 3
+    retry_interval = 1
 
     # instantiate TB
     if tbuf is None:
diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index eb7f6d6c2..05bcaab2a 100644
--- a/pandaserver/taskbuffer/OraDBProxy.py
+++ b/pandaserver/taskbuffer/OraDBProxy.py
@@ -1437,7 +1437,11 @@ def keepJob(self,job):
     # archive job to jobArchived and remove the job from jobsActive or jobsDefined
     def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWaiting=False):
         comment = ' /* DBProxy.archiveJob */'
-        _logger.debug("archiveJob : %s %s" % (job.PandaID, job.jobStatus))
+        methodName = comment.split(' ')[-2].split('.')[-1]
+        methodName = methodName + " < PandaID={} jediTaskID={} >".format(job.PandaID, job.jediTaskID)
+        tmpLog = LogWrapper(_logger, methodName)
+        tmpLog.debug("start for jobStatus=%s" % job.jobStatus)
+        start_time = datetime.datetime.utcnow()
         if fromJobsDefined:
             sql0 = "SELECT jobStatus FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID "
             sql1 = "DELETE FROM ATLAS_PANDA.jobsDefined4 WHERE PandaID=:PandaID AND (jobStatus=:oldJobStatus1 OR jobStatus=:oldJobStatus2)"
@@ -1493,7 +1497,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
             sqlDFile = "SELECT %s FROM ATLAS_PANDA.filesTable4 " % FileSpec.columnNames()
             sqlDFile+= "WHERE PandaID=:PandaID"
             for upFile in upOutputs:
-                _logger.debug("look for downstream jobs for %s" % upFile)
+                tmpLog.debug("look for downstream jobs for %s" % upFile)
                 if useCommit:
                     self.conn.begin()
                 # select PandaID
@@ -1510,14 +1514,14 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                     nDownJobs = len(res)
                     nDownChunk = 20
                    inTransaction = False
-                    _logger.debug("archiveJob : {0} found {1} downstream jobs for {2}".format(job.PandaID,nDownJobs,upFile))
+                    tmpLog.debug("found {} downstream jobs for {}".format(nDownJobs, upFile))
                     # loop over all downstream IDs
                     for downID, in res:
                         if useCommit:
                             if not inTransaction:
                                 self.conn.begin()
                                 inTransaction = True
-                        _logger.debug("archiveJob : {0} delete : {1} ({2}/{3})".format(job.PandaID,downID,iDownJobs,nDownJobs))
+                        tmpLog.debug("delete : {} ({}/{})".format(downID, iDownJobs, nDownJobs))
                         iDownJobs += 1
                         # select jobs
                         varMap = {}
@@ -1610,7 +1614,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                                     varMap[':status'] = 'tobeclosed'
                                     varMap[':name'] = tmpDestinationDBlock
                                     self.cur.execute(sqlCloseSub+comment, varMap)
-                                    _logger.debug("set tobeclosed for %s" % tmpDestinationDBlock)
+                                    tmpLog.debug("set tobeclosed for %s" % tmpDestinationDBlock)
                                     # append
                                     toBeClosedSubList[dJob.jobDefinitionID].append(tmpDestinationDBlock)
                                 # close top-level user dataset
@@ -1625,7 +1629,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                                     varMap[':status'] = 'tobeclosed'
                                     varMap[':name'] = topUserDsName
                                     self.cur.execute(sqlCloseSub+comment, varMap)
-                                    _logger.debug("set %s for %s" % (varMap[':status'],topUserDsName))
+                                    tmpLog.debug("set %s for %s" % (varMap[':status'],topUserDsName))
                                     # append
                                     topUserDsList.append(topUserDsName)
                         if useCommit and (iDownJobs % nDownChunk) == 0:
@@ -1654,7 +1658,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                 except Exception:
                     pass
                 if vuid == '':
-                    _logger.error("cannot extract vuid from %s" % job.jobParameters)
+                    tmpLog.error("cannot extract vuid from %s" % job.jobParameters)
                 else:
                     # get name
                     varMap = {}
@@ -1683,7 +1687,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                 sqlPMod = "UPDATE ATLAS_PANDA.jobParamsTable SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
                 lostJobIDs = []
                 for tmpID, in resLost:
-                    _logger.debug("fail due to lost files : %s" % tmpID)
+                    tmpLog.debug("fail due to lost files : %s" % tmpID)
                     varMap = {}
                     varMap[':PandaID'] = tmpID
                     self.cur.arraysize = 10
@@ -1733,7 +1737,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                         ddmIDs.append(tmpID)
                 # get offset
                 ddmAttempt = job.attemptNr
-                _logger.debug("get PandaID for reassign : %s ddmAttempt=%s" % (str(ddmIDs),ddmAttempt))
+                tmpLog.debug("get PandaID for reassign : %s ddmAttempt=%s" % (str(ddmIDs),ddmAttempt))
             if useCommit:
                 if not self._commit():
                     raise RuntimeError('Commit error')
@@ -1789,11 +1793,11 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
             # check input status for ES merge
             if useJEDI and EventServiceUtils.isEventServiceMerge(job) and job.jobStatus == 'finished':
                 retInputStat = self.checkInputFileStatusInJEDI(job, useCommit=False, withLock=True)
-                _logger.debug("archiveJob : {0} checkInput for ES merge -> {1}".format(job.PandaID, retInputStat))
+                tmpLog.debug("checkInput for ES merge -> {}".format(retInputStat))
                 if retInputStat is None:
                     raise RuntimeError("archiveJob : {0} failed to check input".format(job.PandaID))
                 if retInputStat is False:
-                    _logger.debug("archiveJob : {0} set jobStatus=failed due to inconsisten input".format(job.PandaID))
+                    tmpLog.debug("set jobStatus=failed due to inconsisten input")
                     job.jobStatus = 'failed'
                     job.taskBufferErrorCode = ErrorCode.EC_EventServiceInconsistentIn
                     job.taskBufferErrorDiag = "inconsistent file status between Panda and JEDI"
@@ -1804,22 +1808,21 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
 
             if not useJEDI:
                 # update HS06sec for non-JEDI jobs (e.g. HC)
                 hs06sec = self.setHS06sec(job.PandaID, inActive=True)
-                _logger.debug("archiveJob : calculated hs06sec {0} for pandaID {1}".format(hs06sec, job.PandaID))
+                tmpLog.debug("calculated hs06sec {0}".format(hs06sec))
                 if hs06sec is not None:
                     job.hs06sec = hs06sec
                 # update the g of CO2 emitted by the job
                 try:
                     gco2_regional, gco2_global = self.set_co2_emissions(job.PandaID, in_active=True)
-                    _logger.debug("archiveJob : calculated gCO2 regional {0} and global {1} for pandaID {2}".format(gco2_regional, gco2_global, job.PandaID))
+                    tmpLog.debug("calculated gCO2 regional {0} and global {1}".format(gco2_regional, gco2_global))
                     if gco2_regional is not None:
                         job.gco2_regional = gco2_regional
                     if gco2_global is not None:
                         job.gco2_global = gco2_global
                 except Exception:
-                    _logger.error(
-                        "archiveJob : failed calculating gCO2 for pandaID {0} with {1}".format(job.PandaID,
-                                                                                               traceback.format_exc()))
+                    tmpLog.error(
+                        "failed calculating gCO2 with {}".format(traceback.format_exc()))
 
             # actions for successful normal ES jobs
             if useJEDI and EventServiceUtils.isEventServiceJob(job) \
@@ -1833,22 +1836,21 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                 # update the g of CO2 emitted by the job
                 try:
                     gco2_regional, gco2_global = self.set_co2_emissions(job.PandaID, in_active=True)
-                    _logger.debug("archiveJob : calculated gCO2 regional {0} and global {1} for pandaID {1}".format(gco2_regional, gco2_global, job.PandaID))
+                    tmpLog.debug("calculated gCO2 regional {} and global {}".format(gco2_regional, gco2_global))
                     if gco2_regional is not None:
                         job.gco2_regional = gco2_regional
                     if gco2_global is not None:
                         job.gco2_global = gco2_global
                 except Exception:
-                    _logger.error(
-                        "archiveJob : failed calculating gCO2 for pandaID {0} with {1}".format(job.PandaID,
-                                                                                               traceback.format_exc()))
+                    tmpLog.error(
+                        "failed calculating gCO2 with {}".format(traceback.format_exc()))
 
                 # post processing
                 oldJobSubStatus = job.jobSubStatus
                 if oldJobSubStatus == 'NULL':
                     oldJobSubStatus = None
                 retEvS,retNewPandaID = self.ppEventServiceJob(job,currentJobStatus,False)
-                _logger.debug("archiveJob : {0} ppE -> {1}".format(job.PandaID, retEvS))
+                tmpLog.debug("ppE -> {}".format(retEvS))
                 # DB error
                 if retEvS is None:
                     raise RuntimeError('Faied to retry for Event Service')
@@ -1914,20 +1916,6 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                     # additional actions when retry
                     codeListWithRetry = [0, 4, 5, 8, 9]
                     if retEvS in codeListWithRetry and job.computingSite != EventServiceUtils.siteIdForWaitingCoJumboJobs:
-                        # resurrect consumers at other sites
-                        """
-                        if retEvS != 4 and EventServiceUtils.isResurrectConsumers(job.specialHandling):
-                            archivedConsumers = self.getOriginalConsumers(job.jediTaskID, job.jobsetID, job.PandaID)
-                            for archivedConsumer in archivedConsumers:
-                                archivedConsumer.attemptNr = job.attemptNr
-                                archivedConsumer.maxAttempt = job.maxAttempt
-                                tmpS,tmpID = self.ppEventServiceJob(archivedConsumer,None,False)
-                                _logger.debug('archiveJob : {0} tried to resurrect old consumer {1} ret={2} new={3}'.format(job.PandaID,
-                                                                                                                            archivedConsumer.PandaID,
-                                                                                                                            tmpS,tmpID))
-                                if tmpID is not None:
-                                    retNewPandaID = tmpID
-                        """
                         # check jumbo flag
                         sqlJumbo = "SELECT useJumbo FROM {0}.JEDI_Tasks ".format(panda_config.schemaJEDI)
                         sqlJumbo += "WHERE jediTaskID=:jediTaskID "
@@ -1939,7 +1927,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                             useJumbo, = resJumbo
                         else:
                             useJumbo = None
-                        _logger.debug("archiveJob : {0} useJumbo={1}".format(job.PandaID, useJumbo))
+                        tmpLog.debug("useJumbo={}".format(useJumbo))
                         # no new jobs
                         if retNewPandaID is None and (retEvS != 4 or EventServiceUtils.isCoJumboJob(job) or useJumbo is not None):
                             nActiveConsumers = self.getActiveConsumers(job.jediTaskID, job.jobsetID, job.PandaID)
@@ -1952,9 +1940,8 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                                 job.jobStatus = 'failed'
                                 job.taskBufferErrorCode = ErrorCode.EC_EventServiceNoEsQueues
                                 job.taskBufferErrorDiag = "no ES queues available for new consumers"
-                                _logger.debug('archiveJob : {0} set {1} since {2}'.format(job.PandaID,
-                                                                                          job.jobStatus,
-                                                                                          job.taskBufferErrorDiag))
+                                tmpLog.debug('set {} since {}'.format(job.jobStatus,
+                                                                      job.taskBufferErrorDiag))
                     # kill unused event ranges
                     if job.jobStatus == 'failed':
                         if not job.notDiscardEvents():
@@ -2020,7 +2007,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                     varMap = file.valuesMap(onlyChanged=True)
                     if varMap != {}:
                         varMap[':row_ID'] = file.row_ID
-                        _logger.debug(sqlF+comment+str(varMap))
+                        tmpLog.debug(sqlF+comment+str(varMap))
                         self.cur.execute(sqlF+comment, varMap)
             # update metadata and parameters
             sqlFMod = "UPDATE ATLAS_PANDA.filesTable4 SET modificationTime=:modificationTime WHERE PandaID=:PandaID"
@@ -2049,13 +2036,13 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                             sqlGetCurFiles += "currentfiles,vuid FROM ATLAS_PANDA.Datasets tab WHERE name=:name"
                             self.cur.execute(sqlGetCurFiles+comment,varMap)
                             resCurFiles = self.cur.fetchone()
-                            _logger.debug("archiveJob : %s %s" % (job.PandaID,str(resCurFiles)))
+                            tmpLog.debug("%s" % str(resCurFiles))
                             if resCurFiles is not None:
                                 # increment currentfiles only for the first failed job since that is enough
                                 tmpCurrentFiles,tmpVUID = resCurFiles
-                                _logger.debug("archiveJob : %s %s currentfiles=%s" % (job.PandaID,tmpFile.dispatchDBlock,tmpCurrentFiles))
+                                tmpLog.debug("%s currentfiles=%s" % (tmpFile.dispatchDBlock, tmpCurrentFiles))
                                 if tmpCurrentFiles == 0:
-                                    _logger.debug("archiveJob : %s %s update currentfiles" % (job.PandaID,tmpFile.dispatchDBlock))
+                                    tmpLog.debug("%s update currentfiles" % tmpFile.dispatchDBlock)
                                     varMap = {}
                                     varMap[':vuid'] = tmpVUID
                                     sqlFailedInDis = 'UPDATE ATLAS_PANDA.Datasets '
@@ -2120,7 +2107,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                        varMap[':jobSubStatus'] = 'es_wait'
                     self.cur.execute(sqlOJS+comment,varMap)
                     tmpJobStatus = varMap[':jobStatus']
-                    _logger.debug("archiveJob : %s change failed to merging" % job.PandaID)
+                    tmpLog.debug("change failed to merging")
                 elif job.jobStatus in ['failed'] and \
                         job.taskBufferErrorCode in [ErrorCode.EC_EventServiceLastUnprocessed, ErrorCode.EC_EventServiceUnprocessed] and \
                         (oldJobSubStatus in ['pilot_noevents'] or \
@@ -2131,7 +2118,7 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                     varMap[':jobSubStatus'] = oldJobSubStatus
                     self.cur.execute(sqlOJS+comment,varMap)
                     tmpJobStatus = varMap[':jobStatus']
-                    _logger.debug("archiveJob : {0} change failed to closed for {1}".format(job.PandaID, oldJobSubStatus))
+                    tmpLog.debug("change failed to closed for {}".format(oldJobSubStatus))
             # commit
             if useCommit:
                 if not self._commit():
@@ -2142,21 +2129,21 @@ def archiveJob(self,job,fromJobsDefined,useCommit=True,extraInfo=None,fromJobsWa
                     self.recordStatusChange(tmpJob.PandaID,tmpJobStatus,jobInfo=tmpJob,useCommit=useCommit)
                     self.push_job_status_message(tmpJob, tmpJob.PandaID, tmpJobStatus)
             except Exception:
-                _logger.error('recordStatusChange in archiveJob')
-            _logger.debug("archiveJob : %s done" % job.PandaID)
-            return True,ddmIDs,ddmAttempt,newJob
+                tmpLog.error('recordStatusChange in archiveJob')
+            exec_time = datetime.datetime.utcnow() - start_time
+            tmpLog.debug("done OK. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds/1000))
+            return True, ddmIDs, ddmAttempt, newJob
         except Exception:
             # roll back
             if useCommit:
                 self._rollback(True)
-            errtype,errvalue = sys.exc_info()[:2]
-            errStr = "archiveJob %s : %s %s" % (job.PandaID,errtype,errvalue)
-            errStr.strip()
-            errStr += traceback.format_exc()
-            _logger.error(errStr)
+            # error
+            self.dumpErrorMessage(_logger, methodName)
+            exec_time = datetime.datetime.utcnow() - start_time
+            tmpLog.debug("done NG. took %s.%03d sec" % (exec_time.seconds, exec_time.microseconds/1000))
             if not useCommit:
                 raise RuntimeError('archiveJob failed')
-            return False,[],0,None
+            return False, [], 0, None
 
 
     # finalize pending jobs
@@ -12473,7 +12460,7 @@ def push_job_status_message(self, job_spec, panda_id, status, jedi_task_id=None,
     def propagateResultToJEDI(self, jobSpec, cur, oldJobStatus=None, extraInfo=None, finishPending=False, waitLock=False):
         comment = ' /* DBProxy.propagateResultToJEDI */'
         methodName = comment.split(' ')[-2].split('.')[-1]
-        methodName += " <PandaID={0}>".format(jobSpec.PandaID)
+        methodName += " < PandaID={} jediTaskID={} >".format(jobSpec.PandaID, jobSpec.jediTaskID)
         tmpLog = LogWrapper(_logger,methodName)
         datasetContentsStat = {}
         # loop over all files
@@ -12828,6 +12815,7 @@ def propagateResultToJEDI(self, jobSpec, cur, oldJobStatus=None, extraInfo=None,
         tmpDatasetIDs = list(datasetContentsStat)
         tmpDatasetIDs.sort()
         for tmpDatasetID in tmpDatasetIDs:
+            tmpLog.debug('trying to lock datasetID={}'.format(tmpDatasetID))
             tmpContentsStat = datasetContentsStat[tmpDatasetID]
             sqlJediDL = "SELECT nFilesUsed,nFilesFailed,nFilesTobeUsed,nFilesFinished,nFilesOnHold,type,masterID FROM ATLAS_PANDA.JEDI_Datasets "
             sqlJediDL += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID FOR UPDATE "
@@ -12969,12 +12957,10 @@ def propagateResultToJEDI(self, jobSpec, cur, oldJobStatus=None, extraInfo=None,
         # update the g of CO2 emitted by the job
         try:
             gco2_regional, gco2_global = self.set_co2_emissions(jobSpec.PandaID)
-            _logger.debug("archiveJob : calculated gCO2 regional {0} and global {1} for pandaID {2}".format(gco2_regional,
-                                                                                                            gco2_global,
-                                                                                                            jobSpec.PandaID))
+            tmpLog.debug("calculated gCO2 regional {0} and global {1}".format(gco2_regional,
+                                                                              gco2_global))
         except Exception:
-            _logger.error("archiveJob : failed calculating gCO2 for pandaID {0} with {1}".format(jobSpec.PandaID,
-                                                                                                  traceback.format_exc()))
+            tmpLog.error("failed calculating gCO2 with {}".format(traceback.format_exc()))
 
         # return
         return True
diff --git a/pandaserver/taskbuffer/Utils.py b/pandaserver/taskbuffer/Utils.py
index 100bad7cd..3e1fae573 100755
--- a/pandaserver/taskbuffer/Utils.py
+++ b/pandaserver/taskbuffer/Utils.py
@@ -433,7 +433,10 @@ def uploadLog(req,file):
         fo.write(fileContent)
         fo.close()
         tmpLog.debug("written to {0}".format(fileFullPath))
-        retStr = 'http://{0}/cache{1}/{2}'.format(getServerHTTP(None),jediLogDir,fileBaseName)
+        if panda_config.disableHTTP:
+            retStr = 'https://{}/cache{}/{}'.format(getServer(None), jediLogDir, fileBaseName)
+        else:
+            retStr = 'http://{}/cache{}/{}'.format(getServerHTTP(None),jediLogDir,fileBaseName)
     except Exception:
         errtype,errvalue = sys.exc_info()[:2]
         errStr = "failed to write log with {0}:{1}".format(errtype.__name__,errvalue)