Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ set(ICEBERG_SOURCES
manifest/manifest_list.cc
manifest/manifest_reader.cc
manifest/manifest_writer.cc
manifest/rolling_manifest_writer.cc
manifest/v1_metadata.cc
manifest/v2_metadata.cc
manifest/v3_metadata.cc
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class ICEBERG_EXPORT Writer {
virtual Result<Metrics> metrics() = 0;

/// \brief Get the file length.
/// Only valid after the file is closed.
/// This can be called while the writer is still open or after the file is closed.
virtual Result<int64_t> length() = 0;

/// \brief Returns a list of recommended split locations, if applicable, empty
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/manifest/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ enum class ManifestStatus {

/// \brief Get the relative manifest status type from int
ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
int status) noexcept {
int32_t status) noexcept {
switch (status) {
case 0:
return ManifestStatus::kExisting;
Expand Down Expand Up @@ -387,7 +387,7 @@ ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexc

/// \brief Get the relative data file content type from int
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
int content) noexcept {
int32_t content) noexcept {
switch (content) {
case 0:
return DataFile::Content::kData;
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/manifest/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }

/// \brief Return the metrics reported by the underlying file writer.
Result<Metrics> ManifestWriter::metrics() const {
  return writer_->metrics();
}

/// \brief Return the file length reported by the underlying file writer.
Result<int64_t> ManifestWriter::length() const {
  return writer_->length();
}

Result<ManifestFile> ManifestWriter::ToManifestFile() const {
if (!closed_) [[unlikely]] {
return Invalid("Cannot get ManifestFile before closing the writer.");
Expand Down
18 changes: 11 additions & 7 deletions src/iceberg/manifest/manifest_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ICEBERG_EXPORT ManifestWriter {

/// \brief Write the entry that all its fields are populated correctly.
/// \param entry Manifest entry to write.
/// \return Status::OK() if entry was written successfully
/// \return Status indicating success or failure
/// \note All other write entry variants delegate to this method after populating
/// the necessary fields.
Status WriteEntry(const ManifestEntry& entry);
Expand All @@ -50,7 +50,7 @@ class ICEBERG_EXPORT ManifestWriter {
///
/// \param file an added data file
/// \param data_sequence_number a data sequence number for the file
/// \return Status::OK() if the entry was written successfully
/// \return Status indicating success or failure
/// \note The entry's snapshot ID will be this manifest's snapshot ID. The entry's data
/// sequence number will be the provided data sequence number. The entry's file sequence
/// number will be assigned at commit.
Expand All @@ -67,7 +67,7 @@ class ICEBERG_EXPORT ManifestWriter {
/// file was added)
/// \param file_sequence_number a file sequence number (assigned when the file was
/// added)
/// \return Status::OK() if the entry was written successfully
/// \return Status indicating success or failure
/// \note The original data and file sequence numbers, snapshot ID, which were assigned
/// at commit, must be preserved when adding an existing entry.
Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
Expand All @@ -83,7 +83,7 @@ class ICEBERG_EXPORT ManifestWriter {
/// file was added)
/// \param file_sequence_number a file sequence number (assigned when the file was
/// added)
/// \return Status::OK() if the entry was written successfully
/// \return Status indicating success or failure
/// \note The entry's snapshot ID will be this manifest's snapshot ID. However, the
/// original data and file sequence numbers of the file must be preserved when the file
/// is marked as deleted.
Expand All @@ -95,7 +95,7 @@ class ICEBERG_EXPORT ManifestWriter {

/// \brief Write manifest entries to file.
/// \param entries Already populated manifest entries to write.
/// \return Status::OK() if all entries were written successfully
/// \return Status indicating success or failure
Status AddAll(const std::vector<ManifestEntry>& entries);

/// \brief Close writer and flush to storage.
Expand All @@ -108,6 +108,10 @@ class ICEBERG_EXPORT ManifestWriter {
/// \note Only valid after the file is closed.
Result<Metrics> metrics() const;

/// \brief Get the current length of the manifest file in bytes.
/// \return The current length of the file, or an error if the operation fails.
Result<int64_t> length() const;

/// \brief Get the ManifestFile object.
/// \note Only valid after the file is closed.
Result<ManifestFile> ToManifestFile() const;
Expand Down Expand Up @@ -187,12 +191,12 @@ class ICEBERG_EXPORT ManifestListWriter {

/// \brief Write manifest file to manifest list file.
/// \param file Manifest file to write.
/// \return Status::OK() if file was written successfully
/// \return Status indicating success or failure
Status Add(const ManifestFile& file);

/// \brief Write manifest file list to manifest list file.
/// \param files Manifest file list to write.
/// \return Status::OK() if all files were written successfully
/// \return Status indicating success or failure
Status AddAll(const std::vector<ManifestFile>& files);

/// \brief Close writer and flush to storage.
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/manifest/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ install_headers(
'manifest_list.h',
'manifest_reader.h',
'manifest_writer.h',
'rolling_manifest_writer.h',
],
subdir: 'iceberg/manifest',
)
119 changes: 119 additions & 0 deletions src/iceberg/manifest/rolling_manifest_writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/manifest/rolling_manifest_writer.h"

#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/result.h"
#include "iceberg/util/macros.h"

namespace iceberg {

RollingManifestWriter::RollingManifestWriter(
ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes)
: manifest_writer_factory_(std::move(manifest_writer_factory)),
target_file_size_in_bytes_(target_file_size_in_bytes) {}

/// \brief Close the writer on destruction if the caller has not done so.
RollingManifestWriter::~RollingManifestWriter() {
  // Best effort only: a destructor has no way to report a failed close, so the
  // returned status is deliberately discarded.
  static_cast<void>(Close());
}

/// \brief Write an added entry for \p file to the current manifest file.
///
/// The row counter of the current file is only advanced when the underlying
/// writer accepted the entry.
Status RollingManifestWriter::WriteAddedEntry(
    std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) {
  // May create the first writer or roll over to a new file.
  ICEBERG_ASSIGN_OR_RAISE(auto* target, CurrentWriter());
  ICEBERG_RETURN_UNEXPECTED(
      target->WriteAddedEntry(std::move(file), data_sequence_number));
  ++current_file_rows_;
  return {};
}

/// \brief Write an existing entry for \p file to the current manifest file.
///
/// The snapshot ID and sequence numbers are forwarded unchanged to the underlying
/// writer. The row counter of the current file is only advanced on success.
Status RollingManifestWriter::WriteExistingEntry(
    std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
    int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) {
  // May create the first writer or roll over to a new file.
  ICEBERG_ASSIGN_OR_RAISE(auto* target, CurrentWriter());
  ICEBERG_RETURN_UNEXPECTED(target->WriteExistingEntry(
      std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number));
  ++current_file_rows_;
  return {};
}

/// \brief Write a deleted entry for \p file to the current manifest file.
///
/// The sequence numbers are forwarded unchanged to the underlying writer. The row
/// counter of the current file is only advanced on success.
Status RollingManifestWriter::WriteDeletedEntry(
    std::shared_ptr<DataFile> file, int64_t data_sequence_number,
    std::optional<int64_t> file_sequence_number) {
  // May create the first writer or roll over to a new file.
  ICEBERG_ASSIGN_OR_RAISE(auto* target, CurrentWriter());
  ICEBERG_RETURN_UNEXPECTED(target->WriteDeletedEntry(
      std::move(file), data_sequence_number, file_sequence_number));
  ++current_file_rows_;
  return {};
}

/// \brief Close the current underlying writer, if any, and mark this writer closed.
///
/// Calling Close() again after it has succeeded is a no-op. If closing the
/// underlying writer fails, the error is returned and this writer stays open.
Status RollingManifestWriter::Close() {
  if (closed_) {
    return {};
  }
  ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
  closed_ = true;
  return {};
}

/// \brief Return a copy of the manifest files produced so far.
/// \return The recorded ManifestFile objects, or an error if Close() has not
/// succeeded yet.
Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const {
  if (closed_) {
    return manifest_files_;
  }
  return Invalid("Cannot get ManifestFile list from unclosed writer");
}

/// \brief Return the writer that the next entry should be written to.
///
/// Lazily creates the first writer, and rolls over to a fresh one when
/// ShouldRollToNewFile() reports that the current file is full.
///
/// \return A non-owning pointer to the current writer, or an error if this rolling
/// writer has already been closed, if closing the full file fails, or if the
/// factory fails to create a writer.
Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() {
  // Once Close() has succeeded, Close() and the destructor become no-ops. A writer
  // created now would never be closed or reported by ToManifestFiles(), so its
  // entries would be lost silently. Reject the write instead.
  if (closed_) {
    return Invalid("Cannot write to a closed RollingManifestWriter");
  }

  if (current_writer_ != nullptr && ShouldRollToNewFile()) {
    // Records the finished manifest file and resets current_writer_ to null.
    ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
  }
  if (current_writer_ == nullptr) {
    ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
  }

  return current_writer_.get();
}

/// \brief Decide whether the current file is full and a new one should be started.
///
/// Returns true only when a writer is open, the number of rows written to the
/// current file is a multiple of kRowsDivisor, and the writer reports a length of
/// at least the target size. This method only decides; it does not roll.
bool RollingManifestWriter::ShouldRollToNewFile() const {
  // The length is only queried every kRowsDivisor rows.
  if (current_writer_ == nullptr || current_file_rows_ % kRowsDivisor != 0) {
    return false;
  }
  const auto file_length = current_writer_->length();
  // TODO(anyone): If we can't get the length, don't roll for now, revisit this later.
  return file_length.has_value() && file_length.value() >= target_file_size_in_bytes_;
}

/// \brief Close the current writer, if any, and record the manifest file it produced.
///
/// On success the writer is released and the per-file row counter is reset. On
/// failure the error is returned and the current writer is left in place.
Status RollingManifestWriter::CloseCurrentWriter() {
  if (current_writer_ == nullptr) {
    return {};
  }
  ICEBERG_RETURN_UNEXPECTED(current_writer_->Close());
  // The ManifestFile can only be obtained once the writer has been closed.
  ICEBERG_ASSIGN_OR_RAISE(auto finished_manifest, current_writer_->ToManifestFile());
  manifest_files_.emplace_back(std::move(finished_manifest));
  current_file_rows_ = 0;
  current_writer_.reset();
  return {};
}

} // namespace iceberg
128 changes: 128 additions & 0 deletions src/iceberg/manifest/rolling_manifest_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/manifest/rolling_manifest_writer.h
/// Rolling manifest writer that can produce multiple manifest files.

#include <functional>
#include <memory>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/result.h"

namespace iceberg {

/// \brief A rolling manifest writer that can produce multiple manifest files.
///
/// Entries are forwarded to a ManifestWriter obtained from the supplied factory.
/// Before each write, if the current file holds a multiple of kRowsDivisor entries
/// and its reported length has reached the target size, the current writer is
/// closed and a new one is requested from the factory.
class ICEBERG_EXPORT RollingManifestWriter {
public:
/// \brief Factory function type for creating ManifestWriter instances.
using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>;

/// \brief Construct a rolling manifest writer.
/// \param manifest_writer_factory Factory function to create new ManifestWriter
/// instances. It is invoked lazily, on the first write and on every roll.
/// \param target_file_size_in_bytes Target file size in bytes. When the current
/// file has reached this size and its row count is a multiple of kRowsDivisor
/// (250), the next write goes to a new file.
RollingManifestWriter(ManifestWriterFactory manifest_writer_factory,
int64_t target_file_size_in_bytes);

/// \brief Close the writer if it is still open.
/// \note Any error returned by Close() is discarded here; call Close()
/// explicitly to observe failures.
~RollingManifestWriter();

/// \brief Add an added entry for a file.
///
/// \param file a data file
/// \param data_sequence_number an optional data sequence number for the file,
/// forwarded to the underlying ManifestWriter
/// \return Status indicating success or failure
/// \note The entry's snapshot ID will be this manifest's snapshot ID. The
/// entry's data sequence number will be the provided data sequence number.
/// The entry's file sequence number will be assigned at commit.
Status WriteAddedEntry(std::shared_ptr<DataFile> file,
std::optional<int64_t> data_sequence_number = std::nullopt);

/// \brief Add an existing entry for a file.
///
/// \param file an existing data file
/// \param file_snapshot_id snapshot ID when the data file was added to the table
/// \param data_sequence_number a data sequence number of the file (assigned when
/// the file was added)
/// \param file_sequence_number a file sequence number (assigned when the file
/// was added)
/// \return Status indicating success or failure
/// \note The original data and file sequence numbers, snapshot ID, which were
/// assigned at commit, must be preserved when adding an existing entry.
Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
int64_t data_sequence_number,
std::optional<int64_t> file_sequence_number = std::nullopt);

/// \brief Add a delete entry for a file.
///
/// \param file a deleted data file
/// \param data_sequence_number a data sequence number of the file (assigned when
/// the file was added)
/// \param file_sequence_number a file sequence number (assigned when the file
/// was added)
/// \return Status indicating success or failure
/// \note The entry's snapshot ID will be this manifest's snapshot ID. However,
/// the original data and file sequence numbers of the file must be preserved
/// when the file is marked as deleted.
Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t data_sequence_number,
std::optional<int64_t> file_sequence_number = std::nullopt);

/// \brief Close the rolling manifest writer.
///
/// Closes the current underlying writer, if any, and records its ManifestFile.
/// Once a call has succeeded, further calls are no-ops.
/// \return Status indicating success or failure
Status Close();

/// \brief Get the list of manifest files produced by this writer.
/// \return A vector of ManifestFile objects, or an error if the writer has not
/// been closed
/// \note Only valid after the writer is closed.
Result<std::vector<ManifestFile>> ToManifestFiles() const;

private:
/// \brief Get or create the current writer, rolling to a new file if needed.
/// \return The current ManifestWriter (non-owning), or an error if a writer
/// cannot be provided (e.g. closing the previous file or creating a new writer
/// fails)
Result<ManifestWriter*> CurrentWriter();

/// \brief Check if we should roll to a new file.
///
/// Returns true only when a writer is open, the number of rows written to the
/// current file is a multiple of kRowsDivisor, and the writer's reported length
/// is at least the target size. Returns false if the length cannot be obtained.
/// This method only decides; the roll itself is performed by CurrentWriter().
bool ShouldRollToNewFile() const;

/// \brief Close the current writer and add its ManifestFile to the list.
/// \note Does nothing when no writer is currently open.
Status CloseCurrentWriter();

/// \brief The number of rows after which to consider rolling to a new file.
/// \note This is aligned with Iceberg's Java implementation.
static constexpr int64_t kRowsDivisor = 250;

// Creates a fresh ManifestWriter for each output file.
ManifestWriterFactory manifest_writer_factory_;
// Size threshold, in bytes, compared against the current writer's length.
int64_t target_file_size_in_bytes_;
// Manifest files of all underlying writers closed so far.
std::vector<ManifestFile> manifest_files_;

// Number of entries successfully written to the current file.
int64_t current_file_rows_{0};
// Writer for the file currently being written; null when no file is open.
std::unique_ptr<ManifestWriter> current_writer_{nullptr};
// Set once Close() has succeeded.
bool closed_{false};
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ iceberg_sources = files(
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_writer.cc',
'manifest/rolling_manifest_writer.cc',
'manifest/v1_metadata.cc',
'manifest/v2_metadata.cc',
'manifest/v3_metadata.cc',
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ if(ICEBERG_BUILD_BUNDLE)
manifest_list_versions_test.cc
manifest_reader_stats_test.cc
manifest_reader_test.cc
manifest_writer_versions_test.cc)
manifest_writer_versions_test.cc
rolling_manifest_writer_test.cc)

add_iceberg_test(parquet_test
USE_BUNDLE
Expand Down
Loading
Loading