Skip to content

Commit

Permalink
Determinism in ReleasePolicy.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Nov 13, 2023
1 parent 4f17793 commit 348390f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 20 deletions.
7 changes: 6 additions & 1 deletion data/alibaba_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def __init__(
self._flags = flags
self._job_data_generator = self._initialize_job_data_generator()
self._job_graphs: Mapping[str, JobGraph] = {}
self._rng = random.Random(flags.random_seed)
self._rng_seed = flags.random_seed
self._rng = random.Random(self._rng_seed)
self._release_times = self._construct_release_times()
self._current_release_pointer = 0
self._workload_update_interval = (
Expand Down Expand Up @@ -80,6 +81,7 @@ def _construct_release_times(self):
self._flags.override_arrival_period, EventTime.Unit.US
),
start=start_time,
rng_seed=self._rng_seed,
)
elif self._flags.override_release_policy == "fixed":
if self._flags.override_arrival_period == 0:
Expand All @@ -92,19 +94,22 @@ def _construct_release_times(self):
),
num_invocations=self._flags.override_num_invocations,
start=start_time,
rng_seed=self._rng_seed,
)
elif self._flags.override_release_policy == "poisson":
release_policy = JobGraph.ReleasePolicy.poisson(
rate=self._flags.override_poisson_arrival_rate,
num_invocations=self._flags.override_num_invocations,
start=start_time,
rng_seed=self._rng_seed,
)
elif self._flags.override_release_policy == "gamma":
release_policy = JobGraph.ReleasePolicy.gamma(
rate=self._flags.override_poisson_arrival_rate,
num_invocations=self._flags.override_num_invocations,
coefficient=self._flags.override_gamma_coefficient,
start=start_time,
rng_seed=self._rng_seed,
)
else:
raise NotImplementedError(
Expand Down
30 changes: 17 additions & 13 deletions simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,10 @@ def event_representation_filter(record):

def dry_run(self) -> None:
"""Displays the order in which the TaskGraphs will be released."""
start_time = EventTime.zero()
while True:
# Get the next Workload from the WorkloadLoader.
next_workload = self._workload_loader.get_next_workload()
next_workload = self._workload_loader.get_next_workload(start_time)
if next_workload is None:
self._logger.info(
f"The WorkloadLoader '{type(self._workload_loader).__name__}' "
Expand All @@ -401,24 +402,27 @@ def dry_run(self) -> None:
# A new Workload has been released, we log the release times of the
# TaskGraphs from this instance of the Workload.
self._workload = next_workload
task_graphs = sorted(
self._workload.task_graphs.values(),
key=lambda task_graph: task_graph.release_time,
task_graphs = list(
sorted(
self._workload.task_graphs.values(),
key=lambda task_graph: task_graph.release_time,
)
)
self._logger.info(
f"The WorkloadLoader '{type(self._workload_loader).__name__}' released "
f"a Workload with {len(task_graphs)} TaskGraphs."
)
start_time = task_graphs[-1].release_time

for task_graph in task_graphs:
self._logger.info(
"[%s] The TaskGraph %s will be released with deadline "
"%s and completion time %s.",
task_graph.release_time.to(EventTime.Unit.US).time,
task_graph.name,
task_graph.deadline,
task_graph.job_graph.completion_time,
)
for task_graph in task_graphs:
self._logger.info(
"[%s] The TaskGraph %s will be released with deadline "
"%s and completion time %s.",
task_graph.release_time.to(EventTime.Unit.US).time,
task_graph.name,
task_graph.deadline,
task_graph.job_graph.completion_time,
)

def simulate(self) -> None:
"""Run the simulator loop.
Expand Down
49 changes: 43 additions & 6 deletions workload/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,15 @@ def get_release_times(self, completion_time: EventTime) -> List[EventTime]:
)
)
elif self._policy_type == JobGraph.ReleasePolicyType.POISSON:
# TODO (Sukrit): Create a numpy version of this.
inter_arrival_times = self._rng.poisson(
self._arrival_rate, self._fixed_invocation_nums - 1
)
current_release = self._start
for _ in range(self._fixed_invocation_nums):
inter_arrival_time = int(
self._rng.exponential(1 / self._arrival_rate)
releases.append(current_release)
for inter_arrival_time in inter_arrival_times:
current_release += EventTime(
int(inter_arrival_time), EventTime.Unit.US
)
current_release += EventTime(inter_arrival_time, EventTime.Unit.US)
releases.append(current_release)
elif self._policy_type == JobGraph.ReleasePolicyType.GAMMA:
inter_arrival_times = np.clip(
Expand All @@ -293,6 +295,7 @@ def get_release_times(self, completion_time: EventTime) -> List[EventTime]:
a_max=None,
)
current_release = self._start
releases.append(current_release)
for inter_arrival_time in inter_arrival_times:
current_release += EventTime(
int(inter_arrival_time), EventTime.Unit.US
Expand All @@ -314,7 +317,9 @@ def get_release_times(self, completion_time: EventTime) -> List[EventTime]:

@staticmethod
def periodic(
period: EventTime, start: EventTime = EventTime.zero()
period: EventTime,
start: EventTime = EventTime.zero(),
rng_seed: Optional[int] = None,
) -> "ReleasePolicy": # noqa: F821
"""Creates the parameters corresponding to the `PERIODIC` release policy.
Expand All @@ -323,6 +328,8 @@ def periodic(
invocations of the `TaskGraph`.
start (`EventTime`): The time at which the periodic release of the
`TaskGraph`s should begin.
rng_seed (`Optional[int]`): The seed to use for the random number
generation.
Returns:
A `ReleasePolicy` instance with the required parameters.
Expand All @@ -335,13 +342,15 @@ def periodic(
coefficient=-1.0,
concurrency=0,
start=start,
rng_seed=rng_seed,
)

@staticmethod
def fixed(
period: EventTime,
num_invocations: int,
start: EventTime = EventTime.zero(),
rng_seed: Optional[int] = None,
) -> "ReleasePolicy": # noqa: F821
"""Creates the parameters corresponding to the `FIXED` release policy.
Expand All @@ -351,6 +360,8 @@ def fixed(
num_invocations (`int`): The number of invocations of the `TaskGraph`.
start (`EventTime`): The time at which the periodic release of the
`TaskGraph`s should begin.
rng_seed (`Optional[int]`): The seed to use for the random number
generation.
Returns:
A `ReleasePolicy` instance with the required parameters.
Expand All @@ -363,13 +374,15 @@ def fixed(
coefficient=-1.0,
concurrency=0,
start=start,
rng_seed=rng_seed,
)

@staticmethod
def poisson(
rate: float,
num_invocations: int,
start: EventTime = EventTime.zero(),
rng_seed: Optional[int] = None,
) -> "ReleasePolicy": # noqa: F821
"""Creates the parameters corresponding to the `POISSON` release policy.
Expand All @@ -379,6 +392,8 @@ def poisson(
num_invocations (`int`): The number of invocations of the `TaskGraph`.
start (`EventTime`): The time at which the poisson arrival of the
`TaskGraph`s should begin.
rng_seed (`Optional[int]`): The seed to use for the random number
generation.
Returns:
A `ReleasePolicy` instance with the required parameters.
Expand All @@ -391,6 +406,7 @@ def poisson(
coefficient=-1.0,
concurrency=0,
start=start,
rng_seed=rng_seed,
)

@staticmethod
Expand All @@ -399,7 +415,24 @@ def gamma(
coefficient: float,
num_invocations: int,
start: EventTime = EventTime.zero(),
rng_seed: Optional[int] = None,
) -> "ReleasePolicy": # noqa: F821
"""Creates the parameters corresponding to the `GAMMA` release policy.
Args:
rate (`float`): The lambda (rate) parameter defining the Gamma
arrival distribution.
coefficient (`float`): The coefficient parameter defining the Gamma
arrival distribution.
num_invocations (`int`): The number of invocations of the `TaskGraph`.
start (`EventTime`): The time at which the poisson arrival of the
`TaskGraph`s should begin.
rng_seed (`Optional[int]`): The seed to use for the random number
generation.
Returns:
A `ReleasePolicy` instance with the required parameters.
"""
return JobGraph.ReleasePolicy(
policy_type=JobGraph.ReleasePolicyType.GAMMA,
period=EventTime.invalid(),
Expand All @@ -408,13 +441,15 @@ def gamma(
coefficient=coefficient,
concurrency=0,
start=start,
rng_seed=rng_seed,
)

@staticmethod
def closed_loop(
concurrency: int,
num_invocations: int,
start: EventTime = EventTime.zero(),
rng_seed: Optional[int] = None,
) -> "ReleasePolicy": # noqa: F821
"""Creates the parameters corresponding to the `CLOSED_LOOP` release policy.
Expand All @@ -424,6 +459,8 @@ def closed_loop(
`TaskGraph`.
start (`EventTime`): The time at which the closed loop execution of the
`TaskGraph`s should begin.
rng_seed (`Optional[int]`): The seed to use for the random number
generation.
Returns:
A `ReleasePolicy` instance with the required parameters.
Expand Down

0 comments on commit 348390f

Please sign in to comment.