Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
set(ICEBERG_SOURCES
arrow_c_data_guard_internal.cc
catalog/memory/in_memory_catalog.cc
data/writer.cc
expression/aggregate.cc
expression/binder.cc
expression/evaluator.cc
Expand Down Expand Up @@ -142,6 +143,7 @@ add_iceberg_lib(iceberg
iceberg_install_all_headers(iceberg)

add_subdirectory(catalog)
add_subdirectory(data)
add_subdirectory(expression)
add_subdirectory(manifest)
add_subdirectory(row)
Expand Down
18 changes: 18 additions & 0 deletions src/iceberg/data/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

iceberg_install_all_headers(iceberg/data)
27 changes: 27 additions & 0 deletions src/iceberg/data/writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/data/writer.h"

namespace iceberg {

// FileWriter is a pure virtual interface class.
// Implementations will be provided in subsequent tasks.

} // namespace iceberg
91 changes: 91 additions & 0 deletions src/iceberg/data/writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/data/writer.h
/// Base interface for Iceberg data file writers.

#include <cstdint>
#include <memory>
#include <vector>

#include "iceberg/arrow_c_data.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/result.h"

namespace iceberg {

/// \brief Base interface for data file writers.
///
/// This interface defines the common operations for writing Iceberg data files,
/// including data files, equality delete files, and position delete files.
///
/// Typical usage:
/// 1. Create a writer instance (via concrete implementation)
/// 2. Call Write() one or more times to write data
/// 3. Call Close() to finalize the file
/// 4. Call Metadata() to get file metadata (only valid after Close())
///
/// \note This interface is not thread-safe. Concurrent calls to Write()
/// from multiple threads on the same instance are not supported.
///
/// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata)
/// to distinguish it from the lower-level iceberg/file_writer.h::Writer interface which
/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer
/// abstraction, while Writer is the file format-level abstraction.
Comment on lines +49 to +53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
///
/// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata)
/// to distinguish it from the lower-level iceberg/file_writer.h::Writer interface which
/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer
/// abstraction, while Writer is the file format-level abstraction.

class ICEBERG_EXPORT FileWriter {
public:
virtual ~FileWriter() = default;

/// \brief Write a batch of records.
///
/// \param data Arrow array containing the records to write.
/// \return Status indicating success or failure.
virtual Status Write(ArrowArray* data) = 0;

/// \brief Get the current number of bytes written.
///
/// \return Result containing the number of bytes written or an error.
virtual Result<int64_t> Length() const = 0;

/// \brief Close the writer and finalize the file.
///
/// \return Status indicating success or failure.
virtual Status Close() = 0;

/// \brief File metadata for all files produced by the writer.
struct ICEBERG_EXPORT WriteResult {
/// Usually a writer produces a single data or delete file.
/// Position delete writer may produce multiple file-scoped delete files.
/// In the future, multiple files can be produced if file rolling is supported.
std::vector<std::shared_ptr<DataFile>> data_files;
};

/// \brief Get file metadata for all files produced by this writer.
///
/// This method should be called after Close() to retrieve the metadata
/// for all files written by this writer.
///
/// \return Result containing the write result or an error.
virtual Result<WriteResult> Metadata() = 0;
};

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ add_iceberg_test(util_test

add_iceberg_test(roaring_test SOURCES roaring_test.cc)

add_iceberg_test(data_writer_test SOURCES data_writer_test.cc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to move it to the if(ICEBERG_BUILD_BUNDLE) block below since it depends on Avro and Parquet libraries.


if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(avro_test
USE_BUNDLE
Expand Down
218 changes: 218 additions & 0 deletions src/iceberg/test/data_writer_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <memory>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/arrow_c_data.h"
#include "iceberg/data/writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/result.h"
#include "iceberg/test/matchers.h"

namespace iceberg {

// Mock implementation of FileWriter for testing
class MockFileWriter : public FileWriter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, we don't need to add test cases for a pure interface classes. We can keep it for now and then remove all these cases once we have implemented DataWriter.

public:
MockFileWriter() = default;

Status Write(ArrowArray* data) override {
if (is_closed_) {
return Invalid("Writer is closed");
}
if (data == nullptr) {
return Invalid("Null data provided");
}
write_count_++;
// Simulate writing some bytes
bytes_written_ += 1024;
return {};
}

Result<int64_t> Length() const override { return bytes_written_; }

Status Close() override {
if (is_closed_) {
return Invalid("Writer already closed");
}
is_closed_ = true;
return {};
}

Result<WriteResult> Metadata() override {
if (!is_closed_) {
return Invalid("Writer must be closed before getting metadata");
}

WriteResult result;
auto data_file = std::make_shared<DataFile>();
data_file->file_path = "/test/data/file.parquet";
data_file->file_format = FileFormatType::kParquet;
data_file->record_count = write_count_ * 100;
data_file->file_size_in_bytes = bytes_written_;
result.data_files.push_back(data_file);

return result;
}

bool is_closed() const { return is_closed_; }
int32_t write_count() const { return write_count_; }

private:
int64_t bytes_written_ = 0;
bool is_closed_ = false;
int32_t write_count_ = 0;
};

TEST(FileWriterTest, BasicWriteOperation) {
MockFileWriter writer;

// Create a dummy ArrowArray (normally this would contain actual data)
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_EQ(writer.write_count(), 1);

auto length_result = writer.Length();
ASSERT_THAT(length_result, IsOk());
ASSERT_EQ(*length_result, 1024);
}

TEST(FileWriterTest, MultipleWrites) {
MockFileWriter writer;
ArrowArray dummy_array = {};

// Write multiple times
for (int i = 0; i < 5; i++) {
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
}

ASSERT_EQ(writer.write_count(), 5);

auto length_result = writer.Length();
ASSERT_THAT(length_result, IsOk());
ASSERT_EQ(*length_result, 5120); // 5 * 1024
}

TEST(FileWriterTest, WriteNullData) {
MockFileWriter writer;

auto status = writer.Write(nullptr);
ASSERT_THAT(status, HasErrorMessage("Null data provided"));
}

TEST(FileWriterTest, CloseWriter) {
MockFileWriter writer;
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_FALSE(writer.is_closed());

ASSERT_THAT(writer.Close(), IsOk());
ASSERT_TRUE(writer.is_closed());
}

TEST(FileWriterTest, DoubleClose) {
MockFileWriter writer;

ASSERT_THAT(writer.Close(), IsOk());
auto status = writer.Close();
ASSERT_THAT(status, HasErrorMessage("Writer already closed"));
}

TEST(FileWriterTest, WriteAfterClose) {
MockFileWriter writer;
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Close(), IsOk());

auto status = writer.Write(&dummy_array);
ASSERT_THAT(status, HasErrorMessage("Writer is closed"));
}

TEST(FileWriterTest, MetadataBeforeClose) {
MockFileWriter writer;
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Write(&dummy_array), IsOk());

auto metadata_result = writer.Metadata();
ASSERT_THAT(metadata_result,
HasErrorMessage("Writer must be closed before getting metadata"));
}

TEST(FileWriterTest, MetadataAfterClose) {
MockFileWriter writer;
ArrowArray dummy_array = {};

// Write some data
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_THAT(writer.Write(&dummy_array), IsOk());

// Close the writer
ASSERT_THAT(writer.Close(), IsOk());

// Get metadata
auto metadata_result = writer.Metadata();
ASSERT_THAT(metadata_result, IsOk());

const auto& result = *metadata_result;
ASSERT_EQ(result.data_files.size(), 1);

const auto& data_file = result.data_files[0];
ASSERT_EQ(data_file->file_path, "/test/data/file.parquet");
ASSERT_EQ(data_file->file_format, FileFormatType::kParquet);
ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records
ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024
}

TEST(FileWriterTest, WriteResultStructure) {
FileWriter::WriteResult result;

// Test that WriteResult can hold multiple data files
auto data_file1 = std::make_shared<DataFile>();
data_file1->file_path = "/test/data/file1.parquet";
data_file1->record_count = 100;

auto data_file2 = std::make_shared<DataFile>();
data_file2->file_path = "/test/data/file2.parquet";
data_file2->record_count = 200;

result.data_files.push_back(data_file1);
result.data_files.push_back(data_file2);

ASSERT_EQ(result.data_files.size(), 2);
ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet");
ASSERT_EQ(result.data_files[0]->record_count, 100);
ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet");
ASSERT_EQ(result.data_files[1]->record_count, 200);
}

TEST(FileWriterTest, EmptyWriteResult) {
FileWriter::WriteResult result;
ASSERT_EQ(result.data_files.size(), 0);
ASSERT_TRUE(result.data_files.empty());
}

} // namespace iceberg
Loading