Skip to content

Commit

Permalink
initialized variables
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Ward committed Jun 5, 2024
1 parent 33521c1 commit 8c45643
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions distillreads.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def dispatch(dispatch_queues, sorter_queue, read_done_events, dispatch_done_even
console.log(f"{dispatch_name} failed to finish.", style="bold red")


def sorter(sorter_queue, merger_queue, read_done_events, sort_done_event, sorter_lock):
def sorter(sorter_queue, merger_queue, read_done_events, dispatch_done_event, sort_done_event, sorter_lock):
sorter_name = f"[bold white]Sorter {mp.current_process().name}[/bold white]"

while True:
Expand Down Expand Up @@ -193,6 +193,7 @@ def merger(
send_ends,
sort_done_events,
merger_done_event,
read_done_events,
output_filenames,
writer_sems,
):
Expand Down Expand Up @@ -240,7 +241,6 @@ def merge_from_heap(heap, merger_name):

for event in read_done_events:
event.wait()
dispatch_done_event.wait()
for event in sort_done_events:
event.wait()

Expand Down Expand Up @@ -293,7 +293,7 @@ def merge_from_heap(heap, merger_name):
console.log(f"{merger_name} finished.", style="red")


def writer_process(recv_end, output_filename, writer_sem):
def writer_process(recv_end, output_filename, writer_sem, merger_done_event):
writer_name = f"[bold cyan]Writer {mp.current_process().name}[/bold cyan]"

with pyzstd.open(output_filename, "wb") as f:
Expand Down Expand Up @@ -384,6 +384,7 @@ def get_output_filename(filename):
sorter_queue,
merger_queue,
read_done_events,
dispatch_done_event,
sort_done_event,
sorter_lock,
),
Expand All @@ -397,13 +398,14 @@ def get_output_filename(filename):
merger_send_ends,
sort_done_events,
merger_done_event,
read_done_events,
output_filenames,
writer_sems,
),
)
writers = [
mp.Process(
target=writer_process, args=(recv_end, output_filename, writer_sem)
target=writer_process, args=(recv_end, output_filename, writer_sem, merger_done_event)
)
for recv_end, output_filename, writer_sem in zip(
writer_recv_ends, output_filenames, writer_sems
Expand Down

0 comments on commit 8c45643

Please sign in to comment.