
Commit

v0.2.2
mightqxc committed Jun 16, 2020
2 parents 20c404e + 49b590a commit a4c3dfa
Showing 67 changed files with 3,602 additions and 1,256 deletions.
Empty file added __init__.py
Empty file.
80 changes: 80 additions & 0 deletions examples/k8s/job_cern.yaml
@@ -0,0 +1,80 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: grid-job
spec:
  ttlSecondsAfterFinished: 172800
  backoffLimit: 0
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: atlas-grid-centos7
        image: atlasadc/atlas-grid-centos7
        env:
        - name: computingSite
          value: "$computingSite"
        - name: pandaQueueName
          value: "$pandaQueueName"
        - name: proxySecretPath
          value: "$proxySecretPath"
        - name: proxyContent
          value: "$proxyContent"
        - name: workerID
          value: "$workerID"
        - name: logs_frontend_w
          value: "$logs_frontend_w"
        - name: logs_frontend_r
          value: "$logs_frontend_r"
        - name: resourceType
          value: "$resourceType"
        - name: HARVESTER_WORKER_ID
          value: "$HARVESTER_WORKER_ID"
        - name: HARVESTER_ID
          value: "$HARVESTER_ID"
        - name: PANDA_JSID
          value: "$PANDA_JSID"
        - name: TMPDIR
          value: "/root"
        - name: PILOT_NOKILL
          value: "True"
        command: ["/usr/bin/bash"]
        args: ["-c", "cd; wget https://raw.githubusercontent.com/HSF/harvester/k8s_analysis/pandaharvester/harvestercloud/pilots_starter.py; chmod 755 pilots_starter.py; ./pilots_starter.py || true"]
        volumeMounts:
        - name: atlas
          mountPath: /cvmfs/atlas.cern.ch
        - name: atlas-condb
          mountPath: /cvmfs/atlas-condb.cern.ch
        - name: atlas-nightlies
          mountPath: /cvmfs/atlas-nightlies.cern.ch
        - name: sft
          mountPath: /cvmfs/sft.cern.ch
        - name: grid
          mountPath: /cvmfs/grid.cern.ch
        - name: proxy-secret
          mountPath: /proxy
      volumes:
      - name: atlas
        persistentVolumeClaim:
          claimName: csi-cvmfs-atlas-pvc
          readOnly: true
      - name: atlas-condb
        persistentVolumeClaim:
          claimName: csi-cvmfs-atlas-condb-pvc
          readOnly: true
      - name: atlas-nightlies
        persistentVolumeClaim:
          claimName: csi-cvmfs-atlas-nightlies-pvc
          readOnly: true
      - name: sft
        persistentVolumeClaim:
          claimName: csi-cvmfs-sft-pvc
          readOnly: true
      - name: grid
        persistentVolumeClaim:
          claimName: csi-cvmfs-grid-pvc
          readOnly: true
      - name: proxy-secret
        secret:
          secretName: proxy-secret
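
The env values above, such as "$computingSite" and "$proxyContent", are placeholders that Harvester substitutes when it creates a worker. For a standalone test of the manifest, one possible way to render it is with Python's string.Template, as in the sketch below; the file path, the substituted values, and the use of string.Template are illustrative assumptions, not necessarily how the Harvester k8s submitter fills the template.

from string import Template

# Load the templated Job manifest; the path assumes the repository layout shown above.
with open('examples/k8s/job_cern.yaml') as f:
    manifest = Template(f.read())

# Substitute the $-placeholders; every value here is a hypothetical example.
rendered = manifest.safe_substitute(
    computingSite='EXAMPLE_K8S_QUEUE',
    pandaQueueName='EXAMPLE_K8S_QUEUE',
    proxySecretPath='/proxy/x509up',
    proxyContent='',
    workerID='12345',
    logs_frontend_w='https://logs.example.org/put',
    logs_frontend_r='https://logs.example.org/get',
    resourceType='SCORE',
    HARVESTER_WORKER_ID='12345',
    HARVESTER_ID='EXAMPLE_harvester',
    PANDA_JSID='EXAMPLE_harvester',
)
print(rendered)  # pipe into `kubectl apply -f -` or save to a file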
File renamed without changes.
101 changes: 101 additions & 0 deletions examples/k8s/k8s_cvmfs_1.15.yaml
@@ -0,0 +1,101 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-atlas
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: atlas.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-sft
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: sft.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-grid
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: grid.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-atlas-condb
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: atlas-condb.cern.ch
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: csi-cvmfs-atlas-nightlies
provisioner: cvmfs.csi.cern.ch
parameters:
  repository: atlas-nightlies.cern.ch
---
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-atlas-pvc
spec:
  accessModes:
  - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-atlas
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-sft-pvc
spec:
  accessModes:
  - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-sft
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-grid-pvc
spec:
  accessModes:
  - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-grid

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-atlas-condb-pvc
spec:
  accessModes:
  - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-atlas-condb
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: csi-cvmfs-atlas-nightlies-pvc
spec:
  accessModes:
  - ReadOnlyMany
  resources:
    requests:
      storage: 1Gi
  storageClassName: csi-cvmfs-atlas-nightlies
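
These StorageClass and PersistentVolumeClaim objects must exist in the cluster (with the CVMFS CSI driver deployed) before the Job manifest above can mount the /cvmfs paths. A minimal sketch of loading the file with the kubernetes Python client follows; it assumes the kubernetes and PyYAML packages, a valid kubeconfig, and the default namespace, none of which are specified by the manifest itself.

import yaml
from kubernetes import client, config, utils

config.load_kube_config()  # or config.load_incluster_config() when running inside the cluster
k8s_client = client.ApiClient()

# Load every document in the manifest, skipping the empty one produced by the doubled '---'.
with open('examples/k8s/k8s_cvmfs_1.15.yaml') as f:
    for doc in yaml.safe_load_all(f):
        if doc:
            utils.create_from_dict(k8s_client, doc, namespace='default')

# Check that the CVMFS claims defined above reach the Bound state.
v1 = client.CoreV1Api()
for pvc in v1.list_namespaced_persistent_volume_claim('default').items:
    print(pvc.metadata.name, pvc.status.phase)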
1 change: 0 additions & 1 deletion pandaharvester/README.md
@@ -13,4 +13,3 @@
* **Submitter**: Classes to submit jobs to the batch system
* **Test**: Test scripts
* **Worker Maker**: Makes workers

2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
timestamp = "10-03-2020 15:13:51 on release (by fahui)"
timestamp = "16-06-2020 14:57:09 on release (by fahui)"
16 changes: 12 additions & 4 deletions pandaharvester/harvesterbody/cred_manager.py
@@ -42,8 +42,13 @@ def __init__(self, single_mode=False):
            pluginPar['inCertFile'] = inCertFile
            pluginPar['outCertFile'] = outCertFile
            pluginPar['voms'] = voms
            exeCore = self.pluginFactory.get_plugin(pluginPar)
            self.exeCores.append(exeCore)
            try:
                exeCore = self.pluginFactory.get_plugin(pluginPar)
                self.exeCores.append(exeCore)
            except Exception as e:
                _logger.error('Problem instantiating cred manager for {0}'.format(pluginPar))
                _logger.error('Exception {0}'.format(e))


    # get list
    def get_list(self, data):
@@ -74,8 +79,11 @@ def execute(self):
            # do nothing
            if exeCore is None:
                continue
            # make logger
            mainLog = self.make_logger(_logger, "{0} {1}".format(exeCore.__class__.__name__, exeCore.outCertFile),

            # make logger
            mainLog = self.make_logger(_logger, "{0} {1} {2}".format(exeCore.__class__.__name__,
                                                                     exeCore.inCertFile,
                                                                     exeCore.outCertFile),
                                       method_name='execute')
            try:
                # check credential
46 changes: 28 additions & 18 deletions pandaharvester/harvesterbody/job_fetcher.py
@@ -10,6 +10,7 @@
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestermisc.info_utils import PandaQueuesDict

# logger
_logger = core_utils.setup_logger('job_fetcher')
@@ -35,6 +36,10 @@ def run(self):
            nJobsPerQueue = self.dbProxy.get_num_jobs_to_fetch(harvester_config.jobfetcher.nQueues,
                                                               harvester_config.jobfetcher.lookupTime)
            mainLog.debug('got {0} queues'.format(len(nJobsPerQueue)))

            # get up to date queue configuration
            pandaQueueDict = PandaQueuesDict()

            # loop over all queues
            for queueName, nJobs in iteritems(nJobsPerQueue):
                # check queue
@@ -44,17 +49,24 @@
                                          method_name='run')
                # get queue
                queueConfig = self.queueConfigMapper.get_queue(queueName)
                siteName = queueConfig.siteName
                # upper limit
                if nJobs > harvester_config.jobfetcher.maxJobs:
                    nJobs = harvester_config.jobfetcher.maxJobs

                # get jobs
                default_prodSourceLabel = queueConfig.get_source_label()
                try:
                    is_grandly_unified_queue = pandaQueueDict.is_grandly_unified_queue(siteName)
                except Exception:
                    is_grandly_unified_queue = False

                default_prodSourceLabel = queueConfig.get_source_label(is_gu=is_grandly_unified_queue)

                pdpm = getattr(queueConfig, 'prodSourceLabelRandomWeightsPermille', {})
                choice_list = core_utils.make_choice_list(pdpm=pdpm, default=default_prodSourceLabel)
                prodSourceLabel = random.choice(choice_list)
                tmpLog.debug('getting {0} jobs for prodSourceLabel {1}'.format(nJobs, prodSourceLabel))
                sw = core_utils.get_stopwatch()
                siteName = queueConfig.siteName
                jobs, errStr = self.communicator.get_jobs(siteName, self.nodeName,
                                                          prodSourceLabel,
                                                          self.nodeName, nJobs,
@@ -89,40 +101,38 @@ def run(self):
                            fileGroupDictList.append(extractorCore.get_aux_inputs(jobSpec))
                        for fileGroupDict in fileGroupDictList:
                            for tmpLFN, fileAttrs in iteritems(fileGroupDict):
                                # check file status
                                if tmpLFN not in fileStatMap:
                                    fileStatMap[tmpLFN] = self.dbProxy.get_file_status(tmpLFN, 'input',
                                                                                       queueConfig.ddmEndpointIn,
                                                                                       'starting')
                                # make file spec
                                fileSpec = FileSpec()
                                fileSpec.PandaID = jobSpec.PandaID
                                fileSpec.taskID = jobSpec.taskID
                                fileSpec.lfn = tmpLFN
                                fileSpec.endpoint = queueConfig.ddmEndpointIn
                                fileSpec.scope = fileAttrs['scope']
                                # set preparing to skip stage-in if the file is (being) taken care of by another job
                                if 'ready' in fileStatMap[tmpLFN] or 'preparing' in fileStatMap[tmpLFN] \
                                        or 'to_prepare' in fileStatMap[tmpLFN]:
                                    fileSpec.status = 'preparing'
                                else:
                                    fileSpec.status = 'to_prepare'
                                if fileSpec.status not in fileStatMap[tmpLFN]:
                                    fileStatMap[tmpLFN][fileSpec.status] = 0
                                fileStatMap[tmpLFN][fileSpec.status] += 1
                                if 'INTERNAL_FileType' in fileAttrs:
                                    fileSpec.fileType = fileAttrs['INTERNAL_FileType']
                                    jobSpec.auxInput = JobSpec.AUX_hasAuxInput
                                else:
                                    fileSpec.fileType = 'input'
                                # check file status
                                if tmpLFN not in fileStatMap:
                                    fileStatMap[tmpLFN] = self.dbProxy.get_file_status(tmpLFN, fileSpec.fileType,
                                                                                       queueConfig.ddmEndpointIn,
                                                                                       'starting')
                                # set preparing to skip stage-in if the file is (being) taken care of by another job
                                if [x for x in ['ready', 'preparing', 'to_prepare', 'triggered']
                                        if x in fileStatMap[tmpLFN]]:
                                    fileSpec.status = 'preparing'
                                else:
                                    fileSpec.status = 'to_prepare'
                                fileStatMap[tmpLFN].setdefault(fileSpec.status, None)
                                if 'INTERNAL_URL' in fileAttrs:
                                    fileSpec.url = fileAttrs['INTERNAL_URL']
                                jobSpec.add_in_file(fileSpec)
                        jobSpec.trigger_propagation()
                        jobSpecs.append(jobSpec)
                    # insert to DB
                    tmpLog.debug("Converting of {0} jobs {1}".format(len(jobs),sw_startconvert.get_elapsed_time()))
                    sw_insertdb =core_utils.get_stopwatch()
                    tmpLog.debug("Converting of {0} jobs {1}".format(len(jobs), sw_startconvert.get_elapsed_time()))
                    sw_insertdb = core_utils.get_stopwatch()
                    self.dbProxy.insert_jobs(jobSpecs)
                    tmpLog.debug('Insert of {0} jobs {1}'.format(len(jobSpecs), sw_insertdb.get_elapsed_time()))
            mainLog.debug('done')
37 changes: 28 additions & 9 deletions pandaharvester/harvesterbody/monitor.py
@@ -85,7 +85,12 @@ def run(self):
            # loop over all workers
            for queueName, configIdWorkSpecs in iteritems(workSpecsPerQueue):
                for configID, workSpecsList in iteritems(configIdWorkSpecs):
                    retVal = self.monitor_agent_core(lockedBy, queueName, workSpecsList, config_id=configID, check_source='DB')
                    try:
                        retVal = self.monitor_agent_core(lockedBy, queueName, workSpecsList, config_id=configID, check_source='DB')
                    except Exception as e:
                        mainLog.error('monitor_agent_core excepted with {0}'.format(e))
                        retVal = None  # skip the loop

                    if monitor_fifo.enabled and retVal is not None:
                        workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
                        if workSpecsToEnqueue:
@@ -192,8 +197,13 @@ def run(self):
                            else:
                                workSpec.pandaid_list = []
                            workSpec.force_update('pandaid_list')
                        retVal = self.monitor_agent_core(lockedBy, queueName, workSpecsList, from_fifo=True,
                                                         config_id=configID, check_source='FIFO')
                        try:
                            retVal = self.monitor_agent_core(lockedBy, queueName, workSpecsList, from_fifo=True,
                                                             config_id=configID, check_source='FIFO')
                        except Exception as e:
                            mainLog.error('monitor_agent_core excepted with {0}'.format(e))
                            retVal = None  # skip the loop

                        if retVal is not None:
                            workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
                            qc_key = (queueName, configID)
@@ -644,8 +654,10 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                        tmp_log.debug('kill workerID={0} due to queuing longer than {1} seconds'.format(
                            workerID, workerQueueTimeLimit))
                        self.dbProxy.kill_worker(workSpec.workerID)
                        diagMessage = 'Killed by Harvester due to worker queuing too long' + diagMessage
                        diagMessage = 'Killed by Harvester due to worker queuing too long. ' + diagMessage
                        workSpec.set_pilot_error(PilotErrors.ERR_FAILEDBYSERVER, diagMessage)
                        # set closed
                        workSpec.set_pilot_closed()
                    # expired heartbeat - only when requested in the configuration
                    try:
                        # check if the queue configuration requires checking for worker heartbeat
@@ -694,7 +706,8 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                    else:
                        newStatus = WorkSpec.ST_idle
                elif not workSpec.is_post_processed():
                    if not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot:
                    if (not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot) \
                            or (hasattr(messenger, 'forcePostProcessing') and messenger.forcePostProcessing):
                        # post processing unless heartbeat is suppressed
                        jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID,
                                                                        None, True,
@@ -764,10 +777,16 @@ def monitor_event_digester(self, locked_by, max_events):
            for configID, workSpecsList in iteritems(_val):
                qc_key = (queueName, configID)
                tmpLog.debug('checking workers of queueName={0} configID={1}'.format(*qc_key))
                retVal = self.monitor_agent_core(locked_by, queueName, workSpecsList,
                                                 from_fifo=True, config_id=configID,
                                                 check_source='Event')
                retMap[qc_key] = retVal
                try:
                    retVal = self.monitor_agent_core(locked_by, queueName, workSpecsList,
                                                     from_fifo=True, config_id=configID,
                                                     check_source='Event')
                except Exception as e:
                    tmpLog.error('monitor_agent_core excepted with {0}'.format(e))
                    retVal = None  # skip the loop

                if retVal:
                    retMap[qc_key] = retVal
        tmpLog.debug('done')
        return retMap
