Merge pull request #24 from weka/cleanup
Cleanup
vince-weka authored Aug 23, 2021
2 parents 45c3292 + c867962 commit bea301f
Showing 3 changed files with 191 additions and 178 deletions.
65 changes: 38 additions & 27 deletions async.py → async_api.py
@@ -1,6 +1,6 @@

"""
async.py - subprocesses to execute multithreaded API calls.
async_api.py - subprocesses to execute multithreaded API calls.
"""
import pickle
import multiprocessing
@@ -10,6 +10,8 @@
import time
import queue
import math
import traceback
import json

# initialize logger - configured in main routine
log = getLogger(__name__)
@@ -26,6 +28,9 @@ def __init__(self, hostname, category, stat, method, parms):
self.exception = False
self.times_in_q = 1

def __str__(self):
return f"{self.hostname},{self.category},{self.stat},{json.dumps(self.result,indent=2)}"


die_mf = Job(None, None, None, None, None)
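
`die_mf` is a poison pill: a sentinel job (every field `None`) that a producer pushes onto a work queue to tell the consumer on the other end to shut down. A minimal sketch of the pattern, with illustrative names not taken from this module:

import queue
import threading

SENTINEL = object()  # unique marker; a worker exits when it sees it

def worker(q):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break               # clean shutdown
        print(f"processing {item}")
        q.task_done()

q = queue.Queue()
threading.Thread(target=worker, args=(q,), daemon=True).start()
for n in range(3):
    q.put(n)
q.put(SENTINEL)   # tell the worker to stop
q.join()          # returns once everything, including the sentinel, is task_done()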

@@ -34,7 +39,6 @@ class SlaveThread(object):
def __init__(self, cluster, outputq):
self.cluster = cluster
self.outputq = outputq
#self.inputq = multiprocessing.JoinableQueue(200) # this used a LOT of semaphores; ran out of them
self.inputq = queue.Queue()

self.thread = threading.Thread(target=self.slave_thread, daemon=True)
@@ -60,19 +64,20 @@ def slave_thread(self):
except wekalib.exceptions.HTTPError as exc:
if exc.code == 502: # Bad Gateway - a transient error
log.error(f"slave thread received Bad Gateway on host {job.hostname}")
if job.times_in_q <= 10:
if job.times_in_q <= 2: # lowered from 10 retries
# retry a few times
job.times_in_q += 1
self.submit(job)
self.inputq.task_done()
continue # go back to the inputq.get()
elif job.times_in_q <= 12: # give it 2 more chances
# then sleep to give the cluster a little time to recover
time.sleep(0.5) # might be the only thing in the queue...
job.times_in_q += 1
self.submit(job)
self.inputq.task_done()
continue # go back to the inputq.get()
# trex - take this out for now... extending scrape times too much
#elif job.times_in_q <= 12: # give it 2 more chances
# # then sleep to give the cluster a little time to recover
# time.sleep(0.5) # might be the only thing in the queue...
# job.times_in_q += 1
# self.submit(job)
# self.inputq.task_done()
# continue # go back to the inputq.get()

# else, give up and return the error - note: multiprocessing.queue module hates HTTPErrors - can't unpickle correctly
job.result = wekalib.exceptions.APIError(f"{exc.host}: ({exc.code}) {exc.message}") # send as APIError
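
The branch above re-queues a job hit by a transient 502 at most twice, tracking attempts in `times_in_q`; the sleep-before-retry fallback is commented out because it stretched scrape times. The same capped-retry shape in isolation, with a hypothetical `flaky` call standing in for the API request:

MAX_TIMES_IN_Q = 2   # mirrors the lowered retry cap above

class TransientError(Exception):
    pass

def run_with_retries(call):
    times_in_q = 1
    while True:
        try:
            return call()
        except TransientError:
            if times_in_q <= MAX_TIMES_IN_Q:
                times_in_q += 1    # put it back on the queue and try again
                continue
            raise                  # out of retries: surface the error

attempts = 0
def flaky():
    global attempts
    attempts += 1
    if attempts < 3:
        raise TransientError("Bad Gateway")
    return "ok"

print(run_with_retries(flaky))   # "ok" on the third attempt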
@@ -81,30 +86,27 @@ def slave_thread(self):
job.result = exc
job.exception = True
log.info(f"Exception recieved on host {job.hostname}:{exc}")
log.info(traceback.format_exc())


# this will send back the above exceptions as well as good results
#log.info(f"job.result={json.dumps(job.result, indent=2)}")
self.outputq.put(job)
self.inputq.task_done()

# slave thread submit
def submit(self, job):
""" submit an job to this slave for processing """
self.inputq.put(job)


#def join():
# self.inputq.join() # wait for the queue to be completed




# Start a process that will have lots of threads
class SlaveProcess(object):
def __init__(self, cluster, num_threads, outputq):
self.cluster = cluster
self.outputq = outputq
self.queuesize = 0
self.inputq = multiprocessing.JoinableQueue(50000) # 50,000 max entries?
self.inputq = multiprocessing.JoinableQueue(500) # 500 max entries (lowered from 50,000)

self.slavesthreads = list()
self.num_threads = num_threads
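
The earlier comment about `multiprocessing.queue` hating `HTTPError` is a real pickle pitfall: an exception class whose `__init__` takes extra required arguments often cannot be reconstructed on the receiving side of a process queue, which is why the slave re-wraps it in a plain single-argument `APIError` before queueing. A minimal demonstration with assumed stand-in types, not the wekalib ones:

import multiprocessing

class HTTPError(Exception):
    def __init__(self, host, code, message):
        # required extra args like these break default exception unpickling
        super().__init__(message)
        self.host, self.code, self.message = host, code, message

class APIError(Exception):
    pass    # single-argument exceptions round-trip through pickle safely

def worker(q):
    exc = HTTPError("host1", 502, "Bad Gateway")
    q.put(APIError(f"{exc.host}: ({exc.code}) {exc.message}"))  # safe wrapper

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # host1: (502) Bad Gateway
    p.join()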
@@ -114,6 +116,7 @@ def __init__(self, cluster, num_threads, outputq):
self.proc.start()


# process submit
def submit(self, job):
""" submit an job to this slave for processing """
self.inputq.put(job)
@@ -143,17 +146,18 @@ def slave_process(self, cluster):
#log.debug(f"got job from queue, {job.hostname}, {job.category}, {job.stat}")

if job.hostname is None:
#die_mf = Job(None, None, None, None, None)
# we were told to die... shut it down
for slave in self.slavesthreads:
if not slave.thread.is_alive():
# for tracking errors
log.error(f"a thread is already dead?")
continue
# we want to make sure they're done before we kill them
slave.submit(die_mf)
slave.submit(die_mf) # slave THREAD
# do we need to wait for the queue to drain? Is that even a good idea?

# have to leave a lot of time in case it has a full inputq
slave.thread.join(timeout=60.0) # wait for it to die
slave.thread.join(timeout=5.0) # wait for it to die
if slave.thread.is_alive():
log.error(f"a thread didn't die!")
del self.inputq
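
`thread.join(timeout=5.0)` above returns after five seconds whether or not the thread actually exited; the `is_alive()` check afterwards is what detects a hung thread. The same check in isolation:

import threading
import time

def stuck():
    time.sleep(60)          # simulates a hung worker

t = threading.Thread(target=stuck, daemon=True)
t.start()
t.join(timeout=0.5)         # returns after 0.5 s either way
if t.is_alive():
    print("a thread didn't die!")   # the condition the log.error above catches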
@@ -200,7 +204,8 @@ def slave_process(self, cluster):
self.slavesthreads[bucket].submit(job)
self.inputq.task_done()

def join():
# process join
def join(self):
self.inputq.join() # wait for the queue to be completed
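
Jobs reach a slave thread by mapping the hostname to a fixed bucket, so all calls for one host land on the same thread. A sketch of that routing, assuming (as the `bucket_array.index(hostname)` call further down suggests) an index-then-modulo mapping:

hosts = ["weka01", "weka02", "weka03", "weka04"]   # hypothetical cluster hosts
num_threads = 2

def bucket_for(hostname):
    # stable mapping: every job for a given host shares a thread
    return hosts.index(hostname) % num_threads

for h in hosts:
    print(h, "->", bucket_for(h))
# weka01 -> 0, weka02 -> 1, weka03 -> 0, weka04 -> 1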


@@ -233,15 +238,15 @@ def __init__(self, cluster, max_procs=8, max_threads_per_proc=100):

# kill the slave processes
def __del__(self):
#die_mf = Job(None, None, None, None, None)
for slave in self.slaves:
slave.submit(die_mf)
slave.proc.join(60) # wait for it to die
slave.proc.join(5.0) # wait for it to die
del self.outputq

# submit a job
def submit(self, hostname, category, stat, method, parms):
job = Job(hostname, category, stat, method, parms) # wekahost? Object? decisions, decisions
log.debug(f"submitting job {job}")
try:
this_hash = self.bucket_array.index(hostname)
except ValueError:
@@ -275,25 +280,31 @@ def wait(self):
#self.log_stats()

# what if a slave dies or hangs? What will join() do?
# inputq is a multiprocessing.JoinableQueue
for slave in self.slaves:
#log.error(f"joining slave queue {self.slaves.index(slave)}")
# check if slave is alive/dead before joining?
#if slave.proc.is_alive():
slave.inputq.join() # wait for the inputq to drain
#slave.log_stats()


# all the slaves should be dead, we join()ed them above
while self.num_outstanding > 0:
try:
result = self.outputq.get(True, 60.0) # need try/except here to prevent process from locking up?
result = self.outputq.get(True, 10.0)
except queue.Empty as exc:
log.error(f"outputq timeout!")
log.error(f"outputq timeout!") # should never happen because they're dead
# queue is empty, just return
return
self.num_outstanding -= 1
if not result.exception:
if len(result.result) != 0:
yield result # yield so it is an iterator
else:
log.debug(f"API sent error: {result.result}")
# do we requeue?

# queue should be empty now
return
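
`wait()` is a generator: it joins the input queues, then drains the output queue and yields completed jobs one at a time, so callers iterate over results as they arrive rather than collecting a list. The core loop reduced to a runnable sketch:

import queue

def wait(outputq, num_outstanding):
    # yield results until every outstanding job is accounted for
    while num_outstanding > 0:
        try:
            result = outputq.get(True, 10.0)   # block, but never forever
        except queue.Empty:
            return                             # timed out: workers are gone
        num_outstanding -= 1
        yield result

q = queue.Queue()
for n in range(3):
    q.put(f"result {n}")
for r in wait(q, 3):
    print(r)   # result 0 / result 1 / result 2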


if __name__ == "__main__":
