@@ -23,6 +23,7 @@ use bytes::Bytes;
2323use datafusion_physical_plan:: metrics:: ExecutionPlanMetricsSet ;
2424use futures:: future:: BoxFuture ;
2525use object_store:: ObjectStore ;
26+ use parquet:: arrow:: arrow_reader:: { ArrowReaderMetadata , ArrowReaderOptions } ;
2627use parquet:: arrow:: async_reader:: { AsyncFileReader , ParquetObjectReader } ;
2728use parquet:: file:: metadata:: ParquetMetaData ;
2829use std:: fmt:: Debug ;
@@ -57,9 +58,49 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
5758 file_meta : FileMeta ,
5859 metadata_size_hint : Option < usize > ,
5960 metrics : & ExecutionPlanMetricsSet ,
60- ) -> datafusion_common:: Result < Box < dyn AsyncFileReader + Send > > ;
61+ ) -> datafusion_common:: Result < Box < dyn ParquetFileReader > > ;
6162}
6263
64+ /// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded.
65+ pub trait ParquetFileReader : AsyncFileReader + Send + ' static {
66+ /// Returns a [`AsyncFileReader`] trait object
67+ ///
68+ /// This can usually be implemented as `Box::new(*self)`
69+ fn upcast ( self : Box < Self > ) -> Box < dyn AsyncFileReader + ' static > ;
70+
71+ /// Parses the file's metadata
72+ ///
73+ /// The default implementation is:
74+ ///
75+ /// ```
76+ /// Box::pin(ArrowReaderMetadata::load_async(self, options))
77+ /// ```
78+ fn load_metadata (
79+ & mut self ,
80+ options : ArrowReaderOptions ,
81+ ) -> BoxFuture < ' _ , parquet:: errors:: Result < ArrowReaderMetadata > > ;
82+ }
83+
84+ macro_rules! impl_ParquetFileReader {
85+ ( $type: ty) => {
86+ impl ParquetFileReader for $type {
87+ fn upcast( self : Box <Self >) -> Box <dyn AsyncFileReader + ' static > {
88+ Box :: new( * self )
89+ }
90+
91+ fn load_metadata(
92+ & mut self ,
93+ options: ArrowReaderOptions ,
94+ ) -> BoxFuture <' _, parquet:: errors:: Result <ArrowReaderMetadata >> {
95+ Box :: pin( ArrowReaderMetadata :: load_async( self , options) )
96+ }
97+ }
98+ }
99+ }
100+
101+ impl_ParquetFileReader ! ( ParquetObjectReader ) ;
102+ impl_ParquetFileReader ! ( DefaultParquetFileReader ) ;
103+
63104/// Default implementation of [`ParquetFileReaderFactory`]
64105///
65106/// This implementation:
@@ -86,12 +127,12 @@ impl DefaultParquetFileReaderFactory {
86127/// This implementation does not coalesce I/O operations or cache bytes. Such
87128/// optimizations can be done either at the object store level or by providing a
88129/// custom implementation of [`ParquetFileReaderFactory`].
89- pub ( crate ) struct ParquetFileReader {
130+ pub ( crate ) struct DefaultParquetFileReader {
90131 pub file_metrics : ParquetFileMetrics ,
91132 pub inner : ParquetObjectReader ,
92133}
93134
94- impl AsyncFileReader for ParquetFileReader {
135+ impl AsyncFileReader for DefaultParquetFileReader {
95136 fn get_bytes (
96137 & mut self ,
97138 range : Range < usize > ,
@@ -126,7 +167,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
126167 file_meta : FileMeta ,
127168 metadata_size_hint : Option < usize > ,
128169 metrics : & ExecutionPlanMetricsSet ,
129- ) -> datafusion_common:: Result < Box < dyn AsyncFileReader + Send > > {
170+ ) -> datafusion_common:: Result < Box < dyn ParquetFileReader > > {
130171 let file_metrics = ParquetFileMetrics :: new (
131172 partition_index,
132173 file_meta. location ( ) . as_ref ( ) ,
@@ -139,7 +180,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
139180 inner = inner. with_footer_size_hint ( hint)
140181 } ;
141182
142- Ok ( Box :: new ( ParquetFileReader {
183+ Ok ( Box :: new ( DefaultParquetFileReader {
143184 inner,
144185 file_metrics,
145186 } ) )
0 commit comments