-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathanalysis_abnormal_detection.py
120 lines (92 loc) · 3.19 KB
/
analysis_abnormal_detection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
Anomaly Detection (ad) Using hp filter and mad test
liubenyuan <liubenyuan@gmail.com>
"""
import numpy as np
import pandas as pd
from scipy import sparse, stats
import matplotlib.pyplot as plt
from datetime import datetime
import os
import configparser
# Read runtime configuration: which stock symbol to process and where the
# input/output CSV directories live (all paths are relative to this file).
parser = configparser.ConfigParser()
parser.read('config.ini')
current_dir = os.path.dirname(os.path.realpath(__file__))
stock_symbol = parser.get('general_settings','stock_symbol')
base_dir = parser.get('directory','base_dir')
in_dir = parser.get('directory','company_stock_marketprice_baseprice_prefilter')
out_dir = parser.get('directory','company_stock_marketprice_processed')
# os.path.join is portable across OSes, unlike hand-built "/" concatenation.
df = pd.read_csv(os.path.join(current_dir, base_dir, in_dir,
                              in_dir + '_' + stock_symbol + '.csv'))
# Parse the 'date' column into datetime objects so it can be used on plot axes.
# NOTE(review): assumes dates are formatted YYYY-MM-DD — confirm against the CSV.
data = pd.DataFrame({'date': df['date'].map(lambda x: datetime.strptime(str(x),'%Y-%m-%d'))})
# Hodrick Prescott filter
def hp_filter(x, lamb=5000):
    """Return the smooth trend component of *x* via the Hodrick-Prescott filter.

    Solves (I + lamb * D'D) xhat = x, where D is the (w-2) x w
    second-difference operator, so a larger ``lamb`` gives a smoother trend.

    x : 1-d array-like signal
    lamb : smoothing penalty
    return : ndarray of the same length as x (the trend)
    """
    # spsolve lives in scipy.sparse.linalg; the old sparse.linalg.dsolve
    # alias used previously is deprecated and removed in recent SciPy.
    from scipy.sparse.linalg import spsolve
    w = len(x)
    # Second-difference stencil [1, -2, 1] laid on diagonals 0, 1, 2
    # of a (w-2) x w sparse operator.
    b = [[1]*w, [-2]*w, [1]*w]
    D = sparse.spdiags(b, [0, 1, 2], w-2, w)
    I = sparse.eye(w)
    B = (I + lamb*(D.transpose()*D))
    # CSC format avoids spsolve's sparse-efficiency warning.
    return spsolve(sparse.csc_matrix(B), x)
def mad(data, axis=None):
    """Mean absolute deviation of *data* about its mean along *axis*."""
    deviations = np.abs(data - np.mean(data, axis))
    return np.mean(deviations, axis)
def AnomalyDetection(x, alpha=0.2, lamb=5000):
    """
    x : pd.Series
    alpha : The level of statistical significance with which to
            accept or reject anomalies. (expon distribution)
    lamb : penalize parameter for hp filter
    return r_id : index labels of x flagged as anomalies
    """
    # residual = signal minus its smooth HP trend
    xhat = hp_filter(x, lamb=lamb)
    resid = x - xhat
    # drop NA values
    ds = pd.Series(resid).dropna()
    # Remove the seasonal and trend component,
    # and the median of the data to create the univariate remainder
    md = np.median(x)
    remainder = ds - md
    # Robust score: distance from the median scaled by the mean absolute
    # deviation (epsilon guards against a zero denominator).
    # NOTE: pandas Series.mad() was removed in pandas 2.0, so the mean
    # absolute deviation is computed explicitly here.
    ares = (remainder - remainder.median()).abs()
    data_sigma = (remainder - remainder.mean()).abs().mean() + 1e-12
    ares = ares / data_sigma
    # Compute significance: scores beyond the upper bound of the (1 - alpha)
    # interval of an exponential fitted by moments are flagged as anomalies.
    p = 1. - alpha
    R = stats.expon.interval(p, loc=ares.mean(), scale=ares.std())
    threshold = R[1]
    # extract index labels of the offending points
    r_id = ares.index[ares > threshold]
    return r_id
# demo
def main():
    """Inject synthetic step outliers into the closing price, detect them
    with AnomalyDetection, then plot and save the annotated chart."""
    # fix the RNG so the injected outliers are reproducible
    np.random.seed(42)
    # NOTE(review): y aliases df['close'], so the injected jumps also appear
    # in the plotted price curve below — presumably intentional for the demo.
    y = df['close']
    # Use the actual series length: the previous hard-coded N=1024 silently
    # dropped jump positions past the end of shorter series (and never
    # reached the tail of longer ones).
    N = len(y)
    # outliers are assumed to be step/jump events at sampling points
    M = 3  # number of outliers
    for pos, jump in zip(np.random.rand(M)*N, np.random.randn(M)):
        y[int(pos):] += jump
    # detect anomaly
    r_idx = AnomalyDetection(y, alpha=0.1)
    # plot: red markers at detected anomalies over the price curve
    plt.xlabel('Date')
    plt.plot(data['date'][r_idx], y[r_idx], 'ro')
    plt.twinx()
    plt.tick_params(axis="y")
    plt.ylabel("Price")
    plt.plot(data['date'], df['close'], "b-", linewidth=1)
    plt.title(stock_symbol+" Anomaly Graph")
    plt.savefig(current_dir+"/"+base_dir+"/"+out_dir+'/'+stock_symbol+"/"+out_dir+'_anomaly_'+stock_symbol+'.png')
    # show briefly without blocking, then close so the script can terminate
    plt.show(block=False)
    plt.pause(6)
    plt.close()
if __name__ == "__main__":
main()