1616// under the License.
1717
1818use async_trait:: async_trait;
19+ use aws_sdk_glue:: types:: TableInput ;
20+ use iceberg:: io:: FileIO ;
21+ use iceberg:: spec:: { TableMetadata , TableMetadataBuilder } ;
1922use iceberg:: table:: Table ;
2023use iceberg:: {
2124 Catalog , Error , ErrorKind , Namespace , NamespaceIdent , Result , TableCommit , TableCreation ,
2225 TableIdent ,
2326} ;
2427use std:: { collections:: HashMap , fmt:: Debug } ;
28+ use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
2529
2630use typed_builder:: TypedBuilder ;
2731
28- use crate :: error:: from_aws_sdk_error;
32+ use crate :: error:: { from_aws_build_error , from_aws_sdk_error} ;
2933use crate :: utils:: {
30- convert_to_database, convert_to_namespace, create_sdk_config, validate_namespace,
34+ convert_to_database, convert_to_glue_table, convert_to_namespace, create_metadata_location,
35+ create_sdk_config, get_default_table_location, get_metadata_location, validate_namespace,
3136} ;
3237use crate :: with_catalog_id;
3338
@@ -38,6 +43,7 @@ pub struct GlueCatalogConfig {
3843 uri : Option < String > ,
3944 #[ builder( default , setter( strip_option) ) ]
4045 catalog_id : Option < String > ,
46+ warehouse : String ,
4147 #[ builder( default ) ]
4248 props : HashMap < String , String > ,
4349}
@@ -48,6 +54,7 @@ struct GlueClient(aws_sdk_glue::Client);
4854pub struct GlueCatalog {
4955 config : GlueCatalogConfig ,
5056 client : GlueClient ,
57+ file_io : FileIO ,
5158}
5259
5360impl Debug for GlueCatalog {
@@ -60,15 +67,24 @@ impl Debug for GlueCatalog {
6067
6168impl GlueCatalog {
6269 /// Create a new glue catalog
63- pub async fn new ( config : GlueCatalogConfig ) -> Self {
70+ pub async fn new ( config : GlueCatalogConfig ) -> Result < Self > {
6471 let sdk_config = create_sdk_config ( & config. props , config. uri . as_ref ( ) ) . await ;
6572
6673 let client = aws_sdk_glue:: Client :: new ( & sdk_config) ;
6774
68- GlueCatalog {
75+ let file_io = FileIO :: from_path ( & config. warehouse ) ?
76+ . with_props ( & config. props )
77+ . build ( ) ?;
78+
79+ Ok ( GlueCatalog {
6980 config,
7081 client : GlueClient ( client) ,
71- }
82+ file_io,
83+ } )
84+ }
85+ /// Get the catalogs `FileIO`
86+ pub fn file_io ( & self ) -> FileIO {
87+ self . file_io . clone ( )
7288 }
7389}
7490
@@ -77,7 +93,7 @@ impl Catalog for GlueCatalog {
7793 /// List namespaces from glue catalog.
7894 ///
7995 /// Glue doesn't support nested namespaces.
80- /// We will return an empty list if parent is some
96+ /// We will return an empty list if parent is some.
8197 async fn list_namespaces (
8298 & self ,
8399 parent : Option < & NamespaceIdent > ,
@@ -277,6 +293,7 @@ impl Catalog for GlueCatalog {
277293 /// querying the database.
278294 async fn list_tables ( & self , namespace : & NamespaceIdent ) -> Result < Vec < TableIdent > > {
279295 let db_name = validate_namespace ( namespace) ?;
296+
280297 let mut table_list: Vec < TableIdent > = Vec :: new ( ) ;
281298 let mut next_token: Option < String > = None ;
282299
@@ -310,31 +327,282 @@ impl Catalog for GlueCatalog {
310327 Ok ( table_list)
311328 }
312329
330+ /// Creates a new table within a specified namespace using the provided
331+ /// table creation settings.
332+ ///
333+ /// # Returns
334+ /// A `Result` wrapping a `Table` object representing the newly created
335+ /// table.
336+ ///
337+ /// # Errors
338+ /// This function may return an error in several cases, including invalid
339+ /// namespace identifiers, failure to determine a default storage location,
340+ /// issues generating or writing table metadata, and errors communicating
341+ /// with the Glue Catalog.
313342 async fn create_table (
314343 & self ,
315- _namespace : & NamespaceIdent ,
316- _creation : TableCreation ,
344+ namespace : & NamespaceIdent ,
345+ creation : TableCreation ,
317346 ) -> Result < Table > {
318- todo ! ( )
347+ let db_name = validate_namespace ( namespace) ?;
348+ let table_name = creation. name . clone ( ) ;
349+
350+ let location = match & creation. location {
351+ Some ( location) => location. clone ( ) ,
352+ None => {
353+ let ns = self . get_namespace ( namespace) . await ?;
354+ get_default_table_location ( & ns, & db_name, & table_name, & self . config . warehouse )
355+ }
356+ } ;
357+
358+ let metadata = TableMetadataBuilder :: from_table_creation ( creation) ?. build ( ) ?;
359+ let metadata_location = create_metadata_location ( & location, 0 ) ?;
360+
361+ let mut file = self
362+ . file_io
363+ . new_output ( & metadata_location) ?
364+ . writer ( )
365+ . await ?;
366+ file. write_all ( & serde_json:: to_vec ( & metadata) ?) . await ?;
367+ file. shutdown ( ) . await ?;
368+
369+ let glue_table = convert_to_glue_table (
370+ & table_name,
371+ metadata_location. clone ( ) ,
372+ & metadata,
373+ metadata. properties ( ) ,
374+ None ,
375+ ) ?;
376+
377+ let builder = self
378+ . client
379+ . 0
380+ . create_table ( )
381+ . database_name ( & db_name)
382+ . table_input ( glue_table) ;
383+ let builder = with_catalog_id ! ( builder, self . config) ;
384+
385+ builder. send ( ) . await . map_err ( from_aws_sdk_error) ?;
386+
387+ let table = Table :: builder ( )
388+ . file_io ( self . file_io ( ) )
389+ . metadata_location ( metadata_location)
390+ . metadata ( metadata)
391+ . identifier ( TableIdent :: new ( NamespaceIdent :: new ( db_name) , table_name) )
392+ . build ( ) ;
393+
394+ Ok ( table)
319395 }
320396
321- async fn load_table ( & self , _table : & TableIdent ) -> Result < Table > {
322- todo ! ( )
397+ /// Loads a table from the Glue Catalog and constructs a `Table` object
398+ /// based on its metadata.
399+ ///
400+ /// # Returns
401+ /// A `Result` wrapping a `Table` object that represents the loaded table.
402+ ///
403+ /// # Errors
404+ /// This function may return an error in several scenarios, including:
405+ /// - Failure to validate the namespace.
406+ /// - Failure to retrieve the table from the Glue Catalog.
407+ /// - Absence of metadata location information in the table's properties.
408+ /// - Issues reading or deserializing the table's metadata file.
409+ async fn load_table ( & self , table : & TableIdent ) -> Result < Table > {
410+ let db_name = validate_namespace ( table. namespace ( ) ) ?;
411+ let table_name = table. name ( ) ;
412+
413+ let builder = self
414+ . client
415+ . 0
416+ . get_table ( )
417+ . database_name ( & db_name)
418+ . name ( table_name) ;
419+ let builder = with_catalog_id ! ( builder, self . config) ;
420+
421+ let glue_table_output = builder. send ( ) . await . map_err ( from_aws_sdk_error) ?;
422+
423+ match glue_table_output. table ( ) {
424+ None => Err ( Error :: new (
425+ ErrorKind :: Unexpected ,
426+ format ! (
427+ "Table object for database: {} and table: {} does not exist" ,
428+ db_name, table_name
429+ ) ,
430+ ) ) ,
431+ Some ( table) => {
432+ let metadata_location = get_metadata_location ( & table. parameters ) ?;
433+
434+ let mut reader = self . file_io . new_input ( & metadata_location) ?. reader ( ) . await ?;
435+ let mut metadata_str = String :: new ( ) ;
436+ reader. read_to_string ( & mut metadata_str) . await ?;
437+ let metadata = serde_json:: from_str :: < TableMetadata > ( & metadata_str) ?;
438+
439+ let table = Table :: builder ( )
440+ . file_io ( self . file_io ( ) )
441+ . metadata_location ( metadata_location)
442+ . metadata ( metadata)
443+ . identifier ( TableIdent :: new (
444+ NamespaceIdent :: new ( db_name) ,
445+ table_name. to_owned ( ) ,
446+ ) )
447+ . build ( ) ;
448+
449+ Ok ( table)
450+ }
451+ }
323452 }
324453
325- async fn drop_table ( & self , _table : & TableIdent ) -> Result < ( ) > {
326- todo ! ( )
454+ /// Asynchronously drops a table from the database.
455+ ///
456+ /// # Errors
457+ /// Returns an error if:
458+ /// - The namespace provided in `table` cannot be validated
459+ /// or does not exist.
460+ /// - The underlying database client encounters an error while
461+ /// attempting to drop the table. This includes scenarios where
462+ /// the table does not exist.
463+ /// - Any network or communication error occurs with the database backend.
464+ async fn drop_table ( & self , table : & TableIdent ) -> Result < ( ) > {
465+ let db_name = validate_namespace ( table. namespace ( ) ) ?;
466+ let table_name = table. name ( ) ;
467+
468+ let builder = self
469+ . client
470+ . 0
471+ . delete_table ( )
472+ . database_name ( & db_name)
473+ . name ( table_name) ;
474+ let builder = with_catalog_id ! ( builder, self . config) ;
475+
476+ builder. send ( ) . await . map_err ( from_aws_sdk_error) ?;
477+
478+ Ok ( ( ) )
327479 }
328480
329- async fn table_exists ( & self , _table : & TableIdent ) -> Result < bool > {
330- todo ! ( )
481+ /// Asynchronously checks the existence of a specified table
482+ /// in the database.
483+ ///
484+ /// # Returns
485+ /// - `Ok(true)` if the table exists in the database.
486+ /// - `Ok(false)` if the table does not exist in the database.
487+ /// - `Err(...)` if an error occurs during the process
488+ async fn table_exists ( & self , table : & TableIdent ) -> Result < bool > {
489+ let db_name = validate_namespace ( table. namespace ( ) ) ?;
490+ let table_name = table. name ( ) ;
491+
492+ let builder = self
493+ . client
494+ . 0
495+ . get_table ( )
496+ . database_name ( & db_name)
497+ . name ( table_name) ;
498+ let builder = with_catalog_id ! ( builder, self . config) ;
499+
500+ let resp = builder. send ( ) . await ;
501+
502+ match resp {
503+ Ok ( _) => Ok ( true ) ,
504+ Err ( err) => {
505+ if err
506+ . as_service_error ( )
507+ . map ( |e| e. is_entity_not_found_exception ( ) )
508+ == Some ( true )
509+ {
510+ return Ok ( false ) ;
511+ }
512+ Err ( from_aws_sdk_error ( err) )
513+ }
514+ }
331515 }
332516
333- async fn rename_table ( & self , _src : & TableIdent , _dest : & TableIdent ) -> Result < ( ) > {
334- todo ! ( )
517+ /// Asynchronously renames a table within the database
518+ /// or moves it between namespaces (databases).
519+ ///
520+ /// # Returns
521+ /// - `Ok(())` on successful rename or move of the table.
522+ /// - `Err(...)` if an error occurs during the process.
523+ async fn rename_table ( & self , src : & TableIdent , dest : & TableIdent ) -> Result < ( ) > {
524+ let src_db_name = validate_namespace ( src. namespace ( ) ) ?;
525+ let dest_db_name = validate_namespace ( dest. namespace ( ) ) ?;
526+
527+ let src_table_name = src. name ( ) ;
528+ let dest_table_name = dest. name ( ) ;
529+
530+ let builder = self
531+ . client
532+ . 0
533+ . get_table ( )
534+ . database_name ( & src_db_name)
535+ . name ( src_table_name) ;
536+ let builder = with_catalog_id ! ( builder, self . config) ;
537+
538+ let glue_table_output = builder. send ( ) . await . map_err ( from_aws_sdk_error) ?;
539+
540+ match glue_table_output. table ( ) {
541+ None => Err ( Error :: new (
542+ ErrorKind :: Unexpected ,
543+ format ! (
544+ "'Table' object for database: {} and table: {} does not exist" ,
545+ src_db_name, src_table_name
546+ ) ,
547+ ) ) ,
548+ Some ( table) => {
549+ let rename_table_input = TableInput :: builder ( )
550+ . name ( dest_table_name)
551+ . set_parameters ( table. parameters . clone ( ) )
552+ . set_storage_descriptor ( table. storage_descriptor . clone ( ) )
553+ . set_table_type ( table. table_type . clone ( ) )
554+ . set_description ( table. description . clone ( ) )
555+ . build ( )
556+ . map_err ( from_aws_build_error) ?;
557+
558+ let builder = self
559+ . client
560+ . 0
561+ . create_table ( )
562+ . database_name ( & dest_db_name)
563+ . table_input ( rename_table_input) ;
564+ let builder = with_catalog_id ! ( builder, self . config) ;
565+
566+ builder. send ( ) . await . map_err ( from_aws_sdk_error) ?;
567+
568+ let drop_src_table_result = self . drop_table ( src) . await ;
569+
570+ match drop_src_table_result {
571+ Ok ( _) => Ok ( ( ) ) ,
572+ Err ( _) => {
573+ let err_msg_src_table = format ! (
574+ "Failed to drop old table {}.{}." ,
575+ src_db_name, src_table_name
576+ ) ;
577+
578+ let drop_dest_table_result = self . drop_table ( dest) . await ;
579+
580+ match drop_dest_table_result {
581+ Ok ( _) => Err ( Error :: new (
582+ ErrorKind :: Unexpected ,
583+ format ! (
584+ "{} Rolled back table creation for {}.{}." ,
585+ err_msg_src_table, dest_db_name, dest_table_name
586+ ) ,
587+ ) ) ,
588+ Err ( _) => Err ( Error :: new (
589+ ErrorKind :: Unexpected ,
590+ format ! (
591+ "{} Failed to roll back table creation for {}.{}. Please clean up manually." ,
592+ err_msg_src_table, dest_db_name, dest_table_name
593+ ) ,
594+ ) ) ,
595+ }
596+ }
597+ }
598+ }
599+ }
335600 }
336601
337602 async fn update_table ( & self , _commit : TableCommit ) -> Result < Table > {
338- todo ! ( )
603+ Err ( Error :: new (
604+ ErrorKind :: FeatureUnsupported ,
605+ "Updating a table is not supported yet" ,
606+ ) )
339607 }
340608}
0 commit comments