Skip to content

Commit

Permalink
Merge branch 'ray-osdi-experiment' into alind/experiments_alibaba
Browse files Browse the repository at this point in the history
  • Loading branch information
alindkhare committed Dec 1, 2023
2 parents b2ea0eb + 2fe3dcf commit e6867b1
Show file tree
Hide file tree
Showing 8 changed files with 622,533 additions and 179 deletions.
4 changes: 3 additions & 1 deletion data/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
schedulers = []
for reading in csv_readings:
try:
# TODO: This
if reading[1] == "SIMULATOR_START":
simulator = Simulator(
csv_path=csv_path,
start_time=int(reading[0]),
total_tasks=reading[2],
)
elif reading[1] == "UPDATE_WORKLOAD":
simulator.total_tasks = int(reading[2])
elif reading[1] == "SIMULATOR_END":
assert (
simulator is not None
Expand Down
2 changes: 1 addition & 1 deletion data/csv_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def update_task_schedule(self, csv_reading: List[str]):


class Simulator(object):
def __init__(self, csv_path: str, start_time: int, total_tasks: int):
def __init__(self, csv_path: str, start_time: int, total_tasks: int = 0):
self.csv_path = csv_path
self.start_time = start_time
self.total_tasks = total_tasks
Expand Down
622,457 changes: 622,297 additions & 160 deletions experiments/analysis.ipynb

Large diffs are not rendered by default.

173 changes: 173 additions & 0 deletions experiments/analysis_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import os

Check failure on line 1 in experiments/analysis_utils.py

View workflow job for this annotation

GitHub Actions / Python 3.9 Build

Imports are incorrectly sorted and/or formatted.

Check failure on line 1 in experiments/analysis_utils.py

View workflow job for this annotation

GitHub Actions / Python 3.9 Build

Imports are incorrectly sorted and/or formatted.
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd

def calculate_arrival_rate_and_cv2(release_time: list[int]):
    """Estimate the arrival rate and squared coefficient of variation (CV^2).

    The release times are sorted, consecutive differences are taken as the
    inter-arrival times, and the statistics are derived from those gaps.

    Args:
        release_time: Release timestamps, in any order. The list is NOT
            modified; a sorted copy is used internally.

    Returns:
        A ``(arrival_rate, cv2)`` tuple where ``arrival_rate`` is
        ``1 / mean(inter_arrival_times)`` and ``cv2`` is
        ``(std(inter_arrival_times) / mean(inter_arrival_times)) ** 2``.
        Both values are NaN when fewer than two release times are given,
        because no inter-arrival time exists in that case.
    """
    # np.sort returns a sorted copy, so the caller's list keeps its order.
    inter_arrival_times = np.diff(np.sort(np.asarray(release_time)))
    if inter_arrival_times.size == 0:
        # Same values the statistics below would yield on an empty array,
        # without triggering NumPy's "mean of empty slice" warnings.
        return np.nan, np.nan

    avg_inter_arrival_time = np.mean(inter_arrival_times)
    std_inter_arrival_time = np.std(inter_arrival_times)
    cv2 = (std_inter_arrival_time / avg_inter_arrival_time) ** 2
    return 1 / avg_inter_arrival_time, cv2

def find_all_file_paths(path, ends_with=".csv"):
    """Recursively collect the files under ``path`` whose names end with a suffix.

    Args:
        path: Root directory to search. If it is not an existing directory,
            an empty list is returned.
        ends_with: Filename suffix to match (defaults to ``".csv"``).

    Returns:
        A sorted list of paths (each prefixed with ``path``) of all matching
        files found in ``path`` and any of its sub-directories.
    """
    matching_paths = []
    # os.walk only reports non-directories in `filenames`, so a directory
    # whose name happens to end with the suffix is descended into rather than
    # being returned as if it were a file. followlinks=True keeps symlinked
    # directories in the search.
    for dirpath, _dirnames, filenames in os.walk(path, followlinks=True):
        matching_paths.extend(
            os.path.join(dirpath, filename)
            for filename in filenames
            if filename.endswith(ends_with)
        )
    # Sort so the result does not depend on filesystem enumeration order.
    return sorted(matching_paths)

def extract_variables_from_filename(filename):
    """Parse experiment parameters out of an underscore-separated log filename.

    The values are read from fixed positions after splitting on ``_``
    (0: trace, 2: scheduler, 5: release policy, 9: max deadline variance,
    12: DAG-awareness flag, 16: arrival rate, 19: CV2, last: time
    discretization for TetriSched).
    NOTE(review): the exact filename template is produced elsewhere; confirm
    these positions against the script that names the files.

    Args:
        filename: Base name of the CSV log file (no directory components).

    Returns:
        A dict with the keys ``trace``, ``release_policy``,
        ``max_deadline_variance``, ``scheduler``, ``DAG_aware``,
        ``scheduler_time_discretization``, ``arrival_rate`` and ``cv2``.

    Raises:
        IndexError: If the filename has too few ``_``-separated parts for the
            mandatory fields.
        ValueError: If the deadline variance (or, for TetriSched, the time
            discretization) is not an integer.
    """
    # Split the filename by underscores
    parts = filename.split('_')

    # Extract the variables based on the positional format
    replay_trace = parts[0]
    scheduler = parts[2]
    release_policy = parts[5]
    deadline_var = int(parts[9])
    dag_aware = parts[12] == "1"

    try:
        arrival_rate = float(parts[16])
        cv2 = int(parts[19].split('.')[0])  # Assuming the file extension is .csv
    except (IndexError, ValueError):
        # Before 11/28 afternoon, a different filename format was used that
        # didn't include the arrival rate and CV2; fall back to the values
        # used for those runs. Only parse failures are caught here so that
        # unrelated errors (e.g. KeyboardInterrupt) still propagate.
        arrival_rate = 10
        cv2 = 2

    if scheduler == "TetriSched":
        # The last part carries the time discretization plus the extension.
        scheduler_time_discretization = int(parts[-1].split('.')[0])
        scheduler = f"TetriSched_time_dis_{scheduler_time_discretization}" + (
            "_DAG_aware" if dag_aware else ""
        )
    else:
        scheduler_time_discretization = None

    # Create a dictionary to store the extracted variables
    variables = {
        'trace': replay_trace,
        'release_policy': release_policy,
        'max_deadline_variance': deadline_var,
        'scheduler': scheduler,
        'DAG_aware': dag_aware,
        'scheduler_time_discretization': scheduler_time_discretization,
        "arrival_rate": arrival_rate,
        "cv2": cv2,
    }

    return variables


def extract_experiments_result(base_dir: str) -> pd.DataFrame:
    """Summarise every CSV log found under ``base_dir`` into a DataFrame.

    For each file returned by ``find_all_file_paths`` one row is built from:
      * the experiment parameters encoded in the file name,
      * the SLO attainment and goodput derived from the file's final line,
      * the arrival rate and CV2 measured from its ``TASK_RELEASE`` lines.

    Files that cannot be processed are reported on stdout and skipped.

    Args:
        base_dir: Directory that is searched recursively for CSV logs.

    Returns:
        A DataFrame with one row per successfully processed log file.
    """
    summaries = []
    for csv_file_path in find_all_file_paths(base_dir):
        # The last path component carries the experiment parameters.
        log_name = csv_file_path.rsplit(os.sep, 1)[-1]
        try:
            with open(csv_file_path, 'r') as log:
                log_lines = log.readlines()
                final_record = log_lines[-1]

            # The final line must hold exactly eight comma-separated fields
            # (presumably the simulator's end-of-run record -- confirm).
            (
                _end_time,
                _event_type,
                finished_tasks,
                _cancelled_tasks,
                _missed_task_deadlines,
                finished_task_graphs,
                cancelled_task_graphs,
                missed_task_graph_deadlines,
            ) = final_record.split(",")
            summary = extract_variables_from_filename(log_name)

            # SLO attainment: task graphs finished within their deadline over
            # all task graphs that either finished or were cancelled.
            graphs_finished = int(finished_task_graphs)
            graphs_missed = int(missed_task_graph_deadlines)
            graphs_cancelled = int(cancelled_task_graphs)
            slo_attainment = (graphs_finished - graphs_missed) / (
                graphs_cancelled + graphs_finished
            )
            goodput = int(finished_tasks)

            # Gather the release time (sixth of nine fields) of every
            # TASK_RELEASE line to measure the realised arrival process.
            release_times = []
            for record in log_lines:
                if "TASK_RELEASE" in record:
                    (_, _, _, _, _, released_at, _, _, _) = record.strip().split(",")
                    release_times.append(int(released_at))
            measured_rate, measured_cv2 = calculate_arrival_rate_and_cv2(release_times)

            summary.update(
                {
                    "slo_attainment": slo_attainment,
                    "goodput": goodput,
                    "csv_file_path": csv_file_path,
                    "actual_arrival_rate": measured_rate,
                    "actual_cv2": measured_cv2,
                }
            )
            summaries.append(summary)
        except FileNotFoundError:
            print(f"File not found: {csv_file_path}")
        except Exception as e:
            # Best effort: report the broken log and carry on with the rest.
            print(f"An error occurred while processing {csv_file_path}: {str(e)}")

    return pd.DataFrame(summaries)


def plot_slo_attainments(
    data: pd.DataFrame,
    title: str = 'SLO Attainment Comparison (min_deadline_var=10, num_invocation=400) 11_29_2023',
):
    """Plot a grid of grouped bar charts comparing SLO attainment per scheduler.

    The grid has one row per distinct ``arrival_rate`` and one column per
    distinct ``cv2`` in ``data``. Within a subplot, bars are grouped by
    ``max_deadline_variance`` with one bar per scheduler; a missing
    (scheduler, deadline variance) combination is drawn as a zero-height bar.

    Args:
        data: Experiment results with the columns ``arrival_rate``, ``cv2``,
            ``scheduler``, ``max_deadline_variance``, ``slo_attainment``,
            ``actual_arrival_rate`` and ``actual_cv2``.
        title: Super title of the figure.

    Raises:
        ValueError: If more than one row matches a single (arrival rate, CV2,
            scheduler, deadline variance) combination (raised by ``.item()``).
    """
    # Define the unique values for the grid
    cv2_values = sorted(data["cv2"].unique())
    arrival_rate_values = sorted(data["arrival_rate"].unique())
    scheduler_values = [
        "TetriSched_time_dis_20",
        "TetriSched_time_dis_20_DAG_aware",
        "TetriSched_time_dis_10",
        "TetriSched_time_dis_10_DAG_aware",
        "TetriSched_time_dis_1",
        "TetriSched_time_dis_1_DAG_aware",
        "EDF",
    ]

    # Number of schedulers
    n_schedulers = len(scheduler_values)

    # Create a subplot grid. squeeze=False keeps `axes` two-dimensional even
    # when there is only a single arrival rate and/or a single CV2 value, so
    # axes[i][j] is always valid.
    fig, axes = plt.subplots(
        len(arrival_rate_values),
        len(cv2_values),
        figsize=(20, 15),
        sharey=True,
        squeeze=False,
    )

    # Define the width of each bar and the spacing between them
    bar_width = 0.20
    spacing = 0.05
    group_width_factor = 2  # Increase this factor to widen the distance between groups

    # Iterate over each subplot and plot the data
    for i, arrival_rate in enumerate(arrival_rate_values):
        for j, cv2 in enumerate(cv2_values):
            ax = axes[i][j]
            subset = data[(data['arrival_rate'] == arrival_rate) & (data['cv2'] == cv2)]

            # Get unique deadline variances
            deadline_vars = sorted(subset['max_deadline_variance'].unique())
            x = np.arange(len(deadline_vars)) * group_width_factor  # Adjust x positions

            for k, scheduler in enumerate(scheduler_values):
                scheduler_data = subset[subset['scheduler'] == scheduler]
                # Calculate the position of each bar
                bar_positions = x - (n_schedulers * bar_width / 2) + (k * bar_width) + (spacing * k)
                # Some bars may not exist for some schedulers
                slo_attainments = []
                for deadline_var in deadline_vars:
                    matches = scheduler_data[
                        scheduler_data['max_deadline_variance'] == deadline_var
                    ]['slo_attainment']
                    slo_attainments.append(matches.item() if len(matches) else 0)

                ax.bar(bar_positions, slo_attainments, width=bar_width, label=scheduler)

            # Annotate every bar once, after all schedulers have been drawn,
            # with its SLO attainment as a percentage.
            for container in ax.containers:
                bar_texts = [f'{(bar.get_height() * 100):.1f}' for bar in container]
                ax.bar_label(container, labels=bar_texts, label_type='edge', rotation=45, size=8)

            ax.set_xticks(x)
            ax.set_xticklabels(deadline_vars)
            ax.set_title(f"Arrival Rate: {subset['actual_arrival_rate'].mean():.2f}, CV2: {subset['actual_cv2'].mean():.2f}")
            ax.set_xlabel('Max Deadline Variance')
            ax.set_ylabel('SLO Attainment')

    # Adjust layout and leave room at the top for the legend and super title
    plt.tight_layout()
    plt.subplots_adjust(top=0.9)

    # Every subplot draws the same schedulers, so the legend entries of the
    # last subplot describe the whole figure.
    handles, labels = axes[-1][-1].get_legend_handles_labels()
    fig.legend(handles, labels, loc='upper center', bbox_to_anchor=(0.5, 0.95), ncol=len(labels))

    plt.suptitle(title, size=16)

    # Show the plot
    plt.show()
41 changes: 34 additions & 7 deletions schedulers/tetrisched/src/Expression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,7 @@ std::string Expression::getDescriptiveName() const {
return this->getTypeString() + "(" + name + ")";
}

uint32_t Expression::getResourceQuantity() const {
return 0;
}
uint32_t Expression::getResourceQuantity() const { return 0; }

/* Method definitions for ChooseExpression */

Expand Down Expand Up @@ -565,7 +563,13 @@ SolutionResultPtr ChooseExpression::populateResults(
name, solution->startTime.value(), solution->endTime.value());
for (const auto& [partitionId, variable] : partitionVariables) {
auto variableValue = variable->getValue();
if (variableValue == 0) {
if (!variableValue.has_value()) {
throw tetrisched::exceptions::ExpressionSolutionException(
"ChooseExpression " + name +
" was parsed with a variable that does not have a value.");
}

if (std::abs(variableValue.value()) < 0.1) {
// This partition was not used.
continue;
}
Expand Down Expand Up @@ -869,10 +873,23 @@ SolutionResultPtr WindowedChooseExpression::populateResults(
name, solution->startTime.value(), solution->endTime.value());
for (auto& [chooseTime, placementVariable] : placementTimeVariables) {
auto placementVariableValue = placementVariable->getValue();
if (placementVariableValue == 0) {
if (!placementVariableValue.has_value()) {
throw tetrisched::exceptions::ExpressionSolutionException(
"WindowedChooseExpression " + name +
" was parsed with a variable that does not have a value.");
}

if (std::abs(placementVariableValue.value()) < 0.1) {
// This placement was not used.
TETRISCHED_DEBUG("[WindowedChoose] The placement for time "
<< chooseTime << " and task " << name << " was 0.")
continue;
}
TETRISCHED_DEBUG("[WindowedChoose] The placement for time "
<< chooseTime << " and task " << name
<< " was 1 because the placementValueVariable with name "
<< placementVariable->getName() << " was "
<< *placementVariableValue << ".")

if (placementPartitionVariables.find(chooseTime) ==
placementPartitionVariables.end()) {
Expand All @@ -886,7 +903,12 @@ SolutionResultPtr WindowedChooseExpression::populateResults(
for (auto& [partitionId, allocationVariable] :
placementPartitionVariables[chooseTime]) {
auto allocationVariableValue = allocationVariable->getValue();
if (allocationVariableValue == 0) {
if (!allocationVariableValue.has_value()) {
throw tetrisched::exceptions::ExpressionSolutionException(
"WindowedChooseExpression " + name +
" was parsed with a variable that does not have a value.");
}
if (std::abs(allocationVariableValue.value()) < 0.1) {
// This partition was not used.
continue;
}
Expand Down Expand Up @@ -1257,7 +1279,12 @@ SolutionResultPtr MalleableChooseExpression::populateResults(
name, solution->startTime.value(), solution->endTime.value());
for (const auto& [partitionPair, variable] : partitionVariables) {
auto variableValue = variable->getValue();
if (variableValue == 0) {
if (!variableValue.has_value()) {
throw tetrisched::exceptions::ExpressionSolutionException(
"MalleableChooseExpression " + name +
" was parsed with a variable that does not have a value.");
}
if (std::abs(variableValue.value()) < 0.1) {
// This partition was not used.
continue;
}
Expand Down
22 changes: 18 additions & 4 deletions schedulers/tetrisched/src/GurobiSolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <chrono>
#include <thread>
#include <cmath>

namespace tetrisched {
GurobiSolver::GurobiSolver()
Expand Down Expand Up @@ -263,12 +264,25 @@ SolverSolutionPtr GurobiSolver::solveModel() {
" was not found in the Gurobi model.");
}
switch (variable->variableType) {
case tetrisched::VariableType::VAR_INTEGER:
case tetrisched::VariableType::VAR_CONTINUOUS:
case tetrisched::VariableType::VAR_INDICATOR:
variable->solutionValue =
case tetrisched::VariableType::VAR_CONTINUOUS: {
double variableValue =
gurobiVariables.at(variableId).get(GRB_DoubleAttr_X);
TETRISCHED_DEBUG("Variable " << variable->getName() << "(" << variableId
<< ") has solved value " << variableValue);
variable->solutionValue = variableValue;
break;
}
case tetrisched::VariableType::VAR_INTEGER:
case tetrisched::VariableType::VAR_INDICATOR: {
// If this is an integer or an indicator variable, we round it to
// nearest integer value.
double variableValue =
std::round(gurobiVariables.at(variableId).get(GRB_DoubleAttr_X));
TETRISCHED_DEBUG("Variable " << variable->getName() << "(" << variableId
<< ") has solved value " << variableValue);
variable->solutionValue = variableValue;
break;
}
default:
throw tetrisched::exceptions::SolverException(
"Unsupported variable type: " + variable->variableType);
Expand Down
11 changes: 6 additions & 5 deletions scripts/run_alibaba_experiments_osdi.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@
# Scheduler runtimes in us.TetriSched
SCHEDULERS=(EDF TetriSched)
# MAX_DEADLINE_VARIANCES=(15 25 50 100 200)
MAX_DEADLINE_VARIANCES=(15 50 200)
MAX_DEADLINE_VARIANCES=(200 400 800)
SCHEDULER_TIME_DISCRETIZATIONS=(1 10 20)
# RELEASE_POLICIES=(fixed poisson gamma)
RELEASE_POLICIES=(gamma)
POISSON_ARRIVAL_RATES=(5 10 15)
GAMMA_COEFFICIENTS=(1 2 4)
# POISSON_ARRIVAL_RATES=(0.2 0.5 1 2)
POISSON_ARRIVAL_RATES=(0.01 0.05 0.1)
GAMMA_COEFFICIENTS=(1 2 4) #cv2
DAG_AWARENESS=(0 1) # False True

ERDOS_SIMULATOR_DIR="." # Change this to the directory where the simulator is located.
MIN_DEADLINE_VARIANCE=10
NUM_INVOCATIONS=400
NUM_INVOCATIONS=100
SCHEDULER_LOG_TIMES=10
SCHEDULER_RUNTIME=0
LOG_LEVEL=debug
LOG_LEVEL=info
REPLAY_TRACE=alibaba
WORKLOAD_PROFILE_PATH=./traces/alibaba-cluster-trace-v2018/alibaba_set_0_6600_dags.pkl
EXECUTION_MODE=replay
Expand Down
2 changes: 1 addition & 1 deletion simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ def __handle_task_cancellation(self, event: Event) -> None:
"""
self._cancelled_tasks += 1
self._logger.info(
"[%s] (%d) The Simulator is cancelling the task %s.",
"[%s] (%s) The Simulator is cancelling the task %s.",
event.time.to(EventTime.Unit.US).time,
self._cancelled_tasks,
event.task,
Expand Down

0 comments on commit e6867b1

Please sign in to comment.