From 81757ce671c0ecf9670cc34928b2e80a332b6508 Mon Sep 17 00:00:00 2001 From: arjun4084346 Date: Wed, 10 Jan 2024 14:06:34 -0800 Subject: [PATCH 1/2] remove from the fields that are actually mandatory --- .../gobblin/instrumented/Instrumented.java | 12 ++- .../gobblin/metrics/event/EventSubmitter.java | 2 +- .../modules/orchestration/DagManager.java | 94 +++++++------------ .../orchestration/DagManagerUtils.java | 7 +- .../modules/orchestration/Orchestrator.java | 81 +++++----------- .../scheduler/GobblinServiceJobScheduler.java | 2 +- .../FlowCompilationValidationHelper.java | 26 ++--- .../orchestration/DagManagerFlowTest.java | 6 +- .../modules/orchestration/DagManagerTest.java | 2 +- .../orchestration/OrchestratorTest.java | 4 +- 10 files changed, 87 insertions(+), 149 deletions(-) diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java index f2aa1030382..33f4b56a8ca 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java @@ -25,8 +25,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; - import org.apache.commons.lang3.StringUtils; import com.codahale.metrics.Meter; @@ -41,6 +39,8 @@ import com.google.common.collect.Lists; import com.google.common.io.Closer; +import javax.annotation.Nonnull; + import org.apache.gobblin.Constructs; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -247,6 +247,10 @@ public Timer apply(@Nonnull Timer input) { }); } + public static void updateTimer(Timer timer, final long duration, final TimeUnit unit) { + updateTimer(Optional.of(timer), duration, unit); + } + /** * Marks a meter only if it is defined. * @param meter an Optional<{@link com.codahale.metrics.Meter}> @@ -255,6 +259,10 @@ public static void markMeter(Optional meter) { markMeter(meter, 1); } + public static void markMeter(Meter meter) { + markMeter(Optional.of(meter), 1); + } + /** * Marks a meter only if it is defined. * @param meter an Optional<{@link com.codahale.metrics.Meter}> diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java index 891f980b7f9..4805425608f 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java @@ -179,7 +179,7 @@ public void submit(String name, Map additionalMetadata) { } // Timestamp is set by metric context. - this.metricContext.get().submitEvent(new GobblinTrackingEvent(0l, this.namespace, name, finalMetadata)); + this.metricContext.get().submitEvent(new GobblinTrackingEvent(0L, this.namespace, name, finalMetadata)); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 3b422c55bf6..1b7b2de43d4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -91,7 +91,7 @@ /** * This class implements a manager to manage the life cycle of a {@link Dag}. A {@link Dag} is submitted to the - * {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On receiving a {@link Dag}, the + * {@link DagManager} by the {@link Orchestrator#orchestrate} method. On receiving a {@link Dag}, the * {@link DagManager} first persists the {@link Dag} to the {@link DagStateStore}, and then submits it to the specific * {@link DagManagerThread}'s {@link BlockingQueue} based on the flowExecutionId of the Flow. * This guarantees that each {@link Dag} received by the {@link DagManager} can be recovered in case of a leadership @@ -194,10 +194,8 @@ DagActionStore.DagAction toDagAction(DagActionStore.FlowActionType actionType) { DagManagerThread[] dagManagerThreads; private final ScheduledExecutorService scheduledExecutorPool; - private final boolean instrumentationEnabled; private DagStateStore dagStateStore; private Map topologySpecMap; - private int houseKeepingThreadInitialDelay = INITIAL_HOUSEKEEPING_THREAD_DELAY; @Getter private ScheduledExecutorService houseKeepingThreadPool; @@ -215,7 +213,7 @@ DagActionStore.DagAction toDagAction(DagActionStore.FlowActionType actionType) { private final FlowCatalog flowCatalog; private final FlowCompilationValidationHelper flowCompilationValidationHelper; private final Config config; - private final Optional eventSubmitter; + private final EventSubmitter eventSubmitter; private final long failedDagRetentionTime; private final DagManagerMetrics dagManagerMetrics; @@ -226,9 +224,10 @@ DagActionStore.DagAction toDagAction(DagActionStore.FlowActionType actionType) { private volatile boolean isActive = false; + @Inject public DagManager(Config config, JobStatusRetriever jobStatusRetriever, SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator, - FlowCatalog flowCatalog, boolean instrumentationEnabled) { + FlowCatalog flowCatalog) { this.config = config; this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS); this.runQueue = (BlockingQueue>[]) initializeDagQueue(this.numThreads); @@ -237,14 +236,8 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads); this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL); this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL); - this.instrumentationEnabled = instrumentationEnabled; - MetricContext metricContext = null; - if (instrumentationEnabled) { - metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()); - this.eventSubmitter = Optional.of(new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build()); - } else { - this.eventSubmitter = Optional.absent(); - } + MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()); + this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build(); this.dagManagerMetrics = new DagManagerMetrics(); TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT)); this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME)); @@ -284,13 +277,6 @@ private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) { return queue; } - @Inject - public DagManager(Config config, JobStatusRetriever jobStatusRetriever, - SharedFlowMetricsSingleton sharedFlowMetricsSingleton, FlowStatusGenerator flowStatusGenerator, - FlowCatalog flowCatalog) { - this(config, jobStatusRetriever, sharedFlowMetricsSingleton, flowStatusGenerator, flowCatalog, true); - } - /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done * during leadership change. */ @@ -340,13 +326,11 @@ public synchronized void addDag(Dag dag, boolean persist, bool } private void submitEventsAndSetStatus(Dag dag) { - if (this.eventSubmitter.isPresent()) { - for (DagNode dagNode : dag.getNodes()) { - JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode); - Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan); - this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata); - jobExecutionPlan.setExecutionStatus(PENDING); - } + for (DagNode dagNode : dag.getNodes()) { + JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode); + Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan); + new TimingEvent(eventSubmitter, TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata); + jobExecutionPlan.setExecutionStatus(PENDING); } } @@ -448,7 +432,7 @@ public synchronized void setActive(boolean active) { this.dagManagerThreads = new DagManagerThread[numThreads]; for (int i = 0; i < numThreads; i++) { DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore, dagActionStore, - runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, this.dagManagerMetrics, + runQueue[i], cancelQueue[i], resumeQueue[i], failedDagIds, this.dagManagerMetrics, this.defaultJobStartSlaTimeMillis, quotaManager, i); this.dagManagerThreads[i] = dagManagerThread; this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS); @@ -457,7 +441,7 @@ public synchronized void setActive(boolean active) { this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES); loadDagFromDagStateStore(); this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor(); - for (int delay = houseKeepingThreadInitialDelay; delay < MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) { + for (int delay = INITIAL_HOUSEKEEPING_THREAD_DELAY; delay < MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) { this.houseKeepingThreadPool.schedule(() -> { try { loadDagFromDagStateStore(); @@ -510,8 +494,8 @@ public static class DagManagerThread implements Runnable { final Map dagToSLA = new HashMap<>(); private final MetricContext metricContext; private final Set dagIdstoClean = new HashSet<>(); - private final Optional eventSubmitter; - private final Optional jobStatusPolledTimer; + private final EventSubmitter eventSubmitter; + private final Timer jobStatusPolledTimer; private final AtomicLong orchestrationDelay = new AtomicLong(0); private final DagManagerMetrics dagManagerMetrics; private final UserQuotaManager quotaManager; @@ -523,13 +507,13 @@ public static class DagManagerThread implements Runnable { private final BlockingQueue resumeQueue; private final Long defaultJobStartSlaTimeMillis; private final Optional dagActionStore; - private final Optional dagManagerThreadHeartbeat; + private final Meter dagManagerThreadHeartbeat; /** * Constructor. */ DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore, Optional dagActionStore, BlockingQueue> queue, BlockingQueue cancelQueue, - BlockingQueue resumeQueue, boolean instrumentationEnabled, Set failedDagIds, DagManagerMetrics dagManagerMetrics, + BlockingQueue resumeQueue, Set failedDagIds, DagManagerMetrics dagManagerMetrics, Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) { this.jobStatusRetriever = jobStatusRetriever; this.dagStateStore = dagStateStore; @@ -542,21 +526,13 @@ public static class DagManagerThread implements Runnable { this.defaultJobStartSlaTimeMillis = defaultJobStartSla; this.quotaManager = quotaManager; this.dagActionStore = dagActionStore; - - if (instrumentationEnabled) { - this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()); - this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); - this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER)); - ContextAwareGauge orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, - orchestrationDelay::get); - this.metricContext.register(orchestrationDelayMetric); - this.dagManagerThreadHeartbeat = Optional.of(this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT, dagMangerThreadId))); - } else { - this.metricContext = null; - this.eventSubmitter = Optional.absent(); - this.jobStatusPolledTimer = Optional.absent(); - this.dagManagerThreadHeartbeat = Optional.absent(); - } + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()); + this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build(); + this.jobStatusPolledTimer = this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER); + ContextAwareGauge orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, + orchestrationDelay::get); + this.metricContext.register(orchestrationDelayMetric); + this.dagManagerThreadHeartbeat = this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT, dagMangerThreadId)); } /** @@ -645,7 +621,7 @@ private void beginResumingDag(DagId dagIdToResume) throws IOException { node.getValue().setCurrentAttempts(0); DagManagerUtils.incrementJobGeneration(node); Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue()); - this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata); + this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata); } // Set flowStartTime so that flow SLA will be based on current time instead of original flow @@ -690,7 +666,7 @@ private void finishResumingDags() throws IOException { /** * Cancels the dag and sends a cancellation tracking event. - * @param dagToCancel dag node to cancel + * @param dagId dag node to cancel * @throws ExecutionException executionException * @throws InterruptedException interruptedException */ @@ -731,11 +707,9 @@ private void cancelDagNode(DagNode dagNodeToCancel) throws Exe } private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) { - if (this.eventSubmitter.isPresent()) { - Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan); - this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata); - jobExecutionPlan.setExecutionStatus(CANCELLED); - } + Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan); + this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata); + jobExecutionPlan.setExecutionStatus(CANCELLED); } /** @@ -1040,8 +1014,7 @@ private void submitJob(DagNode dagNode) { quotaManager.checkQuota(Collections.singleton(dagNode)); producer = DagManagerUtils.getSpecProducer(dagNode); - TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get(). - getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null; + TimingEvent jobOrchestrationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.LauncherTimings.JOB_ORCHESTRATED); // Increment job count before submitting the job onto the spec producer, in case that throws an exception. // By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release. @@ -1065,14 +1038,11 @@ private void submitJob(DagNode dagNode) { jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri)); // Add serialized job properties as part of the orchestrated job event metadata jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, dagNode.getValue().toString()); - if (jobOrchestrationTimer != null) { - jobOrchestrationTimer.stop(jobMetadata); - } + jobOrchestrationTimer.stop(jobMetadata); log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri); this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode); } catch (Exception e) { - TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get(). - getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null; + TimingEvent jobFailedTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); String message = "Cannot submit job " + DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri; log.error(message, e); jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java index 152dca00d65..97201a4aa9f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.typesafe.config.Config; @@ -332,8 +331,8 @@ static String getSpecExecutorName(DagNode dagNode) { return dagNode.getValue().getSpecExecutor().getUri().toString(); } - static void emitFlowEvent(Optional eventSubmitter, Dag dag, String flowEvent) { - if (eventSubmitter.isPresent() && !dag.isEmpty()) { + static void emitFlowEvent(EventSubmitter eventSubmitter, Dag dag, String flowEvent) { + if (!dag.isEmpty()) { // Every dag node will contain the same flow metadata Config config = getDagJobConfig(dag); Map flowMetadata = TimingEventUtils.getFlowMetadata(config); @@ -345,7 +344,7 @@ static void emitFlowEvent(Optional eventSubmitter, Dag eventSubmitter; + protected final EventSubmitter eventSubmitter; private final boolean isFlowConcurrencyEnabled; @Getter - private Optional flowOrchestrationSuccessFulMeter; + private Meter flowOrchestrationSuccessFulMeter; @Getter - private Optional flowOrchestrationFailedMeter; + private Meter flowOrchestrationFailedMeter; @Getter - private Optional flowOrchestrationTimer; - private Optional flowFailedForwardToDagManagerCounter; + private Timer flowOrchestrationTimer; + private Counter flowFailedForwardToDagManagerCounter; @Setter private FlowStatusGenerator flowStatusGenerator; @@ -109,10 +109,10 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { private final ClassAliasResolver aliasResolver; + @Inject public Orchestrator(Config config, Optional topologyCatalog, Optional dagManager, - Optional log, FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled, - Optional flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton, - Optional flowCatalog) { + Optional log, FlowStatusGenerator flowStatusGenerator, Optional flowTriggerHandler, + SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional flowCatalog) { _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class); this.topologyCatalog = topologyCatalog; @@ -139,21 +139,13 @@ public Orchestrator(Config config, Optional topologyCatalog, Op this.dagManager.get().setTopologySpecMap(getSpecCompiler().getTopologySpecMap()); } - if (instrumentationEnabled) { - this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass()); - this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER)); - this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER)); - this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER)); - this.flowFailedForwardToDagManagerCounter = Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT)); - this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); - } else { - this.metricContext = null; - this.flowOrchestrationSuccessFulMeter = Optional.absent(); - this.flowOrchestrationFailedMeter = Optional.absent(); - this.flowOrchestrationTimer = Optional.absent(); - this.flowFailedForwardToDagManagerCounter = Optional.absent(); - this.eventSubmitter = Optional.absent(); - } + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass()); + this.flowOrchestrationSuccessFulMeter = this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER); + this.flowOrchestrationFailedMeter = this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER); + this.flowOrchestrationTimer = this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER); + this.flowFailedForwardToDagManagerCounter = this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT); + this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build(); + this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED); quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, @@ -163,15 +155,6 @@ public Orchestrator(Config config, Optional topologyCatalog, Op quotaManager, eventSubmitter, flowStatusGenerator, isFlowConcurrencyEnabled); } - @Inject - public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional topologyCatalog, - Optional dagManager, Optional log, Optional flowTriggerHandler, - SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional flowCatalog) { - this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, flowTriggerHandler, - sharedFlowMetricsSingleton, flowCatalog); - } - - @VisibleForTesting public SpecCompiler getSpecCompiler() { return this.specCompiler; @@ -261,9 +244,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration skipped because no trigger timestamp " + "associated with flow action."); - if (this.eventSubmitter.isPresent()) { - new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); - } + new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); return; } @@ -273,8 +254,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]", flowAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis); } else { - Optional flowCompilationTimer = - this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED)); + TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED); Optional> compiledDagOptional = this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, spec, flowGroup, flowName); @@ -284,7 +264,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil return; } Dag compiledDag = compiledDagOptional.get(); - if (compiledDag == null || compiledDag.isEmpty()) { + if (compiledDag.isEmpty()) { FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, spec, flowMetadata); Instrumented.markMeter(this.flowOrchestrationFailedMeter); sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, @@ -296,9 +276,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL); FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag); - if (flowCompilationTimer.isPresent()) { - flowCompilationTimer.get().stop(flowMetadata); - } + flowCompilationTimer.stop(flowMetadata); // Depending on if DagManager is present, handle execution if (this.dagManager.isPresent()) { @@ -322,14 +300,11 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil Map jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan); _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); - Optional jobOrchestrationTimer = this.eventSubmitter.transform( - submitter -> new TimingEvent(submitter, TimingEvent.LauncherTimings.JOB_ORCHESTRATED)); + TimingEvent jobOrchestrationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.LauncherTimings.JOB_ORCHESTRATED); producer.addSpec(jobSpec); - if (jobOrchestrationTimer.isPresent()) { - jobOrchestrationTimer.get().stop(jobMetadata); - } + jobOrchestrationTimer.stop(jobMetadata); } catch (Exception e) { _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + " for flow: " + spec, e); @@ -373,15 +348,11 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag jobE } catch (Exception ex) { String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage(); _log.warn("Orchestrator call - " + failureMessage, ex); - if (this.flowFailedForwardToDagManagerCounter.isPresent()) { - this.flowFailedForwardToDagManagerCounter.get().inc(); - } - if (this.eventSubmitter.isPresent()) { - // pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover) - Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); - flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage); - new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); - } + this.flowFailedForwardToDagManagerCounter.inc(); + // pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover) + Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); + flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage); + new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); throw ex; } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 7ccc094b1f2..6d11b3bd3f1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -213,7 +213,7 @@ public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusG SharedFlowMetricsSingleton sharedFlowMetricsSingleton) throws Exception { this(serviceName, config, helixManager, flowCatalog, topologyCatalog, - new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log, flowTriggerHandler, + new Orchestrator(config, topologyCatalog, dagManager, log, flowStatusGenerator, flowTriggerHandler, sharedFlowMetricsSingleton, flowCatalog), schedulerService, quotaManager, log, isWarmStandbyEnabled, flowTriggerHandler); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 78b5446bf7c..674ff0024ca 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -56,7 +56,7 @@ public final class FlowCompilationValidationHelper { private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton; private final SpecCompiler specCompiler; private final UserQuotaManager quotaManager; - private final Optional eventSubmitter; + private final EventSubmitter eventSubmitter; private final FlowStatusGenerator flowStatusGenerator; private final boolean isFlowConcurrencyEnabled; @@ -66,7 +66,7 @@ public final class FlowCompilationValidationHelper { * caller. * @param flowSpec * @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass the ID "laundered" via the DB; - * see: {@link MysqlMultiActiveLeaseArbiter javadoc section titled + * see: {@link org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section titled * `Database event_timestamp laundering`} * @return jobExecutionPlan dag if one can be constructed for the given flowSpec */ @@ -79,8 +79,7 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS //Wait for the SpecCompiler to become healthy. specCompiler.awaitHealthy(); - Optional flowCompilationTimer = - this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED)); + TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED); Optional> jobExecutionPlanDagOptional = validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName); Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); @@ -89,15 +88,13 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS return Optional.absent(); } - if (jobExecutionPlanDagOptional.get() == null || jobExecutionPlanDagOptional.get().isEmpty()) { + if (jobExecutionPlanDagOptional.get().isEmpty()) { populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); return Optional.absent(); } addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, jobExecutionPlanDagOptional.get()); - if (flowCompilationTimer.isPresent()) { - flowCompilationTimer.get().stop(flowMetadata); - } + flowCompilationTimer.stop(flowMetadata); return jobExecutionPlanDagOptional; } @@ -133,9 +130,7 @@ public Optional> validateAndHandleConcurrentExecution(Conf Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent " + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour."); - if (eventSubmitter.isPresent()) { - new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); - } + new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); return Optional.absent(); } } @@ -158,7 +153,7 @@ private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, St * @param spec * @param flowMetadata */ - public static void populateFlowCompilationFailedEventMessage(Optional eventSubmitter, Spec spec, + public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter, Spec spec, Map flowMetadata) { // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId. @@ -172,12 +167,7 @@ public static void populateFlowCompilationFailedEventMessage(Optional flowCompileFailedTimer = eventSubmitter.transform(submitter -> - new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED)); - - if (flowCompileFailedTimer.isPresent()) { - flowCompileFailedTimer.get().stop(flowMetadata); - } + new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED).stop(flowMetadata); } /** diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java index a72f45732e0..d61d9a6de1f 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java @@ -90,7 +90,7 @@ public void setUp() throws Exception { dagActionStore = new MysqlDagActionStore(config); dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.KILL); dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, DagActionStore.FlowActionType.RESUME); - dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), false); + dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props)); dagManager.dagActionStore = Optional.of(dagActionStore); dagManager.setActive(true); this.dagNumThreads = dagManager.getNumThreads(); @@ -391,9 +391,9 @@ public boolean apply(@Nullable Void input) { class MockedDagManager extends DagManager { - public MockedDagManager(Config config, boolean instrumentationEnabled) { + public MockedDagManager(Config config) { super(config, createJobStatusRetriever(), Mockito.mock(SharedFlowMetricsSingleton.class), - Mockito.mock(FlowStatusGenerator.class), Mockito.mock(FlowCatalog.class), instrumentationEnabled); + Mockito.mock(FlowStatusGenerator.class), Mockito.mock(FlowCatalog.class)); } private static JobStatusRetriever createJobStatusRetriever() { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java index 2babd068311..d5aec8ad404 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java @@ -106,7 +106,7 @@ public void setUp() throws Exception { this._dagManagerMetrics.activate(); this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, Optional.absent(), queue, cancelQueue, - resumeQueue, true, new HashSet<>(), this._dagManagerMetrics, START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0); + resumeQueue, new HashSet<>(), this._dagManagerMetrics, START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0); Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag"); jobToDagField.setAccessible(true); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 7e7daef9b4a..514e975182a 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -107,8 +107,8 @@ public void setup() throws Exception { this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class); this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), - this.mockStatusGenerator, Optional.of(this.topologyCatalog), Optional.absent(), Optional.of(logger), - Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsSingleton( + Optional.of(this.topologyCatalog), Optional.absent(), Optional.of(logger), this.mockStatusGenerator, + Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsSingleton( ConfigUtils.propertiesToConfig(orchestratorProperties)), Optional.of(mock(FlowCatalog.class))); this.topologyCatalog.addListener(orchestrator); this.flowCatalog.addListener(orchestrator); From f7f7cdfabd6a075b668ad67e90ffc0a400daf59c Mon Sep 17 00:00:00 2001 From: arjun4084346 Date: Thu, 11 Jan 2024 09:51:07 -0800 Subject: [PATCH 2/2] remove optionality of dag manager and topology catalog --- .../gobblin/service/ServiceConfigKeys.java | 3 - .../gobblin/service/FlowStatusTest.java | 2 +- .../monitoring/FlowStatusGenerator.java | 2 +- .../monitoring/JobStatusRetriever.java | 37 ++--- .../monitoring/FlowStatusGeneratorTest.java | 11 +- .../core/GobblinServiceConfiguration.java | 10 -- .../core/GobblinServiceGuiceModule.java | 35 ++--- .../modules/core/GobblinServiceManager.java | 41 +++--- .../modules/orchestration/DagManager.java | 31 +++-- .../modules/orchestration/Orchestrator.java | 81 ++++------- .../scheduler/GobblinServiceJobScheduler.java | 52 +++---- .../monitoring/FsJobStatusRetriever.java | 5 +- .../monitoring/LocalFsJobStatusRetriever.java | 3 +- .../monitoring/MysqlJobStatusRetriever.java | 5 +- .../service/GobblinServiceManagerTest.java | 34 +++-- .../modules/core/GobblinServiceHATest.java | 11 +- .../core/GobblinServiceRedirectTest.java | 10 +- .../orchestration/OrchestratorTest.java | 45 +++--- .../GobblinServiceJobSchedulerTest.java | 21 ++- .../monitoring/FsJobStatusRetrieverTest.java | 19 ++- ...bStatusRetrieverTestWithoutDagManager.java | 116 ---------------- .../monitoring/JobStatusRetrieverTest.java | 2 +- .../MysqlJobStatusRetrieverTest.java | 15 +- ...bStatusRetrieverTestWithoutDagManager.java | 128 ------------------ 24 files changed, 203 insertions(+), 516 deletions(-) delete mode 100644 gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java delete mode 100644 gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java index 21b32b58cc0..2cfb6a01762 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java @@ -29,7 +29,6 @@ public class ServiceConfigKeys { public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = "org.apache.gobblin.service.modules.orchestration.Orchestrator"; // Gobblin Service Manager Keys - public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled"; public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled"; public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled"; public static final String GOBBLIN_SERVICE_INSTANCE_NAME = GOBBLIN_SERVICE_PREFIX + "instance.name"; @@ -37,8 +36,6 @@ public class ServiceConfigKeys { public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled"; public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled"; public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled"; - public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled"; - public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false; public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled"; public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled"; public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveScheduler.enabled"; diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java index c507b031056..4888a2c23db 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java @@ -54,7 +54,7 @@ public class FlowStatusTest { class TestJobStatusRetriever extends JobStatusRetriever { protected TestJobStatusRetriever(MultiContextIssueRepository issueRepository) { - super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, issueRepository); + super(issueRepository); } @Override diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java index 8b991b9858d..0bf8b71ea33 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java @@ -118,7 +118,7 @@ public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExec List jobStatuses = ImmutableList.copyOf(retainStatusOfAnyFlowOrJobMatchingTag( jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId), tag)); ExecutionStatus flowExecutionStatus = - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator()); + JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()); return jobStatuses.iterator().hasNext() ? new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(), flowExecutionStatus) : null; } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java index e5845fe373e..7508340921d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java @@ -19,12 +19,10 @@ import java.util.Collections; import java.util.Comparator; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; import com.google.common.base.Supplier; @@ -60,15 +58,12 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker @Getter protected final MetricContext metricContext; - @Getter - protected final Boolean dagManagerEnabled; private final MultiContextIssueRepository issueRepository; - protected JobStatusRetriever(boolean dagManagerEnabled, MultiContextIssueRepository issueRepository) { + protected JobStatusRetriever(MultiContextIssueRepository issueRepository) { this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()); this.issueRepository = Objects.requireNonNull(issueRepository); - this.dagManagerEnabled = dagManagerEnabled; } public abstract Iterator getJobStatusesForFlowExecution(String flowName, String flowGroup, @@ -186,7 +181,7 @@ protected List asFlowStatuses(List fl Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId) ).collect(Collectors.toList()))); return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(), jobStatuses.iterator(), - getFlowStatusFromJobStatuses(dagManagerEnabled, jobStatuses.iterator())); + getFlowStatusFromJobStatuses(jobStatuses.iterator())); }).collect(Collectors.toList()); } @@ -227,31 +222,15 @@ public static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStat && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY); } - public static ExecutionStatus getFlowStatusFromJobStatuses(boolean dagManagerEnabled, Iterator jobStatusIterator) { + public static ExecutionStatus getFlowStatusFromJobStatuses(Iterator jobStatusIterator) { ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN; - if (dagManagerEnabled) { - while (jobStatusIterator.hasNext()) { - JobStatus jobStatus = jobStatusIterator.next(); - // Check if this is the flow status instead of a single job status - if (JobStatusRetriever.isFlowStatus(jobStatus)) { - flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName()); - } - } - } else { - Set jobStatuses = new HashSet<>(); - while (jobStatusIterator.hasNext()) { - JobStatus jobStatus = jobStatusIterator.next(); - // because in absence of DagManager we do not get all flow level events, we will ignore the flow level events - // we actually get and purely calculate flow status based on flow statuses. - if (!JobStatusRetriever.isFlowStatus(jobStatus)) { - jobStatuses.add(ExecutionStatus.valueOf(jobStatus.getEventName())); - } + while (jobStatusIterator.hasNext()) { + JobStatus jobStatus = jobStatusIterator.next(); + // Check if this is the flow status instead of a single job status + if (JobStatusRetriever.isFlowStatus(jobStatus)) { + flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName()); } - - List statusesInDescendingSalience = ImmutableList.of(ExecutionStatus.FAILED, ExecutionStatus.CANCELLED, - ExecutionStatus.RUNNING, ExecutionStatus.ORCHESTRATED, ExecutionStatus.COMPLETE); - flowExecutionStatus = statusesInDescendingSalience.stream().filter(jobStatuses::contains).findFirst().orElse(ExecutionStatus.$UNKNOWN); } return flowExecutionStatus; diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java index 1c5c534bef6..38017793691 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java @@ -77,7 +77,6 @@ public void testIsFlowRunning() { .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build(); jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator(); when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator); - when(jobStatusRetriever.getDagManagerEnabled()).thenReturn(true); Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup)); } @@ -109,9 +108,9 @@ public void testGetFlowStatusesAcrossGroup() { // IMPORTANT: result invariants to honor - ordered by ascending flowName, all of same flowName adjacent, therein descending flowExecutionId // NOTE: Three copies of FlowStatus are needed for repeated use, due to mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator` - FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever); - FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever); - FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever); + FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2)); + FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2)); + FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2)); Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions("myFlowGroup", 2)) .thenReturn(Collections.singletonList(flowStatus), Collections.singletonList(flowStatus2), Collections.singletonList(flowStatus3)); // (for three invocations) @@ -138,9 +137,9 @@ public void testGetFlowStatusesAcrossGroup() { Arrays.asList(f0jsmDep2))); } - private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List jobStatuses, JobStatusRetriever jobStatusRetriever) { + private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List jobStatuses) { return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(), - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator())); + JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator())); } private JobStatus createFlowJobStatus(String flowGroup, String flowName, long flowExecutionId, ExecutionStatus status) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java index 03081b3ba2d..1998b541d96 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java @@ -46,9 +46,6 @@ public class GobblinServiceConfiguration { @Getter private final boolean isMultiActiveSchedulerEnabled; - @Getter - private final boolean isTopologyCatalogEnabled; - @Getter private final boolean isFlowCatalogEnabled; @@ -64,9 +61,6 @@ public class GobblinServiceConfiguration { @Getter private final boolean isGitConfigMonitorEnabled; - @Getter - private final boolean isDagManagerEnabled; - @Getter private final boolean isJobStatusMonitorEnabled; @@ -93,8 +87,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config this.innerConfig = Objects.requireNonNull(config, "Config cannot be null"); this.serviceWorkDir = serviceWorkDir; - isTopologyCatalogEnabled = - ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true); isFlowCatalogEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true); @@ -113,8 +105,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false); this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY); - this.isDagManagerEnabled = - ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED); this.isJobStatusMonitorEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true); this.isSchedulerEnabled = diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java index af0c3461adf..e108dec14f5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java @@ -19,18 +19,6 @@ import java.util.Objects; -import org.apache.gobblin.runtime.api.DagActionStore; -import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter; -import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter; -import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore; -import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler; -import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; -import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby; -import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby; -import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; -import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; -import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory; -import org.apache.gobblin.service.monitoring.GitConfigMonitor; import org.apache.helix.HelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,13 +37,18 @@ import javax.inject.Singleton; import org.apache.gobblin.restli.EmbeddedRestliServer; +import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment; +import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter; +import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter; +import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore; import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository; import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler; import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; +import org.apache.gobblin.runtime.util.InjectionNames; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.FlowConfigResourceLocalHandler; import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler; @@ -75,17 +68,24 @@ import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider; import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl; import org.apache.gobblin.service.modules.orchestration.DagManager; +import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler; import org.apache.gobblin.service.modules.orchestration.Orchestrator; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler; import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler; +import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby; import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler; +import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby; import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; import org.apache.gobblin.service.modules.topology.TopologySpecFactory; import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository; import org.apache.gobblin.service.modules.utils.HelixUtils; -import org.apache.gobblin.runtime.util.InjectionNames; +import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; +import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; +import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.service.monitoring.FsJobStatusRetriever; +import org.apache.gobblin.service.monitoring.GitConfigMonitor; import org.apache.gobblin.service.monitoring.JobStatusRetriever; import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor; import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory; @@ -190,9 +190,7 @@ public void configure(Binder binder) { binder.bind(SharedFlowMetricsSingleton.class); OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class); - if (serviceConfig.isTopologyCatalogEnabled()) { - binder.bind(TopologyCatalog.class); - } + binder.bind(TopologyCatalog.class); if (serviceConfig.isTopologySpecFactoryEnabled()) { binder.bind(TopologySpecFactory.class) @@ -200,10 +198,7 @@ public void configure(Binder binder) { ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY)); } - OptionalBinder.newOptionalBinder(binder, DagManager.class); - if (serviceConfig.isDagManagerEnabled()) { - binder.bind(DagManager.class); - } + binder.bind(DagManager.class); OptionalBinder.newOptionalBinder(binder, HelixManager.class); if (serviceConfig.isHelixManagerEnabled()) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 9212eae91be..737c13876fd 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -31,9 +31,6 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.ObjectUtils; -import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; -import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; -import org.apache.gobblin.service.monitoring.GitConfigMonitor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -97,9 +94,12 @@ import org.apache.gobblin.service.modules.db.ServiceDatabaseManager; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; import org.apache.gobblin.service.modules.topology.TopologySpecFactory; +import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; +import org.apache.gobblin.service.monitoring.GitConfigMonitor; import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor; import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor; import org.apache.gobblin.util.ConfigUtils; @@ -188,9 +188,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri @Inject(optional = true) protected GitConfigMonitor gitConfigMonitor; - @Inject(optional = true) + @Inject @Getter - protected DagManager dagManager; + @VisibleForTesting + public DagManager dagManager; @Inject(optional = true) protected KafkaJobStatusMonitor jobStatusMonitor; @@ -317,12 +318,10 @@ private void handleLeadershipChange(NotificationContext changeContext) { // TODO: surround by try/catch to disconnect from Helix and fail the leader transition if DagManager is not // transitioned properly - if (configuration.isDagManagerEnabled()) { - //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation. - if (this.topologyCatalog.getInitComplete().getCount() == 0) { - this.dagManager.setActive(true); - this.eventBus.register(this.dagManager); - } + //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation. + if (this.topologyCatalog.getInitComplete().getCount() == 0) { + this.dagManager.setActive(true); + this.eventBus.register(this.dagManager); } if (configuration.isOnlyAnnounceLeader()) { @@ -346,10 +345,8 @@ private void handleLeadershipChange(NotificationContext changeContext) { this.gitConfigMonitor.setActive(false); } - if (configuration.isDagManagerEnabled()) { - this.dagManager.setActive(false); - this.eventBus.unregister(this.dagManager); - } + this.dagManager.setActive(false); + this.eventBus.unregister(this.dagManager); if (configuration.isOnlyAnnounceLeader()) { this.d2Announcer.markDownServer(); @@ -359,9 +356,7 @@ private void handleLeadershipChange(NotificationContext changeContext) { } private void registerServicesInLauncher(){ - if (configuration.isTopologyCatalogEnabled()) { - this.serviceLauncher.addService(topologyCatalog); - } + this.serviceLauncher.addService(topologyCatalog); if (configuration.isFlowCatalogEnabled()) { this.serviceLauncher.addService(flowCatalog); @@ -371,9 +366,7 @@ private void registerServicesInLauncher(){ } } - if (configuration.isDagManagerEnabled()) { - this.serviceLauncher.addService(dagManager); - } + this.serviceLauncher.addService(dagManager); this.serviceLauncher.addService(databaseManager); this.serviceLauncher.addService(issueRepository); @@ -526,10 +519,8 @@ public void start() throws ApplicationException { //Activate the DagManager service, after the topologyCatalog has been initialized. if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){ - if (configuration.isDagManagerEnabled()) { - this.dagManager.setActive(true); - this.eventBus.register(this.dagManager); - } + this.dagManager.setActive(true); + this.eventBus.register(this.dagManager); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 1b7b2de43d4..ca38eda0c1b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -17,20 +17,6 @@ package org.apache.gobblin.service.modules.orchestration; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.eventbus.Subscribe; -import com.google.common.util.concurrent.AbstractIdleService; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigException; -import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -51,9 +37,26 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; +import com.typesafe.config.ConfigFactory; + import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 92ab024ff87..85c8248fb75 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -17,12 +17,6 @@ package org.apache.gobblin.service.modules.orchestration; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.typesafe.config.Config; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; @@ -31,12 +25,24 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.typesafe.config.Config; + import javax.annotation.Nonnull; import javax.inject.Inject; import javax.inject.Singleton; import lombok.Getter; import lombok.Setter; -import org.apache.commons.lang3.reflect.ConstructorUtils; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -68,8 +74,6 @@ import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -83,8 +87,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { protected final Logger _log; protected final SpecCompiler specCompiler; - protected final Optional topologyCatalog; - protected final Optional dagManager; + protected final TopologyCatalog topologyCatalog; + protected final DagManager dagManager; protected final MetricContext metricContext; @@ -110,7 +114,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { private final ClassAliasResolver aliasResolver; @Inject - public Orchestrator(Config config, Optional topologyCatalog, Optional dagManager, + public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager dagManager, Optional log, FlowStatusGenerator flowStatusGenerator, Optional flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional flowCatalog) { _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); @@ -135,9 +139,7 @@ public Orchestrator(Config config, Optional topologyCatalog, Op } //At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManager. - if (this.dagManager.isPresent()) { - this.dagManager.get().setTopologySpecMap(getSpecCompiler().getTopologySpecMap()); - } + this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap()); this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass()); this.flowOrchestrationSuccessFulMeter = this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER); @@ -182,9 +184,7 @@ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) { _log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion); - if (topologyCatalog.isPresent()) { - this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, headers); - } + this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, headers); } /** {@inheritDoc} */ @@ -209,13 +209,12 @@ public void onUpdateSpec(Spec updatedSpec) { } catch (Exception e) { _log.error("Failed to update Spec: " + updatedSpec, e); } - } public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMillis, boolean isReminderEvent) throws Exception { // Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time - this.topologyCatalog.get().getInitComplete().await(); + this.topologyCatalog.getInitComplete().await(); //Wait for the SpecCompiler to become healthy. this.getSpecCompiler().awaitHealthy(); @@ -279,39 +278,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil flowCompilationTimer.stop(flowMetadata); // Depending on if DagManager is present, handle execution - if (this.dagManager.isPresent()) { - submitFlowToDagManager(flowSpec, compiledDag); - } else { - // Schedule all compiled JobSpecs on their respective Executor - for (Dag.DagNode dagNode : compiledDag.getNodes()) { - DagManagerUtils.incrementJobAttempt(dagNode); - JobExecutionPlan jobExecutionPlan = dagNode.getValue(); - - // Run this spec on selected executor - SpecProducer producer = null; - try { - producer = jobExecutionPlan.getSpecExecutor().getProducer().get(); - Spec jobSpec = jobExecutionPlan.getJobSpec(); - - if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { - _log.warn("JobSpec does not contain flowExecutionId: {}", jobSpec); - } - - Map jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan); - _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); - - TimingEvent jobOrchestrationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.LauncherTimings.JOB_ORCHESTRATED); - - producer.addSpec(jobSpec); - - jobOrchestrationTimer.stop(jobMetadata); - } catch (Exception e) { - _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer - + " for flow: " + spec, e); - } - } - deleteSpecFromCatalogIfAdhoc(flowSpec); - } + submitFlowToDagManager(flowSpec, compiledDag); } } else { Instrumented.markMeter(this.flowOrchestrationFailedMeter); @@ -337,7 +304,7 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag jobE throws IOException { try { // Send the dag to the DagManager - this.dagManager.get().addDag(jobExecutionPlanDag, true, true); + this.dagManager.addDag(jobExecutionPlanDag, true, true); /* Adhoc flows can be deleted after persisting it in DagManager as the DagManager's failure recovery method ensures @@ -364,10 +331,8 @@ public void remove(Spec spec, Properties headers) throws IOException { if (spec instanceof FlowSpec) { //Send the dag to the DagManager to stop it. //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor. - if (this.dagManager.isPresent()) { - _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri()); - this.dagManager.get().stopDag(spec.getUri()); - } + _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri()); + this.dagManager.stopDag(spec.getUri()); // We need to recompile the flow to find the spec producer, // If compilation result is different, its remove request can go to some different spec producer deleteFromExecutor(spec, headers); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 6d11b3bd3f1..cd1bd421f7d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -17,12 +17,6 @@ package org.apache.gobblin.service.modules.scheduler; -import com.codahale.metrics.MetricFilter; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.net.URI; import java.text.ParseException; @@ -38,12 +32,35 @@ import java.util.Map; import java.util.Properties; import java.util.TimeZone; + +import org.apache.commons.lang.StringUtils; +import org.apache.helix.HelixManager; +import org.quartz.CronExpression; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.InterruptableJob; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.UnableToInterruptJobException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.MetricFilter; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; @@ -76,19 +93,6 @@ import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; -import org.apache.helix.HelixManager; -import org.quartz.CronExpression; -import org.quartz.DisallowConcurrentExecution; -import org.quartz.InterruptableJob; -import org.quartz.JobDataMap; -import org.quartz.JobDetail; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.SchedulerException; -import org.quartz.Trigger; -import org.quartz.UnableToInterruptJobException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX; @@ -168,7 +172,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata @Inject public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config, - Optional helixManager, Optional flowCatalog, Optional topologyCatalog, + Optional helixManager, Optional flowCatalog, Orchestrator orchestrator, SchedulerService schedulerService, Optional quotaManager, Optional log, @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled, Optional flowTriggerHandler) throws Exception { @@ -207,12 +211,12 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser } public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator, - Optional helixManager, Optional flowCatalog, Optional topologyCatalog, - Optional dagManager, Optional quotaManager, SchedulerService schedulerService, + Optional helixManager, Optional flowCatalog, TopologyCatalog topologyCatalog, + DagManager dagManager, Optional quotaManager, SchedulerService schedulerService, Optional log, boolean isWarmStandbyEnabled, Optional flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton) throws Exception { - this(serviceName, config, helixManager, flowCatalog, topologyCatalog, + this(serviceName, config, helixManager, flowCatalog, new Orchestrator(config, topologyCatalog, dagManager, log, flowStatusGenerator, flowTriggerHandler, sharedFlowMetricsSingleton, flowCatalog), schedulerService, quotaManager, log, isWarmStandbyEnabled, flowTriggerHandler); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java index 15fa2eee64c..e27ce534e7f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java @@ -43,8 +43,6 @@ import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.function.CheckedExceptionFunction; @@ -63,8 +61,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever { @Inject public FsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) { - super(ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, - ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED), issueRepository); + super(issueRepository); this.stateStore = (FileContextBasedFsStateStore) new FileContextBasedFsStateStoreFactory(). createStateStore(config.getConfig(CONF_PREFIX), State.class); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java index 84dbb60ce6c..887ee5827f3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java @@ -37,7 +37,6 @@ import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer; import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; import org.apache.gobblin.service.ExecutionStatus; -import org.apache.gobblin.service.ServiceConfigKeys; /** @@ -54,7 +53,7 @@ public class LocalFsJobStatusRetriever extends JobStatusRetriever { // Do not use a state store for this implementation, just look at the job folder that @LocalFsSpecProducer writes to @Inject public LocalFsJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) { - super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, issueRepository); + super(issueRepository); this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java index 0b593165dea..6b8d1e25186 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java @@ -35,8 +35,6 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory; import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.util.ConfigUtils; /** @@ -65,8 +63,7 @@ private interface SupplierThrowingIO { @Inject public MysqlJobStatusRetriever(Config config, MultiContextIssueRepository issueRepository) throws ReflectiveOperationException { - super(ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, - ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED), issueRepository); + super(issueRepository); config = config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config); this.stateStore = (MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, State.class); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java index 08e89ef0b29..38bfc5bee08 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java @@ -28,9 +28,6 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; - -import org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager; -import org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys; import org.apache.hadoop.fs.Path; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jgit.api.Git; @@ -61,13 +58,20 @@ import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.gobblin.service.monitoring.GitConfigMonitor; import org.apache.gobblin.service.modules.core.GobblinServiceManager; import org.apache.gobblin.service.modules.flow.MockedSpecCompiler; +import org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager; +import org.apache.gobblin.service.modules.orchestration.DagManager; +import org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys; import org.apache.gobblin.service.monitoring.FsJobStatusRetriever; +import org.apache.gobblin.service.monitoring.GitConfigMonitor; import org.apache.gobblin.testing.AssertWithBackoff; import org.apache.gobblin.util.ConfigUtils; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; + public class GobblinServiceManagerTest { @@ -193,14 +197,29 @@ public void setup() throws Exception { this.gitForPush.commit().setMessage("First commit").call(); this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call(); - this.gobblinServiceManager = GobblinServiceManager.create("CoreService", "1", - ConfigUtils.propertiesToConfig(serviceCoreProperties), new Path(SERVICE_WORK_DIR)); + this.gobblinServiceManager = createTestGobblinServiceManager(serviceCoreProperties); + this.gobblinServiceManager.start(); this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/", this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), transportClientProperties); } + public static GobblinServiceManager createTestGobblinServiceManager(Properties serviceCoreProperties) { + return createTestGobblinServiceManager(serviceCoreProperties, "CoreService", "1", SERVICE_WORK_DIR); + } + + public static GobblinServiceManager createTestGobblinServiceManager(Properties serviceCoreProperties, + String serviceName, String serviceId, String serviceWorkDir) { + GobblinServiceManager gobblinServiceManager = GobblinServiceManager.create(serviceName, serviceId, + ConfigUtils.propertiesToConfig(serviceCoreProperties), new Path(serviceWorkDir)); + + DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager()); + doNothing().when(spiedDagManager).setActive(anyBoolean()); + gobblinServiceManager.dagManager = spiedDagManager; + return gobblinServiceManager; + } + private void cleanUpDir(String dir) throws Exception { File specStoreDir = new File(dir); if (specStoreDir.exists()) { @@ -555,8 +574,7 @@ public void testBadUpdate() throws Exception { private void serviceReboot() throws Exception { this.gobblinServiceManager.stop(); - this.gobblinServiceManager = GobblinServiceManager.create("CoreService", "1", - ConfigUtils.propertiesToConfig(serviceCoreProperties), new Path(SERVICE_WORK_DIR)); + this.gobblinServiceManager = createTestGobblinServiceManager(serviceCoreProperties); this.gobblinServiceManager.start(); this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/", this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), transportClientProperties); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java index 29782af114d..3a61a730107 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java @@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; -import org.apache.hadoop.fs.Path; import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,12 +45,12 @@ import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.GobblinServiceManagerTest; import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.TestServiceDatabaseConfig; import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.service.monitoring.FsJobStatusRetriever; -import org.apache.gobblin.util.ConfigUtils; @Test @@ -173,13 +172,13 @@ public void setup() throws Exception { node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3); // Start Node 1 - this.node1GobblinServiceManager = GobblinServiceManager.create("CoreService1", "1", - ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new Path(NODE_1_SERVICE_WORK_DIR)); + this.node1GobblinServiceManager = GobblinServiceManagerTest.createTestGobblinServiceManager( + node1ServiceCoreProperties, "CoreService1", "1", NODE_1_SERVICE_WORK_DIR); this.node1GobblinServiceManager.start(); // Start Node 2 - this.node2GobblinServiceManager = GobblinServiceManager.create("CoreService2", "2", - ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new Path(NODE_2_SERVICE_WORK_DIR)); + this.node2GobblinServiceManager = GobblinServiceManagerTest.createTestGobblinServiceManager( + node2ServiceCoreProperties, "CoreService2", "2", NODE_2_SERVICE_WORK_DIR); this.node2GobblinServiceManager.start(); // Initialize Node 1 Client diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java index 4f6fbded9d9..7d2648a28f9 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java @@ -46,12 +46,12 @@ import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.GobblinServiceManagerTest; import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.TestServiceDatabaseConfig; import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.service.monitoring.FsJobStatusRetriever; -import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PortUtils; @@ -177,13 +177,13 @@ public void setup() throws Exception { node2ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port2); // Start Node 1 - this.node1GobblinServiceManager = GobblinServiceManager.create("RedirectCoreService1", "1", - ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new Path(NODE_1_SERVICE_WORK_DIR)); + this.node1GobblinServiceManager = GobblinServiceManagerTest.createTestGobblinServiceManager( + node1ServiceCoreProperties, "RedirectCoreService1", "1", NODE_1_SERVICE_WORK_DIR); this.node1GobblinServiceManager.start(); // Start Node 2 - this.node2GobblinServiceManager = GobblinServiceManager.create("RedirectCoreService2", "2", - ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new Path(NODE_2_SERVICE_WORK_DIR)); + this.node2GobblinServiceManager = GobblinServiceManagerTest.createTestGobblinServiceManager( + node2ServiceCoreProperties, "RedirectCoreService2", "2", NODE_2_SERVICE_WORK_DIR); this.node2GobblinServiceManager.start(); // Initialize Node 1 Client diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 514e975182a..8d64b0978fe 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -17,24 +17,33 @@ package org.apache.gobblin.service.modules.orchestration; -import com.codahale.metrics.MetricRegistry; -import com.google.common.base.Optional; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.typesafe.config.Config; import java.io.File; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.List; import java.util.Properties; + import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Optional; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.typesafe.config.Config; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecCatalogListener; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; @@ -46,15 +55,10 @@ import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; public class OrchestratorTest { @@ -73,10 +77,7 @@ public class OrchestratorTest { private TopologySpec topologySpec; private FlowCatalog flowCatalog; - private SpecCatalogListener mockListener; private FlowSpec flowSpec; - private FlowStatusGenerator mockStatusGenerator; - private FlowTriggerHandler _mockFlowTriggerHandler; private Orchestrator orchestrator; @BeforeClass @@ -103,12 +104,14 @@ public void setup() throws Exception { Optional.of(logger), Optional.absent(), true, true); this.serviceLauncher.addService(flowCatalog); - this.mockStatusGenerator = mock(FlowStatusGenerator.class); + FlowStatusGenerator mockStatusGenerator = mock(FlowStatusGenerator.class); + FlowTriggerHandler mockFlowTriggerHandler = mock(FlowTriggerHandler.class); + DagManager mockDagManager = mock(DagManager.class); + doNothing().when(mockDagManager).setTopologySpecMap(anyMap()); - this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class); this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), - Optional.of(this.topologyCatalog), Optional.absent(), Optional.of(logger), this.mockStatusGenerator, - Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsSingleton( + this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator, + Optional.of(mockFlowTriggerHandler), new SharedFlowMetricsSingleton( ConfigUtils.propertiesToConfig(orchestratorProperties)), Optional.of(mock(FlowCatalog.class))); this.topologyCatalog.addListener(orchestrator); this.flowCatalog.addListener(orchestrator); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 0fdfc894b94..45f8323a3e6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -50,7 +50,6 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest; -import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.ServiceConfigKeys; @@ -58,9 +57,9 @@ import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager; +import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler; import org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; -import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler; import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; @@ -135,7 +134,7 @@ public void testJobSchedulerInit() throws Throwable { // Mock a GaaS scheduler. TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null, false); + ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, Optional.of(quotaManager), null, false); SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class); Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler); @@ -221,7 +220,7 @@ public void testJobSchedulerInitWithFailedSpec() throws Throwable { // Mock a GaaS scheduler. TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, false); + ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, false); SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class); Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler); @@ -284,7 +283,7 @@ public void testJobSchedulerUnschedule() throws Throwable { SchedulerService schedulerService = new SchedulerService(new Properties()); // Mock a GaaS scheduler. TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), schedulerService, false); + ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), schedulerService, false); schedulerService.startAsync().awaitRunning(); scheduler.startUp(); @@ -355,7 +354,7 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { SchedulerService schedulerService = new SchedulerService(new Properties()); // Mock a GaaS scheduler not in warm standby mode GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, + ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, Optional.of(Mockito.mock( FlowTriggerHandler.class))); @@ -374,9 +373,9 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { //Mock a GaaS scheduler in warm standby mode, where we don't check quota GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new GobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, - Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, Optional.of(Mockito.mock( - FlowTriggerHandler.class))); + ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), mockOrchestrator, schedulerService, + Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, + Optional.of(Mockito.mock(FlowTriggerHandler.class))); schedulerWithWarmStandbyEnabled.startUp(); schedulerWithWarmStandbyEnabled.setActive(true); @@ -396,9 +395,9 @@ class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler { private boolean hasScheduler = false; public TestGobblinServiceJobScheduler(String serviceName, Config config, - Optional flowCatalog, Optional topologyCatalog, Orchestrator orchestrator, Optional quotaManager, + Optional flowCatalog, Orchestrator orchestrator, Optional quotaManager, SchedulerService schedulerService, boolean isWarmStandbyEnabled) throws Exception { - super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, + super(serviceName, config, Optional.absent(), flowCatalog, orchestrator, schedulerService, quotaManager, Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(FlowTriggerHandler.class))); if (schedulerService != null) { hasScheduler = true; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java index 257726e1dbe..5af639cb536 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java @@ -32,7 +32,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; import org.apache.gobblin.service.ExecutionStatus; -import org.apache.gobblin.service.ServiceConfigKeys; import static org.mockito.Mockito.mock; @@ -46,9 +45,7 @@ public void setUp() throws Exception { cleanUpDir(); Config config = ConfigFactory.empty() .withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, - ConfigValueFactory.fromAnyRef(stateStoreDir)) - .withValue(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, - ConfigValueFactory.fromAnyRef("true")); + ConfigValueFactory.fromAnyRef(stateStoreDir)); this.jobStatusRetriever = new FsJobStatusRetriever(config, mock(MultiContextIssueRepository.class)); } @@ -83,31 +80,31 @@ public void testGetFlowStatusFromJobStatuses() throws Exception { addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name()); Assert.assertEquals(ExecutionStatus.COMPILED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); Assert.assertEquals(ExecutionStatus.COMPILED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name()); Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name()); Assert.assertEquals(ExecutionStatus.RUNNING, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); Assert.assertEquals(ExecutionStatus.RUNNING, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name()); Assert.assertEquals(ExecutionStatus.COMPLETE, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); } @Override diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java deleted file mode 100644 index c91fc5be374..00000000000 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.monitoring; - -import java.io.File; -import java.io.IOException; - -import org.apache.commons.io.FileUtils; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; -import org.apache.gobblin.service.ExecutionStatus; - -import static org.mockito.Mockito.mock; - - -public class FsJobStatusRetrieverTestWithoutDagManager extends JobStatusRetrieverTest { - - private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore"; - - @BeforeClass - public void setUp() throws Exception { - cleanUpDir(); - Config config = ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, - ConfigValueFactory.fromAnyRef(stateStoreDir)); - this.jobStatusRetriever = new FsJobStatusRetriever(config, mock(MultiContextIssueRepository.class)); - } - - @Test - public void testGetJobStatusesForFlowExecution() throws IOException { - super.testGetJobStatusesForFlowExecution(); - } - - @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution") - public void testJobTiming() throws Exception { - super.testJobTiming(); - } - - @Test (dependsOnMethods = "testJobTiming") - public void testOutOfOrderJobTimingEvents() throws IOException { - super.testOutOfOrderJobTimingEvents(); - } - - @Test (dependsOnMethods = "testJobTiming") - public void testGetJobStatusesForFlowExecution1() { - super.testGetJobStatusesForFlowExecution1(); - } - - @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1") - public void testGetLatestExecutionIdsForFlow() throws Exception { - super.testGetLatestExecutionIdsForFlow(); - } - - @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow") - public void testGetFlowStatusFromJobStatuses() throws Exception { - long flowExecutionId = 1237L; - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name()); - Assert.assertEquals(ExecutionStatus.$UNKNOWN, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name()); - Assert.assertEquals(ExecutionStatus.$UNKNOWN, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); - Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name()); - Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); - Assert.assertEquals(ExecutionStatus.RUNNING, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); - Assert.assertEquals(ExecutionStatus.RUNNING, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name()); - Assert.assertEquals(ExecutionStatus.COMPLETE, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - } - - @Override - protected void cleanUpDir() throws Exception { - File specStoreDir = new File(this.stateStoreDir); - if (specStoreDir.exists()) { - FileUtils.deleteDirectory(specStoreDir); - } - } -} \ No newline at end of file diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java index 00b2598c87a..345b5d7cc95 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java @@ -175,7 +175,7 @@ public void testGetJobStatusesForFlowExecution() throws IOException { Assert.assertEquals(jobStatus.getJobGroup(), jobGroup); Assert.assertFalse(jobStatusIterator.hasNext()); Assert.assertEquals(ExecutionStatus.RUNNING, - this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.dagManagerEnabled, this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2, ExecutionStatus.RUNNING.name()); jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java index f26ebf59e26..66d552e54a9 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java @@ -56,7 +56,6 @@ public void setUp() throws Exception { configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl); configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER); configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD); - configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, "true"); this.jobStatusRetriever = new MysqlJobStatusRetriever(configBuilder.build(), mock(MultiContextIssueRepository.class)); @@ -95,31 +94,31 @@ public void testGetFlowStatusFromJobStatuses() throws Exception { addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name()); Assert.assertEquals(ExecutionStatus.COMPILED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); Assert.assertEquals(ExecutionStatus.COMPILED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name()); Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name()); Assert.assertEquals(ExecutionStatus.RUNNING, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); Assert.assertEquals(ExecutionStatus.RUNNING, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name()); Assert.assertEquals(ExecutionStatus.COMPLETE, - jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); + jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); } @Test diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java deleted file mode 100644 index 03a62192765..00000000000 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.monitoring; - -import java.io.IOException; - -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.metastore.MysqlJobStatusStateStore; -import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; -import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; -import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; -import org.apache.gobblin.service.ExecutionStatus; -import org.apache.gobblin.service.ServiceConfigKeys; - -import static org.mockito.Mockito.mock; - - -/** - * Flow status can be different when DagManager is not being used. So we need separate unit tests for testing job/flow - * status when DagManager is disabled. - */ -public class MysqlJobStatusRetrieverTestWithoutDagManager extends JobStatusRetrieverTest { - private MysqlJobStatusStateStore dbJobStateStore; - private static final String TEST_USER = "testUser"; - private static final String TEST_PASSWORD = "testPassword"; - - @BeforeClass - @Override - public void setUp() throws Exception { - ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); - String jdbcUrl = testMetastoreDatabase.getJdbcUrl(); - - ConfigBuilder configBuilder = ConfigBuilder.create(); - configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl); - configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER); - configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD); - - configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, "false"); - this.jobStatusRetriever = - new MysqlJobStatusRetriever(configBuilder.build(), mock(MultiContextIssueRepository.class)); - this.dbJobStateStore = ((MysqlJobStatusRetriever) this.jobStatusRetriever).getStateStore(); - cleanUpDir(); - } - - @Test - public void testGetJobStatusesForFlowExecution() throws IOException { - super.testGetJobStatusesForFlowExecution(); - } - - @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution") - public void testJobTiming() throws Exception { - super.testJobTiming(); - } - - @Test (dependsOnMethods = "testJobTiming") - public void testOutOfOrderJobTimingEvents() throws IOException { - super.testOutOfOrderJobTimingEvents(); - } - - @Test (dependsOnMethods = "testJobTiming") - public void testGetJobStatusesForFlowExecution1() { - super.testGetJobStatusesForFlowExecution1(); - } - - @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1") - public void testGetLatestExecutionIdsForFlow() throws Exception { - super.testGetLatestExecutionIdsForFlow(); - } - - @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow") - public void testGetFlowStatusFromJobStatuses() throws Exception { - long flowExecutionId = 1237L; - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name()); - Assert.assertEquals(ExecutionStatus.$UNKNOWN, - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name()); - Assert.assertEquals(ExecutionStatus.$UNKNOWN, - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); - Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name()); - Assert.assertEquals(ExecutionStatus.ORCHESTRATED, - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); - Assert.assertEquals(ExecutionStatus.RUNNING, - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME); - Assert.assertEquals(ExecutionStatus.RUNNING, - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - - addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name()); - Assert.assertEquals(ExecutionStatus.COMPLETE, - JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled, jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId))); - } - - @Override - void cleanUpDir() throws Exception { - this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME)); - } -}