From 5813b3f320cb0017bfb533c7335c798066b38656 Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Mon, 29 Apr 2024 15:02:43 +0530 Subject: [PATCH 1/2] initialize topology specs directly without waitging for listener callbacks --- .../flow/BaseFlowToJobSpecCompiler.java | 56 ++++++------------- .../flow/IdentityFlowToJobSpecCompiler.java | 20 +------ .../modules/flow/MockedSpecCompiler.java | 6 +- .../modules/flow/MultiHopFlowCompiler.java | 24 +++----- .../FlowCompilationValidationHelper.java | 5 +- .../IdentityFlowToJobSpecCompilerTest.java | 12 ++-- .../orchestration/OrchestratorTest.java | 21 ++++--- 7 files changed, 52 insertions(+), 92 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index 528b4d387ab..63c2a5ffc5b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -19,15 +19,13 @@ import java.io.IOException; import java.net.URI; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.commons.lang3.StringUtils; -import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; -import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +46,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; @@ -58,12 +57,14 @@ import org.apache.gobblin.runtime.job_catalog.FSJobCatalog; import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; // Provide base implementation for constructing multi-hops route. @@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { // Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected // to these data structures. @Getter - @Setter - protected final Map topologySpecMap; + protected final Map topologySpecMap = Maps.newConcurrentMap(); protected final Config config; protected final Logger log; @@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { private Optional userQuotaManager; - public BaseFlowToJobSpecCompiler(Config config){ - this(config,true); - } - - public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){ - this(config, Optional.absent(), true); - } - - public BaseFlowToJobSpecCompiler(Config config, Optional log){ - this(config, log,true); - } - - public BaseFlowToJobSpecCompiler(Config config, Optional log, boolean instrumentationEnabled){ - this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); - if (instrumentationEnabled) { - this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); - this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); - this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); - this.flowCompilationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); - this.dataAuthorizationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER)); - } - else { - this.metricContext = null; - this.flowCompilationSuccessFulMeter = Optional.absent(); - this.flowCompilationFailedMeter = Optional.absent(); - this.flowCompilationTimer = Optional.absent(); - this.dataAuthorizationTimer = Optional.absent(); - } - + public BaseFlowToJobSpecCompiler(Config config, Collection topologySpecSet){ + this.log = LoggerFactory.getLogger(getClass()); + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); + this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); + this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); + this.flowCompilationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); + this.dataAuthorizationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER)); this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false); if (this.warmStandbyEnabled) { userQuotaManager = Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, @@ -134,10 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional log, boolean in userQuotaManager = Optional.absent(); } - this.topologySpecMap = Maps.newConcurrentMap(); + topologySpecSet.forEach(this::onAddTopologySpec); + + this.config = config; - /*** + /* * ETL-5996 * For multi-tenancy, the following needs to be added: * 1. Change singular templateCatalog to Map to support multiple templateCatalogs @@ -219,8 +199,6 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) { public AddSpecResponse onAddSpec(Spec addedSpec) { if (addedSpec instanceof FlowSpec) { return onAddFlowSpec((FlowSpec) addedSpec); - } else if (addedSpec instanceof TopologySpec) { - return onAddTopologySpec( (TopologySpec) addedSpec); } return new AddSpecResponse(null); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java index d3b25f54e17..8ad2d0f3205 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java @@ -18,14 +18,12 @@ package org.apache.gobblin.service.modules.flow; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; - -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.typesafe.config.Config; @@ -50,20 +48,8 @@ @Alpha public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { - public IdentityFlowToJobSpecCompiler(Config config) { - super(config, true); - } - - public IdentityFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled) { - super(config, Optional.absent(), instrumentationEnabled); - } - - public IdentityFlowToJobSpecCompiler(Config config, Optional log) { - super(config, log, true); - } - - public IdentityFlowToJobSpecCompiler(Config config, Optional log, boolean instrumentationEnabled) { - super(config, log, instrumentationEnabled); + public IdentityFlowToJobSpecCompiler(Config config, Collection topologySpecSet) { + super(config, topologySpecSet); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java index 09e350462e4..378590771b0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.flow; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Properties; @@ -28,6 +29,7 @@ import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; @@ -45,8 +47,8 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler { private static final int NUMBER_OF_JOBS = 3; public static final String UNCOMPILABLE_FLOW = "uncompilableFlow"; - public MockedSpecCompiler(Config config) { - super(config); + public MockedSpecCompiler(Config config, Collection topologySpecSet) { + super(config, topologySpecSet); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index cd2de788ba3..645f47cf4f7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -20,6 +20,8 @@ import java.lang.reflect.InvocationTargetException; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,7 +36,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.reflect.ConstructorUtils; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -57,6 +58,7 @@ import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; import org.apache.gobblin.service.modules.flowgraph.Dag; @@ -98,26 +100,14 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { // a map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1 public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map"; - public MultiHopFlowCompiler(Config config) { - this(config, true); - } - - public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) { - this(config, Optional.absent(), instrumentationEnabled); - } - - public MultiHopFlowCompiler(Config config, Optional log) { - this(config, log, true); - } - public MultiHopFlowCompiler(Config config, AtomicReference flowGraph) { - super(config, Optional.absent(), true); + super(config, Collections.EMPTY_SET); this.flowGraph = flowGraph; this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config); } - public MultiHopFlowCompiler(Config config, Optional log, boolean instrumentationEnabled) { - super(config, log, instrumentationEnabled); + public MultiHopFlowCompiler(Config config, Collection topologySpecSet) { + super(config, topologySpecSet); try { this.dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP) ? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP)) @@ -161,7 +151,7 @@ public MultiHopFlowCompiler(Config config, Optional log, boolean instrum try { String flowGraphMonitorClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY, GitFlowGraphMonitor.class.getCanonicalName()); this.flowGraphMonitor = (FlowGraphMonitor) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(FlowGraphMonitor.class).resolve( - flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), instrumentationEnabled); + flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), true); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) { throw new RuntimeException(e); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 301e3e436ae..881bc3fbe32 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -43,6 +43,7 @@ import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.topology.TopologySpecFactory; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; @@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper { @Inject public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton, - UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) { + UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator, TopologySpecFactory topologySpecFactory) { try { String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS); this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName( - new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config); + new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config, topologySpecFactory.getTopologies()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) { throw new RuntimeException(e); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java index d4f2bc2baa6..e6a10f64887 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java @@ -20,6 +20,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -86,16 +87,11 @@ public void setup() throws Exception { // Initialize compiler with template catalog Properties compilerWithTemplateCatalogProperties = new Properties(); compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI); - this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties)); - - // Add a topology to compiler - this.compilerWithTemplateCalague.onAddSpec(initTopologySpec()); + this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties), + Collections.singleton(initTopologySpec())); // Initialize compiler without template catalog - this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties())); - - // Add a topology to compiler - this.compilerWithoutTemplateCalague.onAddSpec(initTopologySpec()); + this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()), Collections.singleton(initTopologySpec())); } private void setupDir(String dir) throws Exception { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index a98fc8cb7af..26c6e545fbf 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -53,6 +54,7 @@ import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; +import org.apache.gobblin.service.modules.topology.TopologySpecFactory; import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper; import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; @@ -61,6 +63,7 @@ import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -92,6 +95,8 @@ public void setup() throws Exception { cleanUpDir(FLOW_SPEC_STORE_DIR); Properties orchestratorProperties = new Properties(); + // Create Spec to play with + this.topologySpec = initTopologySpec(); Properties topologyProperties = new Properties(); topologyProperties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR); @@ -114,6 +119,8 @@ public void setup() throws Exception { FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class); DagManager mockDagManager = mock(DagManager.class); doNothing().when(mockDagManager).setTopologySpecMap(anyMap()); + TopologySpecFactory mockedTopologySpecFactory = mock(TopologySpecFactory.class); + doReturn(Collections.singleton(this.topologySpec)).when(mockedTopologySpecFactory).getTopologies(); Config config = ConfigBuilder.create() .addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName()) @@ -130,13 +137,12 @@ public void setup() throws Exception { this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator, Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore), - new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator)); + new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator, + mockedTopologySpecFactory)); this.topologyCatalog.addListener(orchestrator); this.flowCatalog.addListener(orchestrator); // Start application this.serviceLauncher.start(); - // Create Spec to play with - this.topologySpec = initTopologySpec(); this.flowSpec = initFlowSpec(); } @@ -244,9 +250,10 @@ public void createTopologySpec() { } // Make sure TopologyCatalog is empty Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition"); - // Make sure TopologyCatalog Listener is empty - Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 0, "SpecCompiler should not know about any Topology " - + "before addition"); + Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should know about any Topology " + + " irrespective of what is there in the topology catalog"); + // Make sure TopologyCatalog empty + Assert.assertTrue(this.topologyCatalog.getSize() == 0, "Topology catalog should contain 0 Spec before addition"); // Create and add Spec this.topologyCatalog.put(topologySpec); @@ -262,7 +269,7 @@ public void createTopologySpec() { // Make sure TopologyCatalog has the added Topology Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition"); // Make sure TopologyCatalog Listener knows about added Topology - Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should contain 1 Spec after addition"); + Assert.assertTrue(this.topologyCatalog.getSize() == 1, "Topology catalog should contain 1 Spec after addition"); } @Test (dependsOnMethods = "createTopologySpec") From 03a3d6d73557b93f7e7724cc5855a959b908e44f Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Thu, 2 May 2024 01:14:46 +0530 Subject: [PATCH 2/2] address review comments --- .../modules/flow/BaseFlowToJobSpecCompiler.java | 1 - .../core/IdentityFlowToJobSpecCompilerTest.java | 14 +++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index 63c2a5ffc5b..fdc8098696b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -118,7 +118,6 @@ public BaseFlowToJobSpecCompiler(Config config, Collection topolog this.config = config; /* - * ETL-5996 * For multi-tenancy, the following needs to be added: * 1. Change singular templateCatalog to Map to support multiple templateCatalogs * 2. Pick templateCatalog from JobCatalogWithTemplates based on URI, and try to resolve JobSpec using that diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java index e6a10f64887..daa4537bdd6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java @@ -69,8 +69,8 @@ public class IdentityFlowToJobSpecCompilerTest { private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis(); private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis(); - private IdentityFlowToJobSpecCompiler compilerWithTemplateCalague; - private IdentityFlowToJobSpecCompiler compilerWithoutTemplateCalague; + private IdentityFlowToJobSpecCompiler compilerWithTemplateCatalog; + private IdentityFlowToJobSpecCompiler compilerWithoutTemplateCatalog; @BeforeClass public void setup() throws Exception { @@ -87,11 +87,11 @@ public void setup() throws Exception { // Initialize compiler with template catalog Properties compilerWithTemplateCatalogProperties = new Properties(); compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI); - this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties), + this.compilerWithTemplateCatalog = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties), Collections.singleton(initTopologySpec())); // Initialize compiler without template catalog - this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()), Collections.singleton(initTopologySpec())); + this.compilerWithoutTemplateCatalog = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()), Collections.singleton(initTopologySpec())); } private void setupDir(String dir) throws Exception { @@ -182,7 +182,7 @@ public void testCompilerWithTemplateCatalog() { FlowSpec flowSpec = initFlowSpec(); // Run compiler on flowSpec - Dag jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec); + Dag jobExecutionPlanDag = this.compilerWithTemplateCatalog.compileFlow(flowSpec); // Assert pre-requisites Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag."); @@ -215,7 +215,7 @@ public void testCompilerWithoutTemplateCatalog() { FlowSpec flowSpec = initFlowSpec(); // Run compiler on flowSpec - Dag jobExecutionPlanDag = this.compilerWithoutTemplateCalague.compileFlow(flowSpec); + Dag jobExecutionPlanDag = this.compilerWithoutTemplateCatalog.compileFlow(flowSpec); // Assert pre-requisites Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag."); @@ -249,7 +249,7 @@ public void testNoJobSpecCompilation() { FlowSpec flowSpec = initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, "unsupportedSource", "unsupportedSink"); // Run compiler on flowSpec - Dag jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec); + Dag jobExecutionPlanDag = this.compilerWithTemplateCatalog.compileFlow(flowSpec); // Assert pre-requisites Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");