diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java index 37438582ba..9c0dc1d200 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java @@ -166,8 +166,11 @@ public InternalMap getMap(int pos) { @Override public InternalRow getRow(int pos, int numFields) { - throw new UnsupportedOperationException( - "getRow is not supported for Fluss array currently."); + org.apache.fluss.row.InternalRow nestedFlussRow = flussArray.getRow(pos, numFields); + return nestedFlussRow == null + ? null + : new FlussRowAsPaimonRow( + nestedFlussRow, (org.apache.paimon.types.RowType) elementType); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java index e2df08cc3d..01026dbee6 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java @@ -175,8 +175,12 @@ public InternalMap getMap(int pos) { } @Override - public InternalRow getRow(int pos, int pos1) { - throw new UnsupportedOperationException( - "getRow is not support for Fluss record currently."); + public InternalRow getRow(int pos, int numFields) { + org.apache.fluss.row.InternalRow nestedFlussRow = internalRow.getRow(pos, numFields); + return nestedFlussRow == null + ? null + : new FlussRowAsPaimonRow( + nestedFlussRow, + (org.apache.paimon.types.RowType) tableRowType.getField(pos).type()); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java index df57974bff..62d5d9df14 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -133,7 +133,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { String partitionName = isPartitioned ? waitUntilPartitions(t1).values().iterator().next() : null; if (partitionName != null) { - queryFilterStr = queryFilterStr + " and c17= '" + partitionName + "'"; + queryFilterStr = queryFilterStr + " and c18= '" + partitionName + "'"; } List expectedRows = new ArrayList<>(); @@ -157,6 +157,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), partition)); expectedRows.add( @@ -177,6 +178,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), partition)); } } else { @@ -199,6 +201,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), null), Row.of( true, @@ -217,6 +220,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), null)); } tableResult = @@ -230,7 +234,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { row -> { boolean isMatch = row.getField(3).equals(30); if (partitionName != null) { - isMatch = isMatch && row.getField(16).equals(partitionName); + isMatch = isMatch && row.getField(17).equals(partitionName); } return isMatch; }) @@ -306,6 +310,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), partition)); expectedRows.add( @@ -326,6 +331,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), partition)); } } else { @@ -348,6 +354,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), null), Row.of( true, @@ -366,6 +373,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), null)); } @@ -483,6 +491,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), partition)); expectedRows.add( Row.of( @@ -502,6 +511,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), partition)); } } else { @@ -524,6 +534,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(100, "nested_value_1", 3.14), null), Row.of( true, @@ -542,6 +553,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), null)); } @@ -585,6 +597,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), partition)); expectedRows2.add( Row.ofKind( @@ -605,6 +618,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), partition)); } } else { @@ -627,6 +641,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, + Row.of(200, "nested_value_2", 6.28), null)); expectedRows2.add( Row.ofKind( @@ -647,6 +662,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, + Row.of(300, "nested_value_3", 9.99), null)); } @@ -979,21 +995,21 @@ private String buildExpectedResult(boolean isPartitioned, int record1, int recor + "2023-10-25T12:01:13.182005Z, " + "2023-10-25T12:01:13.183, " + "2023-10-25T12:01:13.183006, " - + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], +I[100, nested_value_1, 3.14], %s]"); records.add( "+I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, " + "2023-10-25T12:01:13.200Z, " + "2023-10-25T12:01:13.200005Z, " + "2023-10-25T12:01:13.201, " + "2023-10-25T12:01:13.201006, " - + "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]"); + + "[1, 2, 3, 4], [1.1, 1.2, 1.3], +I[200, nested_value_2, 6.28], %s]"); records.add( "+I[true, 100, 200, 30, 400, 500.1, 600.0, another_string_2, 9.00, 1000, " + "2023-10-25T12:01:13.400Z, " + "2023-10-25T12:01:13.400007Z, " + "2023-10-25T12:01:13.501, " + "2023-10-25T12:01:13.501008, " - + "[5, 6, 7, 8], [2.1, 2.2, 2.3], %s]"); + + "[5, 6, 7, 8], [2.1, 2.2, 2.3], +I[300, nested_value_3, 9.99], %s]"); if (isPartitioned) { return String.format( @@ -1030,9 +1046,15 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean .column("c14", DataTypes.TIMESTAMP(6)) .column("c15", DataTypes.BINARY(4)) .column("c16", DataTypes.ARRAY(DataTypes.FLOAT())) - .column("c17", DataTypes.STRING()); - - return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c17"); + .column( + "c17", + DataTypes.ROW( + DataTypes.FIELD("nested_int", DataTypes.INT()), + DataTypes.FIELD("nested_string", DataTypes.STRING()), + DataTypes.FIELD("nested_double", DataTypes.DOUBLE()))) + .column("c18", DataTypes.STRING()); + + return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c18"); } protected long createSimplePkTable( @@ -1098,6 +1120,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new GenericArray(new float[] {2.1f, 2.2f, 2.3f}), + GenericRow.of(300, BinaryString.fromString("nested_value_3"), 9.99), partition)); writeRows(tablePath, rows, false); } @@ -1121,6 +1144,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), + GenericRow.of(100, BinaryString.fromString("nested_value_1"), 3.14), partition), row( true, @@ -1139,6 +1163,7 @@ private static List generateKvRowsFullType(@Nullable String partiti TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), + GenericRow.of(200, BinaryString.fromString("nested_value_2"), 6.28), partition)); } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java index 369fc74984..76aa25841a 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java @@ -606,4 +606,220 @@ void testFlussRecordWiderThanPaimonSchema() { .hasMessageContaining( "Fluss record has 2 fields but Paimon schema only has 1 business fields"); } + + @Test + void testNestedRowType() { + int tableBucket = 0; + + // Build nested row types + org.apache.paimon.types.RowType simpleNestedRowType = + org.apache.paimon.types.RowType.builder() + .field("nested_int", new org.apache.paimon.types.IntType()) + .field("nested_string", new org.apache.paimon.types.VarCharType()) + .build(); + + org.apache.paimon.types.RowType allTypesNestedRowType = + org.apache.paimon.types.RowType.builder() + .field("bool_field", new org.apache.paimon.types.BooleanType()) + .field("byte_field", new org.apache.paimon.types.TinyIntType()) + .field("short_field", new org.apache.paimon.types.SmallIntType()) + .field("int_field", new org.apache.paimon.types.IntType()) + .field("long_field", new org.apache.paimon.types.BigIntType()) + .field("float_field", new org.apache.paimon.types.FloatType()) + .field("double_field", new org.apache.paimon.types.DoubleType()) + .field("string_field", new org.apache.paimon.types.VarCharType()) + .build(); + + org.apache.paimon.types.RowType decimalTimestampRowType = + org.apache.paimon.types.RowType.builder() + .field("decimal_field", new org.apache.paimon.types.DecimalType(10, 2)) + .field("timestamp_field", new org.apache.paimon.types.TimestampType(3)) + .build(); + + org.apache.paimon.types.RowType innerRowType = + org.apache.paimon.types.RowType.builder() + .field("inner_value", new org.apache.paimon.types.IntType()) + .build(); + + org.apache.paimon.types.RowType middleRowType = + org.apache.paimon.types.RowType.builder() + .field("middle_int", new org.apache.paimon.types.IntType()) + .field("inner_row", innerRowType) + .build(); + + org.apache.paimon.types.RowType rowWithArrayType = + org.apache.paimon.types.RowType.builder() + .field("id", new org.apache.paimon.types.IntType()) + .field( + "values", + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType())) + .build(); + + org.apache.paimon.types.RowType arrayElementRowType = + org.apache.paimon.types.RowType.builder() + .field("id", new org.apache.paimon.types.IntType()) + .field("name", new org.apache.paimon.types.VarCharType()) + .build(); + + org.apache.paimon.types.RowType nullableFieldsRowType = + org.apache.paimon.types.RowType.builder() + .field("id", new org.apache.paimon.types.IntType()) + .field( + "nullable_field", + new org.apache.paimon.types.VarCharType().nullable()) + .build(); + + RowType tableRowType = + RowType.of( + simpleNestedRowType, + allTypesNestedRowType, + decimalTimestampRowType, + middleRowType, + rowWithArrayType, + new org.apache.paimon.types.ArrayType(arrayElementRowType), + simpleNestedRowType.nullable(), + nullableFieldsRowType, + // system columns + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.LocalZonedTimestampType(3)); + + FlussRecordAsPaimonRow flussRecordAsPaimonRow = + new FlussRecordAsPaimonRow(tableBucket, tableRowType); + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(8); + + // Simple nested row + GenericRow simpleNestedRow = new GenericRow(2); + simpleNestedRow.setField(0, 100); + simpleNestedRow.setField(1, BinaryString.fromString("nested_value")); + genericRow.setField(0, simpleNestedRow); + + // Nested row with all primitive types + GenericRow allTypesNestedRow = new GenericRow(8); + allTypesNestedRow.setField(0, true); + allTypesNestedRow.setField(1, (byte) 127); + allTypesNestedRow.setField(2, (short) 32000); + allTypesNestedRow.setField(3, 2147483647); + allTypesNestedRow.setField(4, 9223372036854775807L); + allTypesNestedRow.setField(5, 3.14f); + allTypesNestedRow.setField(6, 2.718281828); + allTypesNestedRow.setField(7, BinaryString.fromString("test_string")); + genericRow.setField(1, allTypesNestedRow); + + // Nested row with decimal and timestamp + GenericRow decimalTimestampRow = new GenericRow(2); + decimalTimestampRow.setField(0, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + decimalTimestampRow.setField(1, TimestampNtz.fromMillis(1698235273182L)); + genericRow.setField(2, decimalTimestampRow); + + // Deeply nested row + GenericRow innerRow = new GenericRow(1); + innerRow.setField(0, 999); + GenericRow middleRow = new GenericRow(2); + middleRow.setField(0, 500); + middleRow.setField(1, innerRow); + genericRow.setField(3, middleRow); + + // Nested row with array field + GenericRow nestedRowWithArray = new GenericRow(2); + nestedRowWithArray.setField(0, 123); + nestedRowWithArray.setField(1, new GenericArray(new int[] {10, 20, 30})); + genericRow.setField(4, nestedRowWithArray); + + // Array of nested rows + GenericRow arrayRow1 = new GenericRow(2); + arrayRow1.setField(0, 1); + arrayRow1.setField(1, BinaryString.fromString("Alice")); + GenericRow arrayRow2 = new GenericRow(2); + arrayRow2.setField(0, 2); + arrayRow2.setField(1, BinaryString.fromString("Bob")); + genericRow.setField(5, new GenericArray(new Object[] {arrayRow1, arrayRow2})); + + // Null nested row + genericRow.setField(6, null); + + // Nested row with nullable fields + GenericRow nullableFieldsRow = new GenericRow(2); + nullableFieldsRow.setField(0, 42); + nullableFieldsRow.setField(1, null); + genericRow.setField(7, nullableFieldsRow); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + flussRecordAsPaimonRow.setFlussRecord(logRecord); + + // Test simple nested row + org.apache.paimon.data.InternalRow paimonSimpleRow = flussRecordAsPaimonRow.getRow(0, 2); + assertThat(paimonSimpleRow).isNotNull(); + assertThat(paimonSimpleRow.getInt(0)).isEqualTo(100); + assertThat(paimonSimpleRow.getString(1).toString()).isEqualTo("nested_value"); + + // Test nested row with all primitive types + org.apache.paimon.data.InternalRow paimonAllTypesRow = flussRecordAsPaimonRow.getRow(1, 8); + assertThat(paimonAllTypesRow).isNotNull(); + assertThat(paimonAllTypesRow.getBoolean(0)).isTrue(); + assertThat(paimonAllTypesRow.getByte(1)).isEqualTo((byte) 127); + assertThat(paimonAllTypesRow.getShort(2)).isEqualTo((short) 32000); + assertThat(paimonAllTypesRow.getInt(3)).isEqualTo(2147483647); + assertThat(paimonAllTypesRow.getLong(4)).isEqualTo(9223372036854775807L); + assertThat(paimonAllTypesRow.getFloat(5)).isEqualTo(3.14f); + assertThat(paimonAllTypesRow.getDouble(6)).isEqualTo(2.718281828); + assertThat(paimonAllTypesRow.getString(7).toString()).isEqualTo("test_string"); + + // Test nested row with decimal and timestamp + org.apache.paimon.data.InternalRow paimonDecimalTimestampRow = + flussRecordAsPaimonRow.getRow(2, 2); + assertThat(paimonDecimalTimestampRow).isNotNull(); + assertThat(paimonDecimalTimestampRow.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(paimonDecimalTimestampRow.getTimestamp(1, 3).getMillisecond()) + .isEqualTo(1698235273182L); + + // Test deeply nested row + org.apache.paimon.data.InternalRow paimonMiddleRow = flussRecordAsPaimonRow.getRow(3, 2); + assertThat(paimonMiddleRow).isNotNull(); + assertThat(paimonMiddleRow.getInt(0)).isEqualTo(500); + org.apache.paimon.data.InternalRow paimonInnerRow = paimonMiddleRow.getRow(1, 1); + assertThat(paimonInnerRow).isNotNull(); + assertThat(paimonInnerRow.getInt(0)).isEqualTo(999); + + // Test nested row with array field + org.apache.paimon.data.InternalRow paimonRowWithArray = flussRecordAsPaimonRow.getRow(4, 2); + assertThat(paimonRowWithArray).isNotNull(); + assertThat(paimonRowWithArray.getInt(0)).isEqualTo(123); + org.apache.paimon.data.InternalArray arrayInRow = paimonRowWithArray.getArray(1); + assertThat(arrayInRow).isNotNull(); + assertThat(arrayInRow.size()).isEqualTo(3); + assertThat(arrayInRow.getInt(0)).isEqualTo(10); + assertThat(arrayInRow.getInt(1)).isEqualTo(20); + assertThat(arrayInRow.getInt(2)).isEqualTo(30); + + // Test array of nested rows + org.apache.paimon.data.InternalArray arrayOfRows = flussRecordAsPaimonRow.getArray(5); + assertThat(arrayOfRows).isNotNull(); + assertThat(arrayOfRows.size()).isEqualTo(2); + org.apache.paimon.data.InternalRow paimonRow1 = arrayOfRows.getRow(0, 2); + assertThat(paimonRow1.getInt(0)).isEqualTo(1); + assertThat(paimonRow1.getString(1).toString()).isEqualTo("Alice"); + org.apache.paimon.data.InternalRow paimonRow2 = arrayOfRows.getRow(1, 2); + assertThat(paimonRow2.getInt(0)).isEqualTo(2); + assertThat(paimonRow2.getString(1).toString()).isEqualTo("Bob"); + + // Test null nested row + assertThat(flussRecordAsPaimonRow.isNullAt(6)).isTrue(); + + // Test nested row with nullable fields + org.apache.paimon.data.InternalRow paimonNullableFieldsRow = + flussRecordAsPaimonRow.getRow(7, 2); + assertThat(paimonNullableFieldsRow).isNotNull(); + assertThat(paimonNullableFieldsRow.getInt(0)).isEqualTo(42); + assertThat(paimonNullableFieldsRow.isNullAt(1)).isTrue(); + + // Verify system columns + assertThat(flussRecordAsPaimonRow.getInt(8)).isEqualTo(tableBucket); + assertThat(flussRecordAsPaimonRow.getLong(9)).isEqualTo(logOffset); + assertThat(flussRecordAsPaimonRow.getLong(10)).isEqualTo(timeStamp); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java index 3c1e2c5519..35b64d17ab 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.paimon.utils; import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; @@ -195,4 +196,138 @@ void testArrayWithAllTypes() { InternalArray innerArray2 = outerArray.getArray(1); assertThat(innerArray2.toIntArray()).isEqualTo(new int[] {3, 4, 5}); } + + @Test + void testNestedRowWithAllTypes() { + GenericRow paimonRow = new GenericRow(10); + + // Simple nested row with primitive types + GenericRow simpleNestedRow = new GenericRow(3); + simpleNestedRow.setField(0, 100); + simpleNestedRow.setField(1, BinaryString.fromString("nested_value")); + simpleNestedRow.setField(2, true); + paimonRow.setField(0, simpleNestedRow); + + // Nested row with all primitive types + GenericRow allTypesNestedRow = new GenericRow(8); + allTypesNestedRow.setField(0, true); + allTypesNestedRow.setField(1, (byte) 127); + allTypesNestedRow.setField(2, (short) 32000); + allTypesNestedRow.setField(3, 2147483647); + allTypesNestedRow.setField(4, 9223372036854775807L); + allTypesNestedRow.setField(5, 3.14f); + allTypesNestedRow.setField(6, 2.718281828); + allTypesNestedRow.setField(7, BinaryString.fromString("test_string")); + paimonRow.setField(1, allTypesNestedRow); + + // Nested row with decimal and timestamp + GenericRow decimalTimestampRow = new GenericRow(2); + decimalTimestampRow.setField(0, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2)); + decimalTimestampRow.setField(1, Timestamp.fromEpochMillis(1698235273182L)); + paimonRow.setField(2, decimalTimestampRow); + + // Deeply nested row + GenericRow innerRow = new GenericRow(1); + innerRow.setField(0, 999); + GenericRow middleRow = new GenericRow(2); + middleRow.setField(0, 500); + middleRow.setField(1, innerRow); + paimonRow.setField(3, middleRow); + + // Nested row with array field + GenericRow rowWithArray = new GenericRow(2); + rowWithArray.setField(0, 123); + rowWithArray.setField(1, new GenericArray(new int[] {10, 20, 30})); + paimonRow.setField(4, rowWithArray); + + // Array of nested rows + GenericRow arrayRow1 = new GenericRow(2); + arrayRow1.setField(0, 1); + arrayRow1.setField(1, BinaryString.fromString("Alice")); + GenericRow arrayRow2 = new GenericRow(2); + arrayRow2.setField(0, 2); + arrayRow2.setField(1, BinaryString.fromString("Bob")); + paimonRow.setField(5, new GenericArray(new Object[] {arrayRow1, arrayRow2})); + + // Null nested row + paimonRow.setField(6, null); + + // Nested row with nullable fields + GenericRow rowWithNulls = new GenericRow(2); + rowWithNulls.setField(0, 42); + rowWithNulls.setField(1, null); + paimonRow.setField(7, rowWithNulls); + + // System columns + paimonRow.setField(8, 0); + paimonRow.setField(9, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + // Test simple nested row + InternalRow simpleRow = flussRow.getRow(0, 3); + assertThat(simpleRow).isNotNull(); + assertThat(simpleRow.getInt(0)).isEqualTo(100); + assertThat(simpleRow.getString(1).toString()).isEqualTo("nested_value"); + assertThat(simpleRow.getBoolean(2)).isTrue(); + + // Test nested row with all primitive types + InternalRow allTypesRow = flussRow.getRow(1, 8); + assertThat(allTypesRow).isNotNull(); + assertThat(allTypesRow.getBoolean(0)).isTrue(); + assertThat(allTypesRow.getByte(1)).isEqualTo((byte) 127); + assertThat(allTypesRow.getShort(2)).isEqualTo((short) 32000); + assertThat(allTypesRow.getInt(3)).isEqualTo(2147483647); + assertThat(allTypesRow.getLong(4)).isEqualTo(9223372036854775807L); + assertThat(allTypesRow.getFloat(5)).isEqualTo(3.14f); + assertThat(allTypesRow.getDouble(6)).isEqualTo(2.718281828); + assertThat(allTypesRow.getString(7).toString()).isEqualTo("test_string"); + + // Test nested row with decimal and timestamp + InternalRow decimalTimestampFlussRow = flussRow.getRow(2, 2); + assertThat(decimalTimestampFlussRow).isNotNull(); + assertThat(decimalTimestampFlussRow.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(decimalTimestampFlussRow.getTimestampNtz(1, 3).getMillisecond()) + .isEqualTo(1698235273182L); + + // Test deeply nested row + InternalRow middleFlussRow = flussRow.getRow(3, 2); + assertThat(middleFlussRow).isNotNull(); + assertThat(middleFlussRow.getInt(0)).isEqualTo(500); + InternalRow innerFlussRow = middleFlussRow.getRow(1, 1); + assertThat(innerFlussRow).isNotNull(); + assertThat(innerFlussRow.getInt(0)).isEqualTo(999); + + // Test nested row with array field + InternalRow rowWithArrayFluss = flussRow.getRow(4, 2); + assertThat(rowWithArrayFluss).isNotNull(); + assertThat(rowWithArrayFluss.getInt(0)).isEqualTo(123); + InternalArray arrayInRow = rowWithArrayFluss.getArray(1); + assertThat(arrayInRow).isNotNull(); + assertThat(arrayInRow.size()).isEqualTo(3); + assertThat(arrayInRow.getInt(0)).isEqualTo(10); + assertThat(arrayInRow.getInt(1)).isEqualTo(20); + assertThat(arrayInRow.getInt(2)).isEqualTo(30); + + // Test array of nested rows + InternalArray arrayOfRows = flussRow.getArray(5); + assertThat(arrayOfRows).isNotNull(); + assertThat(arrayOfRows.size()).isEqualTo(2); + InternalRow row1 = arrayOfRows.getRow(0, 2); + assertThat(row1.getInt(0)).isEqualTo(1); + assertThat(row1.getString(1).toString()).isEqualTo("Alice"); + InternalRow row2 = arrayOfRows.getRow(1, 2); + assertThat(row2.getInt(0)).isEqualTo(2); + assertThat(row2.getString(1).toString()).isEqualTo("Bob"); + + // Test null nested row + assertThat(flussRow.isNullAt(6)).isTrue(); + + // Test nested row with nullable fields + InternalRow rowWithNullsFluss = flussRow.getRow(7, 2); + assertThat(rowWithNullsFluss).isNotNull(); + assertThat(rowWithNullsFluss.getInt(0)).isEqualTo(42); + assertThat(rowWithNullsFluss.isNullAt(1)).isTrue(); + } }