From 34830c0207a649db0eee2e1aecd89bd208d72416 Mon Sep 17 00:00:00 2001 From: forwardxu Date: Thu, 25 Dec 2025 21:27:01 +0800 Subject: [PATCH 1/2] [FLUSS-2251] Support NestedRow types for paimon - Reused the existing FlussRowAsPaimonRow adapter to expose Fluss nested rows as Paimon nested rows - Implemented getRow method in FlussRowAsPaimonRow to support nested row fields in tables - Implemented getRow method in FlussArrayAsPaimonArray to support arrays of nested rows - Added comprehensive test cases covering: * Simple nested rows with primitive types * Deeply nested rows (row within row) * Arrays of nested rows * Nested rows with array fields * Nested rows with all primitive types * Null nested rows * Nested rows with nullable fields * Nested rows with decimal and timestamp fields --- .../source/FlussArrayAsPaimonArray.java | 7 +- .../paimon/source/FlussRowAsPaimonRow.java | 10 +- .../tiering/FlussRecordAsPaimonRowTest.java | 216 ++++++++++++++++++ .../paimon/tiering/PaimonTieringITCase.java | 84 +++++++ .../paimon/utils/PaimonRowAsFlussRowTest.java | 135 +++++++++++ 5 files changed, 447 insertions(+), 5 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java index 37438582ba..9c0dc1d200 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java @@ -166,8 +166,11 @@ public InternalMap getMap(int pos) { @Override public InternalRow getRow(int pos, int numFields) { - throw new UnsupportedOperationException( - "getRow is not supported for Fluss array currently."); + org.apache.fluss.row.InternalRow nestedFlussRow = flussArray.getRow(pos, numFields); + return nestedFlussRow == null + ? 
null + : new FlussRowAsPaimonRow( + nestedFlussRow, (org.apache.paimon.types.RowType) elementType); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java index e2df08cc3d..01026dbee6 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java @@ -175,8 +175,12 @@ public InternalMap getMap(int pos) { } @Override - public InternalRow getRow(int pos, int pos1) { - throw new UnsupportedOperationException( - "getRow is not support for Fluss record currently."); + public InternalRow getRow(int pos, int numFields) { + org.apache.fluss.row.InternalRow nestedFlussRow = internalRow.getRow(pos, numFields); + return nestedFlussRow == null + ? null + : new FlussRowAsPaimonRow( + nestedFlussRow, + /* getTypeAt is positional; RowType#getField(int) resolves by field id, not index */ (org.apache.paimon.types.RowType) tableRowType.getTypeAt(pos)); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java index 369fc74984..76aa25841a 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java @@ -606,4 +606,220 @@ void testFlussRecordWiderThanPaimonSchema() { .hasMessageContaining( "Fluss record has 2 fields but Paimon schema only has 1 business fields"); } + + @Test + void testNestedRowType() { + int tableBucket = 0; + + // Build nested row types + org.apache.paimon.types.RowType simpleNestedRowType = + org.apache.paimon.types.RowType.builder() + 
.field("nested_int", new org.apache.paimon.types.IntType()) + .field("nested_string", new org.apache.paimon.types.VarCharType()) + .build(); + + org.apache.paimon.types.RowType allTypesNestedRowType = + org.apache.paimon.types.RowType.builder() + .field("bool_field", new org.apache.paimon.types.BooleanType()) + .field("byte_field", new org.apache.paimon.types.TinyIntType()) + .field("short_field", new org.apache.paimon.types.SmallIntType()) + .field("int_field", new org.apache.paimon.types.IntType()) + .field("long_field", new org.apache.paimon.types.BigIntType()) + .field("float_field", new org.apache.paimon.types.FloatType()) + .field("double_field", new org.apache.paimon.types.DoubleType()) + .field("string_field", new org.apache.paimon.types.VarCharType()) + .build(); + + org.apache.paimon.types.RowType decimalTimestampRowType = + org.apache.paimon.types.RowType.builder() + .field("decimal_field", new org.apache.paimon.types.DecimalType(10, 2)) + .field("timestamp_field", new org.apache.paimon.types.TimestampType(3)) + .build(); + + org.apache.paimon.types.RowType innerRowType = + org.apache.paimon.types.RowType.builder() + .field("inner_value", new org.apache.paimon.types.IntType()) + .build(); + + org.apache.paimon.types.RowType middleRowType = + org.apache.paimon.types.RowType.builder() + .field("middle_int", new org.apache.paimon.types.IntType()) + .field("inner_row", innerRowType) + .build(); + + org.apache.paimon.types.RowType rowWithArrayType = + org.apache.paimon.types.RowType.builder() + .field("id", new org.apache.paimon.types.IntType()) + .field( + "values", + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType())) + .build(); + + org.apache.paimon.types.RowType arrayElementRowType = + org.apache.paimon.types.RowType.builder() + .field("id", new org.apache.paimon.types.IntType()) + .field("name", new org.apache.paimon.types.VarCharType()) + .build(); + + org.apache.paimon.types.RowType nullableFieldsRowType = + 
org.apache.paimon.types.RowType.builder() + .field("id", new org.apache.paimon.types.IntType()) + .field( + "nullable_field", + new org.apache.paimon.types.VarCharType().nullable()) + .build(); + + RowType tableRowType = + RowType.of( + simpleNestedRowType, + allTypesNestedRowType, + decimalTimestampRowType, + middleRowType, + rowWithArrayType, + new org.apache.paimon.types.ArrayType(arrayElementRowType), + simpleNestedRowType.nullable(), + nullableFieldsRowType, + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(8); + + // Simple nested row + GenericRow simpleNestedRow = new GenericRow(2); + simpleNestedRow.setField(0, 100); + simpleNestedRow.setField(1, BinaryString.fromString("nested_value")); + genericRow.setField(0, simpleNestedRow); + + // Nested row with all primitive types + GenericRow allTypesNestedRow = new GenericRow(8); + allTypesNestedRow.setField(0, true); + allTypesNestedRow.setField(1, (byte) 127); + allTypesNestedRow.setField(2, (short) 32000); + allTypesNestedRow.setField(3, 2147483647); + allTypesNestedRow.setField(4, 9223372036854775807L); + allTypesNestedRow.setField(5, 3.14f); + allTypesNestedRow.setField(6, 2.718281828); + allTypesNestedRow.setField(7, BinaryString.fromString("test_string")); + genericRow.setField(1, allTypesNestedRow); + + // Nested row with decimal and timestamp + GenericRow decimalTimestampRow = new GenericRow(2); + decimalTimestampRow.setField(0, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + decimalTimestampRow.setField(1, TimestampNtz.fromMillis(1698235273182L)); + genericRow.setField(2, decimalTimestampRow); + + // Deeply nested row + GenericRow innerRow = new GenericRow(1); + 
innerRow.setField(0, 999); + GenericRow middleRow = new GenericRow(2); + middleRow.setField(0, 500); + middleRow.setField(1, innerRow); + genericRow.setField(3, middleRow); + + // Nested row with array field + GenericRow nestedRowWithArray = new GenericRow(2); + nestedRowWithArray.setField(0, 123); + nestedRowWithArray.setField(1, new GenericArray(new int[] {10, 20, 30})); + genericRow.setField(4, nestedRowWithArray); + + // Array of nested rows + GenericRow arrayRow1 = new GenericRow(2); + arrayRow1.setField(0, 1); + arrayRow1.setField(1, BinaryString.fromString("Alice")); + GenericRow arrayRow2 = new GenericRow(2); + arrayRow2.setField(0, 2); + arrayRow2.setField(1, BinaryString.fromString("Bob")); + genericRow.setField(5, new GenericArray(new Object[] {arrayRow1, arrayRow2})); + + // Null nested row + genericRow.setField(6, null); + + // Nested row with nullable fields + GenericRow nullableFieldsRow = new GenericRow(2); + nullableFieldsRow.setField(0, 42); + nullableFieldsRow.setField(1, null); + genericRow.setField(7, nullableFieldsRow); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + // Test simple nested row + org.apache.paimon.data.InternalRow paimonSimpleRow = flussRecordAsPaimonRow.getRow(0, 2); + assertThat(paimonSimpleRow).isNotNull(); + assertThat(paimonSimpleRow.getInt(0)).isEqualTo(100); + assertThat(paimonSimpleRow.getString(1).toString()).isEqualTo("nested_value"); + + // Test nested row with all primitive types + org.apache.paimon.data.InternalRow paimonAllTypesRow = flussRecordAsPaimonRow.getRow(1, 8); + assertThat(paimonAllTypesRow).isNotNull(); + assertThat(paimonAllTypesRow.getBoolean(0)).isTrue(); + assertThat(paimonAllTypesRow.getByte(1)).isEqualTo((byte) 127); + assertThat(paimonAllTypesRow.getShort(2)).isEqualTo((short) 32000); + assertThat(paimonAllTypesRow.getInt(3)).isEqualTo(2147483647); + 
assertThat(paimonAllTypesRow.getLong(4)).isEqualTo(9223372036854775807L); + assertThat(paimonAllTypesRow.getFloat(5)).isEqualTo(3.14f); + assertThat(paimonAllTypesRow.getDouble(6)).isEqualTo(2.718281828); + assertThat(paimonAllTypesRow.getString(7).toString()).isEqualTo("test_string"); + + // Test nested row with decimal and timestamp + org.apache.paimon.data.InternalRow paimonDecimalTimestampRow = + flussRecordAsPaimonRow.getRow(2, 2); + assertThat(paimonDecimalTimestampRow).isNotNull(); + assertThat(paimonDecimalTimestampRow.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(paimonDecimalTimestampRow.getTimestamp(1, 3).getMillisecond()) + .isEqualTo(1698235273182L); + + // Test deeply nested row + org.apache.paimon.data.InternalRow paimonMiddleRow = flussRecordAsPaimonRow.getRow(3, 2); + assertThat(paimonMiddleRow).isNotNull(); + assertThat(paimonMiddleRow.getInt(0)).isEqualTo(500); + org.apache.paimon.data.InternalRow paimonInnerRow = paimonMiddleRow.getRow(1, 1); + assertThat(paimonInnerRow).isNotNull(); + assertThat(paimonInnerRow.getInt(0)).isEqualTo(999); + + // Test nested row with array field + org.apache.paimon.data.InternalRow paimonRowWithArray = flussRecordAsPaimonRow.getRow(4, 2); + assertThat(paimonRowWithArray).isNotNull(); + assertThat(paimonRowWithArray.getInt(0)).isEqualTo(123); + org.apache.paimon.data.InternalArray arrayInRow = paimonRowWithArray.getArray(1); + assertThat(arrayInRow).isNotNull(); + assertThat(arrayInRow.size()).isEqualTo(3); + assertThat(arrayInRow.getInt(0)).isEqualTo(10); + assertThat(arrayInRow.getInt(1)).isEqualTo(20); + assertThat(arrayInRow.getInt(2)).isEqualTo(30); + + // Test array of nested rows + org.apache.paimon.data.InternalArray arrayOfRows = flussRecordAsPaimonRow.getArray(5); + assertThat(arrayOfRows).isNotNull(); + assertThat(arrayOfRows.size()).isEqualTo(2); + org.apache.paimon.data.InternalRow paimonRow1 = arrayOfRows.getRow(0, 2); + 
assertThat(paimonRow1.getInt(0)).isEqualTo(1); + assertThat(paimonRow1.getString(1).toString()).isEqualTo("Alice"); + org.apache.paimon.data.InternalRow paimonRow2 = arrayOfRows.getRow(1, 2); + assertThat(paimonRow2.getInt(0)).isEqualTo(2); + assertThat(paimonRow2.getString(1).toString()).isEqualTo("Bob"); + + // Test null nested row + assertThat(flussRecordAsPaimonRow.isNullAt(6)).isTrue(); + + // Test nested row with nullable fields + org.apache.paimon.data.InternalRow paimonNullableFieldsRow = + flussRecordAsPaimonRow.getRow(7, 2); + assertThat(paimonNullableFieldsRow).isNotNull(); + assertThat(paimonNullableFieldsRow.getInt(0)).isEqualTo(42); + assertThat(paimonNullableFieldsRow.isNullAt(1)).isTrue(); + + // Verify system columns + assertThat(flussRecordAsPaimonRow.getInt(8)).isEqualTo(tableBucket); + assertThat(flussRecordAsPaimonRow.getLong(9)).isEqualTo(logOffset); + assertThat(flussRecordAsPaimonRow.getLong(10)).isEqualTo(timeStamp); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java index 73aba60cba..698941cb11 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java @@ -50,6 +50,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; @@ -489,6 +490,89 @@ private String getPartitionOffsetStr(Map partitionNameByIds) { return "[" + String.join(",", partitionOffsetStrs) + "]"; } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testTieringForNestedRow(boolean 
isPrimaryKeyTable) throws Exception { + TablePath t1 = + TablePath.of( + DEFAULT_DB, + isPrimaryKeyTable ? "pkTableForNestedRow" : "logTableForNestedRow"); + + Schema.Builder builder = + Schema.newBuilder() + .column("c0", DataTypes.INT()) + .column("c1", DataTypes.STRING()) + .column( + "c2", + DataTypes.ROW( + DataTypes.FIELD("nested_int", DataTypes.INT()), + DataTypes.FIELD("nested_string", DataTypes.STRING()), + DataTypes.FIELD("nested_double", DataTypes.DOUBLE()))); + + if (isPrimaryKeyTable) { + builder.primaryKey("c0", "c1"); + } + + TableDescriptor.Builder tableDescriptor = + TableDescriptor.builder() + .schema(builder.build()) + .distributedBy(1, "c0") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + tableDescriptor.customProperties(Collections.emptyMap()); + tableDescriptor.properties(Collections.emptyMap()); + long t1Id = createTable(t1, tableDescriptor.build()); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + + List rows = + Collections.singletonList( + row( + builder.build().getRowType(), + 1, + "outer_value", + new Object[] {100, "nested_value", 3.14})); + writeRows(t1, rows, !isPrimaryKeyTable); + + if (isPrimaryKeyTable) { + waitUntilSnapshot(t1Id, 1, 0); + } + + JobClient jobClient = buildTieringJob(execEnv); + + try { + assertReplicaStatus(t1Bucket, 1); + + Iterator paimonRowIterator = + getPaimonRowCloseableIterator(t1); + for (InternalRow expectedRow : rows) { + org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); + assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0)); + assertThat(row.getString(1).toString()) + .isEqualTo(expectedRow.getString(1).toString()); + + org.apache.paimon.data.InternalRow paimonNestedRow = row.getRow(2, 3); + assertThat(paimonNestedRow).isNotNull(); + org.apache.fluss.row.InternalRow flussNestedRow = expectedRow.getRow(2, 3); + assertThat(paimonNestedRow.getInt(0)).isEqualTo(flussNestedRow.getInt(0)); 
+ assertThat(paimonNestedRow.getString(1).toString()) + .isEqualTo(flussNestedRow.getString(1).toString()); + assertThat(paimonNestedRow.getDouble(2)).isEqualTo(flussNestedRow.getDouble(2)); + } + + Map properties = + new HashMap() { + { + put( + FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, + "[{\"bucket\":0,\"offset\":1}]"); + } + }; + checkSnapshotPropertyInPaimon(t1, properties); + } finally { + jobClient.cancel().get(); + } + } + @Test void testTieringToDvEnabledTable() throws Exception { TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv"); diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java index 3c1e2c5519..35b64d17ab 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.paimon.utils; import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; @@ -195,4 +196,138 @@ void testArrayWithAllTypes() { InternalArray innerArray2 = outerArray.getArray(1); assertThat(innerArray2.toIntArray()).isEqualTo(new int[] {3, 4, 5}); } + + @Test + void testNestedRowWithAllTypes() { + GenericRow paimonRow = new GenericRow(10); + + // Simple nested row with primitive types + GenericRow simpleNestedRow = new GenericRow(3); + simpleNestedRow.setField(0, 100); + simpleNestedRow.setField(1, BinaryString.fromString("nested_value")); + simpleNestedRow.setField(2, true); + paimonRow.setField(0, simpleNestedRow); + + // Nested row with all primitive types + GenericRow allTypesNestedRow = new GenericRow(8); + allTypesNestedRow.setField(0, true); + allTypesNestedRow.setField(1, (byte) 
127); + allTypesNestedRow.setField(2, (short) 32000); + allTypesNestedRow.setField(3, 2147483647); + allTypesNestedRow.setField(4, 9223372036854775807L); + allTypesNestedRow.setField(5, 3.14f); + allTypesNestedRow.setField(6, 2.718281828); + allTypesNestedRow.setField(7, BinaryString.fromString("test_string")); + paimonRow.setField(1, allTypesNestedRow); + + // Nested row with decimal and timestamp + GenericRow decimalTimestampRow = new GenericRow(2); + decimalTimestampRow.setField(0, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + decimalTimestampRow.setField(1, Timestamp.fromEpochMillis(1698235273182L)); + paimonRow.setField(2, decimalTimestampRow); + + // Deeply nested row + GenericRow innerRow = new GenericRow(1); + innerRow.setField(0, 999); + GenericRow middleRow = new GenericRow(2); + middleRow.setField(0, 500); + middleRow.setField(1, innerRow); + paimonRow.setField(3, middleRow); + + // Nested row with array field + GenericRow rowWithArray = new GenericRow(2); + rowWithArray.setField(0, 123); + rowWithArray.setField(1, new GenericArray(new int[] {10, 20, 30})); + paimonRow.setField(4, rowWithArray); + + // Array of nested rows + GenericRow arrayRow1 = new GenericRow(2); + arrayRow1.setField(0, 1); + arrayRow1.setField(1, BinaryString.fromString("Alice")); + GenericRow arrayRow2 = new GenericRow(2); + arrayRow2.setField(0, 2); + arrayRow2.setField(1, BinaryString.fromString("Bob")); + paimonRow.setField(5, new GenericArray(new Object[] {arrayRow1, arrayRow2})); + + // Null nested row + paimonRow.setField(6, null); + + // Nested row with nullable fields + GenericRow rowWithNulls = new GenericRow(2); + rowWithNulls.setField(0, 42); + rowWithNulls.setField(1, null); + paimonRow.setField(7, rowWithNulls); + + // System columns + paimonRow.setField(8, 0); + paimonRow.setField(9, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + // Test simple nested row + InternalRow simpleRow = flussRow.getRow(0, 3); + 
assertThat(simpleRow).isNotNull(); + assertThat(simpleRow.getInt(0)).isEqualTo(100); + assertThat(simpleRow.getString(1).toString()).isEqualTo("nested_value"); + assertThat(simpleRow.getBoolean(2)).isTrue(); + + // Test nested row with all primitive types + InternalRow allTypesRow = flussRow.getRow(1, 8); + assertThat(allTypesRow).isNotNull(); + assertThat(allTypesRow.getBoolean(0)).isTrue(); + assertThat(allTypesRow.getByte(1)).isEqualTo((byte) 127); + assertThat(allTypesRow.getShort(2)).isEqualTo((short) 32000); + assertThat(allTypesRow.getInt(3)).isEqualTo(2147483647); + assertThat(allTypesRow.getLong(4)).isEqualTo(9223372036854775807L); + assertThat(allTypesRow.getFloat(5)).isEqualTo(3.14f); + assertThat(allTypesRow.getDouble(6)).isEqualTo(2.718281828); + assertThat(allTypesRow.getString(7).toString()).isEqualTo("test_string"); + + // Test nested row with decimal and timestamp + InternalRow decimalTimestampFlussRow = flussRow.getRow(2, 2); + assertThat(decimalTimestampFlussRow).isNotNull(); + assertThat(decimalTimestampFlussRow.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(decimalTimestampFlussRow.getTimestampNtz(1, 3).getMillisecond()) + .isEqualTo(1698235273182L); + + // Test deeply nested row + InternalRow middleFlussRow = flussRow.getRow(3, 2); + assertThat(middleFlussRow).isNotNull(); + assertThat(middleFlussRow.getInt(0)).isEqualTo(500); + InternalRow innerFlussRow = middleFlussRow.getRow(1, 1); + assertThat(innerFlussRow).isNotNull(); + assertThat(innerFlussRow.getInt(0)).isEqualTo(999); + + // Test nested row with array field + InternalRow rowWithArrayFluss = flussRow.getRow(4, 2); + assertThat(rowWithArrayFluss).isNotNull(); + assertThat(rowWithArrayFluss.getInt(0)).isEqualTo(123); + InternalArray arrayInRow = rowWithArrayFluss.getArray(1); + assertThat(arrayInRow).isNotNull(); + assertThat(arrayInRow.size()).isEqualTo(3); + assertThat(arrayInRow.getInt(0)).isEqualTo(10); + 
assertThat(arrayInRow.getInt(1)).isEqualTo(20); + assertThat(arrayInRow.getInt(2)).isEqualTo(30); + + // Test array of nested rows + InternalArray arrayOfRows = flussRow.getArray(5); + assertThat(arrayOfRows).isNotNull(); + assertThat(arrayOfRows.size()).isEqualTo(2); + InternalRow row1 = arrayOfRows.getRow(0, 2); + assertThat(row1.getInt(0)).isEqualTo(1); + assertThat(row1.getString(1).toString()).isEqualTo("Alice"); + InternalRow row2 = arrayOfRows.getRow(1, 2); + assertThat(row2.getInt(0)).isEqualTo(2); + assertThat(row2.getString(1).toString()).isEqualTo("Bob"); + + // Test null nested row + assertThat(flussRow.isNullAt(6)).isTrue(); + + // Test nested row with nullable fields + InternalRow rowWithNullsFluss = flussRow.getRow(7, 2); + assertThat(rowWithNullsFluss).isNotNull(); + assertThat(rowWithNullsFluss.getInt(0)).isEqualTo(42); + assertThat(rowWithNullsFluss.isNullAt(1)).isTrue(); + } } From c432f366a064138a52ca26db19e1375f6b3495c9 Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sun, 28 Dec 2025 17:45:59 +0800 Subject: [PATCH 2/2] [FLUSS-2251] Support NestedRow types for paimon --- .../FlinkUnionReadPrimaryKeyTableITCase.java | 41 +++++++-- .../paimon/tiering/PaimonTieringITCase.java | 84 ------------------- 2 files changed, 33 insertions(+), 92 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java index df57974bff..62d5d9df14 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -133,7 +133,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { String partitionName = isPartitioned ? 
waitUntilPartitions(t1).values().iterator().next() : null; if (partitionName != null) { - queryFilterStr = queryFilterStr + " and c17= '" + partitionName + "'"; + queryFilterStr = queryFilterStr + " and c18= '" + partitionName + "'"; } List expectedRows = new ArrayList<>(); @@ -157,6 +157,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), partition)); expectedRows.add( @@ -177,6 +178,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), partition)); } } else { @@ -199,6 +201,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), null), Row.of( true, @@ -217,6 +220,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), null)); } tableResult = @@ -230,7 +234,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { row -> { boolean isMatch = row.getField(3).equals(30); if (partitionName != null) { - isMatch = isMatch && row.getField(16).equals(partitionName); + isMatch = isMatch && row.getField(17).equals(partitionName); } return isMatch; }) @@ -306,6 +310,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), partition)); expectedRows.add( @@ -326,6 +331,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { 
TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), partition)); } } else { @@ -348,6 +354,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), null), Row.of( true, @@ -366,6 +373,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), null)); } @@ -483,6 +491,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), partition)); expectedRows.add( Row.of( @@ -502,6 +511,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), partition)); } } else { @@ -524,6 +534,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), null), Row.of( true, @@ -542,6 +553,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), null)); } @@ -585,6 +597,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), partition)); expectedRows2.add( Row.ofKind( @@ -605,6 +618,7 @@ 
void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), partition)); } } else { @@ -627,6 +641,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), null)); expectedRows2.add( Row.ofKind( @@ -647,6 +662,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), null)); } @@ -979,21 +995,21 @@ private String buildExpectedResult(boolean isPartitioned, int record1, int recor + "2023-10-25T12:01:13.182005Z, " + "2023-10-25T12:01:13.183, " + "2023-10-25T12:01:13.183006, " - + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], +I[100, nested_value_1, 3.14], %s]"); records.add( "+I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, " + "2023-10-25T12:01:13.200Z, " + "2023-10-25T12:01:13.200005Z, " + "2023-10-25T12:01:13.201, " + "2023-10-25T12:01:13.201006, " - + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], +I[200, nested_value_2, 6.28], %s]"); records.add( "+I[true, 100, 200, 30, 400, 500.1, 600.0, another_string_2, 9.00, 1000, " + "2023-10-25T12:01:13.400Z, " + "2023-10-25T12:01:13.400007Z, " + "2023-10-25T12:01:13.501, " + "2023-10-25T12:01:13.501008, " - + "[5, 6, 7, 8], [2.1, 2.2, 2.3], %s]"); + + "[5, 6, 7, 8], [2.1, 2.2, 2.3], +I[300, nested_value_3, 9.99], %s]"); if (isPartitioned) { return String.format( @@ -1030,9 +1046,15 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean .column("c14", DataTypes.TIMESTAMP(6)) .column("c15", DataTypes.BINARY(4)) .column("c16", 
DataTypes.ARRAY(DataTypes.FLOAT())) - .column("c17", DataTypes.STRING()); - - return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c17"); + .column( + "c17", + DataTypes.ROW( + DataTypes.FIELD("nested_int", DataTypes.INT()), + DataTypes.FIELD("nested_string", DataTypes.STRING()), + DataTypes.FIELD("nested_double", DataTypes.DOUBLE()))) + .column("c18", DataTypes.STRING()); + + return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c18"); } protected long createSimplePkTable( @@ -1098,6 +1120,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new GenericArray(new float[] {2.1f, 2.2f, 2.3f}), + GenericRow.of(300, BinaryString.fromString("nested_value_3"), 9.99), partition)); writeRows(tablePath, rows, false); } @@ -1121,6 +1144,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), + GenericRow.of(100, BinaryString.fromString("nested_value_1"), 3.14), partition), row( true, @@ -1139,6 +1163,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), + GenericRow.of(200, BinaryString.fromString("nested_value_2"), 6.28), partition)); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java index 698941cb11..73aba60cba 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java @@ -50,7 +50,6 @@ 
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; @@ -490,89 +489,6 @@ private String getPartitionOffsetStr(Map partitionNameByIds) { return "[" + String.join(",", partitionOffsetStrs) + "]"; } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testTieringForNestedRow(boolean isPrimaryKeyTable) throws Exception { - TablePath t1 = - TablePath.of( - DEFAULT_DB, - isPrimaryKeyTable ? "pkTableForNestedRow" : "logTableForNestedRow"); - - Schema.Builder builder = - Schema.newBuilder() - .column("c0", DataTypes.INT()) - .column("c1", DataTypes.STRING()) - .column( - "c2", - DataTypes.ROW( - DataTypes.FIELD("nested_int", DataTypes.INT()), - DataTypes.FIELD("nested_string", DataTypes.STRING()), - DataTypes.FIELD("nested_double", DataTypes.DOUBLE()))); - - if (isPrimaryKeyTable) { - builder.primaryKey("c0", "c1"); - } - - TableDescriptor.Builder tableDescriptor = - TableDescriptor.builder() - .schema(builder.build()) - .distributedBy(1, "c0") - .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") - .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); - tableDescriptor.customProperties(Collections.emptyMap()); - tableDescriptor.properties(Collections.emptyMap()); - long t1Id = createTable(t1, tableDescriptor.build()); - TableBucket t1Bucket = new TableBucket(t1Id, 0); - - List rows = - Collections.singletonList( - row( - builder.build().getRowType(), - 1, - "outer_value", - new Object[] {100, "nested_value", 3.14})); - writeRows(t1, rows, !isPrimaryKeyTable); - - if (isPrimaryKeyTable) { - waitUntilSnapshot(t1Id, 1, 0); - } - - JobClient jobClient = buildTieringJob(execEnv); - - try { - assertReplicaStatus(t1Bucket, 1); - - Iterator paimonRowIterator = - getPaimonRowCloseableIterator(t1); - for 
(InternalRow expectedRow : rows) { - org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); - assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0)); - assertThat(row.getString(1).toString()) - .isEqualTo(expectedRow.getString(1).toString()); - - org.apache.paimon.data.InternalRow paimonNestedRow = row.getRow(2, 3); - assertThat(paimonNestedRow).isNotNull(); - org.apache.fluss.row.InternalRow flussNestedRow = expectedRow.getRow(2, 3); - assertThat(paimonNestedRow.getInt(0)).isEqualTo(flussNestedRow.getInt(0)); - assertThat(paimonNestedRow.getString(1).toString()) - .isEqualTo(flussNestedRow.getString(1).toString()); - assertThat(paimonNestedRow.getDouble(2)).isEqualTo(flussNestedRow.getDouble(2)); - } - - Map properties = - new HashMap() { - { - put( - FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, - "[{\"bucket\":0,\"offset\":1}]"); - } - }; - checkSnapshotPropertyInPaimon(t1, properties); - } finally { - jobClient.cancel().get(); - } - } - @Test void testTieringToDvEnabledTable() throws Exception { TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");