[FLINK-37984] Cassandra Sink V2 Implementation (FLIP-533) (Phase-1) #38
      
        
Cassandra Sink V2
This PR is a detailed Phase-1 implementation of FLIP-533.
It adds a new Cassandra sink for Flink that cleanly separates planning (CQL generation/binding) from writing.
POJOs are written via the DataStax Mapper; Row/Tuple and Scala.Product via static and dynamic CQL.
What this unlocks
- Static CQL (Row, Tuple, Scala.Product): UPDATE ... SET ... WHERE ... (incl. IF LWT, USING TTL/TIMESTAMP). UNSET to avoid tombstones.
- Dynamic CQL (Row, Tuple): SET, skip nulls. USING and IF generated from the record.
Capabilities (what the sink supports)
Static CQL (user-provided):
Dynamic CQL (planned per record):
TableResolver, ColumnValueResolver, CqlClauseResolver, StatementCustomizer (consistency/timeout), PreparedStatementCache
Input formats:
Writer semantics:
- flush() waits for all in-flight operations
Metrics (representative):
- numRecordsOut, numRecordsOutErrors, retries
Feature matrix
- .save() upsert
- IF, IF EXISTS / NOT EXISTS
- saveNullFields=false
How users use it
POJO TYPE
CQL TYPES (STATIC QUERIES)
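On the static path the user supplies the CQL text, so the number of `?` bind markers has to match the number of fields in each record. The sketch below shows one way such a check could look, in the spirit of the "placeholder counts" checks listed for QueryParserTest. `PlaceholderCountSketch`, `countPlaceholders`, and `requireArity` are hypothetical names invented for this illustration and are not taken from the PR; the sketch also ignores CQL comments and named markers (`:name`).

```java
/** Hypothetical helper for illustration; not the QueryParser from this PR. */
final class PlaceholderCountSketch {

    /**
     * Counts positional '?' bind markers in a CQL string, skipping any that sit
     * inside single-quoted string literals or double-quoted identifiers.
     * Simplification: CQL comments and named markers (:name) are not handled.
     */
    static int countPlaceholders(String cql) {
        int count = 0;
        char quote = 0; // 0 means "not inside a quoted region"
        for (int i = 0; i < cql.length(); i++) {
            char c = cql.charAt(i);
            if (quote != 0) {
                // A doubled quote ('' or "") closes and immediately reopens the
                // region, so escaped quotes need no special case.
                if (c == quote) {
                    quote = 0;
                }
            } else if (c == '\'' || c == '"') {
                quote = c;
            } else if (c == '?') {
                count++;
            }
        }
        return count;
    }

    /** Fails fast when the statement and the record disagree on the number of values. */
    static void requireArity(String cql, int recordArity) {
        int placeholders = countPlaceholders(cql);
        if (placeholders != recordArity) {
            throw new IllegalArgumentException(
                    "CQL has " + placeholders + " placeholders but the record has "
                            + recordArity + " fields");
        }
    }
}
```

For example, `requireArity("INSERT INTO ks.users (id, name) VALUES (?, ?) USING TTL 3600", 2)` passes, while passing `3` as the arity throws an `IllegalArgumentException` before any statement is prepared.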
CQL TYPES (DYNAMIC)
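On the dynamic path the statement is planned per record: null columns are skipped in `SET` and the `USING` clause is derived from the record. The following is a minimal sketch of that idea in plain Java, assuming a record already split into key and value columns. `DynamicUpdateSketch`, `Plan`, and `planUpdate` are hypothetical names for illustration only; the PR's own planner (StatementPlanner, resolvers, strategies) is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical illustration only; not a class from this PR. */
final class DynamicUpdateSketch {

    /** A CQL string plus the values to bind, in placeholder order. */
    static final class Plan {
        final String cql;
        final List<Object> bindings;

        Plan(String cql, List<Object> bindings) {
            this.cql = cql;
            this.bindings = bindings;
        }
    }

    /**
     * Builds UPDATE ... [USING TTL ?] SET ... WHERE ... for one record.
     * Value columns that are null are left out of SET, so no tombstone is
     * written for them. ttlSeconds may be null, in which case USING is omitted.
     */
    static Plan planUpdate(String keyspace, String table,
                           Map<String, Object> keyColumns,
                           Map<String, Object> valueColumns,
                           Integer ttlSeconds) {
        List<Object> bindings = new ArrayList<>();
        StringBuilder cql =
                new StringBuilder("UPDATE ").append(keyspace).append('.').append(table);
        if (ttlSeconds != null) {
            cql.append(" USING TTL ?");
            bindings.add(ttlSeconds);
        }
        StringJoiner set = new StringJoiner(", ");
        for (Map.Entry<String, Object> e : valueColumns.entrySet()) {
            if (e.getValue() == null) {
                continue; // skip nulls instead of binding them
            }
            set.add(e.getKey() + " = ?");
            bindings.add(e.getValue());
        }
        if (set.length() == 0) {
            throw new IllegalArgumentException("record has no non-null columns to set");
        }
        StringJoiner where = new StringJoiner(" AND ");
        for (Map.Entry<String, Object> e : keyColumns.entrySet()) {
            if (e.getValue() == null) {
                throw new IllegalArgumentException("null primary key column: " + e.getKey());
            }
            where.add(e.getKey() + " = ?");
            bindings.add(e.getValue());
        }
        cql.append(" SET ").append(set).append(" WHERE ").append(where);
        return new Plan(cql.toString(), bindings);
    }
}
```

Because the generated text depends on which fields are null, different records can yield different statements, which is where a prepared-statement cache keyed by the CQL string becomes useful.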
What’s included (by area)
Architecture Overview
Sink entrypoints
- CassandraSink.java - Main sink implementation with Sink V2 API
- CassandraSinkWriter.java - Async write coordinator with backpressure
- CassandraSinkBuilder.java - Fluent builder for sink configuration
Configuration Layer
- CassandraSinkConfig.java - Base configuration interface
- PojoSinkConfig.java - POJO-specific with DataStax Mapper support
- CqlSinkConfig.java - CQL query-based configuration
- RowDataSinkConfig.java - Table API RowData configuration
- RequestConfiguration.java - Write operation parameters
Writer runtime & failure handling
- RecordWriterFactory
- CassandraFailureHandler
- CassandraFatalExceptionClassifier
- MaxRetriesExceededException
Query Planning
- StatementPlanner.java - Core planning interface
- StaticPlannerAssembler.java - Pre-compiled statement generation
- DynamicPlannerAssembler.java - Runtime query construction
- PreparedStatementCache.java - Statement caching layer
Strategy Pattern Implementation
- InsertStrategy.java / StaticInsertStrategy.java - INSERT operations
- UpdateStrategy.java / StaticUpdateStrategy.java - UPDATE operations
- NullUnsettingCustomizer.java - Null field handling customization
Resolvers & Helpers
- TableResolver, FixedTableResolver, TableRef
- ColumnValueResolver, FixedColumnValueResolver, ResolvedWrite
- CqlClauseResolver, NoOpClauseResolver, ClauseBindings
- StatementCustomizer, NoOpCustomizer, NullUnsettingCustomizer
- QueryParser, CqlStatementHelper
Testing in this PR
A. Unit tests (no Cassandra; fast & deterministic)
- DynamicPlannerAssemblerTest, StaticPlannerAssemblerTest: assemble StatementPlanner for dynamic/static paths; verify routing, clause resolution, strategy selection.
- RequestConfigurationTest: concurrency/retry/timeout and builder validation.
- CqlSinkConfigTest, PojoSinkConfigTest: static vs dynamic toggles, null-unsetting, customizers, format selection.
- InsertStrategyTest, StaticInsertStrategyTest, UpdateStrategyTest, StaticUpdateStrategyTest: CQL generation for INSERT/UPDATE, TTL/TIMESTAMP/IF.
- QueryParserTest: user CQL parsing, placeholder counts, safety checks.
- ColumnValueResolverTest, FixedColumnValueResolverTest: value mapping, null semantics.
- ResolvedWriteTest: field extraction, composite PK binding.
- TableRefTest: keyspace/table/quoted identifiers.
- PreparedStatementCacheTest: reuse/eviction behavior.
- CqlStatementHelperTest: binding helpers & UNSET handling.
- StatementPlannerFactoryTest, StatementPlannerTest: end-to-end plan → bind (incl. clause/customizer/multi-keyspace).
- RecordWriterFactoryTest: picks correct low-level writer per format.
- CassandraSinkBuilderTest: public builder validation and option wiring.
- CassandraSinkWriterTest: permits/backpressure, permit release (success/error), retries & max-retries, fatal short-circuit, flush()/close() semantics, mailbox callback threading, metrics, no deadlocks.
B. Integration tests (real Cassandra; smoke coverage)
- CassandraCqlSinkITCase: static CQL path: INSERT/UPDATE variants, USING TTL/TIMESTAMP/IF, quoted identifiers, composite keys, ignore-null (UNSET), fatal validation messages. (Dynamic Table & Column Resolvers.)
- CassandraPojoSinkITCase: POJO path: mapper options (TTL, consistency, saveNullFields), upsert semantics, null-PK failure, missing table, TTL expiry.
- CassandraSinkWriterITCase: end-to-end writer loop with mailbox & permits: backpressure at maxConcurrentRequests=1, flush() waits, close() idempotent, fatal error (bad table) surfaces once.
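
The permit behavior these writer tests exercise (backpressure at the concurrency limit, permit release on success and on error, flush() waiting for in-flight operations) can be sketched with a plain `java.util.concurrent.Semaphore`. This is an illustration under stated assumptions, not the PR's CassandraSinkWriter: `PermitWriterSketch` and its members are hypothetical names, and retries, fatal-error classification, and mailbox callback threading are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical illustration only; not the CassandraSinkWriter from this PR. */
final class PermitWriterSketch<T> {
    private final int maxConcurrentRequests;
    private final Semaphore permits;
    private final Function<T, CompletableFuture<Void>> asyncWrite;
    private final AtomicInteger errors = new AtomicInteger();

    PermitWriterSketch(int maxConcurrentRequests,
                       Function<T, CompletableFuture<Void>> asyncWrite) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.permits = new Semaphore(maxConcurrentRequests);
        this.asyncWrite = asyncWrite;
    }

    /** Blocks while maxConcurrentRequests writes are already in flight (backpressure). */
    void write(T record) {
        permits.acquireUninterruptibly();
        CompletableFuture<Void> inFlight;
        try {
            inFlight = asyncWrite.apply(record);
        } catch (RuntimeException e) {
            permits.release(); // the request never started, so give the permit back
            throw e;
        }
        inFlight.whenComplete((ignored, error) -> {
            if (error != null) {
                errors.incrementAndGet();
            }
            permits.release(); // released on success and on error
        });
    }

    /** Waits for all in-flight writes by taking every permit, then returns them. */
    void flush() {
        permits.acquireUninterruptibly(maxConcurrentRequests);
        permits.release(maxConcurrentRequests);
    }

    int errorCount() {
        return errors.get();
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

With `maxConcurrentRequests=1`, a second `write` call blocks until the first future completes, which is the backpressure case the integration test names. Uninterruptible acquisition is used here only to keep the sketch free of checked exceptions.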