Commit

Merge remote-tracking branch 'origin/master' into release
mightqxc committed Feb 16, 2022
2 parents a05034c + 05f0e95 commit b3369ac
Showing 25 changed files with 830 additions and 216 deletions.
89 changes: 89 additions & 0 deletions .github/workflows/docker-publish.yml
@@ -0,0 +1,89 @@
name: Docker

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

on:
  release:
    types: [published]

  workflow_dispatch:

env:
  # Use docker.io for Docker Hub if empty
  REGISTRY: ghcr.io
  # github.repository as <account>/<repo>
  IMAGE_NAME: ${{ github.repository }}


jobs:
  build:

    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
      # This is used to complete the identity challenge
      # with sigstore/fulcio when running outside of PRs.
      id-token: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v2

      # Install the cosign tool except on PR
      # https://github.com/sigstore/cosign-installer
      - name: Install cosign
        if: github.event_name != 'pull_request'
        uses: sigstore/cosign-installer@1e95c1de343b5b0c23352d6417ee3e48d5bcd422
        with:
          cosign-release: 'v1.4.0'


      # Workaround: https://github.com/docker/build-push-action/issues/461
      - name: Setup Docker buildx
        uses: docker/setup-buildx-action@79abd3f86f79a9d68a23c75a09a9a85889262adf

      # Login against a Docker registry except on PR
      # https://github.com/docker/login-action
      - name: Log into registry ${{ env.REGISTRY }}
        if: github.event_name != 'pull_request'
        uses: docker/login-action@28218f9b04b4f3f62068d7b6ce6ca5b26e35336c
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # Extract metadata (tags, labels) for Docker
      # https://github.com/docker/metadata-action
      - name: Extract Docker metadata
        id: meta
        uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

      # Build and push Docker image with Buildx (don't push on PR)
      # https://github.com/docker/build-push-action
      - name: Build and push Docker image
        id: build-and-push
        uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc
        with:
          context: .
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}

      # Sign the resulting Docker image digest except on PRs.
      # This will only write to the public Rekor transparency log when the Docker
      # repository is public to avoid leaking data. If you would like to publish
      # transparency data even for private images, pass --force to cosign below.
      # https://github.com/sigstore/cosign
      #- name: Sign the published Docker image
      #  if: ${{ github.event_name != 'pull_request' }}
      #  env:
      #    COSIGN_EXPERIMENTAL: "true"
      #  # This step uses the identity token to provision an ephemeral certificate
      #  # against the sigstore community Fulcio instance.
      #  run: cosign sign ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ steps.build-and-push.outputs.digest }}
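The workflow above builds and pushes the container image to ghcr.io whenever a release is published, and its workflow_dispatch trigger also allows a build to be started by hand. As an illustration only (not part of this commit), a manual run could be requested through the GitHub REST API; the repository name, branch, and token handling below are assumptions.

# Illustrative sketch: trigger the docker-publish.yml workflow via its
# workflow_dispatch event. OWNER_REPO, REF, and the token are assumptions.
import os
import requests

OWNER_REPO = "HSF/harvester"          # assumed from the install URL in the Dockerfile below
WORKFLOW_FILE = "docker-publish.yml"  # the workflow added in this commit
REF = "master"                        # assumed branch to run the workflow on

resp = requests.post(
    "https://api.github.com/repos/{0}/actions/workflows/{1}/dispatches".format(OWNER_REPO, WORKFLOW_FILE),
    headers={
        "Accept": "application/vnd.github+json",
        "Authorization": "Bearer {0}".format(os.environ["GITHUB_TOKEN"]),  # a token allowed to run workflows
    },
    json={"ref": REF},
)
resp.raise_for_status()  # the API returns 204 No Content on success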
33 changes: 33 additions & 0 deletions Dockerfile
@@ -0,0 +1,33 @@
FROM docker.io/centos:7

RUN yum update -y
RUN yum install -y epel-release
RUN yum install -y python3 python3-devel gcc less git mysql-devel curl

RUN curl -fsSL https://get.htcondor.org | /bin/bash -s -- --no-dry-run

RUN python3 -m venv /opt/harvester
RUN /opt/harvester/bin/pip install -U pip
RUN /opt/harvester/bin/pip install -U setuptools
RUN /opt/harvester/bin/pip install -U mysqlclient uWSGI
RUN /opt/harvester/bin/pip install git+git://github.com/HSF/harvester.git

RUN mv /opt/harvester/etc/sysconfig/panda_harvester.rpmnew.template /opt/harvester/etc/sysconfig/panda_harvester
RUN mv /opt/harvester/etc/panda/panda_common.cfg.rpmnew /opt/harvester/etc/panda/panda_common.cfg
RUN mv /opt/harvester/etc/panda/panda_harvester.cfg.rpmnew.template /opt/harvester/etc/panda/panda_harvester.cfg
RUN mv /opt/harvester/etc/panda/panda_harvester-uwsgi.ini.rpmnew.template /opt/harvester/etc/panda/panda_harvester-uwsgi.ini
RUN mv /opt/harvester/etc/rc.d/init.d/panda_harvester-uwsgi.rpmnew.template /opt/harvester/etc/rc.d/init.d/panda_harvester-uwsgi

RUN ln -fs /opt/harvester/etc/queue_config/panda_queueconfig.json /opt/harvester/etc/panda/panda_queueconfig.json

RUN adduser atlpan
RUN groupadd zp
RUN usermod -a -G zp atlpan

RUN mkdir -p /var/log/panda
RUN chown -R atlpan:zp /var/log/panda

RUN mkdir -p /data/harvester
RUN chown -R atlpan:zp /data/harvester

CMD exec /bin/bash -c "trap : TERM INT; sleep infinity & wait"
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "17-09-2021 07:05:31 on release (by mightqxc)"
+timestamp = "16-02-2022 08:11:12 on release (by mightqxc)"
58 changes: 40 additions & 18 deletions pandaharvester/harvesterbody/monitor.py
@@ -182,19 +182,29 @@ def run(self):
                 while time.time() < last_fifo_cycle_timestamp + fifoCheckDuration:
                     sw.reset()
                     n_loops += 1
-                    retVal, overhead_time = monitor_fifo.to_check_workers()
+                    try:
+                        retVal, overhead_time = monitor_fifo.to_check_workers()
+                    except Exception as e:
+                        mainLog.error('failed to check workers from FIFO: {0}'.format(e))
                     if overhead_time is not None:
                         n_chunk_peeked_stat += 1
                         sum_overhead_time_stat += overhead_time
                     if retVal:
                         # check fifo size
-                        fifo_size = monitor_fifo.size()
-                        mainLog.debug('FIFO size is {0}'.format(fifo_size))
+                        try:
+                            fifo_size = monitor_fifo.size()
+                            mainLog.debug('FIFO size is {0}'.format(fifo_size))
+                        except Exception as e:
+                            mainLog.error('failed to get size of FIFO: {0}'.format(e))
+                            time.sleep(2)
+                            continue
                         mainLog.debug('starting run with FIFO')
                         try:
                             obj_gotten = monitor_fifo.get(timeout=1, protective=fifoProtectiveDequeue)
                         except Exception as errStr:
                             mainLog.error('failed to get object from FIFO: {0}'.format(errStr))
+                            time.sleep(2)
+                            continue
                         else:
                             if obj_gotten is not None:
                                 sw_fifo = core_utils.get_stopwatch()
@@ -299,7 +309,10 @@ def run(self):
                             mainLog.error('failed to put object from FIFO head: {0}'.format(errStr))
                     # delete protective dequeued objects
                     if fifoProtectiveDequeue and len(obj_dequeued_id_list) > 0:
-                        monitor_fifo.delete(ids=obj_dequeued_id_list)
+                        try:
+                            monitor_fifo.delete(ids=obj_dequeued_id_list)
+                        except Exception as e:
+                            mainLog.error('failed to delete object from FIFO: {0}'.format(e))
                     mainLog.debug('put {0} worker chunks into FIFO'.format(n_chunk_put) + sw.get_elapsed_time())
                 # adjust adjusted_sleepTime
                 if n_chunk_peeked_stat > 0 and sum_overhead_time_stat > sleepTime:
@@ -655,7 +668,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                         timeNow - workSpec.checkTime > datetime.timedelta(seconds=checkTimeout):
                     # kill due to timeout
                     tmp_log.debug('kill workerID={0} due to consecutive check failures'.format(workerID))
-                    self.dbProxy.kill_worker(workSpec.workerID)
+                    self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                     newStatus = WorkSpec.ST_cancelled
                     diagMessage = 'Killed by Harvester due to consecutive worker check failures. ' + diagMessage
                     workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
@@ -665,13 +678,13 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                 # request kill
                 if messenger.kill_requested(workSpec):
                     tmp_log.debug('kill workerID={0} as requested'.format(workerID))
-                    self.dbProxy.kill_worker(workSpec.workerID)
+                    self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                 # stuck queuing for too long
                 if workSpec.status == WorkSpec.ST_submitted \
                         and timeNow > workSpec.submitTime + datetime.timedelta(seconds=workerQueueTimeLimit):
                     tmp_log.debug('kill workerID={0} due to queuing longer than {1} seconds'.format(
                         workerID, workerQueueTimeLimit))
-                    self.dbProxy.kill_worker(workSpec.workerID)
+                    self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                     diagMessage = 'Killed by Harvester due to worker queuing too long. ' + diagMessage
                     workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                 # set closed
@@ -689,9 +702,8 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                 if messenger.is_alive(workSpec, worker_heartbeat_limit):
                     tmp_log.debug('heartbeat for workerID={0} is valid'.format(workerID))
                 else:
-                    tmp_log.debug('heartbeat for workerID={0} expired: sending kill request'.format(
-                        workerID))
-                    self.dbProxy.kill_worker(workSpec.workerID)
+                    tmp_log.debug('heartbeat for workerID={0} expired: sending kill request'.format(workerID))
+                    self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID])
                     diagMessage = 'Killed by Harvester due to worker heartbeat expired. ' + diagMessage
                     workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                 # get work attributes
@@ -725,7 +737,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                     newStatus = WorkSpec.ST_idle
                 elif not workSpec.is_post_processed():
                     if (not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot) \
-                        or (hasattr(messenger, 'forcePostProcessing') and messenger.forcePostProcessing):
+                            or (hasattr(messenger, 'forcePostProcessing') and messenger.forcePostProcessing):
                         # post processing unless heartbeat is suppressed
                         jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID,
                                                                         None, True,
@@ -783,9 +795,12 @@ def monitor_event_deliverer(self, time_window):
     def monitor_event_digester(self, locked_by, max_events):
         tmpLog = self.make_logger(_logger, 'id=monitor-{0}'.format(self.get_pid()), method_name='monitor_event_digester')
         tmpLog.debug('start')
-        timeNow_timestamp = time.time()
         retMap = {}
-        obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', count=max_events, protective=True)
+        try:
+            obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', count=max_events, protective=True)
+        except Exception as e:
+            obj_gotten_list = []
+            tmpLog.error('monitor_event_fifo excepted with {0}'.format(e))
         workerID_list = [ obj_gotten.id for obj_gotten in obj_gotten_list ]
         tmpLog.debug('got {0} worker events'.format(len(workerID_list)))
         if len(workerID_list) > 0:
@@ -797,8 +812,8 @@ def monitor_event_digester(self, locked_by, max_events):
             tmpLog.debug('checking workers of queueName={0} configID={1}'.format(*qc_key))
             try:
                 retVal = self.monitor_agent_core(locked_by, queueName, workSpecsList,
-                                                 from_fifo=True, config_id=configID,
-                                                 check_source='Event')
+                                                     from_fifo=True, config_id=configID,
+                                                     check_source='Event')
             except Exception as e:
                 tmpLog.error('monitor_agent_core excepted with {0}'.format(e))
                 retVal = None # skip the loop
@@ -813,10 +828,17 @@ def monitor_event_disposer(self, event_lifetime, max_events):
         tmpLog = self.make_logger(_logger, 'id=monitor-{0}'.format(self.get_pid()), method_name='monitor_event_disposer')
         tmpLog.debug('start')
         timeNow_timestamp = time.time()
-        obj_gotten_list = self.monitor_event_fifo.getmany(mode='first',
+        try:
+            obj_gotten_list = self.monitor_event_fifo.getmany(mode='first',
                                                           maxscore=(timeNow_timestamp-event_lifetime),
                                                           count=max_events, temporary=True)
+        except Exception as e:
+            obj_gotten_list = []
+            tmpLog.error('monitor_event_fifo excepted with {0}'.format(e))
         tmpLog.debug('removed {0} events'.format(len(obj_gotten_list)))
-        n_events = self.monitor_event_fifo.size()
-        tmpLog.debug('now {0} events in monitor-event fifo'.format(n_events))
+        try:
+            n_events = self.monitor_event_fifo.size()
+            tmpLog.debug('now {0} events in monitor-event fifo'.format(n_events))
+        except Exception as e:
+            tmpLog.error('failed to get size of monitor-event fifo: {0}'.format(e))
         tmpLog.debug('done')
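The monitor.py hunks above all apply the same defensive pattern: every FIFO call (to_check_workers, size, get, getmany, delete) is wrapped in try/except so that a failure of the FIFO backend is logged and the monitor keeps running instead of aborting the cycle. A minimal standalone sketch of that pattern, assuming a fifo-like object with the same method names, follows; the safe_fifo_call helper is illustrative and not part of harvester.

# Minimal sketch of the defensive FIFO-call pattern used in the hunks above.
# safe_fifo_call is an illustrative helper, not harvester API.
import logging
import time

logger = logging.getLogger('monitor-sketch')

def safe_fifo_call(func, *args, default=None, retry_sleep=2, **kwargs):
    """Run a FIFO method; on failure log the error, pause briefly, and return a fallback."""
    try:
        return func(*args, **kwargs)
    except Exception as exc:
        logger.error('FIFO call %s failed: %s', getattr(func, '__name__', func), exc)
        time.sleep(retry_sleep)
        return default

# Usage against an object exposing the same methods as monitor_fifo, e.g.:
#   obj_gotten = safe_fifo_call(monitor_fifo.get, timeout=1, protective=True)
#   fifo_size = safe_fifo_call(monitor_fifo.size, default=0)

In the diff itself the try/except blocks are inlined at each call site, which lets every failure choose its own fallback: continue the loop, fall back to an empty list, or just log and move on.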
2 changes: 1 addition & 1 deletion pandaharvester/harvesterbody/propagator.py
@@ -107,7 +107,7 @@ def run(self):
                             tmpRet['command'] = 'tobekilled'
                     # got kill command
                     if 'command' in tmpRet and tmpRet['command'] in ['tobekilled']:
-                        nWorkers = self.dbProxy.kill_workers_with_job(tmpJobSpec.PandaID)
+                        nWorkers = self.dbProxy.mark_workers_to_kill_by_pandaid(tmpJobSpec.PandaID)
                         if nWorkers == 0:
                             # no workers
                             tmpJobSpec.status = 'cancelled'
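This one-line change mirrors the monitor.py hunks above: direct kill calls (kill_worker, kill_workers_with_job) are replaced by mark_* methods, which suggests that callers now only flag the affected workers in the database while the actual kill is carried out later by another Harvester agent. The toy sketch below illustrates that mark-then-sweep split with an in-memory registry; none of these classes or names are harvester code.

# Toy illustration of the mark-then-sweep pattern suggested by the renamed
# dbProxy methods; not harvester code.
class ToyWorkerRegistry:
    def __init__(self):
        self.to_kill = set()

    def mark_workers_to_kill_by_workerids(self, worker_ids):
        # callers (monitor, propagator) only record the intent to kill
        self.to_kill.update(worker_ids)
        return len(worker_ids)

class ToySweeper:
    def __init__(self, registry, kill_func):
        self.registry = registry
        self.kill_func = kill_func  # e.g. a plugin that talks to the batch system

    def sweep_once(self):
        # a separate loop performs the actual kills asynchronously
        while self.registry.to_kill:
            self.kill_func(self.registry.to_kill.pop())

registry = ToyWorkerRegistry()
registry.mark_workers_to_kill_by_workerids([101, 102])
ToySweeper(registry, kill_func=print).sweep_once()  # prints 101 and 102 (order not guaranteed)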
