Skip to content

Commit

Permalink
Make the channels less fail-prone by initialising them once the subsc…
Browse files Browse the repository at this point in the history
…ription succeeds and ensuring they always close in case of an error to prevent a deadlock
  • Loading branch information
000xE committed Jan 25, 2025
1 parent 9544800 commit e0a9fcc
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 15 deletions.
9 changes: 2 additions & 7 deletions streaming/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,12 @@ func ReadStream[T any](connection *tls.Conn, reads chan<- T) (err error) {
var x T

if err := dec.Decode(&x); err != nil && err != io.EOF {
return err
break
}

select {
case reads <- x:
default:
return nil
}
reads <- x
}

close(reads)

return nil
}
6 changes: 2 additions & 4 deletions streaming/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,11 @@ func (client *StreamingClient) SubscribeToMarkets(marketFilter MarketFilter, mar
Clk: client.Clk,
}

marketChanges := make(chan MarketChangeMessage)

if err := client.Write(ms, true); err != nil {
close(marketChanges)
return marketChanges, err
return nil, err
}

marketChanges := make(chan MarketChangeMessage)
go ReadStream(client.Connection, marketChanges)

return marketChanges, nil
Expand Down
6 changes: 2 additions & 4 deletions streaming/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,11 @@ func (client *StreamingClient) SubscribeToOrders(orderFilter OrderFilter) (chan
Clk: client.Clk,
}

orderChanges := make(chan OrderChangeMessage)

if err := client.Write(ms, true); err != nil {
close(orderChanges)
return orderChanges, err
return nil, err
}

orderChanges := make(chan OrderChangeMessage)
go ReadStream(client.Connection, orderChanges)

return orderChanges, nil
Expand Down

0 comments on commit e0a9fcc

Please sign in to comment.