@@ -57,7 +57,7 @@ public class ArrowWriter implements AutoCloseable {
* The initial capacity of the vectors which are used to store the rows. The capacity will be
* expanded automatically if the rows exceed the initial capacity.
*/
private static final int INITIAL_CAPACITY = 1024;
public static final int INITIAL_CAPACITY = 1024;

/**
* The buffer usage ratio which is used to determine whether the writer is full. The writer is
@@ -20,6 +20,7 @@

import org.apache.fluss.row.DataGetters;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;

@@ -41,7 +42,12 @@ public void doWrite(int rowIndex, DataGetters row, int ordinal, boolean handleSafe
listVector.startNewValue(rowIndex);
for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) {
int fieldIndex = offset + arrIndex;
elementWriter.write(fieldIndex, array, arrIndex, handleSafe);
// Decide safe writes from the element index (offset + arrIndex) rather than the parent's
// handleSafe flag. The parent's flag is based on the row count, but element indices grow
// with the total number of elements across all arrays, so they can exceed INITIAL_CAPACITY
// even when the row count doesn't.
boolean elementHandleSafe = fieldIndex >= ArrowWriter.INITIAL_CAPACITY;
elementWriter.write(fieldIndex, array, arrIndex, elementHandleSafe);
}
offset += array.size();
listVector.endValue(rowIndex, array.size());
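For context: the distinction this fix leans on is Arrow's unsafe set(...) versus safe setSafe(...) writes. The unsafe path assumes the target index already fits the allocated buffers; the safe path grows them first. Below is a minimal standalone sketch (my illustration, not part of the PR) that uses plain Apache Arrow Java instead of the Fluss-shaded classes, and assumes the Fluss field writers map the handleSafe flag onto this set/setSafe pair, which is what the surrounding code suggests. It shows why an element index beyond the initially allocated capacity needs the safe path even when the row count is small.

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;

public class SafeWriteSketch {
    public static void main(String[] args) {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
                IntVector elements = new IntVector("elements", allocator)) {
            // Mirror ArrowWriter.INITIAL_CAPACITY: pre-allocate room for 1024 values.
            elements.allocateNew(1024);

            // Within the pre-allocated capacity, the unsafe write is fine.
            elements.set(0, 42);

            // An element index beyond the pre-allocated capacity (e.g. the 2000th
            // array element accumulated across many small rows) must go through the
            // safe write, which reallocates the buffers before writing. With bounds
            // checking enabled, elements.set(2000, 7) would instead fail with an
            // IndexOutOfBoundsException, the failure mode the test below guards against.
            elements.setSafe(2000, 7);
            elements.setValueCount(2001);

            System.out.println(elements.get(2000)); // prints 7
        }
    }
}

Gating the safe path on fieldIndex >= ArrowWriter.INITIAL_CAPACITY, rather than always passing true, presumably keeps the cheaper unsafe write for element indices that are known to fit the initial allocation.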
@@ -262,4 +262,70 @@ void testWriterExceedMaxSizeInBytes() {
"The arrow batch size is full and it shouldn't accept writing new rows, it's a bug.");
}
}

/**
* Tests that array columns work correctly when the total number of array elements exceeds
* INITIAL_CAPACITY (1024) while the row count stays below it. This reproduces a bug where
* ArrowArrayWriter used the parent's handleSafe flag (based on row count) for element writes,
* causing IndexOutOfBoundsException when element indices exceeded the vector's initial
* capacity.
*/
@Test
void testArrayWriterWithManyElements() throws IOException {
// Schema with array column
RowType rowType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.INT())));

try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
VectorSchemaRoot root =
VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
ArrowWriterPool provider = new ArrowWriterPool(allocator);
ArrowWriter writer =
provider.getOrCreateWriter(
1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) {

// Write 200 rows, each with a 10-element array: 2000 elements in total, exceeding
// INITIAL_CAPACITY (1024). The row count (200) stays below 1024, so the parent's
// handleSafe flag is false and, without the fix, element writes past index 1023 fail.
int numRows = 200;
int arraySize = 10;
for (int i = 0; i < numRows; i++) {
Integer[] elements = new Integer[arraySize];
for (int j = 0; j < arraySize; j++) {
elements[j] = i * arraySize + j;
}
writer.writeRow(GenericRow.of(i, GenericArray.of(elements)));
}

// Verify serialization works without IndexOutOfBoundsException
AbstractPagedOutputView pagedOutputView =
new ManagedPagedOutputView(new TestingMemorySegmentPool(64 * 1024));
int size =
writer.serializeToOutputView(
pagedOutputView, arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
assertThat(size).isGreaterThan(0);

// Verify the data can be read back correctly
int heapMemorySize = Math.max(size, writer.estimatedSizeInBytes());
MemorySegment segment = MemorySegment.allocateHeapMemory(heapMemorySize);
MemorySegment firstSegment = pagedOutputView.getCurrentSegment();
firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, size);

ArrowReader reader =
ArrowUtils.createArrowReader(segment, 0, size, root, allocator, rowType);
assertThat(reader.getRowCount()).isEqualTo(numRows);

for (int i = 0; i < numRows; i++) {
ColumnarRow row = reader.read(i);
row.setRowId(i);
assertThat(row.getInt(0)).isEqualTo(i);
assertThat(row.getArray(1).size()).isEqualTo(arraySize);
for (int j = 0; j < arraySize; j++) {
assertThat(row.getArray(1).getInt(j)).isEqualTo(i * arraySize + j);
}
}
}
}
}