Skip to content

Commit

Permalink
stop caches being used after close
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Dec 6, 2024
1 parent 5a71d52 commit a27fedf
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (a *Agent) Stop() error {
return err
}
a.controlClientCache.Close()
a.connCaches.Close()
if err := a.compactionWorkersService.Stop(); err != nil {
return err
}
Expand All @@ -243,7 +244,6 @@ func (a *Agent) Stop() error {
if err := a.controller.Stop(); err != nil {
return err
}
a.connCaches.Close()
a.started = false
return nil
}
Expand Down
15 changes: 12 additions & 3 deletions control/client_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package control

import (
"github.com/pkg/errors"
log "github.com/spirit-labs/tektite/logger"
"github.com/spirit-labs/tektite/lsm"
"github.com/spirit-labs/tektite/offsets"
Expand All @@ -16,6 +17,7 @@ type ClientCache struct {
clients []*clientWrapper
pos int64
injectedError error
closed bool
}

type ClientFactory func() (Client, error)
Expand All @@ -34,19 +36,25 @@ func (cc *ClientCache) SetInjectedError(err error) {
}

func (cc *ClientCache) GetClient() (Client, error) {
cl, index := cc.getCachedClient()
cl, index, err := cc.getCachedClient()
if err != nil {
return nil, err
}
if cl != nil {
return cl, nil
}
return cc.createClient(index)
}

func (cc *ClientCache) getCachedClient() (*clientWrapper, int) {
func (cc *ClientCache) getCachedClient() (*clientWrapper, int, error) {
cc.lock.RLock()
defer cc.lock.RUnlock()
if cc.closed {
return nil, 0, errors.New("client cache is closed")
}
pos := atomic.AddInt64(&cc.pos, 1) - 1
index := int(pos) % len(cc.clients)
return cc.clients[index], index
return cc.clients[index], index, nil
}

func (cc *ClientCache) createClient(index int) (*clientWrapper, error) {
Expand Down Expand Up @@ -86,6 +94,7 @@ func (cc *ClientCache) Close() {
}
}
}
cc.closed = true
}

type clientWrapper struct {
Expand Down
15 changes: 12 additions & 3 deletions transport/conn_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transport

import (
"github.com/pkg/errors"
log "github.com/spirit-labs/tektite/logger"
"sync"
"sync/atomic"
Expand All @@ -13,6 +14,7 @@ type ConnectionCache struct {
connFactory ConnectionFactory
connections []*connectionWrapper
pos int64
closed bool
}

func NewConnectionCache(address string, maxConnections int, connFactory ConnectionFactory) *ConnectionCache {
Expand All @@ -24,7 +26,10 @@ func NewConnectionCache(address string, maxConnections int, connFactory Connecti
}

func (cc *ConnectionCache) GetConnection() (Connection, error) {
cl, index := cc.getCachedConnection()
cl, index, err := cc.getCachedConnection()
if err != nil {
return nil, err
}
if cl != nil {
return cl, nil
}
Expand All @@ -42,6 +47,7 @@ func (cc *ConnectionCache) Close() {
}
cc.connections[i] = nil
}
cc.closed = true
}

func (cc *ConnectionCache) NumConnections() int {
Expand All @@ -56,12 +62,15 @@ func (cc *ConnectionCache) NumConnections() int {
return num
}

func (cc *ConnectionCache) getCachedConnection() (*connectionWrapper, int) {
func (cc *ConnectionCache) getCachedConnection() (*connectionWrapper, int, error) {
cc.lock.RLock()
defer cc.lock.RUnlock()
if cc.closed {
return nil, 0, errors.New("connection cache is closed")
}
pos := atomic.AddInt64(&cc.pos, 1) - 1
index := int(pos) % len(cc.connections)
return cc.connections[index], index
return cc.connections[index], index, nil
}

func (cc *ConnectionCache) createConnection(index int) (*connectionWrapper, error) {
Expand Down

0 comments on commit a27fedf

Please sign in to comment.