From 8df5115d18c9f2cd04cf6fd46cc949e964167cb7 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sun, 28 Dec 2025 12:26:30 +0800 Subject: [PATCH] feat: add snapshot cached manifests --- src/iceberg/manifest/manifest_list.h | 3 +- src/iceberg/manifest/manifest_writer.cc | 1 + src/iceberg/snapshot.cc | 61 +++++++++++++++++++++++++ src/iceberg/snapshot.h | 55 ++++++++++++++++++++++ 4 files changed, 118 insertions(+), 2 deletions(-) diff --git a/src/iceberg/manifest/manifest_list.h b/src/iceberg/manifest/manifest_list.h index 47a7ad48a..da70fb696 100644 --- a/src/iceberg/manifest/manifest_list.h +++ b/src/iceberg/manifest/manifest_list.h @@ -31,7 +31,6 @@ #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema_field.h" -#include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/type.h" @@ -107,7 +106,7 @@ struct ICEBERG_EXPORT ManifestFile { int64_t min_sequence_number = TableMetadata::kInitialSequenceNumber; /// Field id: 503 /// ID of the snapshot where the manifest file was added - int64_t added_snapshot_id = Snapshot::kInvalidSnapshotId; + int64_t added_snapshot_id = -1; // Snapshot::kInvalidSnapshotId /// Field id: 504 /// Number of entries in the manifest that have status ADDED (1), when null this is /// assumed to be non-zero diff --git a/src/iceberg/manifest/manifest_writer.cc b/src/iceberg/manifest/manifest_writer.cc index 8cd940d56..e77f4e2da 100644 --- a/src/iceberg/manifest/manifest_writer.cc +++ b/src/iceberg/manifest/manifest_writer.cc @@ -29,6 +29,7 @@ #include "iceberg/partition_summary_internal.h" #include "iceberg/result.h" #include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index fb994f8b7..f421e8381 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -19,6 +19,11 @@ #include "iceberg/snapshot.h" +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/util/macros.h" + namespace iceberg { bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const { @@ -80,4 +85,60 @@ bool Snapshot::Equals(const Snapshot& other) const { schema_id == other.schema_id; } +Result CachedSnapshot::InitManifestsCache( + const Snapshot& snapshot, std::shared_ptr file_io) { + if (file_io == nullptr) { + return InvalidArgument("Cannot cache manifests: FileIO is null"); + } + + // Read manifest list + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestListReader::Make(snapshot.manifest_list, file_io)); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, reader->Files()); + + std::vector manifests; + manifests.reserve(manifest_files.size()); + + // Partition manifests: data manifests first, then delete manifests + // First pass: collect data manifests + for (const auto& manifest_file : manifest_files) { + if (manifest_file.content == ManifestContent::kData) { + manifests.push_back(manifest_file); + } + } + size_t data_manifests_count = manifests.size(); + + // Second pass: append delete manifests + for (const auto& manifest_file : manifest_files) { + if (manifest_file.content == ManifestContent::kDeletes) { + manifests.push_back(manifest_file); + } + } + + return std::make_pair(std::move(manifests), data_manifests_count); +} + +Result> CachedSnapshot::Manifests( + std::shared_ptr file_io) const { + ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io)); + auto& cache = cache_ref.get(); + return std::span(cache.first.data(), cache.first.size()); +} + +Result> CachedSnapshot::DataManifests( + std::shared_ptr file_io) const { + ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io)); + auto& cache = cache_ref.get(); + return std::span(cache.first.data(), cache.second); +} + +Result> CachedSnapshot::DeleteManifests( + std::shared_ptr file_io) const { + ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io)); + auto& cache = cache_ref.get(); + const size_t delete_start = cache.second; + const size_t delete_count = cache.first.size() - delete_start; + return std::span(cache.first.data() + delete_start, delete_count); +} + } // namespace iceberg diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 5afe2d22e..a047c76bf 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -19,7 +19,9 @@ #pragma once +#include #include +#include #include #include #include @@ -27,7 +29,10 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" #include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/lazy.h" #include "iceberg/util/timepoint.h" namespace iceberg { @@ -260,4 +265,54 @@ struct ICEBERG_EXPORT Snapshot { bool Equals(const Snapshot& other) const; }; +/// \brief A snapshot with cached manifest loading capabilities. +/// +/// This class wraps a Snapshot reference and provides lazy-loading of manifests. +class ICEBERG_EXPORT CachedSnapshot { + public: + explicit CachedSnapshot(const Snapshot& snapshot) : snapshot_(snapshot) {} + + /// \brief Get the underlying Snapshot reference + const Snapshot& snapshot() const { return snapshot_; } + + /// \brief Returns all ManifestFile instances for either data or delete manifests + /// in this snapshot. + /// + /// \param file_io The FileIO instance to use for reading the manifest list + /// \return A span of ManifestFile instances, or an error + Result> Manifests(std::shared_ptr file_io) const; + + /// \brief Returns a ManifestFile for each data manifest in this snapshot. + /// + /// \param file_io The FileIO instance to use for reading the manifest list + /// \return A span of ManifestFile instances, or an error + Result> DataManifests(std::shared_ptr file_io) const; + + /// \brief Returns a ManifestFile for each delete manifest in this snapshot. + /// + /// \param file_io The FileIO instance to use for reading the manifest list + /// \return A span of ManifestFile instances, or an error + Result> DeleteManifests(std::shared_ptr file_io) const; + + private: + /// \brief Cache structure for storing loaded manifests + /// + /// \note Manifests are stored in a single vector with data manifests at the head + /// and delete manifests at the tail, separated by the number of data manifests. + using ManifestsCache = std::pair, size_t>; + + /// \brief Initialize manifests cache by loading them from the manifest list file. + /// \param snapshot The snapshot to initialize the manifests cache for + /// \param file_io The FileIO instance to use for reading the manifest list + /// \return A result containing the manifests cache + static Result InitManifestsCache(const Snapshot& snapshot, + std::shared_ptr file_io); + + /// The underlying snapshot data + const Snapshot& snapshot_; + + /// Lazy-loaded manifests cache + Lazy manifests_cache_; +}; + } // namespace iceberg