-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathindex.js
120 lines (98 loc) · 3.04 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
'use strict'
const Trailpack = require('trailpack')
const lib = require('./lib')
const _ = require('lodash')
const rabbit = require('rabbot')
// automatically nack exceptions in handlers
rabbit.nackOnError()
const joi = require('joi')
const config = require('./lib/config')
const Client = require('./lib/Client')
const TaskerUtils = require('./lib/Util.js')
module.exports = class TaskerTrailpack extends Trailpack {
/**
* TODO document method
*/
validate() {
this.app.config.tasker = _.defaultsDeep(this.app.config.tasker, config.defaults)
return new Promise((resolve, reject) => {
joi.validate(this.app.config.tasker, config.schema, (err, value) => {
if (err) return reject(new Error('Tasker Configuration: ' + err))
return resolve(value)
})
})
}
/**
* configure rabbitmq exchanges, queues, bindings and handlers
*/
configure() {
let taskerConfig = this.app.config.tasker
const profile = getWorkerProfile(taskerConfig)
taskerConfig = configureExchangesAndQueues(profile, taskerConfig)
this.app.tasker = new Client(this.app, rabbit, taskerConfig.exchangeName)
TaskerUtils.registerTasks(profile, this.app, rabbit)
this.app.api.tasks = this.app.api.tasks || {}
}
/**
* Establish connection to the RabbitMQ exchange, listen for tasks.
*/
initialize() {
return Promise.resolve(rabbit.configure(this.app.config.tasker))
}
constructor(app) {
super(app, {
config: require('./config'),
api: require('./api'),
pkg: require('./package')
})
}
}
/**
 * Get the profile for the current process.
 *
 * The profile contains the list of tasks that this process can work on. It is
 * looked up in `taskerConfig.profiles` under the name given by
 * `taskerConfig.worker`.
 *
 * @param {object} taskerConfig - tasker configuration; `worker` and `profiles` are read
 * @returns {object} the matching profile, or an empty profile `{ tasks: [] }`
 *   when no worker name is set, no profiles are defined, or the named profile
 *   does not exist (i.e. the current process is not a worker process)
 */
function getWorkerProfile(taskerConfig) {
  const profileName = taskerConfig.worker
  // Tolerate a config without a `profiles` map instead of throwing a TypeError
  const profiles = taskerConfig.profiles || {}
  const profile = profileName ? profiles[profileName] : undefined

  return profile || { tasks: [] }
}
/**
 * Write the exchange, queue and binding definitions onto `taskerConfig`.
 *
 * NOTE: `taskerConfig` is mutated in place. One topic exchange and two
 * subscribed queues (work and interrupt) are declared. The work queue is bound
 * with the profile's task names as routing keys; the interrupt queue is bound
 * with each task name suffixed by ".interrupt".
 *
 * @param {object} profile - worker profile; `profile.tasks` is the list of task names
 * @param {object} taskerConfig - tasker configuration; `exchange`,
 *   `workQueueName` and `interruptQueueName` override the default names
 * @returns {object} the same `taskerConfig` object that was passed in
 */
function configureExchangesAndQueues(profile, taskerConfig) {
  const exchange = taskerConfig.exchange || 'tasker-work-x'
  const workQueue = taskerConfig.workQueueName || 'tasker-work-q'
  const interruptQueue = taskerConfig.interruptQueueName || 'tasker-interrupt-q'

  // Small builders so both queues and both bindings share one shape
  const subscribedQueue = name => ({ name, autoDelete: false, subscribe: true })
  const bindingFor = (target, keys) => ({ exchange, target, keys })
  const toInterruptKey = task => task + '.interrupt'

  taskerConfig.exchangeName = exchange
  taskerConfig.exchanges = [{ name: exchange, type: 'topic', autoDelete: false }]
  taskerConfig.queues = [workQueue, interruptQueue].map(subscribedQueue)
  taskerConfig.bindings = [
    bindingFor(workQueue, profile.tasks),
    bindingFor(interruptQueue, profile.tasks.map(toInterruptKey))
  ]

  return taskerConfig
}
// `module.exports` was replaced by the TaskerTrailpack class above, so the
// bare `exports` alias no longer points at what `require()` returns. Attach
// Task to `module.exports` so consumers can actually reach it.
module.exports.Task = lib.Task