Merge pull request #79 from erdos-project/ray-osdi-experiment
Add visualization tool and alibaba_loader_task_max_pow2_slots flag and more
ruizehung authored Dec 19, 2023
2 parents 0422168 + 3024055 commit 3a47288
Showing 39 changed files with 6,923 additions and 164 deletions.
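
The new `alibaba_loader_task_max_pow2_slots` flag named in the commit message is consumed in `data/alibaba_loader.py` below; where it is defined is not part of the files shown here. A hedged sketch of what the absl definition might look like (only the flag name comes from the diff; the default and help text are assumptions):

```python
# Hypothetical flag definition; not part of this diff.
from absl import flags

flags.DEFINE_integer(
    "alibaba_loader_task_max_pow2_slots",
    0,
    "If > 0, ignore the per-task CPU requirements in the Alibaba trace and "
    "assign each task a random power-of-two slot count no larger than this "
    "value. 0 keeps the trace-derived slot counts (the default behaviour).",
)
```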
6 changes: 3 additions & 3 deletions analyze.py
@@ -479,7 +479,7 @@ def analyze_scheduler_runtime(


def analyze_task_placement(
csv_reader,
csv_reader: CSVReader,
scheduler_csv_file,
scheduler_name,
output,
@@ -503,11 +503,11 @@ def analyze_task_placement(

# Calculate the heights of placed and unplaced tasks.
placed_task_heights = [
scheduler_invocation.placed_tasks
scheduler_invocation.num_placed_tasks
for scheduler_invocation in scheduler_invocations
]
unplaced_task_heights = [
scheduler_invocation.unplaced_tasks
scheduler_invocation.num_unplaced_tasks
for scheduler_invocation in scheduler_invocations
]

88 changes: 63 additions & 25 deletions data/alibaba_loader.py
@@ -73,14 +73,16 @@ def __init__(
)

self._task_cpu_divisor = int(self._flags.alibaba_loader_task_cpu_divisor)
self._task_duration_multipler = self._flags.alibaba_task_duration_multiplier
self._task_max_pow2_slots = int(
self._flags.alibaba_loader_task_max_pow2_slots
)
else:
self._csv_logger = setup_csv_logging(
name=self.__class__.__name__, log_file=None
)
self._log_dir = os.getcwd()
self._task_cpu_divisor = 25
self._task_duration_multipler = 1
self._task_max_pow2_slots = 0 # default behaviour: use task.cpu from trace

def _construct_release_times(self):
"""Construct the release times of the jobs in the workload.
@@ -192,6 +194,14 @@ def job_data_generator():

return job_data_generator()

def _sample_normal_distribution_random(self, n, mean, std, min_val=0, max_val=100):
samples = []
while len(samples) < n:
sample = self._rng.normalvariate(mean, std)
if min_val <= sample <= max_val:
samples.append(sample)
return samples

def _convert_job_data_to_job_graph(
self, job_graph_name: str, job_tasks: List[str]
) -> JobGraph:
@@ -204,38 +214,66 @@ def _convert_job_data_to_job_graph(
# Create the individual Job instances corresponding to each Task.
task_name_to_simulator_job_mapping = {}
for task in job_tasks:
job_resources_1 = Resources(
resource_vector={
# Note: We divide the CPU by some self._task_cpu_divisor instead
                    # of 100 because this would introduce more variance into the
# resource/slots usage.
# We used to divide by 100, but the majority of the tasks
# would end up using 1 slot, which is not very interesting and
                    # leaves no chance for DAG_Sched to do effective packing that
# would beat EDF by a significant margin.
Resource(name="Slot_1", _id="any"): int(
math.ceil(task.cpu / self._task_cpu_divisor)
),
}
)
if self._task_max_pow2_slots == 0:
                # Use the CPU requirements from the Alibaba trace to derive the number of slots.
job_resources_1 = Resources(
resource_vector={
# Note: We divide the CPU by some self._task_cpu_divisor instead
                        # of 100 because this would introduce more variance into the
# resource/slots usage.
# We used to divide by 100, but the majority of the tasks
# would end up using 1 slot, which is not very interesting and
                        # leaves no chance for DAG_Sched to do effective packing that
# would beat EDF by a significant margin.
Resource(name="Slot_1", _id="any"): int(
math.ceil(task.cpu / self._task_cpu_divisor)
),
}
)

job_resources_2 = Resources(
resource_vector={
Resource(name="Slot_2", _id="any"): int(
math.ceil(task.cpu / self._task_cpu_divisor)
),
}
)
job_resources_2 = Resources(
resource_vector={
Resource(name="Slot_2", _id="any"): int(
math.ceil(task.cpu / self._task_cpu_divisor)
),
}
)
else:
                # Override the CPU requirements from the Alibaba trace and assign a
                # random power-of-2 number of slots, up to a limit of self._task_max_pow2_slots.
max_pow2_for_slot = math.log2(self._task_max_pow2_slots)
slots_for_task = 2 ** (self._rng.randint(0, max_pow2_for_slot))
job_resources_1 = Resources(
resource_vector={
Resource(name="Slot_1", _id="any"): slots_for_task,
}
)

job_resources_2 = Resources(
resource_vector={
Resource(name="Slot_2", _id="any"): slots_for_task,
}
)

# If we want to try randomizing the duration of the tasks.
# random_task_duration = round(
# self._sample_normal_distribution_random(1, 50, 15)[0]
# )
            # Use this if we want a middle-heavy distribution of task durations
# if i == 0 or i == len(job_tasks) - 1:
# random_task_duration = round(self._sample_normal_distribution_random(1, 10, 5)[0])
# else:
# random_task_duration = round(self._sample_normal_distribution_random(1, 50, 15)[0])

job_name = task.name.split("_")[0]
job_runtime_1 = EventTime(
int(math.ceil(task.duration * self._task_duration_multipler)),
int(task.duration),
EventTime.Unit.US,
)
# This is used when self._heterogeneous is True
# to support another execution strategy where it runs faster.
job_runtime_2 = EventTime(
int(math.ceil(task.duration * self._task_duration_multipler * 0.8)),
int(math.ceil(task.duration * 0.8)),
EventTime.Unit.US,
)

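For reference, a minimal standalone sketch of the power-of-two slot assignment added above, assuming Python's `random.Random` in place of the loader's `self._rng`; the helper name and the example cap of 8 are illustrative, not part of the loader's API:

```python
import math
import random


def sample_pow2_slots(rng: random.Random, max_pow2_slots: int) -> int:
    """Draw a power-of-two slot count between 1 and max_pow2_slots (inclusive).

    Mirrors the else-branch above: with max_pow2_slots == 8, the exponent is
    drawn uniformly from {0, 1, 2, 3}, so the slot count is uniform over
    {1, 2, 4, 8}. The exponent is cast to int here because math.log2 returns
    a float.
    """
    max_exponent = int(math.log2(max_pow2_slots))
    return 2 ** rng.randint(0, max_exponent)


rng = random.Random(42)
print([sample_pow2_slots(rng, 8) for _ in range(8)])  # values drawn from {1, 2, 4, 8}
```

Presumably the appeal of power-of-two requirements is that they tile worker slot counts evenly, giving the DAG-aware scheduler more room to out-pack EDF, in line with the comment about the CPU divisor above.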
37 changes: 26 additions & 11 deletions data/csv_reader.py
@@ -36,7 +36,7 @@ def __init__(self, csv_paths: str, _flags: Optional["absl.flags"] = None):
path_readings.append(line)
readings[csv_path] = path_readings

self._simulators = {}
self._simulators: dict[str, Simulator] = {}
self.parse_events(readings)

def parse_events(self, readings: Mapping[str, Sequence[str]]):
@@ -52,7 +52,7 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
tasks: dict[str, Task] = {}
task_graphs: dict[str, TaskGraph] = {}
worker_pools = {}
schedulers = []
schedulers: list[Scheduler] = []
for reading in csv_readings:
try:
# TODO: This
@@ -77,6 +77,8 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
intended_release_time=int(reading[4]),
release_time=int(reading[5]),
deadline=int(reading[6]),
window_to_execute=int(reading[6]) - int(reading[5]),
slowest_execution_time=int(reading[9]),
)
elif reading[1] == "TASK_FINISHED":
# Update the task with the completion event data.
@@ -126,17 +128,23 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
task_graph=reading[5],
timestamp=int(reading[3]),
task_id=reading[4],
intended_release_time=float("inf"),
release_time=float("inf"),
deadline=float("inf"),
intended_release_time=None,
release_time=None,
deadline=None,
window_to_execute=None,
# Checking if len(reading) > 6 is for backward compatibility
slowest_execution_time=int(reading[6])
if len(reading) > 6
else None,
)
tasks[reading[4]].cancelled = True
tasks[reading[4]].cancelled_at = int(reading[0])
task_graphs[reading[5]].cancelled = True
task_graphs[reading[5]].cancelled_at = int(reading[0])
elif reading[1] == "TASK_SKIP" and reading[4] in tasks:
# Update the task with the skip data.
tasks[reading[4]].update_skip(reading)
elif reading[1] == "TASK_SKIP":
if reading[4] in tasks:
# Update the task with the skip data.
tasks[reading[4]].update_skip(reading)
elif reading[1] == "TASK_PREEMPT":
# Update the placement with the preemption time.
tasks[reading[4]].update_preempt(reading)
@@ -152,16 +160,23 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
name=reading[4],
release_time=int(reading[2]),
deadline=int(reading[3]),
num_tasks=int(reading[5]),
)
elif reading[1] == "TASK_GRAPH_FINISHED":
                        # Record the task graph's completion time and slack.
task_graphs[reading[2]].completion_time = int(reading[0])
task_graphs[reading[2]].cancelled = False
task_graphs[reading[2]].slack = (
task_graphs[reading[2]].deadline
- task_graphs[reading[2]].completion_time
)
elif reading[1] == "MISSED_TASK_GRAPH_DEADLINE":
                        # Record when the deadline miss was detected.
task_graphs[reading[2]].deadline_miss_detected_at = int(
reading[0]
)
elif reading[0] == "input_flag":
continue
else:
print(f"[x] Unknown event type: {reading[1]}")
except Exception as e:
@@ -184,9 +199,9 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
assert simulator.missed_taskgraphs == missed_deadline_task_graphs_count
assert simulator.dropped_taskgraphs == canceled_task_graphs_count

simulator.worker_pools = worker_pools.values()
simulator.worker_pools = list(worker_pools.values())
simulator.tasks = list(
sorted(tasks.values(), key=attrgetter("release_time"))
sorted(tasks.values(), key=lambda x: x.release_time_compare_key)
)
simulator.scheduler_invocations = schedulers
simulator.task_graphs = task_graphs
@@ -254,7 +269,7 @@ def get_worker_pool_utilizations(self, csv_path: str) -> Sequence[WorkerPoolStat
worker_pool_stats.append(
WorkerPoolStats(
simulator_time=simulator_time,
resource_utilizations=resource_utilizations,
resource_utilizations=dict(resource_utilizations),
)
)
return worker_pool_stats
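The two quantities the reader now derives, `window_to_execute` (recorded per task at TASK_RELEASE) and `slack` (recorded per task graph at TASK_GRAPH_FINISHED), are simple differences of existing timestamps. A small illustrative sketch; the dataclass below stands in for the reader's own Task/TaskGraph types and is not its actual API:

```python
from dataclasses import dataclass


@dataclass
class GraphTiming:
    """Illustrative container, not csv_reader's TaskGraph class."""

    release_time: int
    deadline: int
    completion_time: int

    @property
    def window_to_execute(self) -> int:
        # Budget between release and deadline, as recorded at TASK_RELEASE.
        return self.deadline - self.release_time

    @property
    def slack(self) -> int:
        # Positive if the graph finished before its deadline, negative if it missed.
        return self.deadline - self.completion_time


timing = GraphTiming(release_time=5, deadline=20, completion_time=18)
print(timing.window_to_execute, timing.slack)  # 15 2
```

Storing these at parse time presumably lets the analysis and visualization code plot deadline slack directly instead of recomputing it from raw events.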