diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc index 36a755bfbd7..6ebd7c49f45 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc @@ -50,13 +50,15 @@ "type": "enum", "name": "FlowStatus", "symbols": [ + "COMPILED", + "RUNNING", "SUCCEEDED", "COMPILATION_FAILURE", "SUBMISSION_FAILURE", "EXECUTION_FAILURE", "CANCELLED" ], - "doc": "Final flow status for the GaaS flow", + "doc": "Flow status for the GaaS flow", "compliance": "NONE" } }, diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc index 1f3bf0a096b..9fe1cc087ad 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSJobObservabilityEvent.avsc @@ -62,13 +62,15 @@ "type": "enum", "name": "JobStatus", "symbols": [ + "COMPILED", + "RUNNING", "SUCCEEDED", "COMPILATION_FAILURE", "SUBMISSION_FAILURE", "EXECUTION_FAILURE", "CANCELLED" ], - "doc": "Final job status for this job in the GaaS flow", + "doc": "Job status for this job in the GaaS flow", "compliance": "NONE" } }, diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java index 06fb1f9dbc5..9cc652cf988 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java @@ -627,6 +627,7 @@ public void testJobMonitorCreatesGaaSObservabilityEvent() throws IOException, Re //Submit GobblinTrackingEvents to Kafka ImmutableList.of( createFlowCompiledEvent(), + createJobStartEvent(), createWorkUnitTimingEvent(), createJobSucceededEvent() ).forEach(event -> { @@ -651,19 +652,31 @@ public void testJobMonitorCreatesGaaSObservabilityEvent() throws IOException, Re State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA"); Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name()); + state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name()); + getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name()); // Only the COMPLETE event should create a GaaSJobObservabilityEvent List emittedEvents = mockEventProducer.getTestEmittedJobEvents(); + Assert.assertEquals(emittedEvents.size(), 3); Iterator iterator = emittedEvents.iterator(); - GaaSJobObservabilityEvent event1 = iterator.next(); - Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED); - Assert.assertEquals(event1.getFlowName(), this.flowName); - Assert.assertEquals(event1.getFlowGroup(), this.flowGroup); - Assert.assertEquals(event1.getJobPlanningStartTimestamp(), Long.valueOf(2)); - Assert.assertEquals(event1.getJobPlanningEndTimestamp(), Long.valueOf(3)); + GaaSJobObservabilityEvent event = iterator.next(); + Assert.assertEquals(event.getJobStatus(), JobStatus.COMPILED); + Assert.assertEquals(event.getFlowName(), this.flowName); + Assert.assertEquals(event.getFlowGroup(), this.flowGroup); + event = iterator.next(); + Assert.assertEquals(event.getJobStatus(), JobStatus.RUNNING); + Assert.assertEquals(event.getFlowName(), this.flowName); + Assert.assertEquals(event.getFlowGroup(), this.flowGroup); + event = iterator.next(); + Assert.assertEquals(event.getJobStatus(), JobStatus.SUCCEEDED); + Assert.assertEquals(event.getFlowName(), this.flowName); + Assert.assertEquals(event.getFlowGroup(), this.flowGroup); + Assert.assertEquals(event.getJobPlanningStartTimestamp(), Long.valueOf(2)); + Assert.assertEquals(event.getJobPlanningEndTimestamp(), Long.valueOf(3)); jobStatusMonitor.shutDown(); } @@ -706,12 +719,16 @@ public void testObservabilityEventSingleEmission() throws IOException, Reflectiv // Only the COMPLETE event should create a GaaSJobObservabilityEvent List emittedEvents = mockEventProducer.getTestEmittedJobEvents(); - Assert.assertEquals(emittedEvents.size(), 1); + Assert.assertEquals(emittedEvents.size(), 2); Iterator iterator = emittedEvents.iterator(); - GaaSJobObservabilityEvent event1 = iterator.next(); - Assert.assertEquals(event1.getJobStatus(), JobStatus.CANCELLED); - Assert.assertEquals(event1.getFlowName(), this.flowName); - Assert.assertEquals(event1.getFlowGroup(), this.flowGroup); + GaaSJobObservabilityEvent event = iterator.next(); + Assert.assertEquals(event.getJobStatus(), JobStatus.COMPILED); + Assert.assertEquals(event.getFlowName(), this.flowName); + Assert.assertEquals(event.getFlowGroup(), this.flowGroup); + event = iterator.next(); + Assert.assertEquals(event.getJobStatus(), JobStatus.CANCELLED); + Assert.assertEquals(event.getFlowName(), this.flowName); + Assert.assertEquals(event.getFlowGroup(), this.flowGroup); jobStatusMonitor.shutDown(); } @@ -766,9 +783,13 @@ public void testObservabilityEventFlowLevel() throws IOException, ReflectiveOper // Only the COMPLETE event should create a GaaSFlowObservabilityEvent List emittedFlowEvents = mockEventProducer.getTestEmittedFlowEvents(); - Assert.assertEquals(emittedFlowEvents.size(), 1); + Assert.assertEquals(emittedFlowEvents.size(), 2); Iterator flowIterator = emittedFlowEvents.iterator(); GaaSFlowObservabilityEvent flowEvent = flowIterator.next(); + Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.COMPILED); + Assert.assertEquals(flowEvent.getFlowName(), this.flowName); + Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup); + flowEvent = flowIterator.next(); Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.SUCCEEDED); Assert.assertEquals(flowEvent.getFlowName(), this.flowName); Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup); @@ -826,9 +847,13 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe // Only the COMPLETE event should create a GaaSFlowObservabilityEvent List emittedFlowEvents = mockEventProducer.getTestEmittedFlowEvents(); - Assert.assertEquals(emittedFlowEvents.size(), 1); + Assert.assertEquals(emittedFlowEvents.size(), 2); Iterator flowIterator = emittedFlowEvents.iterator(); GaaSFlowObservabilityEvent flowEvent = flowIterator.next(); + Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.COMPILED); + Assert.assertEquals(flowEvent.getFlowName(), this.flowName); + Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup); + flowEvent = flowIterator.next(); Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.EXECUTION_FAILURE); Assert.assertEquals(flowEvent.getFlowName(), this.flowName); Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java index 860f3ea5874..a40601644f6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java @@ -244,6 +244,11 @@ private GaaSJobObservabilityEvent createGaaSObservabilityEvent(final State jobSt private static JobStatus convertExecutionStatusTojobState(State state, ExecutionStatus executionStatus) { switch (executionStatus) { + // Add more mapping for new events rather than just terminal ones + case COMPILED: + return JobStatus.COMPILED; + case RUNNING: + return JobStatus.RUNNING; case FAILED: // TODO: Separate failure cases to SUBMISSION FAILURE and COMPILATION FAILURE, investigate events to populate these fields if (state.contains(TimingEvent.JOB_END_TIME)) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java index f4a18d4e2f3..f77f2fad2aa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -233,6 +234,12 @@ protected void processMessage(DecodeableKafkaRecord message) { // as much as FAILED does if we chose to emit ObservabilityEvent for FAILED_PENDING_RETRY boolean retryRequired = modifyStateIfRetryRequired(jobStatus); + if (Objects.equals(status, "COMPILED")) { + this.eventProducer.emitObservabilityEvent(jobStatus); + } + if (updatedJobStatus.getRight() == NewState.RUNNING) { + this.eventProducer.emitObservabilityEvent(jobStatus); + } if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) { // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS this.eventProducer.emitObservabilityEvent(jobStatus);