-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpacket_queue.go
129 lines (111 loc) · 2.13 KB
/
packet_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package sio
import (
"time"
"github.com/karagenc/socket.io-go/internal/sync"
eio "github.com/karagenc/socket.io-go/engine.io"
eioparser "github.com/karagenc/socket.io-go/engine.io/parser"
)
// packetQueue buffers outgoing engine.io packets until a poller takes them.
// Producers call add, a consumer loop calls poll (see pollAndSend), and
// waitForDrain lets a caller block until the consumer has picked up the queue.
type packetQueue struct {
	// packets holds the packets that have been added but not yet taken.
	// Every access in this file happens with mu held.
	packets []*eioparser.Packet
	// mu guards packets. Note that this is the Mutex type from the project's
	// internal sync package, not the standard library one.
	mu sync.Mutex
	// ready is signalled (non-blocking send) by add and received by poll.
	// newPacketQueue creates it with a buffer of 1.
	ready chan struct{}
	// drain is offered a value (non-blocking send) by poll after it empties
	// the queue in response to a ready signal; waitForDrain receives from it.
	// newPacketQueue creates it unbuffered.
	drain chan struct{}
	// _reset is signalled (non-blocking send) by reset and received by
	// waitForDrain. newPacketQueue creates it with a buffer of 1.
	_reset chan struct{}
	// _close is signalled (non-blocking send) by close and received by poll.
	// newPacketQueue creates it with a buffer of 1.
	_close chan struct{}
}
// newPacketQueue allocates an empty packetQueue and creates its signalling
// channels: ready, _reset and _close each have room for one pending signal,
// while drain is unbuffered.
func newPacketQueue() *packetQueue {
	pq := new(packetQueue)
	pq.ready = make(chan struct{}, 1)
	pq.drain = make(chan struct{})
	pq._reset = make(chan struct{}, 1)
	pq._close = make(chan struct{}, 1)
	return pq
}
// poll returns the packets currently queued, blocking when there are none.
//
// If the queue is non-empty it is taken and returned right away with
// ok == true. Otherwise poll waits for either a close signal or a ready
// signal:
//
//   - on a close signal it returns (nil, false, true);
//   - on a ready signal it takes whatever is queued, offers a drain
//     notification to a goroutine blocked in waitForDrain (non-blocking, so
//     the notification is dropped if nobody is waiting), and returns the
//     packets. ok is false if the queue turned out to be empty.
func (pq *packetQueue) poll() (packets []*eioparser.Packet, ok, closed bool) {
	if packets = pq.get(); len(packets) != 0 {
		return packets, true, false
	}

	// _close takes precedence over ready. A single select chooses uniformly
	// at random among the cases that are ready, so the order of the cases
	// alone does not give any priority; check _close on its own first.
	select {
	case <-pq._close:
		return nil, false, true
	default:
	}

	select {
	case <-pq._close:
		return nil, false, true
	case <-pq.ready:
	}

	packets = pq.get()

	// Tell a pending waitForDrain that the queue has just been emptied.
	select {
	case pq.drain <- struct{}{}:
	default:
	}
	return packets, len(packets) != 0, false
}
// get atomically takes ownership of everything in the queue: it returns the
// current packets slice and leaves the queue empty (nil).
func (pq *packetQueue) get() (packets []*eioparser.Packet) {
	pq.mu.Lock()
	packets, pq.packets = pq.packets, nil
	pq.mu.Unlock()
	return packets
}
// add appends packets to the queue and wakes up a poller.
//
// When the queue is empty the argument slice itself becomes the queue (no
// copy is made); otherwise the packets are appended to the existing queue.
func (pq *packetQueue) add(packets ...*eioparser.Packet) {
	pq.mu.Lock()
	switch {
	case len(pq.packets) == 0:
		pq.packets = packets
	default:
		pq.packets = append(pq.packets, packets...)
	}
	pq.mu.Unlock()

	// Non-blocking: if a ready signal is already pending, one is enough.
	select {
	case pq.ready <- struct{}{}:
	default:
	}
}
// reset discards every queued packet and posts a reset signal, which
// waitForDrain listens for. The signal is sent without blocking, so it is
// skipped when one is already pending.
func (pq *packetQueue) reset() {
	pq.mu.Lock()
	pq.packets = nil
	// The signal is posted while the lock is still held.
	select {
	case pq._reset <- struct{}{}:
	default:
	}
	pq.mu.Unlock()
}
// close discards every queued packet and posts a close signal, which poll
// listens for. The signal is sent without blocking, so it is skipped when one
// is already pending.
func (pq *packetQueue) close() {
	pq.mu.Lock()
	pq.packets = nil
	// The signal is posted while the lock is still held.
	select {
	case pq._close <- struct{}{}:
	default:
	}
	pq.mu.Unlock()
}
// waitForDrain blocks until the queue has been emptied or timeout elapses.
//
// It returns false immediately when the queue is already empty. Otherwise it
// waits for a drain notification from poll, a reset signal from reset, or the
// timeout, whichever comes first. timedout is true only when the timeout was
// what ended the wait.
func (pq *packetQueue) waitForDrain(timeout time.Duration) (timedout bool) {
	pq.mu.Lock()
	alreadyDrained := len(pq.packets) == 0
	pq.mu.Unlock()
	if alreadyDrained {
		return false
	}

	// Use an explicit timer instead of time.After so that it can be stopped
	// as soon as the wait ends, rather than lingering until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-pq.drain:
	case <-pq._reset:
	case <-timer.C:
		return true
	}
	return false
}
// pollAndSend is the consumer loop: it repeatedly polls the queue and passes
// each non-empty batch to socket.Send. It returns once poll reports that the
// queue was closed. Wake-ups that yield no packets are simply retried.
func (pq *packetQueue) pollAndSend(socket eio.Socket) {
	for {
		batch, hasPackets, isClosed := pq.poll()
		switch {
		case isClosed:
			return
		case hasPackets:
			socket.Send(batch...)
		}
	}
}