-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsend_queue.go
158 lines (129 loc) · 3.38 KB
/
send_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2022 Kirill Scherba <kirill@scherba.ru>. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// TRU Send Queue module
package tru
import (
"container/list"
"fmt"
"sync"
"time"
)
// sendQueue holds packets that have been added for sending and are kept
// until they are deleted again. Packets live in a list in insertion order
// and are additionally indexed by packet ID for direct lookup.
type sendQueue struct {
	// queue is the list of queued packets; add appends to its back.
	queue list.List

	// index maps a packet ID to that packet's element in queue.
	index map[uint32]*list.Element

	// RWMutex is taken by the methods of sendQueue around every access to
	// queue, index and retransmitTimer.
	sync.RWMutex

	// retransmitTimer is the timer armed by retransmit and stopped by
	// destroy.
	retransmitTimer *time.Timer
}
// Round-trip-time tuning values.
//
// NOTE(review): none of these three constants is referenced in this file
// (minRTT only appears in a comment inside retransmit); presumably they
// bound and seed the RTT estimate elsewhere in the package — confirm.
const (
	minRTT   = 30 * time.Millisecond
	maxRTT   = 3 * time.Second
	startRTT = 200 * time.Millisecond
)

// maxRetransmitAttempts is the number of retransmit attempts a single packet
// may reach; retransmit destroys the channel once a packet goes above it.
const maxRetransmitAttempts = 100
// init prepares the send queue for use: it allocates the packet index and
// then arms the retransmit timer for channel ch.
func (s *sendQueue) init(ch *Channel) {
	s.index = map[uint32]*list.Element{}
	s.retransmit(ch)
}
// destroy stops the pending retransmit timer of the send queue.
//
// It is safe to call on a queue whose timer was never armed (for example
// when init has not run yet); in that case it does nothing.
func (s *sendQueue) destroy() {
	s.Lock()
	defer s.Unlock()

	// retransmitTimer is only assigned inside retransmit. Calling Stop on a
	// nil *time.Timer panics, so guard against an un-armed queue.
	if s.retransmitTimer == nil {
		return
	}
	s.retransmitTimer.Stop()
}
// add appends pac to the back of the send queue and records its list element
// in the index under the packet ID.
func (s *sendQueue) add(pac *Packet) {
	s.Lock()
	defer s.Unlock()

	key := uint32(pac.ID())
	elem := s.queue.PushBack(pac)
	s.index[key] = elem

	log.Debugvvv.Println("add to send queue", pac.ID())
}
// delete removes the packet with the given id from both the queue list and
// the index. It returns the removed packet and true, or nil and false when
// no packet with that id is queued.
func (s *sendQueue) delete(id int) (pac *Packet, ok bool) {
	s.Lock()
	defer s.Unlock()

	// Look the element up directly: the write lock is already held, so no
	// additional read lock must be taken here.
	key := uint32(id)
	elem, found := s.index[key]
	if !found {
		return nil, false
	}
	pac = elem.Value.(*Packet)

	s.queue.Remove(elem)
	delete(s.index, key)
	log.Debugvvv.Println("delete from send queue", pac.ID())

	return pac, true
}
// get looks up the packet with the given id in the index. It returns the
// list element, the packet stored in it and true, or nil, nil and false when
// the id is not present.
//
// The read lock is taken for the duration of the lookup unless the optional
// second argument is true; pass true only when the caller already holds the
// queue lock.
func (s *sendQueue) get(id int, unsafe ...bool) (e *list.Element, pac *Packet, ok bool) {
	if skipLock := len(unsafe) > 0 && unsafe[0]; !skipLock {
		s.RLock()
		defer s.RUnlock()
	}

	e, ok = s.index[uint32(id)]
	if !ok {
		return nil, nil, false
	}
	return e, e.Value.(*Packet), true
}
// getFirst returns the packet at the front of the queue, or nil if the queue
// is empty.
func (s *sendQueue) getFirst() (pac *Packet) {
	s.RLock()
	defer s.RUnlock()

	if front := s.queue.Front(); front != nil {
		pac = front.Value.(*Packet)
	}
	return pac
}
// getRetransmitAttempts returns the retransmit attempts of the first queue
// element, or 0 if the queue is empty.
func (s *sendQueue) getRetransmitAttempts() (rta int) {
	if first := s.getFirst(); first != nil {
		rta = first.getRetransmitAttempts()
	}
	return rta
}
// len returns the number of packets currently recorded in the send queue
// index.
func (s *sendQueue) len() int {
	s.RLock()
	n := len(s.index)
	s.RUnlock()
	return n
}
// retransmit arms a one-shot timer for channel ch. When the timer fires,
// every queued packet whose retransmit time has passed is resent, and
// retransmit is then called again to arm the next timer.
//
// If a packet goes above maxRetransmitAttempts the channel is destroyed via
// ch.destroy and the timer is not re-armed.
//
// NOTE(review): destroy only stops the timer that is currently stored. A
// callback that is already running when destroy is called will still reach
// the retransmit call below and arm a new timer — confirm whether that can
// happen in practice and whether a "destroyed" flag is needed.
func (s *sendQueue) retransmit(ch *Channel) {
	// Interval between retransmit passes. The original code carried a
	// commented-out minRTT next to this value as a possible alternative.
	const retransmitInterval = 100 * time.Millisecond

	s.Lock()
	defer s.Unlock()

	s.retransmitTimer = time.AfterFunc(retransmitInterval, func() {
		// The read lock taken by resendExpired is released before
		// ch.destroy or retransmit is called.
		if limitExceeded := s.resendExpired(ch); limitExceeded {
			ch.destroy(fmt.Sprint("channel max retransmit, destroy ", ch.addr.String()))
			return
		}
		s.retransmit(ch)
	})
}

// resendExpired walks the queue under the read lock and resends every packet
// whose retransmit time is before now. It returns true as soon as a packet
// goes above maxRetransmitAttempts; that packet is not resent and the
// remaining packets are not visited.
func (s *sendQueue) resendExpired(ch *Channel) (limitExceeded bool) {
	s.RLock()
	defer s.RUnlock()

	for e := s.queue.Front(); e != nil; e = e.Next() {
		pac := e.Value.(*Packet)

		// Skip (rather than stop at) packets that are not due yet: the list
		// is kept in insertion order while retransmit times are updated per
		// packet, so a later element may be due even if this one is not.
		if !pac.getRetransmitTime().Before(time.Now()) {
			continue
		}

		// Bump the attempt counter through the packet accessors, the same
		// way the rest of this file reads and writes it.
		rta := pac.getRetransmitAttempts() + 1
		pac.setRetransmitAttempts(rta)
		if rta > maxRetransmitAttempts {
			return true
		}

		// Schedule the next retransmit time, hand the packet to the sender
		// and count the retransmit in the channel statistics.
		ch.setRetransmitTime(pac)
		ch.writeToSender(pac)
		ch.stat.setRetransmit()
	}
	return false
}