diff --git a/mongo/integration/client_test.go b/mongo/integration/client_test.go index c6ba613604..3d82c1ec95 100644 --- a/mongo/integration/client_test.go +++ b/mongo/integration/client_test.go @@ -7,6 +7,7 @@ package integration import ( + "context" "fmt" "os" "reflect" @@ -456,6 +457,13 @@ func TestClient(t *testing.T) { mode := modeVal.StringValue() assert.Equal(mt, mode, "primaryPreferred", "expected read preference mode primaryPreferred, got %v", mode) }) + + // Test that using a client with minPoolSize set doesn't cause a data race. + mtOpts = mtest.NewOptions().ClientOptions(options.Client().SetMinPoolSize(5)) + mt.RunOpts("minPoolSize", mtOpts, func(mt *mtest.T) { + err := mt.Client.Ping(context.Background(), readpref.Primary()) + assert.Nil(t, err, "unexpected error calling Ping: %v", err) + }) } type proxyMessage struct { diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index e23aeeaf1b..4255a7e2a3 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -46,6 +46,7 @@ type connection struct { idleDeadline atomic.Value // Stores a time.Time readTimeout time.Duration writeTimeout time.Duration + descMu sync.RWMutex // Guards desc. TODO: Remove with or after GODRIVER-2038. desc description.Server isMasterRTT time.Duration compressor wiremessage.CompressorID @@ -228,7 +229,9 @@ func (c *connection) connect(ctx context.Context) { if err == nil { // We only need to retain the Description field as the connection's description. The authentication-related // fields in handshakeInfo are tracked by the handshaker if necessary. + c.descMu.Lock() c.desc = handshakeInfo.Description + c.descMu.Unlock() c.isMasterRTT = time.Since(handshakeStartTime) // If the application has indicated that the cluster is load balanced, ensure the server has included serviceId diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 21bc2a69ec..16a3c31d09 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -196,7 +196,14 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) (*pool, error) { // stale checks if a given connection's generation is below the generation of the pool func (p *pool) stale(c *connection) bool { - return c == nil || p.generation.stale(c.desc.ServiceID, c.generation) + if c == nil { + return true + } + + c.descMu.RLock() + serviceID := c.desc.ServiceID + c.descMu.RUnlock() + return p.generation.stale(serviceID, c.generation) } // connect puts the pool into the connected state, allowing it to be used and will allow items to begin being processed from the wait queue @@ -532,7 +539,10 @@ func (p *pool) removeConnection(c *connection, reason string) error { // Only update the generation numbers map if the connection has retrieved its generation number. Otherwise, we'd // decrement the count for the generation even though it had never been incremented. if c.hasGenerationNumber() { - p.generation.removeConnection(c.desc.ServiceID) + c.descMu.RLock() + serviceID := c.desc.ServiceID + c.descMu.RUnlock() + p.generation.removeConnection(serviceID) } if publishEvent && p.monitor != nil {