Skip to content

Commit e49f7ff

Browse files
committed
remove optionality of dag manager and topology catalog
1 parent 81757ce commit e49f7ff

File tree

20 files changed

+155
-249
lines changed

20 files changed

+155
-249
lines changed

gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,13 @@ public class ServiceConfigKeys {
2929
public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = "org.apache.gobblin.service.modules.orchestration.Orchestrator";
3030

3131
// Gobblin Service Manager Keys
32-
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
3332
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
3433
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
3534
public static final String GOBBLIN_SERVICE_INSTANCE_NAME = GOBBLIN_SERVICE_PREFIX + "instance.name";
3635

3736
public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
3837
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
3938
public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
40-
public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
4139
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
4240
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
4341
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class FlowStatusTest {
5454
class TestJobStatusRetriever extends JobStatusRetriever {
5555

5656
protected TestJobStatusRetriever(MultiContextIssueRepository issueRepository) {
57-
super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, issueRepository);
57+
super(issueRepository);
5858
}
5959

6060
@Override

gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExec
118118
List<JobStatus> jobStatuses = ImmutableList.copyOf(retainStatusOfAnyFlowOrJobMatchingTag(
119119
jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId), tag));
120120
ExecutionStatus flowExecutionStatus =
121-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator());
121+
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator());
122122
return jobStatuses.iterator().hasNext()
123123
? new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(), flowExecutionStatus) : null;
124124
}

gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919

2020
import java.util.Collections;
2121
import java.util.Comparator;
22-
import java.util.HashSet;
2322
import java.util.Iterator;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Objects;
27-
import java.util.Set;
2826
import java.util.stream.Collectors;
2927

3028
import com.google.common.base.Supplier;
@@ -60,15 +58,12 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
6058

6159
@Getter
6260
protected final MetricContext metricContext;
63-
@Getter
64-
protected final Boolean dagManagerEnabled;
6561

6662
private final MultiContextIssueRepository issueRepository;
6763

68-
protected JobStatusRetriever(boolean dagManagerEnabled, MultiContextIssueRepository issueRepository) {
64+
protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
6965
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
7066
this.issueRepository = Objects.requireNonNull(issueRepository);
71-
this.dagManagerEnabled = dagManagerEnabled;
7267
}
7368

7469
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
@@ -186,7 +181,7 @@ protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> fl
186181
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
187182
).collect(Collectors.toList())));
188183
return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(), jobStatuses.iterator(),
189-
getFlowStatusFromJobStatuses(dagManagerEnabled, jobStatuses.iterator()));
184+
getFlowStatusFromJobStatuses(jobStatuses.iterator()));
190185
}).collect(Collectors.toList());
191186
}
192187

@@ -227,31 +222,15 @@ public static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
227222
&& jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
228223
}
229224

230-
public static ExecutionStatus getFlowStatusFromJobStatuses(boolean dagManagerEnabled, Iterator<JobStatus> jobStatusIterator) {
225+
public static ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> jobStatusIterator) {
231226
ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
232227

233-
if (dagManagerEnabled) {
234-
while (jobStatusIterator.hasNext()) {
235-
JobStatus jobStatus = jobStatusIterator.next();
236-
// Check if this is the flow status instead of a single job status
237-
if (JobStatusRetriever.isFlowStatus(jobStatus)) {
238-
flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
239-
}
240-
}
241-
} else {
242-
Set<ExecutionStatus> jobStatuses = new HashSet<>();
243-
while (jobStatusIterator.hasNext()) {
244-
JobStatus jobStatus = jobStatusIterator.next();
245-
// because in absence of DagManager we do not get all flow level events, we will ignore the flow level events
246-
// we actually get and purely calculate flow status based on flow statuses.
247-
if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
248-
jobStatuses.add(ExecutionStatus.valueOf(jobStatus.getEventName()));
249-
}
228+
while (jobStatusIterator.hasNext()) {
229+
JobStatus jobStatus = jobStatusIterator.next();
230+
// Check if this is the flow status instead of a single job status
231+
if (JobStatusRetriever.isFlowStatus(jobStatus)) {
232+
flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
250233
}
251-
252-
List<ExecutionStatus> statusesInDescendingSalience = ImmutableList.of(ExecutionStatus.FAILED, ExecutionStatus.CANCELLED,
253-
ExecutionStatus.RUNNING, ExecutionStatus.ORCHESTRATED, ExecutionStatus.COMPLETE);
254-
flowExecutionStatus = statusesInDescendingSalience.stream().filter(jobStatuses::contains).findFirst().orElse(ExecutionStatus.$UNKNOWN);
255234
}
256235

257236
return flowExecutionStatus;

gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ public void testIsFlowRunning() {
7777
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
7878
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
7979
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
80-
when(jobStatusRetriever.getDagManagerEnabled()).thenReturn(true);
8180
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
8281
}
8382

@@ -109,9 +108,9 @@ public void testGetFlowStatusesAcrossGroup() {
109108

110109
// IMPORTANT: result invariants to honor - ordered by ascending flowName, all of same flowName adjacent, therein descending flowExecutionId
111110
// NOTE: Three copies of FlowStatus are needed for repeated use, due to mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
112-
FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
113-
FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
114-
FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
111+
FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
112+
FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
113+
FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
115114
Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions("myFlowGroup", 2))
116115
.thenReturn(Collections.singletonList(flowStatus), Collections.singletonList(flowStatus2), Collections.singletonList(flowStatus3)); // (for three invocations)
117116

@@ -138,9 +137,9 @@ public void testGetFlowStatusesAcrossGroup() {
138137
Arrays.asList(f0jsmDep2)));
139138
}
140139

141-
private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses, JobStatusRetriever jobStatusRetriever) {
140+
private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses) {
142141
return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(),
143-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator()));
142+
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()));
144143
}
145144

146145
private JobStatus createFlowJobStatus(String flowGroup, String flowName, long flowExecutionId, ExecutionStatus status) {

gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ public class GobblinServiceConfiguration {
4646
@Getter
4747
private final boolean isMultiActiveSchedulerEnabled;
4848

49-
@Getter
50-
private final boolean isTopologyCatalogEnabled;
51-
5249
@Getter
5350
private final boolean isFlowCatalogEnabled;
5451

@@ -64,9 +61,6 @@ public class GobblinServiceConfiguration {
6461
@Getter
6562
private final boolean isGitConfigMonitorEnabled;
6663

67-
@Getter
68-
private final boolean isDagManagerEnabled;
69-
7064
@Getter
7165
private final boolean isJobStatusMonitorEnabled;
7266

@@ -93,8 +87,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
9387
this.innerConfig = Objects.requireNonNull(config, "Config cannot be null");
9488
this.serviceWorkDir = serviceWorkDir;
9589

96-
isTopologyCatalogEnabled =
97-
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
9890
isFlowCatalogEnabled =
9991
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
10092

@@ -113,8 +105,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
113105
this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);
114106

115107
this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
116-
this.isDagManagerEnabled =
117-
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED);
118108
this.isJobStatusMonitorEnabled =
119109
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true);
120110
this.isSchedulerEnabled =

gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,6 @@
1919

2020
import java.util.Objects;
2121

22-
import org.apache.gobblin.runtime.api.DagActionStore;
23-
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
24-
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
25-
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
26-
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
27-
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
28-
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
29-
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
30-
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
31-
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
32-
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
33-
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
3422
import org.apache.helix.HelixManager;
3523
import org.slf4j.Logger;
3624
import org.slf4j.LoggerFactory;
@@ -49,13 +37,18 @@
4937
import javax.inject.Singleton;
5038

5139
import org.apache.gobblin.restli.EmbeddedRestliServer;
40+
import org.apache.gobblin.runtime.api.DagActionStore;
5241
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
42+
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
43+
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
44+
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
5345
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
5446
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
5547
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
5648
import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
5749
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
5850
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
51+
import org.apache.gobblin.runtime.util.InjectionNames;
5952
import org.apache.gobblin.scheduler.SchedulerService;
6053
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
6154
import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
@@ -75,17 +68,24 @@
7568
import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
7669
import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
7770
import org.apache.gobblin.service.modules.orchestration.DagManager;
71+
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
7872
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
73+
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
7974
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
8075
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
76+
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
8177
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
78+
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
8279
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
8380
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
8481
import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
8582
import org.apache.gobblin.service.modules.utils.HelixUtils;
86-
import org.apache.gobblin.runtime.util.InjectionNames;
83+
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
84+
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
85+
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
8786
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
8887
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
88+
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
8989
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
9090
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
9191
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
@@ -190,20 +190,15 @@ public void configure(Binder binder) {
190190
binder.bind(SharedFlowMetricsSingleton.class);
191191

192192
OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
193-
if (serviceConfig.isTopologyCatalogEnabled()) {
194-
binder.bind(TopologyCatalog.class);
195-
}
193+
binder.bind(TopologyCatalog.class);
196194

197195
if (serviceConfig.isTopologySpecFactoryEnabled()) {
198196
binder.bind(TopologySpecFactory.class)
199197
.to(getClassByNameOrAlias(TopologySpecFactory.class, serviceConfig.getInnerConfig(),
200198
ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
201199
}
202200

203-
OptionalBinder.newOptionalBinder(binder, DagManager.class);
204-
if (serviceConfig.isDagManagerEnabled()) {
205-
binder.bind(DagManager.class);
206-
}
201+
binder.bind(DagManager.class);
207202

208203
OptionalBinder.newOptionalBinder(binder, HelixManager.class);
209204
if (serviceConfig.isHelixManagerEnabled()) {

gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
import org.apache.commons.cli.Options;
3232
import org.apache.commons.cli.ParseException;
3333
import org.apache.commons.lang3.ObjectUtils;
34-
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
35-
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
36-
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
3734
import org.apache.hadoop.conf.Configuration;
3835
import org.apache.hadoop.fs.FileSystem;
3936
import org.apache.hadoop.fs.Path;
@@ -97,9 +94,12 @@
9794
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
9895
import org.apache.gobblin.service.modules.orchestration.DagManager;
9996
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
97+
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
10098
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
10199
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
100+
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
102101
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
102+
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
103103
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
104104
import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
105105
import org.apache.gobblin.util.ConfigUtils;
@@ -317,12 +317,10 @@ private void handleLeadershipChange(NotificationContext changeContext) {
317317

318318
// TODO: surround by try/catch to disconnect from Helix and fail the leader transition if DagManager is not
319319
// transitioned properly
320-
if (configuration.isDagManagerEnabled()) {
321-
//Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
322-
if (this.topologyCatalog.getInitComplete().getCount() == 0) {
323-
this.dagManager.setActive(true);
324-
this.eventBus.register(this.dagManager);
325-
}
320+
//Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
321+
if (this.topologyCatalog.getInitComplete().getCount() == 0) {
322+
this.dagManager.setActive(true);
323+
this.eventBus.register(this.dagManager);
326324
}
327325

328326
if (configuration.isOnlyAnnounceLeader()) {
@@ -346,10 +344,8 @@ private void handleLeadershipChange(NotificationContext changeContext) {
346344
this.gitConfigMonitor.setActive(false);
347345
}
348346

349-
if (configuration.isDagManagerEnabled()) {
350-
this.dagManager.setActive(false);
351-
this.eventBus.unregister(this.dagManager);
352-
}
347+
this.dagManager.setActive(false);
348+
this.eventBus.unregister(this.dagManager);
353349

354350
if (configuration.isOnlyAnnounceLeader()) {
355351
this.d2Announcer.markDownServer();
@@ -359,9 +355,7 @@ private void handleLeadershipChange(NotificationContext changeContext) {
359355
}
360356

361357
private void registerServicesInLauncher(){
362-
if (configuration.isTopologyCatalogEnabled()) {
363-
this.serviceLauncher.addService(topologyCatalog);
364-
}
358+
this.serviceLauncher.addService(topologyCatalog);
365359

366360
if (configuration.isFlowCatalogEnabled()) {
367361
this.serviceLauncher.addService(flowCatalog);
@@ -371,9 +365,7 @@ private void registerServicesInLauncher(){
371365
}
372366
}
373367

374-
if (configuration.isDagManagerEnabled()) {
375-
this.serviceLauncher.addService(dagManager);
376-
}
368+
this.serviceLauncher.addService(dagManager);
377369

378370
this.serviceLauncher.addService(databaseManager);
379371
this.serviceLauncher.addService(issueRepository);
@@ -526,10 +518,8 @@ public void start() throws ApplicationException {
526518

527519
//Activate the DagManager service, after the topologyCatalog has been initialized.
528520
if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
529-
if (configuration.isDagManagerEnabled()) {
530-
this.dagManager.setActive(true);
531-
this.eventBus.register(this.dagManager);
532-
}
521+
this.dagManager.setActive(true);
522+
this.eventBus.register(this.dagManager);
533523
}
534524
}
535525

0 commit comments

Comments
 (0)