@treff7es (Contributor) commented Nov 7, 2025

Summary

This PR adds automatic lineage inference from DataHub to the Kafka Connect source connector. Instead of relying solely on connector manifests, the ingestion can now query DataHub's metadata graph to resolve schemas and generate both table-level and column-level lineage.

Motivation

Currently, Kafka Connect lineage extraction is limited by what's explicitly declared in connector configurations. This PR enables:

  1. Wildcard pattern expansion: Connectors configured with patterns like table.include.list: "database.*" can now be resolved to actual table names by querying DataHub
  2. Column-level lineage: Generate fine-grained lineage showing which source columns map to Kafka topic fields
  3. Schema-aware ingestion: Leverage existing metadata in DataHub to enrich Kafka Connect lineage without requiring external database connections

Changes

New Configuration Options

Added three new configuration fields to KafkaConnectSourceConfig:

source:
  type: kafka-connect
  config:
    # Enable DataHub schema resolution (default: false for backward compatibility)
    use_schema_resolver: true
    
    # Expand wildcard patterns to concrete table names (default: true)
    schema_resolver_expand_patterns: true
    
    # Generate column-level lineage (default: true)
    schema_resolver_finegrained_lineage: true

Core Components

  1. SchemaResolver Integration (connector_registry.py):

    • New create_schema_resolver() method to instantiate schema resolvers with platform-specific configurations
    • Automatically attaches resolvers to connector instances during creation
    • Passes pipeline context through the instantiation chain
  2. Fine-Grained Lineage Extraction (common.py):

    • New _extract_fine_grained_lineage() method in BaseConnector
    • Assumes 1:1 column mapping between source tables and Kafka topics (typical for CDC connectors)
    • Generates FineGrainedLineageClass instances for column-level lineage
  3. Snowflake Source Connector (source_connectors.py):

    • New connector for Confluent Cloud Snowflake Source
    • Pattern expansion support (e.g., ANALYTICS.PUBLIC.* → actual tables)
    • Efficient caching to avoid redundant DataHub queries
    • Proper topic naming following Snowflake connector conventions
  4. Platform Support (source_connectors.py):

    • Platform-specific schema resolution for PostgreSQL, MySQL, Snowflake, MongoDB, etc.
    • Handles platform URN generation and schema field mapping
    • Integration with existing JDBC and Debezium connectors
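The pattern expansion described above can be sketched with stdlib wildcard matching. This is a minimal illustration, not the actual connector code: `expand_table_pattern`, the in-memory table list, and the dotted-name format are assumptions, since the real implementation resolves candidate tables by querying DataHub's metadata graph rather than a local list.

```python
import fnmatch
from typing import List


def expand_table_pattern(pattern: str, known_tables: List[str]) -> List[str]:
    """Expand a wildcard pattern like 'ANALYTICS.PUBLIC.*' into concrete
    table names, matched against tables already known to DataHub."""
    return sorted(t for t in known_tables if fnmatch.fnmatchcase(t, pattern))


# Tables as they might be registered in DataHub (illustrative).
tables = [
    "ANALYTICS.PUBLIC.ORDERS",
    "ANALYTICS.PUBLIC.CUSTOMERS",
    "ANALYTICS.STAGING.RAW_EVENTS",
]

print(expand_table_pattern("ANALYTICS.PUBLIC.*", tables))
# Only the two PUBLIC tables match the pattern.
```

Note that `fnmatch`'s `*` matches across dots, so a pattern such as `ANALYTICS.*` would also pick up `ANALYTICS.STAGING.RAW_EVENTS`; the real resolver may scope matching per schema.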
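The 1:1 column mapping assumption behind `_extract_fine_grained_lineage()` can be illustrated as follows. The `FineGrainedLineage` dataclass here is a simplified stand-in for DataHub's `FineGrainedLineageClass` (which carries additional fields such as upstream/downstream types), and the function name and URN formatting are illustrative assumptions:

```python
from dataclasses import dataclass
from typing import List


# Simplified stand-in for DataHub's FineGrainedLineageClass.
@dataclass
class FineGrainedLineage:
    upstream_field: str    # schemaField URN of the source column
    downstream_field: str  # schemaField URN of the topic field


def extract_fine_grained_lineage(
    table_urn: str, topic_urn: str, columns: List[str]
) -> List[FineGrainedLineage]:
    """Assume a 1:1 column mapping between the source table and the
    Kafka topic, as is typical for CDC connectors: every source column
    maps to an identically named topic field."""
    return [
        FineGrainedLineage(
            upstream_field=f"urn:li:schemaField:({table_urn},{col})",
            downstream_field=f"urn:li:schemaField:({topic_urn},{col})",
        )
        for col in columns
    ]


lineage = extract_fine_grained_lineage(
    "urn:li:dataset:(urn:li:dataPlatform:postgres,db.public.orders,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:kafka,db.public.orders,PROD)",
    ["id", "amount"],
)
print(lineage[0].upstream_field)
```

Because the mapping is positional on column names only, connectors that apply transforms renaming fields would need additional handling beyond this sketch.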

Code Quality Improvements

  • Removed redundant connector instantiation in _derive_topics_from_config()
  • Cleaned up unused legacy methods (extract_lineages(), extract_flow_property_bag())
  • Improved null safety with comprehensive checks before schema resolver access
  • Enhanced error handling with detailed logging and graceful fallbacks
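The null-safety and graceful-fallback behavior listed above can be sketched like this. All names here (`resolve_tables`, the `expand` method) are hypothetical; the point is the shape of the guard: the resolver may be absent when `use_schema_resolver` is off, and any resolution failure falls back to manifest-declared tables rather than failing the ingestion.

```python
import logging
from typing import Any, List, Optional

logger = logging.getLogger(__name__)


def resolve_tables(
    schema_resolver: Optional[Any],
    pattern: str,
    declared_tables: List[str],
) -> List[str]:
    """Use the schema resolver when available; otherwise fall back to the
    tables declared in the connector manifest. (Illustrative names.)"""
    if schema_resolver is None:
        # use_schema_resolver is disabled, or resolver creation failed earlier.
        return declared_tables
    try:
        resolved = schema_resolver.expand(pattern)  # hypothetical API
        return resolved or declared_tables
    except Exception:
        logger.warning("Schema resolution failed for %s; using manifest", pattern)
        return declared_tables


print(resolve_tables(None, "db.*", ["db.orders"]))
```

With this structure, enabling the feature can never make an existing ingestion worse than the manifest-only behavior.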

Usage

Before (pattern-based config):

source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    # Connector with wildcard pattern
    # Lineage only shows pattern, not actual tables

After (schema-aware):

source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083"
    use_schema_resolver: true
    schema_resolver_expand_patterns: true
    schema_resolver_finegrained_lineage: true
    # Now resolves actual tables and generates column lineage

Testing

  • 138 tests passing (117 existing + 21 new)
  • ✅ All linting checks pass (ruff)
  • ✅ All type checks pass (mypy)
  • ✅ Test categories:
    • Transform pipeline tests (9)
    • Connector implementation tests (30+)
    • Cloud connector tests (12)
    • Error handling tests (4)
    • Schema resolver integration tests (21)

New test coverage:

  • Pattern expansion with wildcards
  • Fine-grained lineage generation
  • Snowflake Source connector
  • Schema resolver integration
  • Error handling and fallback scenarios

Breaking Changes

None. All features are opt-in via configuration flags, so existing Kafka Connect ingestions continue to work unchanged.

Default behavior (backward compatible):

  • use_schema_resolver: false (disabled by default)
  • No schema resolution without explicit configuration
  • No changes to existing connector behavior

Documentation

All configuration options include comprehensive descriptions:

  • use_schema_resolver: Master switch for the feature
  • schema_resolver_expand_patterns: Controls pattern expansion behavior
  • schema_resolver_finegrained_lineage: Controls column-level lineage generation

🤖 Generated with Claude Code

Co-Authored-By: Claude [email protected]

@treff7es changed the title from "Kafka connect datahub infer lineage" to "feat(ingest/kafka-connect): Kafka connect infer lineage from DataHub" on Nov 10, 2025