Skip to content

Commit d42b312

Browse files
authored
RUBY-3445 Add timeouts in OpMsg instead of Protocol::Message (#2861)
* starting on the op_msg timeout refactoring * make sure classes with custom message() implementions set the timeouts * add more tests
1 parent 76869d4 commit d42b312

File tree

14 files changed

+414
-347
lines changed

14 files changed

+414
-347
lines changed

lib/mongo/operation.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
require 'mongo/operation/shared/validatable'
2323
require 'mongo/operation/shared/object_id_generator'
2424
require 'mongo/operation/shared/op_msg_executable'
25+
require 'mongo/operation/shared/timed'
2526

2627
require 'mongo/operation/op_msg_base'
2728
require 'mongo/operation/command'

lib/mongo/operation/delete/op_msg.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ def selector(connection)
4949

5050
def message(connection)
5151
section = Protocol::Msg::Section1.new(IDENTIFIER, send(IDENTIFIER))
52-
Protocol::Msg.new(flags, {}, command(connection), section)
52+
cmd = apply_relevant_timeouts_to(command(connection), connection)
53+
Protocol::Msg.new(flags, {}, cmd, section)
5354
end
5455
end
5556
end

lib/mongo/operation/insert/op_msg.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ def selector(connection)
4949

5050
def message(connection)
5151
section = Protocol::Msg::Section1.new(IDENTIFIER, send(IDENTIFIER))
52-
Protocol::Msg.new(flags, {}, command(connection), section)
52+
cmd = apply_relevant_timeouts_to(command(connection), connection)
53+
Protocol::Msg.new(flags, {}, cmd, section)
5354
end
5455
end
5556
end

lib/mongo/operation/op_msg_base.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ class OpMsgBase
2222
include Specifiable
2323
include Executable
2424
include SessionsSupported
25+
include Timed
2526

2627
private
2728

2829
def message(connection)
29-
Protocol::Msg.new(flags, options(connection), command(connection))
30+
cmd = apply_relevant_timeouts_to(command(connection), connection)
31+
Protocol::Msg.new(flags, options(connection), cmd)
3032
end
3133
end
3234
end

lib/mongo/operation/shared/executable.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,18 @@ module Executable
2828

2929
include ResponseHandling
3030

31+
# @return [ Operation::Context | nil ] the operation context used to
32+
# execute this operation.
33+
attr_accessor :context
34+
3135
def do_execute(connection, context, options = {})
36+
# Save the context on the instance, to avoid having to pass it as a
37+
# parameter to every single method. There are many legacy methods that
38+
# still accept it as a parameter, which are left as-is for now to
39+
# minimize the impact of this change. Moving forward, it may be
40+
# reasonable to refactor things so this saved reference is used instead.
41+
@context = context
42+
3243
session&.materialize_if_needed
3344
unpin_maybe(session, connection) do
3445
add_error_labels(connection, context) do
@@ -112,7 +123,6 @@ def build_message(connection, context)
112123
if server_api = context.server_api
113124
msg = msg.maybe_add_server_api(server_api)
114125
end
115-
msg = msg.maybe_add_max_time_ms(connection, context)
116126
msg
117127
end
118128

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# frozen_string_literal: true
2+
3+
module Mongo
4+
module Operation
5+
# Defines the behavior of operations that have the default timeout
6+
# behavior described by the client-side operation timeouts (CSOT)
7+
# spec.
8+
#
9+
# @api private
10+
module Timed
11+
# If a timeout is active (as defined by the current context), and it has
12+
# not yet expired, add :maxTimeMS to the spec.
13+
#
14+
# @param [ Hash ] spec The spec to modify
15+
# @param [ Connection ] connection The connection that will be used to
16+
# execute the operation
17+
#
18+
# @return [ Hash ] the spec
19+
#
20+
# @raises [ Mongo::Error::TimeoutError ] if the current timeout has
21+
# expired.
22+
def apply_relevant_timeouts_to(spec, connection)
23+
with_max_time(connection) do |max_time_sec|
24+
return spec if max_time_sec.nil?
25+
26+
spec.tap { spec[:maxTimeMS] = (max_time_sec * 1_000).to_i }
27+
end
28+
end
29+
30+
# A helper method that computes the remaining timeout (in seconds) and
31+
# yields it to the associated block. If no timeout is present, yields
32+
# nil. If the timeout has expired, raises Mongo::Error::TimeoutError.
33+
#
34+
# @param [ Connection ] connection The connection that will be used to
35+
# execute the operation
36+
#
37+
# @return [ Hash ] the result of yielding to the block (which must be
38+
# a Hash)
39+
def with_max_time(connection)
40+
if context&.has_timeout?
41+
max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time
42+
raise Mongo::Error::TimeoutError if max_time_sec <= 0
43+
44+
yield max_time_sec
45+
else
46+
yield nil
47+
end
48+
end
49+
end
50+
end
51+
end

lib/mongo/operation/update/op_msg.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ def selector(connection)
4545
def message(connection)
4646
updates = validate_updates(connection, send(IDENTIFIER))
4747
section = Protocol::Msg::Section1.new(IDENTIFIER, updates)
48-
Protocol::Msg.new(flags, {}, command(connection), section)
48+
cmd = apply_relevant_timeouts_to(command(connection), connection)
49+
Protocol::Msg.new(flags, {}, cmd, section)
4950
end
5051
end
5152
end

lib/mongo/protocol/msg.rb

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -301,37 +301,6 @@ def maybe_add_server_api(server_api)
301301
Msg.new(@flags, @options, main_document, *@sequences)
302302
end
303303

304-
# Adds maxTimeMS attribute to the message if timeoutMS is set
305-
# and there is enough time left to send the message to the server
306-
# (remaining timeout is bigger than minimum round trip time for
307-
# the server.
308-
#
309-
# @param [ Mongo::Server::Connection ] connection Connection the message
310-
# should be sent with.
311-
# @param [ Mongo::Operation::Context ] context Context of the operation
312-
# the message is build from.
313-
#
314-
# @return [ Mongo::Protocol::Msg] message with added maxTimeMS attribute
315-
# if needed.
316-
#
317-
# @raise [ Mongo::Error::TimeoutError ] if timeout expired or there is
318-
# not enough time to send the message to the server.
319-
def maybe_add_max_time_ms(connection, context)
320-
return self unless context.has_timeout?
321-
322-
max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time
323-
if max_time_sec > 0
324-
max_time_ms = (max_time_sec * 1_000).to_i
325-
main_document = @main_document.merge(
326-
maxTimeMS: max_time_ms
327-
)
328-
Msg.new(@flags, @options, main_document, *@sequences)
329-
else
330-
raise Mongo::Error::TimeoutError
331-
end
332-
end
333-
334-
335304
# Returns the number of documents returned from the server.
336305
#
337306
# The Msg instance must be for a server reply and the reply must return

0 commit comments

Comments
 (0)