-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_adapter.go
143 lines (122 loc) · 2.91 KB
/
read_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package channel
import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/pkg/errors"
"io"
"sync/atomic"
"time"
)
var ErrClosed = errors.New("channel closed")
type ReadTimout struct{}
func (r ReadTimout) Error() string {
return "read timed out"
}
func (r ReadTimout) Timeout() bool {
return true
}
func (r ReadTimout) Temporary() bool {
return true
}
func NewReadAdapter(label string, channelDepth int) *ReadAdapter {
return &ReadAdapter{
label: label,
ch: make(chan []byte, channelDepth),
closeNotify: make(chan struct{}),
deadlineNotify: make(chan struct{}),
}
}
type ReadAdapter struct {
label string
ch chan []byte
closeNotify chan struct{}
deadlineNotify chan struct{}
deadline concurrenz.AtomicValue[time.Time]
closed atomic.Bool
readInProgress atomic.Bool
leftover []byte
}
func (self *ReadAdapter) PushData(data []byte) error {
select {
case self.ch <- data:
return nil
case <-self.closeNotify:
return ErrClosed
}
}
func (self *ReadAdapter) SetReadDeadline(deadline time.Time) error {
self.deadline.Store(deadline)
if self.readInProgress.Load() {
select {
case self.deadlineNotify <- struct{}{}:
case <-time.After(5 * time.Millisecond):
}
} else {
select {
case self.deadlineNotify <- struct{}{}:
default:
}
}
return nil
}
func (self *ReadAdapter) GetNext() ([]byte, error) {
self.readInProgress.Store(true)
defer self.readInProgress.Store(false)
for {
deadline := self.deadline.Load()
var timeoutCh <-chan time.Time
if !deadline.IsZero() {
timeoutCh = time.After(time.Until(deadline))
}
select {
case data := <-self.ch:
return data, nil
case <-self.closeNotify:
// If we're closed, return any buffered values, otherwise return nil
select {
case data := <-self.ch:
return data, nil
default:
return nil, ErrClosed
}
case <-self.deadlineNotify:
continue
case <-timeoutCh:
// If we're timing out, return any buffered values, otherwise return nil
select {
case data := <-self.ch:
return data, nil
default:
return nil, &ReadTimout{}
}
}
}
}
func (self *ReadAdapter) Read(b []byte) (n int, err error) {
log := pfxlog.Logger().WithField("label", self.label)
if self.closed.Load() {
return 0, io.EOF
}
log.Tracef("read buffer = %d bytes", len(b))
if len(self.leftover) > 0 {
log.Tracef("found %d leftover bytes", len(self.leftover))
n = copy(b, self.leftover)
self.leftover = self.leftover[n:]
return n, nil
}
d, err := self.GetNext()
if err != nil {
return 0, err
}
log.Tracef("got buffer from sequencer %d bytes", len(d))
n = copy(b, d)
self.leftover = d[n:]
log.Tracef("saving %d bytes for leftover", len(self.leftover))
log.Tracef("reading %v bytes", n)
return n, nil
}
func (self *ReadAdapter) Close() {
if self.closed.CompareAndSwap(false, true) {
close(self.closeNotify)
}
}