Skip to content

Commit ac3e76e

Browse files
authored
feat: allow creating one namespace for one schema for iceberg destination (#420)
1 parent 075d4b0 commit ac3e76e

File tree

12 files changed

+422
-98
lines changed

12 files changed

+422
-98
lines changed

etl-api/src/configs/destination.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ pub enum StoredIcebergConfig {
441441
Supabase {
442442
project_ref: String,
443443
warehouse_name: String,
444-
namespace: String,
444+
namespace: Option<String>,
445445
catalog_token: SerializableSecretString,
446446
s3_access_key_id: SerializableSecretString,
447447
s3_secret_access_key: SerializableSecretString,
@@ -450,7 +450,7 @@ pub enum StoredIcebergConfig {
450450
Rest {
451451
catalog_uri: String,
452452
warehouse_name: String,
453-
namespace: String,
453+
namespace: Option<String>,
454454
s3_access_key_id: SerializableSecretString,
455455
s3_secret_access_key: SerializableSecretString,
456456
s3_endpoint: String,
@@ -466,7 +466,8 @@ pub enum FullApiIcebergConfig {
466466
#[schema(example = "my-warehouse")]
467467
warehouse_name: String,
468468
#[schema(example = "my-namespace")]
469-
namespace: String,
469+
#[serde(skip_serializing_if = "Option::is_none")]
470+
namespace: Option<String>,
470471
#[schema(
471472
example = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6IjFkNzFjMGEyNmIxMDFjODQ5ZTkxZmQ1NjdjYjA5NTJmIn0.eyJleHAiOjIwNzA3MTcxNjAsImlhdCI6MTc1NjE0NTE1MCwiaXNzIjoic3VwYWJhc2UiLCJyZWYiOiJhYmNkZWZnaGlqbGttbm9wcXJzdCIsInJvbGUiOiJzZXJ2aWNlX3JvbGUifQ.YdTWkkIvwjSkXot3NC07xyjPjGWQMNzLq5EPzumzrdLzuHrj-zuzI-nlyQtQ5V7gZauysm-wGwmpztRXfPc3AQ"
472473
)]
@@ -484,7 +485,7 @@ pub enum FullApiIcebergConfig {
484485
#[schema(example = "my-warehouse")]
485486
warehouse_name: String,
486487
#[schema(example = "my-namespace")]
487-
namespace: String,
488+
namespace: Option<String>,
488489
#[schema(example = "9156667efc2c70d89af6588da86d2924")]
489490
s3_access_key_id: SerializableSecretString,
490491
#[schema(example = "ca833e890916d848c69135924bcd75e5909184814a0ebc6c988937ee094120d4")]
@@ -500,7 +501,7 @@ pub enum EncryptedStoredIcebergConfig {
500501
Supabase {
501502
project_ref: String,
502503
warehouse_name: String,
503-
namespace: String,
504+
namespace: Option<String>,
504505
catalog_token: EncryptedValue,
505506
s3_access_key_id: EncryptedValue,
506507
s3_secret_access_key: EncryptedValue,
@@ -509,7 +510,7 @@ pub enum EncryptedStoredIcebergConfig {
509510
Rest {
510511
catalog_uri: String,
511512
warehouse_name: String,
512-
namespace: String,
513+
namespace: Option<String>,
513514
s3_access_key_id: EncryptedValue,
514515
s3_secret_access_key: EncryptedValue,
515516
s3_endpoint: String,
@@ -571,7 +572,7 @@ mod tests {
571572
config: StoredIcebergConfig::Supabase {
572573
project_ref: "abcdefghijklmnopqrst".to_string(),
573574
warehouse_name: "my-warehouse".to_string(),
574-
namespace: "my-namespace".to_string(),
575+
namespace: Some("my-namespace".to_string()),
575576
catalog_token: SerializableSecretString::from("eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6IjFkNzFjMGEyNmIxMDFjODQ5ZTkxZmQ1NjdjYjA5NTJmIn0.eyJleHAiOjIwNzA3MTcxNjAsImlhdCI6MTc1NjE0NTE1MCwiaXNzIjoic3VwYWJhc2UiLCJyZWYiOiJhYmNkZWZnaGlqbGttbm9wcXJzdCIsInJvbGUiOiJzZXJ2aWNlX3JvbGUifQ.YdTWkkIvwjSkXot3NC07xyjPjGWQMNzLq5EPzumzrdLzuHrj-zuzI-nlyQtQ5V7gZauysm-wGwmpztRXfPc3AQ".to_string()),
576577
s3_access_key_id: SerializableSecretString::from("9156667efc2c70d89af6588da86d2924".to_string()),
577578
s3_secret_access_key: SerializableSecretString::from("ca833e890916d848c69135924bcd75e5909184814a0ebc6c988937ee094120d4".to_string()),
@@ -637,7 +638,7 @@ mod tests {
637638
catalog_uri: "https://abcdefghijklmnopqrst.storage.supabase.com/storage/v1/iceberg"
638639
.to_string(),
639640
warehouse_name: "my-warehouse".to_string(),
640-
namespace: "my-namespace".to_string(),
641+
namespace: Some("my-namespace".to_string()),
641642
s3_access_key_id: SerializableSecretString::from("id".to_string()),
642643
s3_secret_access_key: SerializableSecretString::from("key".to_string()),
643644
s3_endpoint: "http://localhost:8080".to_string(),
@@ -741,7 +742,7 @@ mod tests {
741742
config: StoredIcebergConfig::Supabase {
742743
project_ref: "abcdefghijklmnopqrst".to_string(),
743744
warehouse_name: "my-warehouse".to_string(),
744-
namespace: "my-namespace".to_string(),
745+
namespace: Some("my-namespace".to_string()),
745746
catalog_token: SerializableSecretString::from("eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6IjFkNzFjMGEyNmIxMDFjODQ5ZTkxZmQ1NjdjYjA5NTJmIn0.eyJleHAiOjIwNzA3MTcxNjAsImlhdCI6MTc1NjE0NTE1MCwiaXNzIjoic3VwYWJhc2UiLCJyZWYiOiJhYmNkZWZnaGlqbGttbm9wcXJzdCIsInJvbGUiOiJzZXJ2aWNlX3JvbGUifQ.YdTWkkIvwjSkXot3NC07xyjPjGWQMNzLq5EPzumzrdLzuHrj-zuzI-nlyQtQ5V7gZauysm-wGwmpztRXfPc3AQ".to_string()),
746747
s3_access_key_id: SerializableSecretString::from("9156667efc2c70d89af6588da86d2924".to_string()),
747748
s3_secret_access_key: SerializableSecretString::from("ca833e890916d848c69135924bcd75e5909184814a0ebc6c988937ee094120d4".to_string()),
@@ -813,7 +814,7 @@ mod tests {
813814
catalog_uri: "https://abcdefghijklmnopqrst.storage.supabase.com/storage/v1/iceberg"
814815
.to_string(),
815816
warehouse_name: "my-warehouse".to_string(),
816-
namespace: "my-namespace".to_string(),
817+
namespace: Some("my-namespace".to_string()),
817818
s3_access_key_id: SerializableSecretString::from("id".to_string()),
818819
s3_secret_access_key: SerializableSecretString::from("key".to_string()),
819820
s3_endpoint: "http://localhost:8080".to_string(),
@@ -924,7 +925,7 @@ mod tests {
924925
config: FullApiIcebergConfig::Supabase {
925926
project_ref: "abcdefghijklmnopqrst".to_string(),
926927
warehouse_name: "my-warehouse".to_string(),
927-
namespace: "my-namespace".to_string(),
928+
namespace: Some("my-namespace".to_string()),
928929
catalog_token: SerializableSecretString::from("eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6IjFkNzFjMGEyNmIxMDFjODQ5ZTkxZmQ1NjdjYjA5NTJmIn0.eyJleHAiOjIwNzA3MTcxNjAsImlhdCI6MTc1NjE0NTE1MCwiaXNzIjoic3VwYWJhc2UiLCJyZWYiOiJhYmNkZWZnaGlqbGttbm9wcXJzdCIsInJvbGUiOiJzZXJ2aWNlX3JvbGUifQ.YdTWkkIvwjSkXot3NC07xyjPjGWQMNzLq5EPzumzrdLzuHrj-zuzI-nlyQtQ5V7gZauysm-wGwmpztRXfPc3AQ".to_string()),
929930
s3_access_key_id: SerializableSecretString::from("9156667efc2c70d89af6588da86d2924".to_string()),
930931
s3_secret_access_key: SerializableSecretString::from("ca833e890916d848c69135924bcd75e5909184814a0ebc6c988937ee094120d4".to_string()),
@@ -990,7 +991,7 @@ mod tests {
990991
catalog_uri: "https://abcdefghijklmnopqrst.storage.supabase.com/storage/v1/iceberg"
991992
.to_string(),
992993
warehouse_name: "my-warehouse".to_string(),
993-
namespace: "my-namespace".to_string(),
994+
namespace: Some("my-namespace".to_string()),
994995
s3_access_key_id: SerializableSecretString::from("id".to_string()),
995996
s3_secret_access_key: SerializableSecretString::from("key".to_string()),
996997
s3_endpoint: "http://localhost:8080".to_string(),
@@ -1048,7 +1049,7 @@ mod tests {
10481049
config: FullApiIcebergConfig::Supabase {
10491050
project_ref: "abcdefghijklmnopqrst".to_string(),
10501051
warehouse_name: "my-warehouse".to_string(),
1051-
namespace: "my-namespace".to_string(),
1052+
namespace: Some("my-namespace".to_string()),
10521053
catalog_token: SerializableSecretString::from("token123".to_string()),
10531054
s3_access_key_id: SerializableSecretString::from("access_key_123".to_string()),
10541055
s3_secret_access_key: SerializableSecretString::from("secret123".to_string()),
@@ -1116,7 +1117,7 @@ mod tests {
11161117
config: FullApiIcebergConfig::Rest {
11171118
catalog_uri: "https://catalog.example.com/iceberg".to_string(),
11181119
warehouse_name: "my-warehouse".to_string(),
1119-
namespace: "my-namespace".to_string(),
1120+
namespace: Some("my-namespace".to_string()),
11201121
s3_access_key_id: SerializableSecretString::from("id".to_string()),
11211122
s3_secret_access_key: SerializableSecretString::from("key".to_string()),
11221123
s3_endpoint: "http://localhost:8080".to_string(),

etl-api/tests/snapshots/destinations__an_existing_iceberg_supabase_destination_can_be_updated.snap

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ Iceberg {
66
config: Supabase {
77
project_ref: "tsrqponmlkjihgfedcba",
88
warehouse_name: "my-updated-warehouse",
9-
namespace: "my-updated-namespace",
9+
namespace: Some(
10+
"my-updated-namespace",
11+
),
1012
catalog_token: SerializableSecretString(
1113
SecretBox<str>([REDACTED]),
1214
),

etl-api/tests/snapshots/destinations_pipelines__an_existing_iceberg_supabase_destination_and_pipeline_can_be_updated.snap

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ Iceberg {
66
config: Supabase {
77
project_ref: "tsrqponmlkjihgfedcba",
88
warehouse_name: "my-updated-warehouse",
9-
namespace: "my-updated-namespace",
9+
namespace: Some(
10+
"my-updated-namespace",
11+
),
1012
catalog_token: SerializableSecretString(
1113
SecretBox<str>([REDACTED]),
1214
),

etl-api/tests/snapshots/destinations_pipelines__iceberg_supabase_destination_and_pipeline_can_be_created.snap

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ Iceberg {
66
config: Supabase {
77
project_ref: "abcdefghijklmnopqrst",
88
warehouse_name: "my-warehouse",
9-
namespace: "my-namespace",
9+
namespace: Some(
10+
"my-namespace",
11+
),
1012
catalog_token: SerializableSecretString(
1113
SecretBox<str>([REDACTED]),
1214
),

etl-api/tests/support/mocks.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub mod destinations {
7474
config: FullApiIcebergConfig::Supabase {
7575
project_ref: "abcdefghijklmnopqrst".to_string(),
7676
warehouse_name: "my-warehouse".to_string(),
77-
namespace: "my-namespace".to_string(),
77+
namespace: Some("my-namespace".to_string()),
7878
catalog_token: SerializableSecretString::from(
7979
"eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6IjFkNzFjMGEyNmIxMDFjODQ5ZTkxZmQ1NjdjYjA5NTJmIn0.eyJleHAiOjIwNzA3MTcxNjAsImlhdCI6MTc1NjE0NTE1MCwiaXNzIjoic3VwYWJhc2UiLCJyZWYiOiJhYmNkZWZnaGlqbGttbm9wcXJzdCIsInJvbGUiOiJzZXJ2aWNlX3JvbGUifQ.YdTWkkIvwjSkXot3NC07xyjPjGWQMNzLq5EPzumzrdLzuHrj-zuzI-nlyQtQ5V7gZauysm-wGwmpztRXfPc3AQ".to_string()
8080
),
@@ -95,7 +95,7 @@ pub mod destinations {
9595
config: FullApiIcebergConfig::Supabase {
9696
project_ref: "tsrqponmlkjihgfedcba".to_string(),
9797
warehouse_name: "my-updated-warehouse".to_string(),
98-
namespace: "my-updated-namespace".to_string(),
98+
namespace: Some("my-updated-namespace".to_string()),
9999
catalog_token: SerializableSecretString::from(
100100
"eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiIsImtpZCI6IjJlOGQxZDNjN2MyMTJkOTU4ZmEyOGU2ZDhjZDEwYTMzIn0.eyJleHAiOjIwNzA3MTcxNjAsImlhdCI6MTc1NjE0NTE1MCwiaXNzIjoic3VwYWJhc2UiLCJyZWYiOiJ0c3JxcG9ubWxramloZ2ZlZGNiYSIsInJvbGUiOiJzZXJ2aWNlX3JvbGUifQ.UpdatedTokenSignatureForTesting".to_string()
101101
),

etl-config/src/shared/destination.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,10 @@ pub enum IcebergConfig {
6464
project_ref: String,
6565
/// Name of the warehouse in the catalog
6666
warehouse_name: String,
67-
/// Iceberg catalog namespace where tables will be created
68-
namespace: String,
67+
/// If present, the iceberg catalog namespace where tables will be created.
68+
/// If missing, multiple catlog namespaces will be created, one per source
69+
/// schema.
70+
namespace: Option<String>,
6971
/// Catalog authentication token
7072
catalog_token: SecretString,
7173
/// The S3 access key id
@@ -80,8 +82,10 @@ pub enum IcebergConfig {
8082
catalog_uri: String,
8183
/// Name of the warehouse in the catalog
8284
warehouse_name: String,
83-
/// Iceberg catalog namespace where tables will be created
84-
namespace: String,
85+
/// If present, the iceberg catalog namespace where tables will be created.
86+
/// If missing, multiple catlog namespaces will be created, one per source
87+
/// schema.
88+
namespace: Option<String>,
8589
/// The S3 access key id
8690
s3_access_key_id: SecretString,
8791
/// The S3 secret access key
@@ -102,8 +106,10 @@ pub enum IcebergConfigWithoutSecrets {
102106
project_ref: String,
103107
/// Name of the warehouse in the catalog
104108
warehouse_name: String,
105-
/// Iceberg catalog namespace where tables will be created
106-
namespace: String,
109+
/// If present, the iceberg catalog namespace where tables will be created.
110+
/// If missing, multiple catlog namespaces will be created, one per source
111+
/// schema.
112+
namespace: Option<String>,
107113
/// The S3 region
108114
s3_region: String,
109115
},
@@ -113,7 +119,7 @@ pub enum IcebergConfigWithoutSecrets {
113119
/// Name of the warehouse in the catalog
114120
warehouse_name: String,
115121
/// Iceberg catalog namespace where tables will be created
116-
namespace: String,
122+
namespace: Option<String>,
117123
/// The S3 endpoint
118124
s3_endpoint: String,
119125
},

0 commit comments

Comments
 (0)