forked from M-J-Murray/MAMEToolkit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEnvironment.py
221 lines (193 loc) · 8.49 KB
/
Environment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from MAMEToolkit.emulator.Emulator import Emulator
from MAMEToolkit.emulator.Address import Address
from MAMEToolkit.sf_environment.Steps import *
from MAMEToolkit.sf_environment.Actions import Actions
# Combines the data of multiple time steps
def add_rewards(old_data, new_data):
    """Fold the rewards of an earlier time step into a later one.

    For every key of ``old_data`` whose name contains ``"rewards"``, each
    player's value is added to the matching entry of ``new_data``. A reward
    entry that ``new_data`` does not have yet is treated as 0, so the old
    value is carried forward instead of raising ``KeyError``.

    ``new_data`` is updated in place and is also the return value, so all
    of its non-reward entries (e.g. the frame) are those of the later step.
    """
    for key, old_rewards in old_data.items():
        if "rewards" not in key:
            continue
        merged = new_data.setdefault(key, {})
        for player, reward in old_rewards.items():
            merged[player] = merged.get(player, 0) + reward
    return new_data
# Returns the dict of named memory addresses required to train on Street Fighter
def setup_memory_addresses():
    """Build the name -> Address mapping handed to the Emulator.

    Each entry pairs the key under which the value is read back from the
    emulator step data with the address string and type code given to
    ``Address``.
    """
    layout = (
        ("fighting", '0x02011389', 'u8'),
        ("winsP1", '0x02011383', 'u8'),
        ("winsP2", '0x02011385', 'u8'),
        ("healthP1", '0x02068D0A', 's16'),
        ("healthP2", '0x020691A2', 's16'),
    )
    return {name: Address(location, kind) for name, location, kind in layout}
# Converts an index (action) into the list of player 1 movement action Enums
def index_to_move_action(action):
    """Translate a movement index (0-8) into a list of player 1 direction Actions.

    Indices 0-7 map to left, left+up, up, up+right, right, right+down,
    down and down+left respectively; index 8 is "no movement" and yields
    an empty list. Any other index raises ``KeyError``.
    """
    left = Actions.P1_LEFT
    up = Actions.P1_UP
    right = Actions.P1_RIGHT
    down = Actions.P1_DOWN
    moves = {
        0: [left],
        1: [left, up],
        2: [up],
        3: [up, right],
        4: [right],
        5: [right, down],
        6: [down],
        7: [down, left],
        8: [],
    }
    return moves[action]
# Converts an index (action) into the list of player 1 attack action Enums
def index_to_attack_action(action):
    """Translate an attack index (0-9) into a list of player 1 attack Actions.

    Indices 0-8 select a single button or a two-button combination; index 9
    is "no attack" and yields an empty list. Any other index raises
    ``KeyError``.
    """
    j_punch = Actions.P1_JPUNCH
    s_punch = Actions.P1_SPUNCH
    f_punch = Actions.P1_FPUNCH
    s_kick = Actions.P1_SKICK
    f_kick = Actions.P1_FKICK
    r_kick = Actions.P1_RKICK
    attacks = {
        0: [j_punch],
        1: [s_punch],
        2: [f_punch],
        3: [j_punch, s_punch],
        4: [s_kick],
        5: [f_kick],
        6: [r_kick],
        7: [s_kick, f_kick],
        8: [j_punch, s_kick],
        9: [],
    }
    return attacks[action]
# The Street Fighter specific interface for training an agent against the game
class Environment(object):
    """Street Fighter specific interface for training an agent against the game.

    Usage enforced by the guards in ``step`` and ``reset``: call ``start``
    once, call ``step`` until one of the returned done flags is set, then
    call ``reset`` before stepping again.
    """

    # env_id - the unique identifier of the emulator environment, used to create fifo pipes
    # roms_path - forwarded unchanged to the Emulator
    # difficulty - the difficulty to be used in story mode gameplay
    # frame_ratio - see Emulator class
    # frames_per_step - number of emulator frames gathered per agent step (see gather_frames)
    # render, throttle, debug - see Console class
    def __init__(self, env_id, roms_path, difficulty=3, frame_ratio=3, frames_per_step=3, render=True, throttle=False, debug=False):
        self.difficulty = difficulty
        self.frame_ratio = frame_ratio
        self.frames_per_step = frames_per_step
        self.throttle = throttle
        self.emu = Emulator(env_id, roms_path, "sfiii3n", setup_memory_addresses(), frame_ratio=frame_ratio, render=render, throttle=throttle, debug=debug)
        self.started = False
        # Last observed health / win counters; rewards and round detection
        # are computed as differences against these.
        self.expected_health = {"P1": 0, "P2": 0}
        self.expected_wins = {"P1": 0, "P2": 0}
        self.round_done = False
        self.stage_done = False
        self.game_done = False
        self.stage = 1

    def run_steps(self, steps):
        """Play back a scripted sequence of button presses.

        Used for transitioning the emulator through non-learnable gameplay,
        aka. title screens, character selects. Each entry of ``steps`` is a
        dict with a ``"wait"`` count of empty emulator steps to run first and
        an ``"actions"`` iterable of Enum members whose ``.value`` is then
        sent in a single emulator step.
        """
        for step in steps:
            for _ in range(step["wait"]):
                self.emu.step([])
            self.emu.step([action.value for action in step["actions"]])

    def start(self):
        """Drive the game from boot to the first fight and return its first frames.

        Must be called first after creating this class. Sets ``started`` so
        that ``step`` becomes usable.
        """
        if self.throttle:
            # Extra idle steps when throttled; presumably gives the emulator
            # time to boot at real-time speed - TODO confirm.
            for _ in range(int(250/self.frame_ratio)):
                self.emu.step([])
        self.run_steps(set_difficulty(self.frame_ratio, self.difficulty))
        self.run_steps(start_game(self.frame_ratio))
        frames = self.wait_for_fight_start()
        self.started = True
        return frames

    def wait_for_fight_start(self):
        """Idle until the "fighting" flag becomes non-zero, then return the first frames.

        The health values read at that moment become the baseline for the
        reward computation in ``sub_step``.
        """
        data = self.emu.step([])
        while data["fighting"] == 0:
            data = self.emu.step([])
        self.expected_health = {"P1": data["healthP1"], "P2": data["healthP2"]}
        data = self.gather_frames([])
        return data["frame"]

    def reset(self):
        """Advance to the next playable state after a done flag was raised.

        Dispatches, in priority order, to ``new_game`` (game over),
        ``next_stage`` (stage won) or ``next_round`` (round finished) and
        returns the first frames of the following fight.

        Raises EnvironmentError if none of the done flags is set.
        """
        if self.game_done:
            return self.new_game()
        elif self.stage_done:
            return self.next_stage()
        elif self.round_done:
            return self.next_round()
        else:
            raise EnvironmentError("Reset called while gameplay still running")

    def next_round(self):
        """Clear the round flag and wait for the next round of the same stage.

        To be called when a round finishes. Win counters are kept, since the
        stage is still in progress.
        """
        self.round_done = False
        self.expected_health = {"P1": 0, "P2": 0}
        return self.wait_for_fight_start()

    def next_stage(self):
        """Move on to the next stage after player 1 won the current one.

        Waits for the end-of-stage black screen, plays the scripted steps and
        resets the health/win book keeping and the round/stage flags.
        """
        self.wait_for_continue()
        # Inside a method body the bare name resolves to the module-level
        # next_stage (presumably from the Steps star import), not this method.
        self.run_steps(next_stage(self.frame_ratio))
        self.expected_health = {"P1": 0, "P2": 0}
        self.expected_wins = {"P1": 0, "P2": 0}
        self.round_done = False
        self.stage_done = False
        return self.wait_for_fight_start()

    def new_game(self):
        """Restart from stage 1 after player 2 won the stage.

        Waits for the end-of-game black screen, plays the scripted steps and
        resets all book keeping variables and done flags.
        """
        self.wait_for_continue()
        # Resolves to the module-level new_game (presumably from the Steps
        # star import), not this method.
        self.run_steps(new_game(self.frame_ratio))
        self.expected_health = {"P1": 0, "P2": 0}
        self.expected_wins = {"P1": 0, "P2": 0}
        self.round_done = False
        self.stage_done = False
        self.game_done = False
        self.stage = 1
        return self.wait_for_fight_start()

    def wait_for_continue(self):
        """Step the emulator until the screen goes black at the very end of a game."""
        data = self.emu.step([])
        if self.frames_per_step == 1:
            while data["frame"].sum() != 0:
                data = self.emu.step([])
        else:
            # NOTE(review): emu.step() yields a single frame here (frames are
            # only grouped by gather_frames), so [0] selects the first element
            # of that frame rather than the first of several frames. Left
            # unchanged because the scripted steps that follow may be timed
            # against this check - confirm before unifying with the branch above.
            while data["frame"][0].sum() != 0:
                data = self.emu.step([])

    def run_till_victor(self, data):
        """Keep stepping with no input until one of the win counters changes.

        Rewards of the extra steps are accumulated into the returned data and
        ``expected_wins`` is updated to the new counters.

        When ``data["frame"]`` is the list built by ``gather_frames`` (i.e.
        ``frames_per_step`` is not 1), the list is maintained as a sliding
        window over the most recent frames, so the result keeps the same
        shape as every other step instead of degrading to a single frame.
        """
        window = data.get("frame")
        keep_window = self.frames_per_step != 1 and isinstance(window, list)
        while self.expected_wins["P1"] == data["winsP1"] and self.expected_wins["P2"] == data["winsP2"]:
            data = add_rewards(data, self.sub_step([]))
            if keep_window:
                # Drop the oldest frame, append the one just produced.
                window = window[1:] + [data["frame"]]
                data["frame"] = window
        self.expected_wins = {"P1": data["winsP1"], "P2": data["winsP2"]}
        return data

    def check_done(self, data):
        """Update the round/stage/game flags from the latest step data.

        When the "fighting" flag is 0 the round is over: the emulator is run
        until the winner is recorded, then ``round_done`` is set. Two wins for
        player 1 additionally set ``stage_done`` and increment ``stage``; two
        wins for player 2 set ``game_done``. Returns the (possibly extended)
        data.
        """
        if data["fighting"] == 0:
            data = self.run_till_victor(data)
            self.round_done = True
            if data["winsP1"] == 2:
                self.stage_done = True
                self.stage += 1
            if data["winsP2"] == 2:
                self.game_done = True
        return data

    def gather_frames(self, actions):
        """Hold ``actions`` for ``frames_per_step`` sub-steps and collect the frames.

        Returns the data of the last sub-step with the rewards summed over
        all sub-steps. ``data["frame"]`` is the list of collected frames, or
        the bare frame when ``frames_per_step`` is 1.
        """
        data = self.sub_step(actions)
        frames = [data["frame"]]
        for _ in range(self.frames_per_step - 1):
            data = add_rewards(data, self.sub_step(actions))
            frames.append(data["frame"])
        data["frame"] = frames[0] if self.frames_per_step == 1 else frames
        return data

    def sub_step(self, actions):
        """Advance the emulator by one time step while pressing ``actions``.

        Adds a ``"rewards"`` entry to the step data: each player's reward is
        the health the opponent lost since the previous sub-step minus the
        health the player lost. ``expected_health`` is updated to the values
        just read.
        """
        data = self.emu.step([action.value for action in actions])
        p1_diff = (self.expected_health["P1"] - data["healthP1"])
        p2_diff = (self.expected_health["P2"] - data["healthP2"])
        self.expected_health = {"P1": data["healthP1"], "P2": data["healthP2"]}
        data["rewards"] = {
            "P1": (p2_diff-p1_diff),
            "P2": (p1_diff-p2_diff)
        }
        return data

    def step(self, move_action, attack_action):
        """Apply one agent decision and return the resulting observation.

        ``move_action`` and ``attack_action`` are the indices understood by
        ``index_to_move_action`` and ``index_to_attack_action``.

        Returns ``(frames, rewards, round_done, stage_done, game_done)``.

        Raises EnvironmentError if ``start`` has not been called, or if a
        done flag is still set (``reset`` must be called first).
        """
        if not self.started:
            raise EnvironmentError("Start must be called before stepping")
        if self.round_done or self.stage_done or self.game_done:
            raise EnvironmentError("Attempted to step while characters are not fighting")
        actions = []
        actions += index_to_move_action(move_action)
        actions += index_to_attack_action(attack_action)
        data = self.gather_frames(actions)
        data = self.check_done(data)
        return data["frame"], data["rewards"], self.round_done, self.stage_done, self.game_done

    def close(self):
        """Safely close the emulator."""
        self.emu.close()