Closed
Description
A. The following test was run on a 3-node cluster with a database associated with 3 forests.
20:52:22.790 [main] INFO c.m.c.d.impl.WriteBatcherImpl - (withForestConfig) Using [rh7v-intel64-90-test-5.marklogic.com, rh7v-intel64-90-test-6.marklogic.com, rh7v-intel64-90-test-4.marklogic.com] hosts with forests for "ApplyTransform"
20:52:22.790 [main] DEBUG c.m.client.impl.JerseyServices - Connecting to rh7v-intel64-90-test-5.marklogic.com at 8000 as admin
20:52:23.461 [main] INFO c.m.c.d.impl.WriteBatcherImpl - Adding DatabaseClient on port 8000 for host "rh7v-intel64-90-test-5.marklogic.com" to the rotation
20:52:23.461 [main] DEBUG c.m.client.impl.JerseyServices - Connecting to rh7v-intel64-90-test-6.marklogic.com at 8000 as admin
20:52:23.742 [main] INFO c.m.c.d.impl.WriteBatcherImpl - Adding DatabaseClient on port 8000 for host "rh7v-intel64-90-test-6.marklogic.com" to the rotation
20:52:23.742 [main] INFO c.m.c.d.impl.WriteBatcherImpl - Adding DatabaseClient on port 8000 for host "rh7v-intel64-90-test-4.marklogic.com" to the rotation
B. While the query batcher was executing, the rh7v-intel64-90-test-6.marklogic.com node was stopped and started 3 times (the third time, the gap between stop and start was greater than 30 seconds, which caused a forest failover). As a result, 7 documents were not transformed: 1993 documents were transformed instead of the expected 2000.
Test:
@Test
public void xQueryMasstransformReplace() throws Exception {
    // Load 2000 documents with a WriteBatcher.
    WriteBatcher ihb2 = dmManager.newWriteBatcher();
    ihb2.withBatchSize(27).withThreadCount(10);
    ihb2.onBatchSuccess(
        batch -> {
        }
    )
    .onBatchFailure(
        (batch, throwable) -> {
            throwable.printStackTrace();
        });
    dmManager.startJob(ihb2);
    for (int j = 0; j < 2000; j++) {
        String uri = "/local/string-" + j;
        ihb2.add(uri, meta2, stringHandle);
    }
    ihb2.flushAndWait();

    // Server-side transform that adds the attribute Lang="English".
    ServerTransform transform = new ServerTransform("add-attr-xquery-transform");
    transform.put("name", "Lang");
    transform.put("value", "English");

    // Per-outcome URI counters, updated from the listener callbacks.
    AtomicInteger skipped = new AtomicInteger(0);
    AtomicInteger success = new AtomicInteger(0);
    AtomicInteger failure = new AtomicInteger(0);
    ApplyTransformListener listener = new ApplyTransformListener()
        .withTransform(transform)
        .withApplyResult(ApplyResult.REPLACE)
        .onSuccess(batch -> {
            success.addAndGet(batch.getItems().length);
        })
        .onBatchFailure((batch, throwable) -> {
            failure.addAndGet(batch.getItems().length);
            throwable.printStackTrace();
        })
        .onSkipped(batch -> {
            skipped.addAndGet(batch.getItems().length);
        });

    // Apply the transform to every document in the "XmlTransform" collection.
    QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform"))
        .onUrisReady(listener).withBatchSize(7);
    batcher.setQueryFailureListeners(
        new HostAvailabilityListener(dmManager)
            .withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
            .withMinHosts(2)
    );
    JobTicket ticket = dmManager.startJob(batcher);
    batcher.awaitCompletion();
    dmManager.stopJob(ticket);
    System.out.println("Success " + success.intValue());
    System.out.println("Failure " + failure.intValue());

    // Read all 2000 documents back and verify that each one was transformed.
    String[] uris = new String[2000];
    for (int i = 0; i < 2000; i++) {
        uris[i] = "/local/string-" + i;
    }
    int count = 0;
    DocumentPage page = dbClient.newDocumentManager().read(uris);
    DOMHandle dh = new DOMHandle();
    while (page.hasNext()) {
        DocumentRecord rec = page.next();
        rec.getContent(dh);
        assertTrue("Element has attribute ? :", dh.get().getElementsByTagName("foo").item(0).hasAttributes());
        assertEquals("Attribute value should be English", "English", dh.get().getElementsByTagName("foo").item(0).getAttributes().item(0).getNodeValue());
        count++;
    }
    assertEquals("document count", 2000, count);
    assertEquals("document count", 2000, success.intValue());
    assertEquals("document count", 0, skipped.intValue());
}
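
The shortfall of 7 documents equals the QueryBatcher batch size (withBatchSize(7)), which may suggest that exactly one batch was lost during the failover, although the test as written cannot confirm that. A possible way to narrow it down is to record the URIs seen in onSuccess and compare them against the expected set. The following is a minimal, standalone sketch of that comparison only: the class MissingUriReport and the method findMissing are hypothetical names, not part of the Java Client API, and the processed set is filled with simulated data here rather than from a real job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class MissingUriReport {

    // Returns every expected URI (prefix + 0 .. total-1) that is absent from 'processed'.
    static List<String> findMissing(String prefix, int total, Set<String> processed) {
        List<String> missing = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            String uri = prefix + i;
            if (!processed.contains(uri)) {
                missing.add(uri);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Thread-safe set, because listener callbacks may run on several threads.
        // Assumption: in the real test it would be filled from onSuccess,
        // e.g. processed.addAll(Arrays.asList(batch.getItems())).
        Set<String> processed = ConcurrentHashMap.newKeySet();

        // Simulated run: pretend one batch of 7 URIs (indices 700..706) never succeeded.
        for (int i = 0; i < 2000; i++) {
            if (i < 700 || i > 706) {
                processed.add("/local/string-" + i);
            }
        }

        List<String> missing = findMissing("/local/string-", 2000, processed);
        System.out.println("Missing " + missing.size() + " URIs: " + missing);
    }
}
```

If the missing URIs from a real run turn out to be the members of a single batch, that would point at a batch dropped around the failover rather than at individual documents failing the transform.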