Skip to content

Commit

Permalink
GODRIVER-2109 Prevent a data race between connecting and checking out…
Browse files Browse the repository at this point in the history
… a connection from the resourcePool. (#728)
  • Loading branch information
matthewdale committed Sep 1, 2021
1 parent 1e6b456 commit 7641385
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
8 changes: 8 additions & 0 deletions mongo/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"context"
"fmt"
"os"
"reflect"
Expand Down Expand Up @@ -456,6 +457,13 @@ func TestClient(t *testing.T) {
mode := modeVal.StringValue()
assert.Equal(mt, mode, "primaryPreferred", "expected read preference mode primaryPreferred, got %v", mode)
})

// Test that using a client with minPoolSize set doesn't cause a data race.
mtOpts = mtest.NewOptions().ClientOptions(options.Client().SetMinPoolSize(5))
mt.RunOpts("minPoolSize", mtOpts, func(mt *mtest.T) {
err := mt.Client.Ping(context.Background(), readpref.Primary())
assert.Nil(t, err, "unexpected error calling Ping: %v", err)
})
}

type proxyMessage struct {
Expand Down
3 changes: 3 additions & 0 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type connection struct {
idleDeadline atomic.Value // Stores a time.Time
readTimeout time.Duration
writeTimeout time.Duration
descMu sync.RWMutex // Guards desc. TODO: Remove with or after GODRIVER-2038.
desc description.Server
isMasterRTT time.Duration
compressor wiremessage.CompressorID
Expand Down Expand Up @@ -228,7 +229,9 @@ func (c *connection) connect(ctx context.Context) {
if err == nil {
// We only need to retain the Description field as the connection's description. The authentication-related
// fields in handshakeInfo are tracked by the handshaker if necessary.
c.descMu.Lock()
c.desc = handshakeInfo.Description
c.descMu.Unlock()
c.isMasterRTT = time.Since(handshakeStartTime)

// If the application has indicated that the cluster is load balanced, ensure the server has included serviceId
Expand Down
14 changes: 12 additions & 2 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,14 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) (*pool, error) {

// stale checks if a given connection's generation is below the generation of the pool
func (p *pool) stale(c *connection) bool {
return c == nil || p.generation.stale(c.desc.ServiceID, c.generation)
if c == nil {
return true
}

c.descMu.RLock()
serviceID := c.desc.ServiceID
c.descMu.RUnlock()
return p.generation.stale(serviceID, c.generation)
}

// connect puts the pool into the connected state, allowing it to be used and will allow items to begin being processed from the wait queue
Expand Down Expand Up @@ -532,7 +539,10 @@ func (p *pool) removeConnection(c *connection, reason string) error {
// Only update the generation numbers map if the connection has retrieved its generation number. Otherwise, we'd
// decrement the count for the generation even though it had never been incremented.
if c.hasGenerationNumber() {
p.generation.removeConnection(c.desc.ServiceID)
c.descMu.RLock()
serviceID := c.desc.ServiceID
c.descMu.RUnlock()
p.generation.removeConnection(serviceID)
}

if publishEvent && p.monitor != nil {
Expand Down

0 comments on commit 7641385

Please sign in to comment.