-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathGMM.py
129 lines (114 loc) · 4.38 KB
/
GMM.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
@FileName: GMM.py
@Description: Implement GMM
@Author: Ryuk
@CreateDate: 2021/05/30
@LastEditTime: 2021/05/30
@LastEditors: Please set LastEditors
@Version: v0.1
"""
import numpy as np
import pickle
import preProcess
from tqdm import tqdm
from scipy.stats import multivariate_normal
import matplotlib.pyplot as plt
class GaussianMixtureModel:
    """Gaussian mixture model fitted with expectation-maximization (EM).

    Attributes:
        K: number of mixture components.
        D: number of features per sample.
        N: number of samples seen by the most recent ``train``/``predict``.
        iterations: number of EM iterations run by ``train``.
        norm_type: stored as given; not used by this class (the
            normalization step that read it is disabled).
        reg_covar: non-negative value added to the diagonal of every
            covariance matrix in the M-step to keep it positive definite.
        alpha: mixing weights, shape (K,).
        mu: component means, shape (K, D).
        sigma: component covariance matrices, shape (K, D, D).
        gamma: responsibilities from the most recent E-step, shape (N, K).
        label: hard assignments from the most recent ``train``/``predict``.
    """

    def __init__(self, K, D=2, iterations=100, norm_type="Normalization",
                 reg_covar=1e-6):
        """Create a model with random weights/means and identity covariances.

        Args:
            K: number of mixture components.
            D: number of features per sample.
            iterations: number of EM iterations performed by ``train``.
            norm_type: kept for API compatibility; currently unused.
            reg_covar: diagonal regularization added to each covariance.
        """
        self.norm_type = norm_type
        self.iterations = iterations
        self.K = K
        self.D = D
        self.N = 0
        self.reg_covar = reg_covar
        self.alpha = np.random.dirichlet(np.ones(self.K))
        self.mu = np.random.rand(K, D)
        self.sigma = np.array([np.eye(self.D)] * K)
        self.gamma = None
        self.label = None

    def GaussianPDF(self, mu, sigma, x):
        """Evaluate the multivariate normal density N(x | mu, sigma).

        Args:
            mu: mean vector of the distribution.
            sigma: covariance matrix of the distribution.
            x: one sample or an array of samples (one per row).

        Returns:
            The density at ``x`` (a scalar, or one value per row of ``x``).
        """
        return multivariate_normal(mu, sigma).pdf(x)

    def _as_samples(self, data):
        """Return ``data`` as a float array of shape (n_samples, D)."""
        samples = np.asarray(data, dtype=float)
        if samples.ndim == 1:
            # Same convention as scipy: a flat vector is N scalar samples
            # when D == 1, otherwise a single D-dimensional sample.
            samples = samples.reshape(-1, 1) if self.D == 1 else samples.reshape(1, -1)
        if samples.ndim != 2 or samples.shape[1] != self.D:
            raise ValueError(
                "expected data of shape (n_samples, %d), got %s"
                % (self.D, samples.shape)
            )
        return samples

    def _e_step(self, data):
        """Return the responsibilities (n_samples, K) for ``data``."""
        n_samples = data.shape[0]
        if n_samples == 0:
            return np.zeros((0, self.K))
        with np.errstate(divide="ignore"):
            # A zero weight becomes -inf, i.e. zero responsibility.
            log_alpha = np.log(self.alpha)
        log_weighted = np.empty((n_samples, self.K))
        for k in range(self.K):
            log_weighted[:, k] = log_alpha[k] + multivariate_normal.logpdf(
                data, mean=self.mu[k], cov=self.sigma[k]
            )
        # Normalise in the log domain so that samples far from every
        # component do not underflow to 0 / 0.
        log_weighted -= log_weighted.max(axis=1, keepdims=True)
        resp = np.exp(log_weighted)
        resp /= resp.sum(axis=1, keepdims=True)
        return resp

    def _m_step(self, data, resp):
        """Re-estimate ``alpha``, ``mu`` and ``sigma`` from responsibilities."""
        # The small offset keeps the divisions finite if a component ends
        # up with no responsibility at all.
        weight_sums = resp.sum(axis=0) + 10 * np.finfo(float).eps
        for k in range(self.K):
            self.mu[k] = resp[:, k] @ data / weight_sums[k]
            centered = data - self.mu[k]
            self.sigma[k] = (centered * resp[:, k, None]).T @ centered / weight_sums[k]
            # Keep the covariance invertible for the next E-step.
            self.sigma[k] += self.reg_covar * np.eye(self.D)
        self.alpha = weight_sums / weight_sums.sum()

    def train(self, train_data, plotResult=True):
        """Fit the mixture to ``train_data`` with ``self.iterations`` EM steps.

        Args:
            train_data: array-like of shape (n_samples, D).
            plotResult: when true, show a scatter plot of the clustering.

        Returns:
            ndarray of shape (n_samples,) with the component index assigned
            to each training sample (also stored in ``self.label``).

        Raises:
            ValueError: if ``train_data`` is empty or has the wrong shape.
        """
        data = self._as_samples(train_data)
        if data.shape[0] == 0:
            raise ValueError("train_data must contain at least one sample")
        self.N = data.shape[0]
        self.gamma = np.zeros([self.N, self.K])
        # NOTE: input normalization (selected by ``norm_type``) is disabled;
        # the data is used as given.
        for _ in tqdm(range(self.iterations)):
            self.gamma = self._e_step(data)
            self._m_step(data, self.gamma)
        self.label = np.argmax(self.gamma, axis=1)
        if plotResult:
            self.plotResult(data)
        return self.label

    def predict(self, test_data):
        """Assign each sample of ``test_data`` to its most likely component.

        The fitted parameters (``alpha``, ``mu``, ``sigma``) are not changed.
        ``self.N``, ``self.gamma`` and ``self.label`` are overwritten with
        the values for ``test_data``.

        Args:
            test_data: array-like of shape (n_samples, D).

        Returns:
            ndarray of shape (n_samples,) with the predicted component index.

        Raises:
            ValueError: if ``test_data`` has the wrong shape.
        """
        data = self._as_samples(test_data)
        self.N = data.shape[0]
        self.gamma = self._e_step(data)
        self.label = np.argmax(self.gamma, axis=1)
        return self.label

    def plotResult(self, train_data):
        """Show a scatter plot of the first two features coloured by label.

        Args:
            train_data: array of shape (n_samples, >=2) matching
                ``self.label``.
        """
        plt.scatter(train_data[:, 0], train_data[:, 1], c=self.label)
        plt.title('GMM')
        plt.show()

    def save(self, filename):
        """Pickle the model parameters (alpha, mu, sigma) to ``filename``.

        Args:
            filename: path of the file to write.
        """
        model = {'alpha': self.alpha, 'mu': self.mu, 'sigma': self.sigma}
        # pickle writes bytes, so the file must be opened in binary mode.
        with open(filename, 'wb') as f:
            pickle.dump(model, f)

    def load(self, filename):
        """Load parameters previously written by ``save``.

        Args:
            filename: path of the pickle file to read.

        Returns:
            ``self``, with ``alpha``, ``mu``, ``sigma``, ``K`` and ``D`` set
            from the file.
        """
        # SECURITY: unpickling can execute arbitrary code; only load files
        # that come from a trusted source.
        with open(filename, 'rb') as f:
            model = pickle.load(f)
        self.alpha = model['alpha']
        self.mu = model['mu']
        self.sigma = model['sigma']
        # Keep the component/feature counts consistent with the loaded
        # parameters so that predict() iterates over the right components.
        self.K, self.D = np.shape(self.mu)
        return self