-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathutils.py
275 lines (242 loc) · 10.4 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import numpy as np
from scipy.special import expit as sigmoid
import igraph as ig
import random
def set_random_seed(seed):
random.seed(seed)
np.random.seed(seed)
def is_dag(W):
G = ig.Graph.Weighted_Adjacency(W.tolist())
return G.is_dag()
def simulate_dag(d, s0, graph_type):
"""Simulate random DAG with some expected number of edges.
Args:
d (int): num of nodes
s0 (int): expected num of edges
graph_type (str): ER, SF, BP
Returns:
B (np.ndarray): [d, d] binary adj matrix of DAG
"""
def _random_permutation(M):
# np.random.permutation permutes first axis only
P = np.random.permutation(np.eye(M.shape[0]))
return P.T @ M @ P
def _random_acyclic_orientation(B_und):
return np.tril(_random_permutation(B_und), k=-1)
def _graph_to_adjmat(G):
return np.array(G.get_adjacency().data)
if graph_type == 'ER':
# Erdos-Renyi
G_und = ig.Graph.Erdos_Renyi(n=d, m=s0)
B_und = _graph_to_adjmat(G_und)
B = _random_acyclic_orientation(B_und)
elif graph_type == 'SF':
# Scale-free, Barabasi-Albert
G = ig.Graph.Barabasi(n=d, m=int(round(s0 / d)), directed=True)
B = _graph_to_adjmat(G)
elif graph_type == 'BP':
# Bipartite, Sec 4.1 of (Gu, Fu, Zhou, 2018)
top = int(0.2 * d)
G = ig.Graph.Random_Bipartite(top, d - top, m=s0, directed=True, neimode=ig.OUT)
B = _graph_to_adjmat(G)
else:
raise ValueError('unknown graph type')
B_perm = _random_permutation(B)
assert ig.Graph.Adjacency(B_perm.tolist()).is_dag()
return B_perm
def simulate_parameter(B, w_ranges=((-2.0, -0.5), (0.5, 2.0))):
"""Simulate SEM parameters for a DAG.
Args:
B (np.ndarray): [d, d] binary adj matrix of DAG
w_ranges (tuple): disjoint weight ranges
Returns:
W (np.ndarray): [d, d] weighted adj matrix of DAG
"""
W = np.zeros(B.shape)
S = np.random.randint(len(w_ranges), size=B.shape) # which range
for i, (low, high) in enumerate(w_ranges):
U = np.random.uniform(low=low, high=high, size=B.shape)
W += B * (S == i) * U
return W
def simulate_linear_sem(W, n, sem_type, noise_scale=None):
"""Simulate samples from linear SEM with specified type of noise.
For uniform, noise z ~ uniform(-a, a), where a = noise_scale.
Args:
W (np.ndarray): [d, d] weighted adj matrix of DAG
n (int): num of samples, n=inf mimics population risk
sem_type (str): gauss, exp, gumbel, uniform, logistic, poisson
noise_scale (np.ndarray): scale parameter of additive noise, default all ones
Returns:
X (np.ndarray): [n, d] sample matrix, [d, d] if n=inf
"""
def _simulate_single_equation(X, w, scale):
"""X: [n, num of parents], w: [num of parents], x: [n]"""
if sem_type == 'gauss':
z = np.random.normal(scale=scale, size=n)
x = X @ w + z
elif sem_type == 'exp':
z = np.random.exponential(scale=scale, size=n)
x = X @ w + z
elif sem_type == 'gumbel':
z = np.random.gumbel(scale=scale, size=n)
x = X @ w + z
elif sem_type == 'uniform':
z = np.random.uniform(low=-scale, high=scale, size=n)
x = X @ w + z
elif sem_type == 'logistic':
x = np.random.binomial(1, sigmoid(X @ w)) * 1.0
elif sem_type == 'poisson':
x = np.random.poisson(np.exp(X @ w)) * 1.0
else:
raise ValueError('unknown sem type')
return x
d = W.shape[0]
if noise_scale is None:
scale_vec = np.ones(d)
elif np.isscalar(noise_scale):
scale_vec = noise_scale * np.ones(d)
else:
if len(noise_scale) != d:
raise ValueError('noise scale must be a scalar or has length d')
scale_vec = noise_scale
if not is_dag(W):
raise ValueError('W must be a DAG')
if np.isinf(n): # population risk for linear gauss SEM
if sem_type == 'gauss':
# make 1/d X'X = true cov
X = np.sqrt(d) * np.diag(scale_vec) @ np.linalg.inv(np.eye(d) - W)
return X
else:
raise ValueError('population risk not available')
# empirical risk
G = ig.Graph.Weighted_Adjacency(W.tolist())
ordered_vertices = G.topological_sorting()
assert len(ordered_vertices) == d
X = np.zeros([n, d])
for j in ordered_vertices:
parents = G.neighbors(j, mode=ig.IN)
X[:, j] = _simulate_single_equation(X[:, parents], W[parents, j], scale_vec[j])
return X
def simulate_nonlinear_sem(B, n, sem_type, noise_scale=None):
"""Simulate samples from nonlinear SEM.
Args:
B (np.ndarray): [d, d] binary adj matrix of DAG
n (int): num of samples
sem_type (str): mlp, mim, gp, gp-add
noise_scale (np.ndarray): scale parameter of additive noise, default all ones
Returns:
X (np.ndarray): [n, d] sample matrix
"""
def _simulate_single_equation(X, scale):
"""X: [n, num of parents], x: [n]"""
z = np.random.normal(scale=scale, size=n)
pa_size = X.shape[1]
if pa_size == 0:
return z
if sem_type == 'mlp':
hidden = 100
W1 = np.random.uniform(low=0.5, high=2.0, size=[pa_size, hidden])
W1[np.random.rand(*W1.shape) < 0.5] *= -1
W2 = np.random.uniform(low=0.5, high=2.0, size=hidden)
W2[np.random.rand(hidden) < 0.5] *= -1
x = sigmoid(X @ W1) @ W2 + z
elif sem_type == 'mim':
w1 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
w1[np.random.rand(pa_size) < 0.5] *= -1
w2 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
w2[np.random.rand(pa_size) < 0.5] *= -1
w3 = np.random.uniform(low=0.5, high=2.0, size=pa_size)
w3[np.random.rand(pa_size) < 0.5] *= -1
x = np.tanh(X @ w1) + np.cos(X @ w2) + np.sin(X @ w3) + z
elif sem_type == 'gp':
from sklearn.gaussian_process import GaussianProcessRegressor
gp = GaussianProcessRegressor()
x = gp.sample_y(X, random_state=None).flatten() + z
elif sem_type == 'gp-add':
from sklearn.gaussian_process import GaussianProcessRegressor
gp = GaussianProcessRegressor()
x = sum([gp.sample_y(X[:, i, None], random_state=None).flatten()
for i in range(X.shape[1])]) + z
else:
raise ValueError('unknown sem type')
return x
d = B.shape[0]
scale_vec = noise_scale if noise_scale else np.ones(d)
X = np.zeros([n, d])
G = ig.Graph.Adjacency(B.tolist())
ordered_vertices = G.topological_sorting()
assert len(ordered_vertices) == d
for j in ordered_vertices:
parents = G.neighbors(j, mode=ig.IN)
X[:, j] = _simulate_single_equation(X[:, parents], scale_vec[j])
return X
def count_accuracy(B_true, B_est):
"""Compute various accuracy metrics for B_est.
true positive = predicted association exists in condition in correct direction
reverse = predicted association exists in condition in opposite direction
false positive = predicted association does not exist in condition
Args:
B_true (np.ndarray): [d, d] ground truth graph, {0, 1}
B_est (np.ndarray): [d, d] estimate, {0, 1, -1}, -1 is undirected edge in CPDAG
Returns:
fdr: (reverse + false positive) / prediction positive
tpr: (true positive) / condition positive
fpr: (reverse + false positive) / condition negative
shd: undirected extra + undirected missing + reverse
nnz: prediction positive
"""
if (B_est == -1).any(): # cpdag
if not ((B_est == 0) | (B_est == 1) | (B_est == -1)).all():
raise ValueError('B_est should take value in {0,1,-1}')
if ((B_est == -1) & (B_est.T == -1)).any():
raise ValueError('undirected edge should only appear once')
else: # dag
if not ((B_est == 0) | (B_est == 1)).all():
raise ValueError('B_est should take value in {0,1}')
if not is_dag(B_est):
raise ValueError('B_est should be a DAG')
d = B_true.shape[0]
# linear index of nonzeros
pred_und = np.flatnonzero(B_est == -1)
pred = np.flatnonzero(B_est == 1)
cond = np.flatnonzero(B_true)
cond_reversed = np.flatnonzero(B_true.T)
cond_skeleton = np.concatenate([cond, cond_reversed])
# true pos
true_pos = np.intersect1d(pred, cond, assume_unique=True)
# treat undirected edge favorably
true_pos_und = np.intersect1d(pred_und, cond_skeleton, assume_unique=True)
true_pos = np.concatenate([true_pos, true_pos_und])
# false pos
false_pos = np.setdiff1d(pred, cond_skeleton, assume_unique=True)
false_pos_und = np.setdiff1d(pred_und, cond_skeleton, assume_unique=True)
false_pos = np.concatenate([false_pos, false_pos_und])
# reverse
extra = np.setdiff1d(pred, cond, assume_unique=True)
reverse = np.intersect1d(extra, cond_reversed, assume_unique=True)
# compute ratio
pred_size = len(pred) + len(pred_und)
cond_neg_size = 0.5 * d * (d - 1) - len(cond)
fdr = float(len(reverse) + len(false_pos)) / max(pred_size, 1)
tpr = float(len(true_pos)) / max(len(cond), 1)
fpr = float(len(reverse) + len(false_pos)) / max(cond_neg_size, 1)
# structural hamming distance
pred_lower = np.flatnonzero(np.tril(B_est + B_est.T))
cond_lower = np.flatnonzero(np.tril(B_true + B_true.T))
extra_lower = np.setdiff1d(pred_lower, cond_lower, assume_unique=True)
missing_lower = np.setdiff1d(cond_lower, pred_lower, assume_unique=True)
shd = len(extra_lower) + len(missing_lower) + len(reverse)
return {'fdr': round(fdr,6), 'tpr': round(tpr,6), 'fpr': round(fpr,6), 'shd': shd, 'nnz': pred_size}
def CPDAGtoDAG(B):
B_DAG = np.zeros_like(B)
B0 = np.multiply(np.triu(B, k=1),np.tril(B,k=-1).T)
index1 = np.transpose(np.where(B0==-1))
for i,j in index1:
if B[i,j] == -1:
B_DAG[i,j]=1
else:
B_DAG[j,i] = 1
index2 = np.transpose(np.where(B0 == 1))
for i,j in index2:
B_DAG[i,j] = -1
return B_DAG