diff --git a/lib/mongo/collection/view/aggregation.rb b/lib/mongo/collection/view/aggregation.rb index 87ea8e5afb..ccbd5e09e0 100644 --- a/lib/mongo/collection/view/aggregation.rb +++ b/lib/mongo/collection/view/aggregation.rb @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/collection/view/aggregation/behavior' + module Mongo class Collection class View @@ -23,46 +25,11 @@ class View # # @since 2.0.0 class Aggregation - extend Forwardable - include Enumerable - include Immutable - include Iterable - include Explainable - include Loggable - include Retryable + include Behavior - # @return [ View ] view The collection view. - attr_reader :view # @return [ Array ] pipeline The aggregation pipeline. attr_reader :pipeline - # Delegate necessary operations to the view. - def_delegators :view, :collection, :read, :cluster, :cursor_type - - # Delegate necessary operations to the collection. - def_delegators :collection, :database, :client - - # The reroute message. - # - # @since 2.1.0 - # @deprecated - REROUTE = 'Rerouting the Aggregation operation to the primary server.'.freeze - - # Set to true if disk usage is allowed during the aggregation. - # - # @example Set disk usage flag. - # aggregation.allow_disk_use(true) - # - # @param [ true, false ] value The flag value. - # - # @return [ true, false, Aggregation ] The aggregation if a value was - # set or the value if used as a getter. - # - # @since 2.0.0 - def allow_disk_use(value = nil) - configure(:allow_disk_use, value) - end - # Initialize the aggregation for the provided collection view, pipeline # and options. # @@ -96,63 +63,16 @@ def allow_disk_use(value = nil) # # @since 2.0.0 def initialize(view, pipeline, options = {}) - @view = view - @pipeline = pipeline.dup - - @timeout_ms = options.delete(:timeout_ms) - @options = BSON::Document.new(options).freeze - - validate_timeout_mode!(options) - - unless Mongo.broken_view_aggregate || view.filter.empty? - @pipeline.unshift(:$match => view.filter) + perform_setup(view, options) do + @pipeline = pipeline.dup + unless Mongo.broken_view_aggregate || view.filter.empty? + @pipeline.unshift(:$match => view.filter) + end end end - # @return [ Integer | nil ] the timeout_ms value that was passed as - # an option to this object, or which was inherited from the view. - # - # @api private - def timeout_ms - @timeout_ms || view.timeout_ms - end - - # Get the explain plan for the aggregation. - # - # @example Get the explain plan for the aggregation. - # aggregation.explain - # - # @return [ Hash ] The explain plan. - # - # @since 2.0.0 - def explain - self.class.new(view, pipeline, options.merge(explain: true)).first - end - - # Whether this aggregation will write its result to a database collection. - # - # @return [ Boolean ] Whether the aggregation will write its result - # to a collection. - # - # @api private - def write? - pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) } - end - private - def server_selector - @view.send(:server_selector) - end - - def aggregate_spec(session, read_preference) - Builder::Aggregation.new( - pipeline, - view, - options.merge(session: session, read_preference: read_preference) - ).specification - end - def new(options) Aggregation.new(view, pipeline, options) end @@ -205,35 +125,6 @@ def send_initial_query(server, context) ) end end - - # Skip, sort, limit, projection are specified as pipeline stages - # rather than as options. 
-      def cache_options
-        {
-          namespace: collection.namespace,
-          selector: pipeline,
-          read_concern: view.read_concern,
-          read_preference: view.read_preference,
-          collation: options[:collation],
-          # Aggregations can read documents from more than one collection,
-          # so they will be cleared on every write operation.
-          multi_collection: true,
-        }
-      end
-
-      # @return [ Hash ] timeout_ms value set on the operation level (if any),
-      #   and/or timeout_ms that is set on collection/database/client level (if any).
-      #
-      # @api private
-      def operation_timeouts(opts = {})
-        {}.tap do |result|
-          if opts[:timeout_ms] || @timeout_ms
-            result[:operation_timeout_ms] = opts.delete(:timeout_ms) || @timeout_ms
-          else
-            result[:inherited_timeout_ms] = view.timeout_ms
-          end
-        end
-      end
     end
   end
 end
diff --git a/lib/mongo/collection/view/aggregation/behavior.rb b/lib/mongo/collection/view/aggregation/behavior.rb
new file mode 100644
index 0000000000..06b3445043
--- /dev/null
+++ b/lib/mongo/collection/view/aggregation/behavior.rb
@@ -0,0 +1,131 @@
+# frozen_string_literal: true
+
+module Mongo
+  class Collection
+    class View
+      class Aggregation
+        # Distills the behavior common to aggregator classes, like
+        # View::Aggregation and View::ChangeStream.
+        module Behavior
+          extend Forwardable
+          include Enumerable
+          include Immutable
+          include Iterable
+          include Explainable
+          include Loggable
+          include Retryable
+
+          # @return [ View ] view The collection view.
+          attr_reader :view
+
+          # Delegate necessary operations to the view.
+          def_delegators :view, :collection, :read, :cluster, :cursor_type
+
+          # Delegate necessary operations to the collection.
+          def_delegators :collection, :database, :client
+
+          # Set to true if disk usage is allowed during the aggregation.
+          #
+          # @example Set disk usage flag.
+          #   aggregation.allow_disk_use(true)
+          #
+          # @param [ true, false ] value The flag value.
+          #
+          # @return [ true, false, Aggregation ] The aggregation if a value was
+          #   set or the value if used as a getter.
+          #
+          # @since 2.0.0
+          def allow_disk_use(value = nil)
+            configure(:allow_disk_use, value)
+          end
+
+          # Get the explain plan for the aggregation.
+          #
+          # @example Get the explain plan for the aggregation.
+          #   aggregation.explain
+          #
+          # @return [ Hash ] The explain plan.
+          #
+          # @since 2.0.0
+          def explain
+            self.class.new(view, pipeline, options.merge(explain: true)).first
+          end
+
+          # Whether this aggregation will write its result to a database collection.
+          #
+          # @return [ Boolean ] Whether the aggregation will write its result
+          #   to a collection.
+          #
+          # @api private
+          def write?
+            pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) }
+          end
+
+          # @return [ Integer | nil ] the timeout_ms value that was passed as
+          #   an option to this object, or which was inherited from the view.
+          #
+          # @api private
+          def timeout_ms
+            @timeout_ms || view.timeout_ms
+          end
+
+          private
+
+          # Common setup for all classes that include this behavior; the
+          # constructor should invoke this method.
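+          #
+          # For example, an includer's constructor (compare Aggregation#initialize
+          # in this patch) might call it like:
+          #
+          #   def initialize(view, pipeline, options = {})
+          #     perform_setup(view, options) do
+          #       @pipeline = pipeline.dup
+          #     end
+          #   end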
+ def perform_setup(view, options, forbid: []) + @view = view + + @timeout_ms = options.delete(:timeout_ms) + @options = BSON::Document.new(options).freeze + + yield + + validate_timeout_mode!(options, forbid: forbid) + end + + def server_selector + @view.send(:server_selector) + end + + def aggregate_spec(session, read_preference) + Builder::Aggregation.new( + pipeline, + view, + options.merge(session: session, read_preference: read_preference) + ).specification + end + + # Skip, sort, limit, projection are specified as pipeline stages + # rather than as options. + def cache_options + { + namespace: collection.namespace, + selector: pipeline, + read_concern: view.read_concern, + read_preference: view.read_preference, + collation: options[:collation], + # Aggregations can read documents from more than one collection, + # so they will be cleared on every write operation. + multi_collection: true, + } + end + + # @return [ Hash ] timeout_ms value set on the operation level (if any), + # and/or timeout_ms that is set on collection/database/client level (if any). + # + # @api private + def operation_timeouts(opts = {}) + {}.tap do |result| + if opts[:timeout_ms] || @timeout_ms + result[:operation_timeout_ms] = opts.delete(:timeout_ms) || @timeout_ms + else + result[:inherited_timeout_ms] = view.timeout_ms + end + end + end + end + end + end + end +end diff --git a/lib/mongo/collection/view/change_stream.rb b/lib/mongo/collection/view/change_stream.rb index b35cb9b732..f5e572358c 100644 --- a/lib/mongo/collection/view/change_stream.rb +++ b/lib/mongo/collection/view/change_stream.rb @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/collection/view/aggregation/behavior' require 'mongo/collection/view/change_stream/retryable' module Mongo @@ -35,7 +36,8 @@ class View # # # @since 2.5.0 - class ChangeStream < Aggregation + class ChangeStream + include Aggregation::Behavior include Retryable # @return [ String ] The fullDocument option default value. @@ -129,11 +131,13 @@ class ChangeStream < Aggregation # # @since 2.5.0 def initialize(view, pipeline, changes_for, options = {}) - @view = view - @changes_for = changes_for - @change_stream_filters = pipeline && pipeline.dup - @options = options && options.dup.freeze - @start_after = @options[:start_after] + # change stream cursors can only be :iterable, so we don't allow + # timeout_mode to be specified. + perform_setup(view, options, forbid: %i[ timeout_mode ]) do + @changes_for = changes_for + @change_stream_filters = pipeline && pipeline.dup + @start_after = @options[:start_after] + end # The resume token tracked by the change stream, used only # when there is no cursor, or no cursor resume token @@ -185,24 +189,30 @@ def each # @return [ BSON::Document | nil ] A change stream document. # @since 2.6.0 def try_next + recreate_cursor! if @timed_out + raise StopIteration.new if closed? + begin doc = @cursor.try_next rescue Mongo::Error => e - if !e.change_stream_resumable? - raise - end - - # Rerun initial aggregation. - # Any errors here will stop iteration and break out of this - # method. + # "If a next call fails with a timeout error, drivers MUST NOT + # invalidate the change stream. The subsequent next call MUST + # perform a resume attempt to establish a new change stream on the + # server..." + # + # However, SocketTimeoutErrors are TimeoutErrors, but are also + # change-stream-resumable. 
+        # To preserve existing (specified) behavior, we only count
+        # timeouts when the error is not also change-stream-resumable.
+        @timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable?
+
+        raise unless @timed_out || e.change_stream_resumable?
 
-        # Save cursor's resume token so we can use it
-        # to create a new cursor
         @resume_token = @cursor.resume_token
+        raise e if @timed_out
 
-        close
-        create_cursor!
+        recreate_cursor!(@cursor.context)
 
         retry
       end
 
@@ -305,21 +315,23 @@ def for_collection?
         !for_cluster? && !for_database?
       end
 
-      def create_cursor!
+      def create_cursor!(timeout_ms = nil)
         # clear the cache because we may get a newer or an older server
         # (rolling upgrades)
         @start_at_operation_time_supported = nil
 
         session = client.get_session(@options)
-        context = Operation::Context.new(client: client, session: session, operation_timeouts: operation_timeouts)
+        context = Operation::Context.new(client: client, session: session, operation_timeouts: timeout_ms ? { operation_timeout_ms: timeout_ms } : operation_timeouts)
 
         start_at_operation_time = nil
         start_at_operation_time_supported = nil
+
         @cursor = read_with_retry_cursor(session, server_selector, view, context: context) do |server|
           server.with_connection do |connection|
             start_at_operation_time_supported = connection.description.server_version_gte?('4.0')
 
             result = send_initial_query(connection, context)
+
             if doc = result.replies.first && result.replies.first.documents.first
               start_at_operation_time = doc['operationTime']
             else
@@ -333,6 +345,7 @@ def create_cursor!
             result
           end
         end
+
         @start_at_operation_time = start_at_operation_time
         @start_at_operation_time_supported = start_at_operation_time_supported
       end
@@ -421,6 +434,15 @@ def time_to_bson_timestamp(time)
       def resuming?
         !!@resuming
       end
+
+      # Recreates the current cursor (typically as a consequence of attempting
+      # to resume the change stream).
+      def recreate_cursor!(context = nil)
+        @timed_out = false
+
+        close
+        create_cursor!(context&.remaining_timeout_ms)
+      end
     end
   end
 end
diff --git a/lib/mongo/cursor_host.rb b/lib/mongo/cursor_host.rb
index 3df60eb650..1d192527d9 100644
--- a/lib/mongo/cursor_host.rb
+++ b/lib/mongo/cursor_host.rb
@@ -24,13 +24,19 @@ module CursorHost
     # have been given.
     #
     # @param [ Hash ] options The options to inspect.
+    # @param [ Array ] forbid The list of options to forbid for this
+    #   class.
     #
     # @raise [ ArgumentError ] if inconsistent or incompatible options are
     #   detected.
     #
     # @api private
     # rubocop:disable Metrics
-    def validate_timeout_mode!(options)
+    def validate_timeout_mode!(options, forbid: [])
+      forbid.each do |key|
+        raise ArgumentError, "#{key} is not allowed here" if options.key?(key)
+      end
+
       cursor_type = options[:cursor_type]
       timeout_mode = options[:timeout_mode]
diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb
index 93e215ac7c..4f3f14283f 100644
--- a/lib/mongo/retryable/read_worker.rb
+++ b/lib/mongo/retryable/read_worker.rb
@@ -66,7 +66,7 @@ class ReadWorker < BaseWorker
     #
     # @return [ Cursor ] The cursor for the result set.
def read_with_retry_cursor(session, server_selector, view, context: nil, &block) - read_with_retry(session, server_selector) do |server| + read_with_retry(session, server_selector, context) do |server| result = yield server # RUBY-2367: This will be updated to allow the query cache to diff --git a/spec/mongo/collection/view/change_stream_spec.rb b/spec/mongo/collection/view/change_stream_spec.rb index ad25ad65a8..42446d2eeb 100644 --- a/spec/mongo/collection/view/change_stream_spec.rb +++ b/spec/mongo/collection/view/change_stream_spec.rb @@ -507,7 +507,7 @@ end it 'includes the max_await_time value in the formatted string' do - expect(change_stream.inspect).to include({ max_await_time_ms: 10 }.to_s) + expect(change_stream.inspect).to include({ 'max_await_time_ms' => 10 }.to_s) end end @@ -518,7 +518,7 @@ end it 'includes the batch_size value in the formatted string' do - expect(change_stream.inspect).to include({ batch_size: 5 }.to_s) + expect(change_stream.inspect).to include({ 'batch_size' => 5 }.to_s) end end @@ -529,7 +529,7 @@ end it 'includes the collation value in the formatted string' do - expect(change_stream.inspect).to include({ 'collation' => { locale: 'en_US', strength: 2 } }.to_s) + expect(change_stream.inspect).to include({ 'collation' => { 'locale' => 'en_US', 'strength' => 2 } }.to_s) end end diff --git a/spec/runners/unified/change_stream_operations.rb b/spec/runners/unified/change_stream_operations.rb index 008ac1129f..ade607f40c 100644 --- a/spec/runners/unified/change_stream_operations.rb +++ b/spec/runners/unified/change_stream_operations.rb @@ -10,22 +10,9 @@ def create_change_stream(op) object = entities.get_any(object_id) use_arguments(op) do |args| pipeline = args.use!('pipeline') - opts = {} - if batch_size = args.use('batchSize') - opts[:batch_size] = batch_size - end - if comment = args.use('comment') - opts[:comment] = comment - end - if full_document = args.use('fullDocument') - opts[:full_document] = full_document - end - if full_document_before_change = args.use('fullDocumentBeforeChange') - opts[:full_document_before_change] = full_document_before_change - end - if args.key?('showExpandedEvents') - opts[:show_expanded_events] = args.use!('showExpandedEvents') - end + opts = extract_options(args, 'batchSize', 'comment', 'fullDocument', + 'fullDocumentBeforeChange', 'showExpandedEvents', 'timeoutMS', + 'maxAwaitTimeMS') cs = object.watch(pipeline, **opts) if name = op.use('saveResultAsEntity') entities.set(:change_stream, name, cs) @@ -39,6 +26,12 @@ def iterate_until_document_or_error(op) object.try_next end + def iterate_once(op) + stream_id = op.use!('object') + stream = entities.get_any(stream_id) + stream.try_next + end + def close(op) object_id = op.use!('object') opts = op.key?('arguments') ? 
extract_options(op.use!('arguments'), 'timeoutMS') : {} diff --git a/spec/spec_tests/data/client_side_operations_timeout/change-streams.yml b/spec/spec_tests/data/client_side_operations_timeout/change-streams.yml new file mode 100644 index 0000000000..e42b03fefa --- /dev/null +++ b/spec/spec_tests/data/client_side_operations_timeout/change-streams.yml @@ -0,0 +1,354 @@ +description: "timeoutMS behaves correctly for change streams" + +schemaVersion: "1.9" + +runOnRequirements: + - minServerVersion: "4.4" + topologies: ["replicaset", "sharded"] + +createEntities: + - client: + id: &failPointClient failPointClient + useMultipleMongoses: false + - client: + id: &client client + useMultipleMongoses: false + observeEvents: + - commandStartedEvent + # Drivers are not required to execute killCursors during resume attempts, so it should be ignored for command + # monitoring assertions. + ignoreCommandMonitoringEvents: ["killCursors"] + - database: + id: &database database + client: *client + databaseName: &databaseName test + - collection: + id: &collection collection + database: *database + collectionName: &collectionName coll + +initialData: + - collectionName: *collectionName + databaseName: *databaseName + documents: [] + +tests: + - description: "error if maxAwaitTimeMS is greater than timeoutMS" + operations: + - name: createChangeStream + object: *collection + arguments: + pipeline: [] + timeoutMS: 5 + maxAwaitTimeMS: 10 + expectError: + isClientError: true + + - description: "error if maxAwaitTimeMS is equal to timeoutMS" + operations: + - name: createChangeStream + object: *collection + arguments: + pipeline: [] + timeoutMS: 5 + maxAwaitTimeMS: 5 + expectError: + isClientError: true + + - description: "timeoutMS applied to initial aggregate" + operations: + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["aggregate"] + blockConnection: true + blockTimeMS: 55 + - name: createChangeStream + object: *collection + arguments: + pipeline: [] + timeoutMS: 50 + expectError: + isTimeoutError: true + expectEvents: + - client: *client + events: + - commandStartedEvent: + commandName: aggregate + databaseName: *databaseName + command: + aggregate: *collectionName + maxTimeMS: { $$type: ["int", "long"] } + + # If maxAwaitTimeMS is not set, timeoutMS should be refreshed for the getMore and the getMore should not have a + # maxTimeMS field. This test requires a high timeout because the server applies a default 1000ms maxAwaitTime. To + # ensure that the driver is refreshing the timeout between commands, the test blocks aggregate and getMore commands + # for 30ms each and creates/iterates a change stream with timeoutMS=1050. The initial aggregate will block for 30ms + # and the getMore will block for 1030ms. 
+  - description: "timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set"
+    operations:
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 2 }
+            data:
+              failCommands: ["aggregate", "getMore"]
+              blockConnection: true
+              blockTimeMS: 30
+      - name: createChangeStream
+        object: *collection
+        arguments:
+          pipeline: []
+          timeoutMS: 1050
+        saveResultAsEntity: &changeStream changeStream
+      - name: iterateOnce
+        object: *changeStream
+    expectEvents:
+      - client: *client
+        events:
+          - commandStartedEvent:
+              commandName: aggregate
+              databaseName: *databaseName
+              command:
+                aggregate: *collectionName
+                maxTimeMS: { $$type: ["int", "long"] }
+          - commandStartedEvent:
+              commandName: getMore
+              databaseName: *databaseName
+              command:
+                getMore: { $$type: ["int", "long"] }
+                collection: *collectionName
+                maxTimeMS: { $$exists: false }
+
+  # If maxAwaitTimeMS is set, timeoutMS should still be refreshed for the getMore and the getMore command should have a
+  # maxTimeMS field.
+  - description: "timeoutMS is refreshed for getMore if maxAwaitTimeMS is set"
+    operations:
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 2 }
+            data:
+              failCommands: ["aggregate", "getMore"]
+              blockConnection: true
+              # was 15, changed to 30 to account for jruby driver latency.
+              blockTimeMS: 30
+      - name: createChangeStream
+        object: *collection
+        arguments:
+          pipeline: []
+          # was 20, changed to 29 to account for native ruby driver latency.
+          # Changed again to 59 to account for additional jruby driver latency.
+          # The idea of this test: the failpoint delays each operation by 15ms,
+          # while the timeout for each operation was (originally) 20ms; if
+          # timeoutMS were not refreshed for the getMore, the second operation
+          # would time out. However, we were tickling the 20ms timeout because
+          # the driver itself was taking more than 5ms to do its thing.
+          #
+          # Changing the blockTimeMS in the failpoint to 30ms, and then bumping
+          # the timeout to almost twice that (59ms), should give us the same
+          # effect in the test.
+          timeoutMS: 59
+          batchSize: 2
+          maxAwaitTimeMS: 1
+        saveResultAsEntity: &changeStream changeStream
+      - name: iterateOnce
+        object: *changeStream
+    expectEvents:
+      - client: *client
+        events:
+          - commandStartedEvent:
+              commandName: aggregate
+              databaseName: *databaseName
+              command:
+                aggregate: *collectionName
+                maxTimeMS: { $$type: ["int", "long"] }
+          - commandStartedEvent:
+              commandName: getMore
+              databaseName: *databaseName
+              command:
+                getMore: { $$type: ["int", "long"] }
+                collection: *collectionName
+                maxTimeMS: 1
+
+  # The timeout should be applied to the entire resume attempt, not individually to each command. The test creates a
+  # change stream with timeoutMS=20 which returns an empty initial batch and then sets a fail point to block both
+  # getMore and aggregate for 12ms each and fail with a resumable error. When the resume attempt happens, the getMore
+  # and aggregate block for longer than 20ms total, so it times out.
+  - description: "timeoutMS applies to full resume attempt in a next call"
+    operations:
+      - name: createChangeStream
+        object: *collection
+        arguments:
+          pipeline: []
+          # Originally set to 20, but the Ruby driver was too often taking
+          # that much time, causing the timing of the test to fail. Instead,
+          # we bumped the timeout to 23ms, which is just less than twice the
+          # blockTimeMS for the failpoint. It still failed on jruby, so the
+          # timeout (and blockTimeMS) were drastically increased to accommodate
+          # JRuby. This tests the same thing, but gives the driver a bit more
+          # breathing space.
+          timeoutMS: 99
+        saveResultAsEntity: &changeStream changeStream
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 2 }
+            data:
+              failCommands: ["getMore", "aggregate"]
+              blockConnection: true
+              # Originally 12, bumped it to 50 to give the jruby driver a bit
+              # more breathing room.
+              blockTimeMS: 50
+              errorCode: 7 # HostNotFound - resumable but does not require an SDAM state change.
+              # failCommand doesn't correctly add the ResumableChangeStreamError by default. It needs to be specified
+              # manually here so the error is considered resumable. The failGetMoreAfterCursorCheckout fail point
+              # would add the label without this, but it does not support blockConnection functionality.
+              errorLabels: ["ResumableChangeStreamError"]
+      - name: iterateUntilDocumentOrError
+        object: *changeStream
+        expectError:
+          isTimeoutError: true
+    expectEvents:
+      - client: *client
+        events:
+          - commandStartedEvent:
+              commandName: aggregate
+              databaseName: *databaseName
+              command:
+                aggregate: *collectionName
+                maxTimeMS: { $$type: ["int", "long"] }
+          - commandStartedEvent:
+              commandName: getMore
+              databaseName: *databaseName
+              command:
+                getMore: { $$type: ["int", "long"] }
+                collection: *collectionName
+                maxTimeMS: { $$exists: false }
+          - commandStartedEvent:
+              commandName: aggregate
+              databaseName: *databaseName
+              command:
+                aggregate: *collectionName
+                maxTimeMS: { $$type: ["int", "long"] }
+
+  - description: "change stream can be iterated again if previous iteration times out"
+    operations:
+      - name: createChangeStream
+        object: *collection
+        arguments:
+          pipeline: []
+          # Specify a short maxAwaitTimeMS because otherwise the getMore on the new cursor will wait for 1000ms and
+          # time out.
+          maxAwaitTimeMS: 1
+          timeoutMS: 100
+        saveResultAsEntity: &changeStream changeStream
+      # Block getMore for 150ms to force the next() call to time out.
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 1 }
+            data:
+              failCommands: ["getMore"]
+              blockConnection: true
+              blockTimeMS: 150
+      # The original aggregate didn't return any events so this should do a getMore and return a timeout error.
+      - name: iterateUntilDocumentOrError
+        object: *changeStream
+        expectError:
+          isTimeoutError: true
+      # The previous iteration attempt timed out so this should re-create the change stream. We use iterateOnce rather
+      # than iterateUntilDocumentOrError because there haven't been any events and we only want to assert that the
+      # cursor was re-created.
+      - name: iterateOnce
+        object: *changeStream
+    expectEvents:
+      - client: *client
+        events:
+          - commandStartedEvent:
+              commandName: aggregate
+              databaseName: *databaseName
+              command:
+                aggregate: *collectionName
+                maxTimeMS: { $$type: ["int", "long"] }
+          # The iterateUntilDocumentOrError operation should send a getMore.
+          - commandStartedEvent:
+              commandName: getMore
+              databaseName: *databaseName
+              command:
+                getMore: { $$type: ["int", "long"] }
+                collection: *collectionName
+          # The iterateOnce operation should re-create the cursor via an aggregate and then send a getMore to iterate
+          # the new cursor.
+ - commandStartedEvent: + commandName: aggregate + databaseName: *databaseName + command: + aggregate: *collectionName + maxTimeMS: { $$type: ["int", "long"] } + - commandStartedEvent: + commandName: getMore + databaseName: *databaseName + command: + getMore: { $$type: ["int", "long"] } + collection: *collectionName + + # The timeoutMS value should be refreshed for getMore's. This is a failure test. The createChangeStream operation + # sets timeoutMS=10 and the getMore blocks for 15ms, causing iteration to fail with a timeout error. + - description: "timeoutMS is refreshed for getMore - failure" + operations: + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["getMore"] + blockConnection: true + blockTimeMS: 15 + - name: createChangeStream + object: *collection + arguments: + pipeline: [] + timeoutMS: 10 + saveResultAsEntity: &changeStream changeStream + # The first iteration should do a getMore + - name: iterateUntilDocumentOrError + object: *changeStream + expectError: + isTimeoutError: true + expectEvents: + - client: *client + events: + - commandStartedEvent: + commandName: aggregate + databaseName: *databaseName + command: + aggregate: *collectionName + maxTimeMS: { $$type: ["int", "long"] } + # The iterateUntilDocumentOrError operation should send a getMore. + - commandStartedEvent: + commandName: getMore + databaseName: *databaseName + command: + getMore: { $$type: ["int", "long"] } + collection: *collectionName diff --git a/spec/spec_tests/data/client_side_operations_timeout/convenient-transactions.yml b/spec/spec_tests/data/client_side_operations_timeout/convenient-transactions.yml index d79aa4bd05..050d0d514f 100644 --- a/spec/spec_tests/data/client_side_operations_timeout/convenient-transactions.yml +++ b/spec/spec_tests/data/client_side_operations_timeout/convenient-transactions.yml @@ -66,10 +66,18 @@ tests: data: failCommands: ["insert"] blockConnection: true - blockTimeMS: 30 + # Was 30, but JRuby was taking too long in preparing and issuing + # the operation. We now specify the timeoutMS below, and set this + # value to just more than half of it (so that two inserts will + # exceed the timeout, but one won't--or shouldn't). + blockTimeMS: 51 - name: withTransaction object: *session arguments: + # Was originally not specified here, inheriting the client value of 50ms. + # That wasn't giving JRuby enough time, so we specify a larger value + # here. 
+ timeoutMS: 100 callback: - name: insertOne object: *collection diff --git a/spec/spec_tests/data/client_side_operations_timeout/non-tailable-cursors.yml b/spec/spec_tests/data/client_side_operations_timeout/non-tailable-cursors.yml index 087976206e..463800135e 100644 --- a/spec/spec_tests/data/client_side_operations_timeout/non-tailable-cursors.yml +++ b/spec/spec_tests/data/client_side_operations_timeout/non-tailable-cursors.yml @@ -53,11 +53,14 @@ tests: data: failCommands: ["find"] blockConnection: true - blockTimeMS: 15 + # changed to 30ms to accommodate jruby latencies + blockTimeMS: 30 - name: find object: *collection arguments: filter: {} + # added as a 25ms timeout to accommodate jruby latencies + timeoutMS: 25 timeoutMode: cursorLifetime expectError: isTimeoutError: true @@ -86,14 +89,16 @@ tests: data: failCommands: ["find", "getMore"] blockConnection: true - blockTimeMS: 20 + # bumped to 50 to accommodate jruby latencies + blockTimeMS: 50 # Run a find with timeoutMS=39 and batchSize=1 to force two batches, which will cause a find and a getMore to be # sent. Both will block for 20ms so together they will go over the timeout. - name: find object: *collection arguments: filter: {} - timeoutMS: 39 + # bumped to 99 to accommodate jruby latencies + timeoutMS: 99 batchSize: 2 expectError: isTimeoutError: true
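
A usage sketch (illustrative, not part of the patch) of the new forbid: hook that
perform_setup threads through to validate_timeout_mode!: change stream cursors are
always iterable, so passing :timeout_mode when constructing a change stream should
now raise. The sketch assumes Collection#watch forwards its options to
View::ChangeStream unchanged; the collection name is hypothetical.

    # Hypothetical usage; option forwarding through #watch is assumed.
    begin
      client[:events].watch([], timeout_mode: :iterable)
    rescue ArgumentError => e
      e.message # => "timeout_mode is not allowed here"
    end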