Skip to content

Commit

Permalink
GODRIVER-1615 Track wire version on ChangeStream
Browse files Browse the repository at this point in the history
This was needed to fix test errors. Because the deployment was changed
and the new deployment closes the connection right after the aggregate,
any code that access the wire version after the aggregate finishes would
panic. To fix that, we store the wire version of the last used connection
on ChangeStream.
  • Loading branch information
Divjot Arora committed Jun 3, 2020
1 parent 012750a commit 630989b
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type ChangeStream struct {
options *options.ChangeStreamOptions
selector description.ServerSelector
operationTime *primitive.Timestamp
wireVersion *description.VersionRange
}

type changeStreamConfig struct {
Expand Down Expand Up @@ -183,11 +184,12 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
}

defer conn.Close()
cs.wireVersion = conn.Description().WireVersion

cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))

if resuming {
cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version
cs.replaceOptions(ctx, cs.wireVersion) // pass wire version

csOptDoc := cs.createPipelineOptionsDoc()
pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
Expand All @@ -205,8 +207,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
}

if original := cs.aggregate.Execute(ctx); original != nil {
wireVersion := conn.Description().WireVersion
retryableRead := cs.client.retryReads && wireVersion != nil && wireVersion.Max >= 6
retryableRead := cs.client.retryReads && cs.wireVersion != nil && cs.wireVersion.Max >= 6
if !retryableRead {
cs.err = replaceErrors(original)
return cs.err
Expand All @@ -230,9 +231,9 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
break
}
defer conn.Close()
cs.wireVersion = conn.Description().WireVersion

wireVersion := conn.Description().WireVersion
if wireVersion == nil || wireVersion.Max < 6 {
if cs.wireVersion == nil || cs.wireVersion.Max < 6 {
break
}

Expand All @@ -258,7 +259,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err

cs.updatePbrtFromCommand()
if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
cs.options.StartAfter == nil && conn.Description().WireVersion.Max >= 7 &&
cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 &&
cs.emptyBatch() && cs.resumeToken == nil {
cs.operationTime = cs.sess.OperationTime
}
Expand Down

0 comments on commit 630989b

Please sign in to comment.