diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 8327b590..f54b24d1 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -18,6 +18,7 @@ set(ICEBERG_INCLUDES "$" "$") set(ICEBERG_SOURCES + arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc expression/expression.cc expression/expressions.cc @@ -28,8 +29,12 @@ set(ICEBERG_SOURCES file_writer.cc inheritable_metadata.cc json_internal.cc + manifest_adapter.cc manifest_entry.cc manifest_list.cc + manifest_reader.cc + manifest_reader_internal.cc + manifest_writer.cc metadata_columns.cc name_mapping.cc partition_field.cc @@ -51,16 +56,15 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc - manifest_reader.cc - manifest_reader_internal.cc - manifest_writer.cc - arrow_c_data_guard_internal.cc util/conversions.cc util/decimal.cc util/gzip_internal.cc util/murmurhash3_internal.cc util/timepoint.cc - util/uuid.cc) + util/uuid.cc + v1_metadata.cc + v2_metadata.cc + v3_metadata.cc) set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/arrow/nanoarrow_error_transform_internal.h b/src/iceberg/arrow/nanoarrow_error_transform_internal.h new file mode 100644 index 00000000..1a8f401d --- /dev/null +++ b/src/iceberg/arrow/nanoarrow_error_transform_internal.h @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status) \ + if (status != NANOARROW_OK) [[unlikely]] { \ + return iceberg::InvalidArrowData("nanoarrow error: {}", status); \ + } + +#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error) \ + if (status != NANOARROW_OK) [[unlikely]] { \ + return iceberg::InvalidArrowData("nanoarrow error: {} msg:{}", status, \ + error.message); \ + } diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 4d82fb9a..1db0614e 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -75,9 +75,9 @@ class AvroWriter::Impl { return {}; } - Status Write(ArrowArray data) { + Status Write(ArrowArray* data) { ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result, - ::arrow::ImportArray(&data, &arrow_schema_)); + ::arrow::ImportArray(data, &arrow_schema_)); for (int64_t i = 0; i < result->length(); i++) { ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get())); @@ -119,7 +119,7 @@ class AvroWriter::Impl { AvroWriter::~AvroWriter() = default; -Status AvroWriter::Write(ArrowArray data) { return impl_->Write(data); } +Status AvroWriter::Write(ArrowArray* data) { return impl_->Write(data); } Status AvroWriter::Open(const WriterOptions& options) { impl_ = std::make_unique(); diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h index 57499d8e..0a5dd0b4 100644 --- a/src/iceberg/avro/avro_writer.h +++ b/src/iceberg/avro/avro_writer.h @@ -35,7 +35,7 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer { Status Close() final; - Status Write(ArrowArray data) final; + Status Write(ArrowArray* data) final; std::optional metrics() final; diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index 135151af..9ecfb8b5 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -65,7 +65,7 @@ class ICEBERG_EXPORT Writer { /// \brief Write arrow data to the file. /// /// \return Status of write results. - virtual Status Write(ArrowArray data) = 0; + virtual Status Write(ArrowArray* data) = 0; /// \brief Get the file statistics. /// Only valid after the file is closed. diff --git a/src/iceberg/manifest_adapter.cc b/src/iceberg/manifest_adapter.cc new file mode 100644 index 00000000..3fe1c312 --- /dev/null +++ b/src/iceberg/manifest_adapter.cc @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_adapter.h" + +#include + +#include "iceberg/arrow/nanoarrow_error_transform_internal.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace { +static constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L; +} + +namespace iceberg { + +Status ManifestAdapter::StartAppending() { + if (size_ > 0) { + return InvalidArgument("Adapter buffer not empty, cannot start appending."); + } + array_ = {}; + size_ = 0; + ArrowError error; + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR( + ArrowArrayInitFromSchema(&array_, &schema_, &error), error); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayStartAppending(&array_)); + return {}; +} + +Result ManifestAdapter::FinishAppending() { + ArrowError error; + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR( + ArrowArrayFinishBuildingDefault(&array_, &error), error); + return &array_; +} + +Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendInt(arrowArray, value)); + return {}; +} + +Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendUInt(arrowArray, value)); + return {}; +} + +Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendDouble(arrowArray, value)); + return {}; +} + +Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view value) { + ArrowStringView view(value.data(), value.size()); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendString(arrowArray, view)); + return {}; +} + +Status ManifestAdapter::AppendField(ArrowArray* arrowArray, + std::span value) { + ArrowBufferViewData data; + data.as_char = reinterpret_cast(value.data()); + ArrowBufferView view(data, value.size()); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendBytes(arrowArray, view)); + return {}; +} + +ManifestEntryAdapter::~ManifestEntryAdapter() { + if (array_.release != nullptr) { + ArrowArrayRelease(&array_); + } + if (schema_.release != nullptr) { + ArrowSchemaRelease(&schema_); + } +} + +Result> ManifestEntryAdapter::GetManifestEntryStructType() { + if (partition_spec_ == nullptr) [[unlikely]] { + // NOTICE: never reach here by design, partition_spec_ should always not null. + return ManifestEntry::TypeFromPartitionType(nullptr); + } + ICEBERG_ASSIGN_OR_RAISE(auto partition_schema, partition_spec_->PartitionType()); + return ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); +} + +Status ManifestEntryAdapter::AppendPartition( + ArrowArray* arrow_array, const std::shared_ptr& partition_type, + const std::vector& partitions) { + if (arrow_array->n_children != partition_type->fields().size()) [[unlikely]] { + return InvalidManifest("Arrow array of partition does not match partition type."); + } + if (partitions.size() != partition_type->fields().size()) [[unlikely]] { + return InvalidManifest("Literal list of partition does not match partition type."); + } + auto fields = partition_type->fields(); + + for (int32_t i = 0; i < fields.size(); i++) { + const auto& partition = partitions[i]; + const auto& field = fields[i]; + auto array = arrow_array->children[i]; + if (partition.IsNull()) { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + continue; + } + switch (field.type()->type_id()) { + case TypeId::kBoolean: + ICEBERG_RETURN_UNEXPECTED(AppendField( + array, + static_cast(std::get(partition.value()) == true ? 1L : 0L))); + break; + case TypeId::kInt: + ICEBERG_RETURN_UNEXPECTED(AppendField( + array, static_cast(std::get(partition.value())))); + break; + case TypeId::kLong: + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, std::get(partition.value()))); + break; + case TypeId::kFloat: + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, static_cast(std::get(partition.value())))); + break; + case TypeId::kDouble: + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, std::get(partition.value()))); + break; + case TypeId::kString: + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, std::get(partition.value()))); + break; + case TypeId::kFixed: + case TypeId::kBinary: + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, std::get>(partition.value()))); + break; + case TypeId::kDate: + ICEBERG_RETURN_UNEXPECTED(AppendField( + array, static_cast(std::get(partition.value())))); + break; + case TypeId::kTime: + case TypeId::kTimestamp: + case TypeId::kTimestampTz: + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, std::get(partition.value()))); + break; + case TypeId::kDecimal: + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, std::get>(partition.value()))); + break; + case TypeId::kUuid: + case TypeId::kStruct: + case TypeId::kList: + case TypeId::kMap: + // TODO(xiao.dong) currently literal does not support those types + default: + return InvalidManifest("Unsupported partition type: {}", field.ToString()); + } + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(arrow_array)); + return {}; +} + +Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array, + const std::vector& list_value) { + auto list_array = arrow_array->children[0]; + for (const auto& value : list_value) { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayAppendInt(list_array, static_cast(value))); + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(arrow_array)); + return {}; +} + +Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array, + const std::vector& list_value) { + auto list_array = arrow_array->children[0]; + for (const auto& value : list_value) { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendInt(list_array, value)); + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(arrow_array)); + return {}; +} + +Status ManifestEntryAdapter::AppendMap(ArrowArray* arrow_array, + const std::map& map_value) { + auto map_array = arrow_array->children[0]; + if (map_array->n_children != 2) { + return InvalidManifest("Invalid map array."); + } + for (const auto& [key, value] : map_value) { + auto key_array = map_array->children[0]; + auto value_array = map_array->children[1]; + ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast(key))); + ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value)); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(map_array)); + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(arrow_array)); + return {}; +} + +Status ManifestEntryAdapter::AppendMap( + ArrowArray* arrow_array, const std::map>& map_value) { + auto map_array = arrow_array->children[0]; + if (map_array->n_children != 2) { + return InvalidManifest("Invalid map array."); + } + for (const auto& [key, value] : map_value) { + auto key_array = map_array->children[0]; + auto value_array = map_array->children[1]; + ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast(key))); + ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value)); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(map_array)); + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(arrow_array)); + return {}; +} + +Status ManifestEntryAdapter::AppendDataFile( + ArrowArray* arrow_array, const std::shared_ptr& data_file_type, + const DataFile& file) { + auto fields = data_file_type->fields(); + for (int32_t i = 0; i < fields.size(); i++) { + const auto& field = fields[i]; + auto array = arrow_array->children[i]; + + switch (field.field_id()) { + case 134: // content (optional int32) + ICEBERG_RETURN_UNEXPECTED(AppendField(array, static_cast(file.content))); + break; + case 100: // file_path (required string) + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.file_path)); + break; + case 101: // file_format (required string) + ICEBERG_RETURN_UNEXPECTED(AppendField(array, ToString(file.file_format))); + break; + case 102: // partition (required struct) + { + auto partition_type = internal::checked_pointer_cast(field.type()); + ICEBERG_RETURN_UNEXPECTED(AppendPartition(array, partition_type, file.partition)); + } break; + case 103: // record_count (required int64) + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.record_count)); + break; + case 104: // file_size_in_bytes (required int64) + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.file_size_in_bytes)); + break; + case 105: // block_size_in_bytes (compatible in v1) + // always 64MB for v1 + ICEBERG_RETURN_UNEXPECTED(AppendField(array, kBlockSizeInBytesV1)); + break; + case 108: // column_sizes (optional map) + ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file.column_sizes)); + break; + case 109: // value_counts (optional map) + ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file.value_counts)); + break; + case 110: // null_value_counts (optional map) + ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file.null_value_counts)); + break; + case 137: // nan_value_counts (optional map) + ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file.nan_value_counts)); + break; + case 125: // lower_bounds (optional map) + ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file.lower_bounds)); + break; + case 128: // upper_bounds (optional map) + ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file.upper_bounds)); + break; + case 131: // key_metadata (optional binary) + if (!file.key_metadata.empty()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.key_metadata)); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 132: // split_offsets (optional list) + ICEBERG_RETURN_UNEXPECTED(AppendList(array, file.split_offsets)); + break; + case 135: // equality_ids (optional list) + ICEBERG_RETURN_UNEXPECTED(AppendList(array, file.equality_ids)); + break; + case 140: // sort_order_id (optional int32) + if (file.sort_order_id.has_value()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, static_cast(file.sort_order_id.value()))); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 142: // first_row_id (optional int64) + if (file.first_row_id.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.first_row_id.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 143: // referenced_data_file (optional string) + { + ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, GetReferenceDataFile(file)); + if (referenced_data_file.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, referenced_data_file.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + } + case 144: // content_offset (optional int64) + if (file.content_offset.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.content_offset.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 145: // content_size_in_bytes (optional int64) + if (file.content_size_in_bytes.has_value()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, file.content_size_in_bytes.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + default: + return InvalidManifest("Unknown data file field id: {} ", field.field_id()); + } + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(arrow_array)); + return {}; +} + +Result> ManifestEntryAdapter::GetSequenceNumber( + const ManifestEntry& entry) { + return entry.sequence_number; +} + +Result> ManifestEntryAdapter::GetReferenceDataFile( + const DataFile& file) { + return file.referenced_data_file; +} + +Result> ManifestEntryAdapter::GetFirstRowId(const DataFile& file) { + return file.first_row_id; +} + +Result> ManifestEntryAdapter::GetContentOffset( + const DataFile& file) { + return file.content_offset; +} + +Result> ManifestEntryAdapter::GetContentSizeInBytes( + const DataFile& file) { + return file.content_size_in_bytes; +} + +Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) { + const auto& fields = manifest_schema_->fields(); + for (int32_t i = 0; i < fields.size(); i++) { + const auto& field = fields[i]; + auto array = array_.children[i]; + + switch (field.field_id()) { + case 0: // status (required int32) + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, static_cast(static_cast(entry.status)))); + break; + case 1: // snapshot_id (optional int64) + if (entry.snapshot_id.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, entry.snapshot_id.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 2: // data_file (required struct) + if (entry.data_file) { + // Get the data file type from the field + auto data_file_type = internal::checked_pointer_cast(field.type()); + ICEBERG_RETURN_UNEXPECTED( + AppendDataFile(array, data_file_type, *entry.data_file)); + } else { + return InvalidManifest("Missing required data file field."); + } + break; + case 3: // sequence_number (optional int64) + { + ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry)); + if (sequence_num.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + } + case 4: // file_sequence_number (optional int64) + if (entry.file_sequence_number.has_value()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, entry.file_sequence_number.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + default: + return InvalidManifest("Unknown manifest entry field id: {}", field.field_id()); + } + } + + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(&array_)); + size_++; + return {}; +} + +Status ManifestEntryAdapter::InitSchema(const std::unordered_set& fields_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_schema, GetManifestEntryStructType()) + auto fields_span = manifest_entry_schema->fields(); + std::vector fields; + // TODO(xiao.dong) make this a common function to recursive handle + // all nested fields in schema + for (const auto& field : fields_span) { + if (field.field_id() == 2) { + // handle data_file field + auto data_file_struct = internal::checked_pointer_cast(field.type()); + std::vector data_file_fields; + for (const auto& data_file_field : data_file_struct->fields()) { + if (fields_ids.contains(data_file_field.field_id())) { + data_file_fields.emplace_back(data_file_field); + } + } + auto type = std::make_shared(data_file_fields); + auto data_file_field = SchemaField::MakeRequired( + field.field_id(), std::string(field.name()), std::move(type)); + fields.emplace_back(std::move(data_file_field)); + } else { + if (fields_ids.contains(field.field_id())) { + fields.emplace_back(field); + } + } + } + manifest_schema_ = std::make_shared(fields); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_)); + return {}; +} + +ManifestFileAdapter::~ManifestFileAdapter() { + if (array_.release != nullptr) { + ArrowArrayRelease(&array_); + } + if (schema_.release != nullptr) { + ArrowSchemaRelease(&schema_); + } +} + +Status ManifestFileAdapter::AppendPartitionSummary( + ArrowArray* arrow_array, const std::shared_ptr& summary_type, + const std::vector& summaries) { + auto& array = arrow_array->children[0]; + if (array->n_children != 4) { + return InvalidManifestList("Invalid partition array."); + } + auto summary_struct = + internal::checked_pointer_cast(summary_type->fields()[0].type()); + auto summary_fields = summary_struct->fields(); + for (const auto& summary : summaries) { + for (const auto& summary_field : summary_fields) { + switch (summary_field.field_id()) { + case 509: // contains_null (required bool) + ICEBERG_RETURN_UNEXPECTED(AppendField( + array->children[0], static_cast(summary.contains_null ? 1 : 0))); + break; + case 518: // contains_nan (optional bool) + { + auto field_array = array->children[1]; + if (summary.contains_nan.has_value()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(field_array, + static_cast(summary.contains_nan.value() ? 1 : 0))); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(field_array, 1)); + } + break; + } + case 510: // lower_bound (optional binary) + { + auto field_array = array->children[2]; + if (summary.lower_bound.has_value() && !summary.lower_bound->empty()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(field_array, summary.lower_bound.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(field_array, 1)); + } + break; + } + case 511: // upper_bound (optional binary) + { + auto field_array = array->children[3]; + if (summary.upper_bound.has_value() && !summary.upper_bound->empty()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(field_array, summary.upper_bound.value())); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(field_array, 1)); + } + break; + } + default: + return InvalidManifestList("Unknown field id: {}", summary_field.field_id()); + } + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(array)); + } + + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(arrow_array)); + return {}; +} + +Result ManifestFileAdapter::GetSequenceNumber(const ManifestFile& file) { + return file.sequence_number; +} + +Result ManifestFileAdapter::GetMinSequenceNumber(const ManifestFile& file) { + return file.min_sequence_number; +} + +Result> ManifestFileAdapter::GetFirstRowId( + const ManifestFile& file) { + return file.first_row_id; +} + +Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { + const auto& fields = manifest_list_schema_->fields(); + for (int32_t i = 0; i < fields.size(); i++) { + const auto& field = fields[i]; + auto array = array_.children[i]; + switch (field.field_id()) { + case 500: // manifest_path + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_path)); + break; + case 501: // manifest_length + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_length)); + break; + case 502: // partition_spec_id + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, static_cast(file.partition_spec_id))); + break; + case 517: // content + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, static_cast(static_cast(file.content)))); + break; + case 515: // sequence_number + { + ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(file)); + ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num)); + break; + } + case 516: // min_sequence_number + { + ICEBERG_ASSIGN_OR_RAISE(auto min_sequence_num, GetMinSequenceNumber(file)); + ICEBERG_RETURN_UNEXPECTED(AppendField(array, min_sequence_num)); + break; + } + case 503: // added_snapshot_id + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.added_snapshot_id)); + break; + case 504: // added_files_count + if (file.added_files_count.has_value()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, static_cast(file.added_files_count.value()))); + } else { + // Append null for optional field + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 505: // existing_files_count + if (file.existing_files_count.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField( + array, static_cast(file.existing_files_count.value()))); + } else { + // Append null for optional field + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 506: // deleted_files_count + if (file.deleted_files_count.has_value()) { + ICEBERG_RETURN_UNEXPECTED( + AppendField(array, static_cast(file.deleted_files_count.value()))); + } else { + // Append null for optional field + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 512: // added_rows_count + if (file.added_rows_count.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.added_rows_count.value())); + } else { + // Append null for optional field + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 513: // existing_rows_count + if (file.existing_rows_count.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.existing_rows_count.value())); + } else { + // Append null for optional field + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 514: // deleted_rows_count + if (file.deleted_rows_count.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.deleted_rows_count.value())); + } else { + // Append null for optional field + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 507: // partitions + ICEBERG_RETURN_UNEXPECTED(AppendPartitionSummary( + array, internal::checked_pointer_cast(field.type()), + file.partitions)); + break; + case 519: // key_metadata + if (!file.key_metadata.empty()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.key_metadata)); + } else { + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + case 520: // first_row_id + { + ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file)); + if (first_row_id.has_value()) { + ICEBERG_RETURN_UNEXPECTED(AppendField(array, first_row_id.value())); + } else { + // Append null for optional field + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayAppendNull(array, 1)); + } + break; + } + default: + return InvalidManifestList("Unknown field id: {}", field.field_id()); + } + } + ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishElement(&array_)); + size_++; + return {}; +} + +Status ManifestFileAdapter::InitSchema(const std::unordered_set& fields_ids) { + std::vector fields; + for (const auto& field : ManifestFile::Type().fields()) { + if (fields_ids.contains(field.field_id())) { + fields.emplace_back(field); + } + } + manifest_list_schema_ = std::make_shared(fields); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_list_schema_, &schema_)); + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h index 2ffe51be..cec6a0b4 100644 --- a/src/iceberg/manifest_adapter.h +++ b/src/iceberg/manifest_adapter.h @@ -22,6 +22,14 @@ /// \file iceberg/metadata_adapter.h /// Base class of adapter for v1v2v3v4 metadata. +#include +#include +#include +#include +#include +#include +#include + #include "iceberg/arrow_c_data.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -33,24 +41,71 @@ class ICEBERG_EXPORT ManifestAdapter { public: ManifestAdapter() = default; virtual ~ManifestAdapter() = default; + virtual Status Init() = 0; - virtual Status StartAppending() = 0; - virtual Result FinishAppending() = 0; + Status StartAppending(); + Result FinishAppending(); int64_t size() const { return size_; } + protected: + static Status AppendField(ArrowArray* arrowArray, int64_t value); + static Status AppendField(ArrowArray* arrowArray, uint64_t value); + static Status AppendField(ArrowArray* arrowArray, double value); + static Status AppendField(ArrowArray* arrowArray, std::string_view value); + static Status AppendField(ArrowArray* arrowArray, std::span value); + protected: ArrowArray array_; + ArrowSchema schema_; // converted from manifest_schema_ or manifest_list_schema_ int64_t size_ = 0; + std::unordered_map metadata_; }; // \brief Implemented by different versions with different schemas to // append a list of `ManifestEntry`s to an `ArrowArray`. class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { public: - ManifestEntryAdapter() = default; - ~ManifestEntryAdapter() override = default; + explicit ManifestEntryAdapter(std::shared_ptr partition_spec) + : partition_spec_(std::move(partition_spec)) {} + ~ManifestEntryAdapter() override; virtual Status Append(const ManifestEntry& entry) = 0; + + const std::shared_ptr& schema() const { return manifest_schema_; } + + protected: + virtual Result> GetManifestEntryStructType(); + + /// \brief Init version-specific schema for each version. + /// + /// \param fields_ids each version of manifest schema has schema, we will init this + /// schema based on the fields_ids. + Status InitSchema(const std::unordered_set& fields_ids); + Status AppendInternal(const ManifestEntry& entry); + Status AppendDataFile(ArrowArray* arrow_array, + const std::shared_ptr& data_file_type, + const DataFile& file); + static Status AppendPartition(ArrowArray* arrow_array, + const std::shared_ptr& partition_type, + const std::vector& partitions); + static Status AppendList(ArrowArray* arrow_array, + const std::vector& list_value); + static Status AppendList(ArrowArray* arrow_array, + const std::vector& list_value); + static Status AppendMap(ArrowArray* arrow_array, + const std::map& map_value); + static Status AppendMap(ArrowArray* arrow_array, + const std::map>& map_value); + + virtual Result> GetSequenceNumber(const ManifestEntry& entry); + virtual Result> GetReferenceDataFile(const DataFile& file); + virtual Result> GetFirstRowId(const DataFile& file); + virtual Result> GetContentOffset(const DataFile& file); + virtual Result> GetContentSizeInBytes(const DataFile& file); + + protected: + std::shared_ptr partition_spec_; + std::shared_ptr manifest_schema_; }; // \brief Implemented by different versions with different schemas to @@ -58,9 +113,29 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter { public: ManifestFileAdapter() = default; - ~ManifestFileAdapter() override = default; + ~ManifestFileAdapter() override; virtual Status Append(const ManifestFile& file) = 0; + + const std::shared_ptr& schema() const { return manifest_list_schema_; } + + protected: + /// \brief Init version-specific schema for each version. + /// + /// \param fields_ids each version of manifest schema has schema, we will init this + /// schema based on the fields_ids. + Status InitSchema(const std::unordered_set& fields_ids); + Status AppendInternal(const ManifestFile& file); + static Status AppendPartitionSummary( + ArrowArray* arrow_array, const std::shared_ptr& summary_type, + const std::vector& summaries); + + virtual Result GetSequenceNumber(const ManifestFile& file); + virtual Result GetMinSequenceNumber(const ManifestFile& file); + virtual Result> GetFirstRowId(const ManifestFile& file); + + protected: + std::shared_ptr manifest_list_schema_; }; } // namespace iceberg diff --git a/src/iceberg/manifest_entry.cc b/src/iceberg/manifest_entry.cc index d387aad6..5a963b5d 100644 --- a/src/iceberg/manifest_entry.cc +++ b/src/iceberg/manifest_entry.cc @@ -44,7 +44,8 @@ std::shared_ptr DataFile::Type(std::shared_ptr partition kContent, kFilePath, kFileFormat, - SchemaField::MakeRequired(102, kPartitionField, std::move(partition_type)), + SchemaField::MakeRequired(kPartitionFieldId, kPartitionField, + std::move(partition_type)), kRecordCount, kFileSize, kColumnSizes, @@ -70,9 +71,10 @@ std::shared_ptr ManifestEntry::TypeFromPartitionType( std::shared_ptr ManifestEntry::TypeFromDataFileType( std::shared_ptr datafile_type) { - return std::make_shared(std::vector{ - kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber, - SchemaField::MakeRequired(2, kDataFileField, std::move(datafile_type))}); + return std::make_shared( + std::vector{kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber, + SchemaField::MakeRequired(kDataFileFieldId, kDataFileField, + std::move(datafile_type))}); } } // namespace iceberg diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h index 0aa697aa..2b9987b0 100644 --- a/src/iceberg/manifest_entry.h +++ b/src/iceberg/manifest_entry.h @@ -183,6 +183,7 @@ struct ICEBERG_EXPORT DataFile { 100, "file_path", iceberg::string(), "Location URI with FS scheme"); inline static const SchemaField kFileFormat = SchemaField::MakeRequired( 101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet"); + inline static const int32_t kPartitionFieldId = 102; inline static const std::string kPartitionField = "partition"; inline static const SchemaField kRecordCount = SchemaField::MakeRequired( 103, "record_count", iceberg::int64(), "Number of records in the file"); @@ -296,11 +297,12 @@ struct ICEBERG_EXPORT ManifestEntry { SchemaField::MakeRequired(0, "status", iceberg::int32()); inline static const SchemaField kSnapshotId = SchemaField::MakeOptional(1, "snapshot_id", iceberg::int64()); + inline static const int32_t kDataFileFieldId = 2; + inline static const std::string kDataFileField = "data_file"; inline static const SchemaField kSequenceNumber = SchemaField::MakeOptional(3, "sequence_number", iceberg::int64()); inline static const SchemaField kFileSequenceNumber = SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64()); - inline static const std::string kDataFileField = "data_file"; bool operator==(const ManifestEntry& other) const; diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index feff82f1..4b20af9c 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -21,6 +21,7 @@ #include +#include "iceberg/arrow/nanoarrow_error_transform_internal.h" #include "iceberg/arrow_c_data_guard_internal.h" #include "iceberg/file_format.h" #include "iceberg/manifest_entry.h" @@ -32,11 +33,6 @@ namespace iceberg { -#define NANOARROW_RETURN_IF_NOT_OK(status, error) \ - if (status != NANOARROW_OK) [[unlikely]] { \ - return InvalidArrowData("Nanoarrow error: {}", error.message); \ - } - #define PARSE_PRIMITIVE_FIELD(item, array_view, type) \ for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ @@ -208,12 +204,12 @@ Result> ParseManifestList(ArrowSchema* schema, ArrowError error; ArrowArrayView array_view; auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); - NANOARROW_RETURN_IF_NOT_OK(status, error); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error); internal::ArrowArrayViewGuard view_guard(&array_view); status = ArrowArrayViewSetArray(&array_view, array_in, &error); - NANOARROW_RETURN_IF_NOT_OK(status, error); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error); status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); - NANOARROW_RETURN_IF_NOT_OK(status, error); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error); std::vector manifest_files; manifest_files.resize(array_in->length); @@ -471,12 +467,12 @@ Result> ParseManifestEntry(ArrowSchema* schema, ArrowError error; ArrowArrayView array_view; auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); - NANOARROW_RETURN_IF_NOT_OK(status, error); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error); internal::ArrowArrayViewGuard view_guard(&array_view); status = ArrowArrayViewSetArray(&array_view, array_in, &error); - NANOARROW_RETURN_IF_NOT_OK(status, error); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error); status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); - NANOARROW_RETURN_IF_NOT_OK(status, error); + ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error); std::vector manifest_entries; manifest_entries.resize(array_in->length); diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 27fd3f76..7bdfee7b 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -50,7 +50,7 @@ Status ManifestWriter::Close() { ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); } - return {}; + return writer_->Close(); } Result> OpenFileWriter(std::string_view location, @@ -66,48 +66,47 @@ Result> OpenFileWriter(std::string_view location, Result> ManifestWriter::MakeV1Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema) { - // TODO(xiao.dong) parse v1 schema - auto manifest_entry_schema = - ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); - auto fields_span = manifest_entry_schema->fields(); - std::vector fields(fields_span.begin(), fields_span.end()); - auto schema = std::make_shared(fields); - ICEBERG_ASSIGN_OR_RAISE(auto writer, - OpenFileWriter(manifest_location, schema, std::move(file_io))); - auto adapter = std::make_unique(snapshot_id, std::move(schema)); + std::shared_ptr file_io, std::shared_ptr partition_spec) { + auto adapter = + std::make_unique(snapshot_id, std::move(partition_spec)); + ICEBERG_RETURN_UNEXPECTED(adapter->Init()); + ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); + + auto schema = adapter->schema(); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + OpenFileWriter(manifest_location, std::move(schema), std::move(file_io))); return std::make_unique(std::move(writer), std::move(adapter)); } Result> ManifestWriter::MakeV2Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema) { - // TODO(xiao.dong) parse v2 schema - auto manifest_entry_schema = - ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); - auto fields_span = manifest_entry_schema->fields(); - std::vector fields(fields_span.begin(), fields_span.end()); - auto schema = std::make_shared(fields); - ICEBERG_ASSIGN_OR_RAISE(auto writer, - OpenFileWriter(manifest_location, schema, std::move(file_io))); - auto adapter = std::make_unique(snapshot_id, std::move(schema)); + std::shared_ptr file_io, std::shared_ptr partition_spec) { + auto adapter = + std::make_unique(snapshot_id, std::move(partition_spec)); + ICEBERG_RETURN_UNEXPECTED(adapter->Init()); + ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); + + auto schema = adapter->schema(); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + OpenFileWriter(manifest_location, std::move(schema), std::move(file_io))); return std::make_unique(std::move(writer), std::move(adapter)); } Result> ManifestWriter::MakeV3Writer( std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_schema) { - // TODO(xiao.dong) parse v3 schema - auto manifest_entry_schema = - ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); - auto fields_span = manifest_entry_schema->fields(); - std::vector fields(fields_span.begin(), fields_span.end()); - auto schema = std::make_shared(fields); - ICEBERG_ASSIGN_OR_RAISE(auto writer, - OpenFileWriter(manifest_location, schema, std::move(file_io))); + std::shared_ptr partition_spec) { auto adapter = std::make_unique(snapshot_id, first_row_id, - std::move(schema)); + std::move(partition_spec)); + ICEBERG_RETURN_UNEXPECTED(adapter->Init()); + ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); + + auto schema = adapter->schema(); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + OpenFileWriter(manifest_location, std::move(schema), std::move(file_io))); return std::make_unique(std::move(writer), std::move(adapter)); } @@ -132,20 +131,20 @@ Status ManifestListWriter::Close() { ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); } - return {}; + return writer_->Close(); } Result> ManifestListWriter::MakeV1Writer( int64_t snapshot_id, std::optional parent_snapshot_id, std::string_view manifest_list_location, std::shared_ptr file_io) { - // TODO(xiao.dong) parse v1 schema - std::vector fields(ManifestFile::Type().fields().begin(), - ManifestFile::Type().fields().end()); - auto schema = std::make_shared(fields); + auto adapter = std::make_unique(snapshot_id, parent_snapshot_id); + ICEBERG_RETURN_UNEXPECTED(adapter->Init()); + ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); + + auto schema = adapter->schema(); ICEBERG_ASSIGN_OR_RAISE( - auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); - auto adapter = std::make_unique(snapshot_id, parent_snapshot_id, - std::move(schema)); + auto writer, + OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io))); return std::make_unique(std::move(writer), std::move(adapter)); } @@ -153,14 +152,16 @@ Result> ManifestListWriter::MakeV2Writer( int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::string_view manifest_list_location, std::shared_ptr file_io) { - // TODO(xiao.dong) parse v2 schema - std::vector fields(ManifestFile::Type().fields().begin(), - ManifestFile::Type().fields().end()); - auto schema = std::make_shared(fields); + auto adapter = std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number); + ICEBERG_RETURN_UNEXPECTED(adapter->Init()); + ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); + + auto schema = adapter->schema(); ICEBERG_ASSIGN_OR_RAISE( - auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); - auto adapter = std::make_unique( - snapshot_id, parent_snapshot_id, sequence_number, std::move(schema)); + auto writer, + OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io))); + return std::make_unique(std::move(writer), std::move(adapter)); } @@ -168,14 +169,15 @@ Result> ManifestListWriter::MakeV3Writer( int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io) { - // TODO(xiao.dong) parse v3 schema - std::vector fields(ManifestFile::Type().fields().begin(), - ManifestFile::Type().fields().end()); - auto schema = std::make_shared(fields); + auto adapter = std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number, first_row_id); + ICEBERG_RETURN_UNEXPECTED(adapter->Init()); + ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending()); + + auto schema = adapter->schema(); ICEBERG_ASSIGN_OR_RAISE( - auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); - auto adapter = std::make_unique( - snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema)); + auto writer, + OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io))); return std::make_unique(std::move(writer), std::move(adapter)); } diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index c9ec3070..69e19121 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -39,7 +39,7 @@ class ICEBERG_EXPORT ManifestWriter { std::unique_ptr adapter) : writer_(std::move(writer)), adapter_(std::move(adapter)) {} - virtual ~ManifestWriter() = default; + ~ManifestWriter() = default; /// \brief Write manifest entry to file. /// \param entry Manifest entry to write. @@ -58,30 +58,33 @@ class ICEBERG_EXPORT ManifestWriter { /// \param snapshot_id ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. + /// \param partition_spec Partition spec for the manifest. /// \return A Result containing the writer or an error. static Result> MakeV1Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema); + std::shared_ptr file_io, std::shared_ptr partition_spec); /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. + /// \param partition_spec Partition spec for the manifest. /// \return A Result containing the writer or an error. static Result> MakeV2Writer( std::optional snapshot_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema); + std::shared_ptr file_io, std::shared_ptr partition_spec); /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. /// \param first_row_id First row ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. + /// \param partition_spec Partition spec for the manifest. /// \return A Result containing the writer or an error. static Result> MakeV3Writer( std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_schema); + std::shared_ptr partition_spec); private: static constexpr int64_t kBatchSize = 1024; @@ -96,7 +99,7 @@ class ICEBERG_EXPORT ManifestListWriter { std::unique_ptr adapter) : writer_(std::move(writer)), adapter_(std::move(adapter)) {} - virtual ~ManifestListWriter() = default; + ~ManifestListWriter() = default; /// \brief Write manifest file to manifest list file. /// \param file Manifest file to write. diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index 2d2cd540..ed0bb8fd 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -76,9 +76,9 @@ class ParquetWriter::Impl { return {}; } - Status Write(ArrowArray array) { + Status Write(ArrowArray* array) { ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch, - ::arrow::ImportRecordBatch(&array, arrow_schema_)); + ::arrow::ImportRecordBatch(array, arrow_schema_)); ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch)); @@ -132,7 +132,7 @@ Status ParquetWriter::Open(const WriterOptions& options) { return impl_->Open(options); } -Status ParquetWriter::Write(ArrowArray array) { return impl_->Write(array); } +Status ParquetWriter::Write(ArrowArray* array) { return impl_->Write(array); } Status ParquetWriter::Close() { return impl_->Close(); } diff --git a/src/iceberg/parquet/parquet_writer.h b/src/iceberg/parquet/parquet_writer.h index 5371f381..9be0a430 100644 --- a/src/iceberg/parquet/parquet_writer.h +++ b/src/iceberg/parquet/parquet_writer.h @@ -35,7 +35,7 @@ class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer { Status Close() final; - Status Write(ArrowArray array) final; + Status Write(ArrowArray* array) final; std::optional metrics() final; diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index 2b41950b..3fa5d86e 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -24,7 +24,10 @@ #include #include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/transform.h" #include "iceberg/util/formatter.h" // IWYU pragma: keep +#include "iceberg/util/macros.h" namespace iceberg { @@ -57,6 +60,50 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; } std::span PartitionSpec::fields() const { return fields_; } +Result> PartitionSpec::PartitionType() { + if (fields_.empty()) { + return nullptr; + } + { + std::scoped_lock lock(mutex_); + if (partition_type_ != nullptr) { + return partition_type_; + } + } + + std::vector partition_fields; + for (const auto& partition_field : fields_) { + // Get the source field from the original schema by source_id + ICEBERG_ASSIGN_OR_RAISE(auto source_field, + schema_->FindFieldById(partition_field.source_id())); + if (!source_field.has_value()) { + // TODO(xiao.dong) when source field is missing, + // should return an error or just use UNKNOWN type + return InvalidSchema("Cannot find source field for partition field:{}", + partition_field.field_id()); + } + auto source_field_type = source_field.value().get().type(); + // Bind the transform to the source field type to get the result type + ICEBERG_ASSIGN_OR_RAISE(auto transform_function, + partition_field.transform()->Bind(source_field_type)); + + auto result_type = transform_function->ResultType(); + + // Create the partition field with the transform result type + // Partition fields are always optional (can be null) + partition_fields.emplace_back(partition_field.field_id(), + std::string(partition_field.name()), + std::move(result_type), + /*optional=*/true); + } + + std::scoped_lock lock(mutex_); + if (partition_type_ == nullptr) { + partition_type_ = std::make_shared(std::move(partition_fields)); + } + return partition_type_; +} + std::string PartitionSpec::ToString() const { std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_); for (const auto& field : fields_) { diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index f105a27e..d4d4ea7f 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -23,6 +23,7 @@ /// Partition specs for Iceberg tables. #include +#include #include #include #include @@ -30,6 +31,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/partition_field.h" +#include "iceberg/result.h" #include "iceberg/util/formattable.h" namespace iceberg { @@ -67,6 +69,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Get a view of the partition fields. std::span fields() const; + Result> PartitionType(); + std::string ToString() const override; int32_t last_assigned_field_id() const { return last_assigned_field_id_; } @@ -83,6 +87,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { const int32_t spec_id_; std::vector fields_; int32_t last_assigned_field_id_; + std::mutex mutex_; + std::shared_ptr partition_type_; }; } // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 0a54a57c..7e53a8dc 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -117,8 +117,8 @@ if(ICEBERG_BUILD_BUNDLE) avro_test.cc avro_schema_test.cc avro_stream_test.cc - manifest_list_reader_test.cc - manifest_reader_test.cc + manifest_list_reader_writer_test.cc + manifest_reader_writer_test.cc test_common.cc) add_iceberg_test(arrow_test diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index 4bc773cb..2bd09f92 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -128,7 +128,7 @@ class AvroReaderTest : public TempFileTestBase { {.path = temp_avro_file_, .schema = schema, .io = file_io_}); ASSERT_TRUE(writer_result.has_value()); auto writer = std::move(writer_result.value()); - ASSERT_THAT(writer->Write(arrow_array), IsOk()); + ASSERT_THAT(writer->Write(&arrow_array), IsOk()); ASSERT_THAT(writer->Close(), IsOk()); auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_); diff --git a/src/iceberg/test/manifest_list_reader_test.cc b/src/iceberg/test/manifest_list_reader_writer_test.cc similarity index 78% rename from src/iceberg/test/manifest_list_reader_test.cc rename to src/iceberg/test/manifest_list_reader_writer_test.cc index 9fd6e4c1..793dd33b 100644 --- a/src/iceberg/test/manifest_list_reader_test.cc +++ b/src/iceberg/test/manifest_list_reader_writer_test.cc @@ -18,7 +18,6 @@ */ #include -#include #include #include "iceberg/arrow/arrow_fs_file_io_internal.h" @@ -26,12 +25,14 @@ #include "iceberg/expression/literal.h" #include "iceberg/manifest_list.h" #include "iceberg/manifest_reader.h" +#include "iceberg/manifest_writer.h" +#include "matchers.h" #include "temp_file_test_base.h" #include "test_common.h" namespace iceberg { -class ManifestListReaderTestBase : public TempFileTestBase { +class ManifestListReaderWriterTestBase : public TempFileTestBase { protected: static void SetUpTestSuite() { avro::RegisterAll(); } @@ -44,6 +45,11 @@ class ManifestListReaderTestBase : public TempFileTestBase { void TestManifestListReading(const std::string& resource_name, const std::vector& expected_manifest_list) { std::string path = GetResourcePath(resource_name); + TestManifestListReadingByPath(path, expected_manifest_list); + } + + void TestManifestListReadingByPath( + const std::string& path, const std::vector& expected_manifest_list) { auto manifest_reader_result = ManifestListReader::Make(path, file_io_); ASSERT_EQ(manifest_reader_result.has_value(), true); @@ -66,7 +72,7 @@ class ManifestListReaderTestBase : public TempFileTestBase { std::shared_ptr file_io_; }; -class ManifestListReaderV1Test : public ManifestListReaderTestBase { +class ManifestListReaderWriterV1Test : public ManifestListReaderWriterTestBase { protected: std::vector PreparePartitionedTestData() { std::vector paths = { @@ -202,9 +208,20 @@ class ManifestListReaderV1Test : public ManifestListReaderTestBase { .lower_bound = lower_bounds[3], .upper_bound = upper_bounds[3]}}}}; } + + void TestWriteManifestList(const std::string& manifest_list_path, + const std::vector& manifest_files) { + auto result = ManifestListWriter::MakeV1Writer(1, 0, manifest_list_path, file_io_); + ASSERT_TRUE(result.has_value()) << result.error().message; + auto writer = std::move(result.value()); + auto status = writer->AddAll(manifest_files); + EXPECT_THAT(status, IsOk()); + status = writer->Close(); + EXPECT_THAT(status, IsOk()); + } }; -class ManifestListReaderV2Test : public ManifestListReaderTestBase { +class ManifestListReaderWriterV2Test : public ManifestListReaderWriterTestBase { protected: std::vector PreparePartitionedTestData() { std::vector manifest_files; @@ -280,39 +297,71 @@ class ManifestListReaderV2Test : public ManifestListReaderTestBase { } return manifest_files; } + + void TestWriteManifestList(const std::string& manifest_list_path, + const std::vector& manifest_files) { + auto result = ManifestListWriter::MakeV2Writer(1, 0, 4, manifest_list_path, file_io_); + ASSERT_TRUE(result.has_value()) << result.error().message; + auto writer = std::move(result.value()); + auto status = writer->AddAll(manifest_files); + EXPECT_THAT(status, IsOk()); + status = writer->Close(); + EXPECT_THAT(status, IsOk()); + } }; // V1 Tests -TEST_F(ManifestListReaderV1Test, PartitionedTest) { +TEST_F(ManifestListReaderWriterV1Test, PartitionedTest) { auto expected_manifest_list = PreparePartitionedTestData(); TestManifestListReading( "snap-7532614258660258098-1-eafd2972-f58e-4185-9237-6378f564787e.avro", expected_manifest_list); } -TEST_F(ManifestListReaderV1Test, ComplexTypeTest) { +TEST_F(ManifestListReaderWriterV1Test, ComplexTypeTest) { auto expected_manifest_list = PrepareComplexTypeTestData(); TestManifestListReading( "snap-4134160420377642835-1-aeffe099-3bac-4011-bc17-5875210d8dc0.avro", expected_manifest_list); } -TEST_F(ManifestListReaderV1Test, ComplexPartitionedTest) { +TEST_F(ManifestListReaderWriterV1Test, ComplexPartitionedTest) { auto expected_manifest_list = PrepareComplexPartitionedTestData(); TestManifestListReading( "snap-7522296285847100621-1-5d690750-8fb4-4cd1-8ae7-85c7b39abe14.avro", expected_manifest_list); } +TEST_F(ManifestListReaderWriterV1Test, WritePartitionedTest) { + auto expected_manifest_list = PreparePartitionedTestData(); + auto write_manifest_list_path = CreateNewTempFilePath(); + TestWriteManifestList(write_manifest_list_path, expected_manifest_list); + TestManifestListReadingByPath(write_manifest_list_path, expected_manifest_list); +} + +TEST_F(ManifestListReaderWriterV1Test, WriteComplexTypeTest) { + auto expected_manifest_list = PrepareComplexTypeTestData(); + auto write_manifest_list_path = CreateNewTempFilePath(); + TestWriteManifestList(write_manifest_list_path, expected_manifest_list); + TestManifestListReadingByPath(write_manifest_list_path, expected_manifest_list); +} + +TEST_F(ManifestListReaderWriterV1Test, WriteComplexPartitionedTest) { + auto expected_manifest_list = PrepareComplexPartitionedTestData(); + auto write_manifest_list_path = CreateNewTempFilePath(); + TestWriteManifestList(write_manifest_list_path, expected_manifest_list); + TestManifestListReadingByPath(write_manifest_list_path, expected_manifest_list); +} + // V2 Tests -TEST_F(ManifestListReaderV2Test, PartitionedTest) { +TEST_F(ManifestListReaderWriterV2Test, PartitionedTest) { auto expected_manifest_list = PreparePartitionedTestData(); TestManifestListReading( "snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro", expected_manifest_list); } -TEST_F(ManifestListReaderV2Test, NonPartitionedTest) { +TEST_F(ManifestListReaderWriterV2Test, NonPartitionedTest) { auto expected_manifest_list = PrepareNonPartitionedTestData(); TestManifestListReading( "snap-251167482216575399-1-ccb6dbcb-0611-48da-be68-bd506ea63188.avro", @@ -322,4 +371,21 @@ TEST_F(ManifestListReaderV2Test, NonPartitionedTest) { TestNonPartitionedManifests(expected_manifest_list); } +TEST_F(ManifestListReaderWriterV2Test, WritePartitionedTest) { + auto expected_manifest_list = PreparePartitionedTestData(); + auto write_manifest_list_path = CreateNewTempFilePath(); + TestWriteManifestList(write_manifest_list_path, expected_manifest_list); + TestManifestListReadingByPath(write_manifest_list_path, expected_manifest_list); +} + +TEST_F(ManifestListReaderWriterV2Test, WriteNonPartitionedTest) { + auto expected_manifest_list = PrepareNonPartitionedTestData(); + auto write_manifest_list_path = CreateNewTempFilePath(); + TestWriteManifestList(write_manifest_list_path, expected_manifest_list); + TestManifestListReadingByPath(write_manifest_list_path, expected_manifest_list); + + // Additional verification: ensure all manifests are truly non-partitioned + TestNonPartitionedManifests(expected_manifest_list); +} + } // namespace iceberg diff --git a/src/iceberg/test/manifest_reader_test.cc b/src/iceberg/test/manifest_reader_writer_test.cc similarity index 74% rename from src/iceberg/test/manifest_reader_test.cc rename to src/iceberg/test/manifest_reader_writer_test.cc index 7381b298..435b9bca 100644 --- a/src/iceberg/test/manifest_reader_test.cc +++ b/src/iceberg/test/manifest_reader_writer_test.cc @@ -17,8 +17,6 @@ * under the License. */ -#include "iceberg/manifest_reader.h" - #include #include @@ -28,7 +26,11 @@ #include "iceberg/avro/avro_register.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" +#include "iceberg/manifest_reader.h" +#include "iceberg/manifest_writer.h" #include "iceberg/schema.h" +#include "iceberg/transform.h" +#include "matchers.h" #include "temp_file_test_base.h" #include "test_common.h" @@ -48,6 +50,12 @@ class ManifestReaderTestBase : public TempFileTestBase { const std::vector& expected_entries, std::shared_ptr partition_schema = nullptr) { std::string path = GetResourcePath(resource_name); + TestManifestReadingByPath(path, expected_entries, partition_schema); + } + + void TestManifestReadingByPath(const std::string& path, + const std::vector& expected_entries, + std::shared_ptr partition_schema = nullptr) { auto manifest_reader_result = ManifestReader::Make(path, file_io_, partition_schema); ASSERT_TRUE(manifest_reader_result.has_value()) << manifest_reader_result.error().message; @@ -142,9 +150,23 @@ class ManifestReaderV1Test : public ManifestReaderTestBase { } return manifest_entries; } + + void TestWriteManifest(const std::string& manifest_list_path, + std::shared_ptr partition_spec, + const std::vector& manifest_entries) { + auto result = ManifestWriter::MakeV1Writer(1, manifest_list_path, file_io_, + std::move(partition_spec)); + ASSERT_TRUE(result.has_value()) << result.error().message; + auto writer = std::move(result.value()); + auto status = writer->AddAll(manifest_entries); + EXPECT_THAT(status, IsOk()); + status = writer->Close(); + EXPECT_THAT(status, IsOk()); + } }; TEST_F(ManifestReaderV1Test, PartitionedTest) { + // TODO(xiao.dong) we need to add more cases for different partition types iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true); auto partition_schema = std::make_shared(std::vector({partition_field})); @@ -153,6 +175,23 @@ TEST_F(ManifestReaderV1Test, PartitionedTest) { partition_schema); } +TEST_F(ManifestReaderV1Test, WritePartitionedTest) { + iceberg::SchemaField table_field(1, "order_ts_hour_source", iceberg::int32(), true); + iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true); + auto table_schema = std::make_shared(std::vector({table_field})); + auto partition_schema = + std::make_shared(std::vector({partition_field})); + auto identity_transform = Transform::Identity(); + std::vector fields{ + PartitionField(1, 1000, "order_ts_hour", identity_transform)}; + auto partition_spec = std::make_shared(table_schema, 1, fields); + + auto expected_entries = PreparePartitionedTestData(); + auto write_manifest_path = CreateNewTempFilePath(); + TestWriteManifest(write_manifest_path, partition_spec, expected_entries); + TestManifestReadingByPath(write_manifest_path, expected_entries, partition_schema); +} + class ManifestReaderV2Test : public ManifestReaderTestBase { protected: std::vector CreateV2TestData( @@ -218,6 +257,19 @@ class ManifestReaderV2Test : public ManifestReaderTestBase { std::vector PrepareMetadataInheritanceTestData() { return CreateV2TestData(/*sequence_number=*/15, /*partition_spec_id*/ 12); } + + void TestWriteManifest(int64_t snapshot_id, const std::string& manifest_list_path, + std::shared_ptr partition_spec, + const std::vector& manifest_entries) { + auto result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_list_path, file_io_, + std::move(partition_spec)); + ASSERT_TRUE(result.has_value()) << result.error().message; + auto writer = std::move(result.value()); + auto status = writer->AddAll(manifest_entries); + EXPECT_THAT(status, IsOk()); + status = writer->Close(); + EXPECT_THAT(status, IsOk()); + } }; TEST_F(ManifestReaderV2Test, NonPartitionedTest) { @@ -239,4 +291,26 @@ TEST_F(ManifestReaderV2Test, MetadataInheritanceTest) { TestManifestReadingWithManifestFile(manifest_file, expected_entries); } +TEST_F(ManifestReaderV2Test, WriteNonPartitionedTest) { + auto expected_entries = PrepareNonPartitionedTestData(); + auto write_manifest_path = CreateNewTempFilePath(); + TestWriteManifest(679879563479918846LL, write_manifest_path, nullptr, expected_entries); + TestManifestReadingByPath(write_manifest_path, expected_entries); +} + +TEST_F(ManifestReaderV2Test, WriteInheritancePartitionedTest) { + auto expected_entries = PrepareMetadataInheritanceTestData(); + auto write_manifest_path = CreateNewTempFilePath(); + TestWriteManifest(679879563479918846LL, write_manifest_path, nullptr, expected_entries); + ManifestFile manifest_file{ + .manifest_path = write_manifest_path, + .manifest_length = 100, + .partition_spec_id = 12, + .content = ManifestFile::Content::kData, + .sequence_number = 15, + .added_snapshot_id = 679879563479918846LL, + }; + TestManifestReadingWithManifestFile(manifest_file, expected_entries); +} + } // namespace iceberg diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 0c42b846..75581756 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -51,7 +51,7 @@ namespace { Status WriteArray(std::shared_ptr<::arrow::Array> data, Writer& writer) { ArrowArray arr; ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr)); - ICEBERG_RETURN_UNEXPECTED(writer.Write(arr)); + ICEBERG_RETURN_UNEXPECTED(writer.Write(&arr)); return writer.Close(); } diff --git a/src/iceberg/test/partition_spec_test.cc b/src/iceberg/test/partition_spec_test.cc index 871bb081..538d89a0 100644 --- a/src/iceberg/test/partition_spec_test.cc +++ b/src/iceberg/test/partition_spec_test.cc @@ -87,4 +87,23 @@ TEST(PartitionSpecTest, Equality) { ASSERT_NE(schema1, schema6); ASSERT_NE(schema6, schema1); } + +TEST(PartitionSpecTest, PartitionSchemaTest) { + SchemaField field1(5, "ts", iceberg::timestamp(), true); + SchemaField field2(7, "bar", iceberg::string(), true); + auto const schema = + std::make_shared(std::vector{field1, field2}, 100); + auto identity_transform = Transform::Identity(); + PartitionField pt_field1(5, 1000, "day", identity_transform); + PartitionField pt_field2(7, 1001, "hour", identity_transform); + PartitionSpec spec(schema, 100, {pt_field1, pt_field2}); + + auto partition_schema = spec.PartitionType(); + ASSERT_TRUE(partition_schema.has_value()); + ASSERT_EQ(2, partition_schema.value()->fields().size()); + EXPECT_EQ(pt_field1.name(), partition_schema.value()->fields()[0].name()); + EXPECT_EQ(pt_field1.field_id(), partition_schema.value()->fields()[0].field_id()); + EXPECT_EQ(pt_field2.name(), partition_schema.value()->fields()[1].name()); + EXPECT_EQ(pt_field2.field_id(), partition_schema.value()->fields()[1].field_id()); +} } // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 41061d39..bdc5c1e3 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -128,6 +128,7 @@ struct DataFile; struct ManifestEntry; struct ManifestFile; struct ManifestList; +struct PartitionFieldSummary; class ManifestListReader; class ManifestListWriter; diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc new file mode 100644 index 00000000..f8b02276 --- /dev/null +++ b/src/iceberg/v1_metadata.cc @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/v1_metadata.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Status ManifestEntryAdapterV1::Init() { + static std::unordered_set kManifestEntryFieldIds{ + ManifestEntry::kStatus.field_id(), + ManifestEntry::kSnapshotId.field_id(), + ManifestEntry::kDataFileFieldId, + DataFile::kFilePath.field_id(), + DataFile::kFileFormat.field_id(), + DataFile::kPartitionFieldId, + DataFile::kRecordCount.field_id(), + DataFile::kFileSize.field_id(), + 105, // kBlockSizeInBytes field id + DataFile::kColumnSizes.field_id(), + DataFile::kValueCounts.field_id(), + DataFile::kNullValueCounts.field_id(), + DataFile::kNanValueCounts.field_id(), + DataFile::kLowerBounds.field_id(), + DataFile::kUpperBounds.field_id(), + DataFile::kKeyMetadata.field_id(), + DataFile::kSplitOffsets.field_id(), + DataFile::kSortOrderId.field_id(), + }; + // TODO(xiao.dong) schema to json + metadata_["schema"] = "{}"; + // TODO(xiao.dong) partition spec to json + metadata_["partition-spec"] = "{}"; + if (partition_spec_ != nullptr) { + metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); + } + metadata_["format-version"] = "1"; + return InitSchema(kManifestEntryFieldIds); +} + +Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) { + return AppendInternal(entry); +} + +Result> ManifestEntryAdapterV1::GetManifestEntryStructType() { + // 'block_size_in_bytes' (ID 105) is a deprecated field that is REQUIRED + // in the v1 data_file schema for backward compatibility. + // Deprecated. Always write a default in v1. Do not write in v2 or v3. + static const SchemaField kBlockSizeInBytes = SchemaField::MakeRequired( + 105, "block_size_in_bytes", int64(), "Block size in bytes"); + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType()); + if (!partition_type) { + partition_type = PartitionSpec::Unpartitioned()->schema(); + } + auto datafile_type = std::make_shared(std::vector{ + DataFile::kFilePath, DataFile::kFileFormat, + SchemaField::MakeRequired(102, DataFile::kPartitionField, + std::move(partition_type)), + DataFile::kRecordCount, DataFile::kFileSize, kBlockSizeInBytes, + DataFile::kColumnSizes, DataFile::kValueCounts, DataFile::kNullValueCounts, + DataFile::kNanValueCounts, DataFile::kLowerBounds, DataFile::kUpperBounds, + DataFile::kKeyMetadata, DataFile::kSplitOffsets, DataFile::kSortOrderId}); + + return std::make_shared( + std::vector{ManifestEntry::kStatus, ManifestEntry::kSnapshotId, + SchemaField::MakeRequired(2, ManifestEntry::kDataFileField, + std::move(datafile_type))}); +} + +Status ManifestFileAdapterV1::Init() { + static std::unordered_set kManifestFileFieldIds{ + ManifestFile::kManifestPath.field_id(), + ManifestFile::kManifestLength.field_id(), + ManifestFile::kPartitionSpecId.field_id(), + ManifestFile::kAddedSnapshotId.field_id(), + ManifestFile::kAddedFilesCount.field_id(), + ManifestFile::kExistingFilesCount.field_id(), + ManifestFile::kDeletedFilesCount.field_id(), + ManifestFile::kAddedRowsCount.field_id(), + ManifestFile::kExistingRowsCount.field_id(), + ManifestFile::kDeletedRowsCount.field_id(), + ManifestFile::kPartitions.field_id(), + ManifestFile::kKeyMetadata.field_id(), + }; + metadata_["snapshot-id"] = std::to_string(snapshot_id_); + metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value() + ? std::to_string(parent_snapshot_id_.value()) + : "null"; + metadata_["format-version"] = "1"; + return InitSchema(kManifestFileFieldIds); +} + +Status ManifestFileAdapterV1::Append(const ManifestFile& file) { + if (file.content != ManifestFile::Content::kData) { + return InvalidManifestList("Cannot store delete manifests in a v1 table"); + } + return AppendInternal(file); +} + +} // namespace iceberg diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 7e91da7b..7b2dd1d0 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -20,9 +20,6 @@ #pragma once /// \file iceberg/v1_metadata.h - -#include - #include "iceberg/manifest_adapter.h" namespace iceberg { @@ -31,32 +28,29 @@ namespace iceberg { class ManifestEntryAdapterV1 : public ManifestEntryAdapter { public: ManifestEntryAdapterV1(std::optional snapshot_id, - std::shared_ptr schema) { - // TODO(xiao.dong): init v1 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestEntry& entry) override { return {}; } - Result FinishAppending() override { return {}; } + std::shared_ptr partition_spec) + : ManifestEntryAdapter(std::move(partition_spec)), snapshot_id_(snapshot_id) {} + Status Init() override; + Status Append(const ManifestEntry& entry) override; + + protected: + Result> GetManifestEntryStructType() override; private: - std::shared_ptr manifest_schema_; - ArrowSchema schema_; // converted from manifest_schema_ + std::optional snapshot_id_; }; /// \brief Adapter to convert V1 ManifestFile to `ArrowArray`. class ManifestFileAdapterV1 : public ManifestFileAdapter { public: - ManifestFileAdapterV1(int64_t snapshot_id, std::optional parent_snapshot_id, - std::shared_ptr schema) { - // TODO(xiao.dong): init v1 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestFile& file) override { return {}; } - Result FinishAppending() override { return {}; } + ManifestFileAdapterV1(int64_t snapshot_id, std::optional parent_snapshot_id) + : snapshot_id_(snapshot_id), parent_snapshot_id_(parent_snapshot_id) {} + Status Init() override; + Status Append(const ManifestFile& file) override; private: - std::shared_ptr manifest_list_schema_; - ArrowSchema schema_; // converted from manifest_list_schema_ + int64_t snapshot_id_; + std::optional parent_snapshot_id_; }; } // namespace iceberg diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc new file mode 100644 index 00000000..c0de24ef --- /dev/null +++ b/src/iceberg/v2_metadata.cc @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/v2_metadata.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" + +namespace iceberg { + +Status ManifestEntryAdapterV2::Init() { + static std::unordered_set kManifestEntryFieldIds{ + ManifestEntry::kStatus.field_id(), + ManifestEntry::kSnapshotId.field_id(), + ManifestEntry::kSequenceNumber.field_id(), + ManifestEntry::kFileSequenceNumber.field_id(), + ManifestEntry::kDataFileFieldId, + DataFile::kContent.field_id(), + DataFile::kFilePath.field_id(), + DataFile::kFileFormat.field_id(), + DataFile::kPartitionFieldId, + DataFile::kRecordCount.field_id(), + DataFile::kFileSize.field_id(), + DataFile::kColumnSizes.field_id(), + DataFile::kValueCounts.field_id(), + DataFile::kNullValueCounts.field_id(), + DataFile::kNanValueCounts.field_id(), + DataFile::kLowerBounds.field_id(), + DataFile::kUpperBounds.field_id(), + DataFile::kKeyMetadata.field_id(), + DataFile::kSplitOffsets.field_id(), + DataFile::kEqualityIds.field_id(), + DataFile::kSortOrderId.field_id(), + DataFile::kReferencedDataFile.field_id(), + }; + // TODO(xiao.dong) schema to json + metadata_["schema"] = "{}"; + // TODO(xiao.dong) partition spec to json + metadata_["partition-spec"] = "{}"; + if (partition_spec_ != nullptr) { + metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); + } + metadata_["format-version"] = "2"; + metadata_["content"] = "data"; + return InitSchema(kManifestEntryFieldIds); +} + +Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) { + return AppendInternal(entry); +} + +Result> ManifestEntryAdapterV2::GetSequenceNumber( + const ManifestEntry& entry) { + if (!entry.sequence_number.has_value()) { + // if the entry's data sequence number is null, + // then it will inherit the sequence number of the current commit. + // to validate that this is correct, check that the snapshot id is either null (will + // also be inherited) or that it matches the id of the current commit. + if (entry.snapshot_id.has_value() && entry.snapshot_id.value() != snapshot_id_) { + return InvalidManifest( + "Found unassigned sequence number for an entry from snapshot: {}", + entry.snapshot_id.value()); + } + + // inheritance should work only for ADDED entries + if (entry.status != ManifestStatus::kAdded) { + return InvalidManifest( + "Only entries with status ADDED can have null sequence number"); + } + + return std::nullopt; + } + return entry.sequence_number; +} + +Result> ManifestEntryAdapterV2::GetReferenceDataFile( + const DataFile& file) { + if (file.content == DataFile::Content::kPositionDeletes) { + return file.referenced_data_file; + } + return std::nullopt; +} + +Status ManifestFileAdapterV2::Init() { + static std::unordered_set kManifestFileFieldIds{ + ManifestFile::kManifestPath.field_id(), + ManifestFile::kManifestLength.field_id(), + ManifestFile::kPartitionSpecId.field_id(), + ManifestFile::kContent.field_id(), + ManifestFile::kSequenceNumber.field_id(), + ManifestFile::kMinSequenceNumber.field_id(), + ManifestFile::kAddedSnapshotId.field_id(), + ManifestFile::kAddedFilesCount.field_id(), + ManifestFile::kExistingFilesCount.field_id(), + ManifestFile::kDeletedFilesCount.field_id(), + ManifestFile::kAddedRowsCount.field_id(), + ManifestFile::kExistingRowsCount.field_id(), + ManifestFile::kDeletedRowsCount.field_id(), + ManifestFile::kPartitions.field_id(), + ManifestFile::kKeyMetadata.field_id(), + }; + metadata_["snapshot-id"] = std::to_string(snapshot_id_); + metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value() + ? std::to_string(parent_snapshot_id_.value()) + : "null"; + metadata_["sequence-number"] = std::to_string(sequence_number_); + metadata_["format-version"] = "2"; + return InitSchema(kManifestFileFieldIds); +} + +Status ManifestFileAdapterV2::Append(const ManifestFile& file) { + return AppendInternal(file); +} + +Result ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& file) { + if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (snapshot_id_ != file.added_snapshot_id) { + return InvalidManifestList( + "Found unassigned sequence number for a manifest from snapshot: %s", + file.added_snapshot_id); + } + return sequence_number_; + } + return file.sequence_number; +} + +Result ManifestFileAdapterV2::GetMinSequenceNumber(const ManifestFile& file) { + if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (snapshot_id_ != file.added_snapshot_id) { + return InvalidManifestList( + "Found unassigned sequence number for a manifest from snapshot: %s", + file.added_snapshot_id); + } + return sequence_number_; + } + return file.min_sequence_number; +} + +} // namespace iceberg diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index d6ff6aa3..caee8c42 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -21,8 +21,6 @@ /// \file iceberg/v2_metadata.h -#include - #include "iceberg/manifest_adapter.h" namespace iceberg { @@ -31,32 +29,38 @@ namespace iceberg { class ManifestEntryAdapterV2 : public ManifestEntryAdapter { public: ManifestEntryAdapterV2(std::optional snapshot_id, - std::shared_ptr schema) { - // TODO(xiao.dong): init v2 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestEntry& entry) override { return {}; } - Result FinishAppending() override { return {}; } + std::shared_ptr partition_spec) + : ManifestEntryAdapter(std::move(partition_spec)), snapshot_id_(snapshot_id) {} + Status Init() override; + Status Append(const ManifestEntry& entry) override; + + protected: + Result> GetSequenceNumber(const ManifestEntry& entry) override; + Result> GetReferenceDataFile(const DataFile& file) override; private: - std::shared_ptr manifest_schema_; - ArrowSchema schema_; // converted from manifest_schema_ + std::optional snapshot_id_; }; /// \brief Adapter to convert V2 ManifestFile to `ArrowArray`. class ManifestFileAdapterV2 : public ManifestFileAdapter { public: ManifestFileAdapterV2(int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::shared_ptr schema) { - // TODO(xiao.dong): init v2 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestFile& file) override { return {}; } - Result FinishAppending() override { return {}; } + int64_t sequence_number) + : snapshot_id_(snapshot_id), + parent_snapshot_id_(parent_snapshot_id), + sequence_number_(sequence_number) {} + Status Init() override; + Status Append(const ManifestFile& file) override; + + protected: + Result GetSequenceNumber(const ManifestFile& file) override; + Result GetMinSequenceNumber(const ManifestFile& file) override; private: - std::shared_ptr manifest_list_schema_; - ArrowSchema schema_; // converted from manifest_list_schema_ + int64_t snapshot_id_; + std::optional parent_snapshot_id_; + int64_t sequence_number_; }; } // namespace iceberg diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc new file mode 100644 index 00000000..25399e02 --- /dev/null +++ b/src/iceberg/v3_metadata.cc @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/v3_metadata.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Status ManifestEntryAdapterV3::Init() { + static std::unordered_set kManifestEntryFieldIds{ + ManifestEntry::kStatus.field_id(), + ManifestEntry::kSnapshotId.field_id(), + ManifestEntry::kDataFileFieldId, + ManifestEntry::kSequenceNumber.field_id(), + ManifestEntry::kFileSequenceNumber.field_id(), + DataFile::kContent.field_id(), + DataFile::kFilePath.field_id(), + DataFile::kFileFormat.field_id(), + DataFile::kPartitionFieldId, + DataFile::kRecordCount.field_id(), + DataFile::kFileSize.field_id(), + DataFile::kColumnSizes.field_id(), + DataFile::kValueCounts.field_id(), + DataFile::kNullValueCounts.field_id(), + DataFile::kNanValueCounts.field_id(), + DataFile::kLowerBounds.field_id(), + DataFile::kUpperBounds.field_id(), + DataFile::kKeyMetadata.field_id(), + DataFile::kSplitOffsets.field_id(), + DataFile::kEqualityIds.field_id(), + DataFile::kSortOrderId.field_id(), + DataFile::kFirstRowId.field_id(), + DataFile::kReferencedDataFile.field_id(), + DataFile::kContentOffset.field_id(), + DataFile::kContentSize.field_id(), + }; + // TODO(xiao.dong) schema to json + metadata_["schema"] = "{}"; + // TODO(xiao.dong) partition spec to json + metadata_["partition-spec"] = "{}"; + if (partition_spec_ != nullptr) { + metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id()); + } + metadata_["format-version"] = "3"; + metadata_["content"] = "data"; + return InitSchema(kManifestEntryFieldIds); +} + +Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) { + return AppendInternal(entry); +} + +Result> ManifestEntryAdapterV3::GetSequenceNumber( + const ManifestEntry& entry) { + if (!entry.sequence_number.has_value()) { + // if the entry's data sequence number is null, + // then it will inherit the sequence number of the current commit. + // to validate that this is correct, check that the snapshot id is either null (will + // also be inherited) or that it matches the id of the current commit. + if (entry.snapshot_id.has_value() && entry.snapshot_id.value() != snapshot_id_) { + return InvalidManifest( + "Found unassigned sequence number for an entry from snapshot: {}", + entry.snapshot_id.value()); + } + + // inheritance should work only for ADDED entries + if (entry.status != ManifestStatus::kAdded) { + return InvalidManifest( + "Only entries with status ADDED can have null sequence number"); + } + + return std::nullopt; + } + return entry.sequence_number; +} + +Result> ManifestEntryAdapterV3::GetReferenceDataFile( + const DataFile& file) { + if (file.content == DataFile::Content::kPositionDeletes) { + return file.referenced_data_file; + } + return std::nullopt; +} + +Result> ManifestEntryAdapterV3::GetFirstRowId( + const DataFile& file) { + if (file.content == DataFile::Content::kData) { + return file.first_row_id; + } + return std::nullopt; +} + +Result> ManifestEntryAdapterV3::GetContentOffset( + const DataFile& file) { + if (file.content == DataFile::Content::kPositionDeletes) { + return file.content_offset; + } + return std::nullopt; +} + +Result> ManifestEntryAdapterV3::GetContentSizeInBytes( + const DataFile& file) { + if (file.content == DataFile::Content::kPositionDeletes) { + return file.content_size_in_bytes; + } + return std::nullopt; +} + +Status ManifestFileAdapterV3::Init() { + static std::unordered_set kManifestFileFieldIds{ + ManifestFile::kManifestPath.field_id(), + ManifestFile::kManifestLength.field_id(), + ManifestFile::kPartitionSpecId.field_id(), + ManifestFile::kContent.field_id(), + ManifestFile::kSequenceNumber.field_id(), + ManifestFile::kMinSequenceNumber.field_id(), + ManifestFile::kAddedSnapshotId.field_id(), + ManifestFile::kAddedFilesCount.field_id(), + ManifestFile::kExistingFilesCount.field_id(), + ManifestFile::kDeletedFilesCount.field_id(), + ManifestFile::kAddedRowsCount.field_id(), + ManifestFile::kExistingRowsCount.field_id(), + ManifestFile::kDeletedRowsCount.field_id(), + ManifestFile::kPartitions.field_id(), + ManifestFile::kKeyMetadata.field_id(), + ManifestFile::kFirstRowId.field_id(), + }; + metadata_["snapshot-id"] = std::to_string(snapshot_id_); + metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value() + ? std::to_string(parent_snapshot_id_.value()) + : "null"; + metadata_["sequence-number"] = std::to_string(sequence_number_); + metadata_["first-row-id"] = + next_row_id_.has_value() ? std::to_string(next_row_id_.value()) : "null"; + metadata_["format-version"] = "3"; + return InitSchema(kManifestFileFieldIds); +} + +Status ManifestFileAdapterV3::Append(const ManifestFile& file) { + auto status = AppendInternal(file); + ICEBERG_RETURN_UNEXPECTED(status); + if (WrappedFirstRowId(file) && next_row_id_.has_value()) { + next_row_id_ = next_row_id_.value() + file.existing_rows_count.value_or(0) + + file.added_rows_count.value_or(0); + } + return status; +} + +Result ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& file) { + if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (snapshot_id_ != file.added_snapshot_id) { + return InvalidManifestList( + "Found unassigned sequence number for a manifest from snapshot: %s", + file.added_snapshot_id); + } + return sequence_number_; + } + return file.sequence_number; +} + +Result ManifestFileAdapterV3::GetMinSequenceNumber(const ManifestFile& file) { + if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (snapshot_id_ != file.added_snapshot_id) { + return InvalidManifestList( + "Found unassigned sequence number for a manifest from snapshot: %s", + file.added_snapshot_id); + } + return sequence_number_; + } + return file.min_sequence_number; +} + +Result> ManifestFileAdapterV3::GetFirstRowId( + const ManifestFile& file) { + if (WrappedFirstRowId(file)) { + return next_row_id_; + } else if (file.content != ManifestFile::Content::kData) { + return std::nullopt; + } else { + if (!file.first_row_id.has_value()) { + return InvalidManifestList("Found unassigned first-row-id for file:{}", + file.manifest_path); + } + return file.first_row_id.value(); + } +} + +bool ManifestFileAdapterV3::WrappedFirstRowId(const ManifestFile& file) { + return file.content == ManifestFile::Content::kData && !file.first_row_id.has_value(); +} + +} // namespace iceberg diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index e7bcc355..784345f1 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -21,44 +21,59 @@ /// \file iceberg/v3_metadata.h -#include - #include "iceberg/manifest_adapter.h" namespace iceberg { -/// \brief Adapter to convert V3ManifestEntry to `ArrowArray`. +/// \brief Adapter to convert V3 ManifestEntry to `ArrowArray`. class ManifestEntryAdapterV3 : public ManifestEntryAdapter { public: ManifestEntryAdapterV3(std::optional snapshot_id, std::optional first_row_id, - std::shared_ptr schema) { - // TODO(xiao.dong): init v3 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestEntry& entry) override { return {}; } - Result FinishAppending() override { return {}; } + std::shared_ptr partition_spec) + : ManifestEntryAdapter(std::move(partition_spec)), + snapshot_id_(snapshot_id), + first_row_id_(first_row_id) {} + Status Init() override; + Status Append(const ManifestEntry& entry) override; + + protected: + Result> GetSequenceNumber(const ManifestEntry& entry) override; + Result> GetReferenceDataFile(const DataFile& file) override; + Result> GetFirstRowId(const DataFile& file) override; + Result> GetContentOffset(const DataFile& file) override; + Result> GetContentSizeInBytes(const DataFile& file) override; private: - std::shared_ptr manifest_schema_; - ArrowSchema schema_; // converted from manifest_schema_ + std::optional snapshot_id_; + std::optional first_row_id_; }; /// \brief Adapter to convert V3 ManifestFile to `ArrowArray`. class ManifestFileAdapterV3 : public ManifestFileAdapter { public: ManifestFileAdapterV3(int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::optional first_row_id, - std::shared_ptr schema) { - // TODO(xiao.dong): init v3 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestFile& file) override { return {}; } - Result FinishAppending() override { return {}; } + int64_t sequence_number, std::optional first_row_id) + : snapshot_id_(snapshot_id), + parent_snapshot_id_(parent_snapshot_id), + sequence_number_(sequence_number), + next_row_id_(first_row_id) {} + Status Init() override; + Status Append(const ManifestFile& file) override; + + protected: + Result GetSequenceNumber(const ManifestFile& file) override; + Result GetMinSequenceNumber(const ManifestFile& file) override; + Result> GetFirstRowId(const ManifestFile& file) override; + + private: + bool WrappedFirstRowId(const ManifestFile& file); private: - std::shared_ptr manifest_list_schema_; - ArrowSchema schema_; // converted from manifest_list_schema_ + int64_t snapshot_id_; + std::optional parent_snapshot_id_; + int64_t sequence_number_; + std::optional next_row_id_; }; } // namespace iceberg