
Commit

format
esseivaju committed Oct 4, 2024
1 parent 77ff2f5 commit f18b61d
Showing 26 changed files with 311 additions and 985 deletions.
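Every hunk below shows the same kind of change: a statement that was wrapped over several short lines is collapsed onto one longer line, which is what an auto-formatter does when its maximum line length is raised (the joined lines suggest a limit of roughly 120 characters). As a hedged sketch only — the formatter and its exact settings are not visible on this page — a pyproject.toml entry for ruff that would produce this style might look like:

    # Assumed configuration for illustration, not taken from this commit.
    [tool.ruff]
    line-length = 120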
19 changes: 4 additions & 15 deletions bin/validate-raythena-job.py
@@ -24,18 +24,12 @@ def validate_job(job_dir, job_state_file):
     with open(job_state_file) as f:
         job_state = json.load(f)
     merged_input_files = job_state["merged"]
-    merged_output_files = set(
-        [list(x.keys())[0] for x in merged_input_files.values()]
-    )
+    merged_output_files = set([list(x.keys())[0] for x in merged_input_files.values()])
     event_numbers = set()
     for output_file in merged_output_files:
         output_file_abs = path.join(job_dir, "final", output_file)
         if not path.isfile(output_file_abs):
-            print(
-                "Expected file "
-                + output_file_abs
-                + " to be present in the job directory"
-            )
+            print("Expected file " + output_file_abs + " to be present in the job directory")
             exit(1)

         current_event_numbers = get_event_numbers(output_file_abs)
@@ -45,16 +39,11 @@ def validate_job(job_dir, job_state_file):
                 "Duplicate events in file "
                 + output_file
                 + "("
-                + str(
-                    len(current_event_numbers)
-                    - len(unique_current_event_numbers)
-                )
+                + str(len(current_event_numbers) - len(unique_current_event_numbers))
                 + "): "
             )
             exit(1)
-        print(
-            str(len(current_event_numbers)) + " events in file " + output_file
-        )
+        print(str(len(current_event_numbers)) + " events in file " + output_file)
         if not unique_current_event_numbers.isdisjoint(event_numbers):
             print(
                 "Found duplicate events in file "
20 changes: 5 additions & 15 deletions example/standalone_ray_test_hello_world.py
@@ -32,9 +32,7 @@ def __init__(self) -> None:
         self.pid = os.getpid()
         self.hostname = platform.node()
         self.ip = ray._private.services.get_node_ip_address()
-        print(
-            f"Initial message from PID - {self.pid} Running on host - {self.hostname} {self.ip}"
-        )
+        print(f"Initial message from PID - {self.pid} Running on host - {self.hostname} {self.ip}")

     def ping(self):
         print(f"{self.pid} {self.hostname} {self.ip} - ping")
@@ -78,18 +76,10 @@ def main(redis_ip: str, redis_port: str, redis_password: str):


 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        description="Wait on ray head node or workers to connect"
-    )
-    parser.add_argument(
-        "--redis-ip", default="{}".format(os.environ["RAYTHENA_RAY_HEAD_IP"])
-    )
-    parser.add_argument(
-        "--redis-port", default="{}".format(os.environ["RAYTHENA_RAY_REDIS_PORT"])
-    )
-    parser.add_argument(
-        "--redis-password", default=os.environ["RAYTHENA_RAY_REDIS_PASSWORD"]
-    )
+    parser = argparse.ArgumentParser(description="Wait on ray head node or workers to connect")
+    parser.add_argument("--redis-ip", default="{}".format(os.environ["RAYTHENA_RAY_HEAD_IP"]))
+    parser.add_argument("--redis-port", default="{}".format(os.environ["RAYTHENA_RAY_REDIS_PORT"]))
+    parser.add_argument("--redis-password", default=os.environ["RAYTHENA_RAY_REDIS_PASSWORD"])
     args = parser.parse_args()
     print(f"args : {args}")
     main(args.redis_ip, args.redis_port, args.redis_password)
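The argparse defaults above read the Ray head IP, Redis port, and Redis password from environment variables, so those variables must be set before the example is run. A minimal, hypothetical launch sketch (the values and the subprocess wrapper are assumptions for illustration, not part of the repository):

    import os
    import subprocess

    # Hypothetical connection details; in a real deployment these come from the Ray head node setup.
    env = dict(
        os.environ,
        RAYTHENA_RAY_HEAD_IP="10.0.0.1",
        RAYTHENA_RAY_REDIS_PORT="6379",
        RAYTHENA_RAY_REDIS_PASSWORD="changeme",
    )
    subprocess.run(["python", "example/standalone_ray_test_hello_world.py"], env=env, check=True)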
127 changes: 29 additions & 98 deletions src/raythena/actors/esworker.py
@@ -10,7 +10,6 @@
 from time import sleep
 from typing import Any, Optional, Union
 import ray
-# from raythena.utils.timing import CPUMonitor
 from raythena.actors.payloads.basePayload import BasePayload
 from raythena.actors.payloads.eventservice.esPayload import ESPayload
 from raythena.actors.payloads.eventservice.pilothttp import PilotHttpPayload
@@ -56,18 +55,12 @@ class ESWorker:
     """

     READY_FOR_JOB = 0  # initial state, before the first job request
-    JOB_REQUESTED = (
-        1  # job has been requested to the driver, waiting for result
-    )
+    JOB_REQUESTED = 1  # job has been requested to the driver, waiting for result
     READY_FOR_EVENTS = 2  # ready to request new events for the current job
-    EVENT_RANGES_REQUESTED = (
-        3  # event ranges have been requested to the driver, waiting for result
-    )
+    EVENT_RANGES_REQUESTED = 3  # event ranges have been requested to the driver, waiting for result
     FINISHING_LOCAL_RANGES = 4  # do not request additional ranges, will move to STAGE_OUT once local cache is empty
     PROCESSING = 5  # currently processing event ranges
-    FINISHING = (
-        6  # Performing cleanup of resources, preparing final server update
-    )
+    FINISHING = 6  # Performing cleanup of resources, preparing final server update
     DONE = 7  # Actor has finished processing job
     STAGE_IN = 8  # Staging-in data.
     STAGE_OUT = 9  # Staging-out data
@@ -134,24 +127,16 @@ def __init__(
         self.payload_actor_process_dir = None
         self.actor_ray_logs_dir = None
         self.cpu_monitor = None
-        self.workdir = os.path.expandvars(
-            self.config.ray.get("workdir", os.getcwd())
-        )
+        self.workdir = os.path.expandvars(self.config.ray.get("workdir", os.getcwd()))
         if not os.path.isdir(self.workdir):
             self.workdir = os.getcwd()
         self.output_dir = self.config.ray.get("outputdir")
-        self.pilot_kill_file = os.path.expandvars(
-            self.config.payload.get("pilotkillfile", "pilot_kill_payload")
-        )
+        self.pilot_kill_file = os.path.expandvars(self.config.payload.get("pilotkillfile", "pilot_kill_payload"))
         self.pilot_kill_time = self.config.payload.get("pilotkilltime", 600)
         self.time_monitor_file = os.path.expandvars(
-            self.config.payload.get(
-                "timemonitorfile", "RaythenaTimeMonitor.txt"
-            )
-        )
-        self.payload: Union[BasePayload, ESPayload] = PilotHttpPayload(
-            self.id, self.config
+            self.config.payload.get("timemonitorfile", "RaythenaTimeMonitor.txt")
         )
+        self.payload: Union[BasePayload, ESPayload] = PilotHttpPayload(self.id, self.config)
         self.start_time = -1
         self.time_limit = -1
         self.elapsed = 1
@@ -170,12 +155,7 @@ def check_time(self) -> None:
         """
         while True:
             curtime = datetime.datetime.now()
-            time_elapsed = (
-                curtime.hour * 3600
-                + curtime.minute * 60
-                + curtime.second
-                - self.start_time
-            )
+            time_elapsed = curtime.hour * 3600 + curtime.minute * 60 + curtime.second - self.start_time
             if time_elapsed <= 0:
                 time_elapsed = 24 * 3600 + time_elapsed
             if time_elapsed // 300 >= self.elapsed:
@@ -184,13 +164,9 @@ def check_time(self) -> None:
                     if self.config.logging.get("copyraylogs", False):
                         if os.path.isdir(self.actor_ray_logs_dir):
                             shutil.rmtree(self.actor_ray_logs_dir)
-                        shutil.copytree(
-                            self.session_log_dir, self.actor_ray_logs_dir
-                        )
+                        shutil.copytree(self.session_log_dir, self.actor_ray_logs_dir)
                 except Exception as e:
-                    self._logger.warning(
-                        f"Failed to copy ray logs to actor directory: {e}"
-                    )
+                    self._logger.warning(f"Failed to copy ray logs to actor directory: {e}")
             if time_elapsed > self.time_limit - self.pilot_kill_time:
                 with open(self.pilot_kill_file, "w") as f:
                     f.write("KILL")
@@ -217,9 +193,7 @@ def modify_job(self, job: PandaJob) -> PandaJob:
         if len(input_evnt_file) != 1:
             return job
         in_files = [
-            os.path.join(
-                os.path.expandvars(self.config.harvester["endpoint"]), x
-            )
+            os.path.join(os.path.expandvars(self.config.harvester["endpoint"]), x)
             for x in input_evnt_file[0].split(",")
         ]
         in_files = ",".join(in_files[0:1])
@@ -230,9 +204,7 @@ def modify_job(self, job: PandaJob) -> PandaJob:
         )
         # convert args of the form --outputHITSFile=HITS.30737678._[011001,...].pool.root
         # to --outputHITSFile=HITS.30737678._011001.pool.root
-        match = re.findall(
-            r"--outputHITSFile=([0-9A-Z._]+)\[([0-9,]+)\](.pool.root)", cmd
-        )
+        match = re.findall(r"--outputHITSFile=([0-9A-Z._]+)\[([0-9,]+)\](.pool.root)", cmd)
         if match:
             match_tuple = match[0]
             prefix = match_tuple[0]
@@ -245,15 +217,9 @@ def modify_job(self, job: PandaJob) -> PandaJob:
             cmd,
         )

-        job_number = (
-            max(int(job["attemptNr"]) - 1, 0) * self.actor_count
-            + self.actor_no
-            + 1
-        )
+        job_number = max(int(job["attemptNr"]) - 1, 0) * self.actor_count + self.actor_no + 1
         if "--jobNumber=" in cmd:
-            cmd = re.sub(
-                r"--jobNumber=[0-9]+", f"--jobNumber={job_number}", cmd
-            )
+            cmd = re.sub(r"--jobNumber=[0-9]+", f"--jobNumber={job_number}", cmd)
         else:
             cmd = f"{cmd} --jobNumber={job_number} "

@@ -281,40 +247,22 @@ def stagein(self) -> None:
         """
         self.payload_job_dir = os.path.join(self.workdir, self.job["PandaID"])
         if not os.path.isdir(self.payload_job_dir):
-            self._logger.warning(
-                f"Specified path {self.payload_job_dir} does not exist. Using cwd {os.getcwd()}"
-            )
+            self._logger.warning(f"Specified path {self.payload_job_dir} does not exist. Using cwd {os.getcwd()}")
             self.payload_job_dir = self.workdir

         subdir = f"{self.id}"
-        self.payload_actor_process_dir = os.path.join(
-            self.payload_job_dir, subdir
-        )
-        self.payload_actor_output_dir = os.path.join(
-            self.payload_job_dir, subdir, "esOutput"
-        )
-        self.actor_ray_logs_dir = os.path.join(
-            self.payload_actor_process_dir, "ray_logs"
-        )
+        self.payload_actor_process_dir = os.path.join(self.payload_job_dir, subdir)
+        self.payload_actor_output_dir = os.path.join(self.payload_job_dir, subdir, "esOutput")
+        self.actor_ray_logs_dir = os.path.join(self.payload_actor_process_dir, "ray_logs")
         try:
             with open(os.path.join(self.workdir, self.time_monitor_file)) as time_limit_monitor:
                 start_time = time_limit_monitor.readline().split(":")
-                self.start_time = (
-                    int(start_time[0]) * 3600
-                    + int(start_time[1]) * 60
-                    + int(start_time[2])
-                )
+                self.start_time = int(start_time[0]) * 3600 + int(start_time[1]) * 60 + int(start_time[2])
                 time_limit = time_limit_monitor.readline().split(":")
                 if len(time_limit) < 3:
                     time_limit = ["0"] + time_limit
-                self.time_limit = (
-                    int(time_limit[0]) * 3600
-                    + int(time_limit[1]) * 60
-                    + int(time_limit[2])
-                )
-                timer_thread = threading.Thread(
-                    name="timer", target=self.check_time, daemon=True
-                )
+                self.time_limit = int(time_limit[0]) * 3600 + int(time_limit[1]) * 60 + int(time_limit[2])
+                timer_thread = threading.Thread(name="timer", target=self.check_time, daemon=True)
                 timer_thread.start()
         except Exception as e:
             self._logger.warning(f"Failed to setup timer thread: {e}")
@@ -348,11 +296,7 @@ def stagein(self) -> None:
         except Exception as e:
             self._logger.warning(f"Failed to stagein payload: {e}")
             raise StageInFailed(self.id) from e
-        self.transition_state(
-            ESWorker.READY_FOR_EVENTS
-            if self.is_event_service_job()
-            else ESWorker.PROCESSING
-        )
+        self.transition_state(ESWorker.READY_FOR_EVENTS if self.is_event_service_job() else ESWorker.PROCESSING)

     def stageout(self) -> None:
         """
@@ -447,9 +391,7 @@ def mark_new_job(self) -> WorkerResponse:
         self.transition_state(ESWorker.JOB_REQUESTED)
         return self.return_message(Messages.REQUEST_NEW_JOB)

-    def receive_event_ranges(
-        self, reply: int, event_ranges: Sequence[EventRange]
-    ) -> WorkerResponse:
+    def receive_event_ranges(self, reply: int, event_ranges: Sequence[EventRange]) -> WorkerResponse:
         """
         Sends event ranges to be processed by the worker. Update the PFN of event ranges to an absolute path if
         it is a relative path. If no ranges are provided, the worker will not expect any more ranges in the future and
@@ -540,9 +482,7 @@ def should_request_ranges(self) -> bool:
         self.transition_state(ESWorker.READY_FOR_EVENTS)
         return res

-    def stageout_event_service_files(
-        self, ranges_update: Mapping[str, str]
-    ) -> Optional[EventRangeUpdate]:
+    def stageout_event_service_files(self, ranges_update: Mapping[str, str]) -> Optional[EventRangeUpdate]:
         """
         Move the HITS files reported by the pilot payload. Files are moved from the Athena work directory to the
         worker-specific output directory.
@@ -576,15 +516,11 @@ def stageout_event_service_files(
                 try:
                     os.replace(cfile, dst)
                 except OSError as e:
-                    self._logger.error(
-                        f"Failed to move file {cfile} to {dst}: errno {e.errno}: {e.strerror}"
-                    )
+                    self._logger.error(f"Failed to move file {cfile} to {dst}: errno {e.errno}: {e.strerror}")
                     raise StageOutFailed(self.id) from e
                 range_update[cfile_key] = dst
             else:
-                self._logger.warning(
-                    f"Couldn't stageout file {cfile} as it doesn't exist"
-                )
+                self._logger.warning(f"Couldn't stageout file {cfile} as it doesn't exist")
                 raise StageOutFailed(self.id)
         return ranges

@@ -599,9 +535,7 @@ def get_payload_message(self) -> Optional[WorkerResponse]:
         ranges_update = self.payload.fetch_ranges_update()
         if ranges_update:
             ranges_update = self.stageout_event_service_files(ranges_update)
-            return self.return_message(
-                Messages.UPDATE_EVENT_RANGES, ranges_update
-            )
+            return self.return_message(Messages.UPDATE_EVENT_RANGES, ranges_update)

         job_update = self.payload.fetch_job_update()
         if job_update:
@@ -644,8 +578,7 @@ def get_message(self) -> WorkerResponse:
             self.stageout()
             return self.return_message(Messages.PROCESS_DONE)
         elif self.is_event_service_job() and (
-            self.state == ESWorker.READY_FOR_EVENTS
-            or self.should_request_ranges()
+            self.state == ESWorker.READY_FOR_EVENTS or self.should_request_ranges()
         ):
             req = EventRangeRequest()
             req.add_event_request(
@@ -655,9 +588,7 @@ def get_message(self) -> WorkerResponse:
                 self.job["jobsetID"],
             )
             self.transition_state(ESWorker.EVENT_RANGES_REQUESTED)
-            return self.return_message(
-                Messages.REQUEST_EVENT_RANGES, req
-            )
+            return self.return_message(Messages.REQUEST_EVENT_RANGES, req)
         elif self.state == ESWorker.DONE:
             return self.return_message(Messages.PROCESS_DONE)
         else:
4 changes: 1 addition & 3 deletions src/raythena/actors/payloads/eventservice/esPayload.py
@@ -22,9 +22,7 @@ def __init__(self, worker_id: str, config: Config):
         super().__init__(worker_id, config)

     @abstractmethod
-    def submit_new_ranges(
-        self, event_ranges: Optional[Sequence[EventRange]]
-    ) -> None:
+    def submit_new_ranges(self, event_ranges: Optional[Sequence[EventRange]]) -> None:
         """
         Submit a new list of event ranges to the payload. The event ranges should be saved until is can be processed
(Diffs for the remaining 22 changed files are not shown here.)
