-
Notifications
You must be signed in to change notification settings - Fork 1
/
config.js
83 lines (77 loc) · 2.77 KB
/
config.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Parse the configuration from config.yaml.
var config = YAML.decode(pipy.load('config.yaml'))
// Cache holding the brokers currently considered unhealthy.
var unhealthyBrokers = new algo.Cache()
// Shared JSON logger writing to stdout.
var logger = new logging.JSONLogger('console').toStdout()
// Configured brokers indexed by their address.
var brokerTargets = Object.fromEntries(
  config.brokers.map(broker => [broker.addr, broker])
)
// Sum of the configured capacity of every broker.
var brokerCapacities = config.brokers.reduce(
  (total, broker) => total + Number.parseInt(broker.capicity),
  0
)
// Configured connection rate limit.
var connRate = Number.parseInt(config.limits.conn.rate)
// The connection quota starts at the smaller of the configured rate and the
// total broker capacity.
var connQuota = new algo.Quota(
  brokerCapacities > connRate ? connRate : brokerCapacities,
  { key: 'conn' }
)
// Mark a broker as valid (healthy again).
// Adds the broker's capacity back to the running total (brokerCapacities) and,
// when that total was the limiting factor, raises the connection quota so it
// keeps following the smaller of connRate and brokerCapacities.
// This function only adjusts capacity and quota; it does not modify the
// unhealthy brokers cache.
// target: broker entry; target.addr and target.capicity are read.
var validBroker = (target) => {
  // An address that is not a configured broker yields no target; skip it
  // instead of throwing on the property access below.
  if (!target) {
    logger.log('Unknown broker reported as valid, connection quota unchanged')
    return
  }
  var targetCap = Number.parseInt(target.capicity)
  // A missing or non-numeric capacity would turn brokerCapacities into NaN,
  // after which no comparison below could ever succeed again.
  if (Number.isNaN(targetCap)) {
    logger.log(`Broker ${target.addr} has no numeric capicity, connection quota unchanged`)
    return
  }
  var previous = connQuota.current
  var pre = brokerCapacities
  brokerCapacities += targetCap
  // Only when the previous total capacity was below the configured rate was
  // the quota limited by capacity, so only then does it grow.
  if (connRate > pre) {
    if (connRate < brokerCapacities) {
      // The new total exceeds the rate: top the quota up to connRate only.
      connQuota.produce(connRate - pre)
    } else {
      // Still capacity-limited: the whole recovered capacity becomes quota.
      connQuota.produce(targetCap)
    }
  }
  logger.log(`Broker ${target.addr} valid, change connection quota from ${previous} to ${connQuota.current} `)
}
// Mark a broker as invalid (unhealthy).
// Subtracts the broker's capacity from the running total (brokerCapacities)
// and, when the remaining total falls below the configured rate, lowers the
// connection quota so it keeps following the smaller of connRate and
// brokerCapacities.
// This function only adjusts capacity and quota; it does not modify the
// unhealthy brokers cache.
// target: broker entry; target.addr and target.capicity are read.
var invalidBroker = (target) => {
  // An address that is not a configured broker yields no target; skip it
  // instead of throwing on the property access below.
  if (!target) {
    logger.log('Unknown broker reported as invalid, connection quota unchanged')
    return
  }
  var targetCap = Number.parseInt(target.capicity)
  // A missing or non-numeric capacity would turn brokerCapacities into NaN,
  // after which no comparison below could ever succeed again.
  if (Number.isNaN(targetCap)) {
    logger.log(`Broker ${target.addr} has no numeric capicity, connection quota unchanged`)
    return
  }
  var previous = connQuota.current
  var pre = brokerCapacities
  brokerCapacities -= targetCap
  if (connRate < pre && connRate > brokerCapacities) {
    // The quota was rate-limited and the remaining capacity is now lower
    // than the rate: shrink the quota down to the remaining capacity.
    connQuota.consume(connRate - brokerCapacities)
  } else if (connRate >= pre) {
    // The quota was already capacity-limited: it shrinks by the lost capacity.
    connQuota.consume(targetCap)
  }
  // Otherwise the remaining capacity still covers connRate: nothing to do.
  logger.log(`Broker ${target.addr} invalid, change connection quota from ${previous} to ${connQuota.current} `)
}
// health check
// Polls GET /unhealthy on the local health-check service, then schedules the
// next poll after config.healthCheck.interval. Each response is diffed against
// the previous poll:
//   - addresses no longer reported are removed from the unhealthy cache,
//   - newly reported addresses are added to it.
// The quota helpers (validBroker / invalidBroker) are invoked on thread 0 only.
// NOTE(review): the switch is compared against the string 'true'; a boolean
// true decoded from YAML would not match — confirm the config format.
if (config.healthCheck.enabled === 'true') {
  var agent = new http.Agent(`localhost:${config.healthCheck.port}`)
  // Addresses reported as unhealthy by the previous poll.
  var lastUnhealthies = {}
  var healthCheck = function () {
    agent.request('GET', '/unhealthy').then(
      function (res) {
        // The response body is expected to decode to an array of addresses.
        var unhealthies = Object.fromEntries(JSON.decode(res.body).map(addr => [addr, true]))
        // Brokers that recovered since the previous poll.
        Object.keys(lastUnhealthies).forEach(addr => {
          if (!unhealthies[addr]) {
            // Look the broker up first: an address missing from config.brokers
            // must not abort the loop on thread 0 while other threads go on.
            var target = brokerTargets[addr]
            if (target && pipy.thread.id === 0) {
              validBroker(target)
            }
            if (unhealthyBrokers.get(addr)) {
              unhealthyBrokers.remove(addr)
            }
            delete lastUnhealthies[addr]
          }
        })
        // Brokers that became unhealthy since the previous poll.
        Object.keys(unhealthies).forEach(addr => {
          if (!lastUnhealthies[addr]) {
            // Same guard as above for addresses that are not configured.
            var target = brokerTargets[addr]
            if (target && pipy.thread.id === 0) {
              invalidBroker(target)
            }
            unhealthyBrokers.set(addr, true)
            lastUnhealthies[addr] = true
          }
        })
      }
    )
    // Schedule the next poll without waiting for the response above.
    new Timeout(Number.parseInt(config.healthCheck.interval)).wait().then(healthCheck)
  }
  healthCheck()
}
export { config, logger, unhealthyBrokers, connQuota, validBroker, invalidBroker }