From 630989b0608d06a1362d3f6ef0f4811aec55e00e Mon Sep 17 00:00:00 2001 From: Divjot Arora Date: Wed, 3 Jun 2020 10:28:32 -0400 Subject: [PATCH] GODRIVER-1615 Track wire version on ChangeStream This was needed to fix test errors. Because the deployment was changed and the new deployment closes the connection right after the aggregate, any code that access the wire version after the aggregate finishes would panic. To fix that, we store the wire version of the last used connection on ChangeStream. --- mongo/change_stream.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/mongo/change_stream.go b/mongo/change_stream.go index df2bbb0362..a377bd42be 100644 --- a/mongo/change_stream.go +++ b/mongo/change_stream.go @@ -59,6 +59,7 @@ type ChangeStream struct { options *options.ChangeStreamOptions selector description.ServerSelector operationTime *primitive.Timestamp + wireVersion *description.VersionRange } type changeStreamConfig struct { @@ -183,11 +184,12 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err } defer conn.Close() + cs.wireVersion = conn.Description().WireVersion cs.aggregate.Deployment(cs.createOperationDeployment(server, conn)) if resuming { - cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version + cs.replaceOptions(ctx, cs.wireVersion) // pass wire version csOptDoc := cs.createPipelineOptionsDoc() pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil) @@ -205,8 +207,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err } if original := cs.aggregate.Execute(ctx); original != nil { - wireVersion := conn.Description().WireVersion - retryableRead := cs.client.retryReads && wireVersion != nil && wireVersion.Max >= 6 + retryableRead := cs.client.retryReads && cs.wireVersion != nil && cs.wireVersion.Max >= 6 if !retryableRead { cs.err = replaceErrors(original) return cs.err @@ -230,9 +231,9 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err break } defer conn.Close() + cs.wireVersion = conn.Description().WireVersion - wireVersion := conn.Description().WireVersion - if wireVersion == nil || wireVersion.Max < 6 { + if cs.wireVersion == nil || cs.wireVersion.Max < 6 { break } @@ -258,7 +259,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err cs.updatePbrtFromCommand() if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil && - cs.options.StartAfter == nil && conn.Description().WireVersion.Max >= 7 && + cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 && cs.emptyBatch() && cs.resumeToken == nil { cs.operationTime = cs.sess.OperationTime }