forked from lightningnetwork/lnd
-
Notifications
You must be signed in to change notification settings - Fork 1
/
betweenness_centrality.go
265 lines (221 loc) · 6.35 KB
/
betweenness_centrality.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package autopilot
import (
"fmt"
"sync"
)
// stack is a minimal LIFO container of ints. It exists to keep the Brandes
// betweenness centrality implementation below easy to read.
type stack struct {
	// stack holds the elements, with the top of the stack at the end.
	stack []int
}

// push places v on top of the stack.
func (s *stack) push(v int) {
	s.stack = append(s.stack, v)
}

// top returns the most recently pushed element without removing it.
// It panics when the stack is empty.
func (s *stack) top() int {
	last := len(s.stack) - 1
	return s.stack[last]
}

// pop discards the most recently pushed element.
// It panics when the stack is empty.
func (s *stack) pop() {
	last := len(s.stack) - 1
	s.stack = s.stack[:last]
}

// empty reports whether the stack holds no elements.
func (s *stack) empty() bool {
	return len(s.stack) == 0
}
// queue is a minimal FIFO container of ints. It exists to keep the Brandes
// betweenness centrality implementation below easy to read.
type queue struct {
	// queue holds the elements, with the oldest element at index 0.
	queue []int
}

// push appends v to the back of the queue.
func (q *queue) push(v int) {
	q.queue = append(q.queue, v)
}

// front returns the oldest element without removing it.
// It panics when the queue is empty.
func (q *queue) front() int {
	head := q.queue[0]
	return head
}

// pop discards the oldest element by re-slicing past it.
// It panics when the queue is empty.
func (q *queue) pop() {
	q.queue = q.queue[1:len(q.queue)]
}

// empty reports whether the queue holds no elements.
func (q *queue) empty() bool {
	return len(q.queue) == 0
}
// BetweennessCentrality is a NodeMetric that calculates node betweenness
// centrality using Brandes' algorithm. Betweenness centrality for each node
// is the number of shortest paths passing through that node, not counting
// shortest paths starting or ending at that node. This is a useful metric
// to measure control of individual nodes over the whole network.
//
// The stored values are populated by Refresh and read back by GetMetric.
type BetweennessCentrality struct {
	// workers is the number of goroutines Refresh starts to parallelize
	// the centrality calculation.
	workers int
	// centrality stores the original (not normalized) centrality value
	// of each node in the graph, keyed by node id.
	centrality map[NodeID]float64
	// min is the lower bound GetMetric uses for normalization. Refresh
	// starts it at zero and only lowers it when it sees a smaller
	// centrality value.
	min float64
	// max is the upper bound GetMetric uses for normalization. Refresh
	// starts it at zero and raises it to the largest centrality value
	// it sees.
	max float64
}
// NewBetweennessCentralityMetric builds a BetweennessCentrality metric that
// uses the given number of worker goroutines when calculating centrality.
// It returns an error when workers is less than one.
func NewBetweennessCentralityMetric(workers int) (*BetweennessCentrality, error) {
	// Reject zero or negative counts: at least one worker is required.
	if workers <= 0 {
		return nil, fmt.Errorf("workers must be positive")
	}

	metric := new(BetweennessCentrality)
	metric.workers = workers

	return metric, nil
}
// Name returns the fixed name of this metric.
func (bc *BetweennessCentrality) Name() string {
	const name = "betweenness_centrality"

	return name
}
// betweennessCentrality runs a single-source pass of Brandes' algorithm from
// the start node s and adds the result to the caller-supplied centrality
// accumulator.
//
// The pass has two phases. A BFS from s counts the shortest paths to every
// reachable node and records each node's shortest-path predecessors. The
// visited nodes are then processed in reverse BFS order to accumulate
// Brandes' dependency values, which are added to centrality for every visited
// node other than s.
//
// g.Adj[v] is iterated as the neighbor indices of node v, and all per-node
// bookkeeping is sized by len(g.Nodes). centrality is indexed by the same
// node indices and is modified in place.
//
// For a detailed explanation please read:
// https://www.cl.cam.ac.uk/teaching/1617/MLRD/handbook/brandes.html
func betweennessCentrality(g *SimpleGraph, s int, centrality []float64) {
	n := len(g.Nodes)

	// parents[w] lists the nodes that immediately precede w on a
	// shortest path starting at s.
	parents := make([][]int, n)

	// paths[t] is the number of shortest paths from s to t.
	paths := make([]int, n)
	paths[s] = 1

	// hops[t] is the distance from s to t, where -1 stands for infinity
	// (t has not been reached).
	hops := make([]int, n)
	for t := range hops {
		hops[t] = -1
	}
	hops[s] = 0

	var (
		// visited records nodes in the order the BFS dequeues them, so
		// popping it later yields them farthest-first.
		visited stack
		// frontier holds the nodes discovered but not yet expanded.
		frontier queue
	)
	frontier.push(s)

	// Phase one: BFS filling in hops, paths and parents.
	for !frontier.empty() {
		v := frontier.front()
		frontier.pop()
		visited.push(v)

		// Any neighbor reached through v on a shortest path lies
		// exactly one hop beyond v.
		next := hops[v] + 1

		for _, w := range g.Adj[v] {
			// First time w is seen: fix its distance and
			// schedule it for expansion.
			if hops[w] < 0 {
				hops[w] = next
				frontier.push(w)
			}

			// Only edges that lie on a shortest path contribute.
			if hops[w] != next {
				continue
			}

			// Every shortest path to v extends to one for w, and
			// v becomes one of w's predecessors.
			paths[w] += paths[v]
			parents[w] = append(parents[w], v)
		}
	}

	// Phase two: dependency accumulation. dep[v] collects, over all
	// targets reached through v, the fraction of shortest paths from s
	// that pass through v.
	dep := make([]float64, n)
	for !visited.empty() {
		w := visited.top()
		visited.pop()

		// Propagate w's dependency back to each of its predecessors
		// using Brandes' equation.
		for _, v := range parents[w] {
			dep[v] += (float64(paths[v]) / float64(paths[w])) * (1.0 + dep[w])
		}

		// The source itself is excluded from the sum.
		if w == s {
			continue
		}
		centrality[w] += dep[w]
	}
}
// Refresh recalculates centrality for every node of graph and stores the
// results on bc: the per-node values in bc.centrality and the normalization
// bounds in bc.min and bc.max.
//
// It returns the error from NewSimpleGraph unchanged if the graph cannot be
// converted, in which case bc is left untouched.
func (bc *BetweennessCentrality) Refresh(graph ChannelGraph) error {
	cache, err := NewSimpleGraph(graph)
	if err != nil {
		return err
	}
	nodeCount := len(cache.Nodes)

	var (
		wg sync.WaitGroup
		// jobs hands out start nodes to the workers one at a time.
		jobs = make(chan int)
		// results has room for one slice per worker, so a worker
		// never blocks when delivering its partial sums.
		results = make(chan []float64, bc.workers)
	)

	// Start the workers. Each one accumulates the centrality updates of
	// the start nodes it receives into a private slice, which avoids any
	// synchronization while computing, and delivers that slice once the
	// jobs channel is closed.
	wg.Add(bc.workers)
	for i := 0; i < bc.workers; i++ {
		go func() {
			defer wg.Done()

			acc := make([]float64, nodeCount)
			for source := range jobs {
				betweennessCentrality(cache, source, acc)
			}
			results <- acc
		}()
	}

	// Use every node as a start node. Whichever worker is free picks up
	// the next one, which should spread the load fairly when the graph
	// is sufficiently large.
	for source := range cache.Nodes {
		jobs <- source
	}
	close(jobs)

	// Wait until all partial results are delivered, then close the
	// channel so the collection loop below terminates.
	wg.Wait()
	close(results)

	// Sum the partial results element-wise into the final totals.
	totals := make([]float64, nodeCount)
	for acc := range results {
		for i, v := range acc {
			totals[i] += v
		}
	}

	// Find the bounds used to normalize centrality values between 0 and
	// 1. Both bounds start at zero, so lo only moves for values below
	// zero and hi only for values above zero.
	var lo, hi float64
	for _, v := range totals {
		switch {
		case v < lo:
			lo = v
		case v > hi:
			hi = v
		}
	}

	// Divide by two as this is an undirected graph.
	bc.min = lo / 2.0
	bc.max = hi / 2.0

	// Store the values keyed by node id, halved for the same reason as
	// the bounds above.
	halved := make(map[NodeID]float64)
	for idx, total := range totals {
		halved[cache.Nodes[idx]] = total / 2.0
	}
	bc.centrality = halved

	return nil
}
// GetMetric returns a fresh map holding the current centrality value of each
// node, indexed by node id.
//
// When normalize is false the stored values are returned as they are. When
// normalize is true each value v is returned as (v - min) / (max - min); if
// max - min is not positive the scale factor stays at zero instead.
func (bc *BetweennessCentrality) GetMetric(normalize bool) map[NodeID]float64 {
	result := make(map[NodeID]float64)

	// Raw values requested: hand back a plain copy.
	if !normalize {
		for id, raw := range bc.centrality {
			result[id] = raw
		}
		return result
	}

	// Normalization factor. It is left at zero when the bounds coincide
	// so that no division by zero takes place.
	var scale float64
	if spread := bc.max - bc.min; spread > 0 {
		scale = 1.0 / spread
	}

	for id, raw := range bc.centrality {
		result[id] = (raw - bc.min) * scale
	}

	return result
}