
Write batcher job goes into an infinite loop #798

@vivekmuniyandi

 try {
      final String query1 = "fn:count(fn:doc())";

      final AtomicInteger successCount = new AtomicInteger(0);

      final MutableBoolean failState = new MutableBoolean(false);
      final AtomicInteger failCount = new AtomicInteger(0);

      WriteBatcher ihb2 = dmManager.newWriteBatcher();
      ihb2.withBatchSize(20);
      ihb2.withThreadCount(20);

      ihb2.setBatchFailureListeners(new HostAvailabilityListener(dmManager)
          .withSuspendTimeForHostUnavailable(Duration.ofSeconds(15)).withMinHosts(2));
      ihb2.onBatchSuccess(batch -> {

        successCount.addAndGet(batch.getItems().length);
        System.out.println("Success Host: " + batch.getClient().getHost());
        System.out.println("Success batch number: " + batch.getJobBatchNumber());
        System.out.println("Success Job writes so far: " + batch.getJobWritesSoFar());
      }).onBatchFailure((batch, throwable) -> {
        System.out.println("Failed batch number: " + batch.getJobBatchNumber());
        /*
         * try{ System.out.println("Retrying batch: "+
         * batch.getJobBatchNumber()); ihb2.retry(batch); } catch(Exception e){
         * System.out.println("Retry of batch "+ batch.getJobBatchNumber()+
         * " failed"); e.printStackTrace(); }
         */

        throwable.printStackTrace();
        failState.setTrue();
        failCount.addAndGet(batch.getItems().length);
      });

      dmManager.startJob(ihb2);

      for (int j = 0; j < 50000; j++) {
        String uri = "/local/ABC-" + j;
        ihb2.add(uri, stringHandle);
      }

      ihb2.flushAndWait();

      System.out.println("Fail : " + failCount.intValue());
      System.out.println("Success : " + successCount.intValue());
      System.out.println("Count : " + dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());

      Assert.assertEquals(50000, dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());

    } catch (Exception e) {
      e.printStackTrace();
    }

This is run on a 3-node cluster. Bring 2 nodes down simultaneously. Since minHosts is set to 2, the batcher fails and stops, but the process keeps retrying indefinitely in processFailure. We need to add a check for whether the batcher is stopped and, if it is, return false from processException in HostAvailabilityListener.
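
The proposed check can be sketched in isolation. This is only an illustration under stated assumptions, not the listener's actual code: `SimpleBatcher`, `shouldRetry`, and `attemptsBeforeGivingUp` are hypothetical stand-ins for the batcher, for the decision made in processException, and for the retry loop described above. No MarkLogic classes are used.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StoppedBatcherGuard {

    /** Hypothetical stand-in for the batcher; only the stopped flag matters here. */
    public interface SimpleBatcher {
        boolean isStopped();
    }

    /**
     * Hypothetical stand-in for the decision made in processException:
     * returns true when the failed batch should be retried.
     */
    public static boolean shouldRetry(SimpleBatcher batcher, Throwable failure, boolean hostUnavailable) {
        // Proposed check: once the batcher is stopped, never ask for another retry.
        if (batcher.isStopped()) {
            return false;
        }
        // Otherwise retry only for host-availability failures.
        return hostUnavailable;
    }

    /**
     * Simulates the retry loop. The batcher is marked stopped after
     * stopAfterAttempt retries, mimicking the job stopping once fewer
     * than minHosts hosts remain. safetyCap bounds the loop in case
     * the guard is missing.
     */
    public static int attemptsBeforeGivingUp(AtomicBoolean stopped, int stopAfterAttempt, int safetyCap) {
        SimpleBatcher batcher = stopped::get;
        Throwable failure = new RuntimeException("host unavailable");
        int attempts = 0;
        while (attempts < safetyCap && shouldRetry(batcher, failure, true)) {
            attempts++;
            if (attempts == stopAfterAttempt) {
                stopped.set(true);
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        int attempts = attemptsBeforeGivingUp(new AtomicBoolean(false), 3, 1000);
        System.out.println("Attempts before giving up: " + attempts);
    }
}
```

With the guard in place, the simulated loop ends as soon as the batcher reports that it is stopped. Without the `isStopped()` check, it would run until the safety cap, which is the behavior reported here.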
