Merge pull request #240 from HSF/flin
v0.5.9; fix queue params when switching between pull and push; fix fifo benchmark with multiple threads; fix typos
mightqxc authored Jul 23, 2024
2 parents ed7ca90 + dd017e1 commit c33a9be
Showing 8 changed files with 71 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "03-07-2024 15:25:26 on flin (by mightqxc)"
+timestamp = "23-07-2024 10:30:54 on flin (by mightqxc)"
8 changes: 4 additions & 4 deletions pandaharvester/harvesterbody/cacher.py
@@ -35,7 +35,7 @@ def run(self):
                 return

     # main
-    def execute(self, force_update=False, skip_lock=False, n_thread=0):
+    def execute(self, force_update=False, skip_lock=False, n_threads=0):
         mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="execute")
         # get lock
         locked = self.dbProxy.get_process_lock("cacher", self.get_pid(), harvester_config.cacher.sleepTime)
@@ -82,9 +82,9 @@ def _refresh_cache(inputs):
                 mainLog.error(f"failed to refresh key={mainKey} subKey={subKey} due to a DB error")

         # loop over all items
-        if n_thread:
-            mainLog.debug(f"refresh cache with {n_thread} threads")
-            with ThreadPoolExecutor(n_thread) as thread_pool:
+        if n_threads:
+            mainLog.debug(f"refresh cache with {n_threads} threads")
+            with ThreadPoolExecutor(n_threads) as thread_pool:
                 thread_pool.map(_refresh_cache, itemsList)
         else:
             mainLog.debug("refresh cache")
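The execute() signature above gates threading on n_threads: when it is non-zero, the per-item refresh is mapped over a ThreadPoolExecutor; otherwise the items are processed in a plain loop. A minimal sketch of that pattern, with refresh_one and the item names as hypothetical stand-ins for the real cache-refresh logic:

from concurrent.futures import ThreadPoolExecutor


def refresh_one(item):
    # hypothetical stand-in for the per-item refresh in Cacher.execute
    print(f"refreshing {item}")


def refresh_all(items, n_threads=0):
    if n_threads:
        # fan the refreshes out over a pool; the with-block waits for completion
        with ThreadPoolExecutor(n_threads) as thread_pool:
            thread_pool.map(refresh_one, items)
    else:
        # n_threads == 0 keeps the sequential behavior
        for item in items:
            refresh_one(item)


refresh_all(["panda_queues.json", "ddm_endpoints.json"], n_threads=4)

Calling refresh_all(items, n_threads=0) preserves the sequential default, which is what execute() does unless a caller such as cacher_refresh (in harvester_admin.py below) passes n_threads=4.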
6 changes: 3 additions & 3 deletions pandaharvester/harvestercore/fifos.py
@@ -51,7 +51,7 @@ def make_logger(self, base_log, token=None, method_name=None, send_dialog=True):
             hook = None
         return core_utils.make_logger(base_log, token=token, method_name=method_name, hook=hook)

-    # intialize fifo from harvester configuration
+    # initialize fifo from harvester configuration
     def _initialize_fifo(self, force_enable=False):
         self.fifoName = f"{self.titleName}_fifo"
         self.config = getattr(harvester_config, self.titleName)
@@ -285,7 +285,7 @@ def update(self, id, item=None, score=None, temporary=None, cond_score="gt"):
         return retVal


-# Special fifo base for non havester-agent
+# Special fifo base for non harvester-agent
 class SpecialFIFOBase(FIFOBase):
     # constructor
     def __init__(self, **kwarg):
@@ -303,7 +303,7 @@ def __init__(self, **kwarg):
         self.fifo = pluginFactory.get_plugin(pluginConf)


-# Management fifo class, for managning fifo database
+# Management fifo class, for managing fifo database
 class ManagementFIFO(SpecialFIFOBase):
     titleName = "management"

6 changes: 2 additions & 4 deletions pandaharvester/harvestercore/queue_config_mapper.py
Expand Up @@ -561,13 +561,11 @@ def load_data(self, refill_table=False):
# nullify all job limit attributes if NoJob mapType (PULL)
if queueConfig.mapType == WorkSpec.MT_NoJob:
for attName in ["nQueueLimitJob", "nQueueLimitJobRatio", "nQueueLimitJobMax", "nQueueLimitJobMin"]:
if hasattr(queueConfig, attName):
setattr(queueConfig, attName, None)
setattr(queueConfig, attName, None)
# nullify worker ratio limit attributes if jobful mapTypes (PUSH)
if queueConfig.mapType != WorkSpec.MT_NoJob:
for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]:
if hasattr(queueConfig, attName):
setattr(queueConfig, attName, None)
setattr(queueConfig, attName, None)
# heartbeat suppression
if queueConfig.truePilot and queueConfig.noHeartbeat == "":
queueConfig.noHeartbeat = "running,transferring,finished,failed"
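This hunk is the queue-parameter fix named in the commit title: with the hasattr guard removed, the nullification is unconditional, so every pull-mode job limit (and, in the second block, every push-mode worker limit) is guaranteed to exist as an explicit None rather than being skipped when the attribute is absent, e.g. after a queue switches between pull (MT_NoJob) and push mapTypes. A minimal sketch of the behavioral difference, using a hypothetical stand-in class (the exact downstream consumer of these attributes is not shown in this diff):

class QueueConfig:
    # simplified stand-in for the real queue configuration object
    pass


pull_limit_names = ["nQueueLimitJob", "nQueueLimitJobRatio", "nQueueLimitJobMax", "nQueueLimitJobMin"]

qc = QueueConfig()
qc.nQueueLimitJob = 100  # value left over from the queue's previous mode

for att_name in pull_limit_names:
    # unconditional setattr: creates the attribute if it is missing
    setattr(qc, att_name, None)

assert qc.nQueueLimitJob is None     # leftover value cleared
assert qc.nQueueLimitJobMax is None  # now defined even though it never existed

Under the old guard, nQueueLimitJobMax would simply have stayed undefined, so code reading the attribute directly could fail or fall back inconsistently; after the fix, None uniformly means "no limit".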
85 changes: 56 additions & 29 deletions pandaharvester/harvesterscripts/harvester_admin.py
@@ -1,9 +1,9 @@
 import argparse
 import json
 import logging
 import os
 import random
 import sys
+import threading
 import time
 from concurrent.futures import ThreadPoolExecutor

@@ -56,9 +56,9 @@ def json_print(data):
    print(json.dumps(data, sort_keys=True, indent=4))


-def multithread_executer(func, n_object, n_thread):
-    with ThreadPoolExecutor(n_thread) as _pool:
-        retIterator = _pool.map(func, range(n_object))
+def multithread_executer(func, n_objects, n_threads, initializer=None, initargs=()):
+    with ThreadPoolExecutor(n_threads, initializer=initializer, initargs=initargs) as _pool:
+        retIterator = _pool.map(func, range(n_objects))
     return retIterator


@@ -121,63 +121,90 @@ def get(arguments):


 def fifo_benchmark(arguments):
-    n_object = arguments.n_object
-    n_thread = arguments.n_thread
-    mq = harvesterFifos.BenchmarkFIFO()
-    sw = core_utils.get_stopwatch()
+    n_objects = arguments.n_objects
+    n_threads = arguments.n_threads
     sum_dict = {
         "put_n": 0,
         "put_time": 0.0,
         "get_time": 0.0,
         "get_protective_time": 0.0,
         "clear_time": 0.0,
     }
+    thread_lock = threading.Lock()
+    thread_fifo_map = {}
+
+    def _thread_initializer():
+        thread_id = threading.get_ident()
+        with thread_lock:
+            thread_fifo_map[thread_id] = harvesterFifos.BenchmarkFIFO()

     def _put_object(i_index):
+        mq = thread_fifo_map.get(threading.get_ident())
+        if mq is None:
+            return
         workspec = WorkSpec()
         workspec.workerID = i_index
         data = {"random": [(i_index**2) % 2**16, random.random()]}
         workspec.workAttributes = data
         mq.put(workspec)

     def _get_object(i_index):
+        mq = thread_fifo_map.get(threading.get_ident())
+        if mq is None:
+            return
         return mq.get(timeout=3, protective=False)

     def _get_object_protective(i_index):
+        mq = thread_fifo_map.get(threading.get_ident())
+        if mq is None:
+            return
         return mq.get(timeout=3, protective=True)

     def put_test():
+        sw = core_utils.get_stopwatch()
         sw.reset()
-        multithread_executer(_put_object, n_object, n_thread)
+        multithread_executer(_put_object, n_objects, n_threads, _thread_initializer)
         sum_dict["put_time"] += sw.get_elapsed_time_in_sec()
         sum_dict["put_n"] += 1
-        print(f"Put {n_object} objects by {n_thread} threads" + sw.get_elapsed_time())
-        print(f"Now fifo size is {mq.size()}")
+        print(f"Put {n_objects} objects by {n_threads} threads" + sw.get_elapsed_time())
+        benchmark_mq = harvesterFifos.BenchmarkFIFO()
+        print(f"Now fifo size is {benchmark_mq.size()}")
+        del benchmark_mq

     def get_test():
+        sw = core_utils.get_stopwatch()
         sw.reset()
-        multithread_executer(_get_object, n_object, n_thread)
+        multithread_executer(_get_object, n_objects, n_threads, _thread_initializer)
         sum_dict["get_time"] = sw.get_elapsed_time_in_sec()
-        print(f"Get {n_object} objects by {n_thread} threads" + sw.get_elapsed_time())
-        print(f"Now fifo size is {mq.size()}")
+        print(f"Get {n_objects} objects by {n_threads} threads" + sw.get_elapsed_time())
+        benchmark_mq = harvesterFifos.BenchmarkFIFO()
+        print(f"Now fifo size is {benchmark_mq.size()}")
+        del benchmark_mq

     def get_protective_test():
+        sw = core_utils.get_stopwatch()
         sw.reset()
-        multithread_executer(_get_object_protective, n_object, n_thread)
+        multithread_executer(_get_object_protective, n_objects, n_threads, _thread_initializer)
         sum_dict["get_protective_time"] = sw.get_elapsed_time_in_sec()
-        print(f"Get {n_object} objects protective dequeue by {n_thread} threads" + sw.get_elapsed_time())
-        print(f"Now fifo size is {mq.size()}")
+        print(f"Get {n_objects} objects protective dequeue by {n_threads} threads" + sw.get_elapsed_time())
+        benchmark_mq = harvesterFifos.BenchmarkFIFO()
+        print(f"Now fifo size is {benchmark_mq.size()}")
+        del benchmark_mq

     def clear_test():
+        benchmark_mq = harvesterFifos.BenchmarkFIFO()
+        sw = core_utils.get_stopwatch()
         sw.reset()
-        mq.fifo.clear()
+        benchmark_mq.fifo.clear()
         sum_dict["clear_time"] = sw.get_elapsed_time_in_sec()
         print("Cleared fifo" + sw.get_elapsed_time())
-        print(f"Now fifo size is {mq.size()}")
+        print(f"Now fifo size is {benchmark_mq.size()}")
+        del benchmark_mq

     # Benchmark
     print("Start fifo benchmark ...")
-    mq.fifo.clear()
+    benchmark_mq = harvesterFifos.BenchmarkFIFO()
+    benchmark_mq.fifo.clear()
     print("Cleared fifo")
     put_test()
     get_test()
@@ -188,12 +215,12 @@ def clear_test():
     print("Finished fifo benchmark")
     # summary
     print("Summary:")
-    print(f"FIFO plugin is: {mq.fifo.__class__.__name__}")
-    print(f"Benchmark with {n_object} objects by {n_thread} threads")
-    print(f"Put : {1000.0 * sum_dict['put_time'] / (sum_dict['put_n'] * n_object):.3f} ms / obj")
-    print(f"Get : {1000.0 * sum_dict['get_time'] / n_object:.3f} ms / obj")
-    print(f"Get protective : {1000.0 * sum_dict['get_protective_time'] / n_object:.3f} ms / obj")
-    print(f"Clear : {1000.0 * sum_dict['clear_time'] / n_object:.3f} ms / obj")
+    print(f"FIFO plugin is: {benchmark_mq.fifo.__class__.__name__}")
+    print(f"Benchmark with {n_objects} objects by {n_threads} threads")
+    print(f"Put : {1000.0 * sum_dict['put_time'] / (sum_dict['put_n'] * n_objects):.3f} ms / obj")
+    print(f"Get : {1000.0 * sum_dict['get_time'] / n_objects:.3f} ms / obj")
+    print(f"Get protective : {1000.0 * sum_dict['get_protective_time'] / n_objects:.3f} ms / obj")
+    print(f"Clear : {1000.0 * sum_dict['clear_time'] / n_objects:.3f} ms / obj")


 def fifo_repopulate(arguments):
@@ -209,7 +236,7 @@ def cacher_refresh(arguments):

     communicatorPool = CommunicatorPool()
     cacher = Cacher(communicatorPool)
-    cacher.execute(force_update=True, skip_lock=True, n_thread=4)
+    cacher.execute(force_update=True, skip_lock=True, n_threads=4)


 def qconf_list(arguments):
@@ -368,8 +395,8 @@ def main():
     # fifo benchmark command
     fifo_benchmark_parser = fifo_subparsers.add_parser("benchmark", help="benchmark fifo backend")
     fifo_benchmark_parser.set_defaults(which="fifo_benchmark")
-    fifo_benchmark_parser.add_argument("-n", type=int, dest="n_object", action="store", default=500, metavar="<N>", help="Benchmark with N objects")
-    fifo_benchmark_parser.add_argument("-t", type=int, dest="n_thread", action="store", default=1, metavar="<N>", help="Benchmark with N threads")
+    fifo_benchmark_parser.add_argument("-n", type=int, dest="n_objects", action="store", default=500, metavar="<N>", help="Benchmark with N objects")
+    fifo_benchmark_parser.add_argument("-t", type=int, dest="n_threads", action="store", default=1, metavar="<N>", help="Benchmark with N threads")
     # fifo repopuate command
     fifo_repopulate_parser = fifo_subparsers.add_parser("repopulate", help="Repopulate agent fifo")
     fifo_repopulate_parser.set_defaults(which="fifo_repopulate")
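The core of the benchmark fix is visible in the fifo_benchmark hunks above: instead of one BenchmarkFIFO shared by every worker thread, each pool thread now builds its own instance in a ThreadPoolExecutor initializer and looks it up by threading.get_ident(), a common workaround when a queue backend (for instance an sqlite connection underneath) must not be shared across threads. A self-contained sketch of that per-thread-resource pattern, with Resource as a hypothetical stand-in for harvesterFifos.BenchmarkFIFO:

import threading
from concurrent.futures import ThreadPoolExecutor


class Resource:
    # hypothetical stand-in for harvesterFifos.BenchmarkFIFO
    def use(self, i):
        return (threading.get_ident(), i)


thread_lock = threading.Lock()
thread_resource_map = {}


def thread_initializer():
    # runs once in each pool thread before it takes any work
    with thread_lock:
        thread_resource_map[threading.get_ident()] = Resource()


def work(i):
    # every task uses the resource owned by the thread executing it
    resource = thread_resource_map.get(threading.get_ident())
    if resource is None:
        return None
    return resource.use(i)


with ThreadPoolExecutor(4, initializer=thread_initializer) as pool:
    results = list(pool.map(work, range(10)))

print(f"{len(results)} tasks ran across {len(thread_resource_map)} threads")

A dict guarded by a Lock, rather than threading.local, keeps the per-thread objects reachable from the main thread as well. With the renamed options, a run like "fifo benchmark -n 500 -t 4" through this admin script exercises the backend with four threads.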
2 changes: 1 addition & 1 deletion pandaharvester/panda_pkg_info.py
@@ -1 +1 @@
-release_version = "0.5.8"
+release_version = "0.5.9"
2 changes: 1 addition & 1 deletion templates/panda/panda_harvester-uwsgi.ini.rpmnew.template
@@ -2,7 +2,7 @@
 ; Comment with semicolon like this line

 ; These options are necessary to run harvester with uwsgi and control with
-; the uwsgi servie script. Better not change them :)
+; the uwsgi service script. Better not change them :)
 master = true
 lazy-apps = true
 wsgi-file = $(SITE_PACKAGES_PATH)/pandaharvester/harvesterbody/master.py
6 changes: 3 additions & 3 deletions templates/panda/panda_harvester.cfg.rpmnew.template
@@ -33,9 +33,6 @@ dynamic_plugin_change = False

 [db]

-# database filename for sqlite. Better to use local disk if possible since sqlite doesn't like NAS
-database_filename = FIXME
-
 # verbose
 verbose = False

@@ -48,6 +45,9 @@ nConnections = 10
 # database engine : sqlite or mariadb
 engine = sqlite

+# database filename for sqlite. Better to use local disk if possible since sqlite doesn't like NAS
+database_filename = FIXME
+
 # use MySQLdb for mariadb access
 useMySQLdb = False

