diff --git a/data/alibaba_loader.py b/data/alibaba_loader.py index 6f0dcfdd..017ff8ef 100644 --- a/data/alibaba_loader.py +++ b/data/alibaba_loader.py @@ -45,7 +45,8 @@ def __init__( self._flags = flags self._job_data_generator = self._initialize_job_data_generator() self._job_graphs: Mapping[str, JobGraph] = {} - self._rng = random.Random(flags.random_seed) + self._rng_seed = flags.random_seed + self._rng = random.Random(self._rng_seed) self._release_times = self._construct_release_times() self._current_release_pointer = 0 self._workload_update_interval = ( @@ -80,6 +81,7 @@ def _construct_release_times(self): self._flags.override_arrival_period, EventTime.Unit.US ), start=start_time, + rng_seed=self._rng_seed, ) elif self._flags.override_release_policy == "fixed": if self._flags.override_arrival_period == 0: @@ -92,12 +94,14 @@ def _construct_release_times(self): ), num_invocations=self._flags.override_num_invocations, start=start_time, + rng_seed=self._rng_seed, ) elif self._flags.override_release_policy == "poisson": release_policy = JobGraph.ReleasePolicy.poisson( rate=self._flags.override_poisson_arrival_rate, num_invocations=self._flags.override_num_invocations, start=start_time, + rng_seed=self._rng_seed, ) elif self._flags.override_release_policy == "gamma": release_policy = JobGraph.ReleasePolicy.gamma( @@ -105,6 +109,7 @@ def _construct_release_times(self): num_invocations=self._flags.override_num_invocations, coefficient=self._flags.override_gamma_coefficient, start=start_time, + rng_seed=self._rng_seed, ) else: raise NotImplementedError( diff --git a/simulator.py b/simulator.py index cbe264db..df66f5fd 100644 --- a/simulator.py +++ b/simulator.py @@ -388,9 +388,10 @@ def event_representation_filter(record): def dry_run(self) -> None: """Displays the order in which the TaskGraphs will be released.""" + start_time = EventTime.zero() while True: # Get the next Workload from the WorkloadLoader. - next_workload = self._workload_loader.get_next_workload() + next_workload = self._workload_loader.get_next_workload(start_time) if next_workload is None: self._logger.info( f"The WorkloadLoader '{type(self._workload_loader).__name__}' " @@ -401,24 +402,27 @@ def dry_run(self) -> None: # A new Workload has been released, we log the release times of the # TaskGraphs from this instance of the Workload. self._workload = next_workload - task_graphs = sorted( - self._workload.task_graphs.values(), - key=lambda task_graph: task_graph.release_time, + task_graphs = list( + sorted( + self._workload.task_graphs.values(), + key=lambda task_graph: task_graph.release_time, + ) ) self._logger.info( f"The WorkloadLoader '{type(self._workload_loader).__name__}' released " f"a Workload with {len(task_graphs)} TaskGraphs." ) + start_time = task_graphs[-1].release_time - for task_graph in task_graphs: - self._logger.info( - "[%s] The TaskGraph %s will be released with deadline " - "%s and completion time %s.", - task_graph.release_time.to(EventTime.Unit.US).time, - task_graph.name, - task_graph.deadline, - task_graph.job_graph.completion_time, - ) + for task_graph in task_graphs: + self._logger.info( + "[%s] The TaskGraph %s will be released with deadline " + "%s and completion time %s.", + task_graph.release_time.to(EventTime.Unit.US).time, + task_graph.name, + task_graph.deadline, + task_graph.job_graph.completion_time, + ) def simulate(self) -> None: """Run the simulator loop. diff --git a/workload/jobs.py b/workload/jobs.py index 9a767a4d..2f6ac1fc 100644 --- a/workload/jobs.py +++ b/workload/jobs.py @@ -274,13 +274,15 @@ def get_release_times(self, completion_time: EventTime) -> List[EventTime]: ) ) elif self._policy_type == JobGraph.ReleasePolicyType.POISSON: - # TODO (Sukrit): Create a numpy version of this. + inter_arrival_times = self._rng.poisson( + self._arrival_rate, self._fixed_invocation_nums - 1 + ) current_release = self._start - for _ in range(self._fixed_invocation_nums): - inter_arrival_time = int( - self._rng.exponential(1 / self._arrival_rate) + releases.append(current_release) + for inter_arrival_time in inter_arrival_times: + current_release += EventTime( + int(inter_arrival_time), EventTime.Unit.US ) - current_release += EventTime(inter_arrival_time, EventTime.Unit.US) releases.append(current_release) elif self._policy_type == JobGraph.ReleasePolicyType.GAMMA: inter_arrival_times = np.clip( @@ -293,6 +295,7 @@ def get_release_times(self, completion_time: EventTime) -> List[EventTime]: a_max=None, ) current_release = self._start + releases.append(current_release) for inter_arrival_time in inter_arrival_times: current_release += EventTime( int(inter_arrival_time), EventTime.Unit.US @@ -314,7 +317,9 @@ def get_release_times(self, completion_time: EventTime) -> List[EventTime]: @staticmethod def periodic( - period: EventTime, start: EventTime = EventTime.zero() + period: EventTime, + start: EventTime = EventTime.zero(), + rng_seed: Optional[int] = None, ) -> "ReleasePolicy": # noqa: F821 """Creates the parameters corresponding to the `PERIODIC` release policy. @@ -323,6 +328,8 @@ def periodic( invocations of the `TaskGraph`. start (`EventTime`): The time at which the periodic release of the `TaskGraph`s should begin. + rng_seed (`Optional[int]`): The seed to use for the random number + generation. Returns: A `ReleasePolicy` instance with the required parameters. @@ -335,6 +342,7 @@ def periodic( coefficient=-1.0, concurrency=0, start=start, + rng_seed=rng_seed, ) @staticmethod @@ -342,6 +350,7 @@ def fixed( period: EventTime, num_invocations: int, start: EventTime = EventTime.zero(), + rng_seed: Optional[int] = None, ) -> "ReleasePolicy": # noqa: F821 """Creates the parameters corresponding to the `FIXED` release policy. @@ -351,6 +360,8 @@ def fixed( num_invocations (`int`): The number of invocations of the `TaskGraph`. start (`EventTime`): The time at which the periodic release of the `TaskGraph`s should begin. + rng_seed (`Optional[int]`): The seed to use for the random number + generation. Returns: A `ReleasePolicy` instance with the required parameters. @@ -363,6 +374,7 @@ def fixed( coefficient=-1.0, concurrency=0, start=start, + rng_seed=rng_seed, ) @staticmethod @@ -370,6 +382,7 @@ def poisson( rate: float, num_invocations: int, start: EventTime = EventTime.zero(), + rng_seed: Optional[int] = None, ) -> "ReleasePolicy": # noqa: F821 """Creates the parameters corresponding to the `POISSON` release policy. @@ -379,6 +392,8 @@ def poisson( num_invocations (`int`): The number of invocations of the `TaskGraph`. start (`EventTime`): The time at which the poisson arrival of the `TaskGraph`s should begin. + rng_seed (`Optional[int]`): The seed to use for the random number + generation. Returns: A `ReleasePolicy` instance with the required parameters. @@ -391,6 +406,7 @@ def poisson( coefficient=-1.0, concurrency=0, start=start, + rng_seed=rng_seed, ) @staticmethod @@ -399,7 +415,24 @@ def gamma( coefficient: float, num_invocations: int, start: EventTime = EventTime.zero(), + rng_seed: Optional[int] = None, ) -> "ReleasePolicy": # noqa: F821 + """Creates the parameters corresponding to the `GAMMA` release policy. + + Args: + rate (`float`): The lambda (rate) parameter defining the Gamma + arrival distribution. + coefficient (`float`): The coefficient parameter defining the Gamma + arrival distribution. + num_invocations (`int`): The number of invocations of the `TaskGraph`. + start (`EventTime`): The time at which the poisson arrival of the + `TaskGraph`s should begin. + rng_seed (`Optional[int]`): The seed to use for the random number + generation. + + Returns: + A `ReleasePolicy` instance with the required parameters. + """ return JobGraph.ReleasePolicy( policy_type=JobGraph.ReleasePolicyType.GAMMA, period=EventTime.invalid(), @@ -408,6 +441,7 @@ def gamma( coefficient=coefficient, concurrency=0, start=start, + rng_seed=rng_seed, ) @staticmethod @@ -415,6 +449,7 @@ def closed_loop( concurrency: int, num_invocations: int, start: EventTime = EventTime.zero(), + rng_seed: Optional[int] = None, ) -> "ReleasePolicy": # noqa: F821 """Creates the parameters corresponding to the `CLOSED_LOOP` release policy. @@ -424,6 +459,8 @@ def closed_loop( `TaskGraph`. start (`EventTime`): The time at which the closed loop execution of the `TaskGraph`s should begin. + rng_seed (`Optional[int]`): The seed to use for the random number + generation. Returns: A `ReleasePolicy` instance with the required parameters.