
Commit

fix line size
esseivaju committed Oct 4, 2024
1 parent 963c57d commit 77ff2f5
Showing 7 changed files with 101 additions and 72 deletions.
6 changes: 0 additions & 6 deletions .flake8

This file was deleted.

10 changes: 5 additions & 5 deletions pyproject.toml
@@ -45,7 +45,7 @@ dependencies = [

[tool.ruff]

-line-length = 80
+line-length = 120
indent-width = 4

[tool.ruff.lint]
@@ -65,10 +65,10 @@ select = [
"I"
]

-ignore = [
-# pycodestyle
-"E501",
-]
+# ignore = [
+# # pycodestyle
+# "E501",
+# ]

[tool.ruff.lint.isort]
no-lines-before = ["third-party", "first-party", "standard-library"]
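
Since the ignore list above is now commented out, ruff no longer suppresses E501 (assuming the pycodestyle rules are part of the select list), so any line longer than the new 120-character limit is flagged. A rough illustration of what that check amounts to (a hypothetical helper, not part of this commit or of ruff):

```python
# Hypothetical helper: report lines longer than the new limit, which is what
# ruff's E501 check enforces with line-length = 120.
MAX_LEN = 120

def overlong_lines(path: str) -> list[tuple[int, int]]:
    """Return (line_number, line_length) for every line exceeding MAX_LEN."""
    with open(path, encoding="utf-8") as f:
        return [
            (number, len(line.rstrip("\n")))
            for number, line in enumerate(f, start=1)
            if len(line.rstrip("\n")) > MAX_LEN
        ]
```
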
6 changes: 4 additions & 2 deletions src/raythena/actors/esworker.py
@@ -228,7 +228,8 @@ def modify_job(self, job: PandaJob) -> PandaJob:
f"--inputEVNTFile={in_files} -",
cmd,
)
-# convert args of the form --outputHITSFile=HITS.30737678._[011001,...].pool.root to --outputHITSFile=HITS.30737678._011001.pool.root
+# convert args of the form --outputHITSFile=HITS.30737678._[011001,...].pool.root
+# to --outputHITSFile=HITS.30737678._011001.pool.root
match = re.findall(
r"--outputHITSFile=([0-9A-Z._]+)\[([0-9,]+)\](.pool.root)", cmd
)
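
As a self-contained sketch of what this comment and regex do (the file names below are made up, and which bracketed entry the real worker keeps is not shown in this hunk):

```python
import re

# Made-up job argument; the real value comes from the PanDA job parameters.
cmd = "Sim_tf.py --outputHITSFile=HITS.30737678._[011001,011002].pool.root"
match = re.findall(r"--outputHITSFile=([0-9A-Z._]+)\[([0-9,]+)\](.pool.root)", cmd)
if match:
    prefix, numbers, suffix = match[0]   # ("HITS.30737678._", "011001,011002", ".pool.root")
    chosen = numbers.split(",")[0]       # keep a single entry, e.g. the first one
    cmd = cmd.replace(
        f"--outputHITSFile={prefix}[{numbers}]{suffix}",
        f"--outputHITSFile={prefix}{chosen}{suffix}",
    )
print(cmd)  # Sim_tf.py --outputHITSFile=HITS.30737678._011001.pool.root
```
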
@@ -275,7 +276,8 @@ def stagein(self) -> None:
Postconditions:
- The worker is in the READY_FOR_EVENTS state.
Raises:
-StageInFailed: If creating / moving to the work directory fails or the call to the payload stage-in raises an exception.
+StageInFailed: If creating / moving to the work directory fails or the call to the payload
+stage-in raises an exception.
"""
self.payload_job_dir = os.path.join(self.workdir, self.job["PandaID"])
if not os.path.isdir(self.payload_job_dir):
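
A minimal sketch of the failure path this docstring describes, under the assumption that creating or entering the per-job directory raises StageInFailed (the exception class below is a stand-in, not the real Raythena type):

```python
import os

class StageInFailed(Exception):
    """Stand-in for Raythena's StageInFailed exception."""

def ensure_job_dir(workdir: str, panda_id: str) -> str:
    payload_job_dir = os.path.join(workdir, panda_id)
    try:
        os.makedirs(payload_job_dir, exist_ok=True)  # create the work directory...
        os.chdir(payload_job_dir)                    # ...and move into it
    except OSError as exc:
        raise StageInFailed(f"could not create or enter {payload_job_dir}") from exc
    return payload_job_dir
```
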
11 changes: 7 additions & 4 deletions src/raythena/actors/payloads/eventservice/pilothttp.py
@@ -196,12 +196,15 @@ def _build_pilot_command(self) -> str:
raise FailedPayload(self.worker_id)

queue_escaped = shlex.quote(self.config.payload["pandaqueue"])
cmd += f"{shlex.quote(pilotwrapper_bin)} --localpy --piloturl local -q {queue_escaped} -r {queue_escaped} -s {queue_escaped} "
cmd += (f"{shlex.quote(pilotwrapper_bin)} --localpy --piloturl local "
f"-q {queue_escaped} -r {queue_escaped} -s {queue_escaped} "
)

cmd += "--pilotversion 3 --pythonversion 3 "

cmd += (
f"-i PR -j {prod_source_label} --container --mute --pilot-user=atlas -t -u --es-executor-type=raythena -v 1 "
f"-i PR -j {prod_source_label} --container --mute --pilot-user=atlas -t -u "
f"--es-executor-type=raythena -v 1 "
f"-d --cleanup=False -w generic --use-https False --allow-same-user=False --resource-type MCORE "
f"--hpc-resource {shlex.quote(self.config.payload['hpcresource'])};"
)
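
The wrapped command above is plain implicit concatenation of adjacent f-strings, with shlex.quote protecting the wrapper path and queue name; a small sketch with made-up values:

```python
import shlex

# Made-up values standing in for the configured pilot wrapper and PanDA queue.
pilotwrapper_bin = "/tmp/pilot wrapper.sh"   # note the space: quoting is required
queue_escaped = shlex.quote("MY_PANDA_QUEUE")

cmd = (
    f"{shlex.quote(pilotwrapper_bin)} --localpy --piloturl local "
    f"-q {queue_escaped} -r {queue_escaped} -s {queue_escaped} "
)
print(cmd)  # '/tmp/pilot wrapper.sh' --localpy --piloturl local -q MY_PANDA_QUEUE ...
```
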
@@ -377,8 +380,8 @@ def fetch_ranges_update(self) -> Optional[Mapping[str, str]]:
def should_request_more_ranges(self) -> bool:
"""
Checks if the payload is ready to receive more event ranges. If false is returned, then the payload is
-not expecting to have more ranges assigned to it by calling submit_new_ranges. If this method ever returns false,
-then any future call to it will return false as well.
+not expecting to have more ranges assigned to it by calling submit_new_ranges.
+If this method ever returns false, then any future call to it will return false as well.
Event ranges submitted after this method returns false will be ignored and never sent to the pilot process.
Returns:
64 changes: 38 additions & 26 deletions src/raythena/drivers/esdriver.py
@@ -52,11 +52,13 @@
class ESDriver(BaseDriver):
"""
The driver is managing all the ray workers and handling the communication with Harvester. It keeps track of
-which event ranges are assigned to which actor using a BookKeeper instance which provides the interface to read and update the status of each event range.
+which event ranges are assigned to which actor using a BookKeeper instance which provides the interface to read
+and update the status of each event range.
-It will also send requests for jobs, event ranges or updates of produced output to Harvester by using a communicator instance.
-The communicator uses the shared file system to communicate with Harvester and does I/O in a separate thread;
-communication between the driver and the communicator is done by message passing using a queue.
+It will also send requests for jobs, event ranges or updates of produced output to Harvester by
+using a communicator instance. The communicator uses the shared file system to communicate with
+Harvester and does I/O in a separate thread; communication between the driver and the communicator
+is done by message passing using a queue.
The driver is starting one actor per node in the ray cluster except for the ray head node which doesn't execute
any worker
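
A minimal sketch of the message-passing pattern described above (this is not Raythena's actual communicator; the request string and the I/O helper are placeholders):

```python
import queue
import threading

requests: queue.Queue = queue.Queue()
replies: queue.Queue = queue.Queue()

def fake_harvester_io(request: str) -> dict:
    # Placeholder for the shared-filesystem I/O the real communicator performs.
    return {"request": request, "payload": {}}

def communicator_loop() -> None:
    while True:
        request = requests.get()       # blocks until the driver sends a request
        if request is None:            # sentinel used to stop the thread
            break
        replies.put(fake_harvester_io(request))

threading.Thread(target=communicator_loop, daemon=True).start()
requests.put("get_job")                # driver side: ask for a job...
job = replies.get()                    # ...and block until the reply arrives
requests.put(None)                     # shut the communicator thread down
```
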
@@ -217,7 +219,8 @@ def __getitem__(self, key: str) -> ESWorker:

def start_actors(self) -> None:
"""
-Initialize actor communication by performing the first call to get_message() and add the future to the future list.
+Initialize actor communication by performing the first call to get_message() and
+add the future to the future list.
Returns:
None
@@ -227,8 +230,8 @@ def start_actors(self) -> None:

def create_actors(self) -> None:
"""
-Create actors on each node. Before creating an actor, the driver tries to assign it a job and an initial batch of
-event ranges. This avoids having all actors requesting jobs and event ranges at the start of the job.
+Create actors on each node. Before creating an actor, the driver tries to assign it a job and an initial
+batch of event ranges. This avoids having all actors requesting jobs and event ranges at the start of the job.
Returns:
None
@@ -270,7 +273,8 @@ def retrieve_actors_messages(
) -> Iterator[WorkerResponse]:
"""
Given a list of ready futures from actors, unwrap them and return an iterable over the result of each future.
-In case one of the futures raised an exception, the exception is handled by this function and not propagated to the caller.
+In case one of the futures raised an exception, the exception is handled by this function and
+not propagated to the caller.
Args:
ready: a list of ready futures
@@ -281,7 +285,8 @@
try:
messages = ray.get(ready)
except Exception:
-# if any of the futures raised an exception, we need to handle them one by one to know which one produced the exception.
+# if any of the futures raised an exception, we need to handle them one by one
+# to know which one produced the exception.
for r in ready:
try:
actor_id, message, data = ray.get(r)
Expand All @@ -305,7 +310,8 @@ def enqueue_actor_call(self, actor_id: str, future: ObjectRef):

def handle_actors(self) -> None:
"""
-Main function handling messages from all ray actors and dispatching to the appropriate handling function according to the message returned by the actor.
+Main function handling messages from all ray actors and dispatching to the appropriate handling
+function according to the message returned by the actor.
Returns:
None
@@ -343,16 +349,14 @@ def wait_on_messages(self) -> tuple[list[ObjectRef], list[ObjectRef]]:
"""
Wait on part of the pending futures to complete. Wait for 1 second trying to fetch half of the pending futures.
If no futures are ready, then wait another second to fetch a tenth of the pending futures.
-If there are still no futures ready, then wait for the timeout interval or until one future is ready. If this is the beginning of the job, i.e. no
-events have finished processing yet, then wait forever until one future is ready instead of only timeout interval.
+If there are still no futures ready, then wait for the timeout interval or until one future is ready.
+If this is the beginning of the job, i.e. no events have finished processing yet, then wait forever until
+one future is ready instead of only timeout interval.
Returns:
tuple of a list of completed futures and a list of pending futures, respectively
"""
-if self.bookKeeper.have_finished_events():
-timeoutinterval = self.timeoutinterval
-else:
-timeoutinterval = None
+timeoutinterval = self.timeoutinterval if self.bookKeeper.have_finished_events() else None

messages, queue = ray.wait(
self.actors_message_queue,
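
In isolation, the tiered wait described in the docstring could look like the sketch below (an assumed shape, not the actual method, which operates on self.actors_message_queue and the driver's own timeout setting):

```python
from typing import Optional
import ray

def tiered_wait(pending: list, timeout_interval: Optional[float]) -> tuple[list, list]:
    if not pending:
        return [], []
    # Try to grab up to half of the pending futures within one second...
    ready, rest = ray.wait(pending, num_returns=max(1, len(pending) // 2), timeout=1)
    if not ready:
        # ...then up to a tenth of them within another second...
        ready, rest = ray.wait(pending, num_returns=max(1, len(pending) // 10), timeout=1)
    if not ready:
        # ...finally block for timeout_interval, or forever when it is None.
        ready, rest = ray.wait(pending, num_returns=1, timeout=timeout_interval)
    return ready, rest
```
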
@@ -435,9 +439,10 @@ def handle_request_event_ranges(
the number of events returned in a single request is capped to the number of local events
divided by the number of actors. This cap is updated every time new events are retrieved from Harvester.
-If the driver doesn't have enough events to send to the actor, then it will initiate or wait on a pending event request to Harvester to get more events.
-It will only return fewer events than the requested number (or cap) if Harvester returns no events.
-Requests to Harvester are skipped if it was flagged as not having any events left for the current actor's job.
+If the driver doesn't have enough events to send to the actor, then it will initiate or wait on a pending
+event request to Harvester to get more events. It will only return fewer events than the requested number (or cap)
+if Harvester returns no events. Requests to Harvester are skipped if it was flagged as not having any events
+left for the current actor's job.
Args:
actor_id: worker sending the event ranges update
@@ -624,7 +629,8 @@ def run(self) -> None:
Method used to start the driver, initializing actors, retrieving the initial job and event ranges,
creating the job subdir, then handling actors until they are all done or stop() has been called
This function will also create a directory in config.ray.workdir for the retrieved job
-with the directory name being the PandaID. Workers will then each create their own subdirectory in that job directory.
+with the directory name being the PandaID. Workers will then each create their own
+subdirectory in that job directory.
Returns:
None
@@ -841,8 +847,8 @@ def stop(self) -> None:
def handle_actor_exception(self, actor_id: str, ex: Exception) -> None:
"""
Handle exception that occurred in an actor process. Log the exception and count the number of exceptions
-that were produced by the same actor. If the number of exceptions is greater than the threshold, the driver will simply drop the actor
-by no longer calling remote functions on it.
+that were produced by the same actor. If the number of exceptions is greater than the threshold,
+the driver will simply drop the actor by no longer calling remote functions on it.
Args:
actor_id: the actor that raised the exception
@@ -890,10 +896,12 @@ def get_output_file_guid(self, job_report_file) -> Optional[str]:
def handle_merge_transforms(self, wait_for_completion=False) -> bool:
"""
Checks if the bookkeeper has files ready to be merged. If so, subprocesses for merge tasks are started.
-After starting any subprocess, go through all the running subprocesses and poll them to check if any completed and report status to the bookkeeper.
+After starting any subprocess, go through all the running subprocesses and poll them to check
+if any completed and report status to the bookkeeper.
Args:
-wait_for_completion: Wait for all the subprocesses (including those started by this call) to finish before returning
+wait_for_completion: Wait for all the subprocesses
+(including those started by this call) to finish before returning
Returns:
True if new merge jobs were created
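
A minimal sketch of the start-then-poll loop described above (the process table and the bookkeeper callback are placeholders, not the real BookKeeper interface):

```python
import subprocess

running: dict[str, subprocess.Popen] = {}   # placeholder: merged output name -> merge subprocess

def report_to_bookkeeper(name: str, returncode: int) -> None:
    print(f"merge of {name} finished with return code {returncode}")  # stand-in for the real report

def poll_merge_processes(wait_for_completion: bool = False) -> None:
    for name, proc in list(running.items()):
        returncode = proc.wait() if wait_for_completion else proc.poll()
        if returncode is not None:          # the merge subprocess has finished
            report_to_bookkeeper(name, returncode)
            del running[name]
```
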
@@ -1020,7 +1028,10 @@ def hits_merge_transform(
if self.config.payload["containerextrasetup"].strip().endswith(";")
else ";"
)
-container_script = f"{self.config.payload['containerextrasetup']}{endtoken}{self.merge_transform} {transform_params}"
+container_script = (
+f"{self.config.payload['containerextrasetup']}{endtoken}"
+f"{self.merge_transform} {transform_params}"
+)
merge_script_path = os.path.join(tmp_dir, "merge_transform.sh")
with open(merge_script_path, "w") as f:
f.write(container_script)
@@ -1062,7 +1073,8 @@ def hits_merge_transform(
)
cmd += (
f"{self.config.payload['containerextraargs']}{endtoken}"
f"source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh --swtype {self.config.payload['containerengine']}"
f"source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh"
f" --swtype {self.config.payload['containerengine']}"
f" -c $thePlatform -s /srv/release_setup.sh -r /srv/merge_transform.sh -e \"{self.container_options}\";"
f"RETURN_VAL=$?;if [ \"$RETURN_VAL\" -eq 0 ]; then cp jobReport.json {job_report_name};fi;exit $RETURN_VAL;"
)
