# 2x128_CNN.py -- CNN modulation classifier for RadioML 2016.10a (2x128 I/Q inputs)
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import numpy as np
import matplotlib.pyplot as plt
import cPickle
import keras
import keras.models as models
from keras.layers.core import Reshape, Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Conv2D, ZeroPadding2D
# Load the dataset from a local path
Xd = cPickle.load(open("/Users/guanyuchen/Desktop/CORNELLACA/INFO 5901 Fall/MPS_Project/RML2016.10a_dict.dat",'rb'))
print("Dataset imported")
snrs,mods = map(lambda j: sorted(list(set(map(lambda x: x[j], Xd.keys())))), [1,0])
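# Xd is a dict keyed by (modulation, snr) tuples; for RML2016.10a each value
# is an array of shape (1000, 2, 128): 1000 windows of 128 complex samples
# stored as separate I and Q rows. The map/sort above pulls the distinct
# SNRs (key index 1) and modulation names (key index 0) out of the keys.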
X = []
lbl = []
for mod in mods:
    for snr in snrs:
        X.append(Xd[(mod, snr)])
        for i in range(Xd[(mod, snr)].shape[0]):
            lbl.append((mod, snr))
X = np.vstack(X)
# For RML2016.10a the stacked array should have shape 220000 x 2 x 128
print "Dataset formatted into shape: ", X.shape
# Print the SNRs and modulation classes present in the data
print "Dataset with SNRs: ", snrs
print "Dataset with Modulations: ", mods
print "Data prepared"
# Partition the data into training and test sets of the form
# we can train/test on, while keeping the SNR and modulation
# label of each example handy in lbl
np.random.seed(2017)
n_examples = X.shape[0]
n_train = int(n_examples * 0.5)
train_idx = np.random.choice(range(0,n_examples), size=n_train, replace=False)
test_idx = list(set(range(0,n_examples))-set(train_idx))
X_train = X[train_idx]
X_test = X[test_idx]
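# With the 50/50 split above, X_train and X_test should each have
# shape (110000, 2, 128) for the 220000-example RML2016.10a dataset.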
def to_onehot(yy):
    yy1 = np.zeros([len(yy), max(yy)+1])
    yy1[np.arange(len(yy)), yy] = 1
    return yy1
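# Worked example: to_onehot([0, 2, 1]) -> [[1,0,0], [0,0,1], [0,1,0]]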
Y_train = to_onehot(map(lambda x: mods.index(lbl[x][0]), train_idx))
Y_test = to_onehot(map(lambda x: mods.index(lbl[x][0]), test_idx))
in_shp = list(X_train.shape[1:])
# print X_train.shape, in_shp
classes = mods
# Build a lite VGG-style model using Keras primitives --
# - Reshape [N,2,128] input to [N,2,128,1] (channels_last)
# - Pass through 4 Conv2D/ReLU layers
# - Pass through 2 Dense layers (ReLU and Softmax)
# - Perform categorical cross-entropy optimization
# Set up some training hyperparameters
nb_epoch = 100     # number of epochs to train for
batch_size = 1024  # training batch size
dr = 0.5           # dropout rate (a fraction, not a percentage)
'''
# An update and fix for the original two-conv model, kept for reference but unused
model = models.Sequential()
model.add(Reshape(in_shp+[1], input_shape=in_shp))
model.add(ZeroPadding2D((0,2)))
model.add(Conv2D(256, (1,3), activation="relu"))
model.add(Dropout(dr))
model.add(ZeroPadding2D((0,2)))
model.add(Conv2D(80, (2,3), activation="relu"))
model.add(Dropout(dr))
model.add(Flatten())
model.add(Dense(256, activation='relu', name="dense1"))
model.add(Dropout(dr))
model.add(Dense( len(classes), name="dense2" ))
model.add(Activation('softmax'))
model.add(Reshape([len(classes)]))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
'''
# build the CNN model
model = models.Sequential()
model.add(Reshape(in_shp+[1], input_shape=in_shp))
model.add(ZeroPadding2D((0,2)))
model.add(Conv2D(64, (1,4), activation="relu"))
model.add(Dropout(dr))
model.add(ZeroPadding2D((0,2)))
model.add(Conv2D(64, (2,4), activation="relu"))
model.add(Dropout(dr))
model.add(Conv2D(128, (1,8), activation="relu"))
model.add(Dropout(dr))
model.add(Conv2D(128, (1,8), activation="relu"))
model.add(Dropout(dr))
model.add(Flatten())
model.add(Dense(256, activation='relu'))
model.add(Dropout(dr))
model.add(Dense(len(classes), activation='softmax'))
model.add(Reshape([len(classes)]))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
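# Expected activation shapes per layer (channels_last, TensorFlow backend);
# these should match the model.summary() output above:
#   Reshape          -> (2, 128, 1)
#   ZeroPadding2D    -> (2, 132, 1)
#   Conv2D 64,(1,4)  -> (2, 129, 64)
#   ZeroPadding2D    -> (2, 133, 64)
#   Conv2D 64,(2,4)  -> (1, 130, 64)
#   Conv2D 128,(1,8) -> (1, 123, 128)
#   Conv2D 128,(1,8) -> (1, 116, 128)
#   Flatten          -> 14848
#   Dense            -> 256
#   Dense/Softmax    -> len(classes) = 11 for RML2016.10a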
# Train the model, checkpointing the best weights seen so far
filepath = 'weight_4layers.wts.h5'
history = model.fit(X_train,
                    Y_train,
                    batch_size=batch_size,
                    epochs=nb_epoch,  # 'epochs' replaces the deprecated 'nb_epoch' kwarg in Keras 2
                    verbose=1,
                    validation_data=(X_test, Y_test),
                    callbacks=[
                        keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=0, save_best_only=True, mode='auto'),
                        keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='auto')
                    ])
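# ModelCheckpoint keeps only the weights from the epoch with the lowest
# validation loss; EarlyStopping halts training once val_loss has failed
# to improve for 5 consecutive epochs.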
# Re-load the best checkpointed weights once training is finished
model.load_weights(filepath)
# Show a simple summary of performance on the held-out set
score = model.evaluate(X_test, Y_test, verbose=0, batch_size=batch_size)
print "Validation Loss and Accuracy: ",score
'''
# Optional: show analysis graphs
plt.figure()
plt.title('Training performance')
plt.plot(history.epoch, history.history['loss'], label='train loss+error')
plt.plot(history.epoch, history.history['val_loss'], label='val_error')
plt.legend()
plt.show()
'''
# helper method to plot a confusion matrix
def plot_confusion_matrix(cm, title='Confusion matrix', cmap=plt.cm.Blues, labels=[]):
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=45)
    plt.yticks(tick_marks, labels)
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()
# Plot the confusion matrix over the whole test set
test_Y_hat = model.predict(X_test, batch_size=batch_size)
conf = np.zeros([len(classes), len(classes)])
confnorm = np.zeros([len(classes), len(classes)])
for i in range(0, X_test.shape[0]):
    j = list(Y_test[i,:]).index(1)       # true class index
    k = int(np.argmax(test_Y_hat[i,:]))  # predicted class index
    conf[j,k] = conf[j,k] + 1
for i in range(0, len(classes)):
    confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])  # row-normalize counts
plot_confusion_matrix(confnorm, labels=classes)
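# Note: each row of confnorm sums to 1, so the diagonal entries are per-class
# recall; off-diagonal mass shows which modulations the model confuses.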
# Accuracy and confusion matrix at each SNR
acc = {}
test_SNRs = map(lambda x: lbl[x][1], test_idx)  # SNR label of every test example
for snr in snrs:
    # extract the test examples at this SNR
    test_X_i = X_test[np.where(np.array(test_SNRs) == snr)]
    test_Y_i = Y_test[np.where(np.array(test_SNRs) == snr)]
    # estimate classes
    test_Y_i_hat = model.predict(test_X_i)
    conf = np.zeros([len(classes), len(classes)])
    confnorm = np.zeros([len(classes), len(classes)])
    for i in range(0, test_X_i.shape[0]):
        j = list(test_Y_i[i,:]).index(1)
        k = int(np.argmax(test_Y_i_hat[i,:]))
        conf[j,k] = conf[j,k] + 1
    for i in range(0, len(classes)):
        confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
    #plt.figure()
    #plot_confusion_matrix(confnorm, labels=classes, title="ConvNet Confusion Matrix (SNR=%d)"%(snr))
    cor = np.sum(np.diag(conf))
    ncor = np.sum(conf) - cor
    print "SNR: ", snr, " Overall Accuracy: ", cor / (cor + ncor)
    acc[snr] = 1.0 * cor / (cor + ncor)
# Save results to a pickle file for plotting later
print acc
fd = open('results_cnn_d0.5.dat', 'wb')
cPickle.dump(("CNN", 0.5, acc), fd)
fd.close()
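# A minimal sketch of reading the results back later (assumes the same
# Python 2 / cPickle environment):
#   name, dropout, acc = cPickle.load(open('results_cnn_d0.5.dat', 'rb'))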
# Plot the accuracy-vs-SNR curve
plt.plot(snrs, map(lambda x: acc[x], snrs))
plt.xlabel("Signal to Noise Ratio (dB)")
plt.ylabel("Classification Accuracy")
plt.title("New Model Classification Accuracy on RadioML 2016.10 Alpha")
plt.show()