Skip to content

Commit 1fc6b7d

Browse files
committed
Use maxWaitTimeout consistently.
Add JavaDoc to StartTime. Shutdown executor properly.
1 parent 440891e commit 1fc6b7d

File tree

7 files changed

+54
-47
lines changed

7 files changed

+54
-47
lines changed

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ public TimeoutContext withComputedServerSelectionTimeoutContext() {
446446
return this;
447447
}
448448

449-
public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
449+
public Timeout startMaxWaitTimeout(final StartTime checkoutStart) {
450450
if (hasTimeoutMS()) {
451451
return assertNotNull(timeout);
452452
}

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,12 @@
112112
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_CONNECTING;
113113
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_IDLE_TIME_MS;
114114
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_POOL_SIZE;
115+
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MAX_WAIT_TIMEOUT_MS;
115116
import static com.mongodb.internal.logging.LogMessage.Entry.Name.MIN_POOL_SIZE;
116117
import static com.mongodb.internal.logging.LogMessage.Entry.Name.REASON_DESCRIPTION;
117118
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST;
118119
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT;
119120
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVICE_ID;
120-
import static com.mongodb.internal.logging.LogMessage.Entry.Name.WAIT_QUEUE_TIMEOUT_MS;
121121
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
122122
import static java.lang.String.format;
123123
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -192,12 +192,12 @@ public int getGeneration(@NonNull final ObjectId serviceId) {
192192
@Override
193193
public InternalConnection get(final OperationContext operationContext) {
194194
StartTime checkoutStart = connectionCheckoutStarted(operationContext);
195-
Timeout waitQueueTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart);
195+
Timeout maxWaitTimeout = operationContext.getTimeoutContext().startMaxWaitTimeout(checkoutStart);
196196
try {
197197
stateAndGeneration.throwIfClosedOrPaused();
198-
PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart, operationContext);
198+
PooledConnection connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext.getTimeoutContext());
199199
if (!connection.opened()) {
200-
connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, waitQueueTimeout, checkoutStart);
200+
connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, maxWaitTimeout, checkoutStart);
201201
}
202202
connection.checkedOutForOperation(operationContext);
203203
connectionCheckedOut(operationContext, connection, checkoutStart);
@@ -210,7 +210,7 @@ public InternalConnection get(final OperationContext operationContext) {
210210
@Override
211211
public void getAsync(final OperationContext operationContext, final SingleResultCallback<InternalConnection> callback) {
212212
StartTime checkoutStart = connectionCheckoutStarted(operationContext);
213-
Timeout maxWaitTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart);
213+
Timeout maxWaitTimeout = operationContext.getTimeoutContext().startMaxWaitTimeout(checkoutStart);
214214
SingleResultCallback<PooledConnection> eventSendingCallback = (connection, failure) -> {
215215
SingleResultCallback<InternalConnection> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
216216
if (failure == null) {
@@ -227,13 +227,13 @@ public void getAsync(final OperationContext operationContext, final SingleResult
227227
eventSendingCallback.onResult(null, e);
228228
return;
229229
}
230-
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, operationContext, t -> {
230+
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, operationContext.getTimeoutContext(), t -> {
231231
if (t != null) {
232232
eventSendingCallback.onResult(null, t);
233233
} else {
234234
PooledConnection connection;
235235
try {
236-
connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext);
236+
connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext.getTimeoutContext());
237237
} catch (Exception e) {
238238
eventSendingCallback.onResult(null, e);
239239
return;
@@ -332,24 +332,24 @@ public int getGeneration() {
332332
return stateAndGeneration.generation();
333333
}
334334

335-
private PooledConnection getPooledConnection(final Timeout waitQueueTimeout,
335+
private PooledConnection getPooledConnection(final Timeout maxWaitTimeout,
336336
final StartTime startTime,
337-
final OperationContext operationContext) throws MongoTimeoutException {
337+
final TimeoutContext timeoutContext) throws MongoTimeoutException {
338338
try {
339-
UsageTrackingInternalConnection internalConnection = waitQueueTimeout.call(NANOSECONDS,
339+
UsageTrackingInternalConnection internalConnection = maxWaitTimeout.call(NANOSECONDS,
340340
() -> pool.get(-1L, NANOSECONDS),
341341
(ns) -> pool.get(ns, NANOSECONDS),
342342
() -> pool.get(0L, NANOSECONDS));
343343
while (shouldPrune(internalConnection)) {
344344
pool.release(internalConnection, true);
345-
internalConnection = waitQueueTimeout.call(NANOSECONDS,
345+
internalConnection = maxWaitTimeout.call(NANOSECONDS,
346346
() -> pool.get(-1L, NANOSECONDS),
347347
(ns) -> pool.get(ns, NANOSECONDS),
348348
() -> pool.get(0L, NANOSECONDS));
349349
}
350350
return new PooledConnection(internalConnection);
351351
} catch (MongoTimeoutException e) {
352-
throw createTimeoutException(startTime, operationContext.getTimeoutContext());
352+
throw createTimeoutException(startTime, timeoutContext);
353353
}
354354
}
355355

@@ -367,10 +367,11 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime,
367367
long elapsedMs = startTime.elapsed().toMillis();
368368
int numPinnedToCursor = pinnedStatsManager.getNumPinnedToCursor();
369369
int numPinnedToTransaction = pinnedStatsManager.getNumPinnedToTransaction();
370+
String errorMessage;
371+
370372
if (numPinnedToCursor == 0 && numPinnedToTransaction == 0) {
371-
String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s.",
373+
errorMessage = format("Timed out after %d ms while waiting for a connection to server %s.",
372374
elapsedMs, serverId.getAddress());
373-
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage);
374375
} else {
375376
int maxSize = pool.getMaxSize();
376377
int numInUse = pool.getInUseCount();
@@ -399,14 +400,15 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime,
399400
int numOtherInUse = numInUse - numPinnedToCursor - numPinnedToTransaction;
400401
assertTrue(numOtherInUse >= 0);
401402
assertTrue(numPinnedToCursor + numPinnedToTransaction + numOtherInUse <= maxSize);
402-
String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s. Details: "
403+
errorMessage = format("Timed out after %d ms while waiting for a connection to server %s. Details: "
403404
+ "maxPoolSize: %s, connections in use by cursors: %d, connections in use by transactions: %d, "
404405
+ "connections in use by other operations: %d",
405406
elapsedMs, serverId.getAddress(),
406407
sizeToString(maxSize), numPinnedToCursor, numPinnedToTransaction,
407408
numOtherInUse);
408-
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage);
409409
}
410+
411+
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage);
410412
}
411413

412414
@VisibleForTesting(otherwise = PRIVATE)
@@ -503,7 +505,7 @@ private void connectionPoolCreated(final ConnectionPoolListener connectionPoolLi
503505
entries.add(new LogMessage.Entry(MIN_POOL_SIZE, settings.getMinSize()));
504506
entries.add(new LogMessage.Entry(MAX_POOL_SIZE, settings.getMaxSize()));
505507
entries.add(new LogMessage.Entry(MAX_CONNECTING, settings.getMaxConnecting()));
506-
entries.add(new LogMessage.Entry(WAIT_QUEUE_TIMEOUT_MS, settings.getMaxWaitTime(MILLISECONDS)));
508+
entries.add(new LogMessage.Entry(MAX_WAIT_TIMEOUT_MS, settings.getMaxWaitTime(MILLISECONDS)));
507509

508510
logMessage("Connection pool created", clusterId, message, entries);
509511
}
@@ -909,11 +911,11 @@ private final class OpenConcurrencyLimiter {
909911
}
910912

911913
PooledConnection openOrGetAvailable(final OperationContext operationContext, final PooledConnection connection,
912-
final Timeout waitQueueTimeout, final StartTime startTime)
914+
final Timeout maxWaitTimeout, final StartTime startTime)
913915
throws MongoTimeoutException {
914916
PooledConnection result = openWithConcurrencyLimit(
915917
operationContext, connection, OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE,
916-
waitQueueTimeout, startTime);
918+
maxWaitTimeout, startTime);
917919
return assertNotNull(result);
918920
}
919921

@@ -956,7 +958,7 @@ void openImmediatelyAndTryHandOverOrRelease(final OperationContext operationCont
956958
* </ol>
957959
*
958960
* @param operationContext the operation context
959-
* @param waitQueueTimeout Applies only to the first phase.
961+
* @param maxWaitTimeout Applies only to the first phase.
960962
* @return An {@linkplain PooledConnection#opened() opened} connection which is either the specified
961963
* {@code connection}, or potentially a different one if {@code mode} is
962964
* {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}, or {@code null} if {@code mode} is
@@ -966,12 +968,13 @@ void openImmediatelyAndTryHandOverOrRelease(final OperationContext operationCont
966968
@Nullable
967969
private PooledConnection openWithConcurrencyLimit(final OperationContext operationContext,
968970
final PooledConnection connection, final OpenWithConcurrencyLimitMode mode,
969-
final Timeout waitQueueTimeout, final StartTime startTime)
971+
final Timeout maxWaitTimeout, final StartTime startTime)
970972
throws MongoTimeoutException {
971973
PooledConnection availableConnection;
972974
try {//phase one
973975
availableConnection = acquirePermitOrGetAvailableOpenedConnection(
974-
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime, operationContext);
976+
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, maxWaitTimeout, startTime,
977+
operationContext.getTimeoutContext());
975978
} catch (Exception e) {
976979
connection.closeSilently();
977980
throw e;
@@ -1013,7 +1016,8 @@ void openWithConcurrencyLimitAsync(
10131016
final SingleResultCallback<PooledConnection> callback) {
10141017
PooledConnection availableConnection;
10151018
try {//phase one
1016-
availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime, operationContext);
1019+
availableConnection =
1020+
acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime, operationContext.getTimeoutContext());
10171021
} catch (Exception e) {
10181022
connection.closeSilently();
10191023
callback.onResult(null, e);
@@ -1044,8 +1048,8 @@ void openWithConcurrencyLimitAsync(
10441048
*/
10451049
@Nullable
10461050
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable,
1047-
final Timeout waitQueueTimeout, final StartTime startTime,
1048-
final OperationContext operationContext)
1051+
final Timeout maxWaitTimeout, final StartTime startTime,
1052+
final TimeoutContext timeoutContext)
10491053
throws MongoTimeoutException, MongoInterruptedException {
10501054
PooledConnection availableConnection = null;
10511055
boolean expressedDesireToGetAvailableConnection = false;
@@ -1073,10 +1077,10 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
10731077
& !stateAndGeneration.throwIfClosedOrPaused()
10741078
& (availableConnection = tryGetAvailable ? tryGetAvailableConnection() : null) == null) {
10751079

1076-
Timeout.onExistsAndExpired(waitQueueTimeout, () -> {
1077-
throw createTimeoutException(startTime, operationContext.getTimeoutContext());
1080+
Timeout.onExistsAndExpired(maxWaitTimeout, () -> {
1081+
throw createTimeoutException(startTime, timeoutContext);
10781082
});
1079-
waitQueueTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition,
1083+
maxWaitTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition,
10801084
() -> "acquiring permit or getting available opened connection");
10811085
}
10821086
if (availableConnection == null) {
@@ -1396,15 +1400,15 @@ final class Task {
13961400
private final Timeout timeout;
13971401
private final StartTime startTime;
13981402
private final Consumer<RuntimeException> action;
1399-
private final OperationContext operationContext;
1403+
private final TimeoutContext timeoutContext;
14001404
private boolean completed;
14011405

14021406
Task(final Timeout timeout,
14031407
final StartTime startTime,
1404-
final OperationContext operationContext,
1408+
final TimeoutContext timeoutContext,
14051409
final Consumer<RuntimeException> action) {
14061410
this.timeout = timeout;
1407-
this.operationContext = operationContext;
1411+
this.timeoutContext = timeoutContext;
14081412
this.startTime = startTime;
14091413
this.action = action;
14101414
}
@@ -1418,7 +1422,7 @@ void failAsClosed() {
14181422
}
14191423

14201424
void failAsTimedOut() {
1421-
doComplete(() -> createTimeoutException(startTime, operationContext.getTimeoutContext()));
1425+
doComplete(() -> createTimeoutException(startTime, timeoutContext));
14221426
}
14231427

14241428
private void doComplete(final Supplier<RuntimeException> failureSupplier) {

driver-core/src/main/com/mongodb/internal/logging/LogMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public enum Name {
122122
MIN_POOL_SIZE("minPoolSize"),
123123
MAX_POOL_SIZE("maxPoolSize"),
124124
MAX_CONNECTING("maxConnecting"),
125-
WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"),
125+
MAX_WAIT_TIMEOUT_MS("waitQueueTimeoutMS"),
126126
SELECTOR("selector"),
127127
TOPOLOGY_DESCRIPTION("topologyDescription"),
128128
REMAINING_TIME_MS("remainingTimeMS"),

driver-core/src/main/com/mongodb/internal/time/StartTime.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
* A point in time used to track how much time has elapsed. In contrast to a
2323
* Timeout, it is guaranteed to not be in the future, and is never infinite.
2424
*
25+
* Implementations of this interface must be immutable.
26+
*
2527
* @see TimePoint
2628
*/
2729
public interface StartTime {

driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void shouldThrowOnTimeout() throws InterruptedException {
127127

128128
// when
129129
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
130-
new Thread(connectionGetter).start();
130+
cachedExecutor.submit(connectionGetter);
131131

132132
connectionGetter.getLatch().await();
133133

@@ -136,7 +136,7 @@ public void shouldThrowOnTimeout() throws InterruptedException {
136136
}
137137

138138
@Test
139-
public void shouldNotUseMaxAwaitTimeMSOnWhenTimeoutMsIsSet() throws InterruptedException {
139+
public void shouldNotUseMaxAwaitTimeMSWhenTimeoutMsIsSet() throws InterruptedException {
140140
// given
141141
provider = new DefaultConnectionPool(SERVER_ID, connectionFactory,
142142
ConnectionPoolSettings.builder()
@@ -152,7 +152,7 @@ public void shouldNotUseMaxAwaitTimeMSOnWhenTimeoutMsIsSet() throws InterruptedE
152152

153153
// when
154154
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
155-
new Thread(connectionGetter).start();
155+
cachedExecutor.submit(connectionGetter);
156156

157157
sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs.
158158
internalConnection.close();

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ public void setUp() {
522522

523523
@Override
524524
@AfterEach
525-
public void tearDown() {
525+
public void tearDown() throws InterruptedException {
526526
super.tearDown();
527527
SyncMongoClient.disableSleep();
528528
}

0 commit comments

Comments
 (0)