Start at operation time implementation (#978)
* RUBY-1369 Collection change stream integration test

* RUBY-1369 try_next implementation

* RUBY-1369 Move to spec/integration

* RUBY-1369 Got try_next working when there are changes

* RUBY-1369 Test try_next with no changes

* RUBY-1369 Document try_next

* RUBY-1369 Change change stream resume logic more to retry once only

* RUBY-1369 Apply current resume behavior to try_next

* RUBY-1369 Add docstrings for change_stream_resumable?

* RUBY-1369 Document how to restart change streams and when they are resumed by the driver automatically

* RUBY-1369 failCommand is 4.0+ only

* RUBY-1369 These are now non-resumed

* RUBY-1369 Repair the logic yet again.

The concept of "retrying once" is apparently much more difficult than it first appears

* RUBY-1369 Repair this test again and try another way to not have it be stuck in a getmore loop

* RUBY-1369 Use timeout-interrupt to stop tests hanging forever on change stream reads

* RUBY-1369 Force a successful getMore prior to failing them.

I believe change stream drops documents upon a reset without startAtOperationTime,
hence right now we can't reliably test initial getMores failing

* RUBY-1369 Need to clear fail points only on tests using them if I want to run others on servers < 4.0
p-mongo authored Jun 26, 2018
1 parent 124568d commit b147f78
Showing 17 changed files with 669 additions and 36 deletions.
6 changes: 6 additions & 0 deletions Gemfile
@@ -34,3 +34,9 @@ group :development do
gem 'pry-rescue'
gem 'pry-nav'
end

group :testing do
platforms :mri do
gem 'timeout-interrupt'
end
end
21 changes: 21 additions & 0 deletions docs/tutorials/ruby-driver-change-streams.txt
@@ -77,3 +77,24 @@ You can close a change stream by calling the ``#close`` method:
doc = stream.to_enum.next
process(doc)
stream.close

Resuming a Change Stream
------------------------

The driver will automatically retry getMore operations on a change stream
once. Initial aggregation is never retried. In practical terms this means
that, for example:

- Calling ``collection.watch`` will fail if the cluster does not have
enough available nodes to satisfy the ``"majority"`` read concern;
- Once ``collection.watch`` successfully returns, if the cluster subsequently
experiences an election or loses a node, but heals quickly enough,
change stream reads via ``next`` or ``each`` methods will continue
transparently to the application.

If the cluster loses enough nodes to not be able to satisfy the ``"majority"``
read concern and does not heal quickly enough, ``next`` and ``each``
will raise an error. In these cases the application must track, via the
resume token, which documents from the change stream it has processed and
create a new change stream object via the ``watch`` call, passing an
appropriate ``:resume_after`` argument.
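
The restart procedure described above can be sketched as follows. This is a minimal illustration rather than driver code: ``watch_with_resume``, ``FakeStream``, ``FakeCollection`` and ``TransientError`` are hypothetical names, and the fake collection stands in for a real collection so that the sketch runs without a server. It assumes only what the text states: ``watch`` accepts a ``:resume_after`` option and each change document carries its resume token in ``_id``. With the real driver one would presumably rescue ``Mongo::Error`` instead of ``TransientError``.

```ruby
# Hypothetical stand-in for an error the driver could not recover from.
class TransientError < StandardError; end

# Hypothetical fake change stream: yields its documents and can be told
# to fail when it reaches a given position.
class FakeStream
  def initialize(docs, fail_after: nil)
    @docs = docs
    @fail_after = fail_after
  end

  def each
    @docs.each_with_index do |doc, i|
      raise TransientError, 'connection lost' if @fail_after == i
      yield doc
    end
  end
end

# Hypothetical fake collection: the first watch call fails part-way
# through; later calls honor :resume_after by skipping the documents
# up to and including the one identified by the token.
class FakeCollection
  attr_reader :watch_calls

  def initialize(docs)
    @docs = docs
    @watch_calls = []
  end

  def watch(pipeline = [], options = {})
    @watch_calls << options
    token = options[:resume_after]
    start = token ? @docs.index { |d| d['_id'] == token } + 1 : 0
    FakeStream.new(@docs[start..-1], fail_after: @watch_calls.size == 1 ? 2 : nil)
  end
end

# Application-level restart loop: remember the resume token of the last
# processed document and pass it to a fresh watch call after a failure.
def watch_with_resume(collection, error_class:, max_restarts: 3)
  resume_token = nil
  restarts = 0
  begin
    options = resume_token ? { resume_after: resume_token } : {}
    collection.watch([], options).each do |doc|
      yield doc
      # Record the token only after the document has been processed.
      resume_token = doc['_id']
    end
  rescue error_class
    restarts += 1
    raise if restarts > max_restarts
    retry
  end
end

docs = (1..4).map { |n| { '_id' => { '_data' => n.to_s }, 'n' => n } }
collection = FakeCollection.new(docs)
seen = []
watch_with_resume(collection, error_class: TransientError) { |doc| seen << doc['n'] }
```

Recording the token after ``yield`` returns means a document whose processing raised is delivered again after the restart; recording it before would skip that document instead. Which behavior is appropriate depends on the application.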
7 changes: 4 additions & 3 deletions lib/mongo/client.rb
@@ -487,9 +487,10 @@ def start_session(options = {})
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ BSON::Timestamp ] :start_at_cluster_time Only return changes that occurred
# after the specified timestamp. Any command run against the server will return a cluster time
# that can be used here. Only valid in server versions 4.0+.
# @option options [ BSON::Timestamp ] :start_at_operation_time Only return
# changes that occurred at or after the specified timestamp. Any command run
# against the server will return a cluster time that can be used here.
# Only recognized by server versions 4.0+.
#
# @note A change stream only allows 'majority' read concern.
# @note This helper method is preferable to running a raw aggregation with a $changeStream
7 changes: 4 additions & 3 deletions lib/mongo/collection.rb
@@ -322,9 +322,10 @@ def aggregate(pipeline, options = {})
# per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ BSON::Timestamp ] :start_at_cluster_time Only return changes that occurred
# after the specified timestamp. Any command run against the server will return a cluster time
# that can be used here. Only valid in server versions 4.0+.
# @option options [ BSON::Timestamp ] :start_at_operation_time Only return
# changes that occurred at or after the specified timestamp. Any command run
# against the server will return a cluster time that can be used here.
# Only recognized by server versions 4.0+.
#
# @note A change stream only allows 'majority' read concern.
# @note This helper method is preferable to running a raw aggregation with
151 changes: 143 additions & 8 deletions lib/mongo/collection/view/change_stream.rb
@@ -77,9 +77,10 @@ class ChangeStream < Aggregation
# on new documents to satisfy a change stream query.
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
# @option options [ BSON::Timestamp ] :start_at_cluster_time Only return changes that occurred
# after the specified timestamp. Any command run against the server will return a cluster time
# that can be used here. Only valid in server versions 4.0+.
# @option options [ BSON::Timestamp ] :start_at_operation_time Only
# return changes that occurred at or after the specified timestamp. Any
# command run against the server will return a cluster time that can
# be used here. Only recognized by server versions 4.0+.
#
# @since 2.5.0
def initialize(view, pipeline, changes_for, options = {})
@@ -88,11 +89,19 @@ def initialize(view, pipeline, changes_for, options = {})
@change_stream_filters = pipeline && pipeline.dup
@options = options && options.dup.freeze
@resume_token = @options[:resume_after]
read_with_one_retry { create_cursor! }
create_cursor!

# We send different parameters when we resume a change stream
# compared to when we send the first query
@resuming = true
end

# Iterate through documents returned by the change stream.
#
# This method retries once per resumable error: two consecutive
# errors result in the second error being raised, while an error
# that is successfully recovered from resets the error count to zero.
#
# @example Iterate through the stream of documents.
# stream.each do |document|
# p document
@@ -105,20 +114,82 @@ def initialize(view, pipeline, changes_for, options = {})
# @yieldparam [ BSON::Document ] Each change stream document.
def each
raise StopIteration.new if closed?
retried = false
begin
@cursor.each do |doc|
cache_resume_token(doc)
yield doc
end if block_given?
@cursor.to_enum
rescue => e
rescue Mongo::Error => e
if retried || !e.change_stream_resumable?
raise
end

retried = true
# Rerun initial aggregation.
# Any errors here will stop iteration and break out of this
# method
close
if retryable?(e)
create_cursor!
retry
end
end

# Return one document from the change stream, if one is available.
#
# Retries once on a resumable error.
#
# Raises StopIteration if the change stream is closed.
#
# This method will wait up to max_await_time_ms milliseconds
# for changes from the server, and if no changes are received
# it will return nil.
#
# @note This method is experimental and subject to change.
#
# @return [ BSON::Document | nil ] A change stream document.
# @api private
def try_next
raise StopIteration.new if closed?
retried = false

begin
doc = @cursor.try_next
rescue Mongo::Error => e
unless e.change_stream_resumable?
raise
end

if retried
# Rerun initial aggregation.
# Any errors here will stop iteration and break out of this
# method
close
create_cursor!
retried = false
else
# Attempt to retry a getMore once
retried = true
retry
end
raise
end

if doc
cache_resume_token(doc)
end
doc
end

def to_enum
enum = super
enum.send(:instance_variable_set, '@obj', self)
class << enum
def try_next
@obj.try_next
end
end
enum
end

# Close the change stream.
@@ -176,15 +247,30 @@ def for_collection?
end

def cache_resume_token(doc)
# Always record both resume token and operation time,
# in case we get an older or newer server during rolling
# upgrades/downgrades
unless @resume_token = (doc[:_id] && doc[:_id].dup)
raise Error::MissingResumeToken.new
raise Error::MissingResumeToken
end
end

def create_cursor!
# clear the cache because we may get a newer or an older server
# (rolling upgrades)
@start_at_operation_time_supported = nil

session = client.send(:get_session, @options)
server = server_selector.select_server(cluster)
result = send_initial_query(server, session)
if doc = result.replies.first && result.replies.first.documents.first
@start_at_operation_time = doc['operationTime']
else
# The above may set @start_at_operation_time to nil
# if it was not in the document for some reason,
# for consistency set it to nil here as well
@start_at_operation_time = nil
end
@cursor = Cursor.new(view, result, server, disable_retry: true, session: session)
end

@@ -200,6 +286,32 @@ def aggregate_spec(session)

def change_doc
{ fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) }.tap do |doc|
if resuming?
# We have a resume token once we retrieved any documents.
# However, if the first getMore fails and the user didn't pass
# a resume token we won't have a resume token to use.
# Use start_at_operation_time in this case
if @resume_token
# Spec says we need to remove startAtOperationTime if
# one was passed in by user, thus we won't forward it
elsif start_at_operation_time_supported? && @start_at_operation_time
# It is crucial to check @start_at_operation_time_supported
# here - we may have switched to an older server that
# does not support operation times and therefore shouldn't
# try to send one to it!
#
# @start_at_operation_time is already a BSON::Timestamp
doc[:startAtOperationTime] = @start_at_operation_time
else
# Can't resume if we don't have either
raise Mongo::Error::MissingResumeToken
end
else
if options[:start_at_operation_time]
doc[:startAtOperationTime] = time_to_bson_timestamp(
options[:start_at_operation_time])
end
end
doc[:resumeAfter] = @resume_token if @resume_token
doc[:allChangesForCluster] = true if for_cluster?
end
@@ -208,6 +320,29 @@ def change_doc
def send_initial_query(server, session)
initial_query_op(session).execute(server)
end

def time_to_bson_timestamp(time)
if time.is_a?(Time)
seconds = time.to_f
BSON::Timestamp.new(seconds.to_i, ((seconds - seconds.to_i) * 1000000).to_i)
elsif time.is_a?(BSON::Timestamp)
time
else
raise ArgumentError, 'Time must be a Time or a BSON::Timestamp instance'
end
end

def resuming?
!!@resuming
end

def start_at_operation_time_supported?
if @start_at_operation_time_supported.nil?
server = server_selector.select_server(cluster)
@start_at_operation_time_supported = server.description.max_wire_version >= 7
end
@start_at_operation_time_supported
end
end
end
end
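
The choice made in `change_doc` when a change stream is resumed (prefer the resume token, fall back to the operation time only if the server supports it, otherwise give up) can be sketched as a standalone function. This is an illustrative sketch, not the driver's code: `resume_stage_options` and `MissingResumeInfo` are hypothetical names, and the operation time is represented by a plain value rather than a `BSON::Timestamp`.

```ruby
# Hypothetical error used only in this sketch.
class MissingResumeInfo < StandardError; end

# Build the resume-related options of the $changeStream stage when the
# aggregation is re-run after a failed getMore.
def resume_stage_options(resume_token:, operation_time:, operation_time_supported:)
  if resume_token
    # A resume token takes precedence; startAtOperationTime is not sent.
    { resumeAfter: resume_token }
  elsif operation_time_supported && operation_time
    # No document was received yet, so fall back to the operation time
    # recorded from the initial aggregation.
    { startAtOperationTime: operation_time }
  else
    # Neither piece of information is usable: the stream cannot resume.
    raise MissingResumeInfo, 'neither a resume token nor an operation time is available'
  end
end

with_token = resume_stage_options(
  resume_token: { '_data' => 'abc' }, operation_time: 42, operation_time_supported: true
)
with_time = resume_stage_options(
  resume_token: nil, operation_time: 42, operation_time_supported: true
)
old_server_failed = begin
  resume_stage_options(resume_token: nil, operation_time: 42, operation_time_supported: false)
  false
rescue MissingResumeInfo
  true
end
```

Checking server support at resume time matters because, as the comments in the diff note, a rolling upgrade or downgrade may put a different server version behind the stream.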
16 changes: 2 additions & 14 deletions lib/mongo/collection/view/change_stream/retryable.rb
@@ -26,25 +26,13 @@ module Retryable

def read_with_one_retry
yield
rescue => e
if retryable?(e)
rescue Mongo::Error => e
if e.change_stream_resumable?
yield
else
raise(e)
end
end

def retryable?(error)
network_error?(error) || retryable_operation_failure?(error)
end

def network_error?(error)
[ Error::SocketError, Error::SocketTimeoutError].include?(error.class)
end

def retryable_operation_failure?(error)
error.is_a?(Error::OperationFailure) && error.change_stream_resumable?
end
end
end
end
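
The "retry once" rule that the commit message calls harder than it looks can be expressed with a flag and Ruby's `retry`. The following is a sketch under assumptions, not the driver's implementation: `SketchError` and `with_one_retry` are hypothetical, and the error class only imitates the `change_stream_resumable?` predicate introduced in this commit.

```ruby
# Hypothetical error class imitating the change_stream_resumable? predicate.
class SketchError < StandardError
  def initialize(message = nil, resumable: false)
    super(message)
    @resumable = resumable
  end

  def change_stream_resumable?
    @resumable
  end
end

# Run the block, retrying at most once and only for resumable errors.
def with_one_retry
  retried = false
  begin
    yield
  rescue SketchError => e
    # Re-raise on the second failure or on a non-resumable error.
    raise if retried || !e.change_stream_resumable?
    retried = true
    retry
  end
end

# One resumable failure followed by success.
attempts = 0
result = with_one_retry do
  attempts += 1
  raise SketchError.new('transient', resumable: true) if attempts == 1
  :ok
end

# Two consecutive resumable failures: the second one is raised.
double_attempts = 0
double_failed = begin
  with_one_retry do
    double_attempts += 1
    raise SketchError.new('transient', resumable: true)
  end
  false
rescue SketchError
  true
end

# A non-resumable failure is raised without any retry.
fatal_attempts = 0
fatal_failed = begin
  with_one_retry do
    fatal_attempts += 1
    raise SketchError.new('fatal')
  end
  false
rescue SketchError
  true
end
```

Because `retried` is local to each call, a call that succeeds after one retry leaves no state behind, so the next call gets a fresh retry allowance.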
40 changes: 40 additions & 0 deletions lib/mongo/cursor.rb
@@ -125,6 +125,46 @@ def each
end
end

# Return one document from the query, if one is available.
#
# Retries once on a resumable error.
#
# This method will wait up to max_await_time_ms milliseconds
# for changes from the server, and if no changes are received
# it will return nil.
#
# @note This method is experimental and subject to change.
#
# @return [ BSON::Document | nil ] A document.
# @api private
def try_next
if @documents.nil?
@documents = process(@initial_result)
# the documents here can be an empty array, hence
# we may end up issuing a getMore in the first try_next call
end

if @documents.empty?
if more?
if exhausted?
kill_cursors
return nil
end

@documents = get_more
end
else
# cursor is closed here
# keep documents as an empty array
end

if @documents
return @documents.shift
end

nil
end

# Get the batch size.
#
# @example Get the batch size.
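
Since `try_next` returns `nil` instead of blocking when no document arrives in time, a caller can drain whatever is currently available and then do other work. The sketch below shows that polling pattern against a fake source so that it runs without a server; `FakeTryNextSource` and `drain_available` are hypothetical names. In this commit `try_next` is marked experimental and `@api private`, so the pattern is illustrative only.

```ruby
# Hypothetical stand-in for an object responding to try_next: each call
# returns the next queued item, where nil means "nothing available now".
class FakeTryNextSource
  def initialize(items)
    @items = items
  end

  def try_next
    @items.shift
  end
end

# Collect documents until try_next reports that none is available,
# or until max_docs documents have been collected.
def drain_available(source, max_docs: 100)
  docs = []
  while docs.size < max_docs
    doc = source.try_next
    break if doc.nil?
    docs << doc
  end
  docs
end

source = FakeTryNextSource.new([{ 'n' => 1 }, { 'n' => 2 }, nil, { 'n' => 3 }])
first_batch = drain_available(source)
second_batch = drain_available(source)
```

The `max_docs` bound keeps a busy stream from monopolizing the caller; it is a design choice of this sketch, not something the commit prescribes.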
7 changes: 4 additions & 3 deletions lib/mongo/database.rb
@@ -267,9 +267,10 @@ def users
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
# @option options [ Session ] :session The session to use.
# @option options [ BSON::Timestamp ] :start_at_cluster_time Only return changes that occurred
# after the specified timestamp. Any command run against the server will return a cluster time
# that can be used here. Only valid in server versions 4.0+.
# @option options [ BSON::Timestamp ] :start_at_operation_time Only return
# changes that occurred at or after the specified timestamp. Any command run
# against the server will return a cluster time that can be used here.
# Only recognized by server versions 4.0+.
#
# @note A change stream only allows 'majority' read concern.
# @note This helper method is preferable to running a raw aggregation with a $changeStream
14 changes: 14 additions & 0 deletions lib/mongo/error.rb
@@ -70,6 +70,19 @@ class Error < StandardError
# @since 2.2.3
CURSOR_NOT_FOUND = 'Cursor not found.'

# Can the change stream on which this error occurred be resumed,
# provided the operation that triggered this error was a getMore?
#
# @example Is the error resumable for the change stream?
# error.change_stream_resumable?
#
# @return [ true, false ] Whether the error is resumable.
#
# @since 2.6.0
def change_stream_resumable?
false
end

# Error label describing commitTransaction errors that may or may not occur again if a commit is
# manually retried by the user.
#
@@ -111,6 +124,7 @@ def add_label(label)

require 'mongo/error/parser'
require 'mongo/error/write_retryable'
require 'mongo/error/change_stream_resumable'
require 'mongo/error/bulk_write_error'
require 'mongo/error/closed_stream'
require 'mongo/error/extra_file_chunk'