Skip to content

Commit

Permalink
IsClosed connection
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Malusev <dusan@dusanmalusev.dev>
  • Loading branch information
CodeLieutenant committed Feb 23, 2024
1 parent 039d2d7 commit 6080ecf
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 46 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# 2.0.7

## Features

* Connection IsClosed method added

## Bufix

Expand Down
5 changes: 5 additions & 0 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ func (c *Connection) hasChannelClosed(err error) bool {
return errors.Is(err, amqp091.ErrClosed) && !c.conn.Load().IsClosed()
}

func (c *Connection) IsClosed() bool {
conn := c.conn.Load()
return conn != nil && conn.IsClosed()
}

func (c *Connection) handleReconnect(ctx context.Context, connection *amqp091.Connection, connect func(ctx context.Context) error) {
notifyClose := connection.NotifyClose(make(chan *amqp091.Error))
for {
Expand Down
37 changes: 0 additions & 37 deletions publisher/guard_lock.go

This file was deleted.

18 changes: 9 additions & 9 deletions publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type (
exchangeName string
routingKey string
wg sync.WaitGroup
ready guardLock
ready sync.RWMutex
closing atomic.Bool
gettingCh atomic.Bool
}
Expand Down Expand Up @@ -79,6 +79,7 @@ func (p *Publisher[T]) swapChannel(connection *amqp091.Connection, cfg Config[T]
}

p.ch = chOrigin
p.gettingCh.Store(false)
return notifyClose, nil
}

Expand All @@ -99,33 +100,32 @@ func (p *Publisher[T]) onConnectionReady(cfg Config[T]) connection.OnConnectionR
for {
select {
case <-ctx.Done():
p.closing.Store(true)
p.ready.Lock()
defer p.ready.Unlock()
if !p.ch.IsClosed() {
if err := p.ch.Close(); err != nil {
cfg.logger.Error("Failed to close channel: %v", err)
}
}
p.ready.Unlock()
return
case <-errCh:
notifyClose, err = p.swapChannel(connection, cfg)
if err != nil {
errCh <- err
} else {
p.gettingCh.Store(false)
}
case err, ok := <-notifyClose:
if !ok {
return
}

p.ready.Lock()

if connection.IsClosed() {
cfg.logger.Error("Connection closed")
return
}

p.gettingCh.CompareAndSwap(false, true)

cfg.logger.Error("Channel Error: %v", err)
// When connection is still open and channel is closed we need to create new channel
// and throw away the old one
Expand Down Expand Up @@ -205,7 +205,7 @@ func New[T any](exchangeName string, options ...Option[T]) (*Publisher[T], error
conn, err := connection.New(ctx, cfg.connectionOptions, connection.Events{
OnConnectionReady: publisher.onConnectionReady(cfg),
OnBeforeConnectionReady: func(ctx context.Context) error {
publisher.ready.Lock()
publisher.gettingCh.Store(true)
return nil
},
OnError: cfg.onError,
Expand All @@ -223,7 +223,7 @@ type PublishConfig struct {
}

func (p *Publisher[T]) Publish(ctx context.Context, msg T, config ...PublishConfig) error {
if p.closing.Load() {
if p.conn.IsClosed() {
return ErrClosed
}

Expand Down Expand Up @@ -259,7 +259,7 @@ func (p *Publisher[T]) Close() error {
p.closing.Store(true)
p.ready.Lock()
defer p.ready.Unlock()

p.cancel()
p.wg.Wait()
p.ch = nil
Expand Down

0 comments on commit 6080ecf

Please sign in to comment.