Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor workers #75

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jellybench_py/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class CommandConfig:
class Constants:
DEFAULT_OUTPUT_JSON: str = "./output.json"
DEFAULT_SERVER_URL: str = "https://hwa.jellyfin.org"
DEFAULT_TIMEOUT: int = 120
NVENC_TEST_WINDOWS = CommandConfig(
BASE_CMD="{ffmpeg} -y -hwaccel cuda -hwaccel_output_format cuda -t 50 -hwaccel_device {gpu} -f lavfi -i testsrc ",
WORKER_CMD="-vf hwupload -fps_mode passthrough -c:a copy -c:v h264_nvenc -b:v {bitrate} -f null -",
Expand Down
68 changes: 50 additions & 18 deletions jellybench_py/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
#
##########################################################################################

import concurrent.futures
import re
import shlex
import subprocess
import subprocess as sp
import time

from jellybench_py import ffmpeg_log
from jellybench_py.constant import Constants


def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple: # Process ID,
Expand Down Expand Up @@ -78,23 +79,53 @@ def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple: # Process ID,


def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
print()
print()
print("Starting a run")
print(f"HERE IS FFMPEG CMD: {type(ffmpeg_cmd)} {ffmpeg_cmd}")
print(f"Here is worker count: {worker_count}")

ffmpeg_cmd_list = shlex.split(ffmpeg_cmd)
raw_worker_data = {}
failure_reason = None
# print(f"> Run with {worker_count} Processes")
with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = {
executor.submit(run_ffmpeg, nr, ffmpeg_cmd_list): nr
for nr in range(worker_count)
}
for future in concurrent.futures.as_completed(futures):
pid = futures[future]
raw_worker_data[pid] = future.result()
# print(f"> > > Finished Worker Process: {pid}")
if raw_worker_data[pid][1]:
failure_reason = raw_worker_data[pid][1]

if failure_reason:
procs, results = {}, {}

# Start processes
for i in range(worker_count):
procs[i] = sp.Popen(
ffmpeg_cmd_list,
stdin=sp.PIPE,
stdout=sp.PIPE,
stderr=sp.PIPE,
text=True
)
then = time.time()
keep_waiting = True
while keep_waiting is True and failure_reason is None:
now = time.time()
if now - then >= Constants.DEFAULT_TIMEOUT:
failure_reason = 'failed_timeout'
else:
keep_waiting = False
for idx, copy_of_proc in list(procs.items()): # iterate over a copy to mutate original
if copy_of_proc.poll() is not None:
stdout, stderr = copy_of_proc.communicate()
results[idx] = stdout.strip()
if copy_of_proc.returncode == 0:
print(f"Process {idx} finished with output:\n{stdout}")
else:
print(f"Process {idx} failed with return code {copy_of_proc.returncode}")
failure_reason = f"Worker {idx} failed with return code {copy_of_proc.returncode}"
break
del procs[idx]
else:
keep_waiting = True
time.sleep(1)

if failure_reason is not None:
for _, proc in procs.items():
proc.kill()
print("There was a failure so I killed all the procs")
raw_worker_data = None
# Deleting all the Raw Data, since run with failed Worker is not counted

Expand Down Expand Up @@ -148,9 +179,9 @@ def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
}

run_data_raw.append(worker_data)
return False, evaluateRunData(run_data_raw)
return False, evaluateRunData(run_data_raw), None
else:
return True, failure_reason
return True, None, failure_reason


def evaluateRunData(run_data_raw: list) -> dict:
Expand Down Expand Up @@ -187,6 +218,7 @@ def evaluateRunData(run_data_raw: list) -> dict:
def test_command(ffmpeg_cmd):
ffmpeg_cmd_list = shlex.split(ffmpeg_cmd)
successful_stream_count = 0
print(f"HERE IS YA COMMAND: {type(ffmpeg_cmd)}:{ffmpeg_cmd}")
raw_worker_data = run_ffmpeg(1, ffmpeg_cmd_list)

failure_reason = raw_worker_data[1]
Expand Down
Loading