Guard against double channel close #35

Open · wants to merge 2 commits into main
65 changes: 53 additions & 12 deletions streaming/connection.go
@@ -20,6 +20,7 @@ import (
     "context"
     "fmt"
     "github.com/onitake/restreamer/protocol"
+    "github.com/onitake/restreamer/util"
     "net/http"
     "time"
 )
@@ -29,15 +30,14 @@ import (
 // This is meant to be called directly from a ServeHTTP handler.
 // No separate thread is created.
 type Connection struct {
-    // Queue is the per-connection packet queue
-    Queue chan protocol.MpegTsPacket
+    // queue is the per-connection packet queue
+    queue chan protocol.MpegTsPacket
     // ClientAddress is the remote client address
     ClientAddress string
-    // the destination socket
+    // writer is the destination socket
     writer http.ResponseWriter
-    // Closed is true if Serve was ended because of a closed channel.
-    // This is simply there to avoid a double close.
-    Closed bool
+    // closed is used to protect against double closure
+    closed util.AtomicBool
     // context contains the cached context object for this connection
     context context.Context
 }
@@ -49,18 +49,44 @@ type Connection struct {
 // and will be used for logging.
 func NewConnection(destination http.ResponseWriter, qsize int, clientaddr string, ctx context.Context) *Connection {
     conn := &Connection{
-        Queue:         make(chan protocol.MpegTsPacket, qsize),
+        queue:         make(chan protocol.MpegTsPacket, qsize),
         ClientAddress: clientaddr,
         writer:        destination,
         context:       ctx,
     }
     return conn
 }
 
+// Send writes a packet into the output queue.
+// It returns true if the packet was processed, or false if the queue was full.
+func (conn *Connection) Send(packet protocol.MpegTsPacket) bool {
+    select {
+    case conn.queue <- packet:
+        // packet distributed, done
+        //log.Printf("Queued packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
+        return true
+    default:
+        // queue is full
+        //log.Print(ErrSlowRead)
+        return false
+    }
+}
+
+// Close closes the queue and stops the client connection.
+// This can be called multiple times, and even asynchronously.
+func (conn *Connection) Close() error {
+    // The Go standard library recommends against using atomics if there are other means, but they are sooo handy.
+    if util.CompareAndSwapBool(&conn.closed, false, true) {
+        // only the first caller gets here, so the queue is closed exactly once
+        close(conn.queue)
+    }
+    return nil
+}

 // Serve starts serving data to a client, continuously feeding packets from the queue.
 // An optional preamble buffer can be passed that will be sent before streaming the live payload
 // (but after the HTTP response headers).
-func (conn *Connection) Serve(preamble []byte) {
+func (conn *Connection) Serve(preamble []byte) bool {
     // set the content type (important)
     conn.writer.Header().Set("Content-Type", "video/mpeg")
     // a stream is always current
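The util package referenced by Close is not part of this diff, so the following is only a plausible sketch of what util.AtomicBool and util.CompareAndSwapBool might look like, inferred from how Close uses them; the actual implementation may differ. The idea is a bool modelled as an int32 so that sync/atomic's compare-and-swap applies:

package util

import "sync/atomic"

// AtomicBool is a bool that can be manipulated atomically.
// The zero value is false.
type AtomicBool int32

// CompareAndSwapBool atomically replaces *b with new if it currently
// holds old, and reports whether the swap happened.
// Sketch only: assumes util wraps sync/atomic this way.
func CompareAndSwapBool(b *AtomicBool, old, new bool) bool {
    var o, n int32
    if old {
        o = 1
    }
    if new {
        n = 1
    }
    return atomic.CompareAndSwapInt32((*int32)(b), o, n)
}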
@@ -102,10 +128,12 @@ func (conn *Connection) Serve(preamble []byte) {
         }
     }
 
+    // this is the exit status indicator
+    qclosed := false
     // start reading packets
     for running {
         select {
-        case packet, ok := <-conn.Queue:
+        case packet, ok := <-conn.queue:
             if ok {
                 // packet received, log
                 //log.Printf("Sending packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
@@ -129,7 +157,8 @@ func (conn *Connection) Serve(preamble []byte) {
                     "message", "Shutting down client connection",
                 )
                 running = false
-                conn.Closed = true
+                // indicate that the queue was closed
+                qclosed = true
             }
         case <-conn.context.Done():
             // connection closed while we were waiting for more data
@@ -138,17 +167,29 @@ func (conn *Connection) Serve(preamble []byte) {
                 "message", "Downstream connection closed (while waiting)",
                 "error", fmt.Sprintf("%v", conn.context.Err()),
             )
-            // stop processing events
+            // we do NOT close the queue here, this will be done when the writer completes
             running = false
         }
     }
 
-    // we cannot drain the channel here, as it might not be closed yet.
-    // better let our caller handle closure and draining.
+    // we're not draining the queue yet, this will happen when Drain() is called
+    // (i.e. after the connection has been removed from the pool)
 
     logger.Logkv(
         "event", eventConnectionDone,
         "message", "Streaming finished",
     )
+
+    return qclosed
 }
+
+// Drain drains the built-in queue.
+// You should call this when all writers have been removed and the queue is closed.
+func (conn *Connection) Drain() {
+    for range conn.queue {
+        // drain any leftovers
+    }
+}
 
 // ServeStreamError returns an appropriate error response to the client.
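Taken together, Send, Close and Drain give the connection a clear lifecycle: producers feed the queue through Send, Close shuts the queue down exactly once, and Drain empties whatever is left after the connection has been removed from the pool. A hypothetical usage sketch from inside the streaming package (the address and the one-byte stand-in packet are invented, and Serve is omitted):

conn := NewConnection(nil, 2, "203.0.113.1:4000", context.Background())
conn.Send(protocol.MpegTsPacket{0x47}) // queued, Send reports true
conn.Close()                           // first call wins the CAS and closes the queue
conn.Close()                           // second call is a harmless no-op
conn.Drain()                           // consumes the leftover packet, then returns

Because the queue is closed by then, the for range inside Drain terminates as soon as the buffered packets have been consumed.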
43 changes: 22 additions & 21 deletions streaming/streamer.go
@@ -294,23 +294,15 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
             //log.Printf("Got packet (length %d)\n", len(packet))
 
             for conn := range pool {
-                select {
-                case conn.Queue <- packet:
-                    // packet distributed, done
-                    //log.Printf("Queued packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
-
+                if conn.Send(packet) {
                     // report the packet
                     streamer.stats.PacketSent()
                     if streamer.promCounter {
                         metricPacketsSent.With(prometheus.Labels{"stream": streamer.name}).Inc()
                         metricBytesSent.With(prometheus.Labels{"stream": streamer.name}).Add(protocol.MpegTsPacketSize)
                     }
-
-                default:
-                    // queue is full
-                    //log.Print(ErrSlowRead)
-
-                    // report the drop
+                } else {
+                    // queue full, report the dropped packet
                     streamer.stats.PacketDropped()
                     if streamer.promCounter {
                         metricPacketsDropped.With(prometheus.Labels{"stream": streamer.name}).Inc()
@@ -331,9 +323,6 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
                 "event", eventStreamerClientRemove,
                 "message", fmt.Sprintf("Removing client %s from pool", request.Address),
             )
-            if !request.Connection.Closed {
-                close(request.Connection.Queue)
-            }
             delete(pool, request.Connection)
         case StreamerCommandAdd:
             // check if the connection can be accepted
@@ -362,7 +351,10 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
             inhibit = true
             // close all downstream connections
             for conn := range pool {
-                close(conn.Queue)
+                conn.Close()
+                // avoid waiting for the removal round-trip, this will make us less racy
+                // double deletes are safe, so nothing bad will happen when we do get the remove command later
+                delete(pool, conn)
             }
             // TODO implement inhibit in the check api
         case StreamerCommandAllow:
@@ -391,7 +383,8 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
             // drain any leftovers
         }
         for conn := range pool {
-            close(conn.Queue)
+            // ensure this queue/connection is closed
+            conn.Close()
         }
 
         // start the command eater again
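Because closing is now guarded by the compare-and-swap, this shutdown path may race with ServeHTTP closing the same connection without hitting Go's "close of closed channel" panic. A small illustrative sketch (hypothetical, not taken from the PR):

conn := NewConnection(nil, 16, "198.51.100.7:9000", context.Background())
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        conn.Close() // only the CAS winner actually closes the channel
    }()
}
wg.Wait()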
@@ -451,20 +444,28 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
         "remote", request.RemoteAddr,
     )
 
+    // here's where the action happens
     start := time.Now()
     conn.Serve(streamer.preamble)
     duration := time.Since(start)
 
     // done, remove the stale connection
-    streamer.request <- &ConnectionRequest{
+    command = &ConnectionRequest{
         Command:    StreamerCommandRemove,
         Address:    request.RemoteAddr,
         Connection: conn,
+        Waiter:     &sync.WaitGroup{},
     }
-    // and drain the queue AFTER we have sent the shutdown signal
-    for range conn.Queue {
-        // drain any leftovers
-    }
+    command.Waiter.Add(1)
+    streamer.request <- command
+    // and wait for completion
+    command.Waiter.Wait()
+
+    // we are now safe to close and drain the queue
+    // double close is prevented by the connection object
+    conn.Close()
+    conn.Drain()
 
     logger.Logkv(
         "event", eventStreamerClosed,
         "message", fmt.Sprintf("Connection from %s closed", request.RemoteAddr),