evaluate.py
import argparse
import time
import traci
import torch
import utils
from model import DQN
from traffic_generator import RandomTraffic
from sumo_env import Environment
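

# Example usage (a sketch, not part of the original script: it assumes SUMO and its
# `traci` Python bindings are installed and that `path_to_model` / `plots_path` inside
# evaluate_dqn() below have been filled in):
#   python evaluate.py --total_num_episode 10 --gui
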
def get_args():
    parser = argparse.ArgumentParser()

    # traffic generation params
    parser.add_argument('--num_cars_to_generate_per_episode', type=int, default=2000)
    parser.add_argument('--max_step_per_episode', type=int, default=5400)

    # state representation params
    parser.add_argument('--num_cells_for_edge', type=int, default=10,
                        help="discretize the environment space into 10 cells for each incoming lane")
    parser.add_argument('--input_channels', type=int, default=4,
                        help="number of features representing the state: number of cars, average speed, waiting times, number of queued cars")
    parser.add_argument('--num_actions', type=int, default=4,
                        help="NS-green, NSL-green, EW-green, EWL-green")

    # simulation params
    parser.add_argument('--total_num_episode', type=int, default=50)
    parser.add_argument('--yellow_duration', type=int, default=4)
    parser.add_argument('--green_duration', type=int, default=10)
    # note: argparse's type=bool treats any non-empty string as True, so a flag is used instead
    parser.add_argument('--gui', action='store_true', help="whether to open the sumo-gui or not")

    # general
    parser.add_argument('--device', type=str,
                        default=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"))
    parser.add_argument('--seed', type=int, default=321654987, help="for reproducibility")

    args = parser.parse_args()
    training_config = dict()
    for arg in vars(args):
        training_config[arg] = getattr(args, arg)
    return training_config
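
# For reference: get_args() returns a plain dict of the parsed arguments; with the
# defaults above it looks roughly like this (illustrative, abbreviated):
#   {"num_cars_to_generate_per_episode": 2000, "max_step_per_episode": 5400,
#    "num_cells_for_edge": 10, "input_channels": 4, "num_actions": 4,
#    "total_num_episode": 50, "yellow_duration": 4, "green_duration": 10,
#    "gui": False, "device": <torch.device cuda:0 or cpu>, "seed": 321654987}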


def evaluate_dqn(configs):
    # path to the trained checkpoint (left empty here; must be filled in before running)
    path_to_model = ""
    model = DQN(configs["input_channels"], configs["num_actions"]).to(configs["device"])
    checkpoint = torch.load(path_to_model, map_location=configs["device"])
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()  # the model is already on configs["device"]; calling .cuda() here would fail on CPU-only machines
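    # NOTE (assumption): the checkpoint is expected to be a dict saved by the training
    # script with at least a 'model_state_dict' entry, e.g. something like
    #   torch.save({'model_state_dict': model.state_dict()}, path_to_model)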

    # path for saving the evaluation plots, based on the name of the experiment
    plots_path = ""

    # seed everything for reproducibility
    utils.seed_everything(configs["seed"])

    # build the SUMO command (whether to open the GUI and the path to the env.sumocfg file)
    sumo_cmd = utils.sumo_configs(configs["max_step_per_episode"], configs["gui"])

    # passed to the Environment class to generate random traffic for each episode
    traffic_generator = RandomTraffic(configs["num_cars_to_generate_per_episode"], configs["max_step_per_episode"])

    # simulates the whole process in SUMO
    env = Environment(
        traffic_generator, sumo_cmd, configs["max_step_per_episode"], configs["num_cells_for_edge"],
        configs["num_actions"], configs["yellow_duration"], configs["green_duration"]
    )

    negative_rewards_list = []
    cumulative_waiting_time_list = []

    # start of the simulation for each episode ---> generate random traffic and traci.start(sumo_cmd)
    for episode in range(1, configs["total_num_episode"] + 1):
        print(f'Episode: {episode}/{configs["total_num_episode"]} {"-"*60}')
        env.start(configs["seed"] + episode)

        # per-episode statistics
        old_total_wait = 0
        old_state = -1   # previous state of the (state, action, reward, new state) transition; unused during evaluation
        old_action = -1  # previous action, used to decide whether a yellow phase is needed
        sum_neg_reward = 0            # cumulative negative reward for the episode
        cumulative_waiting_times = 0  # cumulative waiting time of all vehicles in the episode
        step = 0

        while step < configs["max_step_per_episode"]:
            # state observation at the current step: a matrix with 4 channels
            # (number of cars, average speed, waiting times, number of queued cars), shape (4, 24, 24)
            current_state = env.get_state_observation()

            # greedy action from the trained model (no exploration during evaluation)
            state = torch.from_numpy(current_state).float().unsqueeze(0).to(configs["device"])
            with torch.no_grad():
                action_values = model(state)
            action = action_values.argmax(dim=1).item()

            # reward: decrease in the sum of waiting times (over all cars) and queue lengths on the incoming roads
            current_total_waiting_time = env.get_waiting_time() + env.get_queue_length()
            cumulative_waiting_times += current_total_waiting_time
            reward = old_total_wait - current_total_waiting_time

            # activate the yellow phase if the traffic light phase is about to change
            if step != 0 and old_action != action:
                env.set_yellow_phase(old_action)
                step = env.simulate(step, configs["max_step_per_episode"], configs["yellow_duration"])

            # activate the selected green phase
            env.set_green_phase(action)
            step = env.simulate(step, configs["max_step_per_episode"], configs["green_duration"])

            old_state = current_state
            old_action = action
            old_total_wait = current_total_waiting_time

            # keep only the meaningful (negative) rewards to judge how well the agent performs
            if reward < 0:
                sum_neg_reward += reward

        # store the statistics of the episode and shut down this episode's SUMO instance
        negative_rewards_list.append(sum_neg_reward)
        cumulative_waiting_time_list.append(cumulative_waiting_times)
        traci.close()

    print("Test is done")
    utils.plot(range(configs["total_num_episode"]), negative_rewards_list, "episodes", "Negative_rewards",
               "Negative rewards for all vehicles", "blue", f"{plots_path}/test_Negative_rewards.png")
    utils.plot(range(configs["total_num_episode"]), cumulative_waiting_time_list, "episodes", "cumulative_waiting_times",
               "cumulative waiting times for all vehicles", "blue", f"{plots_path}/test_cumulative_waiting_times.png")

if __name__ == "__main__":
    configs = get_args()
    evaluate_dqn(configs)