forked from AnyLeoPeace/DURLECA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrl_util.py
134 lines (106 loc) · 4.47 KB
/
rl_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
'''from keras-rl https://github.com/keras-rl/keras-rl/'''
import numpy as np
from tensorflow.keras.models import model_from_config, Sequential, Model, model_from_config
import tensorflow.keras.optimizers as optimizers
import tensorflow.keras.backend as K
def clone_model(model, custom_objects={}):
# Requires Keras 1.0.7 since get_config has breaking changes.
config = {
'class_name': model.__class__.__name__,
'config': model.get_config(),
}
clone = model_from_config(config, custom_objects=custom_objects)
clone.set_weights(model.get_weights())
return clone
def clone_optimizer(optimizer):
if type(optimizer) is str:
return optimizers.get(optimizer)
# Requires Keras 1.0.7 since get_config has breaking changes.
params = dict([(k, v) for k, v in optimizer.get_config().items()])
config = {
'class_name': optimizer.__class__.__name__,
'config': params,
}
if hasattr(optimizers, 'optimizer_from_config'):
# COMPATIBILITY: Keras < 2.0
clone = optimizers.optimizer_from_config(config)
else:
clone = optimizers.deserialize(config)
return clone
def get_soft_target_model_updates(target, source, tau):
target_weights = target.trainable_weights + sum([l.non_trainable_weights for l in target.layers], [])
source_weights = source.trainable_weights + sum([l.non_trainable_weights for l in source.layers], [])
assert len(target_weights) == len(source_weights)
# Create updates.
updates = []
for tw, sw in zip(target_weights, source_weights):
updates.append((tw, tau * sw + (1. - tau) * tw))
return updates
def get_object_config(o):
if o is None:
return None
config = {
'class_name': o.__class__.__name__,
'config': o.get_config()
}
return config
def huber_loss(y_true, y_pred, clip_value):
# Huber loss, see https://en.wikipedia.org/wiki/Huber_loss and
# https://medium.com/@karpathy/yes-you-should-understand-backprop-e2f06eab496b
# for details.
assert clip_value > 0.
x = y_true - y_pred
if np.isinf(clip_value):
# Spacial case for infinity since Tensorflow does have problems
# if we compare `K.abs(x) < np.inf`.
return .5 * K.square(x)
condition = K.abs(x) < clip_value
squared_loss = .5 * K.square(x)
linear_loss = clip_value * (K.abs(x) - .5 * clip_value)
if K.backend() == 'tensorflow':
import tensorflow as tf
if hasattr(tf, 'select'):
return tf.select(condition, squared_loss, linear_loss) # condition, true, false
else:
return tf.where(condition, squared_loss, linear_loss) # condition, true, false
elif K.backend() == 'theano':
from theano import tensor as T
return T.switch(condition, squared_loss, linear_loss)
else:
raise RuntimeError('Unknown backend "{}".'.format(K.backend()))
class AdditionalUpdatesOptimizer(optimizers.Optimizer):
def __init__(self, name, optimizer, additional_updates):
super(AdditionalUpdatesOptimizer, self).__init__(name)
self.optimizer = optimizer
self.additional_updates = additional_updates
def get_updates(self, params, loss):
updates = self.optimizer.get_updates(params=params, loss=loss)
updates += self.additional_updates
self.updates = updates
return self.updates
def get_config(self):
return self.optimizer.get_config()
# Based on https://github.com/openai/baselines/blob/master/baselines/common/mpi_running_mean_std.py
class WhiteningNormalizer(object):
def __init__(self, shape, eps=1e-2, dtype=np.float64):
self.eps = eps
self.shape = shape
self.dtype = dtype
self._sum = np.zeros(shape, dtype=dtype)
self._sumsq = np.zeros(shape, dtype=dtype)
self._count = 0
self.mean = np.zeros(shape, dtype=dtype)
self.std = np.ones(shape, dtype=dtype)
def normalize(self, x):
return (x - self.mean) / self.std
def denormalize(self, x):
return self.std * x + self.mean
def update(self, x):
if x.ndim == len(self.shape):
x = x.reshape(-1, *self.shape)
assert x.shape[1:] == self.shape
self._count += x.shape[0]
self._sum += np.sum(x, axis=0)
self._sumsq += np.sum(np.square(x), axis=0)
self.mean = self._sum / float(self._count)
self.std = np.sqrt(np.maximum(np.square(self.eps), self._sumsq / float(self._count) - np.square(self.mean)))