diff --git a/Cargo.toml b/Cargo.toml index b5a0240b7..84fb5e2f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ metrics-exporter-prometheus = { version = "0.17.2", default-features = false } parquet = { version = "55.0", default-features = false } pg_escape = { version = "0.1.1", default-features = false } pin-project-lite = { version = "0.2.16", default-features = false } -postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" } +postgres-replication = { path = "../rust-postgres/postgres-replication", default-features = false } prost = { version = "0.14.1", default-features = false } rand = { version = "0.9.2", default-features = false } reqwest = { version = "0.12.22", default-features = false } @@ -70,7 +70,7 @@ serde_json = { version = "1.0.141", default-features = false } sqlx = { version = "0.8.6", default-features = false } thiserror = "2.0.12" tokio = { version = "1.47.0", default-features = false } -tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" } +tokio-postgres = { path = "../rust-postgres/tokio-postgres", default-features = false } tokio-rustls = { version = "0.26.2", default-features = false } tracing = { version = "0.1.41", default-features = false } tracing-actix-web = { version = "0.7.19", default-features = false } diff --git a/etl-api/tests/pipelines.rs b/etl-api/tests/pipelines.rs index e12712dff..cbb572a4b 100644 --- a/etl-api/tests/pipelines.rs +++ b/etl-api/tests/pipelines.rs @@ -1217,10 +1217,10 @@ async fn deleting_pipeline_removes_table_schemas_from_source_database() { ).bind(pipeline_id).bind(table2_oid).fetch_one(&source_db_pool).await.unwrap(); // Insert multiple columns for each table to test CASCADE behavior - sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, column_order) VALUES ($1, 'id', 'INT4', -1, false, true, 0), ($1, 'name', 'TEXT', -1, true, false, 1)") + sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, ordinal_position, primary_key_position) VALUES ($1, 'id', 'INT4', -1, false, true, 1, 1), ($1, 'name', 'TEXT', -1, true, false, 2, NULL)") .bind(table_schema_id_1).execute(&source_db_pool).await.unwrap(); - sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, column_order) VALUES ($1, 'order_id', 'INT8', -1, false, true, 0), ($1, 'amount', 'NUMERIC', -1, false, false, 1)") + sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, ordinal_position, primary_key_position) VALUES ($1, 'order_id', 'INT8', -1, false, true, 1, 1), ($1, 'amount', 'NUMERIC', -1, false, false, 2, NULL)") .bind(table_schema_id_2).execute(&source_db_pool).await.unwrap(); // Verify data exists before deletion diff --git a/etl-destinations/src/bigquery/client.rs b/etl-destinations/src/bigquery/client.rs index 73c90fb7b..c76137a1d 100644 --- a/etl-destinations/src/bigquery/client.rs +++ b/etl-destinations/src/bigquery/client.rs @@ -446,7 +446,7 @@ impl BigQueryClient { fn add_primary_key_clause(column_schemas: &[ColumnSchema]) -> String { let identity_columns: Vec = column_schemas .iter() - .filter(|s| s.primary) + .filter(|s| s.primary_key_position.is_some()) 
.map(|c| format!("`{}`", c.name)) .collect(); @@ -800,16 +800,18 @@ mod tests { #[test] fn test_column_spec() { - let column_schema = ColumnSchema::new("test_col".to_string(), Type::TEXT, -1, true, false); + let column_schema = + ColumnSchema::with_positions("test_col".to_string(), Type::TEXT, -1, true, 1, None); let spec = BigQueryClient::column_spec(&column_schema); assert_eq!(spec, "`test_col` string"); - let not_null_column = ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true); + let not_null_column = + ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)); let not_null_spec = BigQueryClient::column_spec(¬_null_column); assert_eq!(not_null_spec, "`id` int64 not null"); let array_column = - ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false, false); + ColumnSchema::with_positions("tags".to_string(), Type::TEXT_ARRAY, -1, false, 1, None); let array_spec = BigQueryClient::column_spec(&array_column); assert_eq!(array_spec, "`tags` array"); } @@ -817,16 +819,23 @@ mod tests { #[test] fn test_add_primary_key_clause() { let columns_with_pk = vec![ - ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), - ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), + ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)), + ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None), ]; let pk_clause = BigQueryClient::add_primary_key_clause(&columns_with_pk); assert_eq!(pk_clause, ", primary key (`id`) not enforced"); let columns_with_composite_pk = vec![ - ColumnSchema::new("tenant_id".to_string(), Type::INT4, -1, false, true), - ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), - ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), + ColumnSchema::with_positions( + "tenant_id".to_string(), + Type::INT4, + -1, + false, + 1, + Some(1), + ), + ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 2, Some(2)), + ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 3, None), ]; let composite_pk_clause = BigQueryClient::add_primary_key_clause(&columns_with_composite_pk); @@ -836,8 +845,8 @@ mod tests { ); let columns_no_pk = vec![ - ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), - ColumnSchema::new("age".to_string(), Type::INT4, -1, true, false), + ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 1, None), + ColumnSchema::with_positions("age".to_string(), Type::INT4, -1, true, 2, None), ]; let no_pk_clause = BigQueryClient::add_primary_key_clause(&columns_no_pk); assert_eq!(no_pk_clause, ""); @@ -846,9 +855,9 @@ mod tests { #[test] fn test_create_columns_spec() { let columns = vec![ - ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), - ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), - ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false), + ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)), + ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None), + ColumnSchema::with_positions("active".to_string(), Type::BOOL, -1, false, 3, None), ]; let spec = BigQueryClient::create_columns_spec(&columns); assert_eq!( @@ -866,10 +875,10 @@ mod tests { #[test] fn test_column_schemas_to_table_descriptor() { let columns = vec![ - ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), - ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), - 
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false), - ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false, false), + ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)), + ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None), + ColumnSchema::with_positions("active".to_string(), Type::BOOL, -1, false, 3, None), + ColumnSchema::with_positions("tags".to_string(), Type::TEXT_ARRAY, -1, false, 4, None), ]; let descriptor = BigQueryClient::column_schemas_to_table_descriptor(&columns, true); @@ -949,12 +958,19 @@ mod tests { #[test] fn test_column_schemas_to_table_descriptor_complex_types() { let columns = vec![ - ColumnSchema::new("uuid_col".to_string(), Type::UUID, -1, true, false), - ColumnSchema::new("json_col".to_string(), Type::JSON, -1, true, false), - ColumnSchema::new("bytea_col".to_string(), Type::BYTEA, -1, true, false), - ColumnSchema::new("numeric_col".to_string(), Type::NUMERIC, -1, true, false), - ColumnSchema::new("date_col".to_string(), Type::DATE, -1, true, false), - ColumnSchema::new("time_col".to_string(), Type::TIME, -1, true, false), + ColumnSchema::with_positions("uuid_col".to_string(), Type::UUID, -1, true, 1, None), + ColumnSchema::with_positions("json_col".to_string(), Type::JSON, -1, true, 2, None), + ColumnSchema::with_positions("bytea_col".to_string(), Type::BYTEA, -1, true, 3, None), + ColumnSchema::with_positions( + "numeric_col".to_string(), + Type::NUMERIC, + -1, + true, + 4, + None, + ), + ColumnSchema::with_positions("date_col".to_string(), Type::DATE, -1, true, 5, None), + ColumnSchema::with_positions("time_col".to_string(), Type::TIME, -1, true, 6, None), ]; let descriptor = BigQueryClient::column_schemas_to_table_descriptor(&columns, true); @@ -1006,8 +1022,8 @@ mod tests { let table_id = "test_table"; let columns = vec![ - ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), - ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), + ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, true, 1, Some(1)), + ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None), ]; // Simulate the query generation logic @@ -1026,12 +1042,13 @@ mod tests { let table_id = "test_table"; let max_staleness_mins = 15; - let columns = vec![ColumnSchema::new( + let columns = vec![ColumnSchema::with_positions( "id".to_string(), Type::INT4, -1, false, - true, + 1, + Some(1), )]; // Simulate the query generation logic with staleness diff --git a/etl-destinations/src/iceberg/destination.rs b/etl-destinations/src/iceberg/destination.rs index 808c99393..2c75ec37c 100644 --- a/etl-destinations/src/iceberg/destination.rs +++ b/etl-destinations/src/iceberg/destination.rs @@ -387,20 +387,24 @@ where let sequence_number_col = find_unique_column_name(&final_schema.column_schemas, SEQUENCE_NUMBER_COLUMN_NAME); - final_schema.add_column_schema(ColumnSchema { - name: cdc_operation_col, - typ: Type::TEXT, - modifier: -1, - nullable: false, - primary: false, - }); - final_schema.add_column_schema(ColumnSchema { - name: sequence_number_col, - typ: Type::TEXT, - modifier: -1, - nullable: false, - primary: false, - }); + let next_ordinal = (final_schema.column_schemas.len() as i32) + 1; + final_schema.add_column_schema(ColumnSchema::with_positions( + cdc_operation_col, + Type::TEXT, + -1, + false, + next_ordinal, + None, + )); + let next_ordinal = (final_schema.column_schemas.len() as i32) + 1; + 
final_schema.add_column_schema(ColumnSchema::with_positions( + sequence_number_col, + Type::TEXT, + -1, + false, + next_ordinal, + None, + )); final_schema } diff --git a/etl-destinations/tests/iceberg_client.rs b/etl-destinations/tests/iceberg_client.rs index e716b3aa5..493ff6a83 100644 --- a/etl-destinations/tests/iceberg_client.rs +++ b/etl-destinations/tests/iceberg_client.rs @@ -13,6 +13,26 @@ use crate::support::{ mod support; +fn column(name: &str, typ: Type, modifier: i32, nullable: bool, primary: bool) -> ColumnSchema { + ColumnSchema::new(name.to_string(), typ, modifier, nullable, primary) +} + +fn with_positions(mut columns: Vec) -> Vec { + let mut primary_index = 0; + for (idx, column) in columns.iter_mut().enumerate() { + column.ordinal_position = (idx + 1) as i32; + let is_primary = column.primary_key_position.is_some(); + column.primary_key_position = if is_primary { + primary_index += 1; + Some(primary_index) + } else { + None + }; + } + + columns +} + #[tokio::test] async fn create_namespace() { init_test_tracing(); diff --git a/etl-postgres/src/replication/schema.rs b/etl-postgres/src/replication/schema.rs index cd540d653..169750c30 100644 --- a/etl-postgres/src/replication/schema.rs +++ b/etl-postgres/src/replication/schema.rs @@ -173,14 +173,33 @@ pub async fn store_table_schema( .await?; // Insert all columns + let mut sequential_primary_index = 0; + for (column_order, column_schema) in table_schema.column_schemas.iter().enumerate() { let column_type_str = postgres_type_to_string(&column_schema.typ); + let ordinal_position = if column_schema.ordinal_position > 0 { + column_schema.ordinal_position + } else { + (column_order as i32) + 1 + }; + let primary_key_position = match column_schema.primary_key_position { + Some(position) if position > 0 => { + sequential_primary_index = sequential_primary_index.max(position); + Some(position) + } + Some(_) => { + sequential_primary_index += 1; + Some(sequential_primary_index) + } + None => None, + }; + let is_primary = primary_key_position.is_some(); sqlx::query( r#" insert into etl.table_columns - (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, column_order) - values ($1, $2, $3, $4, $5, $6, $7) + (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, ordinal_position, primary_key_position) + values ($1, $2, $3, $4, $5, $6, $7, $8) "#, ) .bind(table_schema_id) @@ -188,8 +207,9 @@ pub async fn store_table_schema( .bind(column_type_str) .bind(column_schema.modifier) .bind(column_schema.nullable) - .bind(column_schema.primary) - .bind(column_order as i32) + .bind(is_primary) + .bind(ordinal_position) + .bind(primary_key_position) .execute(&mut *tx) .await?; } @@ -217,12 +237,12 @@ pub async fn load_table_schemas( tc.column_type, tc.type_modifier, tc.nullable, - tc.primary_key, - tc.column_order + tc.ordinal_position, + tc.primary_key_position from etl.table_schemas ts inner join etl.table_columns tc on ts.id = tc.table_schema_id where ts.pipeline_id = $1 - order by ts.table_id, tc.column_order + order by ts.table_id, tc.ordinal_position "#, ) .bind(pipeline_id) @@ -304,14 +324,16 @@ fn parse_column_schema(row: &PgRow) -> ColumnSchema { let column_type: String = row.get("column_type"); let type_modifier: i32 = row.get("type_modifier"); let nullable: bool = row.get("nullable"); - let primary_key: bool = row.get("primary_key"); + let ordinal_position: i32 = row.get("ordinal_position"); + let primary_key_position: Option = row.get("primary_key_position"); - ColumnSchema::new( 
+ ColumnSchema::with_positions( column_name, string_to_postgres_type(&column_type), type_modifier, nullable, - primary_key, + ordinal_position, + primary_key_position, ) } diff --git a/etl-postgres/src/tokio/test_utils.rs b/etl-postgres/src/tokio/test_utils.rs index 6692e486b..f0750c0cc 100644 --- a/etl-postgres/src/tokio/test_utils.rs +++ b/etl-postgres/src/tokio/test_utils.rs @@ -480,6 +480,8 @@ pub fn id_column_schema() -> ColumnSchema { modifier: -1, nullable: false, primary: true, + ordinal_position: 1, + primary_key_position: Some(1), } } diff --git a/etl-postgres/src/types/schema.rs b/etl-postgres/src/types/schema.rs index c66ab41f1..13de9416c 100644 --- a/etl-postgres/src/types/schema.rs +++ b/etl-postgres/src/types/schema.rs @@ -62,8 +62,10 @@ pub struct ColumnSchema { pub modifier: TypeModifier, /// Whether the column can contain NULL values pub nullable: bool, - /// Whether the column is part of the table's primary key - pub primary: bool, + /// One-based ordinal position of the column within the table definition. + pub ordinal_position: i32, + /// One-based position within the primary key definition when the column is part of it. + pub primary_key_position: Option, } impl ColumnSchema { @@ -79,7 +81,26 @@ impl ColumnSchema { typ, modifier, nullable, - primary, + ordinal_position: 0, + primary_key_position: if primary { Some(0) } else { None }, + } + } + + pub fn with_positions( + name: String, + typ: Type, + modifier: TypeModifier, + nullable: bool, + ordinal_position: i32, + primary_key_position: Option, + ) -> ColumnSchema { + Self { + name, + typ, + modifier, + nullable, + ordinal_position, + primary_key_position, } } @@ -90,9 +111,9 @@ impl ColumnSchema { /// This method is used for comparing table schemas loaded via the initial table sync and the /// relation messages received via CDC. The reason for skipping the `nullable` field is that /// unfortunately Postgres doesn't seem to propagate nullable information of a column via - /// relation messages. The reason for skipping the `primary` field is that if the replica - /// identity of a table is set to full, the relation message sets all columns as primary - /// key, irrespective of what the actual primary key in the table is. + /// relation messages. The method also ignores `ordinal_position` and `primary_key_position` + /// because relation messages only imply ordering indirectly through their sequence and mark + /// all columns as primary when replica identity is full. fn partial_eq(&self, other: &ColumnSchema) -> bool { self.name == other.name && self.typ == other.typ && self.modifier == other.modifier } @@ -212,7 +233,9 @@ impl TableSchema { /// /// This method checks if any column in the table is marked as part of the primary key. pub fn has_primary_keys(&self) -> bool { - self.column_schemas.iter().any(|cs| cs.primary) + self.column_schemas + .iter() + .any(|cs| cs.primary_key_position.is_some()) } /// Compares two [`TableSchema`] instances, excluding the [`ColumnSchema`]'s `nullable` field. 
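Reviewer sketch (not part of the diff): the position-aware constructor introduced above replaces the old `primary: bool` flag, so callers now express both the column's position in the table and its position inside the primary key. The import paths and column values below are assumptions for illustration, based on how the existing tests in this diff use `ColumnSchema` and `Type`.

use etl_postgres::types::ColumnSchema;
use tokio_postgres::types::Type;

fn sample_columns() -> Vec<ColumnSchema> {
    vec![
        // First column of the table and the only primary-key column.
        ColumnSchema::with_positions("id".to_string(), Type::INT8, -1, false, 1, Some(1)),
        // Second column, nullable and not part of the primary key.
        ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None),
    ]
}

fn main() {
    let columns = sample_columns();
    // Primary-key membership is now derived from `primary_key_position`,
    // mirroring what `has_primary_keys` does after this change.
    assert!(columns.iter().any(|c| c.primary_key_position.is_some()));
    assert_eq!(columns[0].ordinal_position, 1);
}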
diff --git a/etl-replicator/configuration/base.yaml b/etl-replicator/configuration/base.yaml index 7dd8facff..720af4fd6 100644 --- a/etl-replicator/configuration/base.yaml +++ b/etl-replicator/configuration/base.yaml @@ -1,6 +1,6 @@ pipeline: - id: 3 - publication_name: "replicator_publication" + id: 5 + publication_name: "test_pub" batch: max_size: 10000 max_fill_ms: 1000 diff --git a/etl-replicator/migrations/20251007000100_add_primary_key_position_to_table_columns.sql b/etl-replicator/migrations/20251007000100_add_primary_key_position_to_table_columns.sql new file mode 100644 index 000000000..0f917852c --- /dev/null +++ b/etl-replicator/migrations/20251007000100_add_primary_key_position_to_table_columns.sql @@ -0,0 +1,37 @@ +-- Adds primary key and ordinal position metadata to stored table column schemas. +alter table etl.table_columns + add column if not exists primary_key_position integer; + +alter table etl.table_columns + rename column column_order to ordinal_position; + +with primary_positions as ( + select + table_schema_id, + column_name, + row_number() over ( + partition by table_schema_id + order by ordinal_position + ) as position + from etl.table_columns + where primary_key +) +update etl.table_columns tc +set primary_key_position = primary_positions.position +from primary_positions +where tc.table_schema_id = primary_positions.table_schema_id + and tc.column_name = primary_positions.column_name; + +with ordered_columns as ( + select + id, + row_number() over ( + partition by table_schema_id + order by ordinal_position + ) as normalized_position + from etl.table_columns +) +update etl.table_columns tc +set ordinal_position = ordered_columns.normalized_position +from ordered_columns +where tc.id = ordered_columns.id; diff --git a/etl-replicator/migrations/20251007000200_schema_change_triggers.sql b/etl-replicator/migrations/20251007000200_schema_change_triggers.sql new file mode 100644 index 000000000..89fe8246f --- /dev/null +++ b/etl-replicator/migrations/20251007000200_schema_change_triggers.sql @@ -0,0 +1,99 @@ +-- Adds helper functions and event trigger to emit schema change messages via logical decoding. 
+create or replace function etl.describe_table_schema(p_schema text, p_table text) +returns jsonb +language sql +stable +as +$$ +select jsonb_agg( + jsonb_build_object( + 'column_name', c.column_name, + 'ordinal_position', c.ordinal_position, + 'data_type', c.data_type, + 'is_nullable', c.is_nullable, + 'column_default', c.column_default, + 'is_primary_key', (kcu.column_name is not null), + 'primary_key_position', kcu.ordinal_position + ) order by c.ordinal_position + ) +from information_schema.columns c + left join information_schema.key_column_usage kcu + on kcu.table_schema = c.table_schema + and kcu.table_name = c.table_name + and kcu.column_name = c.column_name + and kcu.constraint_name in (select constraint_name + from information_schema.table_constraints + where table_schema = c.table_schema + and table_name = c.table_name + and constraint_type = 'PRIMARY KEY') +where c.table_schema = p_schema + and c.table_name = p_table; +$$; + +create or replace function etl.emit_schema_change_messages() +returns event_trigger +language plpgsql +as +$$ +declare + cmd record; + table_schema text; + table_name text; + table_oid oid; + schema_json jsonb; + msg_json jsonb; +begin + for cmd in + select * from pg_event_trigger_ddl_commands() + loop + if cmd.object_type not in ('table', 'column') then + continue; + end if; + + table_oid := cmd.objid; + + if table_oid is null then + continue; + end if; + + select n.nspname, c.relname + into table_schema, table_name + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + where c.oid = table_oid + and c.relkind = 'r'; + + if table_schema is null or table_name is null then + continue; + end if; + + select etl.describe_table_schema(table_schema, table_name) + into schema_json; + + if schema_json is null then + continue; + end if; + + msg_json := jsonb_build_object( + 'event', cmd.command_tag, + 'schema_name', table_schema, + 'table_name', table_name, + 'table_id', table_oid::bigint, + 'columns', schema_json + ); + + perform pg_logical_emit_message( + true, + 'supabase_etl_ddl', + convert_to(msg_json::text, 'utf8') + ); + end loop; + + return; +end; +$$; + +create event trigger etl_ddl_message_trigger + on ddl_command_end + when tag in ('ALTER TABLE') + execute function etl.emit_schema_change_messages(); diff --git a/etl/Cargo.toml b/etl/Cargo.toml index b6b94eff2..89d4c1525 100644 --- a/etl/Cargo.toml +++ b/etl/Cargo.toml @@ -31,6 +31,7 @@ ring = { workspace = true, default-features = false } rustls = { workspace = true, features = ["aws-lc-rs", "logging"] } rustls-pemfile = { workspace = true, features = ["std"] } serde_json = { workspace = true, features = ["std"] } +serde = { workspace = true, features = ["derive"] } sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } tokio = { workspace = true, features = ["rt-multi-thread"] } tokio-postgres = { workspace = true, features = [ diff --git a/etl/src/conversions/event.rs b/etl/src/conversions/event.rs index 81c9867df..be375c012 100644 --- a/etl/src/conversions/event.rs +++ b/etl/src/conversions/event.rs @@ -1,7 +1,5 @@ use core::str; -use etl_postgres::types::{ - ColumnSchema, TableId, TableName, TableSchema, convert_type_oid_to_type, -}; +use etl_postgres::types::{ColumnSchema, TableId, TableSchema}; use postgres_replication::protocol; use std::sync::Arc; use tokio_postgres::types::PgLsn; @@ -10,8 +8,7 @@ use crate::conversions::text::{default_value_for_type, parse_cell_from_postgres_ use crate::error::{ErrorKind, EtlResult}; use crate::store::schema::SchemaStore; use 
crate::types::{ - BeginEvent, Cell, CommitEvent, DeleteEvent, InsertEvent, RelationEvent, TableRow, - TruncateEvent, UpdateEvent, + BeginEvent, Cell, CommitEvent, DeleteEvent, InsertEvent, TableRow, TruncateEvent, UpdateEvent, }; use crate::{bail, etl_error}; @@ -50,37 +47,6 @@ pub fn parse_event_from_commit_message( } } -/// Creates a [`RelationEvent`] from Postgres protocol data. -/// -/// This method parses the replication protocol relation message and builds -/// a complete table schema for use in interpreting subsequent data events. -pub fn parse_event_from_relation_message( - start_lsn: PgLsn, - commit_lsn: PgLsn, - relation_body: &protocol::RelationBody, -) -> EtlResult { - let table_name = TableName::new( - relation_body.namespace()?.to_string(), - relation_body.name()?.to_string(), - ); - let column_schemas = relation_body - .columns() - .iter() - .map(build_column_schema) - .collect::, _>>()?; - let table_schema = TableSchema::new( - TableId::new(relation_body.rel_id()), - table_name, - column_schemas, - ); - - Ok(RelationEvent { - start_lsn, - commit_lsn, - table_schema, - }) -} - /// Converts a Postgres insert message into an [`InsertEvent`]. /// /// This function processes an insert operation from the replication stream, @@ -249,20 +215,6 @@ where /// This helper method extracts column metadata from the replication protocol /// and converts it into the internal column schema representation. Some fields /// like nullable status have default values due to protocol limitations. -fn build_column_schema(column: &protocol::Column) -> EtlResult { - Ok(ColumnSchema::new( - column.name()?.to_string(), - convert_type_oid_to_type(column.type_id() as u32), - column.type_modifier(), - // We do not have access to this information, so we default it to `false`. - // TODO: figure out how to fill this value correctly or how to handle the missing value - // better. - false, - // Currently 1 means that the column is part of the primary key. - column.flags() == 1, - )) -} - /// Converts Postgres tuple data into a [`TableRow`] using column schemas. 
/// /// This function transforms raw tuple data from the replication protocol into diff --git a/etl/src/conversions/table_row.rs b/etl/src/conversions/table_row.rs index 3fd95c6df..32b08f556 100644 --- a/etl/src/conversions/table_row.rs +++ b/etl/src/conversions/table_row.rs @@ -164,14 +164,21 @@ mod tests { fn create_test_schema() -> Vec { vec![ - ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), - ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), - ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false), + ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)), + ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None), + ColumnSchema::with_positions("active".to_string(), Type::BOOL, -1, false, 3, None), ] } fn create_single_column_schema(name: &str, typ: Type) -> Vec { - vec![ColumnSchema::new(name.to_string(), typ, -1, false, false)] + vec![ColumnSchema::with_positions( + name.to_string(), + typ, + -1, + false, + 1, + None, + )] } #[test] @@ -227,10 +234,10 @@ mod tests { #[test] fn try_from_multiple_columns_different_types() { let schema = vec![ - ColumnSchema::new("int_col".to_string(), Type::INT4, -1, false, false), - ColumnSchema::new("float_col".to_string(), Type::FLOAT8, -1, false, false), - ColumnSchema::new("text_col".to_string(), Type::TEXT, -1, false, false), - ColumnSchema::new("bool_col".to_string(), Type::BOOL, -1, false, false), + ColumnSchema::with_positions("int_col".to_string(), Type::INT4, -1, false, 1, None), + ColumnSchema::with_positions("float_col".to_string(), Type::FLOAT8, -1, false, 2, None), + ColumnSchema::with_positions("text_col".to_string(), Type::TEXT, -1, false, 3, None), + ColumnSchema::with_positions("bool_col".to_string(), Type::BOOL, -1, false, 4, None), ]; let row_data = b"123\t3.15\tHello World\tt\n"; @@ -333,12 +340,14 @@ mod tests { let mut expected_row = String::new(); for i in 0..50 { - schema.push(ColumnSchema::new( + let ordinal_position = (i + 1) as i32; + schema.push(ColumnSchema::with_positions( format!("col{i}"), Type::INT4, -1, false, - false, + ordinal_position, + None, )); if i > 0 { expected_row.push('\t'); @@ -369,8 +378,8 @@ mod tests { #[test] fn try_from_postgres_delimiter_escaping() { let schema = vec![ - ColumnSchema::new("col1".to_string(), Type::TEXT, -1, false, false), - ColumnSchema::new("col2".to_string(), Type::TEXT, -1, false, false), + ColumnSchema::with_positions("col1".to_string(), Type::TEXT, -1, false, 1, None), + ColumnSchema::with_positions("col2".to_string(), Type::TEXT, -1, false, 2, None), ]; // Postgres escapes tab characters in data with \\t @@ -387,9 +396,9 @@ mod tests { #[test] fn try_from_postgres_escape_at_field_boundaries() { let schema = vec![ - ColumnSchema::new("col1".to_string(), Type::TEXT, -1, false, false), - ColumnSchema::new("col2".to_string(), Type::TEXT, -1, false, false), - ColumnSchema::new("col3".to_string(), Type::TEXT, -1, false, false), + ColumnSchema::with_positions("col1".to_string(), Type::TEXT, -1, false, 1, None), + ColumnSchema::with_positions("col2".to_string(), Type::TEXT, -1, false, 2, None), + ColumnSchema::with_positions("col3".to_string(), Type::TEXT, -1, false, 3, None), ]; // Escapes at the beginning, middle, and end of fields diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index 0cfe52371..9b134c297 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -5,22 +5,23 @@ use futures::StreamExt; use metrics::histogram; use 
postgres_replication::protocol; use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage}; +use serde::Deserialize; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::pin; use tokio_postgres::types::PgLsn; -use tracing::{debug, info}; +use tracing::{debug, error, info, warn}; +use crate::bail; use crate::concurrency::shutdown::ShutdownRx; use crate::concurrency::signal::SignalRx; use crate::concurrency::stream::{TimeoutStream, TimeoutStreamResult}; use crate::conversions::event::{ parse_event_from_begin_message, parse_event_from_commit_message, parse_event_from_delete_message, parse_event_from_insert_message, - parse_event_from_relation_message, parse_event_from_truncate_message, - parse_event_from_update_message, + parse_event_from_truncate_message, parse_event_from_update_message, }; use crate::destination::Destination; use crate::error::{ErrorKind, EtlResult}; @@ -31,15 +32,82 @@ use crate::metrics::{ }; use crate::replication::client::PgReplicationClient; use crate::replication::stream::EventsStream; -use crate::state::table::{RetryPolicy, TableReplicationError}; +use crate::state::table::TableReplicationError; use crate::store::schema::SchemaStore; use crate::types::{Event, PipelineId}; -use crate::{bail, etl_error}; /// The amount of milliseconds that pass between one refresh and the other of the system, in case no /// events or shutdown signal are received. const REFRESH_INTERVAL: Duration = Duration::from_millis(1000); +/// The prefix for the custom logical message. +const ETL_PREFIX: &str = "supabase_etl_ddl"; + +/// Column metadata produced by the schema change event trigger. +/// +/// The struct mirrors the JSON payload emitted by the +/// `etl.emit_schema_change_messages` trigger, enabling precise reconstruction of the +/// table definition that triggered the event. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +struct SchemaChangeColumn { + /// Column identifier in the source table. + column_name: String, + /// Position of the column within the table definition. + ordinal_position: i32, + /// Native Postgres data type for the column. + data_type: String, + /// Nullability marker reported by `information_schema.columns`. + is_nullable: String, + /// Optional default expression for the column. + column_default: Option, + /// True when the column belongs to the primary key. + is_primary_key: bool, + /// Relative position inside the primary key, if applicable. + primary_key_position: Option, +} + +/// Schema change message emitted through logical decoding. +/// +/// The payload describes the DDL command tag, fully qualified table name, and +/// the complete collection of column definitions captured at the time of the change. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +struct SchemaChangeMessage { + /// Command tag describing the triggering DDL operation. + event: String, + /// Schema that owns the modified table. + schema_name: String, + /// Name of the table affected by the schema change. + table_name: String, + /// Postgres OID identifying the table when it exists at commit time. + table_id: u32, + /// Ordered collection of column metadata for the table. + columns: Vec, +} + +/// Attempts to deserialize a schema change message from raw logical decoding text. +/// +/// Returns `None` when the payload is not valid JSON matching +/// [`SchemaChangeMessage`]. 
Diagnostic warnings are emitted to aid debugging while +/// ensuring replication continues uninterrupted. +fn decode_schema_change_message(content: &str) -> Option { + let payload = match serde_json::from_str::(content) { + Ok(message) => message, + Err(error) => { + warn!( + ?error, + raw_message = %content, + "failed to decode schema change message, skipping" + ); + + return None; + } + }; + + Some(payload) +} + /// Result type for the apply loop execution. /// /// [`ApplyLoopResult`] indicates the reason why the apply loop terminated, @@ -210,8 +278,6 @@ impl StatusUpdate { enum EndBatch { /// The batch should include the last processed event and end. Inclusive, - /// The batch should exclude the last processed event and end. - Exclusive, } /// Result returned from `handle_replication_message` and related functions @@ -235,7 +301,7 @@ struct HandleMessageResult { /// max size and max fill duration. Currently, this will be set in the following /// conditions: /// - /// * Set to [`EndBatch::Inclusive`]` when a commit message indicates that it will + /// * Set to [`EndBatch::Inclusive`] when a commit message indicates that it will /// mark the table sync worker as caught up. We want to end the batch in this /// case because we do not want to sent events after this commit message because /// these events will also be sent by the apply worker later, leading to @@ -286,18 +352,6 @@ impl HandleMessageResult { ..Default::default() } } - - /// Creates a result that excludes the current event and requests batch termination. - /// - /// Used when the current message triggers a recoverable table-level error. - /// The error is propagated to be handled by the apply loop hook. - fn finish_batch_and_exclude_event(error: TableReplicationError) -> Self { - Self { - end_batch: Some(EndBatch::Exclusive), - table_replication_error: Some(error), - ..Default::default() - } - } } /// A shared state that is used throughout the apply loop to track progress. @@ -913,8 +967,9 @@ where LogicalReplicationMessage::Commit(commit_body) => { handle_commit_message(state, start_lsn, commit_body, hook, pipeline_id).await } - LogicalReplicationMessage::Relation(relation_body) => { - handle_relation_message(state, start_lsn, relation_body, schema_store, hook).await + LogicalReplicationMessage::Relation(_) => { + debug!("skipping RELATION message"); + Ok(HandleMessageResult::default()) } LogicalReplicationMessage::Insert(insert_body) => { handle_insert_message(state, start_lsn, insert_body, hook, schema_store).await @@ -928,6 +983,9 @@ where LogicalReplicationMessage::Truncate(truncate_body) => { handle_truncate_message(state, start_lsn, truncate_body, hook).await } + LogicalReplicationMessage::Message(message_body) => { + handle_logical_decoding_message(state, start_lsn, message_body).await + } LogicalReplicationMessage::Origin(_) => { debug!("received unsupported ORIGIN message"); Ok(HandleMessageResult::default()) @@ -1081,74 +1139,6 @@ where /// /// This function processes schema definition messages by validating that table /// schemas haven't changed unexpectedly during replication. Schema stability -/// is critical for maintaining data consistency between source and destination. -/// -/// When schema changes are detected, the function creates appropriate error -/// conditions and signals batch termination to prevent processing of events -/// with mismatched schemas. This protection mechanism ensures data integrity -/// by failing fast on incompatible schema evolution. 
-async fn handle_relation_message( - state: &mut ApplyLoopState, - start_lsn: PgLsn, - message: &protocol::RelationBody, - schema_store: &S, - hook: &T, -) -> EtlResult -where - S: SchemaStore + Clone + Send + 'static, - T: ApplyLoopHook, -{ - let Some(remote_final_lsn) = state.remote_final_lsn else { - bail!( - ErrorKind::InvalidState, - "Invalid transaction", - "A transaction should have started for handle_relation_message to be performed" - ); - }; - - let table_id = TableId::new(message.rel_id()); - - if !hook - .should_apply_changes(table_id, remote_final_lsn) - .await? - { - return Ok(HandleMessageResult::no_event()); - } - - // If no table schema is found, it means that something went wrong since we should have schemas - // ready before starting the apply loop. - let existing_table_schema = - schema_store - .get_table_schema(&table_id) - .await? - .ok_or_else(|| { - etl_error!( - ErrorKind::MissingTableSchema, - "Table not found in the schema cache", - format!("The table schema for table {table_id} was not found in the cache") - ) - })?; - - // Convert event from the protocol message. - let event = parse_event_from_relation_message(start_lsn, remote_final_lsn, message)?; - - // We compare the table schema from the relation message with the existing schema (if any). - // The purpose of this comparison is that we want to throw an error and stop the processing - // of any table that incurs in a schema change after the initial table sync is performed. - if !existing_table_schema.partial_eq(&event.table_schema) { - let error = TableReplicationError::with_solution( - table_id, - format!("The schema for table {table_id} has changed during streaming"), - "ETL doesn't support schema changes at this point in time, rollback the schema", - RetryPolicy::ManualRetry, - ); - - return Ok(HandleMessageResult::finish_batch_and_exclude_event(error)); - } - - Ok(HandleMessageResult::return_event(Event::Relation(event))) -} - /// Handles Postgres INSERT messages for row insertion events. async fn handle_insert_message( state: &mut ApplyLoopState, @@ -1295,3 +1285,67 @@ where Ok(HandleMessageResult::return_event(Event::Truncate(event))) } + +/// Handles logical decoding messages emitted by Postgres event triggers. +/// +/// Currently, processes schema change notifications produced by the +/// `etl.emit_schema_change_messages` trigger. Unsupported prefixes are ignored so +/// that other logical decoding messages can be introduced without impacting the +/// replication pipeline. 
+async fn handle_logical_decoding_message( + state: &mut ApplyLoopState, + start_lsn: PgLsn, + message: &protocol::MessageBody, +) -> EtlResult { + let Some(remote_final_lsn) = state.remote_final_lsn else { + bail!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for handle_logical_decoding_message to be performed" + ); + }; + + let prefix = match message.prefix() { + Ok(prefix) => prefix, + Err(error) => { + warn!(?error, "failed to read logical decoding message prefix"); + + return Ok(HandleMessageResult::default()); + } + }; + + if prefix != ETL_PREFIX { + debug!( + prefix, + "ignoring logical decoding message with unsupported prefix '{prefix}'" + ); + + return Ok(HandleMessageResult::default()); + } + + let content = match message.content() { + Ok(content) => content, + Err(error) => { + warn!(?error, "failed to read logical decoding message content"); + return Ok(HandleMessageResult::default()); + } + }; + + let Some(payload) = decode_schema_change_message(content) else { + error!("failed to decode schema change message, skipping"); + return Ok(HandleMessageResult::default()); + }; + + info!( + start_lsn = %start_lsn, + commit_lsn = %remote_final_lsn, + event = %payload.event, + schema_name = %payload.schema_name, + table_name = %payload.table_name, + table_id = payload.table_id, + columns = payload.columns.len(), + "received schema change logical decoding message" + ); + + Ok(HandleMessageResult::default()) +} diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 722437505..f505d7372 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -446,7 +446,7 @@ impl PgReplicationClient { // Do not convert the query or the options to lowercase, see comment in `create_slot_internal`. let options = format!( - r#"("proto_version" '1', "publication_names" {})"#, + r#"("proto_version" '1', "publication_names" {}, "messages" 'true')"#, quote_literal(quote_identifier(publication_name).as_ref()), ); @@ -685,16 +685,24 @@ impl PgReplicationClient { let column_info_query = format!( "{pub_cte} - select a.attname, + select + a.attname, a.atttypid, a.atttypmod, a.attnotnull, - coalesce(i.indisprimary, false) as primary + pk.primary_key_position, + a.attnum as ordinal_position from pg_attribute a - left join pg_index i - on a.attrelid = i.indrelid - and a.attnum = any(i.indkey) - and i.indisprimary = true + left join lateral ( + select + att_positions.ordinality as primary_key_position + from pg_index i + cross join lateral unnest(i.indkey) with ordinality as att_positions(attnum, ordinality) + where i.indrelid = a.attrelid + and i.indisprimary = true + and att_positions.attnum = a.attnum + limit 1 + ) pk on true where a.attnum > 0::int2 and not a.attisdropped and a.attgenerated = '' @@ -714,8 +722,11 @@ impl PgReplicationClient { Self::get_row_value::(&row, "atttypmod", "pg_attribute").await?; let nullable = Self::get_row_value::(&row, "attnotnull", "pg_attribute").await? == "f"; - let primary = - Self::get_row_value::(&row, "primary", "pg_index").await?
== "t"; + let ordinal_position = + Self::get_row_value::(&row, "ordinal_position", "pg_attribute").await?; + let primary_key_position = + Self::get_optional_row_value::(&row, "primary_key_position", "pg_index") + .await?; let typ = convert_type_oid_to_type(type_oid); @@ -724,7 +735,8 @@ impl PgReplicationClient { typ, modifier, nullable, - primary, + ordinal_position, + primary_key_position, }) } } @@ -790,4 +802,31 @@ impl PgReplicationClient { ) }) } + + /// Attempts to extract an optional typed value from a row column. + /// + /// Returns `Ok(None)` when the column is NULL. + async fn get_optional_row_value( + row: &SimpleQueryRow, + column_name: &str, + table_name: &str, + ) -> EtlResult> + where + T::Err: fmt::Debug, + { + let Some(value) = row.try_get(column_name)? else { + return Ok(None); + }; + + value.parse().map(Some).map_err(|e: T::Err| { + etl_error!( + ErrorKind::ConversionError, + "Column parsing failed", + format!( + "Failed to parse value from column '{}' in table '{}': {:?}", + column_name, table_name, e + ) + ) + }) + } } diff --git a/etl/src/test_utils/table.rs b/etl/src/test_utils/table.rs index a2c6d46ad..4d336338e 100644 --- a/etl/src/test_utils/table.rs +++ b/etl/src/test_utils/table.rs @@ -30,6 +30,6 @@ pub fn assert_table_schema( assert_eq!(actual.typ, expected.typ); assert_eq!(actual.modifier, expected.modifier); assert_eq!(actual.nullable, expected.nullable); - assert_eq!(actual.primary, expected.primary); + assert_eq!(actual.primary_key_position, expected.primary_key_position); } } diff --git a/etl/tests/postgres_store.rs b/etl/tests/postgres_store.rs index 1038c591a..bec37e934 100644 --- a/etl/tests/postgres_store.rs +++ b/etl/tests/postgres_store.rs @@ -16,14 +16,15 @@ fn create_sample_table_schema() -> TableSchema { let table_id = TableId::new(12345); let table_name = TableName::new("public".to_string(), "test_table".to_string()); let columns = vec![ - ColumnSchema::new("id".to_string(), PgType::INT4, -1, false, true), - ColumnSchema::new("name".to_string(), PgType::TEXT, -1, true, false), - ColumnSchema::new( + ColumnSchema::with_positions("id".to_string(), PgType::INT4, -1, false, 1, Some(1)), + ColumnSchema::with_positions("name".to_string(), PgType::TEXT, -1, true, 2, None), + ColumnSchema::with_positions( "created_at".to_string(), PgType::TIMESTAMPTZ, -1, false, - false, + 3, + None, ), ]; @@ -34,8 +35,15 @@ fn create_another_table_schema() -> TableSchema { let table_id = TableId::new(67890); let table_name = TableName::new("public".to_string(), "another_table".to_string()); let columns = vec![ - ColumnSchema::new("id".to_string(), PgType::INT8, -1, false, true), - ColumnSchema::new("description".to_string(), PgType::VARCHAR, 255, true, false), + ColumnSchema::with_positions("id".to_string(), PgType::INT8, -1, false, 1, Some(1)), + ColumnSchema::with_positions( + "description".to_string(), + PgType::VARCHAR, + 255, + true, + 2, + None, + ), ]; TableSchema::new(table_id, table_name, columns) @@ -333,12 +341,13 @@ async fn test_schema_store_update_existing() { .unwrap(); // Update schema by adding a column - table_schema.add_column_schema(ColumnSchema::new( + table_schema.add_column_schema(ColumnSchema::with_positions( "updated_at".to_string(), PgType::TIMESTAMPTZ, -1, true, - false, + 4, + None, )); // Store updated schema