4040import java .util .Arrays ;
4141import java .util .List ;
4242import java .util .concurrent .TimeUnit ;
43+ import java .util .concurrent .TimeoutException ;
4344import java .util .concurrent .locks .Condition ;
4445import java .util .concurrent .locks .Lock ;
4546import java .util .concurrent .locks .ReentrantLock ;
@@ -63,6 +64,7 @@ public class TestCommandListener implements CommandListener {
6364 private final TestListener listener ;
6465 private final Lock lock = new ReentrantLock ();
6566 private final Condition commandCompletedCondition = lock .newCondition ();
67+ private final Condition commandAnyEventCondition = lock .newCondition ();
6668 private final boolean observeSensitiveCommands ;
6769 private boolean ignoreNextSucceededOrFailedEvent ;
6870 private static final CodecRegistry CODEC_REGISTRY_HACK ;
@@ -223,22 +225,12 @@ private <T extends CommandEvent> List<T> getEvents(final Class<T> type,
223225 }
224226 }
225227
226- public List <CommandStartedEvent > waitForStartedEvents (final int numEvents ) {
227- lock .lock ();
228- try {
229- while (!hasCompletedEvents (numEvents )) {
230- try {
231- if (!commandCompletedCondition .await (TIMEOUT , TimeUnit .SECONDS )) {
232- throw new MongoTimeoutException ("Timeout waiting for event" );
233- }
234- } catch (InterruptedException e ) {
235- throw interruptAndCreateMongoInterruptedException ("Interrupted waiting for event" , e );
236- }
237- }
238- return getEvents (CommandStartedEvent .class , numEvents );
239- } finally {
240- lock .unlock ();
241- }
228+ private <T extends CommandEvent > long getEventCount (final Class <T > eventClass , final Predicate <T > matcher ) {
229+ return getEvents ().stream ()
230+ .filter (eventClass ::isInstance )
231+ .map (eventClass ::cast )
232+ .filter (matcher )
233+ .count ();
242234 }
243235
244236 public void waitForFirstCommandCompletion () {
@@ -287,6 +279,7 @@ else if (!observeSensitiveCommands) {
287279 addEvent (new CommandStartedEvent (event .getRequestContext (), event .getOperationId (), event .getRequestId (),
288280 event .getConnectionDescription (), event .getDatabaseName (), event .getCommandName (),
289281 event .getCommand () == null ? null : getWritableClone (event .getCommand ())));
282+ commandAnyEventCondition .signal ();
290283 } finally {
291284 lock .unlock ();
292285 }
@@ -312,6 +305,7 @@ else if (!observeSensitiveCommands) {
312305 event .getResponse () == null ? null : event .getResponse ().clone (),
313306 event .getElapsedTime (TimeUnit .NANOSECONDS )));
314307 commandCompletedCondition .signal ();
308+ commandAnyEventCondition .signal ();
315309 } finally {
316310 lock .unlock ();
317311 }
@@ -334,6 +328,7 @@ else if (!observeSensitiveCommands) {
334328 try {
335329 addEvent (event );
336330 commandCompletedCondition .signal ();
331+ commandAnyEventCondition .signal ();
337332 } finally {
338333 lock .unlock ();
339334 }
@@ -428,4 +423,22 @@ private void assertEquivalence(final CommandStartedEvent actual, final CommandSt
428423 assertEquals (expected .getDatabaseName (), actual .getDatabaseName ());
429424 assertEquals (expected .getCommand (), actual .getCommand ());
430425 }
426+
427+ public <T extends CommandEvent > void waitForEvents (final Class <T > eventClass , final Predicate <T > matcher , final int count )
428+ throws TimeoutException {
429+ lock .lock ();
430+ try {
431+ while (getEventCount (eventClass , matcher ) < count ) {
432+ try {
433+ if (!commandAnyEventCondition .await (TIMEOUT , TimeUnit .SECONDS )) {
434+ throw new MongoTimeoutException ("Timeout waiting for command event" );
435+ }
436+ } catch (InterruptedException e ) {
437+ throw interruptAndCreateMongoInterruptedException ("Interrupted waiting for event" , e );
438+ }
439+ }
440+ } finally {
441+ lock .unlock ();
442+ }
443+ }
431444}
0 commit comments