Skip to content

Commit 65f02fc

Browse files
committed
feat(loadBalancer)!: add support for load balancers
1 parent 6cd982f commit 65f02fc

File tree

210 files changed

+7645
-240
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

210 files changed

+7645
-240
lines changed

src/cmap/connect.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
MIN_SUPPORTED_WIRE_VERSION,
1212
MIN_SUPPORTED_SERVER_VERSION
1313
} from './wire_protocol/constants';
14-
import type { Document } from '../bson';
14+
import type { Document, ObjectId } from '../bson';
1515

1616
import type { Socket, SocketConnectOpts } from 'net';
1717
import type { TLSSocket, ConnectionOptions as TLSConnectionOpts } from 'tls';
@@ -149,6 +149,7 @@ export interface HandshakeDocument extends Document {
149149
client: ClientMetadata;
150150
compression: string[];
151151
saslSupportedMechs?: string;
152+
loadBalanced?: boolean;
152153
}
153154

154155
function prepareHandshakeDocument(authContext: AuthContext, callback: Callback<HandshakeDocument>) {
@@ -158,7 +159,8 @@ function prepareHandshakeDocument(authContext: AuthContext, callback: Callback<H
158159
const handshakeDoc: HandshakeDocument = {
159160
ismaster: true,
160161
client: options.metadata || makeClientMetadata(options),
161-
compression: compressors
162+
compression: compressors,
163+
loadBalanced: options.loadBalanced
162164
};
163165

164166
const credentials = authContext.credentials;
@@ -295,7 +297,7 @@ function makeConnection(options: ConnectionOptions, _callback: CallbackWithType<
295297
socket.setNoDelay(noDelay);
296298

297299
const connectEvent = useTLS ? 'secureConnect' : 'connect';
298-
let cancellationHandler: (err: Error) => void;
300+
let cancellationHandler: (err: Error, serviceId?: ObjectId) => void;
299301
function errorHandler(eventName: ErrorHandlerEventName) {
300302
return (err: Error) => {
301303
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));

src/cmap/connection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import {
3434
OpQueryOptions,
3535
Msg
3636
} from './commands';
37-
import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions } from '../bson';
37+
import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions, ObjectId } from '../bson';
3838
import type { AutoEncrypter } from '../deps';
3939
import type { MongoCredentials } from './auth/mongo_credentials';
4040
import type { Stream } from './connect';
@@ -220,6 +220,10 @@ export class Connection extends EventEmitter {
220220
this[kIsMaster] = response;
221221
}
222222

223+
get serviceIdd(): ObjectId {
224+
return this.ismaster.serviceId;
225+
}
226+
223227
get generation(): number {
224228
return this[kGeneration] || 0;
225229
}
@@ -632,8 +636,10 @@ export class CryptoConnection extends Connection {
632636
}
633637
}
634638

635-
function hasSessionSupport(conn: Connection) {
636-
return conn.description.logicalSessionTimeoutMinutes != null;
639+
/** @public */
640+
export function hasSessionSupport(conn: Connection): boolean {
641+
const description = conn.description;
642+
return description.logicalSessionTimeoutMinutes != null || !!description.loadBalanced;
637643
}
638644

639645
function supportsOpMsg(conn: Connection) {

src/cmap/connection_pool.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
ConnectionCheckedInEvent,
1919
ConnectionPoolClearedEvent
2020
} from './connection_pool_events';
21+
import type { ObjectId } from '../bson';
2122

2223
const kLogger = Symbol('logger');
2324
const kConnections = Symbol('connections');
@@ -270,6 +271,17 @@ export class ConnectionPool extends EventEmitter {
270271
this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this));
271272
}
272273

274+
/**
275+
* Close all connections in the pool for the provided serviceId.
276+
*/
277+
closeConnections(serviceId: ObjectId): void {
278+
// cancel in flight matching connections.
279+
this[kCancellationToken].emit('cancel', serviceId);
280+
281+
// destroy each matching connection
282+
// this.destroyConnections('serverError', {}, serviceId, callback);
283+
}
284+
273285
/** Close the pool */
274286
close(callback: Callback<void>): void;
275287
close(options: CloseOptions, callback: Callback<void>): void;
@@ -314,19 +326,25 @@ export class ConnectionPool extends EventEmitter {
314326

315327
// mark the pool as closed immediately
316328
this.closed = true;
317-
318329
eachAsync<Connection>(
319330
this[kConnections].toArray(),
320331
(conn, cb) => {
332+
// Destroy the connection in the case of closing the entire pool
333+
// or if the connection matches the server id.
334+
//if (!serviceId || serviceId === conn.serviceId) {
321335
this.emit(
322336
ConnectionPool.CONNECTION_CLOSED,
323337
new ConnectionClosedEvent(this, conn, 'poolClosed')
324338
);
325339
conn.destroy(options, cb);
340+
//}
326341
},
327342
err => {
343+
// Dont close the entire pool for error on single server.
344+
//if (!serviceId) {
328345
this[kConnections].clear();
329346
this.emit(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
347+
//}
330348
callback(err);
331349
}
332350
);
@@ -382,6 +400,7 @@ function connectionIsIdle(pool: ConnectionPool, connection: Connection) {
382400
return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
383401
}
384402

403+
// TODO: Durran: In LB mode set the server id on the connection.
385404
function createConnection(pool: ConnectionPool, callback?: Callback<Connection>) {
386405
const connectOptions: ConnectionOptions = {
387406
...pool.options,

src/cmap/stream_description.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ const RESPONSE_FIELDS = [
1515
/** @public */
1616
export interface StreamDescriptionOptions {
1717
compressors?: CompressorName[];
18+
logicalSessionTimeoutMinutes?: number;
19+
loadBalanced?: boolean;
1820
}
1921

2022
/** @public */
@@ -29,6 +31,7 @@ export class StreamDescription {
2931
compressors: CompressorName[];
3032
compressor?: CompressorName;
3133
logicalSessionTimeoutMinutes?: number;
34+
loadBalanced?: boolean;
3235

3336
__nodejs_mock_server__?: boolean;
3437

@@ -42,6 +45,8 @@ export class StreamDescription {
4245
this.maxBsonObjectSize = 16777216;
4346
this.maxMessageSizeBytes = 48000000;
4447
this.maxWriteBatchSize = 100000;
48+
this.logicalSessionTimeoutMinutes = options ? options.logicalSessionTimeoutMinutes : undefined;
49+
this.loadBalanced = options ? !!options.loadBalanced : false;
4550
this.compressors =
4651
options && options.compressors && Array.isArray(options.compressors)
4752
? options.compressors

src/connection_string.ts

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ import { Logger, LoggerLevelId } from './logger';
3131
import { PromiseProvider } from './promise_provider';
3232
import { Encrypter } from './encrypter';
3333

34+
const VALID_TXT_RECORDS = ['authSource', 'replicaSet', 'loadBalanced'];
35+
36+
const LB_SINGLE_HOST = 'loadBalanced option only supported with a single host in the URI';
37+
const LB_REPLICA_SET = 'loadBalanced option not supported with a replicaSet option';
38+
const LB_DIRECT_CONNECTION = 'loadBalanced option not supported when directConnection is provided';
39+
3440
/**
3541
* Determines whether a provided address matches the provided parent domain in order
3642
* to avoid certain attack vectors.
@@ -83,6 +89,11 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostA
8389
HostAddress.fromString(`${r.name}:${r.port ?? 27017}`)
8490
);
8591

92+
const lbError = checkLoadBalancerOptions(options, hostAddresses);
93+
if (lbError) {
94+
return callback(lbError);
95+
}
96+
8697
// Resolve TXT record and add options from there if they exist.
8798
dns.resolveTxt(lookupAddress, (err, record) => {
8899
if (err) {
@@ -96,14 +107,15 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostA
96107

97108
const txtRecordOptions = new URLSearchParams(record[0].join(''));
98109
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
99-
if (txtRecordOptionKeys.some(key => key !== 'authSource' && key !== 'replicaSet')) {
110+
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
100111
return callback(
101-
new MongoParseError('Text record must only set `authSource` or `replicaSet`')
112+
new MongoParseError(`Text record must only set one of: ${VALID_TXT_RECORDS.join(', ')}`)
102113
);
103114
}
104115

105116
const source = txtRecordOptions.get('authSource') ?? undefined;
106117
const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined;
118+
const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined;
107119

108120
if (source === '' || replicaSet === '') {
109121
return callback(new MongoParseError('Cannot have empty URI params in DNS TXT Record'));
@@ -116,13 +128,49 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostA
116128
if (!options.userSpecifiedReplicaSet && replicaSet) {
117129
options.replicaSet = replicaSet;
118130
}
131+
132+
if (loadBalanced === 'true') {
133+
options.loadBalanced = true;
134+
}
135+
136+
const lbError = checkLoadBalancerOptions(options, hostAddresses);
137+
if (lbError) {
138+
return callback(lbError);
139+
}
140+
141+
if (options.loadBalanced) {
142+
if (options.replicaSet) {
143+
return callback(new MongoParseError(LB_REPLICA_SET));
144+
}
145+
146+
if (hostAddresses.length > 1) {
147+
return callback(new MongoParseError(LB_SINGLE_HOST));
148+
}
149+
}
119150
}
120151

121152
callback(undefined, hostAddresses);
122153
});
123154
});
124155
}
125156

157+
function checkLoadBalancerOptions(
158+
options: MongoOptions,
159+
addresses: HostAddress[]
160+
): MongoParseError | null {
161+
if (options.loadBalanced) {
162+
if (options.replicaSet) {
163+
return new MongoParseError(LB_REPLICA_SET);
164+
}
165+
166+
if (addresses.length > 1) {
167+
return new MongoParseError(LB_SINGLE_HOST);
168+
}
169+
return null;
170+
}
171+
return null;
172+
}
173+
126174
/**
127175
* Checks if TLS options are valid
128176
*
@@ -431,6 +479,8 @@ export function parseOptions(
431479
throw new MongoParseError('directConnection not supported with SRV URI');
432480
}
433481

482+
validateLoadBalancedOptions(hosts, mongoOptions);
483+
434484
// Potential SRV Overrides
435485
mongoOptions.userSpecifiedAuthSource =
436486
objectOptions.has('authSource') || urlOptions.has('authSource');
@@ -446,6 +496,20 @@ export function parseOptions(
446496
return mongoOptions;
447497
}
448498

499+
function validateLoadBalancedOptions(hosts: HostAddress[] | string[], mongoOptions: MongoOptions) {
500+
if (mongoOptions.loadBalanced) {
501+
if (hosts.length > 1) {
502+
throw new MongoParseError(LB_SINGLE_HOST);
503+
}
504+
if (mongoOptions.replicaSet) {
505+
throw new MongoParseError(LB_REPLICA_SET);
506+
}
507+
if (mongoOptions.directConnection) {
508+
throw new MongoParseError(LB_DIRECT_CONNECTION);
509+
}
510+
}
511+
}
512+
449513
function setOption(
450514
mongoOptions: any,
451515
key: string,
@@ -706,6 +770,10 @@ export const OPTIONS = {
706770
default: 120000,
707771
type: 'uint'
708772
},
773+
loadBalanced: {
774+
default: false,
775+
type: 'boolean'
776+
},
709777
localThresholdMS: {
710778
default: 15,
711779
type: 'uint'

src/mongo_client.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
174174
retryWrites?: boolean;
175175
/** Allow a driver to force a Single topology type with a connection string containing one host */
176176
directConnection?: boolean;
177+
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
178+
loadBalanced?: boolean;
177179

178180
/** The write concern w value */
179181
w?: W;
@@ -623,6 +625,7 @@ export interface MongoOptions
623625
| 'heartbeatFrequencyMS'
624626
| 'keepAlive'
625627
| 'keepAliveInitialDelay'
628+
| 'loadBalanced'
626629
| 'localThresholdMS'
627630
| 'logger'
628631
| 'maxIdleTimeMS'

src/sdam/common.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ export const TopologyType = Object.freeze({
1717
ReplicaSetNoPrimary: 'ReplicaSetNoPrimary',
1818
ReplicaSetWithPrimary: 'ReplicaSetWithPrimary',
1919
Sharded: 'Sharded',
20-
Unknown: 'Unknown'
20+
Unknown: 'Unknown',
21+
LoadBalanced: 'LoadBalanced'
2122
} as const);
2223

2324
/** @public */
@@ -36,7 +37,8 @@ export const ServerType = Object.freeze({
3637
RSArbiter: 'RSArbiter',
3738
RSOther: 'RSOther',
3839
RSGhost: 'RSGhost',
39-
Unknown: 'Unknown'
40+
Unknown: 'Unknown',
41+
LoadBalancer: 'LoadBalancer'
4042
} as const);
4143

4244
/** @public */

src/sdam/monitor.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export interface MonitorOptions
5353
connectTimeoutMS: number;
5454
heartbeatFrequencyMS: number;
5555
minHeartbeatFrequencyMS: number;
56+
loadBalanced?: boolean;
5657
}
5758

5859
/** @public */
@@ -61,7 +62,10 @@ export class Monitor extends EventEmitter {
6162
s: MonitorPrivate;
6263
address: string;
6364
options: Readonly<
64-
Pick<MonitorOptions, 'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS'>
65+
Pick<
66+
MonitorOptions,
67+
'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS' | 'loadBalanced'
68+
>
6569
>;
6670
connectOptions: ConnectionOptions;
6771
[kServer]: Server;
@@ -87,7 +91,8 @@ export class Monitor extends EventEmitter {
8791
this.options = Object.freeze({
8892
connectTimeoutMS: options.connectTimeoutMS ?? 10000,
8993
heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
90-
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500
94+
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
95+
loadBalanced: options.loadBalanced ?? false
9196
});
9297

9398
const cancellationToken = this[kCancellationToken];
@@ -211,14 +216,20 @@ function checkServer(monitor: Monitor, callback: Callback<Document>) {
211216
const connection = monitor[kConnection];
212217
if (connection && !connection.closed) {
213218
const connectTimeoutMS = monitor.options.connectTimeoutMS;
219+
const loadBalanced = monitor.options.loadBalanced;
214220
const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
215221
const topologyVersion = monitor[kServer].description.topologyVersion;
216222
const isAwaitable = topologyVersion != null;
217223

218224
const cmd =
219225
isAwaitable && topologyVersion
220-
? { ismaster: true, maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
221-
: { ismaster: true };
226+
? {
227+
ismaster: true,
228+
maxAwaitTimeMS,
229+
topologyVersion: makeTopologyVersion(topologyVersion),
230+
loadBalanced: loadBalanced
231+
}
232+
: { ismaster: true, loadBalanced: loadBalanced };
222233

223234
const options = isAwaitable
224235
? {

0 commit comments

Comments
 (0)