Skip to content

Commit ddd9468

Browse files
authored
Propagate file system job properties to Work units (#4124)
1 parent 51773ab commit ddd9468

File tree

8 files changed

+31
-12
lines changed

8 files changed

+31
-12
lines changed

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void submitJob(List<WorkUnit> workunits) {
9090
EventSubmitterContext eventSubmitterContext = new EventSubmitterContext.Builder()
9191
.withEventSubmitter(eventSubmitter)
9292
.build();
93-
PriorJobStateWUProcessingSpec wuSpec = new PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(), eventSubmitterContext);
93+
PriorJobStateWUProcessingSpec wuSpec = new PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(), eventSubmitterContext, this.jobProps);
9494
if (this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE) &&
9595
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
9696
int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.URI;
2121
import java.util.Comparator;
2222
import java.util.Optional;
23+
import java.util.Properties;
2324

2425
import org.apache.hadoop.fs.FileStatus;
2526
import org.apache.hadoop.fs.Path;
@@ -44,16 +45,19 @@
4445
public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
4546
private EventSubmitterContext eventSubmitterContext;
4647

47-
public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfsDir, EventSubmitterContext eventSubmitterContext) {
48+
private Properties fileSystemProperties;
49+
50+
public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfsDir, EventSubmitterContext eventSubmitterContext, Properties fileSystemProperties) {
4851
super(fileSystemUri, hdfsDir);
4952
this.eventSubmitterContext = eventSubmitterContext;
53+
this.fileSystemProperties = fileSystemProperties;
5054
}
5155

5256
@Override
5357
protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
5458
// begin by setting all correlators to empty string - later we'll `acknowledgeOrdering()`
5559
Path filePath = fileStatus.getPath();
56-
return new WorkUnitClaimCheck("", this.getFileSystemUri(), filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), this.eventSubmitterContext);
60+
return new WorkUnitClaimCheck("", this.getFileSystemUri(), filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), this.eventSubmitterContext, fileSystemProperties);
5761
}
5862

5963
@Override

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.net.URI;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.Properties;
2425

2526
import lombok.Data;
2627
import lombok.EqualsAndHashCode;
@@ -51,8 +52,8 @@ public class PriorJobStateWUProcessingSpec extends WUProcessingSpec {
5152
@NonNull private List<Tag<?>> tags = new ArrayList<>();
5253
@NonNull private String metricsSuffix = GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
5354

54-
public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, EventSubmitterContext eventSubmitterContext) {
55-
super(fileSystemUri, workUnitsDir, eventSubmitterContext);
55+
public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, EventSubmitterContext eventSubmitterContext, Properties fileSystemProperties) {
56+
super(fileSystemUri, workUnitsDir, eventSubmitterContext, fileSystemProperties);
5657
}
5758

5859
@JsonIgnore

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.gobblin.temporal.ddm.work;
1919

2020
import java.net.URI;
21+
import java.util.Properties;
2122

2223
import org.apache.hadoop.fs.Path;
2324

@@ -51,6 +52,7 @@ public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful {
5152
@NonNull private URI fileSystemUri;
5253
@NonNull private String workUnitsDir;
5354
@NonNull private EventSubmitterContext eventSubmitterContext;
55+
@NonNull private Properties fileSystemProperties;
5456
@NonNull @Setter(AccessLevel.PUBLIC) private Tuning tuning = Tuning.DEFAULT;
5557

5658
/** whether to conduct job-level timing (and send results via GTE) */
@@ -62,7 +64,7 @@ public boolean isToDoJobLevelTiming() {
6264
@JsonIgnore // (because no-arg method resembles 'java bean property')
6365
@Override
6466
public State getFileSystemConfig() {
65-
return new State(); // TODO - figure out how to truly set!
67+
return new State(fileSystemProperties);
6668
}
6769

6870
@JsonIgnore // (because no-arg method resembles 'java bean property')

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.gobblin.temporal.ddm.work;
1919

2020
import java.net.URI;
21+
import java.util.Properties;
2122

2223
import org.apache.hadoop.fs.Path;
2324

@@ -59,11 +60,12 @@ public class WorkUnitClaimCheck implements FileSystemApt, FileSystemJobStateful
5960
@NonNull private String workUnitPath;
6061
@NonNull private WorkUnitSizeInfo workUnitSizeInfo;
6162
@NonNull private EventSubmitterContext eventSubmitterContext;
63+
@NonNull private Properties fileSystemProperties;
6264

6365
@JsonIgnore // (because no-arg method resembles 'java bean property')
6466
@Override
6567
public State getFileSystemConfig() {
66-
return new State(); // TODO - figure out how to truly set!
68+
return new State(fileSystemProperties);
6769
}
6870

6971
@JsonIgnore // (because no-arg method resembles 'java bean property')

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event
8585
// Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties
8686
final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps,
8787
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
88+
// Add File system properties to the temporal job properties
89+
temporalJobProps.putAll(PropertiesUtils.extractPropertiesWithPrefix(jobProps,
90+
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)));
8891
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, temporalJobProps);
8992
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME`
9093
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
@@ -197,7 +200,9 @@ protected static WUProcessingSpec createProcessingSpec(Properties jobProps, Even
197200
JobState jobState = new JobState(jobProps);
198201
URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
199202
Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
200-
WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
203+
Properties fsProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps,
204+
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
205+
WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext, fsProps);
201206
// TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
202207
if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
203208
&& jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.temporal.workflow.ChildWorkflowOptions;
3030
import io.temporal.workflow.Workflow;
3131

32+
import org.apache.gobblin.configuration.ConfigurationKeys;
3233
import org.apache.gobblin.temporal.cluster.WorkerConfig;
3334
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
3435
import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -46,6 +47,7 @@
4647
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
4748
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
4849
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
50+
import org.apache.gobblin.util.PropertiesUtils;
4951

5052

5153
@Slf4j
@@ -62,7 +64,7 @@ public CommitStats process(WUProcessingSpec workSpec, final Properties props) {
6264
}
6365

6466
private CommitStats performWork(WUProcessingSpec workSpec, final Properties props) {
65-
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
67+
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec, props);
6668
Map<String, Object> searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(props);
6769
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
6870

@@ -121,9 +123,11 @@ private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec workSpec, P
121123
}
122124
}
123125

124-
protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec workSpec) {
126+
protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec workSpec, Properties props) {
127+
Properties fsProps = PropertiesUtils.extractPropertiesWithPrefix(props,
128+
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
125129
return new EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(), workSpec.getWorkUnitsDir(),
126-
workSpec.getEventSubmitterContext());
130+
workSpec.getEventSubmitterContext(), fsProps);
127131
}
128132

129133
protected NestingExecWorkflow<WorkUnitClaimCheck> createProcessingWorkflow(FileSystemJobStateful f,

gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Arrays;
2222
import java.util.List;
2323
import java.util.Optional;
24+
import java.util.Properties;
2425
import java.util.stream.Collectors;
2526
import java.util.stream.StreamSupport;
2627

@@ -53,7 +54,7 @@ public void setUp() throws Exception {
5354
URI fileSystemUri = new URI("hdfs://localhost:9000");
5455
String hdfsDir = "/test/dir";
5556
eventSubmitterContext = Mockito.mock(EventSubmitterContext.class);
56-
workload = Mockito.spy(new EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir, eventSubmitterContext));
57+
workload = Mockito.spy(new EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir, eventSubmitterContext, new Properties()));
5758
mockFileSystem = Mockito.mock(FileSystem.class);
5859

5960
MockedStatic<HadoopUtils> mockedHadoopUtils = Mockito.mockStatic(HadoopUtils.class);

0 commit comments

Comments
 (0)