Skip to content

Commit

Permalink
internal/transportutil: add some extra comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisccoulson committed Aug 13, 2024
1 parent 8b94daf commit 010b929
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions internal/transportutil/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ type RetryParams struct {
type retrierTransport struct {
tomb *tomb.Tomb

w io.WriteCloser // write channel
w io.WriteCloser // write channel from public io.Writer to transport routine

r io.ReadCloser // read channel
rLen <-chan int64 // next response length, used to demarcate responses.
rErr <-chan error // read errors.
lr io.Reader // current response reader, limited by the last value read from rLen.
r io.ReadCloser // read channel to public io.Reader from transport routine
rLen <-chan int64 // next response length, used to demarcate responses, sent from transport routine to public io.Reader
rErr <-chan error // read errors, sent from transport routine to io.Reader
lr io.Reader // current response, limited by the last value read from rLen, accessed from public io.Reader

closeErr <-chan error
closeErr <-chan error // close errors to public io.Closer from transport routine
}

// NewRetrierTransport returns a new transport that resubmits commands on certain
Expand All @@ -51,17 +51,19 @@ func NewRetrierTransport(transport tpm2.Transport, params RetryParams) tpm2.Tran
// Construct the write channel
wr, ww := io.Pipe()
t.w = ww
// wr is read by the transport routine

// Construct the read channel
rr, rw := io.Pipe()
t.r = rr
rLen := make(chan int64)
// rw is written to from the transport routine
rLen := make(chan int64) // Used by the transport routine to tell the public io.Reader how big the next response is
t.rLen = rLen
rErr := make(chan error)
rErr := make(chan error) // Used by the transport routine to signal a transport error to the public io.Reader.
t.rErr = rErr

// Construct the close channel
closeErr := make(chan error)
closeErr := make(chan error) // Used by the transport routine to signal close errors to the public io.Closer.
t.closeErr = closeErr

tmb := new(tomb.Tomb)
Expand All @@ -72,13 +74,13 @@ func NewRetrierTransport(transport tpm2.Transport, params RetryParams) tpm2.Tran
loop := newRetrierTransportLoop(&params, transport, tmb, wr, rw, rLen, rErr)
err := loop.run()

// Ensure the calling routine gets unblocked.
wr.Close()
rw.Close()
close(rLen)
close(rErr)
closeErr <- transport.Close()
close(closeErr)
// Ensure the public calling routine gets unblocked.
wr.Close() // Unblocks public io.Writer
rw.Close() // Unblocks public io.Reader reads from io.LimitedReader
close(rLen) // Unblocks public io.Reader waits for next response or io.Closer
close(rErr) // Unblocks public io.Reader waits for next transport error or io.Closer
closeErr <- transport.Close() // Close the underlying transport, unblocking public io.Closer with the actual error
close(closeErr) // Last ditch attempt to unblock public io.Closer, causing it to return ErrClosed
return err
})
return t
Expand Down Expand Up @@ -242,32 +244,40 @@ func (t *retrierTransport) Write(data []byte) (n int, err error) {

func (t *retrierTransport) Close() error {
// Close pipes to unblock I/O on the transport side.
t.w.Close()
t.r.Close()
t.w.Close() // Unblocks transport routine waits on reads
t.r.Close() // Unblocks transport routine waits on writes

// Mark dying so the retry loop exits.
t.tomb.Kill(nil)

var closeErr error
var wasOpen bool

// Unblock senders on the transport side and wait for everything
// to die.
// Wait for everything on the transport routine to die
Loop:
for {
select {
case <-t.rErr:
// Response error channel is closed
case <-t.rLen:
// Response length channel is closed
case closeErr, wasOpen = <-t.closeErr:
// Close channel receives a close response from the underlying
// transport routine if it wasn't already closed
if !wasOpen {
// It was already closed
closeErr = ErrClosed
}
// This is the last thing we were waiting for.
break Loop
}
}

// Wait for all goroutines to terminate
if err := t.tomb.Wait(); err != nil {
return err
}

// We're done!
return closeErr
}

0 comments on commit 010b929

Please sign in to comment.