diff --git a/lib/mongo/operation.rb b/lib/mongo/operation.rb index 8def25dbe3..c7d7140121 100644 --- a/lib/mongo/operation.rb +++ b/lib/mongo/operation.rb @@ -22,6 +22,7 @@ require 'mongo/operation/shared/validatable' require 'mongo/operation/shared/object_id_generator' require 'mongo/operation/shared/op_msg_executable' +require 'mongo/operation/shared/timed' require 'mongo/operation/op_msg_base' require 'mongo/operation/command' diff --git a/lib/mongo/operation/delete/op_msg.rb b/lib/mongo/operation/delete/op_msg.rb index f1435f9564..4ee081478d 100644 --- a/lib/mongo/operation/delete/op_msg.rb +++ b/lib/mongo/operation/delete/op_msg.rb @@ -49,7 +49,8 @@ def selector(connection) def message(connection) section = Protocol::Msg::Section1.new(IDENTIFIER, send(IDENTIFIER)) - Protocol::Msg.new(flags, {}, command(connection), section) + cmd = apply_relevant_timeouts_to(command(connection), connection) + Protocol::Msg.new(flags, {}, cmd, section) end end end diff --git a/lib/mongo/operation/insert/op_msg.rb b/lib/mongo/operation/insert/op_msg.rb index 7ab863e6af..9f2b66f35e 100644 --- a/lib/mongo/operation/insert/op_msg.rb +++ b/lib/mongo/operation/insert/op_msg.rb @@ -49,7 +49,8 @@ def selector(connection) def message(connection) section = Protocol::Msg::Section1.new(IDENTIFIER, send(IDENTIFIER)) - Protocol::Msg.new(flags, {}, command(connection), section) + cmd = apply_relevant_timeouts_to(command(connection), connection) + Protocol::Msg.new(flags, {}, cmd, section) end end end diff --git a/lib/mongo/operation/op_msg_base.rb b/lib/mongo/operation/op_msg_base.rb index 5f00d42aec..5716227cd1 100644 --- a/lib/mongo/operation/op_msg_base.rb +++ b/lib/mongo/operation/op_msg_base.rb @@ -22,11 +22,13 @@ class OpMsgBase include Specifiable include Executable include SessionsSupported + include Timed private def message(connection) - Protocol::Msg.new(flags, options(connection), command(connection)) + cmd = apply_relevant_timeouts_to(command(connection), connection) + Protocol::Msg.new(flags, options(connection), cmd) end end end diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 713ea111e8..a48e39a6fb 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -28,7 +28,18 @@ module Executable include ResponseHandling + # @return [ Operation::Context | nil ] the operation context used to + # execute this operation. + attr_accessor :context + def do_execute(connection, context, options = {}) + # Save the context on the instance, to avoid having to pass it as a + # parameter to every single method. There are many legacy methods that + # still accept it as a parameter, which are left as-is for now to + # minimize the impact of this change. Moving forward, it may be + # reasonable to refactor things so this saved reference is used instead. + @context = context + session&.materialize_if_needed unpin_maybe(session, connection) do add_error_labels(connection, context) do @@ -112,7 +123,6 @@ def build_message(connection, context) if server_api = context.server_api msg = msg.maybe_add_server_api(server_api) end - msg = msg.maybe_add_max_time_ms(connection, context) msg end diff --git a/lib/mongo/operation/shared/timed.rb b/lib/mongo/operation/shared/timed.rb new file mode 100644 index 0000000000..26c2192d89 --- /dev/null +++ b/lib/mongo/operation/shared/timed.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +module Mongo + module Operation + # Defines the behavior of operations that have the default timeout + # behavior described by the client-side operation timeouts (CSOT) + # spec. + # + # @api private + module Timed + # If a timeout is active (as defined by the current context), and it has + # not yet expired, add :maxTimeMS to the spec. + # + # @param [ Hash ] spec The spec to modify + # @param [ Connection ] connection The connection that will be used to + # execute the operation + # + # @return [ Hash ] the spec + # + # @raises [ Mongo::Error::TimeoutError ] if the current timeout has + # expired. + def apply_relevant_timeouts_to(spec, connection) + with_max_time(connection) do |max_time_sec| + return spec if max_time_sec.nil? + + spec.tap { spec[:maxTimeMS] = (max_time_sec * 1_000).to_i } + end + end + + # A helper method that computes the remaining timeout (in seconds) and + # yields it to the associated block. If no timeout is present, yields + # nil. If the timeout has expired, raises Mongo::Error::TimeoutError. + # + # @param [ Connection ] connection The connection that will be used to + # execute the operation + # + # @return [ Hash ] the result of yielding to the block (which must be + # a Hash) + def with_max_time(connection) + if context&.has_timeout? + max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time + raise Mongo::Error::TimeoutError if max_time_sec <= 0 + + yield max_time_sec + else + yield nil + end + end + end + end +end diff --git a/lib/mongo/operation/update/op_msg.rb b/lib/mongo/operation/update/op_msg.rb index 23ac1f4b4b..9606cd7d86 100644 --- a/lib/mongo/operation/update/op_msg.rb +++ b/lib/mongo/operation/update/op_msg.rb @@ -45,7 +45,8 @@ def selector(connection) def message(connection) updates = validate_updates(connection, send(IDENTIFIER)) section = Protocol::Msg::Section1.new(IDENTIFIER, updates) - Protocol::Msg.new(flags, {}, command(connection), section) + cmd = apply_relevant_timeouts_to(command(connection), connection) + Protocol::Msg.new(flags, {}, cmd, section) end end end diff --git a/lib/mongo/protocol/msg.rb b/lib/mongo/protocol/msg.rb index faf9015301..73df7a31b6 100644 --- a/lib/mongo/protocol/msg.rb +++ b/lib/mongo/protocol/msg.rb @@ -301,37 +301,6 @@ def maybe_add_server_api(server_api) Msg.new(@flags, @options, main_document, *@sequences) end - # Adds maxTimeMS attribute to the message if timeoutMS is set - # and there is enough time left to send the message to the server - # (remaining timeout is bigger than minimum round trip time for - # the server. - # - # @param [ Mongo::Server::Connection ] connection Connection the message - # should be sent with. - # @param [ Mongo::Operation::Context ] context Context of the operation - # the message is build from. - # - # @return [ Mongo::Protocol::Msg] message with added maxTimeMS attribute - # if needed. - # - # @raise [ Mongo::Error::TimeoutError ] if timeout expired or there is - # not enough time to send the message to the server. - def maybe_add_max_time_ms(connection, context) - return self unless context.has_timeout? - - max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time - if max_time_sec > 0 - max_time_ms = (max_time_sec * 1_000).to_i - main_document = @main_document.merge( - maxTimeMS: max_time_ms - ) - Msg.new(@flags, @options, main_document, *@sequences) - else - raise Mongo::Error::TimeoutError - end - end - - # Returns the number of documents returned from the server. # # The Msg instance must be for a server reply and the reply must return diff --git a/spec/mongo/operation/create/op_msg_spec.rb b/spec/mongo/operation/create/op_msg_spec.rb index f4033611b8..0ecafc8d52 100644 --- a/spec/mongo/operation/create/op_msg_spec.rb +++ b/spec/mongo/operation/create/op_msg_spec.rb @@ -2,8 +2,12 @@ # rubocop:todo all require 'spec_helper' +require_relative '../shared/csot/examples' describe Mongo::Operation::Create::OpMsg do + include CSOT::Examples + + let(:context) { Mongo::Operation::Context.new } let(:write_concern) do Mongo::WriteConcern.get(w: :majority) @@ -73,8 +77,6 @@ end describe '#selector' do - min_server_fcv '3.6' - it 'does not mutate user input' do user_input = IceNine.deep_freeze(spec.dup) expect do @@ -87,158 +89,152 @@ # https://jira.mongodb.org/browse/RUBY-2224 require_no_linting - context 'when the server supports OP_MSG' do + let(:global_args) do + { + create: TEST_COLL, + writeConcern: write_concern.options, + '$db' => SpecConfig.instance.test_db, + lsid: session.session_id + } + end + + let(:session) do + authorized_client.start_session + end - let(:global_args) do - { - create: TEST_COLL, - writeConcern: write_concern.options, - '$db' => SpecConfig.instance.test_db, - lsid: session.session_id - } + context 'when the topology is replica set or sharded' do + require_topology :replica_set, :sharded + + let(:expected_global_args) do + global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) end - let(:session) do - authorized_client.start_session + it 'creates the correct OP_MSG message' do + authorized_client.command(ping:1) + expect(Mongo::Protocol::Msg).to receive(:new).with([], {}, expected_global_args) + op.send(:message, connection) end + end - context 'when the topology is replica set or sharded' do - min_server_fcv '3.6' - require_topology :replica_set, :sharded + context 'when the topology is standalone' do + require_topology :single - let(:expected_global_args) do - global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) - end + let(:expected_global_args) do + global_args + end - it 'creates the correct OP_MSG message' do - authorized_client.command(ping:1) - expect(Mongo::Protocol::Msg).to receive(:new).with([], {}, expected_global_args) - op.send(:message, connection) - end + it 'creates the correct OP_MSG message' do + authorized_client.command(ping:1) + expect(Mongo::Protocol::Msg).to receive(:new).with([], {}, expected_global_args) + op.send(:message, connection) end - context 'when the topology is standalone' do - min_server_fcv '3.6' - require_topology :single + context 'when an implicit session is created and the topology is then updated and the server does not support sessions' do + # Mocks on features are incompatible with linting + require_no_linting let(:expected_global_args) do - global_args - end - - it 'creates the correct OP_MSG message' do - authorized_client.command(ping:1) - expect(Mongo::Protocol::Msg).to receive(:new).with([], {}, expected_global_args) - op.send(:message, connection) - end - - context 'when an implicit session is created and the topology is then updated and the server does not support sessions' do - # Mocks on features are incompatible with linting - require_no_linting - - let(:expected_global_args) do - global_args.dup.tap do |args| - args.delete(:lsid) - end + global_args.dup.tap do |args| + args.delete(:lsid) end + end - let(:session) do - Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| - allow(session).to receive(:session_id).and_return(42) - session.should be_implicit - end + let(:session) do + Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| + allow(session).to receive(:session_id).and_return(42) + session.should be_implicit end + end - it 'creates the correct OP_MSG message' do - RSpec::Mocks.with_temporary_scope do - expect(connection.features).to receive(:sessions_enabled?).and_return(false) + it 'creates the correct OP_MSG message' do + RSpec::Mocks.with_temporary_scope do + expect(connection.features).to receive(:sessions_enabled?).and_return(false) - expect(expected_global_args[:session]).to be nil - expect(Mongo::Protocol::Msg).to receive(:new).with([], {}, expected_global_args) - op.send(:message, connection) - end + expect(expected_global_args[:session]).to be nil + expect(Mongo::Protocol::Msg).to receive(:new).with([], {}, expected_global_args) + op.send(:message, connection) end end end + end - context 'when the write concern is 0' do + context 'when the write concern is 0' do - let(:write_concern) do - Mongo::WriteConcern.get(w: 0) - end + let(:write_concern) do + Mongo::WriteConcern.get(w: 0) + end - context 'when the session is implicit' do + context 'when the session is implicit' do - let(:session) do - Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| - allow(session).to receive(:session_id).and_return(42) - session.should be_implicit - end + let(:session) do + Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| + allow(session).to receive(:session_id).and_return(42) + session.should be_implicit end + end - context 'when the topology is replica set or sharded' do - min_server_fcv '3.6' - require_topology :replica_set, :sharded + context 'when the topology is replica set or sharded' do + require_topology :replica_set, :sharded - let(:expected_global_args) do - global_args.dup.tap do |args| - args.delete(:lsid) - args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) - end + let(:expected_global_args) do + global_args.dup.tap do |args| + args.delete(:lsid) + args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) end + end - it 'does not send a session id in the command' do - authorized_client.command(ping:1) - expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], {}, expected_global_args) - op.send(:message, connection) - end + it 'does not send a session id in the command' do + authorized_client.command(ping:1) + expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], {}, expected_global_args) + op.send(:message, connection) end + end - context 'when the topology is standalone' do - min_server_fcv '3.6' - require_topology :single + context 'when the topology is standalone' do + require_topology :single - let(:expected_global_args) do - global_args.dup.tap do |args| - args.delete(:lsid) - end + let(:expected_global_args) do + global_args.dup.tap do |args| + args.delete(:lsid) end + end - it 'creates the correct OP_MSG message' do - authorized_client.command(ping:1) - expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], {}, expected_global_args) - op.send(:message, connection) - end + it 'creates the correct OP_MSG message' do + authorized_client.command(ping:1) + expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], {}, expected_global_args) + op.send(:message, connection) end end + end - context 'when the session is explicit' do - min_server_fcv '3.6' - require_topology :replica_set, :sharded + context 'when the session is explicit' do + require_topology :replica_set, :sharded - let(:session) do - authorized_client.start_session - end + let(:session) do + authorized_client.start_session + end - before do - session.should_not be_implicit - end + before do + session.should_not be_implicit + end - let(:expected_global_args) do - global_args.dup.tap do |args| - args.delete(:lsid) - args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) - end + let(:expected_global_args) do + global_args.dup.tap do |args| + args.delete(:lsid) + args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) end + end - it 'does not send a session id in the command' do - authorized_client.command(ping:1) - RSpec::Mocks.with_temporary_scope do - expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], {}, expected_global_args) - op.send(:message, connection) - end + it 'does not send a session id in the command' do + authorized_client.command(ping:1) + RSpec::Mocks.with_temporary_scope do + expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], {}, expected_global_args) + op.send(:message, connection) end end end end end + + it_behaves_like 'a CSOT-compliant OpMsg subclass' end diff --git a/spec/mongo/operation/delete/op_msg_spec.rb b/spec/mongo/operation/delete/op_msg_spec.rb index ab163e3840..2e477df33d 100644 --- a/spec/mongo/operation/delete/op_msg_spec.rb +++ b/spec/mongo/operation/delete/op_msg_spec.rb @@ -2,8 +2,12 @@ # rubocop:todo all require 'spec_helper' +require_relative '../shared/csot/examples' describe Mongo::Operation::Delete::OpMsg do + include CSOT::Examples + + let(:context) { Mongo::Operation::Context.new } let(:write_concern) do Mongo::WriteConcern.get(w: :majority) @@ -125,7 +129,6 @@ end context 'when the topology is replica set or sharded' do - min_server_fcv '3.6' require_topology :replica_set, :sharded let(:expected_global_args) do @@ -140,7 +143,6 @@ end context 'when the topology is standalone' do - min_server_fcv '3.6' require_topology :single let(:expected_global_args) do @@ -198,7 +200,6 @@ end context 'when the topology is replica set or sharded' do - min_server_fcv '3.6' require_topology :replica_set, :sharded let(:expected_global_args) do @@ -216,7 +217,6 @@ end context 'when the topology is standalone' do - min_server_fcv '3.6' require_topology :single let(:expected_global_args) do @@ -234,7 +234,6 @@ end context 'when the session is explicit' do - min_server_fcv '3.6' require_topology :replica_set, :sharded let(:session) do @@ -263,4 +262,6 @@ end end end + + it_behaves_like 'a CSOT-compliant OpMsg subclass' end diff --git a/spec/mongo/operation/insert/op_msg_spec.rb b/spec/mongo/operation/insert/op_msg_spec.rb index 9da07f2ab6..9b9e28cc17 100644 --- a/spec/mongo/operation/insert/op_msg_spec.rb +++ b/spec/mongo/operation/insert/op_msg_spec.rb @@ -2,8 +2,12 @@ # rubocop:todo all require 'spec_helper' +require_relative '../shared/csot/examples' describe Mongo::Operation::Insert::OpMsg do + include CSOT::Examples + + let(:context) { Mongo::Operation::Context.new } let(:documents) { [{ :_id => 1, :foo => 1 }] } let(:session) { nil } @@ -104,193 +108,186 @@ # https://jira.mongodb.org/browse/RUBY-2224 require_no_linting - context 'when the server supports OP_MSG' do - min_server_fcv '3.6' + let(:documents) do + [ { foo: 1 }, { bar: 2 }] + end + + let(:global_args) do + { + insert: TEST_COLL, + ordered: true, + writeConcern: write_concern.options, + '$db' => SpecConfig.instance.test_db, + lsid: session.session_id + } + end + + let!(:expected_payload_1) do + Mongo::Protocol::Msg::Section1.new('documents', op.documents) + end - let(:documents) do - [ { foo: 1 }, { bar: 2 }] + let(:session) do + Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| + allow(session).to receive(:session_id).and_return(42) end + end - let(:global_args) do - { - insert: TEST_COLL, - ordered: true, - writeConcern: write_concern.options, - '$db' => SpecConfig.instance.test_db, - lsid: session.session_id - } + context 'when the topology is replica set or sharded' do + require_topology :replica_set, :sharded + + let(:expected_global_args) do + global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) + end + + it 'creates the correct OP_MSG message' do + authorized_client.command(ping:1) + RSpec::Mocks.with_temporary_scope do + expect(Mongo::Protocol::Msg).to receive(:new).with([], + {}, + expected_global_args, + expected_payload_1) + op.send(:message, connection) + end end + end + + context 'when the topology is standalone' do + require_topology :single - let!(:expected_payload_1) do - Mongo::Protocol::Msg::Section1.new('documents', op.documents) + let(:expected_global_args) do + global_args end - let(:session) do - Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| - allow(session).to receive(:session_id).and_return(42) + it 'creates the correct OP_MSG message' do + RSpec::Mocks.with_temporary_scope do + authorized_client.command(ping:1) + expect(Mongo::Protocol::Msg).to receive(:new).with([], + {}, + expected_global_args, + expected_payload_1) + op.send(:message, connection) end end - context 'when the topology is replica set or sharded' do - min_server_fcv '3.6' - require_topology :replica_set, :sharded + context 'when an implicit session is created and the topology is then updated and the server does not support sessions' do + # Mocks on features are incompatible with linting + require_no_linting let(:expected_global_args) do - global_args.merge(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) + global_args.dup.tap do |args| + args.delete(:lsid) + end + end + + before do + session.implicit?.should be true end it 'creates the correct OP_MSG message' do - authorized_client.command(ping:1) RSpec::Mocks.with_temporary_scope do + expect(connection.features).to receive(:sessions_enabled?).and_return(false) + + expect(expected_global_args).not_to have_key(:lsid) expect(Mongo::Protocol::Msg).to receive(:new).with([], - {}, - expected_global_args, - expected_payload_1) + {}, + expected_global_args, + expected_payload_1) op.send(:message, connection) end end end + end - context 'when the topology is standalone' do - min_server_fcv '3.6' - require_topology :single + context 'when the write concern is 0' do - let(:expected_global_args) do - global_args - end + let(:write_concern) do + Mongo::WriteConcern.get(w: 0) + end - it 'creates the correct OP_MSG message' do - RSpec::Mocks.with_temporary_scope do - authorized_client.command(ping:1) - expect(Mongo::Protocol::Msg).to receive(:new).with([], - {}, - expected_global_args, - expected_payload_1) - op.send(:message, connection) + context 'when the session is implicit' do + + let(:session) do + Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| + allow(session).to receive(:session_id).and_return(42) + session.should be_implicit end end - context 'when an implicit session is created and the topology is then updated and the server does not support sessions' do - # Mocks on features are incompatible with linting - require_no_linting + context 'when the topology is replica set or sharded' do + require_topology :replica_set, :sharded let(:expected_global_args) do global_args.dup.tap do |args| args.delete(:lsid) + args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) end end - before do - session.implicit?.should be true - end - - it 'creates the correct OP_MSG message' do + it 'does not send a session id in the command' do + authorized_client.command(ping:1) RSpec::Mocks.with_temporary_scope do - expect(connection.features).to receive(:sessions_enabled?).and_return(false) - - expect(expected_global_args).not_to have_key(:lsid) - expect(Mongo::Protocol::Msg).to receive(:new).with([], - {}, - expected_global_args, - expected_payload_1) + expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], + {}, + expected_global_args, + expected_payload_1) op.send(:message, connection) end end end - end - context 'when the write concern is 0' do - - let(:write_concern) do - Mongo::WriteConcern.get(w: 0) - end + context 'when the topology is standalone' do + require_topology :single - context 'when the session is implicit' do - - let(:session) do - Mongo::Session.new(nil, authorized_client, implicit: true).tap do |session| - allow(session).to receive(:session_id).and_return(42) - session.should be_implicit - end - end - - context 'when the topology is replica set or sharded' do - min_server_fcv '3.6' - require_topology :replica_set, :sharded - - let(:expected_global_args) do - global_args.dup.tap do |args| - args.delete(:lsid) - args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) - end - end - - it 'does not send a session id in the command' do - authorized_client.command(ping:1) - RSpec::Mocks.with_temporary_scope do - expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], - {}, - expected_global_args, - expected_payload_1) - op.send(:message, connection) - end + let(:expected_global_args) do + global_args.dup.tap do |args| + args.delete(:lsid) end end - context 'when the topology is standalone' do - min_server_fcv '3.6' - require_topology :single - - let(:expected_global_args) do - global_args.dup.tap do |args| - args.delete(:lsid) - end - end - - it 'creates the correct OP_MSG message' do - authorized_client.command(ping:1) - RSpec::Mocks.with_temporary_scope do - expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], - {}, - expected_global_args, - expected_payload_1) - op.send(:message, connection) - end + it 'creates the correct OP_MSG message' do + authorized_client.command(ping:1) + RSpec::Mocks.with_temporary_scope do + expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], + {}, + expected_global_args, + expected_payload_1) + op.send(:message, connection) end end end + end - context 'when the session is explicit' do - min_server_fcv '3.6' - require_topology :replica_set, :sharded + context 'when the session is explicit' do + require_topology :replica_set, :sharded - let(:session) do - authorized_client.start_session - end + let(:session) do + authorized_client.start_session + end - before do - session.should_not be_implicit - end + before do + session.should_not be_implicit + end - let(:expected_global_args) do - global_args.dup.tap do |args| - args.delete(:lsid) - args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) - end + let(:expected_global_args) do + global_args.dup.tap do |args| + args.delete(:lsid) + args.merge!(Mongo::Operation::CLUSTER_TIME => authorized_client.cluster.cluster_time) end + end - it 'does not send a session id in the command' do - authorized_client.command(ping:1) - RSpec::Mocks.with_temporary_scope do - expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], - {}, - expected_global_args, - expected_payload_1) - op.send(:message, connection) - end + it 'does not send a session id in the command' do + authorized_client.command(ping:1) + RSpec::Mocks.with_temporary_scope do + expect(Mongo::Protocol::Msg).to receive(:new).with([:more_to_come], + {}, + expected_global_args, + expected_payload_1) + op.send(:message, connection) end end end end end + + it_behaves_like 'a CSOT-compliant OpMsg subclass' end diff --git a/spec/mongo/operation/shared/csot/examples.rb b/spec/mongo/operation/shared/csot/examples.rb new file mode 100644 index 0000000000..c46a58d951 --- /dev/null +++ b/spec/mongo/operation/shared/csot/examples.rb @@ -0,0 +1,100 @@ +# frozen_string_literal: true +# rubocop:todo all + +module CSOT + module Examples + # expects the following values to be available: + # `op` -- an instance of an OpMsgBase subclass + def self.included(example_context) + example_context.shared_examples 'mock CSOT environment' do + # Linting freaks out because of the doubles used in these specs. + require_no_linting + + let(:message) { op.send(:message, connection) } + + let(:body) { message.documents.first } + + let(:remaining_timeout_sec) { nil } + let(:minimum_round_trip_time) { 0 } + + let(:context) do + Mongo::Operation::Context.new.tap do |context| + allow(context).to receive(:remaining_timeout_sec).and_return(remaining_timeout_sec) + allow(context).to receive(:has_timeout?).and_return(!remaining_timeout_sec.nil?) + end + end + + let(:server) do + instance_double(Mongo::Server).tap do |server| + allow(server).to receive(:minimum_round_trip_time).and_return(minimum_round_trip_time) + end + end + + let(:address) { Mongo::Address.new('127.0.0.1') } + + let(:description) do + Mongo::Server::Description.new( + address, { Mongo::Operation::Result::OK => 1 } + ) + end + + let(:features) do + Mongo::Server::Description::Features.new( + Mongo::Server::Description::Features::DRIVER_WIRE_VERSIONS, + address + ) + end + + let(:connection) do + instance_double(Mongo::Server::Connection).tap do |conn| + allow(conn).to receive(:server).and_return(server) + allow(conn).to receive(:description).and_return(description) + allow(conn).to receive(:features).and_return(features) + end + end + + before do + # context is normally set when calling `execute` on the operation, + # but since we're not doing that, we have to tell the operation + # what the context is. + op.context = context + end + end + + example_context.shared_examples 'a CSOT-compliant OpMsg subclass' do + include_examples 'mock CSOT environment' + + context 'when no timeout_ms set' do + it 'does not set maxTimeMS' do + expect(body.key?(:maxTimeMS)).to be false + end + end + + context 'when there is enough time to send the message' do + # Ten seconds remaining + let(:remaining_timeout_sec) { 10 } + + # One second RTT + let(:minimum_round_trip_time) { 1 } + + it 'sets the maxTimeMS' do + # Nine seconds + expect(body[:maxTimeMS]).to eq(9_000) + end + end + + context 'when there is not enough time to send the message' do + # Ten seconds remaining + let(:remaining_timeout_sec) { 0.1 } + + # One second RTT + let(:minimum_round_trip_time) { 1 } + + it 'fails with an exception' do + expect { message }.to raise_error(Mongo::Error::TimeoutError) + end + end + end + end + end +end diff --git a/spec/mongo/protocol/msg_spec.rb b/spec/mongo/protocol/msg_spec.rb index f1481363e1..23a4f41d1b 100644 --- a/spec/mongo/protocol/msg_spec.rb +++ b/spec/mongo/protocol/msg_spec.rb @@ -504,68 +504,4 @@ end end end - - describe '#maybe_add_max_time_ms' do - let(:server) { instance_double(Mongo::Server) } - - let(:connection) do - instance_double(Mongo::Server::Connection).tap do |conn| - allow(conn).to receive(:server).and_return(server) - end - end - - let(:new_msg) do - message.maybe_add_max_time_ms(connection, context) - end - - context 'when no timeout_ms set' do - let(:context) do - Mongo::Operation::Context.new - end - - it 'does now set maxTimeMS' do - expect(new_msg.documents.first.key?(:maxTimeMS)).to eq(false) - end - end - - context 'when there is enough time to send the message' do - let(:context) do - instance_double(Mongo::Operation::Context).tap do |ctx| - # Ten seconds - allow(ctx).to receive(:has_timeout?).and_return(true) - allow(ctx).to receive(:remaining_timeout_sec).twice.and_return(10) - end - end - - before do - # One second - allow(server).to receive(:minimum_round_trip_time).and_return(1) - end - - it 'sets the maxTimeMS' do - # Nine seconds - expect(new_msg.documents.first[:maxTimeMS]).to eq(9_000) - end - end - - context 'when there is not enough time to send the message' do - let(:context) do - instance_double(Mongo::Operation::Context).tap do |ctx| - allow(ctx).to receive(:has_timeout?).and_return(true) - allow(ctx).to receive(:remaining_timeout_sec).twice.and_return(0.1) - end - end - - before do - # One second - allow(server).to receive(:minimum_round_trip_time).and_return(1) - end - - it 'sets the maxTimeMS' do - expect do - new_msg - end.to raise_error(Mongo::Error::TimeoutError) - end - end - end end diff --git a/spec/runners/unified/crud_operations.rb b/spec/runners/unified/crud_operations.rb index a0b5ed83b2..871e820aab 100644 --- a/spec/runners/unified/crud_operations.rb +++ b/spec/runners/unified/crud_operations.rb @@ -240,7 +240,8 @@ def replace_one(op) comment: args.use('comment'), upsert: args.use('upsert'), let: args.use('let'), - hint: args.use('hint') + hint: args.use('hint'), + timeout_ms: args.use('timeout_ms') ) end end