Skip to content

Commit 6d61f02

Browse files
committed
initialize topology specs directly without waiting for listener callbacks
1 parent 4de95d6 commit 6d61f02

File tree

7 files changed

+52
-86
lines changed

7 files changed

+52
-86
lines changed

gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919

2020
import java.io.IOException;
2121
import java.net.URI;
22+
import java.util.Collection;
2223
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Properties;
2627

2728
import org.apache.commons.lang3.StringUtils;
28-
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
29-
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
30-
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
3129
import org.quartz.CronExpression;
3230
import org.slf4j.Logger;
3331
import org.slf4j.LoggerFactory;
@@ -48,6 +46,7 @@
4846
import org.apache.gobblin.configuration.State;
4947
import org.apache.gobblin.instrumented.Instrumented;
5048
import org.apache.gobblin.metrics.MetricContext;
49+
import org.apache.gobblin.metrics.ServiceMetricNames;
5150
import org.apache.gobblin.metrics.Tag;
5251
import org.apache.gobblin.runtime.api.FlowSpec;
5352
import org.apache.gobblin.runtime.api.JobSpec;
@@ -58,12 +57,14 @@
5857
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
5958
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
6059
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
60+
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
6161
import org.apache.gobblin.service.ServiceConfigKeys;
62-
import org.apache.gobblin.metrics.ServiceMetricNames;
6362
import org.apache.gobblin.service.modules.flowgraph.Dag;
63+
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
6464
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
6565
import org.apache.gobblin.util.ConfigUtils;
6666
import org.apache.gobblin.util.PropertiesUtils;
67+
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
6768

6869

6970
// Provide base implementation for constructing multi-hops route.
@@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
7374
// Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected
7475
// to these data structures.
7576
@Getter
76-
@Setter
77-
protected final Map<URI, TopologySpec> topologySpecMap;
77+
protected final Map<URI, TopologySpec> topologySpecMap = Maps.newConcurrentMap();
7878

7979
protected final Config config;
8080
protected final Logger log;
@@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
9797

9898
private Optional<UserQuotaManager> userQuotaManager;
9999

100-
public BaseFlowToJobSpecCompiler(Config config){
101-
this(config,true);
102-
}
103-
104-
public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){
105-
this(config, Optional.<Logger>absent(), true);
106-
}
107-
108-
public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log){
109-
this(config, log,true);
110-
}
111-
112-
public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){
113-
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
114-
if (instrumentationEnabled) {
115-
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
116-
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
117-
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
118-
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
119-
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
120-
}
121-
else {
122-
this.metricContext = null;
123-
this.flowCompilationSuccessFulMeter = Optional.absent();
124-
this.flowCompilationFailedMeter = Optional.absent();
125-
this.flowCompilationTimer = Optional.absent();
126-
this.dataAuthorizationTimer = Optional.absent();
127-
}
128-
100+
public BaseFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet){
101+
this.log = LoggerFactory.getLogger(getClass());
102+
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
103+
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
104+
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
105+
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
106+
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
129107
this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
130108
if (this.warmStandbyEnabled) {
131109
userQuotaManager = Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
@@ -134,10 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in
134112
userQuotaManager = Optional.absent();
135113
}
136114

137-
this.topologySpecMap = Maps.newConcurrentMap();
115+
topologySpecSet.forEach(this::onAddTopologySpec);
116+
117+
138118
this.config = config;
139119

140-
/***
120+
/*
141121
* ETL-5996
142122
* For multi-tenancy, the following needs to be added:
143123
* 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs
@@ -219,8 +199,6 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
219199
public AddSpecResponse onAddSpec(Spec addedSpec) {
220200
if (addedSpec instanceof FlowSpec) {
221201
return onAddFlowSpec((FlowSpec) addedSpec);
222-
} else if (addedSpec instanceof TopologySpec) {
223-
return onAddTopologySpec( (TopologySpec) addedSpec);
224202
}
225203
return new AddSpecResponse(null);
226204
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818
package org.apache.gobblin.service.modules.flow;
1919

2020
import java.util.ArrayList;
21+
import java.util.Collection;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.ExecutionException;
2425
import java.util.concurrent.TimeUnit;
2526

26-
import org.slf4j.Logger;
27-
28-
import com.google.common.base.Optional;
2927
import com.google.common.base.Preconditions;
3028
import com.typesafe.config.Config;
3129

@@ -50,20 +48,8 @@
5048
@Alpha
5149
public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
5250

53-
public IdentityFlowToJobSpecCompiler(Config config) {
54-
super(config, true);
55-
}
56-
57-
public IdentityFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled) {
58-
super(config, Optional.<Logger>absent(), instrumentationEnabled);
59-
}
60-
61-
public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log) {
62-
super(config, log, true);
63-
}
64-
65-
public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
66-
super(config, log, instrumentationEnabled);
51+
public IdentityFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
52+
super(config, topologySpecSet);
6753
}
6854

6955
@Override

gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.gobblin.service.modules.flow;
1919

2020
import java.util.ArrayList;
21+
import java.util.Collection;
2122
import java.util.List;
2223
import java.util.Properties;
2324

@@ -28,6 +29,7 @@
2829
import org.apache.gobblin.runtime.api.FlowSpec;
2930
import org.apache.gobblin.runtime.api.JobSpec;
3031
import org.apache.gobblin.runtime.api.Spec;
32+
import org.apache.gobblin.runtime.api.TopologySpec;
3133
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
3234
import org.apache.gobblin.service.modules.flowgraph.Dag;
3335
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -45,8 +47,8 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
4547
private static final int NUMBER_OF_JOBS = 3;
4648
public static final String UNCOMPILABLE_FLOW = "uncompilableFlow";
4749

48-
public MockedSpecCompiler(Config config) {
49-
super(config);
50+
public MockedSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
51+
super(config, topologySpecSet);
5052
}
5153

5254
@Override

gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.lang.reflect.InvocationTargetException;
2121
import java.net.URISyntaxException;
2222
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.Collections;
2325
import java.util.HashMap;
2426
import java.util.List;
2527
import java.util.Map;
@@ -34,7 +36,6 @@
3436
import org.apache.commons.lang3.StringUtils;
3537
import org.apache.commons.lang3.reflect.ConstructorUtils;
3638
import org.apache.hadoop.fs.Path;
37-
import org.slf4j.Logger;
3839

3940
import com.google.common.base.Joiner;
4041
import com.google.common.base.Optional;
@@ -57,6 +58,7 @@
5758
import org.apache.gobblin.runtime.api.Spec;
5859
import org.apache.gobblin.runtime.api.SpecExecutor;
5960
import org.apache.gobblin.runtime.api.SpecNotFoundException;
61+
import org.apache.gobblin.runtime.api.TopologySpec;
6062
import org.apache.gobblin.service.ServiceConfigKeys;
6163
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
6264
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -98,26 +100,14 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
98100
// a map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
99101
public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";
100102

101-
public MultiHopFlowCompiler(Config config) {
102-
this(config, true);
103-
}
104-
105-
public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
106-
this(config, Optional.<Logger>absent(), instrumentationEnabled);
107-
}
108-
109-
public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
110-
this(config, log, true);
111-
}
112-
113103
public MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {
114-
super(config, Optional.absent(), true);
104+
super(config, Collections.EMPTY_SET);
115105
this.flowGraph = flowGraph;
116106
this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
117107
}
118108

119-
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
120-
super(config, log, instrumentationEnabled);
109+
public MultiHopFlowCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
110+
super(config, topologySpecSet);
121111
try {
122112
this.dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
123113
? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP))
@@ -161,7 +151,7 @@ public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrum
161151
try {
162152
String flowGraphMonitorClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY, GitFlowGraphMonitor.class.getCanonicalName());
163153
this.flowGraphMonitor = (FlowGraphMonitor) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
164-
flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), instrumentationEnabled);
154+
flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), true);
165155
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
166156
throw new RuntimeException(e);
167157
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
4444
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
4545
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
46+
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
4647
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
4748
import org.apache.gobblin.util.ClassAliasResolver;
4849
import org.apache.gobblin.util.ConfigUtils;
@@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper {
7374

7475
@Inject
7576
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
76-
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) {
77+
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator, TopologySpecFactory topologySpecFactory) {
7778
try {
7879
String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
7980
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
8081
this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(
81-
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config);
82+
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config, topologySpecFactory.getTopologies());
8283
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
8384
ClassNotFoundException e) {
8485
throw new RuntimeException(e);

gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.URI;
2121
import java.net.URISyntaxException;
2222
import java.util.ArrayList;
23+
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Properties;
2526

@@ -86,13 +87,14 @@ public void setup() throws Exception {
8687
// Initialize compiler with template catalog
8788
Properties compilerWithTemplateCatalogProperties = new Properties();
8889
compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI);
89-
this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties));
90+
this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties),
91+
Collections.EMPTY_SET);
9092

9193
// Add a topology to compiler
9294
this.compilerWithTemplateCalague.onAddSpec(initTopologySpec());
9395

9496
// Initialize compiler without template catalog
95-
this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()));
97+
this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()), Collections.EMPTY_SET);
9698

9799
// Add a topology to compiler
98100
this.compilerWithoutTemplateCalague.onAddSpec(initTopologySpec());

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.net.URI;
2222
import java.net.URISyntaxException;
2323
import java.util.Collection;
24+
import java.util.Collections;
2425
import java.util.List;
2526
import java.util.Properties;
2627

@@ -53,6 +54,7 @@
5354
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
5455
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
5556
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
57+
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
5658
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
5759
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
5860
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -61,6 +63,7 @@
6163

6264
import static org.mockito.ArgumentMatchers.anyMap;
6365
import static org.mockito.Mockito.doNothing;
66+
import static org.mockito.Mockito.doReturn;
6467
import static org.mockito.Mockito.mock;
6568

6669

@@ -92,6 +95,8 @@ public void setup() throws Exception {
9295
cleanUpDir(FLOW_SPEC_STORE_DIR);
9396

9497
Properties orchestratorProperties = new Properties();
98+
// Create Spec to play with
99+
this.topologySpec = initTopologySpec();
95100

96101
Properties topologyProperties = new Properties();
97102
topologyProperties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR);
@@ -114,6 +119,8 @@ public void setup() throws Exception {
114119
FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class);
115120
DagManager mockDagManager = mock(DagManager.class);
116121
doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
122+
TopologySpecFactory mockedTopologySpecFactory = mock(TopologySpecFactory.class);
123+
doReturn(Collections.singleton(this.topologySpec)).when(mockedTopologySpecFactory).getTopologies();
117124
Config config = ConfigBuilder.create()
118125
.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
119126
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
@@ -130,13 +137,12 @@ public void setup() throws Exception {
130137
this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
131138
this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
132139
Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
133-
new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
140+
new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator,
141+
mockedTopologySpecFactory));
134142
this.topologyCatalog.addListener(orchestrator);
135143
this.flowCatalog.addListener(orchestrator);
136144
// Start application
137145
this.serviceLauncher.start();
138-
// Create Spec to play with
139-
this.topologySpec = initTopologySpec();
140146
this.flowSpec = initFlowSpec();
141147
}
142148

@@ -244,9 +250,10 @@ public void createTopologySpec() {
244250
}
245251
// Make sure TopologyCatalog is empty
246252
Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
247-
// Make sure TopologyCatalog Listener is empty
248-
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 0, "SpecCompiler should not know about any Topology "
249-
+ "before addition");
253+
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should know about any Topology "
254+
+ " irrespective of what is there in the topology catalog");
255+
// Make sure TopologyCatalog empty
256+
Assert.assertTrue(this.topologyCatalog.getSize() == 0, "Topology catalog should contain 0 Spec before addition");
250257

251258
// Create and add Spec
252259
this.topologyCatalog.put(topologySpec);
@@ -262,7 +269,7 @@ public void createTopologySpec() {
262269
// Make sure TopologyCatalog has the added Topology
263270
Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition");
264271
// Make sure TopologyCatalog Listener knows about added Topology
265-
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should contain 1 Spec after addition");
272+
Assert.assertTrue(this.topologyCatalog.getSize() == 1, "Topology catalog should contain 1 Spec after addition");
266273
}
267274

268275
@Test (dependsOnMethods = "createTopologySpec")

0 commit comments

Comments
 (0)