Skip to content

Commit 66c6c2f

Browse files
authored
RUBY-3372 CSOT Cursors (#2863)
* cursors CSOT * need to add context as parameter * wups, maxTimeMS, not timeoutMS * there is no msg variable here * context#timeout_ms is not actually used anywhere
1 parent ba6c83b commit 66c6c2f

File tree

43 files changed

+1327
-307
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1327
-307
lines changed

lib/mongo/client.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1104,7 +1104,7 @@ def watch(pipeline = [], options = {})
11041104
return use(Database::ADMIN).watch(pipeline, options) unless database.name == Database::ADMIN
11051105

11061106
view_options = options.dup
1107-
view_options[:await_data] = true if options[:max_await_time_ms]
1107+
view_options[:cursor_type] = :tailable_await if options[:max_await_time_ms]
11081108

11091109
Mongo::Collection::View::ChangeStream.new(
11101110
Mongo::Collection::View.new(self["#{Database::COMMAND}.aggregate"], {}, view_options),

lib/mongo/collection.rb

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ def initialize(database, name, options = {})
165165
@database = database
166166
@name = name.to_s.freeze
167167
@options = options.dup
168+
@timeout_ms = options.delete(:timeout_ms)
168169
=begin WriteConcern object support
169170
if @options[:write_concern].is_a?(WriteConcern::Base)
170171
# Cache the instance so that we do not needlessly reconstruct it.
@@ -504,6 +505,9 @@ def drop(opts = {})
504505
# @option options [ Integer ] :skip The number of docs to skip before returning results.
505506
# @option options [ Hash ] :sort The key and direction pairs by which the result set
506507
# will be sorted.
508+
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
509+
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
510+
# iteration).
507511
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
508512
# Must a positive integer. The default value is unset which means infinite.
509513
# @option options [ Hash ] :let Mapping of variables to use in the command.
@@ -538,10 +542,6 @@ def find(filter = nil, options = {})
538542
# See the server documentation for details.
539543
# @option options [ Integer ] :max_time_ms The maximum amount of time in
540544
# milliseconds to allow the aggregation to run.
541-
# @option options [ true | false ] :use_cursor Indicates whether the command
542-
# will request that the server provide results using a cursor. Note that
543-
# as of server version 3.6, aggregations always provide results using a
544-
# cursor and this option is therefore not valid.
545545
# @option options [ Session ] :session The session to use.
546546
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
547547
# Must a positive integer. The default value is unset which means infinite.
@@ -612,6 +612,11 @@ def aggregate(pipeline, options = {})
612612
# events included with this flag set are: createIndexes, dropIndexes,
613613
# modify, create, shardCollection, reshardCollection,
614614
# refineCollectionShardKey.
615+
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
616+
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
617+
# iteration).
618+
# @option options [ Integer ] :timeout_ms The maximum amount of time to
619+
# allow the query to run, in milliseconds.
615620
#
616621
# @note A change stream only allows 'majority' read concern.
617622
# @note This helper method is preferable to running a raw aggregation with
@@ -622,7 +627,7 @@ def aggregate(pipeline, options = {})
622627
# @since 2.5.0
623628
def watch(pipeline = [], options = {})
624629
view_options = options.dup
625-
view_options[:await_data] = true if options[:max_await_time_ms]
630+
view_options[:cursor_type] = :tailable_await if options[:max_await_time_ms]
626631
View::ChangeStream.new(View.new(self, {}, view_options), pipeline, nil, options)
627632
end
628633

@@ -966,12 +971,17 @@ def delete_many(filter = nil, options = {})
966971
# @option options [ Integer ] :max_time_ms The maximum amount of time to allow the command
967972
# to run in milliseconds.
968973
# @option options [ Session ] :session The session to use.
974+
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
975+
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
976+
# iteration).
977+
# @option options [ Integer ] :timeout_ms The maximum amount of time to
978+
# allow the query to run, in milliseconds.
969979
#
970980
# @return [ Array<Cursor> ] An array of cursors.
971981
#
972982
# @since 2.1
973983
def parallel_scan(cursor_count, options = {})
974-
find({}, options).send(:parallel_scan, cursor_count, options)
984+
find({}, options).parallel_scan(cursor_count, options)
975985
end
976986

977987
# Replaces a single document in the collection with the new document.
@@ -1205,14 +1215,14 @@ def system_collection?
12051215
#
12061216
# @api private
12071217
def timeout_ms
1208-
options[:timeout_ms] || database.timeout_ms
1218+
@timeout_ms || database.timeout_ms
12091219
end
12101220

12111221
# @return [ Hash ] timeout_ms value set on the operation level (if any),
12121222
# and/or timeout_ms that is set on collection/database/client level (if any).
12131223
#
12141224
# @api private
1215-
def operation_timeouts(opts)
1225+
def operation_timeouts(opts = {})
12161226
# TODO: We should re-evaluate if we need two timeouts separately.
12171227
{}.tap do |result|
12181228
if opts[:timeout_ms].nil?

lib/mongo/collection/view.rb

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ class View
6666
:nro_write_with_retry,
6767
:read_with_retry,
6868
:read_with_retry_cursor,
69-
:timeout_ms,
7069
:write_with_retry,
7170
:write_concern_with_session
7271

@@ -75,6 +74,12 @@ class View
7574

7675
alias :selector :filter
7776

77+
# @return [ Integer | nil | The timeout_ms value that was passed as an
78+
# option to the view.
79+
#
80+
# @api private
81+
attr_reader :operation_timeout_ms
82+
7883
# Compare two +View+ objects.
7984
#
8085
# @example Compare the view with another object.
@@ -152,17 +157,24 @@ def hash
152157
# document more than once. Deprecated as of MongoDB server version 4.0.
153158
# @option options [ Hash ] :sort The key and direction pairs used to sort
154159
# the results.
160+
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
161+
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
162+
# iteration).
155163
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
156164
# Must a positive integer. The default value is unset which means infinite.
157165
#
158166
# @since 2.0.0
159167
def initialize(collection, filter = {}, options = {})
160168
validate_doc!(filter)
161-
@collection = collection
162169

163170
filter = BSON::Document.new(filter)
164171
options = BSON::Document.new(options)
165172

173+
@collection = collection
174+
@operation_timeout_ms = options.delete(:timeout_ms)
175+
176+
validate_timeout_mode!(options)
177+
166178
# This is when users pass $query in filter and other modifiers
167179
# alongside?
168180
query = filter.delete(:$query)
@@ -174,6 +186,14 @@ def initialize(collection, filter = {}, options = {})
174186
@options = Operation::Find::Builder::Modifiers.map_driver_options(modifiers).merge!(options).freeze
175187
end
176188

189+
# The timeout_ms value to use for this operation; either specified as an
190+
# option to the view, or inherited from the collection.
191+
#
192+
# @return [ Integer | nil ] the timeout_ms for this operation
193+
def timeout_ms
194+
operation_timeout_ms || collection.timeout_ms
195+
end
196+
177197
# Get a human-readable string representation of +View+.
178198
#
179199
# @example Get the inspection.
@@ -199,6 +219,20 @@ def write_concern
199219
WriteConcern.get(options[:write_concern] || options[:write] || collection.write_concern)
200220
end
201221

222+
# @return [ Hash ] timeout_ms value set on the operation level (if any),
223+
# and/or timeout_ms that is set on collection/database/client level (if any).
224+
#
225+
# @api private
226+
def operation_timeouts(opts = {})
227+
{}.tap do |result|
228+
if opts[:timeout_ms] || operation_timeout_ms
229+
result[:operation_timeout_ms] = opts[:timeout_ms] || operation_timeout_ms
230+
else
231+
result[:inherited_timeout_ms] = collection.timeout_ms
232+
end
233+
end
234+
end
235+
202236
private
203237

204238
def initialize_copy(other)
@@ -208,27 +242,14 @@ def initialize_copy(other)
208242
end
209243

210244
def new(options)
245+
options = options.merge(timeout_ms: operation_timeout_ms) if operation_timeout_ms
211246
View.new(collection, filter, options)
212247
end
213248

214249
def view; self; end
215250

216251
def with_session(opts = {}, &block)
217-
client.send(:with_session, @options.merge(opts), &block)
218-
end
219-
220-
# @return [ Hash ] timeout_ms value set on the operation level (if any),
221-
# and/or timeout_ms that is set on collection/database/client level (if any).
222-
#
223-
# @api private
224-
def operation_timeouts(opts)
225-
{}.tap do |result|
226-
if opts[:timeout_ms].nil?
227-
result[:inherited_timeout_ms] = collection.timeout_ms
228-
else
229-
result[:operation_timeout_ms] = opts[:timeout_ms]
230-
end
231-
end
252+
client.with_session(@options.merge(opts), &block)
232253
end
233254
end
234255
end

lib/mongo/collection/view/aggregation.rb

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class Aggregation
3737
attr_reader :pipeline
3838

3939
# Delegate necessary operations to the view.
40-
def_delegators :view, :collection, :read, :cluster
40+
def_delegators :view, :collection, :read, :cluster, :cursor_type
4141

4242
# Delegate necessary operations to the collection.
4343
def_delegators :collection, :database, :client
@@ -87,22 +87,34 @@ def allow_disk_use(value = nil)
8787
# See the server documentation for details.
8888
# @option options [ Integer ] :max_time_ms The maximum amount of time in
8989
# milliseconds to allow the aggregation to run.
90-
# @option options [ true, false ] :use_cursor Indicates whether the command
91-
# will request that the server provide results using a cursor. Note that
92-
# as of server version 3.6, aggregations always provide results using a
93-
# cursor and this option is therefore not valid.
9490
# @option options [ Session ] :session The session to use.
91+
# @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
92+
# :timeout_ms (whether it applies to the lifetime of the cursor, or per
93+
# iteration).
9594
# @option options [ Integer ] :timeout_ms The per-operation timeout in milliseconds.
9695
# Must a positive integer. The default value is unset which means infinite.
9796
#
9897
# @since 2.0.0
9998
def initialize(view, pipeline, options = {})
10099
@view = view
101100
@pipeline = pipeline.dup
101+
102+
@timeout_ms = options.delete(:timeout_ms)
103+
@options = BSON::Document.new(options).freeze
104+
105+
validate_timeout_mode!(options)
106+
102107
unless Mongo.broken_view_aggregate || view.filter.empty?
103108
@pipeline.unshift(:$match => view.filter)
104109
end
105-
@options = BSON::Document.new(options).freeze
110+
end
111+
112+
# @return [ Integer | nil ] the timeout_ms value that was passed as
113+
# an option to this object, or which was inherited from the view.
114+
#
115+
# @api private
116+
def timeout_ms
117+
@timeout_ms || view.timeout_ms
106118
end
107119

108120
# Get the explain plan for the aggregation.
@@ -182,15 +194,10 @@ def effective_read_preference(connection)
182194

183195
end
184196

185-
def send_initial_query(server, session)
186-
context = Operation::Context.new(
187-
client: client,
188-
session: session,
189-
operation_timeouts: operation_timeouts(options)
190-
)
197+
def send_initial_query(server, context)
191198
server.with_connection do |connection|
192199
initial_query_op(
193-
session,
200+
context.session,
194201
effective_read_preference(connection)
195202
).execute_with_connection(
196203
connection,
@@ -214,16 +221,16 @@ def cache_options
214221
}
215222
end
216223

217-
# @return [ Hash ] timeout_ms value set on the operation level (if any),
218-
# and/or timeout_ms that is set on collection/database/client level (if any).
219-
#
220-
# @api private
221-
def operation_timeouts(opts)
224+
# @return [ Hash ] timeout_ms value set on the operation level (if any),
225+
# and/or timeout_ms that is set on collection/database/client level (if any).
226+
#
227+
# @api private
228+
def operation_timeouts(opts = {})
222229
{}.tap do |result|
223-
if opts[:timeout_ms].nil?
224-
result[:inherited_timeout_ms] = collection.timeout_ms
230+
if opts[:timeout_ms] || @timeout_ms
231+
result[:operation_timeout_ms] = opts.delete(:timeout_ms) || @timeout_ms
225232
else
226-
result[:operation_timeout_ms] = opts.delete(:timeout_ms)
233+
result[:inherited_timeout_ms] = view.timeout_ms
227234
end
228235
end
229236
end

lib/mongo/collection/view/builder/aggregation.rb

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,17 +113,11 @@ def aggregation_command
113113
command[:readConcern] = Options::Mapper.transform_values_to_strings(
114114
read_concern)
115115
end
116-
command[:cursor] = cursor if cursor
116+
command[:cursor] = batch_size_doc
117117
command.merge!(Options::Mapper.transform_documents(options, MAPPINGS))
118118
command
119119
end
120120

121-
def cursor
122-
if options[:use_cursor] == true || options[:use_cursor].nil?
123-
batch_size_doc
124-
end
125-
end
126-
127121
def batch_size_doc
128122
value = options[:batch_size] || view.batch_size
129123
if value == 0 && write?

lib/mongo/collection/view/change_stream.rb

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ class ChangeStream < Aggregation
6060
# @since 2.5.0
6161
attr_reader :options
6262

63+
# @return [ Cursor ] the underlying cursor for this operation
64+
# @api private
65+
attr_reader :cursor
66+
6367
# Initialize the change stream for the provided collection view, pipeline
6468
# and options.
6569
#
@@ -231,13 +235,16 @@ def try_next
231235
# This method ignores any errors that occur when closing the
232236
# server-side cursor.
233237
#
238+
# @params [ Hash ] opts Options to be passed to the cursor close
239+
# command.
240+
#
234241
# @return [ nil ] Always nil.
235242
#
236243
# @since 2.5.0
237-
def close
244+
def close(opts = {})
238245
unless closed?
239246
begin
240-
@cursor.close
247+
@cursor.close(opts)
241248
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
242249
# ignore
243250
end
@@ -303,14 +310,16 @@ def create_cursor!
303310
# (rolling upgrades)
304311
@start_at_operation_time_supported = nil
305312

306-
session = client.send(:get_session, @options)
313+
session = client.get_session(@options)
314+
context = Operation::Context.new(client: client, session: session, operation_timeouts: operation_timeouts)
315+
307316
start_at_operation_time = nil
308317
start_at_operation_time_supported = nil
309-
@cursor = read_with_retry_cursor(session, server_selector, view) do |server|
318+
@cursor = read_with_retry_cursor(session, server_selector, view, context: context) do |server|
310319
server.with_connection do |connection|
311320
start_at_operation_time_supported = connection.description.server_version_gte?('4.0')
312321

313-
result = send_initial_query(connection, session)
322+
result = send_initial_query(connection, context)
314323
if doc = result.replies.first && result.replies.first.documents.first
315324
start_at_operation_time = doc['operationTime']
316325
else
@@ -390,11 +399,11 @@ def change_doc
390399
end
391400
end
392401

393-
def send_initial_query(connection, session)
394-
initial_query_op(session, view.read_preference)
402+
def send_initial_query(connection, context)
403+
initial_query_op(context.session, view.read_preference)
395404
.execute_with_connection(
396405
connection,
397-
context: Operation::Context.new(client: client, session: session),
406+
context: context,
398407
)
399408
end
400409

0 commit comments

Comments
 (0)