Skip to content

Commit

Permalink
multi-pos bugfix + multiple enhancements
Browse files Browse the repository at this point in the history
fixes:
- multi-pos dataset would be displayed after single pos processing

enhancements:
- CLI will print Job status when used as cmd line, not for GUI
- use single socket connection when multi-pos is spawned by a request
- added "rx" field to model-container
- minor GUI tweaks
  • Loading branch information
amitabhverma committed Jan 18, 2025
1 parent 796cb59 commit 282c4d5
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 77 deletions.
5 changes: 4 additions & 1 deletion recOrder/cli/apply_inverse_transfer_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ def apply_inverse_transfer_function_cli(
f"{num_jobs} job{'s' if num_jobs > 1 else ''} submitted {'locally' if executor.cluster == 'local' else 'via ' + executor.cluster}."
)

doPrint = True # CLI prints Job status when used as cmd line
if unique_id != "": # no unique_id means no job submission info being listened to
JM.startClient()
i=0
Expand All @@ -384,9 +385,11 @@ def apply_inverse_transfer_function_cli(
position = input_position_dirpaths[i]
JM.putJobInList(job, unique_id, str(job_idx), position, str(executor.folder.absolute()))
i += 1
JM.sendDataThread()
JM.setShorterTimeout()
doPrint = False # CLI printing disabled when using GUI

monitor_jobs(jobs, input_position_dirpaths)
monitor_jobs(jobs, input_position_dirpaths, doPrint)


@click.command()
Expand Down
60 changes: 36 additions & 24 deletions recOrder/cli/jobs_mgmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
SERVER_PORT = 8089 # Choose an available port
JOBS_TIMEOUT = 5 # 5 mins
SERVER_uIDsjobIDs = {} # uIDsjobIDs[uid][jid] = job

class JobsManagement():

def __init__(self, *args, **kwargs):
self.executor = submitit.AutoExecutor(folder="logs")
self.clientsocket = None
self.uIDsjobIDs = {} # uIDsjobIDs[uid][jid] = job
self.DATA_QUEUE = []

def checkForJobIDFile(self, jobID, logsPath, extension="out"):

Expand All @@ -35,7 +37,7 @@ def checkForJobIDFile(self, jobID, logsPath, extension="out"):
return ""

def setShorterTimeout(self):
self.clientsocket.settimeout(3)
self.clientsocket.settimeout(30)

def startClient(self):
try:
Expand Down Expand Up @@ -103,21 +105,37 @@ def stopClient(self):
print(exc.args)

def checkAllExpJobsCompletion(self, uID):
if uID in self.uIDsjobIDs.keys():
for jobEntry in self.uIDsjobIDs[uID]:
jobsBool = jobEntry["jID"]
if jobsBool == False:
if uID in SERVER_uIDsjobIDs.keys():
for jobEntry in SERVER_uIDsjobIDs[uID].keys():
job:submitit.Job = SERVER_uIDsjobIDs[uID][jobEntry]["job"]
jobBool = SERVER_uIDsjobIDs[uID][jobEntry]["bool"]
if job is not None and job.done() == False:
return False
if jobBool == False:
return False
return True

def putJobCompletionInList(self, jobBool, uID: str, jID: str, mode="client"):
if uID in self.uIDsjobIDs.keys():
if jID in self.uIDsjobIDs[uID].keys():
self.uIDsjobIDs[uID][jID] = jobBool
if uID in SERVER_uIDsjobIDs.keys():
if jID in SERVER_uIDsjobIDs[uID].keys():
SERVER_uIDsjobIDs[uID][jID]["bool"] = jobBool

def addData(self, data):
self.DATA_QUEUE.append(data)

def sendDataThread(self):
thread = threading.Thread(target=self.sendData)
thread.start()

def sendData(self):
data = "".join(self.DATA_QUEUE)
self.clientsocket.send(data.encode())
self.DATA_QUEUE = []

def putJobInList(self, job, uID: str, jID: str, well:str, log_folder_path:str="", mode="client"):
try:
well = str(well)
jID = str(jID)
if ".zarr" in well:
wells = well.split(".zarr")
well = wells[1].replace("\\","-").replace("/","-")[1:]
Expand All @@ -129,32 +147,26 @@ def putJobInList(self, job, uID: str, jID: str, well:str, log_folder_path:str=""
if jID not in self.uIDsjobIDs[uID].keys():
self.uIDsjobIDs[uID][jID] = job
json_obj = {uID:{"jID": str(jID), "pos": well, "log": log_folder_path}}
json_str = json.dumps(json_obj)+"\n"
self.clientsocket.send(json_str.encode())
json_str = json.dumps(json_obj)+"\n"
self.addData(json_str)
else:
# from server side jobs object entry is a None object
# this will be later checked as completion boolean for a ExpID which might
# have several Jobs associated with it
if uID not in SERVER_uIDsjobIDs.keys():
SERVER_uIDsjobIDs[uID] = {}
SERVER_uIDsjobIDs[uID][jID] = job
else:
if jID not in SERVER_uIDsjobIDs[uID].keys():
SERVER_uIDsjobIDs[uID][jID] = job
SERVER_uIDsjobIDs[uID][jID] = {}
SERVER_uIDsjobIDs[uID][jID]["job"] = job
SERVER_uIDsjobIDs[uID][jID]["bool"] = False
else:
SERVER_uIDsjobIDs[uID][jID] = {}
SERVER_uIDsjobIDs[uID][jID]["job"] = job
SERVER_uIDsjobIDs[uID][jID]["bool"] = False
except Exception as exc:
print(exc.args)

def hasSubmittedJob(self, uID: str, mode="client")->bool:
if mode == "client":
if uID in self.uIDsjobIDs.keys():
return True
return False
else:
if uID in SERVER_uIDsjobIDs.keys():
return True
return False

def hasSubmittedJob(self, uID: str, jID: str, mode="client")->bool:
jID = str(jID)
if mode == "client":
if uID in self.uIDsjobIDs.keys():
if jID in self.uIDsjobIDs[uID].keys():
Expand Down
64 changes: 35 additions & 29 deletions recOrder/cli/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,31 @@
import sys


def _move_cursor_up(n_lines):
sys.stdout.write("\033[F" * n_lines)
def _move_cursor_up(n_lines, doPrint=True):
if doPrint:
sys.stdout.write("\033[F" * n_lines)


def _print_status(jobs, position_dirpaths, elapsed_list, print_indices=None):
def _print_status(jobs, position_dirpaths, elapsed_list, print_indices=None, doPrint=True):

columns = [15, 30, 40, 50]

# header
sys.stdout.write(
"\033[K" # clear line
"\033[96mID" # cyan
f"\033[{columns[0]}G WELL "
f"\033[{columns[1]}G STATUS "
f"\033[{columns[2]}G NODE "
f"\033[{columns[2]}G ELAPSED\n"
)
if doPrint:
sys.stdout.write(
"\033[K" # clear line
"\033[96mID" # cyan
f"\033[{columns[0]}G WELL "
f"\033[{columns[1]}G STATUS "
f"\033[{columns[2]}G NODE "
f"\033[{columns[2]}G ELAPSED\n"
)

if print_indices is None:
print_indices = range(len(jobs))

complete_count = 0

for i, (job, position_dirpath) in enumerate(zip(jobs, position_dirpaths)):
try:
node_name = job.get_info()["NodeList"] # slowest, so do this first
Expand All @@ -43,22 +46,24 @@ def _print_status(jobs, position_dirpaths, elapsed_list, print_indices=None):
elapsed_list[i] += 1 # inexact timing
else:
color = "\033[91m" # red

if i in print_indices:
sys.stdout.write(
f"\033[K" # clear line
f"{color}{job.job_id}"
f"\033[{columns[0]}G {'/'.join(position_dirpath.parts[-3:])}"
f"\033[{columns[1]}G {job.state}"
f"\033[{columns[2]}G {node_name}"
f"\033[{columns[3]}G {elapsed_list[i]} s\n"
)
if doPrint:
sys.stdout.write(
f"\033[K" # clear line
f"{color}{job.job_id}"
f"\033[{columns[0]}G {'/'.join(position_dirpath.parts[-3:])}"
f"\033[{columns[1]}G {job.state}"
f"\033[{columns[2]}G {node_name}"
f"\033[{columns[3]}G {elapsed_list[i]} s\n"
)
sys.stdout.flush()
print(
f"\033[32m{complete_count}/{len(jobs)} jobs complete. "
"<ctrl+z> to move monitor to background. "
"<ctrl+c> twice to cancel jobs."
)
if doPrint:
print(
f"\033[32m{complete_count}/{len(jobs)} jobs complete. "
"<ctrl+z> to move monitor to background. "
"<ctrl+c> twice to cancel jobs."
)

return elapsed_list

Expand Down Expand Up @@ -87,7 +92,7 @@ def _get_jobs_to_print(jobs, num_to_print):
return job_indices_to_print


def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path]):
def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path], doPrint=True):
"""Displays the status of a list of submitit jobs with corresponding paths.
Parameters
Expand All @@ -108,7 +113,7 @@ def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path]):

# print all jobs once if terminal is too small
if shutil.get_terminal_size().lines - NON_JOB_LINES < len(jobs):
_print_status(jobs, position_dirpaths, elapsed_list)
_print_status(jobs, position_dirpaths, elapsed_list, doPrint)

# main monitor loop
try:
Expand All @@ -125,14 +130,15 @@ def monitor_jobs(jobs: list[submitit.Job], position_dirpaths: list[Path]):
position_dirpaths,
elapsed_list,
job_indices_to_print,
doPrint,
)

time.sleep(1)
_move_cursor_up(num_jobs_to_print + 2)
_move_cursor_up(num_jobs_to_print + 2, doPrint)

# Print final status
time.sleep(1)
_print_status(jobs, position_dirpaths, elapsed_list)
_print_status(jobs, position_dirpaths, elapsed_list, doPrint=doPrint)

# cancel jobs if ctrl+c
except KeyboardInterrupt:
Expand Down
Loading

0 comments on commit 282c4d5

Please sign in to comment.