forked from rkp8000/seq_speak
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathntwk.py
468 lines (359 loc) · 15.6 KB
/
ntwk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
from copy import deepcopy
import numpy as np
from scipy.sparse import csc_matrix
import os
import time
from aux import Generic
# CONNECTIVITY
def join_w(targs, srcs, ws):
"""
Combine multiple weight matrices specific to pairs of populations
into a single, full set of weight matrices (one per synapse type).
:param targs: dict of boolean masks indicating targ cell classes
:param srcs: dict of boolean masks indicating source cell classes
:param ws: dict of inter-population weight matrices, e.g.:
ws = {
'AMPA': {
('EXC', 'EXC'): np.array([[...]]),
('INH', 'EXC'): np.array([[...]]),
},
'GABA': {
('EXC', 'INH'): np.array([[...]]),
('INH', 'INH'): np.array([[...]]),
}
}
note: keys given as (targ, src)
:return: ws_full, a dict of full ws, one per synapse
"""
# convert targs/srcs to dicts if given as arrays
if not isinstance(targs, dict):
targs_ = deepcopy(targs)
targs = {
cell_type: targs_ == cell_type for cell_type in set(targs_)
}
if not isinstance(srcs, dict):
srcs_ = deepcopy(srcs)
srcs = {
cell_type: srcs_ == cell_type for cell_type in set(srcs_)
}
# make sure all targ/src masks have same shape
targ_shapes = [mask.shape for mask in targs.values()]
src_shapes = [mask.shape for mask in srcs.values()]
if len(set(targ_shapes)) > 1:
raise Exception('All targ masks must have same shape.')
if len(set(src_shapes)) > 1:
raise Exception('All targ masks must have same shape.')
n_targ = targ_shapes[0][0]
n_src = src_shapes[0][0]
# make sure weight matrix dimensions match sizes
# of targ/src classes
for syn, ws_ in ws.items():
for (targ, src), w_ in ws_.items():
if not w_.shape == (targs[targ].sum(), srcs[src].sum()):
raise Exception(
'Weight matrix for {}: ({}, {}) does not match '
'dimensionality specified by targ/src masks.')
# loop through synapse types
dtype = list(list(ws.values())[0].values())[0].dtype
ws_full = {}
for syn, ws_ in ws.items():
w = np.zeros((n_targ, n_src), dtype=dtype)
# loop through population pairs
for (targ, src), w_ in ws_.items():
# get mask of all cxns from src to targ
mask = np.outer(targs[targ], srcs[src])
assert mask.sum() == w_.size
w[mask] = w_.flatten()
ws_full[syn] = w
return ws_full
# NETWORK CLASS AND FUNCTIONS
class LIFNtwk(object):
"""
Network of leaky integrate-and-fire (LIF) neurons.
All parameters should be given in SI units
(i.e., time constants in seconds, potentials in volts).
This simulation uses exponential
synapses for all synapse types.
In all weight matrices, rows index target, cols index source.
:param t_m: membrane time constant (or 1D array)
:param e_l: leak reversal potential (or 1D array)
:param v_th: firing threshold potential (or 1D array)
:param v_r: reset potential (or 1D array)
:param t_r: refractory time
:param es_syn: synaptic reversal potentials (dict with keys naming
synapse types, e.g., 'AMPA', 'NMDA', ...)
:param ts_syn: synaptic time constants (dict)
:param ws_rcr: recurrent synaptic weight matrices (dict with keys
naming synapse types)
:param ws_up: input synaptic weight matrices from upstream inputs (dict)
:param sparse: whether to convert weight matrices to sparse matrices for
more efficient processing
"""
def __init__(self,
t_m, e_l, v_th, v_r, t_r,
es_syn=None, ts_syn=None, ws_up=None, ws_rcr=None,
sparse=True):
"""Constructor."""
# validate arguments
if es_syn is None:
es_syn = {}
if ts_syn is None:
ts_syn = {}
if ws_up is None:
ws_up = {}
if ws_rcr is None:
ws_rcr = {}
self.syns = list(es_syn.keys())
# check weight matrices have correct dims
shape_rcr = list(ws_rcr.values())[0].shape
shape_up = list(ws_up.values())[0].shape
self.n = shape_rcr[1]
# fill in unspecified weight matrices with zeros
for syn in self.syns:
if syn not in ws_rcr:
ws_rcr[syn] = np.zeros(shape_rcr)
if syn not in ws_up:
ws_up[syn] = np.zeros(shape_up)
# check syn. dicts have same keys
if not set(es_syn) == set(ts_syn) == set(ws_rcr) == set(ws_up):
raise ValueError(
'All synaptic dicts ("es_syn", "ts_syn", '
'"ws_rcr", "ws_inp") must have same keys.'
)
if not all([w.shape[0] == w.shape[1] == self.n for w in ws_rcr.values()]):
raise ValueError('All recurrent weight matrices must be square.')
# check input matrices' have correct dims
self.n_up = list(ws_up.values())[0].shape[1]
if not all([w.shape[0] == self.n for w in ws_up.values()]):
raise ValueError(
'Upstream weight matrices must have one row per neuron.')
if not all([w.shape[1] == self.n_up for w in ws_up.values()]):
raise ValueError(
'All upstream weight matrices must have same number of columns.')
# make sure v_r is actually an array
if isinstance(v_r, (int, float, complex)):
v_r = v_r * np.ones(self.n)
# store network params
self.t_m = t_m
self.e_l = e_l
self.v_th = v_th
self.v_r = v_r
self.t_r = t_r
self.es_syn = es_syn
self.ts_syn = ts_syn
if sparse:
ws_rcr = {syn: csc_matrix(w) for syn, w in ws_rcr.items()}
ws_up = {syn: csc_matrix(w) for syn, w in ws_up.items()}
self.ws_rcr = ws_rcr
self.ws_up = ws_up
def run(self, spks_up, dt, vs_0=None, gs_0=None, i_ext=None, store=None, report_every=None):
"""
Run a simulation of the network.
:param spks_up: upstream spiking inputs (rows are time points,
cols are neurons) (should be non-negative integers)
:param dt: integration time step for dynamics simulation
:param vs_0: initial vs
:param gs_0: initial gs (dict of 1-D arrays)
are time points, cols are neurons)
:return: network response object
"""
# validate arguments
if vs_0 is None:
vs_0 = self.e_l * np.ones(self.n)
if gs_0 is None:
gs_0 = {syn: np.zeros(self.n) for syn in self.syns}
if type(spks_up) != np.ndarray or spks_up.ndim != 2:
raise TypeError('"inps_upstream" must be a 2D array.')
if not spks_up.shape[1] == self.n_up:
raise ValueError(
'Upstream input size does not match size of input weight matrix.')
if not vs_0.shape == (self.n,):
raise ValueError(
'"vs_0" must be 1-D array with one element per neuron.')
if not all([gs.shape == (self.n,) for gs in gs_0.values()]):
raise ValueError(
'All elements of "gs_0" should be 1-D array with '
'one element per neuron.')
if store is None:
store = {}
if 'vs' not in store:
store['vs'] = np.float64
if 'spks' not in store:
store['spks'] = bool
if 'gs' not in store:
store['gs'] = np.float64
for key, val in store.items():
if key == 'vs':
assert val in (None, float, np.float16, np.float64)
elif key == 'gs':
assert val in (None, float, np.float16, np.float64)
elif key == 'spks':
assert val in (None, bool)
# prepare smln
ts = np.arange(len(spks_up)) * dt
# initialize membrane potentials, conductances, and refractory counters
vs_prev = vs_0.copy()
spks_prev = np.zeros(vs_0.shape, dtype=bool)
gs_prev = {syn: gs_0[syn].copy() for syn in self.syns}
rp_ctrs = np.zeros(self.n)
# allocate space for slmn results and store initial values
sim_shape = (len(ts), self.n)
vs = None
spks = None
gs = None
if (i_ext is None):
i_ext = np.zeros(len(ts))
if store['vs'] is not None:
vs = np.nan * np.zeros(sim_shape, dtype=store['vs'])
vs[0, :] = vs_prev.copy()
if store['spks'] is not None:
spks = np.zeros(sim_shape, dtype=bool)
spks[0, :] = spks_prev.copy()
if store['gs'] is not None:
gs = {
syn: np.nan * np.zeros(sim_shape, dtype=store['gs'])
for syn in self.syns
}
for syn in self.syns:
gs[syn][0, :] = gs_0[syn].copy()
# run simulation
smln_start_time = time.time()
last_update = time.time()
for step in range(1, len(ts)):
## update dynamics
for syn in self.syns:
# calculate new conductances for all synapse types
w_up = self.ws_up[syn]
w_rcr = self.ws_rcr[syn]
t_syn = self.ts_syn[syn]
# calculate upstream and recurrent inputs to conductances
inps_up = w_up.dot(spks_up[step])
inps_rcr = w_rcr.dot(spks_prev)
# decay conductances and add any positive inputs
dg = -(dt/t_syn) * gs_prev[syn] + inps_up + inps_rcr
gs_prev[syn] = gs_prev[syn] + dg
# calculate current input resulting from synaptic conductances
## note: conductances are relative, so is_g are in volts
is_g = [
gs_prev[syn] * (self.es_syn[syn] - vs_prev)
for syn in self.syns
]
# get total input current
is_all = np.sum(is_g, axis=0) + i_ext[step]
# update membrane potential
dvs = -(dt/self.t_m) * (vs_prev - self.e_l) + is_all
vs_prev = vs_prev + dvs
# force refractory neurons to reset potential
vs_prev[rp_ctrs > 0] = self.v_r[rp_ctrs > 0]
# identify spks
spks_prev = vs_prev >= self.v_th
# reset membrane potentials of spiking neurons
vs_prev[spks_prev] = self.v_r[spks_prev]
# set refractory counters for spiking neurons
rp_ctrs[spks_prev] = self.t_r[spks_prev]
# decrement refractory counters for all neurons
rp_ctrs -= dt
# adjust negative refractory counters up to zero
rp_ctrs[rp_ctrs < 0] = 0
# store vs
if store['vs'] is not None:
vs[step] = vs_prev.copy()
# store spks
if store['spks'] is not None:
spks[step] = spks_prev.copy()
# store conductances
if store['gs'] is not None:
for syn in self.syns:
gs[syn][step] = gs_prev[syn].copy()
if report_every is not None:
if time.time() > last_update + report_every:
print('{0}/{1} steps completed after {2:.3f} s...'.format(
step + 1, len(ts), time.time() - smln_start_time))
last_update = time.time()
# return NtwkResponse object
return NtwkResponse(
ts=ts, vs=vs, spks=spks, spks_up=spks_up, dt=dt, i_ext=i_ext, e_l=self.e_l, v_th=self.v_th,
gs=gs, ws_rcr=self.ws_rcr, ws_up=self.ws_up)
class NtwkResponse(object):
"""
Class for storing network response parameters.
:param ts: timestamp vector
:param vs: membrane potentials
:param spks: spk times
:param gs: syn-dict of conductances
:param ws_rcr: syn-dict of recurrent weight matrices
:param ws_up: syn-dict upstream weight matrices
:param cell_types: array-like of cell-types
:param cs: spk ctr variables for each cell
:param ws_plastic: syn-dict of time-courses of plastic weights
:param masks_plastic: syn-dict of masks specifying which weights the plastic
ones correspond to
:param pfcs: array of cell place field centers
"""
def __init__(self, ts, vs, spks, spks_up, dt, i_ext, e_l, v_th, gs, ws_rcr, ws_up, cell_types=None, pfcs=None):
"""Constructor."""
# check args
if (cell_types is not None) and (len(cell_types) != vs.shape[1]):
raise ValueError(
'If "cell_types" is provided, all cells must have a type.')
self.ts = ts
self.vs = vs
self.spks = spks
self.spks_up = spks_up
self.dt = dt
self.i_ext = i_ext
self.e_l = e_l
self.v_th = v_th
self.gs = gs
self.ws_rcr = ws_rcr
self.ws_up = ws_up
self.cell_types = cell_types
self.dt = np.mean(np.diff(ts))
self.fs = 1 / self.dt
@property
def n(self):
"""Number of neurons."""
return self.vs.shape[1]
class SimpleNtwk(object):
"""Simplified network model."""
def __init__(self, w, ltp_ie, max_active, v_th, rp):
assert w.shape[0] == w.shape[1]
assert len(ltp_ie) == w.shape[0]
assert v_th > 0
assert max_active <= w.shape[0]
self.w = w
self.ltp_ie = ltp_ie
self.max_active = max_active
self.v_th = v_th
self.rp = int(rp)
self.n = w.shape[0]
def run(self, inp_ext, inp_g):
n_t = len(inp_ext)
v = np.zeros((n_t, self.n), dtype=float)
spks = np.zeros((n_t, self.n), dtype=bool)
rp = np.zeros(self.n, dtype=int)
spks_prev = np.zeros(self.n, dtype=bool)
for t_ctr, inp_ext_ in enumerate(inp_ext):
# compute total inputs
# (recurrent + g + external)
v_ = self.w.dot(spks_prev) + inp_g*(1+self.ltp_ie) + inp_ext_
# set nrns in refrac period to 0
v_[rp > 0] = 0
# decrement refractory period
rp = np.clip(rp-1, 0, np.inf)
# get tmp variable containing only max_active voltages
v_max_only = np.zeros(self.n)
idxs_most_active = np.argsort(v_)[-self.max_active:]
v_max_only[idxs_most_active] = v_[idxs_most_active]
# convert to spks
spks_ = v_max_only >= self.v_th
#spks_ = np.zeros(self.n, dtype=bool)
#if np.any(v_max_only >= self.v_th):
# spks_[idxs_most_active] = True
# reset refractory period for spking nrns
rp[spks_] = self.rp
# store everything
v[t_ctr] = v_.copy()
spks[t_ctr] = spks_.copy()
spks_prev = spks_.copy()
return Generic(t=np.arange(len(inp_ext)), v=v, spks=spks)