Skip to content

Commit

Permalink
Increment missing indications from SACK
Browse files Browse the repository at this point in the history
Increment missing indication for reported
missing chunks instead of all inflight chunks.
  • Loading branch information
cnderrauber committed Nov 20, 2024
1 parent 7d6927e commit d3e02e2
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 29 deletions.
9 changes: 6 additions & 3 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ func (a *Association) onCumulativeTSNAckPointAdvanced(totalBytesAcked int) {
}

// The caller should hold the lock.
func (a *Association) processFastRetransmission(cumTSNAckPoint, htna uint32, cumTSNAckPointAdvanced bool) error {
func (a *Association) processFastRetransmission(cumTSNAckPoint uint32, gapAckBlocks []gapAckBlock, htna uint32, cumTSNAckPointAdvanced bool) error {
// HTNA algorithm - RFC 4960 Sec 7.2.4
// Increment missIndicator of each chunks that the SACK reported missing
// when either of the following is met:
Expand All @@ -1772,7 +1772,10 @@ func (a *Association) processFastRetransmission(cumTSNAckPoint, htna uint32, cum
maxTSN = htna
} else {
// b) increment for all TSNs reported missing
maxTSN = cumTSNAckPoint + uint32(a.inflightQueue.size()) + 1
maxTSN = cumTSNAckPoint
if len(gapAckBlocks) > 0 {
maxTSN += uint32(gapAckBlocks[len(gapAckBlocks)-1].end)
}

Check warning on line 1778 in association.go

View check run for this annotation

Codecov / codecov/patch

association.go#L1775-L1778

Added lines #L1775 - L1778 were not covered by tests
}

for tsn := cumTSNAckPoint + 1; sna32LT(tsn, maxTSN); tsn++ {
Expand Down Expand Up @@ -1882,7 +1885,7 @@ func (a *Association) handleSack(d *chunkSelectiveAck) error {
a.setRWND(d.advertisedReceiverWindowCredit - bytesOutstanding)
}

err = a.processFastRetransmission(d.cumulativeTSNAck, htna, cumTSNAckPointAdvanced)
err = a.processFastRetransmission(d.cumulativeTSNAck, d.gapAckBlocks, htna, cumTSNAckPointAdvanced)
if err != nil {
return err
}
Expand Down
82 changes: 56 additions & 26 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2921,49 +2921,79 @@ func TestAssociationFastRtxWnd(t *testing.T) {
}
}

// intercept SACK
var lastSACK atomic.Pointer[chunkSelectiveAck]
dbConn2.remoteInboundHandler = func(buf []byte) {
p := &packet{}
require.NoError(t, p.unmarshal(true, buf))
for _, c := range p.chunks {
if ack, aok := c.(*chunkSelectiveAck); aok {
lastSACK.Store(ack)
}
}
dbConn1.inboundHandler(buf)
}

_, err = s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
require.NoError(t, err)
require.Eventually(t, func() bool { return lastSACK.Load() != nil }, 1*time.Second, 10*time.Millisecond)

shouldDrop.Store(true)
// send packets and dropped
buf := make([]byte, 1000)
for i := 0; i < 10; i++ {
buf := make([]byte, 700)
for i := 0; i < 20; i++ {
_, err = s1.WriteSCTP(buf, PayloadTypeWebRTCBinary)
require.NoError(t, err)
}

require.Eventually(t, func() bool { return dropCounter.Load() >= 10 }, 5*time.Second, 10*time.Millisecond, "drop %d", dropCounter.Load())
// send packets to trigger fast retransmit
shouldDrop.Store(false)
require.Eventually(t, func() bool { return dropCounter.Load() >= 15 }, 5*time.Second, 10*time.Millisecond, "drop %d", dropCounter.Load())

require.Zero(t, a1.stats.getNumFastRetrans())
require.False(t, a1.inFastRecovery)

// wait SACK
sackCh := make(chan []byte, 1)
dbConn2.remoteInboundHandler = func(buf []byte) {
p := &packet{}
require.NoError(t, p.unmarshal(true, buf))
for _, c := range p.chunks {
if _, ok := c.(*chunkSelectiveAck); ok {
select {
case sackCh <- buf:
default:
}
return
}
}
}
// wait sack to trigger fast retransmit
for i := 0; i < 3; i++ {
_, err = s1.WriteSCTP(buf, PayloadTypeWebRTCBinary)
require.NoError(t, err)
dbConn1.inboundHandler(<-sackCh)
// sack to trigger fast retransmit
ack := *(lastSACK.Load())
ack.gapAckBlocks = []gapAckBlock{{start: 11}}
for i := 11; i < 14; i++ {
ack.gapAckBlocks[0].end = uint16(i)
pkt := a1.createPacket([]chunk{&ack})
pktBuf, err1 := pkt.marshal(true)
require.NoError(t, err1)
dbConn1.inboundHandler(pktBuf)
}
// fast retransmit and new sack sent

require.Eventually(t, func() bool {
a1.lock.RLock()
defer a1.lock.RUnlock()
return a1.inFastRecovery
}, 5*time.Second, 10*time.Millisecond)
require.GreaterOrEqual(t, uint64(10), a1.stats.getNumFastRetrans())

// 7.2.4 b) In fast-recovery AND the Cumulative TSN Ack Point advanced
// the miss indications are incremented for all TSNs reported missing
// in the SACK.
a1.lock.Lock()
lastTSN := a1.inflightQueue.chunks.Back().tsn
lastTSNMinusTwo := lastTSN - 2
lastChunk := a1.inflightQueue.chunks.Back()
lastChunkMinusTwo, ok := a1.inflightQueue.get(lastTSNMinusTwo)
a1.lock.Unlock()
require.True(t, ok)
require.True(t, lastTSN > ack.cumulativeTSNAck+uint32(ack.gapAckBlocks[0].end)+3)

// sack with cumAckPoint advanced, lastTSN should not be marked as missing
ack.cumulativeTSNAck++
end := lastTSN - 1 - ack.cumulativeTSNAck
ack.gapAckBlocks = append(ack.gapAckBlocks, gapAckBlock{start: uint16(end), end: uint16(end)})
pkt := a1.createPacket([]chunk{&ack})
pktBuf, err := pkt.marshal(true)
require.NoError(t, err)
dbConn1.inboundHandler(pktBuf)
require.Eventually(t, func() bool {
a1.lock.Lock()
defer a1.lock.Unlock()
return lastChunkMinusTwo.missIndicator == 1 && lastChunk.missIndicator == 0
}, 5*time.Second, 10*time.Millisecond)
}

func TestAssociationMaxTSNOffset(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (q *queue[T]) Front() T {
return q.buf[q.head]
}

func (q *queue[T]) Back() T {
return q.buf[(q.tail-1+len(q.buf))%len(q.buf)]

Check warning on line 51 in queue.go

View check run for this annotation

Codecov / codecov/patch

queue.go#L50-L51

Added lines #L50 - L51 were not covered by tests
}

func (q *queue[T]) At(i int) T {
return q.buf[(q.head+i)%(len(q.buf))]
}
Expand Down

0 comments on commit d3e02e2

Please sign in to comment.