From 99950c54895bef637dcc5e6e74709a906b9bdf53 Mon Sep 17 00:00:00 2001
From: Gregor Riepl
Date: Tue, 18 Jan 2022 14:00:09 +0100
Subject: [PATCH 1/2] Abstract output queue and fix double close issue

---
 streaming/connection.go | 63 ++++++++++++++++++++++++++++++++++-------
 streaming/streamer.go   | 39 ++++++++++++-------------
 2 files changed, 70 insertions(+), 32 deletions(-)

diff --git a/streaming/connection.go b/streaming/connection.go
index 3e54443..f57112d 100644
--- a/streaming/connection.go
+++ b/streaming/connection.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/onitake/restreamer/protocol"
+	"github.com/onitake/restreamer/util"
 	"net/http"
 	"time"
 )
@@ -29,15 +30,14 @@
 // This is meant to be called directly from a ServeHTTP handler.
 // No separate thread is created.
 type Connection struct {
-	// Queue is the per-connection packet queue
-	Queue chan protocol.MpegTsPacket
+	// queue is the per-connection packet queue
+	queue chan protocol.MpegTsPacket
 	// ClientAddress is the remote client address
 	ClientAddress string
-	// the destination socket
+	// writer is the destination socket
 	writer http.ResponseWriter
-	// Closed is true if Serve was ended because of a closed channel.
-	// This is simply there to avoid a double close.
-	Closed bool
+	// closed is used to protect against double closure
+	closed util.AtomicBool
 	// context contains the cached context object for this connection
 	context context.Context
 }
@@ -49,7 +49,7 @@ type Connection struct {
 // and will be used for logging.
 func NewConnection(destination http.ResponseWriter, qsize int, clientaddr string, ctx context.Context) *Connection {
 	conn := &Connection{
-		Queue:         make(chan protocol.MpegTsPacket, qsize),
+		queue:         make(chan protocol.MpegTsPacket, qsize),
 		ClientAddress: clientaddr,
 		writer:        destination,
 		context:       ctx,
@@ -57,6 +57,32 @@ func NewConnection(destination http.ResponseWriter, qsize int, clientaddr string
 	return conn
 }
 
+// Send writes a packet into the output queue.
+// It returns true if the packet was queued, or false if the queue was full and the packet was dropped.
+func (conn *Connection) Send(packet protocol.MpegTsPacket) bool {
+	select {
+	case conn.queue <- packet:
+		// packet distributed, done
+		//log.Printf("Queued packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
+		return true
+	default:
+		// queue is full
+		//log.Print(ErrSlowRead)
+		return false
+	}
+}
+
+// Close closes the queue and stops the client connection.
+// This can be called multiple times, and even asynchronously.
+func (conn *Connection) Close() error {
+	// The Go standard library recommends against using atomics if there are other means, but they are sooo handy.
+	if util.CompareAndSwapBool(&conn.closed, false, true) {
+		close(conn.queue)
+		// important: the atomic swap guarantees that only one caller ever closes the queue
+	}
+	return nil
+}
+
 // Serve starts serving data to a client, continuously feeding packets from the queue.
 // An optional preamble buffer can be passed that will be sent before streaming the live payload
 // (but after the HTTP response headers).
@@ -102,10 +128,12 @@ func (conn *Connection) Serve(preamble []byte) {
 		}
 	}
 
+	// this is the exit status indicator: set when the queue is closed
+	qclosed := false
 	// start reading packets
 	for running {
 		select {
-		case packet, ok := <-conn.Queue:
+		case packet, ok := <-conn.queue:
 			if ok {
 				// packet received, log
 				//log.Printf("Sending packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
@@ -129,7 +157,8 @@ func (conn *Connection) Serve(preamble []byte) {
 					"message", "Shutting down client connection",
 				)
 				running = false
-				conn.Closed = true
+				// indicate that the queue was closed
+				qclosed = true
 			}
 		case <-conn.context.Done():
 			// connection closed while we were waiting for more data
@@ -138,17 +167,29 @@ func (conn *Connection) Serve(preamble []byte) {
 				"message", "Downstream connection closed (while waiting)",
 				"error", fmt.Sprintf("%v", conn.context.Err()),
 			)
+			// stop processing events
+			// we do NOT close the queue here; that happens when the writer completes
 			running = false
 		}
 	}
 
-	// we cannot drain the channel here, as it might not be closed yet.
-	// better let our caller handle closure and draining.
+	// we're not draining the queue yet; that happens when Drain() is called
+	// (i.e. after the connection has been removed from the pool)
 	logger.Logkv(
 		"event", eventConnectionDone,
 		"message", "Streaming finished",
 	)
+
+	return qclosed
+}
+
+// Drain drains the built-in queue.
+// You should call this when all writers have been removed and the queue is closed.
+func (conn *Connection) Drain() {
+	for range conn.queue {
+		// drain any leftovers
+	}
 }
 
 // ServeStreamError returns an appropriate error response to the client.
diff --git a/streaming/streamer.go b/streaming/streamer.go
index 36e85c6..f825390 100644
--- a/streaming/streamer.go
+++ b/streaming/streamer.go
@@ -294,23 +294,15 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
 			//log.Printf("Got packet (length %d)\n", len(packet))
 
 			for conn := range pool {
-				select {
-				case conn.Queue <- packet:
-					// packet distributed, done
-					//log.Printf("Queued packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
-
+				if conn.Send(packet) {
 					// report the packet
 					streamer.stats.PacketSent()
 					if streamer.promCounter {
 						metricPacketsSent.With(prometheus.Labels{"stream": streamer.name}).Inc()
 						metricBytesSent.With(prometheus.Labels{"stream": streamer.name}).Add(protocol.MpegTsPacketSize)
 					}
-
-				default:
-					// queue is full
-					//log.Print(ErrSlowRead)
-
-					// report the drop
+				} else {
+					// queue full, report the dropped packet
 					streamer.stats.PacketDropped()
 					if streamer.promCounter {
 						metricPacketsDropped.With(prometheus.Labels{"stream": streamer.name}).Inc()
@@ -331,9 +323,6 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
 				"event", eventStreamerClientRemove,
 				"message", fmt.Sprintf("Removing client %s from pool", request.Address),
 			)
-			if !request.Connection.Closed {
-				close(request.Connection.Queue)
-			}
 			delete(pool, request.Connection)
 		case StreamerCommandAdd:
 			// check if the connection can be accepted
@@ -362,7 +351,7 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
 			inhibit = true
 			// close all downstream connections
 			for conn := range pool {
-				close(conn.Queue)
+				conn.Close()
 			}
 			// TODO implement inhibit in the check api
 		case StreamerCommandAllow:
@@ -391,7 +380,8 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
 		// drain any leftovers
 	}
 	for conn := range pool {
-		close(conn.Queue)
+		// ensure this queue/connection is closed
+		conn.Close()
 	}
 	// start the command eater again
@@ -456,15 +446,22 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re
 	duration := time.Since(start)
 
 	// done, remove the stale connection
-	streamer.request <- &ConnectionRequest{
+	command = &ConnectionRequest{
 		Command:    StreamerCommandRemove,
 		Address:    request.RemoteAddr,
 		Connection: conn,
+		Waiter:     &sync.WaitGroup{},
 	}
-	// and drain the queue AFTER we have sent the shutdown signal
-	for range conn.Queue {
-		// drain any leftovers
-	}
+	command.Waiter.Add(1)
+	streamer.request <- command
+	// and wait for completion
+	command.Waiter.Wait()
+
+	// we are now safe to close and drain the queue
+	// double close is prevented by the connection object
+	conn.Close()
+	conn.Drain()
+
 	logger.Logkv(
 		"event", eventStreamerClosed,
 		"message", fmt.Sprintf("Connection from %s closed", request.RemoteAddr),

From 7e80c72ff243ac9fd6bacb7626ccadb9a3c38a09 Mon Sep 17 00:00:00 2001
From: Gregor Riepl
Date: Tue, 18 Jan 2022 14:00:33 +0100
Subject: [PATCH 2/2] Use context object for socket close instead of notifier

---
 streaming/connection.go | 2 +-
 streaming/streamer.go   | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/streaming/connection.go b/streaming/connection.go
index f57112d..fbd0f26 100644
--- a/streaming/connection.go
+++ b/streaming/connection.go
@@ -86,7 +86,7 @@
 // Serve starts serving data to a client, continuously feeding packets from the queue.
 // An optional preamble buffer can be passed that will be sent before streaming the live payload
 // (but after the HTTP response headers).
-func (conn *Connection) Serve(preamble []byte) {
+func (conn *Connection) Serve(preamble []byte) bool {
 	// set the content type (important)
 	conn.writer.Header().Set("Content-Type", "video/mpeg")
 	// a stream is always current
diff --git a/streaming/streamer.go b/streaming/streamer.go
index f825390..918c819 100644
--- a/streaming/streamer.go
+++ b/streaming/streamer.go
@@ -352,6 +352,9 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
 			// close all downstream connections
 			for conn := range pool {
 				conn.Close()
+				// avoid waiting for the removal round-trip; this narrows the race window
+				// double deletes are safe, so nothing bad happens when the remove command arrives later
+				delete(pool, conn)
 			}
 			// TODO implement inhibit in the check api
 		case StreamerCommandAllow:
@@ -441,6 +444,7 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re
 		"remote", request.RemoteAddr,
 	)
 
+	// here's where the action happens
 	start := time.Now()
 	conn.Serve(streamer.preamble)
 	duration := time.Since(start)
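
Taken together, the two patches establish a small lifecycle protocol around the per-connection queue: packets are offered with a non-blocking Send, the queue is closed exactly once through a compare-and-swap guard, and leftovers are drained only after the connection has left the pool. The following is a minimal, self-contained sketch of that pattern, not restreamer's actual API: all names are hypothetical, and atomic.Bool (Go 1.19+) stands in for util.AtomicBool.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type packet []byte

type connection struct {
	queue  chan packet
	closed atomic.Bool // stand-in for util.AtomicBool
}

// send offers a packet without blocking; a full queue drops the packet.
func (c *connection) send(p packet) bool {
	select {
	case c.queue <- p:
		return true
	default:
		return false
	}
}

// close is idempotent: the CAS ensures only one caller closes the channel.
func (c *connection) close() {
	if c.closed.CompareAndSwap(false, true) {
		close(c.queue)
	}
}

// drain discards whatever is left once the channel has been closed.
func (c *connection) drain() {
	for range c.queue {
	}
}

func main() {
	c := &connection{queue: make(chan packet, 2)}
	var wg sync.WaitGroup
	wg.Add(1)
	// the reader plays the role of Serve
	go func() {
		defer wg.Done()
		for p := range c.queue {
			fmt.Printf("read %d bytes\n", len(p))
		}
	}()
	c.send(packet{0x47, 0x00, 0x11})
	c.close()
	c.close() // second close is a no-op, no panic
	wg.Wait()
	c.drain() // channel already closed and empty, returns immediately
}

The ordering in the sketch mirrors ServeHTTP above: remove the connection from the pool first (synchronized via the request's WaitGroup), then Close, then Drain, so no writer can race against the drain.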