19
19
20
20
import java .io .IOException ;
21
21
import java .net .URI ;
22
+ import java .util .Collection ;
22
23
import java .util .Collections ;
23
24
import java .util .List ;
24
25
import java .util .Map ;
25
26
import java .util .Properties ;
26
27
27
28
import org .apache .commons .lang3 .StringUtils ;
28
- import org .apache .gobblin .runtime .spec_catalog .FlowCatalog ;
29
- import org .apache .gobblin .service .modules .orchestration .UserQuotaManager ;
30
- import org .apache .gobblin .util .reflection .GobblinConstructorUtils ;
31
29
import org .quartz .CronExpression ;
32
30
import org .slf4j .Logger ;
33
31
import org .slf4j .LoggerFactory ;
48
46
import org .apache .gobblin .configuration .State ;
49
47
import org .apache .gobblin .instrumented .Instrumented ;
50
48
import org .apache .gobblin .metrics .MetricContext ;
49
+ import org .apache .gobblin .metrics .ServiceMetricNames ;
51
50
import org .apache .gobblin .metrics .Tag ;
52
51
import org .apache .gobblin .runtime .api .FlowSpec ;
53
52
import org .apache .gobblin .runtime .api .JobSpec ;
58
57
import org .apache .gobblin .runtime .job_catalog .FSJobCatalog ;
59
58
import org .apache .gobblin .runtime .job_spec .ResolvedJobSpec ;
60
59
import org .apache .gobblin .runtime .spec_catalog .AddSpecResponse ;
60
+ import org .apache .gobblin .runtime .spec_catalog .FlowCatalog ;
61
61
import org .apache .gobblin .service .ServiceConfigKeys ;
62
- import org .apache .gobblin .metrics .ServiceMetricNames ;
63
62
import org .apache .gobblin .service .modules .flowgraph .Dag ;
63
+ import org .apache .gobblin .service .modules .orchestration .UserQuotaManager ;
64
64
import org .apache .gobblin .service .modules .spec .JobExecutionPlan ;
65
65
import org .apache .gobblin .util .ConfigUtils ;
66
66
import org .apache .gobblin .util .PropertiesUtils ;
67
+ import org .apache .gobblin .util .reflection .GobblinConstructorUtils ;
67
68
68
69
69
70
// Provide base implementation for constructing multi-hops route.
@@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
73
74
// Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected
74
75
// to these data structures.
75
76
@ Getter
76
- @ Setter
77
- protected final Map <URI , TopologySpec > topologySpecMap ;
77
+ protected final Map <URI , TopologySpec > topologySpecMap = Maps .newConcurrentMap ();
78
78
79
79
protected final Config config ;
80
80
protected final Logger log ;
@@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
97
97
98
98
private Optional <UserQuotaManager > userQuotaManager ;
99
99
100
- public BaseFlowToJobSpecCompiler (Config config ){
101
- this (config ,true );
102
- }
103
-
104
- public BaseFlowToJobSpecCompiler (Config config , boolean instrumentationEnabled ){
105
- this (config , Optional .<Logger >absent (), true );
106
- }
107
-
108
- public BaseFlowToJobSpecCompiler (Config config , Optional <Logger > log ){
109
- this (config , log ,true );
110
- }
111
-
112
- public BaseFlowToJobSpecCompiler (Config config , Optional <Logger > log , boolean instrumentationEnabled ){
113
- this .log = log .isPresent () ? log .get () : LoggerFactory .getLogger (getClass ());
114
- if (instrumentationEnabled ) {
115
- this .metricContext = Instrumented .getMetricContext (ConfigUtils .configToState (config ), IdentityFlowToJobSpecCompiler .class );
116
- this .flowCompilationSuccessFulMeter = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_SUCCESSFUL_METER ));
117
- this .flowCompilationFailedMeter = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_FAILED_METER ));
118
- this .flowCompilationTimer = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .FLOW_COMPILATION_TIMER ));
119
- this .dataAuthorizationTimer = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .DATA_AUTHORIZATION_TIMER ));
120
- }
121
- else {
122
- this .metricContext = null ;
123
- this .flowCompilationSuccessFulMeter = Optional .absent ();
124
- this .flowCompilationFailedMeter = Optional .absent ();
125
- this .flowCompilationTimer = Optional .absent ();
126
- this .dataAuthorizationTimer = Optional .absent ();
127
- }
128
-
100
+ public BaseFlowToJobSpecCompiler (Config config , Collection <TopologySpec > topologySpecSet ){
101
+ this .log = LoggerFactory .getLogger (getClass ());
102
+ this .metricContext = Instrumented .getMetricContext (ConfigUtils .configToState (config ), IdentityFlowToJobSpecCompiler .class );
103
+ this .flowCompilationSuccessFulMeter = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_SUCCESSFUL_METER ));
104
+ this .flowCompilationFailedMeter = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_FAILED_METER ));
105
+ this .flowCompilationTimer = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .FLOW_COMPILATION_TIMER ));
106
+ this .dataAuthorizationTimer = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .DATA_AUTHORIZATION_TIMER ));
129
107
this .warmStandbyEnabled = ConfigUtils .getBoolean (config , ServiceConfigKeys .GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY , false );
130
108
if (this .warmStandbyEnabled ) {
131
109
userQuotaManager = Optional .of (GobblinConstructorUtils .invokeConstructor (UserQuotaManager .class ,
@@ -134,10 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in
134
112
userQuotaManager = Optional .absent ();
135
113
}
136
114
137
- this .topologySpecMap = Maps .newConcurrentMap ();
115
+ topologySpecSet .forEach (this ::onAddTopologySpec );
116
+
117
+
138
118
this .config = config ;
139
119
140
- /***
120
+ /*
141
121
* ETL-5996
142
122
* For multi-tenancy, the following needs to be added:
143
123
* 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs
@@ -219,8 +199,6 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
219
199
public AddSpecResponse onAddSpec (Spec addedSpec ) {
220
200
if (addedSpec instanceof FlowSpec ) {
221
201
return onAddFlowSpec ((FlowSpec ) addedSpec );
222
- } else if (addedSpec instanceof TopologySpec ) {
223
- return onAddTopologySpec ( (TopologySpec ) addedSpec );
224
202
}
225
203
return new AddSpecResponse (null );
226
204
}
0 commit comments