Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions lib/mongo/bulk_write.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,14 @@ def execute
result_combiner = ResultCombiner.new
operations = op_combiner.combine
validate_requests!
deadline = calculate_deadline

client.with_session(@options) do |session|
operations.each do |operation|
op_timeout_ms = if @deadline
if @deadline == 0
0
else
((@deadline - Utils.monotonic_time) * 1_000).to_i
end
end
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: { operation_timeout_ms: op_timeout_ms }
operation_timeouts: { operation_timeout_ms: op_timeout_ms(deadline) }
)
if single_statement?(operation)
write_concern = write_concern(session)
Expand Down Expand Up @@ -135,15 +129,8 @@ def initialize(collection, requests, options = {})
@collection = collection
@requests = requests
@options = options || {}
if @options[:timeout_ms]
@timeout_ms = @options[:timeout_ms]
if @timeout_ms > 0
@deadline = Utils.monotonic_time + ( @timeout_ms / 1_000.0 )
elsif @timeout_ms == 0
@deadline = 0
else
raise ArgumentError, "timeout_ms options must be non-negative integer"
end
if @options[:timeout_ms] && @options[:timeout_ms] < 0
raise ArgumentError, "timeout_ms options must be non-negative integer"
end
end

Expand Down Expand Up @@ -183,6 +170,31 @@ def write_concern(session = nil)
:update_one,
:insert_one ].freeze

# @return [ Float | nil ] Deadline for the batch of operations, if set.
def calculate_deadline
timeout_ms = @options[:timeout_ms] || collection.timeout_ms
return nil if timeout_ms.nil?

if timeout_ms == 0
0
else
Utils.monotonic_time + (timeout_ms / 1_000.0)
end
end

# @param [ Float | nil ] deadline Deadline for the batch of operations.
#
# @return [ Integer | nil ] Timeout in milliseconds for the next operation.
def op_timeout_ms(deadline)
return nil if deadline.nil?

if deadline == 0
0
else
((deadline - Utils.monotonic_time) * 1_000).to_i
end
end

def single_statement?(operation)
SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def select_cursor(session)
Cursor.new(view, result, server, session: session, context: context)
end
else
read_with_retry_cursor(session, server_selector, view) do |server|
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
send_initial_query(server, context)
end
end
Expand Down
29 changes: 15 additions & 14 deletions lib/mongo/collection/view/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def count(opts = {})
session: session,
operation_timeouts: operation_timeouts(opts)
)
read_with_retry(session, selector) do |server|
read_with_retry(session, selector, context) do |server|
Operation::Count.new(
selector: cmd,
db_name: database.name,
Expand Down Expand Up @@ -237,7 +237,7 @@ def count_documents(opts = {})
pipeline << { :'$limit' => opts[:limit] } if opts[:limit]
pipeline << { :'$group' => { _id: 1, n: { :'$sum' => 1 } } }

opts = opts.slice(:hint, :max_time_ms, :read, :collation, :session, :comment)
opts = opts.slice(:hint, :max_time_ms, :read, :collation, :session, :comment, :timeout_ms)
opts[:collation] ||= collation

first = aggregate(pipeline, opts).first
Expand Down Expand Up @@ -279,12 +279,12 @@ def estimated_document_count(opts = {})
read_pref = opts[:read] || read_preference
selector = ServerSelector.get(read_pref || server_selector)
with_session(opts) do |session|
read_with_retry(session, selector) do |server|
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
read_with_retry(session, selector, context) do |server|
cmd = { count: collection.name }
cmd[:maxTimeMS] = opts[:max_time_ms] if opts[:max_time_ms]
if read_concern
Expand Down Expand Up @@ -347,7 +347,12 @@ def distinct(field_name, opts = {})
read_pref = opts[:read] || read_preference
selector = ServerSelector.get(read_pref || server_selector)
with_session(opts) do |session|
read_with_retry(session, selector) do |server|
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
read_with_retry(session, selector, context) do |server|
Operation::Distinct.new(
selector: cmd,
db_name: database.name,
Expand All @@ -360,11 +365,7 @@ def distinct(field_name, opts = {})
collation: opts[:collation] || opts['collation'] || collation,
).execute(
server,
context: Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
context: context
)
end.first['values']
end
Expand Down
6 changes: 6 additions & 0 deletions lib/mongo/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def [](collection_name, options = {})
# required privilege to run the command when access control is enforced
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# See https://mongodb.com/docs/manual/reference/command/listCollections/
# for more information and usage.
Expand Down Expand Up @@ -156,6 +158,8 @@ def collection_names(options = {})
# required privilege to run the command when access control is enforced.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# See https://mongodb.com/docs/manual/reference/command/listCollections/
# for more information and usage.
Expand All @@ -181,6 +185,8 @@ def list_collections(options = {})
# required privilege to run the command when access control is enforced.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# See https://mongodb.com/docs/manual/reference/command/listCollections/
# for more information and usage.
Expand Down
26 changes: 19 additions & 7 deletions lib/mongo/database/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class View
# to run the command when access control is enforced.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
# Must a positive integer. The default value is unset which means infinite.
#
# See https://mongodb.com/docs/manual/reference/command/listCollections/
# for more information and usage.
Expand All @@ -70,9 +72,14 @@ class View
# @since 2.0.0
def collection_names(options = {})
@batch_size = options[:batch_size]
session = client.send(:get_session, options)
cursor = read_with_retry_cursor(session, ServerSelector.primary, self) do |server|
send_initial_query(server, session, options.merge(name_only: true))
session = client.get_session(options)
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(options)
)
cursor = read_with_retry_cursor(session, ServerSelector.primary, self, context: context) do |server|
send_initial_query(server, session, context, options.merge(name_only: true))
end
cursor.map do |info|
if cursor.initial_result.connection_description.features.list_collections_enabled?
Expand Down Expand Up @@ -198,11 +205,16 @@ def operation_timeouts(opts = {})

def collections_info(session, server_selector, options = {}, &block)
description = nil
cursor = read_with_retry_cursor(session, server_selector, self) do |server|
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(options)
)
cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
# TODO take description from the connection used to send the query
# once https://jira.mongodb.org/browse/RUBY-1601 is fixed.
description = server.description
send_initial_query(server, session, options)
send_initial_query(server, session, context, options)
end
# On 3.0+ servers, we get just the collection names.
# On 2.6 server, we get collection names prefixed with the database
Expand Down Expand Up @@ -266,15 +278,15 @@ def initial_query_op(session, options = {})
# types (where possible).
#
# @return [ Operation::Result ] Result of the query.
def send_initial_query(server, session, options = {})
def send_initial_query(server, session, context, options = {})
opts = options.dup
execution_opts = {}
if opts.key?(:deserialize_as_bson)
execution_opts[:deserialize_as_bson] = opts.delete(:deserialize_as_bson)
end
initial_query_op(session, opts).execute(
server,
context: Operation::Context.new(client: client, session: session, operation_timeouts: operation_timeouts(opts)),
context: context,
options: execution_opts
)
end
Expand Down
16 changes: 11 additions & 5 deletions lib/mongo/index/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,15 @@ def get(keys_or_name)
#
# @since 2.0.0
def each(&block)
session = client.send(:get_session, @options)
cursor = read_with_retry_cursor(session, ServerSelector.primary, self) do |server|
send_initial_query(server, session)
session = client.get_session(@options)
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(@options)
)

cursor = read_with_retry_cursor(session, ServerSelector.primary, self, context: context) do |server|
send_initial_query(server, session, context)
end
if block_given?
cursor.each do |doc|
Expand Down Expand Up @@ -386,8 +392,8 @@ def normalize_models(models, server)
end
end

def send_initial_query(server, session)
initial_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
def send_initial_query(server, session, context)
initial_query_op(session).execute(server, context: context)
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/mongo/operation/find/op_msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ def apply_find_timeouts_to(spec, timeout_ms)
end
end

spec
spec.tap do |spc|
spc.delete(:maxTimeMS) if spc[:maxTimeMS].nil?
end
end

def selector(connection)
Expand Down
9 changes: 5 additions & 4 deletions lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,21 @@ def read_without_retry(session, server_selector, &block)
#
# @return [ Result ] The result of the operation.
def retry_read(original_error, session, server_selector, context: nil, failed_server: nil, &block)
context&.check_timeout!

server = select_server_for_retry(
original_error, session, server_selector, context, failed_server
)

log_retry(original_error, message: 'Read retry')

begin
context&.check_timeout!
attempt = attempt ? attempt + 1 : 2
yield server, true
rescue Error::TimeoutError
raise
rescue *retryable_exceptions => e
e.add_notes('modern retry', "attempt #{attempt}")
if context&.deadline
if context&.csot?
failed_server = server
retry
else
Expand All @@ -303,7 +304,7 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
e.add_note('modern retry')
if e.write_retryable?
e.add_note("attempt #{attempt}")
if context&.deadline
if context&.csot?
failed_server = server
retry
else
Expand Down
Loading