diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
index 98b7ed6e55..a935552c8d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
@@ -57,7 +57,7 @@ public class ArrowWriter implements AutoCloseable {
      * The initial capacity of the vectors which are used to store the rows. The capacity will be
      * expanded automatically if the rows exceed the initial capacity.
      */
-    private static final int INITIAL_CAPACITY = 1024;
+    public static final int INITIAL_CAPACITY = 1024;
 
     /**
      * The buffer usage ratio which is used to determine whether the writer is full. The writer is
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
index 548c4e9562..3c9fbb54cb 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
@@ -20,6 +20,7 @@
 
 import org.apache.fluss.row.DataGetters;
 import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.arrow.ArrowWriter;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
 
@@ -41,7 +42,12 @@ public void doWrite(int rowIndex, DataGetters row, int ordinal, boolean handleSa
         listVector.startNewValue(rowIndex);
         for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) {
             int fieldIndex = offset + arrIndex;
-            elementWriter.write(fieldIndex, array, arrIndex, handleSafe);
+            // Use safe writes once the element index (offset + arrIndex) reaches
+            // INITIAL_CAPACITY. The parent's handleSafe is based on row count, but array
+            // element indices grow with the total number of elements across all arrays,
+            // which can exceed the element vector's initial capacity even when rows don't.
+            boolean elementHandleSafe = fieldIndex >= ArrowWriter.INITIAL_CAPACITY;
+            elementWriter.write(fieldIndex, array, arrIndex, elementHandleSafe);
         }
         offset += array.size();
         listVector.endValue(rowIndex, array.size());
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
index fa8d14b3d2..001eb4e886 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
@@ -262,4 +262,70 @@ void testWriterExceedMaxSizeInBytes() {
                     "The arrow batch size is full and it shouldn't accept writing new rows, it's a bug.");
         }
     }
+
+    /**
+     * Tests that array columns work correctly when the total number of array elements exceeds
+     * INITIAL_CAPACITY (1024) while the row count stays below it. This reproduces a bug where
+     * ArrowArrayWriter used the parent's handleSafe flag (based on row count) for element writes,
+     * causing IndexOutOfBoundsException when element indices exceeded the vector's initial
+     * capacity.
+     */
+    @Test
+    void testArrayWriterWithManyElements() throws IOException {
+        // Schema with array column
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.INT())));
+
+        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+                VectorSchemaRoot root =
+                        VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+                ArrowWriterPool provider = new ArrowWriterPool(allocator);
+                ArrowWriter writer =
+                        provider.getOrCreateWriter(
+                                1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) {
+
+            // Write 200 rows, each with a 10-element array.
+            // Total elements = 2000, exceeding INITIAL_CAPACITY (1024).
+            // But row count (200) < 1024, so handleSafe would be false without the fix.
+            int numRows = 200;
+            int arraySize = 10;
+            for (int i = 0; i < numRows; i++) {
+                Integer[] elements = new Integer[arraySize];
+                for (int j = 0; j < arraySize; j++) {
+                    elements[j] = i * arraySize + j;
+                }
+                writer.writeRow(GenericRow.of(i, GenericArray.of(elements)));
+            }
+
+            // Verify serialization works without IndexOutOfBoundsException
+            AbstractPagedOutputView pagedOutputView =
+                    new ManagedPagedOutputView(new TestingMemorySegmentPool(64 * 1024));
+            int size =
+                    writer.serializeToOutputView(
+                            pagedOutputView, arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
+            assertThat(size).isGreaterThan(0);
+
+            // Verify the data can be read back correctly
+            int heapMemorySize = Math.max(size, writer.estimatedSizeInBytes());
+            MemorySegment segment = MemorySegment.allocateHeapMemory(heapMemorySize);
+            MemorySegment firstSegment = pagedOutputView.getCurrentSegment();
+            firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, size);
+
+            ArrowReader reader =
+                    ArrowUtils.createArrowReader(segment, 0, size, root, allocator, rowType);
+            assertThat(reader.getRowCount()).isEqualTo(numRows);
+
+            for (int i = 0; i < numRows; i++) {
+                ColumnarRow row = reader.read(i);
+                row.setRowId(i);
+                assertThat(row.getInt(0)).isEqualTo(i);
+                assertThat(row.getArray(1).size()).isEqualTo(arraySize);
+                for (int j = 0; j < arraySize; j++) {
+                    assertThat(row.getArray(1).getInt(j)).isEqualTo(i * arraySize + j);
+                }
+            }
+        }
+    }
 }