Skip to content

Commit

Permalink
Merge pull request #194 from tmaeno/master
Browse files Browse the repository at this point in the history
gitlab plugins
  • Loading branch information
tmaeno authored Jul 19, 2023
2 parents 987d231 + 54017aa commit a17e68d
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 2 deletions.
1 change: 1 addition & 0 deletions docker/httpd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Alias /condor_logs/ "/var/log/condor_logs/"
DirectoryIndex disabled

# Errors go to their own log
LogLevel info
ErrorLog logs/error_log

# Never change this block
Expand Down
7 changes: 7 additions & 0 deletions pandaharvester/harvestercore/job_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

import copy
import json
import datetime
from past.builtins import long
from future.utils import iteritems
Expand Down Expand Up @@ -128,6 +129,12 @@ def add_event(self, event_spec, zip_filespec):

# convert from Job JSON
def convert_job_json(self, data):
# decode secrets
try:
if 'secrets' in data:
data['secrets'] = json.loads(data['secrets'])
except Exception:
pass
self.PandaID = data['PandaID']
if data['taskID'] == 'NULL':
self.taskID = None
Expand Down
4 changes: 2 additions & 2 deletions pandaharvester/harvestermessenger/shared_file_messenger.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ def tar_directory(dir_name, tar_name=None, max_depth=None, extra_files=None, sub
com += 'find . '
if max_depth is not None:
com += '-maxdepth {0} '.format(max_depth)
com += r'-type f \( ' + filter_log_tgz(extra_files) + r'\) -print0 '
com += '| '
com += r'-type f \( ' + filter_log_tgz(extra_files) + r'\) '
com += r'| grep -v {0} | tr "\n" "\0" | '.format(jobSpecFileName)
com += 'tar '
if distutils.spawn.find_executable('pigz') is None:
com += '-z '
Expand Down
20 changes: 20 additions & 0 deletions pandaharvester/harvestermisc/gitlab_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os
import json
# name of the file that persists job parameters under a worker's access point
job_params_file = '__job_params__'


# make file path
def make_file_path(work_spec):
    """Return the path of the job-params file inside the worker's access point."""
    return os.path.join(work_spec.get_access_point(), job_params_file)


# get job params from file
def get_job_params(work_spec):
    """Load and return the job parameters previously stored for *work_spec*."""
    with open(make_file_path(work_spec)) as f:
        return json.load(f)


# store job params in file
def store_job_params(work_spec, params):
    """Serialize *params* as JSON into the worker's access point."""
    with open(make_file_path(work_spec), 'w') as f:
        json.dump(params, f)
64 changes: 64 additions & 0 deletions pandaharvester/harvestermonitor/gitlab_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import json
import requests
import os.path
from pandaharvester.harvestercore.work_spec import WorkSpec
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestermisc.gitlab_utils import get_job_params

# logger
baseLogger = core_utils.setup_logger('gitlab_monitor')


# monitor plugin that polls GitLab pipeline status
class GitlabMonitor(PluginBase):
    """Monitor plugin checking the GitLab pipeline of each worker via the REST API."""

    # constructor
    def __init__(self, **kwarg):
        # HTTP timeout in seconds for GitLab API calls
        self.timeout = 180
        PluginBase.__init__(self, **kwarg)

    # check workers
    def check_workers(self, workspec_list):
        """Check all workers in the list.

        Returns (True, [(newStatus, message), ...]) with one tuple per worker.
        Workers whose status cannot be determined are reported as ST_idle.
        """
        retList = []
        for work_spec in workspec_list:
            # make logger
            tmp_log = self.make_logger(baseLogger, 'workerID={0}'.format(work_spec.workerID),
                                       method_name='check_workers')
            try:
                params = get_job_params(work_spec)
                # batchID is '<pipeline_id> <project_id>'; the first token addresses the pipeline
                pipeline_id = work_spec.batchID.split()[0]
                url = '{}/{}/pipelines/{}'.format(params['project_api'], params['project_id'],
                                                  pipeline_id)
                try:
                    tmp_log.debug('check pipeline at {}'.format(url))
                    r = requests.get(url,
                                     headers={'PRIVATE-TOKEN': params['secrets'][params['access_token']]},
                                     timeout=self.timeout)
                    response = r.json()
                    tmp_log.debug('got {}'.format(str(response)))
                except Exception:
                    err_str = core_utils.dump_error_message(tmp_log)
                    retList.append((WorkSpec.ST_idle, err_str))
                    continue
                new_msg = ''
                if 'status' in response:
                    # map GitLab pipeline states onto harvester worker states;
                    # anything unrecognized (e.g. 'running') counts as running
                    state_map = {'success': WorkSpec.ST_finished,
                                 'failed': WorkSpec.ST_failed,
                                 'created': WorkSpec.ST_submitted,
                                 'pending': WorkSpec.ST_pending}
                    new_status = state_map.get(response['status'], WorkSpec.ST_running)
                else:
                    new_status = WorkSpec.ST_idle
                    new_msg = response.get('message', 'failed to check due to unknown reason')
                tmp_log.debug('newStatus={0}'.format(new_status))
                retList.append((new_status, new_msg))
            except Exception:
                err_str = core_utils.dump_error_message(tmp_log)
                retList.append((WorkSpec.ST_idle, err_str))
        return True, retList
56 changes: 56 additions & 0 deletions pandaharvester/harvestersubmitter/gitlab_submitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import uuid
import os
import json
import requests
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestermisc.gitlab_utils import store_job_params

# setup base logger
baseLogger = core_utils.setup_logger('gitlab_submitter')


# submitter plugin that triggers GitLab pipelines
class GitlabSubmitter(PluginBase):
    """Submitter plugin triggering one GitLab pipeline per worker."""

    # constructor
    def __init__(self, **kwarg):
        # HTTP timeout in seconds for GitLab API calls
        self.timeout = 180
        PluginBase.__init__(self, **kwarg)

    # trigger pipeline jobs
    def submit_workers(self, workspec_list):
        """Submit all workers in the list.

        Returns a list of (ok, message) tuples, one per worker. On success the
        worker's batchID is set to '<pipeline_id> <project_id>'.
        """
        tmp_log = self.make_logger(baseLogger, method_name='submit_workers')
        tmp_log.debug('start nWorkers={0}'.format(len(workspec_list)))
        retList = []
        for work_spec in workspec_list:
            try:
                job_spec = work_spec.get_jobspec_list()[0]
                secrets = job_spec.jobParams['secrets']
                # pipeline parameters come from the job definition; stash them together
                # with the secrets so that monitor/sweeper can reuse them later
                params = json.loads(job_spec.jobParams['jobPars'])
                params['secrets'] = secrets
                store_job_params(work_spec, params)
                url = '{}/{}/trigger/pipeline'.format(params['project_api'], params['project_id'])
                data = {'token': secrets[params['trigger_token']],
                        'ref': params['ref']}
                try:
                    tmp_log.debug('trigger pipeline at {}'.format(url))
                    r = requests.post(url, data=data, timeout=self.timeout)
                    response = r.json()
                    tmp_log.debug('got {}'.format(str(response)))
                except Exception:
                    err_str = core_utils.dump_error_message(tmp_log)
                    retList.append((False, err_str))
                    continue
                if response['status'] != 'created':
                    err_str = 'failed to trigger with {}'.format(response['status'])
                    tmp_log.error(err_str)
                    retList.append((False, err_str))
                else:
                    # remember pipeline and project IDs for later status checks
                    work_spec.batchID = '{} {}'.format(response['id'], response['project_id'])
                    tmp_log.debug('succeeded with {}'.format(work_spec.batchID))
                    retList.append((True, ''))
            except Exception:
                err_str = core_utils.dump_error_message(tmp_log)
                retList.append((False, err_str))
        tmp_log.debug('done')
        return retList
72 changes: 72 additions & 0 deletions pandaharvester/harvestersweeper/gitlab_sweeper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os
import shutil
import requests
try:
import subprocess32 as subprocess
except ImportError:
import subprocess

from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
from pandaharvester.harvestermisc.gitlab_utils import get_job_params


# logger
baseLogger = core_utils.setup_logger('gitlab_sweeper')


# plugin for sweeper with Gitlab
class GitlabSweeper(BaseSweeper):
    """Sweeper plugin that cancels GitLab pipelines and removes work directories."""

    # constructor
    def __init__(self, **kwarg):
        # HTTP timeout in seconds for GitLab API calls
        self.timeout = 180
        BaseSweeper.__init__(self, **kwarg)

    # kill a worker
    def kill_worker(self, workspec):
        """Kill a worker in a scheduling system like batch systems and computing elements.
        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID),
                                  method_name='kill_worker')
        try:
            # reading the params file can fail (e.g. already swept); keep it inside the
            # try so a missing file is logged instead of crashing the sweeper agent
            params = get_job_params(workspec)
            # batchID is '<pipeline_id> <project_id>'; the first token addresses the pipeline
            url = '{}/{}/pipelines/{}/cancel'.format(params['project_api'], params['project_id'],
                                                     workspec.batchID.split()[0])
            tmpLog.debug('cancel pipeline at {}'.format(url))
            # the GitLab pipeline-cancel endpoint requires POST; a GET is a no-op
            r = requests.post(url, headers={'PRIVATE-TOKEN': params['secrets'][params['access_token']]},
                              timeout=self.timeout)
            response = r.json()
            tmpLog.debug('got {}'.format(str(response)))
        except Exception:
            err_str = core_utils.dump_error_message(tmpLog)
            tmpLog.error(err_str)
        tmpLog.debug('done')
        # cancellation is best-effort: always report success so sweeping proceeds
        return True, ''

    # cleanup for a worker
    def sweep_worker(self, workspec):
        """Perform cleanup procedures for a worker, such as deletion of work directory.
        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID),
                                  method_name='sweep_worker')
        # clean up worker directory
        if os.path.exists(workspec.accessPoint):
            shutil.rmtree(workspec.accessPoint)
            tmpLog.info('removed {0}'.format(workspec.accessPoint))
        else:
            tmpLog.info('access point already removed.')
        # return
        return True, ''

0 comments on commit a17e68d

Please sign in to comment.