feat: add rolling manifest writer #443
Open
zhjwpku wants to merge 3 commits into apache:main from zhjwpku:add_rolling_manifest_writer
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include "iceberg/manifest/rolling_manifest_writer.h" | ||
|
|
||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/result.h" | ||
| #include "iceberg/util/macros.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| RollingManifestWriter::RollingManifestWriter( | ||
| ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes) | ||
| : manifest_writer_factory_(std::move(manifest_writer_factory)), | ||
| target_file_size_in_bytes_(target_file_size_in_bytes) {} | ||
|
|
||
| RollingManifestWriter::~RollingManifestWriter() { | ||
| // Ensure we close the current writer if not already closed | ||
| std::ignore = Close(); | ||
| } | ||
|
|
||
| Status RollingManifestWriter::WriteAddedEntry( | ||
| std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); | ||
| ICEBERG_RETURN_UNEXPECTED( | ||
| writer->WriteAddedEntry(std::move(file), data_sequence_number)); | ||
| current_file_rows_++; | ||
| return {}; | ||
| } | ||
|
|
||
| Status RollingManifestWriter::WriteExistingEntry( | ||
| std::shared_ptr<DataFile> file, int64_t file_snapshot_id, | ||
| int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); | ||
| ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry( | ||
| std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number)); | ||
| current_file_rows_++; | ||
| return {}; | ||
| } | ||
|
|
||
| Status RollingManifestWriter::WriteDeletedEntry( | ||
| std::shared_ptr<DataFile> file, int64_t data_sequence_number, | ||
| std::optional<int64_t> file_sequence_number) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); | ||
| ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry( | ||
| std::move(file), data_sequence_number, file_sequence_number)); | ||
| current_file_rows_++; | ||
| return {}; | ||
| } | ||
|
|
||
| Status RollingManifestWriter::Close() { | ||
| if (!closed_) { | ||
| ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); | ||
| closed_ = true; | ||
| } | ||
| return {}; | ||
| } | ||
|
|
||
| Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const { | ||
| if (!closed_) { | ||
| return Invalid("Cannot get ManifestFile list from unclosed writer"); | ||
| } | ||
| return manifest_files_; | ||
| } | ||
|
|
||
| Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() { | ||
| if (current_writer_ == nullptr) { | ||
| ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); | ||
| } else if (ShouldRollToNewFile()) { | ||
| ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); | ||
| ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); | ||
| } | ||
|
|
||
| return current_writer_.get(); | ||
| } | ||
|
|
||
| bool RollingManifestWriter::ShouldRollToNewFile() const { | ||
| if (current_writer_ == nullptr) { | ||
| return false; | ||
| } | ||
| // Roll when row count is a multiple of the divisor and file size >= target | ||
| if (current_file_rows_ % kRowsDivisor == 0) { | ||
| auto length_result = current_writer_->length(); | ||
| if (length_result.has_value()) { | ||
| return length_result.value() >= target_file_size_in_bytes_; | ||
| } | ||
| // TODO(anyone): If we can't get the length, don't roll for now, revisit this later. | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| Status RollingManifestWriter::CloseCurrentWriter() { | ||
| if (current_writer_ != nullptr) { | ||
| ICEBERG_RETURN_UNEXPECTED(current_writer_->Close()); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, current_writer_->ToManifestFile()); | ||
| manifest_files_.push_back(std::move(manifest_file)); | ||
| current_writer_.reset(); | ||
| current_file_rows_ = 0; | ||
| } | ||
| return {}; | ||
| } | ||
|
|
||
| } // namespace iceberg |
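
The rolling rule in `ShouldRollToNewFile()` can be illustrated with a small standalone simulation. This is only a sketch under stated assumptions: `ShouldRoll` and `SimulateRolling` are hypothetical helpers that are not part of this PR or of iceberg-cpp, and the file size is modeled as rows times a fixed per-row byte count rather than the real writer's `length()`.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Same divisor value as kRowsDivisor in the PR; everything else is hypothetical.
constexpr int64_t kRowsDivisor = 250;

// True only when the row count is a multiple of the divisor AND the
// (modeled) file size has reached the target.
inline bool ShouldRoll(int64_t rows_in_file, int64_t bytes_in_file,
                       int64_t target_bytes) {
  return rows_in_file % kRowsDivisor == 0 && bytes_in_file >= target_bytes;
}

// Simulates writing `total_rows` entries of `bytes_per_row` bytes each and
// returns the number of rows that end up in each file.
inline std::vector<int64_t> SimulateRolling(int64_t total_rows,
                                            int64_t bytes_per_row,
                                            int64_t target_bytes) {
  assert(total_rows >= 0 && bytes_per_row >= 0);
  std::vector<int64_t> rows_per_file;
  bool has_writer = false;
  int64_t rows = 0;
  for (int64_t i = 0; i < total_rows; ++i) {
    if (!has_writer) {
      has_writer = true;  // first write: open a file, no roll check
    } else if (ShouldRoll(rows, rows * bytes_per_row, target_bytes)) {
      rows_per_file.push_back(rows);  // close the current file
      rows = 0;                       // and start counting a new one
    }
    ++rows;  // the entry itself is written after the roll decision
  }
  if (has_writer) rows_per_file.push_back(rows);  // final close
  return rows_per_file;
}
```

With these assumed numbers, `SimulateRolling(600, 10, 1000)` yields files of 250, 250, and 100 rows: the 1000-byte target is already passed after 100 rows, but the roll only happens at the next multiple of 250, so a file can overshoot the target.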
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,128 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| /// \file iceberg/manifest/rolling_manifest_writer.h | ||
| /// Rolling manifest writer that can produce multiple manifest files. | ||
|
|
||
| #include <functional> | ||
| #include <memory> | ||
| #include <vector> | ||
|
|
||
| #include "iceberg/iceberg_export.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/manifest/manifest_list.h" | ||
| #include "iceberg/manifest/manifest_writer.h" | ||
| #include "iceberg/result.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| /// \brief A rolling manifest writer that can produce multiple manifest files. | ||
| class ICEBERG_EXPORT RollingManifestWriter { | ||
| public: | ||
| /// \brief Factory function type for creating ManifestWriter instances. | ||
| using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>; | ||
|
|
||
| /// \brief Construct a rolling manifest writer. | ||
| /// \param manifest_writer_factory Factory function to create new ManifestWriter | ||
| /// instances. | ||
| /// \param target_file_size_in_bytes Target file size in bytes. When the current | ||
| /// file reaches this size (and row count is a multiple of 250), a new file | ||
| /// will be created. | ||
| RollingManifestWriter(ManifestWriterFactory manifest_writer_factory, | ||
| int64_t target_file_size_in_bytes); | ||
|
|
||
| ~RollingManifestWriter(); | ||
|
|
||
| /// \brief Add an added entry for a file. | ||
| /// | ||
| /// \param file a data file | ||
| /// \return Status indicating success or failure | ||
| /// \note The entry's snapshot ID will be this manifest's snapshot ID. The | ||
| /// entry's data sequence number will be the provided data sequence number. | ||
| /// The entry's file sequence number will be assigned at commit. | ||
| Status WriteAddedEntry(std::shared_ptr<DataFile> file, | ||
| std::optional<int64_t> data_sequence_number = std::nullopt); | ||
|
|
||
| /// \brief Add an existing entry for a file. | ||
| /// | ||
| /// \param file an existing data file | ||
| /// \param file_snapshot_id snapshot ID when the data file was added to the table | ||
| /// \param data_sequence_number a data sequence number of the file (assigned when | ||
| /// the file was added) | ||
| /// \param file_sequence_number a file sequence number (assigned when the file | ||
| /// was added) | ||
| /// \return Status indicating success or failure | ||
| /// \note The original data and file sequence numbers and snapshot ID, which | ||
| /// were assigned at commit, must be preserved when adding an existing entry. | ||
| Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id, | ||
| int64_t data_sequence_number, | ||
| std::optional<int64_t> file_sequence_number = std::nullopt); | ||
|
|
||
| /// \brief Add a delete entry for a file. | ||
| /// | ||
| /// \param file a deleted data file | ||
| /// \param data_sequence_number a data sequence number of the file (assigned when | ||
| /// the file was added) | ||
| /// \param file_sequence_number a file sequence number (assigned when the file | ||
| /// was added) | ||
| /// \return Status indicating success or failure | ||
| /// \note The entry's snapshot ID will be this manifest's snapshot ID. However, | ||
| /// the original data and file sequence numbers of the file must be preserved | ||
| /// when the file is marked as deleted. | ||
| Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t data_sequence_number, | ||
| std::optional<int64_t> file_sequence_number = std::nullopt); | ||
|
|
||
| /// \brief Close the rolling manifest writer. | ||
| Status Close(); | ||
|
|
||
| /// \brief Get the list of manifest files produced by this writer. | ||
| /// \return A vector of ManifestFile objects | ||
| /// \note Only valid after the writer is closed. | ||
| Result<std::vector<ManifestFile>> ToManifestFiles() const; | ||
|
|
||
| private: | ||
| /// \brief Get or create the current writer, rolling to a new file if needed. | ||
| /// \return The current ManifestWriter, or an error if creation fails | ||
| Result<ManifestWriter*> CurrentWriter(); | ||
|
|
||
| /// \brief Check if we should roll to a new file. | ||
| /// | ||
| /// Returns true only when the current row count is a multiple of kRowsDivisor | ||
| /// and the current file has reached the target size. It does not roll by itself. | ||
| bool ShouldRollToNewFile() const; | ||
|
|
||
| /// \brief Close the current writer and add its ManifestFile to the list. | ||
| Status CloseCurrentWriter(); | ||
|
|
||
| /// \brief The row-count interval at which the file size is checked for rolling. | ||
| /// \note This is aligned with Iceberg's Java implementation. | ||
| static constexpr int64_t kRowsDivisor = 250; | ||
|
|
||
| ManifestWriterFactory manifest_writer_factory_; | ||
| int64_t target_file_size_in_bytes_; | ||
| std::vector<ManifestFile> manifest_files_; | ||
|
|
||
| int64_t current_file_rows_{0}; | ||
| std::unique_ptr<ManifestWriter> current_writer_{nullptr}; | ||
| bool closed_{false}; | ||
| }; | ||
|
|
||
| } // namespace iceberg | ||
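
The factory-plus-lifecycle shape of the class (a `std::function` factory that produces a fresh writer per file, and results that are only available after `Close()`) can be sketched without any iceberg-cpp dependency. Everything below is a hypothetical toy: `ToyWriter`, `ToyRollingWriter`, `CountingFactory`, and the `manifest-N.avro` names are assumptions for illustration, and the toy rolls on row count alone instead of the PR's size check.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ManifestWriter: tracks only a path and a row count.
struct ToyWriter {
  std::string path;
  int64_t rows = 0;
};

class ToyRollingWriter {
 public:
  using Factory = std::function<std::unique_ptr<ToyWriter>()>;

  ToyRollingWriter(Factory factory, int64_t max_rows)
      : factory_(std::move(factory)), max_rows_(max_rows) {}

  // Lazily creates a writer on the first write, rolls when the current one is full.
  void Write() {
    if (current_ == nullptr) {
      current_ = factory_();
    } else if (current_->rows >= max_rows_) {
      CloseCurrent();
      current_ = factory_();
    }
    assert(current_ != nullptr);  // the factory is expected to return a writer
    ++current_->rows;
  }

  // Idempotent: a second call is a no-op.
  void Close() {
    if (!closed_) {
      CloseCurrent();
      closed_ = true;
    }
  }

  // Mirrors ToManifestFiles(): no result until the writer is closed.
  std::optional<std::vector<std::string>> Paths() const {
    if (!closed_) return std::nullopt;
    return paths_;
  }

 private:
  void CloseCurrent() {
    if (current_ != nullptr) {
      paths_.push_back(current_->path);
      current_.reset();
    }
  }

  Factory factory_;
  int64_t max_rows_;
  std::vector<std::string> paths_;
  std::unique_ptr<ToyWriter> current_;
  bool closed_ = false;
};

// Hypothetical factory that hands out writers with sequentially numbered paths.
inline ToyRollingWriter::Factory CountingFactory() {
  auto counter = std::make_shared<int>(0);
  return [counter] {
    auto writer = std::make_unique<ToyWriter>();
    writer->path = "manifest-" + std::to_string((*counter)++) + ".avro";
    return writer;
  };
}
```

Passing the factory as a `std::function` keeps the rolling logic independent of how each underlying writer is configured, which is presumably why the PR takes a `ManifestWriterFactory` rather than constructing writers itself.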