Skip to content

Commit

Permalink
Merge pull request #4 from ajwerner/ajwerner/eliminate-deadlock-in-ch…
Browse files Browse the repository at this point in the history
…annel-send

eliminate deadlocks in Subscribe and AddListener
  • Loading branch information
ajwerner authored Jan 8, 2019
2 parents 4f5b168 + 4379391 commit 0e362d8
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 6 deletions.
26 changes: 20 additions & 6 deletions circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,22 @@ func NewRateBreaker(rate float64, minSamples int64) *Breaker {

// Subscribe returns a channel of BreakerEvents. Whenever the breaker changes state,
// the state will be sent over the channel. See BreakerEvent for the types of events.
// Note that events may be dropped or not sent so clients should not rely on
// events for program correctness.
func (cb *Breaker) Subscribe() <-chan BreakerEvent {
eventReader := make(chan BreakerEvent)
output := make(chan BreakerEvent, 100)

go func() {
for v := range eventReader {
trySend:
select {
case output <- v:
default:
<-output
output <- v
select {
case <-output:
default:
}
goto trySend
}
}
}()
Expand All @@ -210,6 +215,8 @@ func (cb *Breaker) Subscribe() <-chan BreakerEvent {

// AddListener adds a channel of ListenerEvents on behalf of a listener.
// The listener channel must be buffered.
// Note that events may be dropped or not sent so clients should not rely on
// events for program correctness.
func (cb *Breaker) AddListener(listener chan ListenerEvent) {
cb.listeners = append(cb.listeners, listener)
}
Expand Down Expand Up @@ -336,7 +343,9 @@ func (cb *Breaker) Call(circuit func() error, timeout time.Duration) error {

// CallContext is same as Call but if the ctx is canceled after the circuit returned an error,
// the error will not be marked as a failure because the call was canceled intentionally.
func (cb *Breaker) CallContext(ctx context.Context, circuit func() error, timeout time.Duration) error {
func (cb *Breaker) CallContext(
ctx context.Context, circuit func() error, timeout time.Duration,
) error {
var err error

if !cb.Ready() {
Expand Down Expand Up @@ -406,11 +415,16 @@ func (cb *Breaker) sendEvent(event BreakerEvent) {
}
for _, listener := range cb.listeners {
le := ListenerEvent{CB: cb, Event: event}
trySend:
select {
case listener <- le:
default:
<-listener
listener <- le
// The channel was full so attempt to pull off of it and send again.
select {
case <-listener:
default:
}
goto trySend
}
}
}
Expand Down
66 changes: 66 additions & 0 deletions circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package circuit
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -518,3 +519,68 @@ func TestPartialSecondBackoff(t *testing.T) {
t.Fatalf("expected breaker to be ready after more than nextBackoff time had passed")
}
}

// TestNoDeadlockOnChannelSends ensures that the behavior of channel sends in
// the face of concurrent events and consumers does not lead to deadlock.
func TestNoDeadlockOnChannelSends(t *testing.T) {
const listeners = 1000
const subscribers = 1000
const resetters = 3
b := NewBreakerWithOptions(nil)
var lcs []chan ListenerEvent
for i := 0; i < listeners; i++ {
lcs = append(lcs, make(chan ListenerEvent, 1))
b.AddListener(lcs[i])
}
var scs []<-chan BreakerEvent
for i := 0; i < subscribers; i++ {
scs = append(scs, b.Subscribe())
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
var wg sync.WaitGroup
readFromSubscribeChan := func(sc <-chan BreakerEvent) {
defer wg.Done()
for {
select {
case <-sc:
case <-ctx.Done():
return
}
}
}
readFromListenerChan := func(lc chan ListenerEvent) {
defer wg.Done()
for {
select {
case <-lc:
case <-ctx.Done():
return
}
}
}
tripAndReset := func() {
defer wg.Done()
for i := 0; true; i++ {
// Keep sending a bit after the other goroutine exits.
if i%1000 == 0 && ctx.Err() != nil {
return
}
b.Reset()
b.Trip()
}
}
for _, lc := range lcs {
wg.Add(1)
go readFromListenerChan(lc)
}
for _, sc := range scs {
wg.Add(1)
go readFromSubscribeChan(sc)
}
for i := 0; i < resetters; i++ {
wg.Add(1)
go tripAndReset()
}
wg.Wait()
}

0 comments on commit 0e362d8

Please sign in to comment.