# q_learning.py
import random
import pprint
import os
import pickle
import json
import sys
import pandas as pd
from tqdm import tqdm
import connect_four
sys.path.insert(1, 'MCTS/connect4/src')
from tournament import Tournament
from mcts.mcts_strategy import MctsStrategy
from game import Game
from player import Player
from board import Board
class QLearning:
def __init__(self, width, height, AI_first = True, seed = 0):
# stores tuples (game_board_state (2d tuple), input_column (int))
# This starts off as empty, and states are created as episodes are generated
self.Q_function = {}
self.AI_first = AI_first
self.width = width
self.height = height
random.seed(seed)
self.experiments_folder = "./experiments"
if not os.path.exists(self.experiments_folder):
os.makedirs(self.experiments_folder)
self.opponents_dict = {
"self_play": self.self_play_opponent,
"random": self.random_opponent,
"leftmost": self.leftmost_opponent,
"mcts_5": self.mcts_5_opponent,
"mcts_25": self.mcts_25_opponent,
"mcts_50": self.mcts_50_opponent
}
self.train_settings = None
self.curr_training_iter = 0
# used for self-play
        self.Q_function_history = []
self.curr_iter_Q_index = {}
    # converts a 2D array to a 2D tuple (tuples are hashable, so board states can be used as Q-function keys)
def array2tuple(self, array):
return tuple(tuple(a) for a in array)
# given a Q function dict and a board state tuple x, get argmin_u' Q(x,u') and min_u' Q(x,u') over all valid u'
    # if there are no entries in Q for x, (None, None) is returned
def minQ(self, x, Q_dict):
best_control = None
smallest_Q = None
controls = connect_four.ConnectFour.get_valid_inputs(x)
random.shuffle(controls)
for u in controls:
if (x, u) in Q_dict:
if smallest_Q is None or Q_dict[(x, u)] < smallest_Q:
smallest_Q = Q_dict[(x, u)]
best_control = u
return best_control, smallest_Q
    # an opponent which picks the next move using the Q function (or a stored snapshot of it)
def self_play_opponent(self, state_tuple):
iterations_save = 100000
max_history_size = 1
curr_iteration = self.curr_training_iter
epsilon = 0.05
        # manage the history of Q-function snapshots: a copy of the current Q function is stored
        # every `iterations_save` iterations, keeping at most `max_history_size` snapshots
if len(self.Q_function_history) == 0 or curr_iteration % iterations_save == 0:
self.Q_function_history.append(self.Q_function.copy())
if len(self.Q_function_history) > max_history_size:
self.Q_function_history.pop(0)
        if len(self.curr_iter_Q_index) > 1:
            raise RuntimeError("curr_iter_Q_index should hold at most one entry")
# assign a Q index for the current iteration
if curr_iteration not in self.curr_iter_Q_index:
self.curr_iter_Q_index = {curr_iteration:random.randint(0,len(self.Q_function_history))}
if random.uniform(0,1) < epsilon:
move = random.choice(connect_four.ConnectFour.get_valid_inputs(state_tuple))
else:
Q_index = self.curr_iter_Q_index[curr_iteration]
if Q_index == len(self.Q_function_history):
move, _ = self.minQ(state_tuple, self.Q_function)
else:
move, _ = self.minQ(state_tuple, self.Q_function_history[Q_index])
# if state is not in the previous Q function, just return random
if move is None:
move = random.choice(connect_four.ConnectFour.get_valid_inputs(state_tuple))
return move
    # an opponent which uses the current Q function, with an exploration rate that decays over training
def self_play_randomized_opponent(self, state_tuple):
num_iterations = self.train_settings['num_iterations']
curr_iteration = self.curr_training_iter
#epsilon = 0.25
# epsilon starts at 1 (all random) and moves to 0.05 as a function of the current iteration of training
epsilon = 1-min(curr_iteration/(num_iterations/2), 1.0-0.05)
if random.uniform(0,1) < epsilon:
#move = random.choice(cf_game.get_valid_inputs(cf_game.board_state))
move = random.choice(connect_four.ConnectFour.get_valid_inputs(state_tuple))
else:
move, _ = self.minQ(state_tuple, self.Q_function)
return move
# an opponent which always just picks a random valid column as the next move
    def random_opponent(self, state_tuple):
move = random.choice(connect_four.ConnectFour.get_valid_inputs(state_tuple))
return move
    # an opponent which always picks the leftmost valid column as the next move
def leftmost_opponent(self, state_tuple):
move = min(connect_four.ConnectFour.get_valid_inputs(state_tuple))
return move
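    # an opponent which picks the next move with the external MCTS implementation, using the given
    # rollout limit; the board tuple is converted into the MCTS library's Board object (the row order
    # is reversed, presumably because the two representations index rows in opposite directions)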
def mcts_opponent(self, state_tuple, rollout_limit):
state_arr = [[str(t2) for t2 in t] for t in state_tuple]
b = Board(self.width, self.height)
b.board = state_arr[::-1]
players = [Player('1'), Player('2')]
game_number = 1
g = Game(b, players, game_number)
        # the player id is fixed: the opponent always plays as player '2' (the AI is always player 1)
playerid = '2'
mcts_s = MctsStrategy(rollout_limit)
move = mcts_s.move(g, playerid)
return move
def mcts_5_opponent(self, state_tuple):
return self.mcts_opponent(state_tuple, 5)
def mcts_25_opponent(self, state_tuple):
return self.mcts_opponent(state_tuple, 25)
def mcts_50_opponent(self, state_tuple):
return self.mcts_opponent(state_tuple, 50)
    # generate an episode with an epsilon-greedy policy derived from Q
    # epsilon in [0,1] is the probability that we pick a random control instead of the greedy one
    # the opponent is supplied as a callable (player 1 is the AI, player 2 is the AI's opponent)
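    # each returned episode is a flat list [x_0, u_0, x_1, u_1, ..., x_terminal] of alternating
    # board-state tuples and chosen columns (from that player's turns), ending with the terminal board state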
def generate_episode(self, opponent, epsilon = 0):
ai_episode = []
opponent_episode = []
curr_player = 1 if self.AI_first else 2
cf_game = connect_four.ConnectFour(self.width, self.height)
while True:
curr_state_tuple = self.array2tuple(cf_game.board_state)
            # add the current state to the Q-function with value 0 for every valid control, if not already present
# TODO: optimize this; this actually only needs to be outside of the AI if-statement below for self-play opponent
for possible_control in cf_game.get_valid_inputs(cf_game.board_state):
Q_entry = (curr_state_tuple, possible_control)
if Q_entry not in self.Q_function:
self.Q_function[Q_entry] = 0
# AI picks move using epsilon-greedy policy
if curr_player == 1:
if random.uniform(0,1) < epsilon:
move = random.choice(cf_game.get_valid_inputs(cf_game.board_state))
else:
move, _ = self.minQ(curr_state_tuple, self.Q_function)
# update the episode
ai_episode.append(curr_state_tuple)
ai_episode.append(move)
# opponent picks move using specified opponent type
else:
move = opponent(curr_state_tuple)
# update the episode
opponent_episode.append(curr_state_tuple)
opponent_episode.append(move)
cf_game.update_board(move, curr_player)
game_outcome = cf_game.get_status()
if game_outcome != 0:
ai_episode.append(self.array2tuple(cf_game.board_state))
opponent_episode.append(self.array2tuple(cf_game.board_state))
break
# switch to next player
curr_player = 2 if curr_player == 1 else 1
return ai_episode, opponent_episode, game_outcome
    # test the current Q_function against a list of opponents; opponent_names should be a list of keys from self.opponents_dict
    # returns a dict (keyed by opponent name) of dicts,
    # where each inner dict gives the number of wins, losses, and ties against that opponent
# also returns the unexplored rate
# TODO: make better unexplored rate which also takes into account the control
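    # here, the unexplored rate is the fraction of the AI's test moves made from a state
    # that has no entries in the Q function (forcing a random move)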
def test(self, iterations, opponent_names):
test_results = {}
unexplored_rate = 0
num_moves = 0
opponents = [self.opponents_dict[opponent_name] for opponent_name in opponent_names]
for opponent_name, opponent in zip(opponent_names, opponents):
wins = 0
ties = 0
losses = 0
            for i in range(iterations):
curr_player = 1 if self.AI_first else 2
cf_game = connect_four.ConnectFour(self.width, self.height)
while True:
curr_state_tuple = self.array2tuple(cf_game.board_state)
# AI's turn
if curr_player == 1:
move, _ = self.minQ(curr_state_tuple, self.Q_function)
# if Q does not have an entry for the current state x, just pick a random move
if move is None:
unexplored_rate += 1
move = random.choice(cf_game.get_valid_inputs(cf_game.board_state))
num_moves += 1
# opponent's turn
else:
move = opponent(curr_state_tuple)
cf_game.update_board(move, curr_player)
game_outcome = cf_game.get_status()
if game_outcome != 0:
if game_outcome == 1:
wins += 1
elif game_outcome == 2:
losses += 1
else:
ties += 1
break
# switch to next player
curr_player = 2 if curr_player == 1 else 1
opponent_results = {"wins":wins, "ties":ties, "losses": losses}
test_results[opponent_name] = opponent_results
return test_results, unexplored_rate/num_moves
# update step for loss-based Q learning
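    # Q(x,u) <- Q(x,u) + alpha * (loss + gamma * min_u' Q(x',u') - Q(x,u)),
    # where alpha is the learning rate, gamma is the discount factor, and x' is the board state
    # the next time the same player is to move (the terminal transition uses loss = end_loss and a min term of 0)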
def Q_learning_update(self, episode, game_outcome, train_settings):
learning_rate = train_settings['learning_rate']
discount_factor = train_settings['discount_factor']
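        # map the final game outcome to a terminal loss for the player whose episode this is:
        # 1 = that player won (loss -1), 2 = that player lost (loss +1), 3 = tie (loss -0.5)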
outcome2loss = {1:-1, 2:1, 3:-0.5}
end_loss = outcome2loss[game_outcome]
i=0
while i < len(episode)-2:
x = episode[i]
u = episode[i+1]
x_prime = episode[i+2]
if i == len(episode)-3:
loss = end_loss
min_x_prime = 0
else:
loss = 0
min_x_prime = self.minQ(x_prime, self.Q_function)[1]
self.Q_function[(x,u)] = self.Q_function[(x,u)] + learning_rate * (loss + discount_factor*min_x_prime - self.Q_function[(x,u)])
i += 2
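    # update step for Monte Carlo policy iteration: every visited (state, control) pair in the
    # episode is moved toward the final episode loss, Q(x,u) <- Q(x,u) + alpha * (end_loss - Q(x,u))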
def MC_PI_update(self, episode, game_outcome, train_settings):
learning_rate = train_settings['learning_rate']
outcome2loss = {1:-1, 2:1, 3:-0.5}
end_loss = outcome2loss[game_outcome]
i=0
while i < len(episode)-2:
x = episode[i]
u = episode[i+1]
self.Q_function[(x,u)] = self.Q_function[(x,u)] + learning_rate * (end_loss - self.Q_function[(x,u)])
i += 2
    # in train_settings:
    # - alg_type in {"MC", "Q"}
    # - learning_rate and discount_factor are the update hyperparameters
    # - episode_epsilon in [0,1] is the probability that the AI picks a random control instead of the greedy one
    # - num_iterations is the number of training episodes; test_every controls how often evaluation games are played
    # - policy_opponent is a key of self.opponents_dict ("self_play", "random", "leftmost", "mcts_5", "mcts_25", "mcts_50")
    # - test_opponents is a list of such keys
    # - experiment_name names the results folder under ./experiments; if it is empty, results are not saved
def train(self, train_settings):
self.train_settings = train_settings
alg_type = train_settings['alg_type']
learning_rate = train_settings['learning_rate']
discount_factor = train_settings['discount_factor']
episode_epsilon = train_settings['episode_epsilon']
num_iterations = train_settings['num_iterations']
test_every = train_settings['test_every']
experiment_name = train_settings['experiment_name']
test_opponents = train_settings['test_opponents']
policy_opponent = self.opponents_dict[train_settings['policy_opponent']]
        # write the settings dict to file (only when an experiment name is given)
        experiment_dir = os.path.join(self.experiments_folder, experiment_name)
        if experiment_name != "":
            if not os.path.exists(experiment_dir):
                os.makedirs(experiment_dir)
            with open(os.path.join(experiment_dir, 'training_settings.txt'), 'w') as file:
                file.write(json.dumps(train_settings))
train_df = pd.DataFrame()
for iteration in tqdm(range(num_iterations)):
self.curr_training_iter = iteration
ai_episode, opponent_episode, game_outcome_ai = self.generate_episode(policy_opponent, epsilon = episode_epsilon)
if game_outcome_ai == 1:
game_outcome_opponent = 2
elif game_outcome_ai == 2:
game_outcome_opponent = 1
elif game_outcome_ai == 3:
game_outcome_opponent = 3
if alg_type == "Q":
self.Q_learning_update(ai_episode, game_outcome_ai, train_settings)
# Note: for simplicity we always also update Q value for opponent but
# this actually only needs to be done when policy_opponent is self_play
self.Q_learning_update(opponent_episode, game_outcome_opponent, train_settings)
elif alg_type == "MC":
self.MC_PI_update(ai_episode, game_outcome_ai, train_settings)
self.MC_PI_update(opponent_episode, game_outcome_opponent, train_settings)
            else:
                raise ValueError("unknown alg_type: " + str(alg_type))
            # periodically, evaluate against the test opponents, log to the dataframe, and save results
            if iteration % test_every == 0 or iteration == num_iterations-1:
test_results, unexplored_rate = self.test(80, test_opponents)
df_log = {"iteration": iteration, "Q_function_size":len(self.Q_function), "unexplored_states_rate": unexplored_rate}
for opponent in test_results:
wins = test_results[opponent]["wins"]
ties = test_results[opponent]["ties"]
losses = test_results[opponent]["losses"]
df_log[opponent +"_win_rate"] = wins / (ties + losses + wins)
tqdm.write(str(df_log))
                train_df = pd.concat([train_df, pd.DataFrame([df_log])], ignore_index=True)
if experiment_name != "":
train_df.to_pickle(os.path.join(experiment_dir, "training_df.p"))
pickle.dump(self.Q_function, open(os.path.join(experiment_dir,"Q_function.p"), "wb"))
return train_df
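# --- illustrative usage sketch (not part of the original training runs) ---
# The hyperparameter values below are placeholders chosen for demonstration only; train()
# expects all of the keys shown here in its settings dict.
if __name__ == "__main__":
    q_learner = QLearning(width=7, height=6, AI_first=True, seed=0)
    example_settings = {
        "alg_type": "Q",                   # "Q" for Q-learning, "MC" for Monte Carlo policy iteration
        "learning_rate": 0.1,              # placeholder value
        "discount_factor": 0.9,            # placeholder value
        "episode_epsilon": 0.1,            # exploration rate for the AI's epsilon-greedy policy
        "num_iterations": 10000,           # placeholder value
        "test_every": 1000,                # run evaluation games every 1000 iterations
        "experiment_name": "example_run",  # results are saved under ./experiments/example_run
        "test_opponents": ["random", "leftmost"],
        "policy_opponent": "random",
    }
    training_df = q_learner.train(example_settings)
    print(training_df)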