ReplayBuffer.py
from collections import deque

import numpy as np


class UniformReplay:
    """
    Class for the uniform experience replay: transitions are sampled
    uniformly at random (with replacement).
    """

    def __init__(self, maxlen):
        self.experiences = deque(maxlen=maxlen)

    def sample(self, batch_size):
        # Draw batch_size indices uniformly at random (with replacement)
        ids = np.random.randint(len(self.experiences), size=batch_size)
        states = []
        actions = []
        rewards = []
        states_next = []
        dones = []
        for idx in ids:
            states.append(self.experiences[idx][0])
            actions.append(self.experiences[idx][1])
            rewards.append(self.experiences[idx][2])
            states_next.append(self.experiences[idx][3])
            dones.append(self.experiences[idx][4])
        return np.asarray(states), np.asarray(actions), np.asarray(rewards), np.asarray(states_next), np.asarray(dones)

    def add(self, experience):
        self.experiences.append(experience)
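
# Example usage of UniformReplay (a minimal sketch; the
# (state, action, reward, next_state, done) tuple layout is assumed from the
# unpacking performed in sample() above):
#
#     buffer = UniformReplay(maxlen=100_000)
#     buffer.add((state, action, reward, next_state, done))
#     states, actions, rewards, states_next, dones = buffer.sample(batch_size=32)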


class PrioritizedReplay:
    """
    Class for prioritized experience replay: transitions are sampled with a
    probability proportional to their absolute TD error.
    """

    def __init__(self, maxlen, epsilon=0.001):
        self.experiences = deque(maxlen=maxlen)
        # Parameters initialised according to the paper values
        self.alpha = 0.6
        self.beta = 0.4
        self.beta_increment = 0.0005
        self.epsilon = epsilon

    def add(self, experience):
        # Add the experience, appending a td_error of 1 as initialisation
        self.experiences.append(experience + (1,))

    def sample(self, batch_size):
        # Update the beta value (annealed towards 1)
        self.beta = min(1, self.beta + self.beta_increment)
        # Calculate the priority of each element (the stored td_error is at index 5)
        probabilities = np.asarray([experience[5] ** self.alpha for experience in self.experiences])
        # Normalize in order to obtain probabilities
        probabilities /= np.sum(probabilities)
        # Pick batch_size indices according to the resulting probability distribution
        ids = np.random.choice(len(self.experiences), batch_size, p=probabilities)
        states = []
        actions = []
        rewards = []
        states_next = []
        dones = []
        # Calculate the importance-sampling weights, normalised by their maximum
        importance_samplings = np.power(len(self.experiences) * probabilities[ids], -self.beta)
        importance_samplings /= importance_samplings.max()
        # Build the data structures that have to be returned
        for idx in ids:
            states.append(self.experiences[idx][0])
            actions.append(self.experiences[idx][1])
            rewards.append(self.experiences[idx][2])
            states_next.append(self.experiences[idx][3])
            dones.append(self.experiences[idx][4])
        return (np.asarray(states), np.asarray(actions), np.asarray(rewards), np.asarray(states_next),
                np.asarray(dones), ids, importance_samplings)

    # Method to update the priority of the samples after they have been fed to the network
    def update_priority(self, ids, td_errors):
        # New priorities: absolute TD error plus a small epsilon so that no
        # transition ends up with zero probability of being sampled
        priorities = np.abs(td_errors) + self.epsilon
        for td_index, idx in enumerate(ids):
            # Update the experience's stored td_error (kept at index 5)
            self.experiences[idx] = self.experiences[idx][:5] + (priorities[td_index],)
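

# Minimal smoke-test sketch, runnable as a script. It assumes transitions are
# stored as (state, action, reward, next_state, done) tuples (matching the
# unpacking in sample() above); shapes and hyperparameters are illustrative only.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    prioritized = PrioritizedReplay(maxlen=1000)
    for _ in range(100):
        transition = (rng.normal(size=4),        # state
                      int(rng.integers(2)),      # action
                      float(rng.normal()),       # reward
                      rng.normal(size=4),        # next state
                      bool(rng.integers(2)))     # done
        prioritized.add(transition)
    states, actions, rewards, states_next, dones, ids, weights = prioritized.sample(batch_size=32)
    # After a learning step, the new absolute TD errors of the sampled
    # transitions would be fed back to refresh their priorities.
    prioritized.update_priority(ids, td_errors=rng.normal(size=32))
    print(states.shape, weights.shape)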