Skip to content

Commit

Permalink
Merge pull request #252 from tmaeno/master
Browse files Browse the repository at this point in the history
workflow_name and json serializable
  • Loading branch information
tmaeno authored Oct 9, 2023
2 parents 8d7dd60 + d9f9a9f commit 8557d31
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 9 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.0.66"
release_version = "0.0.67"
16 changes: 16 additions & 0 deletions pandaserver/srvcore/CoreUtils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
import os
import json
import datetime
import subprocess
from threading import Lock
Expand Down Expand Up @@ -158,3 +159,18 @@ def __contains__(self, item):
def __getitem__(self, name):
self.update()
return self.cachedObj[name]


# JSON encoder that knows how to serialize datetime objects
class NonJsonObjectEncoder(json.JSONEncoder):
    """json.JSONEncoder subclass that encodes datetime.datetime values.

    A datetime is emitted as a tagged dict {"_datetime_object": "<str>"}
    so that as_python_object() can restore it on decode. Everything else
    is delegated to the base encoder (which raises TypeError as usual).
    """

    def default(self, obj):
        if not isinstance(obj, datetime.datetime):
            return super().default(obj)
        return {"_datetime_object": obj.strftime("%Y-%m-%d %H:%M:%S.%f")}


# object_hook for json.loads, inverse of NonJsonObjectEncoder
def as_python_object(dct):
    """Restore datetime values encoded by NonJsonObjectEncoder.

    Dicts carrying the "_datetime_object" tag are converted back to
    datetime.datetime; any other dict is returned unchanged.
    """
    if "_datetime_object" not in dct:
        return dct
    raw_value = str(dct["_datetime_object"])
    return datetime.datetime.strptime(raw_value, "%Y-%m-%d %H:%M:%S.%f")
7 changes: 7 additions & 0 deletions pandaserver/taskbuffer/FileSpec.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,10 @@ def isAllowedNoOutput(self):
except Exception:
pass
return False

# dump to be json-serializable
def dump_to_json_serializable(self):
    """Return this file's state as a json-serializable list.

    The last element of __getstate__() is the _owner back-reference,
    which is not serializable; it is replaced with None.
    """
    state = list(self.__getstate__()[:-1])
    # placeholder where _owner used to be
    state.append(None)
    return state
24 changes: 24 additions & 0 deletions pandaserver/taskbuffer/JobSpec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import json
import datetime

from pandaserver.taskbuffer.FileSpec import FileSpec

reserveChangedState = False


Expand Down Expand Up @@ -855,6 +857,28 @@ def set_ram_for_retry(self, val):
newItems.append("{0}:{1}".format(self._tagForSH["retryRam"], val))
self.specialHandling = ",".join(newItems)

# dump to json-serializable
def dump_to_json_serializable(self):
    """Return the job's state as a json-serializable list.

    The last element of __getstate__() holds the FileSpec objects;
    each is replaced by its own json-serializable dump.
    """
    state = self.__getstate__()
    serializable_files = [file_spec.dump_to_json_serializable() for file_spec in state[-1]]
    # same state, with the file objects swapped for their dumps
    return state[:-1] + [serializable_files]

# load from json-serializable
def load_from_json_serializable(self, job_state):
    """Populate this job from a state list produced by dump_to_json_serializable().

    The last element of job_state is the list of serialized file states;
    the preceding elements are the job's own state.
    """
    # initialize with empty file list
    self.__setstate__(job_state[:-1] + [[]])
    # add files
    for file_stat in job_state[-1]:
        file_spec = FileSpec()
        file_spec.__setstate__(file_stat)
        self.addFile(file_spec)


# utils

Expand Down
24 changes: 24 additions & 0 deletions pandaserver/taskbuffer/JobUtils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import re
import json

from pandaserver.taskbuffer.JobSpec import JobSpec

from pandaserver.srvcore.CoreUtils import NonJsonObjectEncoder, as_python_object

try:
long
Expand Down Expand Up @@ -179,3 +184,22 @@ def compensate_ram_count(ram_count):
if ram_count is not None:
ram_count = int(ram_count * 0.90)
return ram_count


# dump jobs to serialized json
def dump_jobs_json(jobs):
    """Serialize a collection of JobSpec objects to a JSON string.

    Datetime attributes are handled by NonJsonObjectEncoder so the
    result can be round-tripped through load_jobs_json().
    """
    payload = [job_spec.dump_to_json_serializable() for job_spec in jobs]
    return json.dumps(payload, cls=NonJsonObjectEncoder)


# load serialized json to jobs
def load_jobs_json(state):
    """Deserialize a JSON string produced by dump_jobs_json() into JobSpec objects.

    Tagged datetime dicts are restored by the as_python_object hook.
    """
    job_specs = []
    for job_state in json.loads(state, object_hook=as_python_object):
        job_spec = JobSpec()
        job_spec.load_from_json_serializable(job_state)
        job_specs.append(job_spec)
    return job_specs
7 changes: 5 additions & 2 deletions pandaserver/taskbuffer/workflow_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ def core_exec(sandbox_url, log_token, dump_workflow, ops_file, user_name, test_m
# parse workflow files
if is_OK:
tmpLog.info("parse workflow")
workflow_name = None
if ops["data"]["language"] == "cwl":
workflow_name = ops["data"].get("workflow_name")
nodes, root_in = pcwl_utils.parse_workflow_file(ops["data"]["workflowSpecFile"], tmpLog)
with open(ops["data"]["workflowInputFile"]) as workflow_input:
data = yaml.safe_load(workflow_input)
Expand Down Expand Up @@ -229,13 +231,14 @@ def core_exec(sandbox_url, log_token, dump_workflow, ops_file, user_name, test_m
(
workflow_to_submit,
dump_str_list,
) = workflow_utils.convert_nodes_to_workflow(nodes)
) = workflow_utils.convert_nodes_to_workflow(nodes, workflow_name=workflow_name)
try:
if workflow_to_submit:
if not test_mode:
tmpLog.info("submit workflow")
wm = ClientManager(host=get_rest_host())
request_id = wm.submit(workflow_to_submit, username=user_name)
request_id = wm.submit(workflow_to_submit, username=user_name,
use_dataset_name=False)
else:
dump_str = "workflow is empty"
tmpLog.error(dump_str)
Expand Down
10 changes: 6 additions & 4 deletions pandaserver/userinterface/UserIF.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def checkSandboxFile(self, userName, fileSize, checkSum):
return ret

# get job status
def getJobStatus(self, idsStr, use_json):
def getJobStatus(self, idsStr, use_json, no_pickle=False):
try:
# deserialize IDs
if use_json:
if use_json or no_pickle:
ids = json.loads(idsStr)
else:
ids = WrappedPickle.loads(idsStr)
Expand All @@ -232,6 +232,8 @@ def getJobStatus(self, idsStr, use_json):
# serialize
if use_json:
return json.dumps(ret)
if no_pickle:
return JobUtils.dump_jobs_json(ret)
return WrappedPickle.dumps(ret)

# get PandaID with jobexeID
Expand Down Expand Up @@ -1181,8 +1183,8 @@ def runTaskAssignment(req, jobs):


# get job status
def getJobStatus(req, ids):
return userIF.getJobStatus(ids, req.acceptJson())
def getJobStatus(req, ids, no_pickle=None):
    """Web entry point for job-status lookup; delegates to UserIF.

    no_pickle selects a json-serialized response instead of pickle
    (passed through to UserIF.getJobStatus).
    """
    accept_json = req.acceptJson()
    return userIF.getJobStatus(ids, accept_json, no_pickle)


# get PandaID with jobexeID
Expand Down
5 changes: 3 additions & 2 deletions pandaserver/workflow/workflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,17 +674,18 @@ def get_dict_form(self, serial_id=None, dict_form=None):


# convert nodes to workflow
def convert_nodes_to_workflow(nodes, workflow_node=None, workflow=None):
def convert_nodes_to_workflow(nodes, workflow_node=None, workflow=None, workflow_name=None):
if workflow is None:
is_top = True
workflow = Workflow()
workflow.name = workflow_name
else:
is_top = False
id_work_map = {}
all_sub_id_work_map = {}
sub_to_id_map = {}
cond_dump_str = " Conditions\n"
class_dump_str = "===== Workflow ID:{} ====\n".format(workflow_node.id if workflow_node else None)
class_dump_str = "===== Workflow ID:{} ====\n".format(workflow_node.id if workflow_node else workflow_name)
class_dump_str += " Works\n"
dump_str_list = []
# create works or workflows
Expand Down

0 comments on commit 8557d31

Please sign in to comment.