Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
parquet = { version = "55.0", default-features = false }
pg_escape = { version = "0.1.1", default-features = false }
pin-project-lite = { version = "0.2.16", default-features = false }
postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
postgres-replication = { path = "../rust-postgres/postgres-replication", default-features = false }
prost = { version = "0.14.1", default-features = false }
rand = { version = "0.9.2", default-features = false }
reqwest = { version = "0.12.22", default-features = false }
Expand All @@ -70,7 +70,7 @@ serde_json = { version = "1.0.141", default-features = false }
sqlx = { version = "0.8.6", default-features = false }
thiserror = "2.0.12"
tokio = { version = "1.47.0", default-features = false }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
tokio-postgres = { path = "../rust-postgres/tokio-postgres", default-features = false }
tokio-rustls = { version = "0.26.2", default-features = false }
tracing = { version = "0.1.41", default-features = false }
tracing-actix-web = { version = "0.7.19", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions etl-api/tests/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1217,10 +1217,10 @@ async fn deleting_pipeline_removes_table_schemas_from_source_database() {
).bind(pipeline_id).bind(table2_oid).fetch_one(&source_db_pool).await.unwrap();

// Insert multiple columns for each table to test CASCADE behavior
sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, column_order) VALUES ($1, 'id', 'INT4', -1, false, true, 0), ($1, 'name', 'TEXT', -1, true, false, 1)")
sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, ordinal_position, primary_key_position) VALUES ($1, 'id', 'INT4', -1, false, true, 1, 1), ($1, 'name', 'TEXT', -1, true, false, 2, NULL)")
.bind(table_schema_id_1).execute(&source_db_pool).await.unwrap();

sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, column_order) VALUES ($1, 'order_id', 'INT8', -1, false, true, 0), ($1, 'amount', 'NUMERIC', -1, false, false, 1)")
sqlx::query("INSERT INTO etl.table_columns (table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, ordinal_position, primary_key_position) VALUES ($1, 'order_id', 'INT8', -1, false, true, 1, 1), ($1, 'amount', 'NUMERIC', -1, false, false, 2, NULL)")
.bind(table_schema_id_2).execute(&source_db_pool).await.unwrap();

// Verify data exists before deletion
Expand Down
73 changes: 45 additions & 28 deletions etl-destinations/src/bigquery/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl BigQueryClient {
fn add_primary_key_clause(column_schemas: &[ColumnSchema]) -> String {
let identity_columns: Vec<String> = column_schemas
.iter()
.filter(|s| s.primary)
.filter(|s| s.primary_key_position.is_some())
.map(|c| format!("`{}`", c.name))
.collect();

Expand Down Expand Up @@ -800,33 +800,42 @@ mod tests {

#[test]
fn test_column_spec() {
let column_schema = ColumnSchema::new("test_col".to_string(), Type::TEXT, -1, true, false);
let column_schema =
ColumnSchema::with_positions("test_col".to_string(), Type::TEXT, -1, true, 1, None);
let spec = BigQueryClient::column_spec(&column_schema);
assert_eq!(spec, "`test_col` string");

let not_null_column = ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true);
let not_null_column =
ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1));
let not_null_spec = BigQueryClient::column_spec(&not_null_column);
assert_eq!(not_null_spec, "`id` int64 not null");

let array_column =
ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false, false);
ColumnSchema::with_positions("tags".to_string(), Type::TEXT_ARRAY, -1, false, 1, None);
let array_spec = BigQueryClient::column_spec(&array_column);
assert_eq!(array_spec, "`tags` array<string>");
}

#[test]
fn test_add_primary_key_clause() {
let columns_with_pk = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)),
ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None),
];
let pk_clause = BigQueryClient::add_primary_key_clause(&columns_with_pk);
assert_eq!(pk_clause, ", primary key (`id`) not enforced");

let columns_with_composite_pk = vec![
ColumnSchema::new("tenant_id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::with_positions(
"tenant_id".to_string(),
Type::INT4,
-1,
false,
1,
Some(1),
),
ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 2, Some(2)),
ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 3, None),
];
let composite_pk_clause =
BigQueryClient::add_primary_key_clause(&columns_with_composite_pk);
Expand All @@ -836,8 +845,8 @@ mod tests {
);

let columns_no_pk = vec![
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("age".to_string(), Type::INT4, -1, true, false),
ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 1, None),
ColumnSchema::with_positions("age".to_string(), Type::INT4, -1, true, 2, None),
];
let no_pk_clause = BigQueryClient::add_primary_key_clause(&columns_no_pk);
assert_eq!(no_pk_clause, "");
Expand All @@ -846,9 +855,9 @@ mod tests {
#[test]
fn test_create_columns_spec() {
let columns = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false),
ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)),
ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None),
ColumnSchema::with_positions("active".to_string(), Type::BOOL, -1, false, 3, None),
];
let spec = BigQueryClient::create_columns_spec(&columns);
assert_eq!(
Expand All @@ -866,10 +875,10 @@ mod tests {
#[test]
fn test_column_schemas_to_table_descriptor() {
let columns = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false),
ColumnSchema::new("tags".to_string(), Type::TEXT_ARRAY, -1, false, false),
ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, 1, Some(1)),
ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None),
ColumnSchema::with_positions("active".to_string(), Type::BOOL, -1, false, 3, None),
ColumnSchema::with_positions("tags".to_string(), Type::TEXT_ARRAY, -1, false, 4, None),
];

let descriptor = BigQueryClient::column_schemas_to_table_descriptor(&columns, true);
Expand Down Expand Up @@ -949,12 +958,19 @@ mod tests {
#[test]
fn test_column_schemas_to_table_descriptor_complex_types() {
let columns = vec![
ColumnSchema::new("uuid_col".to_string(), Type::UUID, -1, true, false),
ColumnSchema::new("json_col".to_string(), Type::JSON, -1, true, false),
ColumnSchema::new("bytea_col".to_string(), Type::BYTEA, -1, true, false),
ColumnSchema::new("numeric_col".to_string(), Type::NUMERIC, -1, true, false),
ColumnSchema::new("date_col".to_string(), Type::DATE, -1, true, false),
ColumnSchema::new("time_col".to_string(), Type::TIME, -1, true, false),
ColumnSchema::with_positions("uuid_col".to_string(), Type::UUID, -1, true, 1, None),
ColumnSchema::with_positions("json_col".to_string(), Type::JSON, -1, true, 2, None),
ColumnSchema::with_positions("bytea_col".to_string(), Type::BYTEA, -1, true, 3, None),
ColumnSchema::with_positions(
"numeric_col".to_string(),
Type::NUMERIC,
-1,
true,
4,
None,
),
ColumnSchema::with_positions("date_col".to_string(), Type::DATE, -1, true, 5, None),
ColumnSchema::with_positions("time_col".to_string(), Type::TIME, -1, true, 6, None),
];

let descriptor = BigQueryClient::column_schemas_to_table_descriptor(&columns, true);
Expand Down Expand Up @@ -1006,8 +1022,8 @@ mod tests {
let table_id = "test_table";

let columns = vec![
ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
ColumnSchema::with_positions("id".to_string(), Type::INT4, -1, false, true, 1, Some(1)),
ColumnSchema::with_positions("name".to_string(), Type::TEXT, -1, true, 2, None),
];

// Simulate the query generation logic
Expand All @@ -1026,12 +1042,13 @@ mod tests {
let table_id = "test_table";
let max_staleness_mins = 15;

let columns = vec![ColumnSchema::new(
let columns = vec![ColumnSchema::with_positions(
"id".to_string(),
Type::INT4,
-1,
false,
true,
1,
Some(1),
)];

// Simulate the query generation logic with staleness
Expand Down
32 changes: 18 additions & 14 deletions etl-destinations/src/iceberg/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,24 @@ where
let sequence_number_col =
find_unique_column_name(&final_schema.column_schemas, SEQUENCE_NUMBER_COLUMN_NAME);

final_schema.add_column_schema(ColumnSchema {
name: cdc_operation_col,
typ: Type::TEXT,
modifier: -1,
nullable: false,
primary: false,
});
final_schema.add_column_schema(ColumnSchema {
name: sequence_number_col,
typ: Type::TEXT,
modifier: -1,
nullable: false,
primary: false,
});
let next_ordinal = (final_schema.column_schemas.len() as i32) + 1;
final_schema.add_column_schema(ColumnSchema::with_positions(
cdc_operation_col,
Type::TEXT,
-1,
false,
next_ordinal,
None,
));
let next_ordinal = (final_schema.column_schemas.len() as i32) + 1;
final_schema.add_column_schema(ColumnSchema::with_positions(
sequence_number_col,
Type::TEXT,
-1,
false,
next_ordinal,
None,
));
final_schema
}

Expand Down
20 changes: 20 additions & 0 deletions etl-destinations/tests/iceberg_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ use crate::support::{

mod support;

/// Test helper: builds a [`ColumnSchema`] from raw parts, taking the column
/// name as a `&str` so call sites avoid repeating `.to_string()`.
fn column(name: &str, typ: Type, modifier: i32, nullable: bool, primary: bool) -> ColumnSchema {
    let owned_name = name.to_owned();
    ColumnSchema::new(owned_name, typ, modifier, nullable, primary)
}

/// Test helper: fills in positional metadata for a list of columns.
///
/// Assigns each column a 1-based `ordinal_position` matching its index in the
/// vector, and renumbers `primary_key_position` sequentially (1, 2, …) across
/// the columns that already have one, preserving their relative order.
/// Columns without a primary-key position are left as `None`.
fn with_positions(mut columns: Vec<ColumnSchema>) -> Vec<ColumnSchema> {
    let mut next_pk_position = 0;
    for (index, column) in columns.iter_mut().enumerate() {
        column.ordinal_position = index as i32 + 1;
        if column.primary_key_position.is_some() {
            next_pk_position += 1;
            column.primary_key_position = Some(next_pk_position);
        }
    }
    columns
}

#[tokio::test]
async fn create_namespace() {
init_test_tracing();
Expand Down
42 changes: 32 additions & 10 deletions etl-postgres/src/replication/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,43 @@ pub async fn store_table_schema(
.await?;

// Insert all columns
let mut sequential_primary_index = 0;

for (column_order, column_schema) in table_schema.column_schemas.iter().enumerate() {
let column_type_str = postgres_type_to_string(&column_schema.typ);
let ordinal_position = if column_schema.ordinal_position > 0 {
column_schema.ordinal_position
} else {
(column_order as i32) + 1
};
let primary_key_position = match column_schema.primary_key_position {
Some(position) if position > 0 => {
sequential_primary_index = sequential_primary_index.max(position);
Some(position)
}
Some(_) => {
sequential_primary_index += 1;
Some(sequential_primary_index)
}
None => None,
};
let is_primary = primary_key_position.is_some();

sqlx::query(
r#"
insert into etl.table_columns
(table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, column_order)
values ($1, $2, $3, $4, $5, $6, $7)
(table_schema_id, column_name, column_type, type_modifier, nullable, primary_key, ordinal_position, primary_key_position)
values ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
)
.bind(table_schema_id)
.bind(&column_schema.name)
.bind(column_type_str)
.bind(column_schema.modifier)
.bind(column_schema.nullable)
.bind(column_schema.primary)
.bind(column_order as i32)
.bind(is_primary)
.bind(ordinal_position)
.bind(primary_key_position)
.execute(&mut *tx)
.await?;
}
Expand Down Expand Up @@ -217,12 +237,12 @@ pub async fn load_table_schemas(
tc.column_type,
tc.type_modifier,
tc.nullable,
tc.primary_key,
tc.column_order
tc.ordinal_position,
tc.primary_key_position
from etl.table_schemas ts
inner join etl.table_columns tc on ts.id = tc.table_schema_id
where ts.pipeline_id = $1
order by ts.table_id, tc.column_order
order by ts.table_id, tc.ordinal_position
"#,
)
.bind(pipeline_id)
Expand Down Expand Up @@ -304,14 +324,16 @@ fn parse_column_schema(row: &PgRow) -> ColumnSchema {
let column_type: String = row.get("column_type");
let type_modifier: i32 = row.get("type_modifier");
let nullable: bool = row.get("nullable");
let primary_key: bool = row.get("primary_key");
let ordinal_position: i32 = row.get("ordinal_position");
let primary_key_position: Option<i32> = row.get("primary_key_position");

ColumnSchema::new(
ColumnSchema::with_positions(
column_name,
string_to_postgres_type(&column_type),
type_modifier,
nullable,
primary_key,
ordinal_position,
primary_key_position,
)
}

Expand Down
2 changes: 2 additions & 0 deletions etl-postgres/src/tokio/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ pub fn id_column_schema() -> ColumnSchema {
modifier: -1,
nullable: false,
primary: true,
ordinal_position: 1,
primary_key_position: Some(1),
}
}

Expand Down
Loading
Loading