Skip to content

Commit 3d6b976

Browse files
committed
Example of reading and writing parquet metadata outside the file
1 parent c8f648b commit 3d6b976

File tree

2 files changed

+227
-0
lines changed

2 files changed

+227
-0
lines changed

parquet/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ sysinfo = ["dep:sysinfo"]
121121
# Verify 32-bit CRC checksum when decoding parquet pages
122122
crc = ["dep:crc32fast"]
123123

124+
125+
[[example]]
126+
name = "external_metadata"
127+
required-features = ["arrow", "async"]
128+
path = "./examples/external_metadata.rs"
129+
124130
[[example]]
125131
name = "read_parquet"
126132
required-features = ["arrow"]
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
19+
use arrow_cast::pretty::pretty_format_batches;
20+
use futures::TryStreamExt;
21+
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
22+
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
23+
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
24+
use parquet::file::properties::{EnabledStatistics, WriterProperties};
25+
use std::fs::File;
26+
use std::path::{Path, PathBuf};
27+
use std::sync::Arc;
28+
use tempfile::TempDir;
29+
30+
/// This example demonstrates advanced usage of the Parquet metadata APIs.
31+
///
32+
/// For example, you can copy the metadata for parquet files stored on remote
33+
/// object storage (e.g. S3) to a local file or an in-memory cache, use a query
34+
/// engine like DataFusion to analyze the metadata to determine which file to
35+
/// read, and then read any matching files with a single object store request.
36+
///
37+
/// # Usecase
38+
///
39+
/// 1. Read Parquet metadata from Parquet file stored on a remote source
40+
///
41+
/// 2. Store that metadata somewhere "locally" that is faster to access than
42+
/// the rest of a parquet data (e.g. a cache).
43+
///
44+
/// 3. Use the metadata to determine of the file should be read, and if so, read
45+
/// the remote Parquet file, without re-reading / reparsing the metadata footer.
46+
///
47+
/// # Example Overview
48+
/// 1. Reads the metadata of a Parquet file
49+
///
50+
/// 2. Removes some column statistics from the metadata (to make them smaller)
51+
///
52+
/// 3. Stores the metadata in a separate file
53+
///
54+
/// 4. Reads the metadata from the separate file and uses that to read the
55+
/// Parquet file
56+
#[tokio::main(flavor = "current_thread")]
57+
async fn main() -> parquet::errors::Result<()> {
58+
let tempdir = TempDir::new().unwrap();
59+
println!("data in {tempdir:?}");
60+
let parquet_path = create_parquet_file(&tempdir);
61+
let metadata_path = tempdir.path().join("thrift_metadata.dat");
62+
63+
// In this example, we use a tokio file to mimic an async remote data source
64+
let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?;
65+
66+
let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await;
67+
println!(
68+
"Metadata from 'remote' Parquet file into memory: {} bytes",
69+
metadata.memory_size()
70+
);
71+
72+
// now slim down the metadata and write it to a "local" file
73+
let metadata = prepare_metadata(metadata);
74+
write_metadata_to_local_file(metadata, &metadata_path);
75+
76+
// now read the metadata from the local file and use it to read the "remote" Parquet file
77+
let metadata = read_metadata_from_local_file(&metadata_path);
78+
println!("Read metadata from file: {metadata:#?}");
79+
80+
let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await;
81+
82+
// display the results
83+
let batches_string = pretty_format_batches(&batches).unwrap().to_string();
84+
let batches_lines: Vec<_> = batches_string.split('\n').collect();
85+
86+
assert_eq!(
87+
batches_lines,
88+
[
89+
"+-----+-------------+",
90+
"| id | description |",
91+
"+-----+-------------+",
92+
"| 100 | oranges |",
93+
"| 200 | apples |",
94+
"| 201 | grapefruit |",
95+
"| 300 | bannanas |",
96+
"| 102 | grapes |",
97+
"| 33 | pears |",
98+
"+-----+-------------+",
99+
],
100+
"actual output:\n\n{batches_lines:#?}"
101+
);
102+
103+
Ok(())
104+
}
105+
106+
/// Reads the metadata from a "remote" parquet file
107+
///
108+
/// Note that this function models reading from a remote file source using a
109+
/// tokio file. In a real application, you would implement [`MetadataFetch`] for
110+
/// your own remote source.
111+
///
112+
/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch
113+
async fn get_metadata_from_remote_parquet_file(
114+
remote_file: &mut tokio::fs::File,
115+
) -> ParquetMetaData {
116+
// the remote source must know the total file size (e.g. from an object store LIST operation)
117+
let file_size = remote_file.metadata().await.unwrap().len();
118+
119+
// tell the reader to read the page index
120+
ParquetMetaDataReader::new()
121+
.with_page_indexes(true)
122+
.load_and_finish(remote_file, file_size as usize)
123+
.await
124+
.unwrap()
125+
}
126+
127+
/// modifies the metadata to reduce its size
128+
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
129+
// TODO
130+
metadata
131+
}
132+
133+
/// writes the metadata to a file
134+
///
135+
/// The data is stored using the same thrift format as the Parquet file metadata
136+
fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
137+
let file = File::create(file).unwrap();
138+
ParquetMetaDataWriter::new(file, &metadata)
139+
.finish()
140+
.unwrap()
141+
}
142+
143+
/// Reads the metadata from a file
144+
///
145+
/// This function reads the format written by `write_metadata_to_file`
146+
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
147+
let file = File::open(file).unwrap();
148+
ParquetMetaDataReader::new()
149+
.with_column_indexes(true)
150+
.with_offset_indexes(true)
151+
.parse_and_finish(&file)
152+
.unwrap()
153+
}
154+
155+
/// Reads the "remote" Parquet file using the metadata
156+
///
157+
/// This shows how to read the Parquet file using previously read metadata
158+
/// instead of the metadata in the Parquet file itself. This avoids an IO /
159+
/// having to fetch and decode the metadata from the Parquet file before
160+
/// beginning to read it.
161+
///
162+
/// Note that this function models reading from a remote file source using a
163+
/// tokio file. In a real application, you would implement [`AsyncFileReader`]
164+
/// for your own remote source.
165+
///
166+
/// In this example, we simply read the results as Arrow record batches
167+
///
168+
/// [`AsyncFileReader`]: parquet::arrow::async_reader::AsyncFileReader
169+
async fn read_remote_parquet_file_with_metadata(
170+
remote_file: tokio::fs::File,
171+
metadata: ParquetMetaData,
172+
) -> Vec<RecordBatch> {
173+
let options = ArrowReaderOptions::new()
174+
// tell the reader to read the page index
175+
.with_page_index(true);
176+
// create a reader with pre-existing metadata
177+
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
178+
let reader =
179+
ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata)
180+
.build()
181+
.unwrap();
182+
183+
reader.try_collect::<Vec<_>>().await.unwrap()
184+
}
185+
186+
/// Make a new parquet file in the temporary directory, and returns the path
187+
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
188+
let path = tmpdir.path().join("example.parquet");
189+
let new_file = File::create(&path).unwrap();
190+
191+
let batch = RecordBatch::try_from_iter(vec![
192+
(
193+
"id",
194+
Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
195+
),
196+
(
197+
"description",
198+
Arc::new(StringArray::from(vec![
199+
"oranges",
200+
"apples",
201+
"grapefruit",
202+
"bannanas",
203+
"grapes",
204+
"pears",
205+
])),
206+
),
207+
])
208+
.unwrap();
209+
210+
let props = WriterProperties::builder()
211+
// ensure we write the page index level statistics
212+
.set_statistics_enabled(EnabledStatistics::Page)
213+
.build();
214+
215+
let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();
216+
217+
writer.write(&batch).unwrap();
218+
writer.finish().unwrap();
219+
220+
path
221+
}

0 commit comments

Comments
 (0)