Skip to content

Commit

Permalink
Merge pull request #74 from erdos-project/dg/ray_osdi_expts
Browse files Browse the repository at this point in the history
Merge dg/ray_osdi_expts into main
  • Loading branch information
ruizehung authored Dec 6, 2023
2 parents a19675c + 329d0e8 commit 2105a50
Show file tree
Hide file tree
Showing 94 changed files with 285,180 additions and 315 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ env

# Ignore build of tetrisched
schedulers/tetrisched/build/*

experiments/*
traces/alibaba-cluster-trace-v2018/job_dag_files_6600_groups/*
20 changes: 20 additions & 0 deletions configs/alibaba_edf_fixed.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--log_file_name=./alibaba_scheduler_EDF_num_invocation_50.log
--csv_file_name=./alibaba_scheduler_EDF_num_invocation_50.csv
--log_level=debug
--execution_mode=replay
--replay_trace=alibaba
--max_deadline_variance=100
--min_deadline_variance=50
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_random_50_dags.pkl
--override_num_invocations=50
--override_arrival_period=10
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster.yaml
--scheduler_runtime=0
--scheduler=EDF
# --enforce_deadlines
# --retract_schedules
# --drop_skipped_tasks
# # --release_taskgraphs
# --scheduler_log_times=10
# --scheduler_time_discretization=1
2 changes: 1 addition & 1 deletion configs/alibaba_tetrisched_adaptive_discretization.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
--max_deadline_variance=100
--min_deadline_variance=50
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_random_50_dags.pkl
--override_num_invocations=1
--override_num_invocations=50
--override_arrival_period=10
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster.yaml
Expand Down
9 changes: 7 additions & 2 deletions configs/alibaba_tetrisched_discrete_1.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@
--max_deadline_variance=100
--min_deadline_variance=50
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_random_50_dags.pkl
--override_num_invocations=1
--override_num_invocations=50
--override_arrival_period=10
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster.yaml
--scheduler_runtime=0
--scheduler=TetriSched
# --scheduler=EDF
--enforce_deadlines
--retract_schedules
--drop_skipped_tasks
--release_taskgraphs
--scheduler_log_times=10
--scheduler_time_discretization=1
--scheduler_time_discretization=5

# --override_release_policy=gamma
# --override_poisson_arrival_rate=10
# --override_gamma_coefficient=3
20 changes: 20 additions & 0 deletions configs/alibaba_tetrisched_discrete_1_no_dag_awareness.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--log_file_name=./alibaba_scheduler_TetriSched_release_policy_fixed_deadline_var_100_scheduler_discretization_1_no_dag_awareness.log
--csv_file_name=./alibaba_scheduler_TetriSched_release_policy_fixed_deadline_var_100_scheduler_discretization_1_no_dag_awareness.csv
--log_level=debug
--execution_mode=replay
--replay_trace=alibaba
--max_deadline_variance=100
--min_deadline_variance=50
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_random_50_dags.pkl
--override_num_invocations=50
--override_arrival_period=10
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster.yaml
--scheduler_runtime=0
--scheduler=TetriSched
--enforce_deadlines
--retract_schedules
--drop_skipped_tasks
# --release_taskgraphs
--scheduler_log_times=10
--scheduler_time_discretization=1
23 changes: 23 additions & 0 deletions configs/alibaba_tetrisched_dynamic_discretization_1_5.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
--log_file_name=./alibaba_scheduler_TetriSched_release_policy_fixed_deadline_var_100_scheduler_dynamic_discretization_1_5_auto_occupancy_0.7.log
--csv_file_name=./alibaba_scheduler_TetriSched_release_policy_fixed_deadline_var_100_scheduler_dynamic_discretization_1_5_auto_occupancy_0.7.csv
--log_level=debug
--execution_mode=replay
--replay_trace=alibaba
--max_deadline_variance=100
--min_deadline_variance=50
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_random_50_dags.pkl
--override_num_invocations=50
--override_arrival_period=10
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster.yaml
--scheduler_runtime=0
--scheduler=TetriSched
--enforce_deadlines
--retract_schedules
--drop_skipped_tasks
--release_taskgraphs
--scheduler_log_times=10
--scheduler_time_discretization=1
--scheduler_dynamic_discretization
--scheduler_max_time_discretization=5
--scheduler_max_occupancy_threshold=0.7
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
--log_file_name=./alibaba_scheduler_TetriSched_release_policy_fixed_deadline_var_100_scheduler_dynamic_discretization_1_5_max_occ_1100.log
--csv_file_name=./alibaba_scheduler_TetriSched_release_policy_fixed_deadline_var_100_scheduler_dynamic_discretization_1_5_max_occ_1100.csv
--log_level=debug
--execution_mode=replay
--replay_trace=alibaba
--max_deadline_variance=100
--min_deadline_variance=50
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_random_50_dags.pkl
--override_num_invocations=50
--override_arrival_period=10
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster.yaml
--scheduler_runtime=0
--scheduler=TetriSched
--enforce_deadlines
--retract_schedules
--drop_skipped_tasks
--release_taskgraphs
--scheduler_log_times=10
--scheduler_time_discretization=1
--scheduler_dynamic_discretization
--scheduler_max_time_discretization=5
--scheduler_max_occupancy_threshold=1100
2 changes: 1 addition & 1 deletion configs/alibaba_trace.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
--execution_mode=replay
--replay_trace=alibaba
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_random_50_dags.pkl
--batch_size_job_loading=25
# --batch_size_job_loading=25
--override_num_invocations=1
--override_arrival_period=10
--randomize_start_time_max=50
Expand Down
19 changes: 19 additions & 0 deletions configs/sanity_check_hetero.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
--log_file_name=./alibaba_scheduler_EDF_release_policy_gamma_max_deadline_var_25_dag_aware_0_poisson_arrival_rate_0.04_gamma_coefficient_1.log
--csv_file_name=./alibaba_scheduler_EDF_release_policy_gamma_max_deadline_var_25_dag_aware_0_poisson_arrival_rate_0.04_gamma_coefficient_1.csv
--log_level=info
--execution_mode=replay
--replay_trace=alibaba
--max_deadline_variance=25
--min_deadline_variance=10
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_set_0_6600_dags.pkl
--override_num_invocations=50
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster_heterogeneous.yaml
--scheduler_runtime=0
--override_release_policy=gamma
--scheduler=EDF
--alibaba_loader_task_cpu_divisor=25
--override_poisson_arrival_rate=0.04
--override_gamma_coefficient=1
--alibaba_enable_heterogeneous_resource_type

17 changes: 17 additions & 0 deletions configs/sanity_check_homo.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
--log_file_name=./alibaba_scheduler_EDF_release_policy_gamma_max_deadline_var_25_dag_aware_0_poisson_arrival_rate_0.04_gamma_coefficient_1.log
--csv_file_name=./alibaba_scheduler_EDF_release_policy_gamma_max_deadline_var_25_dag_aware_0_poisson_arrival_rate_0.04_gamma_coefficient_1.csv
--log_level=info
--execution_mode=replay
--replay_trace=alibaba
--max_deadline_variance=25
--min_deadline_variance=10
--workload_profile_path=./traces/alibaba-cluster-trace-v2018/alibaba_set_0_6600_dags.pkl
--override_num_invocations=50
--randomize_start_time_max=100
--worker_profile_path=profiles/workers/alibaba_cluster.yaml
--scheduler_runtime=0
--override_release_policy=gamma
--scheduler=EDF
--alibaba_loader_task_cpu_divisor=25
--override_poisson_arrival_rate=0.04
--override_gamma_coefficient=1
96 changes: 82 additions & 14 deletions data/alibaba_loader.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import math
import os
import pathlib
Expand All @@ -9,7 +10,7 @@

import absl

from utils import EventTime, setup_logging
from utils import EventTime, setup_csv_logging, setup_logging
from workload import (
ExecutionStrategies,
ExecutionStrategy,
Expand Down Expand Up @@ -40,13 +41,14 @@ def __init__(
path: str,
workload_interval: EventTime,
flags: "absl.flags",
heterogeneous: bool = False,
):
self._path = path
self._flags = flags
self._logger = setup_logging(
name=self.__class__.__name__,
log_dir=flags.log_dir,
log_file=flags.log_file_level,
log_file=flags.log_file_name,
log_level=flags.log_level,
)
self._job_data_generator = self._initialize_job_data_generator()
Expand All @@ -61,6 +63,24 @@ def __init__(
else EventTime(sys.maxsize, EventTime.Unit.US)
)
self._workload = Workload.empty(flags)
self._heterogeneous = heterogeneous

if self._flags:
self._csv_logger = setup_csv_logging(
name=self.__class__.__name__,
log_dir=self._flags.log_dir,
log_file=self._flags.csv_file_name,
)

self._task_cpu_divisor = int(self._flags.alibaba_loader_task_cpu_divisor)
self._task_duration_multipler = self._flags.alibaba_task_duration_multiplier
else:
self._csv_logger = setup_csv_logging(
name=self.__class__.__name__, log_file=None
)
self._log_dir = os.getcwd()
self._task_cpu_divisor = 25
self._task_duration_multipler = 1

def _construct_release_times(self):
"""Construct the release times of the jobs in the workload.
Expand Down Expand Up @@ -117,6 +137,15 @@ def _construct_release_times(self):
start=start_time,
rng_seed=self._rng_seed,
)
elif self._flags.override_release_policy == "fixed_gamma":
release_policy = JobGraph.ReleasePolicy.fixed_gamma(
variable_arrival_rate=self._flags.override_poisson_arrival_rate,
base_arrival_rate=self._flags.override_base_arrival_rate,
num_invocations=self._flags.override_num_invocations,
coefficient=self._flags.override_gamma_coefficient,
start=start_time,
rng_seed=self._rng_seed,
)
else:
raise NotImplementedError(
f"Release policy {self._flags.override_release_policy} not implemented."
Expand Down Expand Up @@ -175,26 +204,62 @@ def _convert_job_data_to_job_graph(
# Create the individual Job instances corresponding to each Task.
task_name_to_simulator_job_mapping = {}
for task in job_tasks:
job_resources = Resources(
job_resources_1 = Resources(
resource_vector={
# Note: We divide the CPU by some self._task_cpu_divisor instead
# of 100 because this would introduce more variance into the
# resource/slots usage.
# We used to divide by 100, but the majority of the tasks
# would end up using 1 slot, which is not very interesting and
# gives no chance for DAG_Sched to do effective packing that
# would beat EDF by a significant margin.
Resource(name="Slot_1", _id="any"): int(
math.ceil(task.cpu / self._task_cpu_divisor)
),
}
)

job_resources_2 = Resources(
resource_vector={
Resource(name="Slot", _id="any"): int(math.ceil(task.cpu / 100)),
Resource(name="Slot_2", _id="any"): int(
math.ceil(task.cpu / self._task_cpu_divisor)
),
}
)

job_name = task.name.split("_")[0]
job_runtime = EventTime(int(math.ceil(task.duration)), EventTime.Unit.US)
job_runtime_1 = EventTime(
int(math.ceil(task.duration * self._task_duration_multipler)),
EventTime.Unit.US,
)
# This is used when self._heterogeneous is True
# to support another execution strategy where it runs faster.
job_runtime_2 = EventTime(
int(math.ceil(task.duration * self._task_duration_multipler * 0.8)),
EventTime.Unit.US,
)

execution_strategies = [
ExecutionStrategy(
resources=job_resources_1,
batch_size=1,
runtime=job_runtime_1,
),
]
if self._heterogeneous:
execution_strategies.append(
ExecutionStrategy(
resources=job_resources_2,
batch_size=1,
runtime=job_runtime_2,
),
)

task_name_to_simulator_job_mapping[job_name] = Job(
name=job_name,
profile=WorkProfile(
name="SlotPolicyFor{}".format(job_name),
execution_strategies=ExecutionStrategies(
[
ExecutionStrategy(
resources=job_resources,
batch_size=1,
runtime=job_runtime,
)
]
),
execution_strategies=ExecutionStrategies(execution_strategies),
),
)

Expand Down Expand Up @@ -288,5 +353,8 @@ def get_next_workload(self, current_time: EventTime) -> Optional[Workload]:
)
if task_graph is not None:
self._workload.add_task_graph(task_graph)
self._csv_logger.info(
f"{current_time.time},TASK_GRAPH_RELEASE,{task_graph.release_time.time},{task_graph.deadline.time},{task_graph.name},{len(task_graph.get_nodes())}"
)
task_release_index += 1
return self._workload
2 changes: 1 addition & 1 deletion data/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
schedulers = []
for reading in csv_readings:
try:
# TODO: This
if reading[1] == "SIMULATOR_START":
simulator = Simulator(
csv_path=csv_path,
start_time=int(reading[0]),
total_tasks=reading[2],
)
elif reading[1] == "UPDATE_WORKLOAD":
simulator.total_tasks += int(reading[2])
Expand Down
Loading

0 comments on commit 2105a50

Please sign in to comment.