-
Notifications
You must be signed in to change notification settings - Fork 76
feat: add FileWriter base interface for data file writers #446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| iceberg_install_all_headers(iceberg/data) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include "iceberg/data/writer.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| // FileWriter is a pure virtual interface class. | ||
| // Implementations will be provided in subsequent tasks. | ||
|
|
||
| } // namespace iceberg |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| /// \file iceberg/data/writer.h | ||
| /// Base interface for Iceberg data file writers. | ||
|
|
||
| #include <cstdint> | ||
| #include <memory> | ||
| #include <vector> | ||
|
|
||
| #include "iceberg/arrow_c_data.h" | ||
| #include "iceberg/iceberg_export.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/result.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| /// \brief Base interface for data file writers. | ||
| /// | ||
| /// This interface defines the common operations for writing Iceberg files: | ||
| /// data files, equality delete files, and position delete files. | ||
| /// | ||
| /// Typical usage: | ||
| /// 1. Create a writer instance (via a concrete implementation). | ||
| /// 2. Call Write() one or more times to write data. | ||
| /// 3. Call Close() to finalize the file. | ||
| /// 4. Call Metadata() to get file metadata (only valid after Close()). | ||
| /// | ||
| /// \note This interface is not thread-safe. Concurrent calls to Write() | ||
| /// from multiple threads on the same instance are not supported. | ||
| /// | ||
| /// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata) | ||
| /// to distinguish it from the lower-level Writer interface in iceberg/file_writer.h, | ||
| /// which uses lowercase naming. FileWriter is the Iceberg-specific data file writer | ||
| /// abstraction, while Writer is the file format-level abstraction. | ||
| class ICEBERG_EXPORT FileWriter { | ||
| public: | ||
| /// \brief Virtual destructor so implementations can be destroyed through this interface. | ||
| virtual ~FileWriter() = default; | ||
|
|
||
| /// \brief Write a batch of records. | ||
| /// | ||
| /// \param data Arrow array containing the records to write. | ||
| /// NOTE(review): this declaration does not state whether the writer takes | ||
| /// ownership of (releases) the array, nor how a null pointer is handled -- | ||
| /// confirm against the concrete implementations. | ||
| /// \return Status indicating success or failure. | ||
| virtual Status Write(ArrowArray* data) = 0; | ||
|
|
||
| /// \brief Get the number of bytes written so far. | ||
| /// | ||
| /// \return Result containing the number of bytes written or an error. | ||
| virtual Result<int64_t> Length() const = 0; | ||
|
|
||
| /// \brief Close the writer and finalize the file. | ||
| /// | ||
| /// NOTE(review): the behavior of a repeated Close() is not specified by this | ||
| /// interface -- confirm against the concrete implementations. | ||
| /// | ||
| /// \return Status indicating success or failure. | ||
| virtual Status Close() = 0; | ||
|
|
||
| /// \brief File metadata for all files produced by the writer. | ||
| struct ICEBERG_EXPORT WriteResult { | ||
| /// Usually a writer produces a single data or delete file. | ||
| /// A position delete writer may produce multiple file-scoped delete files. | ||
| /// In the future, multiple files can be produced if file rolling is supported. | ||
| std::vector<std::shared_ptr<DataFile>> data_files; | ||
| }; | ||
|
|
||
| /// \brief Get file metadata for all files produced by this writer. | ||
| /// | ||
| /// This method should be called after Close() to retrieve the metadata | ||
| /// for all files written by this writer; the usage notes on this class | ||
| /// state that the metadata is only valid after Close(). | ||
| /// | ||
| /// \return Result containing the write result or an error. | ||
| virtual Result<WriteResult> Metadata() = 0; | ||
| }; | ||
|
|
||
| } // namespace iceberg | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -113,6 +113,8 @@ add_iceberg_test(util_test | |
|
|
||
| add_iceberg_test(roaring_test SOURCES roaring_test.cc) | ||
|
|
||
| add_iceberg_test(data_writer_test SOURCES data_writer_test.cc) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we need to move it to the |
||
|
|
||
| if(ICEBERG_BUILD_BUNDLE) | ||
| add_iceberg_test(avro_test | ||
| USE_BUNDLE | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,218 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include <memory> | ||
| #include <vector> | ||
|
|
||
| #include <gmock/gmock.h> | ||
| #include <gtest/gtest.h> | ||
|
|
||
| #include "iceberg/arrow_c_data.h" | ||
| #include "iceberg/data/writer.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/result.h" | ||
| #include "iceberg/test/matchers.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| // Mock implementation of FileWriter for testing | ||
| class MockFileWriter : public FileWriter { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IMHO, we don't need to add test cases for pure interface classes. We can keep it for now and then remove all these cases once we have implemented |
||
| public: | ||
| MockFileWriter() = default; | ||
|
|
||
| /// Record one simulated write: rejects closed writers and null input, | ||
| /// otherwise bumps the write counter and adds a fixed byte amount. | ||
| Status Write(ArrowArray* data) override { | ||
| // The closed check comes first, so a closed writer reports "closed" | ||
| // even when the input is also null. | ||
| if (is_closed_) return Invalid("Writer is closed"); | ||
| if (!data) return Invalid("Null data provided"); | ||
|
|
||
| // Each accepted batch is accounted as a fixed number of bytes. | ||
| constexpr int64_t kBytesPerWrite = 1024; | ||
| bytes_written_ += kBytesPerWrite; | ||
| ++write_count_; | ||
| return {}; | ||
| } | ||
|
|
||
| /// Report the simulated byte total accumulated by Write(); never fails. | ||
| Result<int64_t> Length() const override { | ||
| return bytes_written_; | ||
| } | ||
|
|
||
| Status Close() override { | ||
| if (is_closed_) { | ||
| return Invalid("Writer already closed"); | ||
| } | ||
| is_closed_ = true; | ||
| return {}; | ||
| } | ||
|
|
||
| /// Build metadata for one synthetic Parquet file from the write counters. | ||
| /// Fails unless Close() has already been called. | ||
| Result<WriteResult> Metadata() override { | ||
| if (!is_closed_) { | ||
| return Invalid("Writer must be closed before getting metadata"); | ||
| } | ||
|
|
||
| // Every write is treated as 100 records; the size mirrors Length(). | ||
| auto file = std::make_shared<DataFile>(); | ||
| file->file_size_in_bytes = bytes_written_; | ||
| file->record_count = write_count_ * 100; | ||
| file->file_format = FileFormatType::kParquet; | ||
| file->file_path = "/test/data/file.parquet"; | ||
|
|
||
| WriteResult summary; | ||
| summary.data_files.push_back(file); | ||
| return summary; | ||
| } | ||
|
|
||
| /// True once Close() has succeeded on this mock. | ||
| bool is_closed() const { | ||
| return is_closed_; | ||
| } | ||
| /// Number of Write() calls that were accepted so far. | ||
| int32_t write_count() const { | ||
| return write_count_; | ||
| } | ||
|
|
||
| private: | ||
| int64_t bytes_written_ = 0; | ||
| bool is_closed_ = false; | ||
| int32_t write_count_ = 0; | ||
| }; | ||
|
|
||
| // A single successful write is counted once and adds 1024 bytes. | ||
| TEST(FileWriterTest, BasicWriteOperation) { | ||
| // A zero-initialized ArrowArray stands in for a real record batch. | ||
| ArrowArray batch = {}; | ||
| MockFileWriter mock; | ||
|
|
||
| ASSERT_THAT(mock.Write(&batch), IsOk()); | ||
| ASSERT_EQ(mock.write_count(), 1); | ||
|
|
||
| auto bytes = mock.Length(); | ||
| ASSERT_THAT(bytes, IsOk()); | ||
| ASSERT_EQ(*bytes, 1024); | ||
| } | ||
|
|
||
| // Repeated writes accumulate both the write count and the byte total. | ||
| TEST(FileWriterTest, MultipleWrites) { | ||
| ArrowArray batch = {}; | ||
| MockFileWriter mock; | ||
|
|
||
| // Issue five consecutive writes; each one must succeed. | ||
| for (int remaining = 5; remaining > 0; --remaining) { | ||
| ASSERT_THAT(mock.Write(&batch), IsOk()); | ||
| } | ||
|
|
||
| ASSERT_EQ(mock.write_count(), 5); | ||
|
|
||
| auto bytes = mock.Length(); | ||
| ASSERT_THAT(bytes, IsOk()); | ||
| ASSERT_EQ(*bytes, 5120); // 5 * 1024 | ||
| } | ||
|
|
||
| TEST(FileWriterTest, WriteNullData) { | ||
| MockFileWriter writer; | ||
|
|
||
| auto status = writer.Write(nullptr); | ||
| ASSERT_THAT(status, HasErrorMessage("Null data provided")); | ||
| } | ||
|
|
||
| // Writing leaves the writer open; only Close() flips the closed flag. | ||
| TEST(FileWriterTest, CloseWriter) { | ||
| ArrowArray batch = {}; | ||
| MockFileWriter mock; | ||
|
|
||
| // Still open after a write. | ||
| ASSERT_THAT(mock.Write(&batch), IsOk()); | ||
| ASSERT_FALSE(mock.is_closed()); | ||
|
|
||
| // Closed after an explicit Close(). | ||
| ASSERT_THAT(mock.Close(), IsOk()); | ||
| ASSERT_TRUE(mock.is_closed()); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, DoubleClose) { | ||
| MockFileWriter writer; | ||
|
|
||
| ASSERT_THAT(writer.Close(), IsOk()); | ||
| auto status = writer.Close(); | ||
| ASSERT_THAT(status, HasErrorMessage("Writer already closed")); | ||
| } | ||
|
|
||
| // Once closed, the writer refuses further writes. | ||
| TEST(FileWriterTest, WriteAfterClose) { | ||
| ArrowArray batch = {}; | ||
| MockFileWriter mock; | ||
|
|
||
| ASSERT_THAT(mock.Close(), IsOk()); | ||
|
|
||
| auto late_write = mock.Write(&batch); | ||
| ASSERT_THAT(late_write, HasErrorMessage("Writer is closed")); | ||
| } | ||
|
|
||
| // Metadata() is rejected while the writer is still open. | ||
| TEST(FileWriterTest, MetadataBeforeClose) { | ||
| ArrowArray batch = {}; | ||
| MockFileWriter mock; | ||
|
|
||
| ASSERT_THAT(mock.Write(&batch), IsOk()); | ||
|
|
||
| // No Close() has been issued yet, so the request must fail. | ||
| auto premature = mock.Metadata(); | ||
| ASSERT_THAT(premature, | ||
| HasErrorMessage("Writer must be closed before getting metadata")); | ||
| } | ||
|
|
||
| // After three writes and a Close(), Metadata() describes exactly one file | ||
| // whose record count and size reflect the three writes. | ||
| TEST(FileWriterTest, MetadataAfterClose) { | ||
| MockFileWriter writer; | ||
| ArrowArray dummy_array = {}; | ||
|
|
||
| // Write some data | ||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
|
|
||
| // Close the writer | ||
| ASSERT_THAT(writer.Close(), IsOk()); | ||
|
|
||
| // Get metadata | ||
| auto metadata_result = writer.Metadata(); | ||
| ASSERT_THAT(metadata_result, IsOk()); | ||
|
|
||
| const auto& result = *metadata_result; | ||
| // size() is unsigned; use an unsigned literal to avoid a signed/unsigned | ||
| // comparison inside the assertion. | ||
| ASSERT_EQ(result.data_files.size(), 1U); | ||
|
|
||
| const auto& data_file = result.data_files[0]; | ||
| ASSERT_EQ(data_file->file_path, "/test/data/file.parquet"); | ||
| ASSERT_EQ(data_file->file_format, FileFormatType::kParquet); | ||
| ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records | ||
| ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024 | ||
| } | ||
|
|
||
| // WriteResult is a plain container: it keeps several data files in | ||
| // insertion order. | ||
| TEST(FileWriterTest, WriteResultStructure) { | ||
| FileWriter::WriteResult result; | ||
|
|
||
| // Test that WriteResult can hold multiple data files | ||
| auto data_file1 = std::make_shared<DataFile>(); | ||
| data_file1->file_path = "/test/data/file1.parquet"; | ||
| data_file1->record_count = 100; | ||
|
|
||
| auto data_file2 = std::make_shared<DataFile>(); | ||
| data_file2->file_path = "/test/data/file2.parquet"; | ||
| data_file2->record_count = 200; | ||
|
|
||
| result.data_files.push_back(data_file1); | ||
| result.data_files.push_back(data_file2); | ||
|
|
||
| // size() is unsigned; use an unsigned literal to avoid a signed/unsigned | ||
| // comparison inside the assertion. | ||
| ASSERT_EQ(result.data_files.size(), 2U); | ||
| ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet"); | ||
| ASSERT_EQ(result.data_files[0]->record_count, 100); | ||
| ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet"); | ||
| ASSERT_EQ(result.data_files[1]->record_count, 200); | ||
| } | ||
|
|
||
| // A default-constructed WriteResult holds no data files. | ||
| TEST(FileWriterTest, EmptyWriteResult) { | ||
| FileWriter::WriteResult result; | ||
| // size() is unsigned; use an unsigned literal to avoid a signed/unsigned | ||
| // comparison inside the assertion. | ||
| ASSERT_EQ(result.data_files.size(), 0U); | ||
| ASSERT_TRUE(result.data_files.empty()); | ||
| } | ||
|
|
||
| } // namespace iceberg | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.