Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/mongo/operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
require 'mongo/operation/shared/validatable'
require 'mongo/operation/shared/object_id_generator'
require 'mongo/operation/shared/op_msg_executable'
require 'mongo/operation/shared/timed'

require 'mongo/operation/op_msg_base'
require 'mongo/operation/command'
Expand Down
3 changes: 2 additions & 1 deletion lib/mongo/operation/delete/op_msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def selector(connection)

def message(connection)
section = Protocol::Msg::Section1.new(IDENTIFIER, send(IDENTIFIER))
Protocol::Msg.new(flags, {}, command(connection), section)
cmd = apply_relevant_timeouts_to(command(connection), connection)
Protocol::Msg.new(flags, {}, cmd, section)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/mongo/operation/insert/op_msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def selector(connection)

def message(connection)
section = Protocol::Msg::Section1.new(IDENTIFIER, send(IDENTIFIER))
Protocol::Msg.new(flags, {}, command(connection), section)
cmd = apply_relevant_timeouts_to(command(connection), connection)
Protocol::Msg.new(flags, {}, cmd, section)
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/mongo/operation/op_msg_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ class OpMsgBase
include Specifiable
include Executable
include SessionsSupported
include Timed

private

def message(connection)
Protocol::Msg.new(flags, options(connection), command(connection))
cmd = apply_relevant_timeouts_to(command(connection), connection)
Protocol::Msg.new(flags, options(connection), cmd)
end
end
end
Expand Down
12 changes: 11 additions & 1 deletion lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,18 @@ module Executable

include ResponseHandling

# @return [ Operation::Context | nil ] the operation context used to
# execute this operation.
attr_accessor :context

def do_execute(connection, context, options = {})
# Save the context on the instance, to avoid having to pass it as a
# parameter to every single method. There are many legacy methods that
# still accept it as a parameter, which are left as-is for now to
# minimize the impact of this change. Moving forward, it may be
# reasonable to refactor things so this saved reference is used instead.
@context = context

session&.materialize_if_needed
unpin_maybe(session, connection) do
add_error_labels(connection, context) do
Expand Down Expand Up @@ -112,7 +123,6 @@ def build_message(connection, context)
if server_api = context.server_api
msg = msg.maybe_add_server_api(server_api)
end
msg = msg.maybe_add_max_time_ms(connection, context)
msg
end

Expand Down
51 changes: 51 additions & 0 deletions lib/mongo/operation/shared/timed.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

module Mongo
module Operation
# Defines the behavior of operations that have the default timeout
# behavior described by the client-side operation timeouts (CSOT)
# spec.
#
# @api private
module Timed
# If a timeout is active (as defined by the current context), and it has
# not yet expired, add :maxTimeMS to the spec.
#
# @param [ Hash ] spec The spec to modify
# @param [ Connection ] connection The connection that will be used to
# execute the operation
#
# @return [ Hash ] the spec
#
# @raises [ Mongo::Error::TimeoutError ] if the current timeout has
# expired.
def apply_relevant_timeouts_to(spec, connection)
with_max_time(connection) do |max_time_sec|
return spec if max_time_sec.nil?

spec.tap { spec[:maxTimeMS] = (max_time_sec * 1_000).to_i }
end
end

# A helper method that computes the remaining timeout (in seconds) and
# yields it to the associated block. If no timeout is present, yields
# nil. If the timeout has expired, raises Mongo::Error::TimeoutError.
#
# @param [ Connection ] connection The connection that will be used to
# execute the operation
#
# @return [ Hash ] the result of yielding to the block (which must be
# a Hash)
def with_max_time(connection)
if context&.has_timeout?
max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time
raise Mongo::Error::TimeoutError if max_time_sec <= 0

yield max_time_sec
else
yield nil
end
end
end
end
end
3 changes: 2 additions & 1 deletion lib/mongo/operation/update/op_msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def selector(connection)
def message(connection)
updates = validate_updates(connection, send(IDENTIFIER))
section = Protocol::Msg::Section1.new(IDENTIFIER, updates)
Protocol::Msg.new(flags, {}, command(connection), section)
cmd = apply_relevant_timeouts_to(command(connection), connection)
Protocol::Msg.new(flags, {}, cmd, section)
end
end
end
Expand Down
31 changes: 0 additions & 31 deletions lib/mongo/protocol/msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -301,37 +301,6 @@ def maybe_add_server_api(server_api)
Msg.new(@flags, @options, main_document, *@sequences)
end

# Adds maxTimeMS attribute to the message if timeoutMS is set
# and there is enough time left to send the message to the server
# (remaining timeout is bigger than minimum round trip time for
# the server.
#
# @param [ Mongo::Server::Connection ] connection Connection the message
# should be sent with.
# @param [ Mongo::Operation::Context ] context Context of the operation
# the message is build from.
#
# @return [ Mongo::Protocol::Msg] message with added maxTimeMS attribute
# if needed.
#
# @raise [ Mongo::Error::TimeoutError ] if timeout expired or there is
# not enough time to send the message to the server.
def maybe_add_max_time_ms(connection, context)
return self unless context.has_timeout?

max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time
if max_time_sec > 0
max_time_ms = (max_time_sec * 1_000).to_i
main_document = @main_document.merge(
maxTimeMS: max_time_ms
)
Msg.new(@flags, @options, main_document, *@sequences)
else
raise Mongo::Error::TimeoutError
end
end


# Returns the number of documents returned from the server.
#
# The Msg instance must be for a server reply and the reply must return
Expand Down
Loading