@Poorvankbhatia Poorvankbhatia commented Sep 30, 2025

Cassandra Sink V2

This PR is a detailed Phase-1 implementation of FLIP-533.
It adds a new Cassandra sink for Flink that cleanly separates planning (CQL generation and binding) from writing.

POJO writing via DataStax Mapper; Row/Tuple & Scala.Product via static & dynamic CQL

  • POJO: implemented with DataStax Mapper (upsert semantics), mapper options (TTL, consistency, saveNullFields)
  • Row / Tuple / Scala.Product: support static CQL (INSERT/UPDATE) and dynamic CQL (per-record routing/clauses/values/customizers)

What this unlocks

  • Static CQL (Row, Tuple, Scala.Product)

    • Write your own UPDATE ... SET ... WHERE ... (incl. IF LWT, USING TTL/TIMESTAMP).
    • Ignore nulls via UNSET to avoid tombstones.
    • Per-statement consistency/timeout via customizer.
  • Dynamic CQL (Row, Tuple)

    • Per-record routing: resolve keyspace/table dynamically (multi-tenant, time-partitioned tables).
    • Per-record values & partial updates: choose which columns to SET, skip nulls.
    • Per-record clauses: USING and IF generated from the record.
    • Per-record statement customization: consistency and timeout decisions at write time.
    • Prepared-statement caching for throughput.
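The per-record partial update idea can be illustrated with a minimal, driver-free sketch. It is an assumption-laden illustration, not the PR's planner: the class PartialUpdateSketch, its Planned holder, and the planUpdate signature are hypothetical names invented here, and identifier quoting and validation are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: plans a per-record UPDATE that only SETs non-null columns. */
final class PartialUpdateSketch {

    /** CQL text plus the values to bind, in placeholder order. */
    static final class Planned {
        final String cql;
        final List<Object> values;

        Planned(String cql, List<Object> values) {
            this.cql = cql;
            this.values = values;
        }
    }

    static Planned planUpdate(
            String keyspace,
            String table,
            Map<String, Object> setColumns,
            Map<String, Object> keyColumns) {
        List<String> assignments = new ArrayList<>();
        List<Object> values = new ArrayList<>();
        for (Map.Entry<String, Object> e : setColumns.entrySet()) {
            if (e.getValue() == null) {
                continue; // skip nulls so no tombstone is written for this column
            }
            assignments.add(e.getKey() + "=?");
            values.add(e.getValue());
        }
        if (assignments.isEmpty()) {
            throw new IllegalArgumentException("no non-null columns to SET");
        }
        List<String> predicates = new ArrayList<>();
        for (Map.Entry<String, Object> e : keyColumns.entrySet()) {
            predicates.add(e.getKey() + "=?");
            values.add(e.getValue()); // key values follow the SET values
        }
        String cql = "UPDATE " + keyspace + "." + table
                + " SET " + String.join(", ", assignments)
                + " WHERE " + String.join(" AND ", predicates);
        return new Planned(cql, values);
    }
}
```

Because the generated CQL text depends only on which columns are non-null, records with the same shape produce the same statement string, which is what makes prepared-statement caching effective on the dynamic path.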

Capabilities (what the sink supports)

Static CQL (user-provided):

  • INSERT / UPDATE / LWT (IF)
  • USING TTL / TIMESTAMP
  • Quoted identifiers & composite keys
  • Optional ignore-null via UNSET (avoid tombstones)
  • Statement customization (consistency, timeout)

Dynamic CQL (planned per record):

  • Table routing (incl. multi-keyspace) via TableResolver
  • Composite-key updates and per-record value binding via ColumnValueResolver
  • Per-record USING / IF via CqlClauseResolver
  • Per-record StatementCustomizer (consistency/timeout)
  • Prepared statement reuse via PreparedStatementCache
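To make the table-routing idea concrete, here is a small sketch of resolving a keyspace and table from each record, covering the multi-tenant and time-partitioned cases mentioned earlier. It does not use the PR's TableResolver interface, whose signature is not shown in this description; TableRoutingSketch, Event, Target, and the naming scheme (tenant_<name>, events_<yyyyMMdd>) are all assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

/** Hypothetical sketch of per-record table routing; not the PR's TableResolver API. */
final class TableRoutingSketch {

    /** Minimal record type assumed for this example. */
    static final class Event {
        final String tenant;
        final Instant timestamp;

        Event(String tenant, Instant timestamp) {
            this.tenant = tenant;
            this.timestamp = timestamp;
        }
    }

    /** Resolved keyspace/table pair. */
    static final class Target {
        final String keyspace;
        final String table;

        Target(String keyspace, String table) {
            this.keyspace = keyspace;
            this.table = table;
        }

        String qualified() {
            return keyspace + "." + table;
        }
    }

    // Day-granularity suffix, always computed in UTC so routing is deterministic.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** One keyspace per tenant, one table per UTC day. */
    static final Function<Event, Target> BY_TENANT_AND_DAY =
            e -> new Target("tenant_" + e.tenant, "events_" + DAY.format(e.timestamp));
}
```

In the sink itself, a function of this shape would presumably be supplied through withTableResolver(...), as shown in the dynamic CQL usage example.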

Input formats:

  • POJO (DataStax Mapper; static INSERT upsert with mapper options)
  • Row (CQL)
  • Tuple (CQL)
  • Scala.Product (CQL)

Writer semantics:

  • Bounded concurrency (permits) + backpressure
  • Retries with fatal vs. retriable classification
  • flush() waits for all in-flight operations
  • Success/failure callbacks run on the mailbox (task thread)
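The permit-based backpressure and flush() behavior can be sketched with a plain Semaphore. This is a simplified illustration under stated assumptions, not the CassandraSinkWriter implementation: BoundedAsyncWriterSketch and its methods are hypothetical, completion callbacks run on whichever thread completes the future rather than on the Flink mailbox, and retries are omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical sketch: bounded in-flight async writes with a blocking flush. */
final class BoundedAsyncWriterSketch<T> {
    private final int maxConcurrent;
    private final Semaphore permits;
    private final Function<T, CompletableFuture<Void>> asyncWrite;

    BoundedAsyncWriterSketch(int maxConcurrent, Function<T, CompletableFuture<Void>> asyncWrite) {
        this.maxConcurrent = maxConcurrent;
        this.permits = new Semaphore(maxConcurrent);
        this.asyncWrite = asyncWrite;
    }

    /** Blocks the caller while all permits are taken, which is the backpressure. */
    void write(T record) {
        permits.acquireUninterruptibly();
        CompletableFuture<Void> future;
        try {
            future = asyncWrite.apply(record);
        } catch (RuntimeException e) {
            permits.release(); // the write never started, so give the permit back
            throw e;
        }
        // Release on success and on failure so permits cannot leak.
        future.whenComplete((ok, err) -> permits.release());
    }

    /** Returns once every in-flight write has completed. */
    void flush() {
        permits.acquireUninterruptibly(maxConcurrent);
        permits.release(maxConcurrent);
    }

    int inFlight() {
        return maxConcurrent - permits.availablePermits();
    }
}
```

Taking all permits in flush() is one simple way to wait for in-flight operations; it also briefly blocks new writes, which is acceptable when flush() and write() are called from the same task thread.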

Metrics (representative):

  • Counters: numRecordsOut, numRecordsOutErrors, retries
  • Histogram hooks for latencies

Feature matrix

Capability | POJO (DataStax Mapper) | Row (CQL) | Tuple (CQL) | Scala Product (CQL)
INSERT | ✅ (mapper .save() upsert) | | |
UPDATE | | | |
LWT (IF, IF EXISTS/NOT EXISTS) | | | |
Dynamic CQL (per-record table/key routing, clauses, values) | | | |
Ignore nulls | ✅ (saveNullFields=false) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer)
TTL / TIMESTAMP / Consistency | ✅ (mapper options) | | |
Backpressure / Retries / Flush / Close / Metrics | | | |

How users use it

POJO TYPE

PojoSinkConfig<MyPojo> cfg =
    PojoSinkConfig.forPojo(MyPojo.class)
        .withSaveNullFields(false)      // UNSET nulls to avoid tombstones
        .withTtlSeconds(3600)           // optional TTL
        .withConsistencyLevel(ConsistencyLevel.QUORUM); // optional

CassandraSink.<MyPojo>builder()
    .setClusterBuilder(clusterBuilder)
    .setRequestConfiguration(
        RequestConfiguration.builder()
            .setMaxConcurrentRequests(8)
            .setMaxRetries(3)
            .build())
    .setConfig(cfg)
    .build();

CQL TYPES (STATIC QUERIES)

CqlSinkConfig<Row> cfg =
    CqlSinkConfig.<Row>forRow()
        .withQuery("INSERT INTO ks.tbl(id,name,age) VALUES (?,?,?)");

CassandraSink.<Row>builder()
    .setClusterBuilder(clusterBuilder)
    .setRequestConfiguration(RequestConfiguration.builder().setMaxConcurrentRequests(8).build())
    .setConfig(cfg)
    .build();

CQL TYPES (DYNAMIC)

CqlSinkConfig<Row> cfg =
    CqlSinkConfig.<Row>forRow()
        .withTableResolver(/* per-record ks/table */)
        .withClauseResolver(/* USING TTL/TIMESTAMP, IF/LWT per record */)
        .withStatementCustomizer(/* consistency/timeout, ignore-null via UNSET, etc. */);

CassandraSink.<Row>builder()
    .setClusterBuilder(clusterBuilder)
    .setRequestConfiguration(RequestConfiguration.builder().setMaxConcurrentRequests(8).build())
    .setConfig(cfg)
    .build();

What’s included (by area)

Architecture Overview

┌─────────────────────────────────────────────────────┐
│                  CassandraSink<IN>                  │
│                  (Sink V2 Interface)                │
└─────────────────────┬───────────────────────────────┘
                      │
        ┌─────────────▼─────────────┐
        │   CassandraSinkWriter<IN> │
        │                           │
        └─────────────┬─────────────┘
                      │
     ┌────────────────┼────────────────┐
     │                │                │
┌────▼──────┐ ┌───────▼──────┐ ┌───────▼──────┐
│   POJO    │ │     CQL      │ │   RowData    │
│  Writer   │ │    Writer    │ │    Writer    │
└───────────┘ └──────────────┘ └──────────────┘
                    │                │
           ┌────────▼────────────────▼───────────────────┐
           │          Statement Planner                  │
           │   (Static/Dynamic Query Generation)         │
           └─────────────────────────────────────────────┘

Sink entrypoints

  • CassandraSink.java - Main sink implementation with Sink V2 API
  • CassandraSinkWriter.java - Async write coordinator with backpressure
  • CassandraSinkBuilder.java - Fluent builder for sink configuration

Configuration Layer

  • CassandraSinkConfig.java - Base configuration interface
  • PojoSinkConfig.java - POJO-specific with DataStax Mapper support
  • CqlSinkConfig.java - CQL query-based configuration
  • RowDataSinkConfig.java - Table API RowData configuration
  • RequestConfiguration.java - Write operation parameters

Writer runtime & failure handling

  • RecordWriterFactory
  • CassandraFailureHandler
  • CassandraFatalExceptionClassifier
  • MaxRetriesExceededException
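The split between fatal and retriable failures can be sketched as a small retry loop. This is an illustrative, synchronous simplification with no backoff, and it does not reproduce CassandraFailureHandler or CassandraFatalExceptionClassifier; RetrySketch, RetriesExhausted, and the predicate-based classifier are hypothetical names chosen for this example.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch: retry retriable failures, fail fast on fatal ones. */
final class RetrySketch {

    /** Thrown when a retriable failure persists after the last allowed retry. */
    static final class RetriesExhausted extends RuntimeException {
        RetriesExhausted(int retries, Throwable cause) {
            super("giving up after " + retries + " retries", cause);
        }
    }

    static <R> R callWithRetries(
            Supplier<R> attempt, int maxRetries, Predicate<Throwable> isFatal) {
        int retries = 0;
        while (true) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                if (isFatal.test(e)) {
                    throw e; // fatal: short-circuit without retrying
                }
                if (retries >= maxRetries) {
                    throw new RetriesExhausted(retries, e);
                }
                retries++; // retriable: try again
            }
        }
    }
}
```

With maxRetries set to 3, an operation is attempted at most four times: the initial call plus three retries.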

Query Planning

  • StatementPlanner.java - Core planning interface
  • StaticPlannerAssembler.java - Pre-compiled statement generation
  • DynamicPlannerAssembler.java - Runtime query construction
  • PreparedStatementCache.java - Statement caching layer
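A statement caching layer of this kind can be sketched as a size-bounded map keyed by CQL text. The sketch below is not the PR's PreparedStatementCache: StatementCacheSketch is a hypothetical name, the LRU eviction policy is an assumption (the description only mentions reuse and eviction), and the prepare function stands in for a driver call so the example stays driver-free.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: LRU cache of prepared statements keyed by CQL text. */
final class StatementCacheSketch<S> {
    private final Map<String, S> cache;
    private final Function<String, S> prepare;

    StatementCacheSketch(int maxSize, Function<String, S> prepare) {
        this.prepare = prepare;
        // accessOrder=true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, S>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, S> eldest) {
                return size() > maxSize; // evict the least recently used entry
            }
        };
    }

    /** Returns the cached statement, preparing it only on a miss. */
    synchronized S get(String cql) {
        S statement = cache.get(cql);
        if (statement == null) {
            statement = prepare.apply(cql);
            cache.put(cql, statement);
        }
        return statement;
    }

    synchronized int size() {
        return cache.size();
    }
}
```

Bounding the cache matters on the dynamic path, where per-record routing and clause generation can produce an open-ended number of distinct CQL strings.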

Strategy Pattern Implementation

  • InsertStrategy.java / StaticInsertStrategy.java - INSERT operations
  • UpdateStrategy.java / StaticUpdateStrategy.java - UPDATE operations
  • NullUnsettingCustomizer.java - Null field handling customization

Resolvers & Helpers

  • Tables: TableResolver, FixedTableResolver, TableRef
  • Columns/values: ColumnValueResolver, FixedColumnValueResolver, ResolvedWrite
  • Clauses: CqlClauseResolver, NoOpClauseResolver, ClauseBindings
  • Customization: StatementCustomizer, NoOpCustomizer, NullUnsettingCustomizer
  • Utilities: QueryParser, CqlStatementHelper

Testing in this PR

A. Unit tests (no Cassandra; fast & deterministic)

  • Planning / Assemblers
    • DynamicPlannerAssemblerTest, StaticPlannerAssemblerTest — assemble StatementPlanner for dynamic/static paths; verify routing, clause resolution, strategy selection.
  • Configs
    • RequestConfigurationTest — concurrency/retry/timeout and builder validation.
    • CqlSinkConfigTest, PojoSinkConfigTest — static vs dynamic toggles, null-unsetting, customizers, format selection.
  • Strategies & Parsing
    • InsertStrategyTest, StaticInsertStrategyTest, UpdateStrategyTest, StaticUpdateStrategyTest — CQL generation for INSERT/UPDATE, TTL/TIMESTAMP/IF.
    • QueryParserTest — user CQL parsing, placeholder counts, safety checks.
  • Resolvers & Model
    • ColumnValueResolverTest, FixedColumnValueResolverTest — value mapping, null semantics.
    • ResolvedWriteTest — field extraction, composite PK binding.
    • TableRefTest — keyspace/table/quoted identifiers.
  • Helpers & Factories
    • PreparedStatementCacheTest — reuse/eviction behavior.
    • CqlStatementHelperTest — binding helpers & UNSET handling.
    • StatementPlannerFactoryTest, StatementPlannerTest — end-to-end plan → bind (incl. clause/customizer/multi-keyspace).
    • RecordWriterFactoryTest — picks correct low-level writer per format.
    • CassandraSinkBuilderTest — public builder validation and option wiring.
  • Writer behavior (format-agnostic)
    • CassandraSinkWriterTest — permits/backpressure, permit release (success/error), retries & max-retries, fatal short-circuit, flush()/close() semantics, mailbox callback threading, metrics, no deadlocks.

B. Integration tests (real Cassandra; smoke coverage)

  • CassandraCqlSinkITCase — CQL path: static INSERT/UPDATE variants, USING TTL/TIMESTAMP/IF, quoted identifiers, composite keys, ignore-null (UNSET), fatal validation messages. Also covers dynamic table and column resolvers.
  • CassandraPojoSinkITCase — POJO path: mapper options (TTL, consistency, saveNullFields), upsert semantics, null-PK failure, missing table, TTL expiry.
  • CassandraSinkWriterITCase — end-to-end writer loop with mailbox & permits: backpressure at maxConcurrentRequests=1, flush() waits, close() idempotent, fatal error (bad table) surfaces once.

@Poorvankbhatia Poorvankbhatia changed the title [FLINK-26820] Cassandra Sink V2 Implementation (FLIP-533) (Phase-1) [FLINK-37984] Cassandra Sink V2 Implementation (FLIP-533) (Phase-1) Sep 30, 2025
@Poorvankbhatia
Contributor Author

Hey @dannycranmer @MartijnVisser can you help with this review? Thanks.

cc: @echauchot for visibility.

@Poorvankbhatia Poorvankbhatia force-pushed the FLINK-37984-Cassandra-Sink-V2-Phase-1 branch 2 times, most recently from 5386e16 to f9e0c15 Compare October 11, 2025 15:30
@Poorvankbhatia Poorvankbhatia force-pushed the FLINK-37984-Cassandra-Sink-V2-Phase-1 branch from f9e0c15 to 0fbdea6 Compare October 12, 2025 07:09
@Poorvankbhatia
Contributor Author

Hey @vahmed-hamdy PTAL :) Thank you.

@Poorvankbhatia
Contributor Author

cc: @Samrat002 for visibility.
