diff --git a/internal/framing/frame.go b/internal/framing/frame.go index f998127..797cdfc 100644 --- a/internal/framing/frame.go +++ b/internal/framing/frame.go @@ -143,7 +143,7 @@ type Frame interface { // IsResumable returns true if frame supports resume. IsResumable() bool // Done marks current frame has been sent. - Done() + Done() (closed bool) // DoneNotify notifies when frame done. DoneNotify() <-chan struct{} } @@ -156,8 +156,14 @@ type BaseFrame struct { } // Done can be invoked when a frame has been been processed. -func (p *BaseFrame) Done() { +func (p *BaseFrame) Done() (closed bool) { + defer func() { + if e := recover(); e != nil { + closed = true + } + }() close(p.done) + return } // DoneNotify notify when frame has been done. diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go index 8e5d3c2..45c6eae 100644 --- a/internal/socket/duplex.go +++ b/internal/socket/duplex.go @@ -984,10 +984,9 @@ func (p *DuplexRSocket) drainOutBack() { var out framing.Frame for i := range p.outsPriority { out = p.outsPriority[i] - if p.tp != nil { - if err := p.tp.Send(out, false); err != nil { - logger.Errorf("send frame failed: %v\n", err) - } + if err := p.tp.Send(out, false); err != nil { + out.Done() + logger.Errorf("send frame failed: %v\n", err) } } if err := p.tp.Flush(); err != nil { diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 113d2d2..c89f064 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -19,6 +19,8 @@ type ( ServerTransportAcceptor = func(ctx context.Context, tp *Transport) ) +var errTransportClosed = errors.New("transport closed") + // ServerTransport is server-side RSocket transport. type ServerTransport interface { io.Closer @@ -76,26 +78,36 @@ func (p *Transport) SetLifetime(lifetime time.Duration) { } // Send send a frame. -func (p *Transport) Send(frame framing.Frame, flush bool) error { - defer frame.Done() - if err := p.conn.Write(frame); err != nil { - return errors.Wrap(err, "send failed") +func (p *Transport) Send(frame framing.Frame, flush bool) (err error) { + defer func() { + // ensure frame done when send success. + if err == nil { + frame.Done() + } + }() + if p == nil || p.conn == nil { + err = errTransportClosed + return } - if !flush { - return nil + err = p.conn.Write(frame) + if err != nil { + return } - if err := p.conn.Flush(); err != nil { - return errors.Wrap(err, "flush failed") + if !flush { + return } - return nil + err = p.conn.Flush() + return } // Flush flush all bytes in current connection. -func (p *Transport) Flush() error { - if err := p.conn.Flush(); err != nil { - return errors.Wrap(err, "flush failed") +func (p *Transport) Flush() (err error) { + if p == nil || p.conn == nil { + err = errTransportClosed + return } - return nil + err = p.conn.Flush() + return } // Close close current transport. diff --git a/rx/rx.go b/rx/rx.go index cc428b6..6ed6c57 100644 --- a/rx/rx.go +++ b/rx/rx.go @@ -26,7 +26,7 @@ type ( FnOnNext = func(input payload.Payload) // FnOnSubscribe is alias of function for signal when subscribe begin. FnOnSubscribe = func(s Subscription) - // FnOnError is alias of function for signal when an error occured. + // FnOnError is alias of function for signal when an error occurred. FnOnError = func(e error) // FnOnCancel is alias of function for signal when subscription canceled. FnOnCancel = func()