Description
The setup consists of a 3-node cluster. While a query is executing and results are being returned, node 3 is stopped and its forest fails over to node 1. The number of URIs returned is not the same as expected: it is higher than what is present in the database because of duplicates. The logs are attached. The 'serverStartStop' method takes two arguments, a server name and a command (start/stop), and starts or stops the specified server based on the command.
TEST-com.marklogic.client.datamovement.functionaltests.QBFailover.txt
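For reference, a minimal sketch of what the serverStartStop helper could look like, assuming the MarkLogic service on each host is controlled over SSH (the ssh invocation and service name are assumptions; the actual helper in the test suite may use a different mechanism):

private void serverStartStop(String server, String command) throws Exception {
    // Start or stop the MarkLogic service on the given host.
    String action = "start".equalsIgnoreCase(command) ? "start" : "stop";
    ProcessBuilder pb = new ProcessBuilder("ssh", server, "sudo", "service", "MarkLogic", action);
    pb.inheritIO();
    pb.start().waitFor();
}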
@Test
public void testStopOneNode() throws Exception {
    // Ingest 3000 documents so the query job has a known, fixed result set.
    WriteBatcher ihb2 = dmManager.newWriteBatcher();
    ihb2.withBatchSize(27).withThreadCount(10);
    ihb2.onBatchSuccess(batch -> { })
        .onBatchFailure((batch, throwable) -> throwable.printStackTrace());
    dmManager.startJob(ihb2);

    for (int j = 0; j < 3000; j++) {
        String uri = "/local/string-" + j;
        ihb2.add(uri, meta2, stringHandle);
    }
    ihb2.flushAndWait();
    Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 3000);
    AtomicInteger success = new AtomicInteger(0);
    AtomicInteger failure = new AtomicInteger(0);
    AtomicBoolean isRunning = new AtomicBoolean(true);

    QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform"))
        .withBatchSize(4).withThreadCount(3);

    // Register a HostAvailabilityListener so the query job retries on the
    // surviving hosts when a node becomes unavailable.
    List<QueryFailureListener> failureListeners =
        new ArrayList<>(Arrays.asList(batcher.getQueryFailureListeners()));
    failureListeners.add(new HostAvailabilityListener(dmManager)
        .withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
        .withMinHosts(2));
    batcher.setQueryFailureListeners(failureListeners.toArray(new QueryFailureListener[failureListeners.size()]));
    batcher.onUrisReady(batch -> {
        success.addAndGet(batch.getItems().length);
        // Once more than 1000 URIs have been returned, stop the last node to force a failover.
        if (dmManager.getJobReport(ticket).getSuccessEventsCount() > 1000 && isRunning.get()) {
            isRunning.set(false);
            try {
                serverStartStop(hostNames[hostNames.length - 1], "stop");
                Thread.sleep(40000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("batch: " + batch.getJobBatchNumber() +
            ", items: " + batch.getItems().length +
            ", count: " + success.get() +
            ", results so far: " + batch.getJobResultsSoFar() +
            ", host: " + batch.getForest().getPreferredHost() +
            ", forest: " + batch.getForest().getForestName() +
            ", forest batch: " + batch.getForestBatchNumber() +
            ", forest results so far: " + batch.getForestResultsSoFar());
        for (String s : batch.getItems()) {
            System.out.println("Uri : " + s);
        }
    });
    batcher.onQueryFailure(queryException -> {
        System.out.println("batch: " + queryException.getJobBatchNumber() +
            ", results so far: " + queryException.getJobResultsSoFar() +
            ", host: " + queryException.getForest().getPreferredHost() +
            ", forest: " + queryException.getForest().getForestName() +
            ", forest batch: " + queryException.getForestBatchNumber() +
            ", forest results so far: " + queryException.getForestResultsSoFar());
        queryException.printStackTrace();
    });

    ticket = dmManager.startJob(batcher);
    batcher.awaitCompletion();
    dmManager.stopJob(ticket);

    System.out.println("Success " + success.intValue());
    System.out.println("Failure " + failure.intValue());
    // Each URI should be returned exactly once; duplicates push success above 3000.
    assertEquals("document count", 3000, success.intValue());
    assertEquals("document count", 0, failure.intValue());
}
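One way to confirm that the over-count comes from duplicate URIs rather than extra documents would be to track the returned URIs in a concurrent set alongside the raw counter. This is a sketch of an alternative onUrisReady listener, not part of the original test; it assumes java.util.Set, java.util.concurrent.ConcurrentHashMap, and the same batcher as above:

Set<String> seenUris = ConcurrentHashMap.newKeySet();
AtomicInteger duplicates = new AtomicInteger(0);

batcher.onUrisReady(batch -> {
    for (String uri : batch.getItems()) {
        // add() returns false when the URI was already delivered by an earlier batch.
        if (!seenUris.add(uri)) {
            duplicates.incrementAndGet();
            System.out.println("Duplicate URI: " + uri);
        }
    }
});

// After awaitCompletion(), seenUris.size() should equal 3000; if the failover
// re-delivers batches, duplicates becomes positive while the set size stays at 3000.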