diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java index 220d95a46c3..bb89a7b00ae 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -279,6 +279,7 @@ private void runJobExecutionLauncher() throws JobException { try { if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) { + // todo it should emit SKIPPED_JOB event that sets the job status SKIPPED rather than CANCELLED TimingEvent timer = new TimingEvent(eventSubmitter, TimingEvent.JOB_SKIPPED_TIME); HashMap metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString( HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList())))); diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java index 7a0d4cf68b1..e941c947df8 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -43,6 +43,7 @@ public static class LauncherTimings { public static final String JOB_PENDING_RESUME = "JobPendingResume"; public static final String JOB_ORCHESTRATED = "JobOrchestrated"; public static final String JOB_PREPARE = "JobPrepareTimer"; + public static final String JOB_SKIPPED = "JobSkipped"; public static final String JOB_START = "JobStartTimer"; public static final String JOB_RUN = "JobRunTimer"; public static final String JOB_COMMIT = "JobCommitTimer"; @@ -76,6 +77,7 @@ public static class FlowTimings { public static final String FLOW_RUN_DEADLINE_EXCEEDED = "FlowRunDeadlineExceeded"; public static final String FLOW_START_DEADLINE_EXCEEDED = "FlowStartDeadlineExceeded"; public static final String FLOW_PENDING_RESUME = "FlowPendingResume"; + public static final String FLOW_SKIPPED = "FlowSkipped"; } public static class FlowEventConstants { diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java index 06fb1f9dbc5..720ba270dec 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java @@ -308,7 +308,7 @@ public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOpe ImmutableList.of( createFlowCompiledEvent(), createJobOrchestratedEvent(1, 2), - createJobSkippedEvent() + createJobSkippedTimeEvent() ).forEach(event -> { context.submitEvent(event); kafkaReporter.report(); @@ -836,6 +836,40 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe jobStatusMonitor.shutDown(); } + @Test + public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException { + DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class); + KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8"); + + //Submit GobblinTrackingEvents to Kafka + ImmutableList.of( + createJobSkippedEvent() + ).forEach(event -> { + context.submitEvent(event); + kafkaReporter.report(); + }); + + try { + Thread.sleep(1000); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), + ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore); + jobStatusMonitor.buildMetricsContextAndMetrics(); + Iterator> recordIterator = Iterators.transform( + this.kafkaTestHelper.getIteratorForTopic(TOPIC), + this::convertMessageAndMetadataToDecodableKafkaRecord); + + State state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.SKIPPED.name()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).addJobDagAction( + any(), any(), anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE)); + + jobStatusMonitor.shutDown(); + } + private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor jobStatusMonitor, Iterator> recordIterator, String jobGroup, String jobName) throws IOException { jobStatusMonitor.processMessage(recordIterator.next()); @@ -871,11 +905,15 @@ private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt, int return createGTE(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, metadata); } + private GobblinTrackingEvent createJobSkippedEvent() { + return createGTE(TimingEvent.LauncherTimings.JOB_SKIPPED, Maps.newHashMap()); + } + private GobblinTrackingEvent createJobStartEvent() { return createGTE(TimingEvent.LauncherTimings.JOB_START, Maps.newHashMap()); } - private GobblinTrackingEvent createJobSkippedEvent() { + private GobblinTrackingEvent createJobSkippedTimeEvent() { return createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap()); } diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl index ed9a59f889d..09aa5f29314 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl @@ -49,4 +49,9 @@ enum ExecutionStatus { * Flow cancelled. */ CANCELLED + + /** + * Flow or job is skipped + */ + SKIPPED } \ No newline at end of file diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json index b2cdddc5e3a..83689212a13 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json @@ -13,7 +13,7 @@ "name" : "ExecutionStatus", "namespace" : "org.apache.gobblin.service", "doc" : "Execution status for a flow or job", - "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ], + "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ], "symbolDocs" : { "CANCELLED" : "Flow cancelled.", "COMPILED" : "Flow compiled to jobs.", @@ -23,7 +23,8 @@ "PENDING" : "Flow or job is in pending state.", "PENDING_RESUME" : "Flow or job is currently resuming.", "PENDING_RETRY" : "Flow or job is pending retry.", - "RUNNING" : "Flow or job is currently executing" + "RUNNING" : "Flow or job is currently executing.", + "SKIPPED" : "Flow or job is skipped." } }, { "type" : "record", diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json index e6a52bc8355..302b6915578 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json @@ -13,7 +13,7 @@ "name" : "ExecutionStatus", "namespace" : "org.apache.gobblin.service", "doc" : "Execution status for a flow or job", - "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ], + "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ], "symbolDocs" : { "CANCELLED" : "Flow cancelled.", "COMPILED" : "Flow compiled to jobs.", @@ -23,7 +23,8 @@ "PENDING" : "Flow or job is in pending state.", "PENDING_RESUME" : "Flow or job is currently resuming.", "PENDING_RETRY" : "Flow or job is pending retry.", - "RUNNING" : "Flow or job is currently executing" + "RUNNING" : "Flow or job is currently executing.", + "SKIPPED" : "Flow or job is skipped." } }, { "type" : "record", diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java index dea106c2944..f044d7dc959 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java @@ -41,7 +41,7 @@ @Slf4j public class FlowStatusGenerator { public static final List FINISHED_STATUSES = Lists.newArrayList(ExecutionStatus.FAILED.name(), - ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name()); + ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.SKIPPED.name()); public static final int MAX_LOOKBACK = 100; private final JobStatusRetriever jobStatusRetriever; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index d50887b365d..d936f0129b4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -99,7 +99,6 @@ public interface DagManagementStateStore { * {@link DagManagementStateStore#addDag}. This call is just an additional identifier which may be used * for DagNode level operations. In the future, it may be merged with checkpointDag. * @param dagNode dag node to be added - * @param dagId dag id of the dag this dag node belongs to */ void updateDagNode(Dag.DagNode dagNode) throws IOException; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 7f9e3920f10..d6e6f801403 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -158,7 +158,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId); } - public static void cancelDagNode(Dag.DagNode dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException { + public static void cancelDagNode(Dag.DagNode dagNodeToCancel) throws IOException { Properties cancelJobArgs = new Properties(); String serializedFuture = null; @@ -183,12 +183,34 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, } } - public static void cancelDag(Dag dag, DagManagementStateStore dagManagementStateStore) throws IOException { + /** + * Emits JOB_SKIPPED GTE for each of the dependent jobs. + */ + public static void sendSkippedEventForDependentJobs(Dag dag, Dag.DagNode node) { + Set> dependentJobs = new HashSet<>(); + findDependentJobs(dag, node, dependentJobs); + for (Dag.DagNode dependentJob : dependentJobs) { + Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue()); + DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata); + } + } + + private static void findDependentJobs(Dag dag, + Dag.DagNode node, Set> result) { + for (Dag.DagNode child : dag.getChildren(node)) { + if (!result.contains(child)) { + result.add(child); + findDependentJobs(dag, child, result); + } + } + } + + public static void cancelDag(Dag dag) throws IOException { List> dagNodesToCancel = dag.getNodes(); log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag)); for (Dag.DagNode dagNodeToCancel : dagNodesToCancel) { - DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNodeToCancel); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java index 0d4a21058ac..348a3ae4599 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java @@ -57,7 +57,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), getDagId()); for (Dag.DagNode dagNodeToCancel : dagNodesToCancel) { - DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNodeToCancel); } dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java index 065e43c2d6b..4696e8c55ec 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java @@ -78,7 +78,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, log.info("Job exceeded the job start deadline. Killing it now. Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}", DagUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart); dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode); - DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNode); dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED); dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration"); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java index abb15ae3a54..dde9988e190 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java @@ -64,18 +64,18 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft(); if (dagNodeToCancel.isPresent()) { - DagProcUtils.cancelDagNode(dagNodeToCancel.get(), dagManagementStateStore); + DagProcUtils.cancelDagNode(dagNodeToCancel.get()); } else { dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId()); } } else { - DagProcUtils.cancelDag(dag.get(), dagManagementStateStore); + DagProcUtils.cancelDag(dag.get()); } dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java index 21fd7d588a6..2953c75adb6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java @@ -80,11 +80,11 @@ protected Optional> initialize(DagManagementStateStore dag protected void act(DagManagementStateStore dagManagementStateStore, Optional> dag, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { if (!dag.isPresent()) { - log.warn("Dag with id " + getDagId() + " could not be compiled."); + log.warn("Dag with id " + getDagId() + " could not be compiled or cannot run concurrently."); dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); } else { DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId()); - DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING); + DagProcUtils.setAndEmitFlowEvent(DagProc.eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING); dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagUtils.getFlowId(dag.get()), Dag.FlowState.RUNNING); DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, getDagTask().getDagAction()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index f78263233a9..c91f7723c9a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -60,8 +60,11 @@ protected Pair>, Optional> ini protected void act(DagManagementStateStore dagManagementStateStore, Pair>, Optional> dagNodeWithJobStatus, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { if (!dagNodeWithJobStatus.getLeft().isPresent()) { - // one of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process - // has cleaned up the Dag, yet did not complete the lease before this current one acquired its own + // One of the reason this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process + // has cleaned up the Dag, yet did not complete the lease before this current one acquired its own. + // Another reason could be that LaunchDagProc was unable to compile the FlowSpec or the flow cannot run concurrently. + // In these cases FLOW_FAILED and FLOW_SKIPPED events are emitted respectively, which are terminal status and + // create a ReevaluateDagProc. But in these cases Dag was never created or never saved. log.error("DagNode or its job status not found for a Reevaluate DagAction with dag node id {}", this.dagNodeId); dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false); return; @@ -99,6 +102,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair node : failedDag.get().getNodes()) { ExecutionStatus executionStatus = node.getValue().getExecutionStatus(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 473ff3cf1a7..6e67a755f76 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -171,10 +171,10 @@ public Optional> validateAndHandleConcurrentExecution(Conf quotaManager.releaseQuota(dagNode); } } - // Send FLOW_FAILED event - flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent " + // Send FLOW_SKIPPED event + flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow is skipped because another instance is running and concurrent " + "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour."); - new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); + new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_SKIPPED).stop(flowMetadata); return Optional.absent(); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java index 74a864da37c..fc64459016d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java @@ -141,6 +141,10 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven case TimingEvent.LauncherTimings.JOB_PENDING: properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING.name()); break; + case TimingEvent.FlowTimings.FLOW_SKIPPED: + case TimingEvent.LauncherTimings.JOB_SKIPPED: + properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.SKIPPED.name()); + break; case TimingEvent.FlowTimings.FLOW_PENDING_RESUME: case TimingEvent.LauncherTimings.JOB_PENDING_RESUME: properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RESUME.name()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java index f4a18d4e2f3..3091b4cc5e9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java @@ -116,7 +116,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer ORDERED_EXECUTION_STATUSES = ImmutableList .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY, - ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE, + ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED); private final JobIssueEventHandler jobIssueEventHandler; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java index 9fc0e2c059d..36b9ba17cc9 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java @@ -21,7 +21,6 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Optional; -import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -186,7 +185,8 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any()); - doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + // third node (job2) will be queried for by the KillDagProc because we are killing that node + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any()); LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new DagActionStore.DagAction("fg", "flow2", @@ -215,8 +215,6 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc .getInvocations() .stream() .filter(a -> a.getMethod().getName().equals("cancelJob")) - .filter(a -> ((Properties) a.getArgument(1)) - .getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE).equals(MockedSpecExecutor.dummySerializedFuture)) .count()) .sum(); // kill dag proc tries to cancel only the exact dag node that was provided diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java index 0fd098fd3a9..5fc4cb6cec6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java @@ -23,8 +23,11 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.runner.RunWith; import org.mockito.MockedStatic; import org.mockito.Mockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -37,41 +40,47 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.metrics.RootMetricContext; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.service.ExecutionStatus; -import org.apache.gobblin.service.modules.core.GobblinServiceManager; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.DagActionStore; import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerTest; +import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine; import org.apache.gobblin.service.modules.orchestration.DagTestUtils; import org.apache.gobblin.service.modules.orchestration.DagUtils; -import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine; import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest; import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.monitoring.JobStatus; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; +import static org.powermock.reflect.Whitebox.setInternalState; +@RunWith(PowerMockRunner.class) +@PrepareForTest(EventSubmitter.class) public class ReevaluateDagProcTest { private final long flowExecutionId = System.currentTimeMillis(); private final String flowGroup = "fg"; private ITestMetastoreDatabase testMetastoreDatabase; private DagManagementStateStore dagManagementStateStore; - private MockedStatic mockedGobblinServiceManager; private DagProcessingEngineMetrics mockedDagProcEngineMetrics; + private MockedStatic dagProc; + private EventSubmitter mockedEventSubmitter; @BeforeClass public void setUpClass() throws Exception { this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); - this.mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class); + this.dagProc = mockStatic(DagProc.class); } @BeforeMethod @@ -79,13 +88,15 @@ public void setUp() throws Exception { this.dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore); this.mockedDagProcEngineMetrics = Mockito.mock(DagProcessingEngineMetrics.class); + this.mockedEventSubmitter = spy(new EventSubmitter.Builder(RootMetricContext.get(), "org.apache.gobblin.service").build()); + setInternalState(DagProc.class, "eventSubmitter", this.mockedEventSubmitter); } @AfterClass(alwaysRun = true) public void tearDownClass() throws Exception { - this.mockedGobblinServiceManager.close(); // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections this.testMetastoreDatabase.close(); + this.dagProc.close(); } @Test @@ -308,6 +319,224 @@ public void testRetryCurrentFailedJob() throws Exception { eq(DagActionStore.DagActionType.REEVALUATE)); } + @Test + public void testCancelledJob() throws Exception { + String flowName = "fn5"; + Dag dag = DagTestUtils.buildDag("1", flowExecutionId, DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), + 3, "user5", ConfigFactory.empty() + .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) + .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) + .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) + .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( + MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); + dagManagementStateStore.addDag(dag); + + List> specProducers = getDagSpecProducers(dag); + JobStatus jobStatus1 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job0").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.CANCELLED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus2 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job1").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus3 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job2").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), Optional.of(jobStatus1))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + + ReevaluateDagProc reEvaluateDagProc1 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc1.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // job cancelled, so no more jobs to launch + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + + // dag is considered finish because the remaining jobs depend on this cancelled job + // it should have been marked `failed` + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + + // and ENFORCE_FLOW_FINISH_DEADLINE dag action should have got deleted + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + + // total of three events should be emitted. two JOB_SKIPPED events, once for each of the child job and one flow_cancelled event + Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)).submit(any(), anyMap()); + + // this finished dag should not create any new dag actions + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(1)), Optional.of(jobStatus2))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc2 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job1", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc2.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // reEvaluateDagProc2 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus3))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc3 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job2", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc3.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // reEvaluateDagProc3 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(2)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + } + + @Test + public void testFailedJob() throws Exception { + String flowName = "fn6"; + Dag dag = DagManagerTest.buildDag("1", flowExecutionId, DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), + 4, "user5", ConfigFactory.empty() + .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) + .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) + .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) + .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( + MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); + dagManagementStateStore.addDag(dag); + + List> specProducers = getDagSpecProducers(dag); + JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job0").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.FAILED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus2 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job1").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus3 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job2").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + JobStatus jobStatus4 = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) + .jobName("job3").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.SKIPPED.name()) + .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), Optional.of(jobStatus))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + + ReevaluateDagProc reEvaluateDagProc1 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc1.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // job cancelled, so no more jobs to launch + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + + // dag should be considered finish because the remaining jobs depend on this cancelled job + // it should have been marked `failed` + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + + // and ENFORCE_FLOW_FINISH_DEADLINE dag action should have got deleted + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + + // total of four events should be emitted. three JOB_SKIPPED events, once for each of the descendent jobs and one flow_failed event + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); + + // this finished dag should not create any new dag actions + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(1)), Optional.of(jobStatus2))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc2 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job1", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc2.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // reEvaluateDagProc2 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(2)), Optional.of(jobStatus3))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc3 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job2", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc3.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // reEvaluateDagProc3 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + + doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(3)), Optional.of(jobStatus4))) + .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); + ReevaluateDagProc reEvaluateDagProc4 = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction( + flowGroup, flowName, flowExecutionId, "job2", DagActionStore.DagActionType.REEVALUATE), null, + dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty()); + reEvaluateDagProc4.process(dagManagementStateStore, mockedDagProcEngineMetrics); + + // reEvaluateDagProc4 should exit without doing anything because the dag is already deleted + specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).markDagFailed(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction(any()); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).deleteDagAction( + argThat(dagAction -> dagAction.getDagActionType() == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE)); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(3)) + .submit(eq(TimingEvent.LauncherTimings.JOB_SKIPPED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) + .submit(eq(TimingEvent.FlowTimings.FLOW_FAILED), anyMap()); + Mockito.verify(this.mockedEventSubmitter, Mockito.times(4)).submit(any(), anyMap()); + Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), any()); + } + public static List> getDagSpecProducers(Dag dag) { return dag.getNodes().stream().map(n -> { try {