Skip to content

Commit ff86989

Browse files
RUBY-3390 CSOT for lisCollections
1 parent d4974a0 commit ff86989

File tree

12 files changed

+4635
-50
lines changed

12 files changed

+4635
-50
lines changed

lib/mongo/bulk_write.rb

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,14 @@ def execute
6060
result_combiner = ResultCombiner.new
6161
operations = op_combiner.combine
6262
validate_requests!
63+
deadline = calculate_deadline
6364

6465
client.with_session(@options) do |session|
6566
operations.each do |operation|
66-
op_timeout_ms = if @deadline
67-
if @deadline == 0
68-
0
69-
else
70-
((@deadline - Utils.monotonic_time) * 1_000).to_i
71-
end
72-
end
7367
context = Operation::Context.new(
7468
client: client,
7569
session: session,
76-
operation_timeouts: { operation_timeout_ms: op_timeout_ms }
70+
operation_timeouts: { operation_timeout_ms: op_timeout_ms(deadline) }
7771
)
7872
if single_statement?(operation)
7973
write_concern = write_concern(session)
@@ -135,15 +129,8 @@ def initialize(collection, requests, options = {})
135129
@collection = collection
136130
@requests = requests
137131
@options = options || {}
138-
if @options[:timeout_ms]
139-
@timeout_ms = @options[:timeout_ms]
140-
if @timeout_ms > 0
141-
@deadline = Utils.monotonic_time + ( @timeout_ms / 1_000.0 )
142-
elsif @timeout_ms == 0
143-
@deadline = 0
144-
else
145-
raise ArgumentError, "timeout_ms options must be non-negative integer"
146-
end
132+
if @options[:timeout_ms] && @options[:timeout_ms] < 0
133+
raise ArgumentError, "timeout_ms options must be non-negative integer"
147134
end
148135
end
149136

@@ -183,6 +170,31 @@ def write_concern(session = nil)
183170
:update_one,
184171
:insert_one ].freeze
185172

173+
# @return [ Float | nil ] Deadline for the batch of operations, if set.
174+
def calculate_deadline
175+
timeout_ms = @options[:timeout_ms] || collection.timeout_ms
176+
return nil if timeout_ms.nil?
177+
178+
if timeout_ms == 0
179+
0
180+
else
181+
Utils.monotonic_time + (timeout_ms / 1_000.0)
182+
end
183+
end
184+
185+
# @param [ Float | nil ] deadline Deadline for the batch of operations.
186+
#
187+
# @return [ Integer | nil ] Timeout in milliseconds for the next operation.
188+
def op_timeout_ms(deadline)
189+
return nil if deadline.nil?
190+
191+
if deadline == 0
192+
0
193+
else
194+
((deadline - Utils.monotonic_time) * 1_000).to_i
195+
end
196+
end
197+
186198
def single_statement?(operation)
187199
SINGLE_STATEMENT_OPS.include?(operation.keys.first)
188200
end

lib/mongo/collection/view/iterable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def select_cursor(session)
9999
Cursor.new(view, result, server, session: session, context: context)
100100
end
101101
else
102-
read_with_retry_cursor(session, server_selector, view) do |server|
102+
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
103103
send_initial_query(server, context)
104104
end
105105
end

lib/mongo/collection/view/readable.rb

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def count(opts = {})
187187
session: session,
188188
operation_timeouts: operation_timeouts(opts)
189189
)
190-
read_with_retry(session, selector) do |server|
190+
read_with_retry(session, selector, context) do |server|
191191
Operation::Count.new(
192192
selector: cmd,
193193
db_name: database.name,
@@ -237,7 +237,7 @@ def count_documents(opts = {})
237237
pipeline << { :'$limit' => opts[:limit] } if opts[:limit]
238238
pipeline << { :'$group' => { _id: 1, n: { :'$sum' => 1 } } }
239239

240-
opts = opts.slice(:hint, :max_time_ms, :read, :collation, :session, :comment)
240+
opts = opts.slice(:hint, :max_time_ms, :read, :collation, :session, :comment, :timeout_ms)
241241
opts[:collation] ||= collation
242242

243243
first = aggregate(pipeline, opts).first
@@ -279,12 +279,12 @@ def estimated_document_count(opts = {})
279279
read_pref = opts[:read] || read_preference
280280
selector = ServerSelector.get(read_pref || server_selector)
281281
with_session(opts) do |session|
282-
read_with_retry(session, selector) do |server|
283-
context = Operation::Context.new(
284-
client: client,
285-
session: session,
286-
operation_timeouts: operation_timeouts(opts)
287-
)
282+
context = Operation::Context.new(
283+
client: client,
284+
session: session,
285+
operation_timeouts: operation_timeouts(opts)
286+
)
287+
read_with_retry(session, selector, context) do |server|
288288
cmd = { count: collection.name }
289289
cmd[:maxTimeMS] = opts[:max_time_ms] if opts[:max_time_ms]
290290
if read_concern
@@ -347,7 +347,12 @@ def distinct(field_name, opts = {})
347347
read_pref = opts[:read] || read_preference
348348
selector = ServerSelector.get(read_pref || server_selector)
349349
with_session(opts) do |session|
350-
read_with_retry(session, selector) do |server|
350+
context = Operation::Context.new(
351+
client: client,
352+
session: session,
353+
operation_timeouts: operation_timeouts(opts)
354+
)
355+
read_with_retry(session, selector, context) do |server|
351356
Operation::Distinct.new(
352357
selector: cmd,
353358
db_name: database.name,
@@ -360,11 +365,7 @@ def distinct(field_name, opts = {})
360365
collation: opts[:collation] || opts['collation'] || collation,
361366
).execute(
362367
server,
363-
context: Operation::Context.new(
364-
client: client,
365-
session: session,
366-
operation_timeouts: operation_timeouts(opts)
367-
)
368+
context: context
368369
)
369370
end.first['values']
370371
end

lib/mongo/database.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ def [](collection_name, options = {})
128128
# required privilege to run the command when access control is enforced
129129
# @option options [ Object ] :comment A user-provided
130130
# comment to attach to this command.
131+
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
132+
# Must a positive integer. The default value is unset which means infinite.
131133
#
132134
# See https://mongodb.com/docs/manual/reference/command/listCollections/
133135
# for more information and usage.
@@ -156,6 +158,8 @@ def collection_names(options = {})
156158
# required privilege to run the command when access control is enforced.
157159
# @option options [ Object ] :comment A user-provided
158160
# comment to attach to this command.
161+
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
162+
# Must a positive integer. The default value is unset which means infinite.
159163
#
160164
# See https://mongodb.com/docs/manual/reference/command/listCollections/
161165
# for more information and usage.
@@ -181,6 +185,8 @@ def list_collections(options = {})
181185
# required privilege to run the command when access control is enforced.
182186
# @option options [ Object ] :comment A user-provided
183187
# comment to attach to this command.
188+
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
189+
# Must a positive integer. The default value is unset which means infinite.
184190
#
185191
# See https://mongodb.com/docs/manual/reference/command/listCollections/
186192
# for more information and usage.

lib/mongo/database/view.rb

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ class View
6060
# to run the command when access control is enforced.
6161
# @option options [ Object ] :comment A user-provided
6262
# comment to attach to this command.
63+
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
64+
# Must a positive integer. The default value is unset which means infinite.
6365
#
6466
# See https://mongodb.com/docs/manual/reference/command/listCollections/
6567
# for more information and usage.
@@ -70,9 +72,14 @@ class View
7072
# @since 2.0.0
7173
def collection_names(options = {})
7274
@batch_size = options[:batch_size]
73-
session = client.send(:get_session, options)
74-
cursor = read_with_retry_cursor(session, ServerSelector.primary, self) do |server|
75-
send_initial_query(server, session, options.merge(name_only: true))
75+
session = client.get_session(options)
76+
context = Operation::Context.new(
77+
client: client,
78+
session: session,
79+
operation_timeouts: operation_timeouts(options)
80+
)
81+
cursor = read_with_retry_cursor(session, ServerSelector.primary, self, context: context) do |server|
82+
send_initial_query(server, session, context, options.merge(name_only: true))
7683
end
7784
cursor.map do |info|
7885
if cursor.initial_result.connection_description.features.list_collections_enabled?
@@ -198,11 +205,16 @@ def operation_timeouts(opts = {})
198205

199206
def collections_info(session, server_selector, options = {}, &block)
200207
description = nil
201-
cursor = read_with_retry_cursor(session, server_selector, self) do |server|
208+
context = Operation::Context.new(
209+
client: client,
210+
session: session,
211+
operation_timeouts: operation_timeouts(options)
212+
)
213+
cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
202214
# TODO take description from the connection used to send the query
203215
# once https://jira.mongodb.org/browse/RUBY-1601 is fixed.
204216
description = server.description
205-
send_initial_query(server, session, options)
217+
send_initial_query(server, session, context, options)
206218
end
207219
# On 3.0+ servers, we get just the collection names.
208220
# On 2.6 server, we get collection names prefixed with the database
@@ -266,15 +278,15 @@ def initial_query_op(session, options = {})
266278
# types (where possible).
267279
#
268280
# @return [ Operation::Result ] Result of the query.
269-
def send_initial_query(server, session, options = {})
281+
def send_initial_query(server, session, context, options = {})
270282
opts = options.dup
271283
execution_opts = {}
272284
if opts.key?(:deserialize_as_bson)
273285
execution_opts[:deserialize_as_bson] = opts.delete(:deserialize_as_bson)
274286
end
275287
initial_query_op(session, opts).execute(
276288
server,
277-
context: Operation::Context.new(client: client, session: session, operation_timeouts: operation_timeouts(opts)),
289+
context: context,
278290
options: execution_opts
279291
)
280292
end

lib/mongo/index/view.rb

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,15 @@ def get(keys_or_name)
273273
#
274274
# @since 2.0.0
275275
def each(&block)
276-
session = client.send(:get_session, @options)
277-
cursor = read_with_retry_cursor(session, ServerSelector.primary, self) do |server|
278-
send_initial_query(server, session)
276+
session = client.get_session(@options)
277+
context = Operation::Context.new(
278+
client: client,
279+
session: session,
280+
operation_timeouts: operation_timeouts(@options)
281+
)
282+
283+
cursor = read_with_retry_cursor(session, ServerSelector.primary, self, context: context) do |server|
284+
send_initial_query(server, session, context)
279285
end
280286
if block_given?
281287
cursor.each do |doc|
@@ -386,8 +392,8 @@ def normalize_models(models, server)
386392
end
387393
end
388394

389-
def send_initial_query(server, session)
390-
initial_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
395+
def send_initial_query(server, session, context)
396+
initial_query_op(session).execute(server, context: context)
391397
end
392398
end
393399
end

lib/mongo/operation/find/op_msg.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ def apply_find_timeouts_to(spec, timeout_ms)
7171
end
7272
end
7373

74-
spec
74+
spec.tap do |spc|
75+
spc.delete(:maxTimeMS) if spc[:maxTimeMS].nil?
76+
end
7577
end
7678

7779
def selector(connection)

lib/mongo/retryable/read_worker.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,20 +280,21 @@ def read_without_retry(session, server_selector, &block)
280280
#
281281
# @return [ Result ] The result of the operation.
282282
def retry_read(original_error, session, server_selector, context: nil, failed_server: nil, &block)
283-
context&.check_timeout!
284-
285283
server = select_server_for_retry(
286284
original_error, session, server_selector, context, failed_server
287285
)
288286

289287
log_retry(original_error, message: 'Read retry')
290288

291289
begin
290+
context&.check_timeout!
292291
attempt = attempt ? attempt + 1 : 2
293292
yield server, true
293+
rescue Error::TimeoutError
294+
raise
294295
rescue *retryable_exceptions => e
295296
e.add_notes('modern retry', "attempt #{attempt}")
296-
if context&.deadline
297+
if context&.csot?
297298
failed_server = server
298299
retry
299300
else
@@ -303,7 +304,7 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
303304
e.add_note('modern retry')
304305
if e.write_retryable?
305306
e.add_note("attempt #{attempt}")
306-
if context&.deadline
307+
if context&.csot?
307308
failed_server = server
308309
retry
309310
else

0 commit comments

Comments
 (0)