91 changes: 82 additions & 9 deletions docs/lineage/openlineage.md
@@ -88,15 +88,88 @@ The DataHub OpenLineage integration can be configured using environment variable

##### Environment Variables

| Environment Variable | Property | Type | Default | Description |
| ------------------------------------------------------ | ------------------------------------------------------ | ------- | ------- | ----------------------------------------------------------------------------------------------------------------- |
| `DATAHUB_OPENLINEAGE_ENV` | `datahub.openlineage.env` | String | `PROD` | Environment for DataFlow cluster and Dataset fabricType (see valid values below) |
| `DATAHUB_OPENLINEAGE_ORCHESTRATOR` | `datahub.openlineage.orchestrator` | String | `null` | Orchestrator name for DataFlow entities. When set, takes precedence over processing_engine facet and producer URL |
| `DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE` | `datahub.openlineage.platform-instance` | String | `null` | Override DataFlow cluster (defaults to env if not specified) |
| `DATAHUB_OPENLINEAGE_COMMON_DATASET_ENV` | `datahub.openlineage.common-dataset-env` | String | `null` | Override Dataset environment independently from DataFlow cluster |
| `DATAHUB_OPENLINEAGE_COMMON_DATASET_PLATFORM_INSTANCE` | `datahub.openlineage.common-dataset-platform-instance` | String | `null` | Common platform instance for dataset entities |
| `DATAHUB_OPENLINEAGE_MATERIALIZE_DATASET` | `datahub.openlineage.materialize-dataset` | Boolean | `true` | Whether to materialize dataset entities |
| `DATAHUB_OPENLINEAGE_INCLUDE_SCHEMA_METADATA` | `datahub.openlineage.include-schema-metadata` | Boolean | `true` | Whether to include schema metadata in lineage |
| `DATAHUB_OPENLINEAGE_CAPTURE_COLUMN_LEVEL_LINEAGE` | `datahub.openlineage.capture-column-level-lineage` | Boolean | `true` | Whether to capture column-level lineage information |
| `DATAHUB_OPENLINEAGE_USE_PATCH` | `datahub.openlineage.use-patch` | Boolean | `false` | Whether to use patch operations for lineage/incremental lineage |
| `DATAHUB_OPENLINEAGE_FILE_PARTITION_REGEXP_PATTERN` | `datahub.openlineage.file-partition-regexp-pattern` | String | `null` | Regular expression pattern for file partition detection |

> **Valid `env` values**: `PROD`, `DEV`, `TEST`, `QA`, `UAT`, `EI`, `PRE`, `STG`, `NON_PROD`, `CORP`, `RVW`, `PRD`, `TST`, `SIT`, `SBX`, `SANDBOX`
>
> **How `env` works**:
>
> - **By default**, `env` sets both the DataFlow cluster and Dataset fabricType for simplicity
> - **For advanced scenarios**, use `platform-instance` to override the DataFlow cluster or `common-dataset-env` to override the Dataset environment independently
>
> **Note**: The `env` property naming matches DataHub SDK conventions where `env` is the user-facing parameter that internally maps to the URN `cluster` field.
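The env-to-fabric and env-to-cluster mapping described above can be sketched in plain Java. This is a hypothetical standalone re-implementation for illustration (string-based, not the actual `FabricType` enum or Spring config classes), mirroring the fallback logic in this PR's servlet config:

```java
import java.util.Locale;
import java.util.Set;

public class EnvMappingSketch {
  // Valid env values listed in the docs above
  static final Set<String> VALID =
      Set.of(
          "PROD", "DEV", "TEST", "QA", "UAT", "EI", "PRE", "STG", "NON_PROD", "CORP", "RVW",
          "PRD", "TST", "SIT", "SBX", "SANDBOX");

  // Dataset fabricType: commonDatasetEnv wins over env; invalid or missing values fall back to PROD
  static String resolveFabricType(String env, String commonDatasetEnv) {
    String envValue = (commonDatasetEnv != null) ? commonDatasetEnv : env;
    if (envValue == null || envValue.isEmpty()) {
      return "PROD";
    }
    String upper = envValue.toUpperCase(Locale.ROOT);
    return VALID.contains(upper) ? upper : "PROD";
  }

  // DataFlow cluster: explicit platform-instance wins, otherwise lowercased env
  static String resolveCluster(String platformInstance, String env) {
    if (platformInstance != null) {
      return platformInstance;
    }
    return (env != null && !env.isEmpty()) ? env.toLowerCase(Locale.ROOT) : null;
  }
}
```

For example, `env=PROD` with `common-dataset-env=DEV` yields a `prod` DataFlow cluster but `DEV` datasets, matching the cross-environment testing scenario above.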

##### Usage Examples

**Setting Environment and Orchestrator**

_Simple Configuration (Recommended):_

For most use cases, set `env` to configure both DataFlow and Datasets:

```bash
# Development environment - sets DataFlow cluster to "dev" and Dataset fabricType to DEV
DATAHUB_OPENLINEAGE_ENV=DEV
DATAHUB_OPENLINEAGE_ORCHESTRATOR=my-orchestrator

# Production environment - sets DataFlow cluster to "prod" and Dataset fabricType to PROD
DATAHUB_OPENLINEAGE_ENV=PROD
DATAHUB_OPENLINEAGE_ORCHESTRATOR=dagster

# Staging environment
DATAHUB_OPENLINEAGE_ENV=STG
DATAHUB_OPENLINEAGE_ORCHESTRATOR=custom-pipeline
```

_Advanced Configuration (Multi-Region/Complex Deployments):_

Override DataFlow cluster or Dataset environment independently:

```bash
# DataFlow in specific regional cluster, but datasets marked as generic PROD
DATAHUB_OPENLINEAGE_ENV=PROD
DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE=prod-us-west-2 # DataFlow cluster override

# Test pipeline against DEV data (cross-environment testing)
DATAHUB_OPENLINEAGE_ENV=PROD # DataFlow cluster: prod
DATAHUB_OPENLINEAGE_COMMON_DATASET_ENV=DEV # Dataset fabricType: DEV

# Blue-green deployment
DATAHUB_OPENLINEAGE_ENV=PROD
DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE=prod-blue # or prod-green
```

**Using Application Properties**

Alternatively, configure via `application.yml`:

```yaml
datahub:
openlineage:
env: PROD
orchestrator: my-custom-orchestrator
platform-instance: us-west-2
capture-column-level-lineage: true
```

**Priority Order for Orchestrator Determination**

The orchestrator name is determined in the following priority order:

1. `DATAHUB_OPENLINEAGE_ORCHESTRATOR` environment variable (highest priority)
2. `processing_engine` facet in the OpenLineage event
3. Parsing the `producer` URL field with known patterns (Airflow, etc.)

#### Known Limitations

@@ -24,6 +24,7 @@ dependencies {
implementation externalDependency.json

testImplementation externalDependency.testng
testImplementation "io.openlineage:openlineage-java:$openLineageVersion"
}

test {
@@ -16,18 +16,31 @@
@Getter
@ToString
public class DatahubOpenlineageConfig {
@Builder.Default private final boolean isSpark = false;
@Builder.Default private final boolean isStreaming = false;
// Pipeline/Flow configuration
@Builder.Default private final String pipelineName = null;
private final String orchestrator;
@Builder.Default private final FabricType fabricType = FabricType.PROD;

// Platform configuration
private final String platformInstance;
private final String commonDatasetPlatformInstance;
private final String commonDatasetEnv;
private final String platform;

// Spark-specific configuration
@Builder.Default private final boolean isSpark = false;
@Builder.Default private final boolean isStreaming = false;

// Dataset path configuration
@Builder.Default private final Map<String, List<PathSpec>> pathSpecs = new HashMap<>();
private final String filePartitionRegexpPattern;
@Builder.Default private final FabricType fabricType = FabricType.PROD;

// Metadata ingestion configuration
private final boolean materializeDataset;
private final boolean includeSchemaMetadata;
@Builder.Default private final boolean captureColumnLevelLineage = true;

// Advanced configuration
@Builder.Default private final DataJobUrn parentJobUrn = null;
// This is disabled until column-level patch support is fixed in GMS
@Builder.Default private final boolean usePatch = true;
@@ -1244,7 +1244,8 @@ public static DataFlowUrn getFlowUrn(
producerName = producer.toString();
}

String orchestrator = getOrchestrator(processingEngine, producerName);
String orchestrator =
getOrchestrator(processingEngine, producerName, datahubOpenlineageConfig.getOrchestrator());
String flowName = datahubOpenlineageConfig.getPipelineName();
if (datahubOpenlineageConfig.getPlatformInstance() != null) {
namespace = datahubOpenlineageConfig.getPlatformInstance();
Expand All @@ -1259,7 +1260,13 @@ public static DataFlowInfo convertRunEventToDataFlowInfo(
return dataFlowInfo;
}

private static String getOrchestrator(String processingEngine, String producer) {
private static String getOrchestrator(
String processingEngine, String producer, String orchestratorConfig) {
// If orchestrator is configured, use it with highest priority
if (orchestratorConfig != null && !orchestratorConfig.isEmpty()) {
return orchestratorConfig;
}

String regex = "https://github.com/OpenLineage/OpenLineage/.*/(.*)$";
Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
String orchestrator = null;
@@ -0,0 +1,133 @@
package io.datahubproject.openlineage;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import com.linkedin.common.FabricType;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.openlineage.client.OpenLineage;
import java.net.URI;
import java.time.ZonedDateTime;
import java.util.UUID;
import org.testng.annotations.Test;

/** Tests for OpenLineage configuration including orchestrator and fabric type */
public class OpenLineageConfigTest {

private OpenLineage.RunEvent createTestEvent(URI producerUri) {
OpenLineage openLineage = new OpenLineage(producerUri);
return openLineage
.newRunEventBuilder()
.eventTime(ZonedDateTime.now())
.eventType(OpenLineage.RunEvent.EventType.START)
.run(openLineage.newRunBuilder().runId(UUID.randomUUID()).build())
.job(
openLineage
.newJobBuilder()
.namespace("test-namespace")
.name("test-job")
.facets(openLineage.newJobFacetsBuilder().build())
.build())
.inputs(java.util.Collections.emptyList())
.outputs(java.util.Collections.emptyList())
.build();
}

@Test
public void testOrchestratorOverride() throws Exception {
// Create config with orchestrator override
DatahubOpenlineageConfig config =
DatahubOpenlineageConfig.builder()
.fabricType(FabricType.PROD)
.orchestrator("custom-orchestrator")
.build();

OpenLineage.RunEvent runEvent =
createTestEvent(URI.create("https://github.com/OpenLineage/OpenLineage/"));

DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);

assertNotNull(datahubJob);
assertEquals(
datahubJob.getFlowUrn().getOrchestratorEntity(),
"custom-orchestrator",
"Orchestrator should be overridden to custom-orchestrator");
}

@Test
public void testOrchestratorFromProducerUrl() throws Exception {
// Test with an Airflow producer URL and no override
DatahubOpenlineageConfig config =
DatahubOpenlineageConfig.builder().fabricType(FabricType.PROD).build();

URI producerUri = URI.create("https://github.com/apache/airflow/");
OpenLineage.RunEvent runEvent = createTestEvent(producerUri);

DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);

assertNotNull(datahubJob);
assertEquals(
datahubJob.getFlowUrn().getOrchestratorEntity(),
"airflow",
"Orchestrator should be derived from Airflow producer URL");
}

@Test
public void testOrchestratorOverrideTakesPrecedence() throws Exception {
// Even if producer URL suggests Airflow, override should win
DatahubOpenlineageConfig config =
DatahubOpenlineageConfig.builder()
.fabricType(FabricType.DEV)
.orchestrator("my-platform")
.build();

URI producerUri = URI.create("https://github.com/apache/airflow/");
OpenLineage.RunEvent runEvent = createTestEvent(producerUri);

DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);

assertNotNull(datahubJob);
assertEquals(
datahubJob.getFlowUrn().getOrchestratorEntity(),
"my-platform",
"Orchestrator override should take precedence over producer URL");
}

@Test
public void testFabricTypeConfiguration() throws Exception {
// Test that fabric type can be configured (fabric applies to datasets, not flows)
DatahubOpenlineageConfig config =
DatahubOpenlineageConfig.builder()
.fabricType(FabricType.QA)
.orchestrator("test-orchestrator")
.build();

OpenLineage.RunEvent runEvent =
createTestEvent(URI.create("https://github.com/OpenLineage/OpenLineage/"));

DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);

assertNotNull(datahubJob);
    // Fabric type is applied to datasets, not flows; that mapping is covered by existing tests
assertEquals(config.getFabricType(), FabricType.QA, "Config should have QA fabric type");
}

@Test
public void testDefaultFabricTypeIsProd() throws Exception {
// Test that default fabric type is PROD when not specified
// Need to set orchestrator since no producer URL pattern will match
DatahubOpenlineageConfig config =
DatahubOpenlineageConfig.builder().orchestrator("test").build();

OpenLineage.RunEvent runEvent =
createTestEvent(URI.create("https://github.com/OpenLineage/OpenLineage/spark/"));

DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);

assertNotNull(datahubJob);
// Fabric type default is tested via the config
assertEquals(config.getFabricType(), FabricType.PROD, "Default fabric should be PROD");
}
}
@@ -9,13 +9,25 @@
@ConfigurationProperties(prefix = "datahub.openlineage")
public class DatahubOpenlineageProperties {

// Pipeline/Flow configuration
private String pipelineName;
private String orchestrator;
private String env;

// Platform configuration
private String platformInstance;
private String commonDatasetPlatformInstance;
private String commonDatasetEnv;
private String platform;

// Dataset path configuration
private String filePartitionRegexpPattern;

// Metadata ingestion configuration
private boolean materializeDataset = true;
private boolean includeSchemaMetadata = true;
private boolean captureColumnLevelLineage = true;

// Advanced configuration
private boolean usePatch = false;
}
@@ -1,5 +1,6 @@
package io.datahubproject.openapi.openlineage.config;

import com.linkedin.common.FabricType;
import io.datahubproject.openapi.openlineage.mapping.RunEventMapper;
import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import lombok.extern.slf4j.Slf4j;
@@ -18,16 +19,46 @@ public OpenLineageServletConfig(DatahubOpenlineageProperties properties) {

@Bean
public RunEventMapper.MappingConfig mappingConfig() {
// Parse FabricType from string property
// Use commonDatasetEnv if specified, otherwise fall back to env
String envValue =
properties.getCommonDatasetEnv() != null
? properties.getCommonDatasetEnv()
: properties.getEnv();

FabricType fabricType = FabricType.PROD; // default
if (envValue != null && !envValue.isEmpty()) {
try {
fabricType = FabricType.valueOf(envValue.toUpperCase());
} catch (IllegalArgumentException e) {
log.warn(
"Invalid env value '{}'. Using default PROD. Valid values: PROD, DEV, TEST, QA, UAT, EI, PRE, STG, NON_PROD, CORP, RVW, PRD, TST, SIT, SBX, SANDBOX",
envValue);
}
}

// Use platformInstance if specified, otherwise use env as the cluster
String platformInstance = properties.getPlatformInstance();
if (platformInstance == null && properties.getEnv() != null && !properties.getEnv().isEmpty()) {
// Default: use env as the DataFlow cluster
platformInstance = properties.getEnv().toLowerCase();
log.debug(
"Using env '{}' as DataFlow cluster (platformInstance not specified)", platformInstance);
}

DatahubOpenlineageConfig datahubOpenlineageConfig =
DatahubOpenlineageConfig.builder()
.platformInstance(properties.getPlatformInstance())
.platformInstance(platformInstance)
.commonDatasetPlatformInstance(properties.getCommonDatasetPlatformInstance())
.commonDatasetEnv(properties.getCommonDatasetEnv())
.platform(properties.getPlatform())
.filePartitionRegexpPattern(properties.getFilePartitionRegexpPattern())
.materializeDataset(properties.isMaterializeDataset())
.includeSchemaMetadata(properties.isIncludeSchemaMetadata())
.captureColumnLevelLineage(properties.isCaptureColumnLevelLineage())
.usePatch(properties.isUsePatch())
.fabricType(fabricType)
.orchestrator(properties.getOrchestrator())
.parentJobUrn(null)
.build();
log.info("Starting OpenLineage Endpoint with config: {}", datahubOpenlineageConfig);