-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
78 lines (58 loc) · 2.5 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
const Consumer = require('sqs-consumer')
const Producer = require('sqs-producer')
const tinygen = require('tinygen')
const {
always, bind, compose, curryN, dissoc, identity, merge,
mergeAll, nAry, once, partial, tap
} = require('ramda')
// Shorthand aliases for the built-in JSON codec.
const parse = JSON.parse
const stringify = JSON.stringify

// Option values used when the caller does not provide them (timeout is in seconds,
// since it is multiplied by 1000 before being handed to setTimeout).
const defaults = { visibilityTimeout: 30 }

// Merged into the details given to the timeout logger; its `error` text is also
// the message of the Error reported when a job times out.
const timeoutErr = { error: 'visibility timeout exceeded' }
/**
 * Wraps a job type and its payload into a single action object.
 *
 * @param {*} type - Identifier of the job.
 * @param {*} payload - Data carried along with the job.
 * @returns {{type: *, payload: *}} The action record.
 */
const action = (type, payload) => {
  const record = { type: type, payload: payload }
  return record
}
// Normalises caller options: drops `timeoutLogger`, then layers what is left
// over `defaults` so caller-supplied values win over the defaults.
const clean = opts => merge(defaults, dissoc('timeoutLogger', opts))
/**
 * Builds a stand-in handler for a job type that has no registered handler.
 *
 * @param {*} type - The job type that could not be resolved.
 * @returns {Function} A zero-argument function returning a Promise that is
 *   rejected with an Error naming the type.
 */
const missing = type => {
  // The Error is created lazily, each time the stand-in is invoked.
  const unhandled = () => {
    const reason = new Error(`No handler registered for (${type})`)
    return Promise.reject(reason)
  }
  return unhandled
}
/**
 * Adapts a `(body, done)` job processor into a `(msg, done)` message handler
 * by JSON-decoding `msg.Body` first.
 *
 * A body that cannot be decoded is reported through `done(err)` rather than
 * thrown, so the failure follows the same callback path as every other
 * processing error.
 *
 * @param {Function} fn - Processor invoked as `fn(parsedBody, done)`.
 * @returns {Function} Handler `(msg, done)`; returns whatever `fn` (or `done`,
 *   on a decode failure) returns.
 */
const parseFirst = fn => (msg, done) => {
  let body
  try {
    body = JSON.parse(msg.Body)
  } catch (err) {
    // Malformed (or absent) body: never reaches `fn`.
    return done(err)
  }
  return fn(body, done)
}
module.exports = opts => {
const { timeoutLogger=identity, visibilityTimeout } = opts,
options = clean(opts),
handlers = {}
const dispatch = action =>
new Promise((res, rej) => {
const message = { id: tinygen(), body: stringify(action) }
producer.send([message], err => err ? rej(err) : res(message))
})
const handle = (type, handler) =>
handlers[type] = handler
const handleMany = jobs =>
Object.assign(handlers, jobs)
const handlerFor = type =>
typeof handlers[type] === 'function' ? handlers[type] : missing(type)
const processJob = ({ type, payload }, callback) => {
const done = once(callback)
details = mergeAll([ options, { type, payload }, timeoutErr ]),
logTimeout = partial(timeoutLogger, [ details ]),
handleTimeout = compose(partial(done, [ new Error(timeoutErr.error) ]), tap(logTimeout))
timeout = setTimeout(handleTimeout, visibilityTimeout * 1000),
finish = compose(tap(partial(clearTimeout, [ timeout ])), done)
return Promise.resolve(payload)
.then(handlerFor(type))
.then(nAry(0, finish))
.catch(finish)
}
const handleMessage = parseFirst(processJob)
const consumer = Consumer.create(merge(options, { handleMessage })),
producer = Producer.create(options),
queue = {}
const on = bind(consumer.on, consumer),
start = bind(consumer.start, consumer),
stop = bind(consumer.stop, consumer)
queue.handle = curryN(2, compose(always(queue), handle))
queue.handleMany = compose(always(queue), handleMany)
queue.on = curryN(2, compose(always(queue), on))
queue.send = curryN(2, compose(dispatch, action))
queue.start = compose(always(queue), start)
queue.stop = compose(always(queue), stop)
return queue
}