-
Notifications
You must be signed in to change notification settings - Fork 74
Closed
Description
// Reproduction snippet: queues 50000 URIs through a WriteBatcher configured with a
// HostAvailabilityListener (minHosts = 2), then compares the result of query1 against 50000.
// dmManager, stringHandle and dbClient are declared outside this snippet.
try {
// XQuery evaluated through dbClient after the flush; counts everything returned by fn:doc().
final String query1 = "fn:count(fn:doc())";
// Running totals updated from the batch listeners below.
final AtomicInteger successCount = new AtomicInteger(0);
// Set to true by the failure listener; never read back within this snippet.
final MutableBoolean failState = new MutableBoolean(false);
final AtomicInteger failCount = new AtomicInteger(0);
// Batcher under test: batch size 20, thread count 20.
WriteBatcher ihb2 = dmManager.newWriteBatcher();
ihb2.withBatchSize(20);
ihb2.withThreadCount(20);
// Install a HostAvailabilityListener with a 15 second suspend time for unavailable hosts
// and a minimum host count of 2.
ihb2.setBatchFailureListeners(new HostAvailabilityListener(dmManager)
.withSuspendTimeForHostUnavailable(Duration.ofSeconds(15)).withMinHosts(2));
// Success listener: add the batch's item count to successCount and print the host,
// the batch number and the job's writes so far.
ihb2.onBatchSuccess(batch -> {
successCount.addAndGet(batch.getItems().length);
System.out.println("Success Host: " + batch.getClient().getHost());
System.out.println("Success batch number: " + batch.getJobBatchNumber());
System.out.println("Success Job writes so far: " + batch.getJobWritesSoFar());
// Failure listener: print the batch number and stack trace, set failState and add the
// batch's item count to failCount.
// NOTE(review): presumably registered in addition to the HostAvailabilityListener set
// above rather than replacing it - confirm against the WriteBatcher API.
}).onBatchFailure((batch, throwable) -> {
System.out.println("Failed batch number: " + batch.getJobBatchNumber());
// Disabled: manual retry of the failed batch via ihb2.retry(batch).
/*
* try{ System.out.println("Retrying batch: "+
* batch.getJobBatchNumber()); ihb2.retry(batch); } catch(Exception e){
* System.out.println("Retry of batch "+ batch.getJobBatchNumber()+
* " failed"); e.printStackTrace(); }
*/
throwable.printStackTrace();
failState.setTrue();
failCount.addAndGet(batch.getItems().length);
});
// Start the job, then queue 50000 entries with URIs /local/ABC-0 .. /local/ABC-49999,
// all using the same stringHandle as content.
dmManager.startJob(ihb2);
for (int j = 0; j < 50000; j++) {
String uri = "/local/ABC-" + j;
ihb2.add(uri, stringHandle);
}
// NOTE(review): presumably blocks until all queued writes have been attempted - confirm.
ihb2.flushAndWait();
System.out.println("Fail : " + failCount.intValue());
System.out.println("Success : " + successCount.intValue());
// query1 is evaluated twice: once to print the count and once for the assertion.
System.out.println("Count : " + dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 50000);
} catch (Exception e) {
// Exceptions caught here are only printed, not rethrown.
e.printStackTrace();
}

This is run on a 3-node cluster. Bring 2 nodes down simultaneously: since minHosts is set to 2, the batcher fails, but the process keeps retrying indefinitely in processFailure. We need to add a check for whether the batcher has been stopped and, if it has, return false from processException in HostAvailabilityListener.