Skip to content

Commit 6eba7a1

Browse files
marvinlanhenkeshaeqahmed
authored andcommitted
feat: Glue Catalog - namespace operations (2/3) (apache#304)
* add from_build_error * impl create_namespace * impl get_namespace * add macro with_catalog_id * impl namespace_exists * impl update_namespace * impl list_tables * impl drop_namespace * fix: clippy * update docs * update docs * fix: naming and visibility of error conversions
1 parent b56a8cf commit 6eba7a1

File tree

4 files changed

+486
-32
lines changed

4 files changed

+486
-32
lines changed

crates/catalog/glue/src/catalog.rs

Lines changed: 197 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,29 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Iceberg Glue Catalog implementation.
19-
2018
use async_trait::async_trait;
2119
use iceberg::table::Table;
22-
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
20+
use iceberg::{
21+
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
22+
TableIdent,
23+
};
2324
use std::{collections::HashMap, fmt::Debug};
2425

2526
use typed_builder::TypedBuilder;
2627

27-
use crate::error::from_aws_error;
28-
use crate::utils::create_sdk_config;
28+
use crate::error::from_aws_sdk_error;
29+
use crate::utils::{
30+
convert_to_database, convert_to_namespace, create_sdk_config, validate_namespace,
31+
};
32+
use crate::with_catalog_id;
2933

3034
#[derive(Debug, TypedBuilder)]
3135
/// Glue Catalog configuration
3236
pub struct GlueCatalogConfig {
3337
#[builder(default, setter(strip_option))]
3438
uri: Option<String>,
39+
#[builder(default, setter(strip_option))]
40+
catalog_id: Option<String>,
3541
#[builder(default)]
3642
props: HashMap<String, String>,
3743
}
@@ -68,6 +74,10 @@ impl GlueCatalog {
6874

6975
#[async_trait]
7076
impl Catalog for GlueCatalog {
77+
/// List namespaces from glue catalog.
78+
///
79+
/// Glue doesn't support nested namespaces.
80+
/// We will return an empty list if parent is some
7181
async fn list_namespaces(
7282
&self,
7383
parent: Option<&NamespaceIdent>,
@@ -80,17 +90,19 @@ impl Catalog for GlueCatalog {
8090
let mut next_token: Option<String> = None;
8191

8292
loop {
83-
let resp = match &next_token {
93+
let builder = match &next_token {
8494
Some(token) => self.client.0.get_databases().next_token(token),
8595
None => self.client.0.get_databases(),
8696
};
87-
let resp = resp.send().await.map_err(from_aws_error)?;
97+
let builder = with_catalog_id!(builder, self.config);
98+
let resp = builder.send().await.map_err(from_aws_sdk_error)?;
8899

89100
let dbs: Vec<NamespaceIdent> = resp
90101
.database_list()
91102
.iter()
92103
.map(|db| NamespaceIdent::new(db.name().to_string()))
93104
.collect();
105+
94106
database_list.extend(dbs);
95107

96108
next_token = resp.next_token().map(ToOwned::to_owned);
@@ -102,36 +114,200 @@ impl Catalog for GlueCatalog {
102114
Ok(database_list)
103115
}
104116

117+
/// Creates a new namespace with the given identifier and properties.
118+
///
119+
/// Attempts to create a namespace defined by the `namespace`
120+
/// parameter and configured with the specified `properties`.
121+
///
122+
/// This function can return an error in the following situations:
123+
///
124+
/// - Errors from `validate_namespace` if the namespace identifier does not
125+
/// meet validation criteria.
126+
/// - Errors from `convert_to_database` if the properties cannot be
127+
/// successfully converted into a database configuration.
128+
/// - Errors from the underlying database creation process, converted using
129+
/// `from_sdk_error`.
105130
async fn create_namespace(
106131
&self,
107-
_namespace: &NamespaceIdent,
108-
_properties: HashMap<String, String>,
132+
namespace: &NamespaceIdent,
133+
properties: HashMap<String, String>,
109134
) -> Result<Namespace> {
110-
todo!()
135+
let db_input = convert_to_database(namespace, &properties)?;
136+
137+
let builder = self.client.0.create_database().database_input(db_input);
138+
let builder = with_catalog_id!(builder, self.config);
139+
140+
builder.send().await.map_err(from_aws_sdk_error)?;
141+
142+
Ok(Namespace::with_properties(namespace.clone(), properties))
111143
}
112144

113-
async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
114-
todo!()
145+
/// Retrieves a namespace by its identifier.
146+
///
147+
/// Validates the given namespace identifier and then queries the
148+
/// underlying database client to fetch the corresponding namespace data.
149+
/// Constructs a `Namespace` object with the retrieved data and returns it.
150+
///
151+
/// This function can return an error in any of the following situations:
152+
/// - If the provided namespace identifier fails validation checks
153+
/// - If there is an error querying the database, returned by
154+
/// `from_sdk_error`.
155+
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
156+
let db_name = validate_namespace(namespace)?;
157+
158+
let builder = self.client.0.get_database().name(&db_name);
159+
let builder = with_catalog_id!(builder, self.config);
160+
161+
let resp = builder.send().await.map_err(from_aws_sdk_error)?;
162+
163+
match resp.database() {
164+
Some(db) => {
165+
let namespace = convert_to_namespace(db);
166+
Ok(namespace)
167+
}
168+
None => Err(Error::new(
169+
ErrorKind::DataInvalid,
170+
format!("Database with name: {} does not exist", db_name),
171+
)),
172+
}
115173
}
116174

117-
async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
118-
todo!()
175+
/// Checks if a namespace exists within the Glue Catalog.
176+
///
177+
/// Validates the namespace identifier by querying the Glue Catalog
178+
/// to determine if the specified namespace (database) exists.
179+
///
180+
/// # Returns
181+
/// A `Result<bool>` indicating the outcome of the check:
182+
/// - `Ok(true)` if the namespace exists.
183+
/// - `Ok(false)` if the namespace does not exist, identified by a specific
184+
/// `EntityNotFoundException` variant.
185+
/// - `Err(...)` if an error occurs during validation or the Glue Catalog
186+
/// query, with the error encapsulating the issue.
187+
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
188+
let db_name = validate_namespace(namespace)?;
189+
190+
let builder = self.client.0.get_database().name(&db_name);
191+
let builder = with_catalog_id!(builder, self.config);
192+
193+
let resp = builder.send().await;
194+
195+
match resp {
196+
Ok(_) => Ok(true),
197+
Err(err) => {
198+
if err
199+
.as_service_error()
200+
.map(|e| e.is_entity_not_found_exception())
201+
== Some(true)
202+
{
203+
return Ok(false);
204+
}
205+
Err(from_aws_sdk_error(err))
206+
}
207+
}
119208
}
120209

210+
/// Asynchronously updates properties of an existing namespace.
211+
///
212+
/// Converts the given namespace identifier and properties into a database
213+
/// representation and then attempts to update the corresponding namespace
214+
/// in the Glue Catalog.
215+
///
216+
/// # Returns
217+
/// Returns `Ok(())` if the namespace update is successful. If the
218+
/// namespace cannot be updated due to missing information or an error
219+
/// during the update process, an `Err(...)` is returned.
121220
async fn update_namespace(
122221
&self,
123-
_namespace: &NamespaceIdent,
124-
_properties: HashMap<String, String>,
222+
namespace: &NamespaceIdent,
223+
properties: HashMap<String, String>,
125224
) -> Result<()> {
126-
todo!()
225+
let db_name = validate_namespace(namespace)?;
226+
let db_input = convert_to_database(namespace, &properties)?;
227+
228+
let builder = self
229+
.client
230+
.0
231+
.update_database()
232+
.name(&db_name)
233+
.database_input(db_input);
234+
let builder = with_catalog_id!(builder, self.config);
235+
236+
builder.send().await.map_err(from_aws_sdk_error)?;
237+
238+
Ok(())
127239
}
128240

129-
async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
130-
todo!()
241+
/// Asynchronously drops a namespace from the Glue Catalog.
242+
///
243+
/// Checks if the namespace is empty. If it still contains tables the
244+
/// namespace will not be dropped, but an error is returned instead.
245+
///
246+
/// # Returns
247+
/// A `Result<()>` indicating the outcome:
248+
/// - `Ok(())` signifies successful namespace deletion.
249+
/// - `Err(...)` signifies failure to drop the namespace due to validation
250+
/// errors, connectivity issues, or Glue Catalog constraints.
251+
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
252+
let db_name = validate_namespace(namespace)?;
253+
let table_list = self.list_tables(namespace).await?;
254+
255+
if !table_list.is_empty() {
256+
return Err(Error::new(
257+
ErrorKind::DataInvalid,
258+
format!("Database with name: {} is not empty", &db_name),
259+
));
260+
}
261+
262+
let builder = self.client.0.delete_database().name(db_name);
263+
let builder = with_catalog_id!(builder, self.config);
264+
265+
builder.send().await.map_err(from_aws_sdk_error)?;
266+
267+
Ok(())
131268
}
132269

133-
async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
134-
todo!()
270+
/// Asynchronously lists all tables within a specified namespace.
271+
///
272+
/// # Returns
273+
/// A `Result<Vec<TableIdent>>`, which is:
274+
/// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
275+
/// representing a table within the specified namespace.
276+
/// - `Err(...)` if an error occurs during namespace validation or while
277+
/// querying the database.
278+
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
279+
let db_name = validate_namespace(namespace)?;
280+
let mut table_list: Vec<TableIdent> = Vec::new();
281+
let mut next_token: Option<String> = None;
282+
283+
loop {
284+
let builder = match &next_token {
285+
Some(token) => self
286+
.client
287+
.0
288+
.get_tables()
289+
.database_name(&db_name)
290+
.next_token(token),
291+
None => self.client.0.get_tables().database_name(&db_name),
292+
};
293+
let builder = with_catalog_id!(builder, self.config);
294+
let resp = builder.send().await.map_err(from_aws_sdk_error)?;
295+
296+
let tables: Vec<_> = resp
297+
.table_list()
298+
.iter()
299+
.map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
300+
.collect();
301+
302+
table_list.extend(tables);
303+
304+
next_token = resp.next_token().map(ToOwned::to_owned);
305+
if next_token.is_none() {
306+
break;
307+
}
308+
}
309+
310+
Ok(table_list)
135311
}
136312

137313
async fn create_table(

crates/catalog/glue/src/error.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Iceberg Glue Catalog implementation.
19-
2018
use anyhow::anyhow;
2119
use std::fmt::Debug;
2220

2321
use iceberg::{Error, ErrorKind};
2422

2523
/// Format AWS SDK error into iceberg error
26-
pub fn from_aws_error<T>(error: aws_sdk_glue::error::SdkError<T>) -> Error
24+
pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_glue::error::SdkError<T>) -> Error
2725
where
2826
T: Debug,
2927
{
@@ -33,3 +31,12 @@ where
3331
)
3432
.with_source(anyhow!("aws sdk error: {:?}", error))
3533
}
34+
35+
/// Format AWS Build error into iceberg error
36+
pub(crate) fn from_aws_build_error(error: aws_sdk_glue::error::BuildError) -> Error {
37+
Error::new(
38+
ErrorKind::Unexpected,
39+
"Operation failed for hitting aws build error".to_string(),
40+
)
41+
.with_source(anyhow!("aws build error: {:?}", error))
42+
}

0 commit comments

Comments
 (0)