forked from streadway/amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumers.go
142 lines (109 loc) · 2.79 KB
/
consumers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Source code and contact info at http://github.com/streadway/amqp
package amqp
import (
"os"
"strconv"
"sync"
"sync/atomic"
)
var consumerSeq uint64
const consumerTagLengthMax = 0xFF // see writeShortstr
func uniqueConsumerTag() string {
return commandNameBasedUniqueConsumerTag(os.Args[0])
}
func commandNameBasedUniqueConsumerTag(commandName string) string {
tagPrefix := "ctag-"
tagInfix := commandName
tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10)
if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
tagInfix = "streadway/amqp"
}
return tagPrefix + tagInfix + tagSuffix
}
type consumerBuffers map[string]chan *Delivery
// Concurrent type that manages the consumerTag ->
// ingress consumerBuffer mapping
type consumers struct {
sync.WaitGroup // one for buffer
closed chan struct{} // signal buffer
sync.Mutex // protects below
chans consumerBuffers
}
func makeConsumers() *consumers {
return &consumers{
closed: make(chan struct{}),
chans: make(consumerBuffers),
}
}
func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
defer close(out)
defer subs.Done()
var inflight = in
var queue []*Delivery
for delivery := range in {
queue = append(queue, delivery)
for len(queue) > 0 {
select {
case <-subs.closed:
// closed before drained, drop in-flight
return
case delivery, consuming := <-inflight:
if consuming {
queue = append(queue, delivery)
} else {
inflight = nil
}
case out <- *queue[0]:
queue = queue[1:]
}
}
}
}
// On key conflict, close the previous channel.
func (subs *consumers) add(tag string, consumer chan Delivery) {
subs.Lock()
defer subs.Unlock()
if prev, found := subs.chans[tag]; found {
close(prev)
}
in := make(chan *Delivery)
subs.chans[tag] = in
subs.Add(1)
go subs.buffer(in, consumer)
}
func (subs *consumers) cancel(tag string) (found bool) {
subs.Lock()
defer subs.Unlock()
ch, found := subs.chans[tag]
if found {
delete(subs.chans, tag)
close(ch)
}
return found
}
func (subs *consumers) close() {
subs.Lock()
defer subs.Unlock()
close(subs.closed)
for tag, ch := range subs.chans {
delete(subs.chans, tag)
close(ch)
}
subs.Wait()
}
// Sends a delivery to a the consumer identified by `tag`.
// If unbuffered channels are used for Consume this method
// could block all deliveries until the consumer
// receives on the other end of the channel.
func (subs *consumers) send(tag string, msg *Delivery) bool {
subs.Lock()
defer subs.Unlock()
buffer, found := subs.chans[tag]
if found {
buffer <- msg
}
return found
}