[lake] Support schema evolution for lake-enabled tables with AddColum… #2189
base: main
Conversation
@buvb Thanks for your great work. I have left some advice.
Would you like to add a test in PaimonTieringITCase or PaimonTieringTest for an e2e test (just like what PaimonTieringITCase#testTieringForAlterTable does)? Add a column before and between tiering service runs.
...ke-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
...s-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
    private final int bucket;
    private LogRecord logRecord;
    private int originRowFieldCount;
    private final int businessFieldCount;
maybe dataFieldCount?
I prefer businessFieldCount because it clearly distinguishes the user columns from the system columns (__bucket, __offset, __timestamp); dataFieldCount might be confused with the total field count.
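To make the distinction concrete, here is a minimal self-contained sketch; the names and the appended-system-columns layout are assumptions for illustration, not the actual FlussRecordAsPaimonRow fields:

```java
public class FieldCountSketch {
    // The system columns __bucket, __offset, __timestamp are appended after the user columns.
    private static final int SYSTEM_FIELD_COUNT = 3;

    /** Number of user ("business") columns, excluding the appended system columns. */
    static int businessFieldCount(int totalPaimonFieldCount) {
        return totalPaimonFieldCount - SYSTEM_FIELD_COUNT;
    }

    public static void main(String[] args) {
        // A Paimon table with columns (a, b, c3, __bucket, __offset, __timestamp):
        System.out.println(businessFieldCount(6)); // prints 3, unambiguous unlike "dataFieldCount"
    }
}
```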
    - // update the schema
    - zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);
    + // update the schema in Fluss (ZK) first - Fluss is the source of truth
    + if (!newSchema.equals(table.getSchema())) {
It would be better to check whether the lakehouse can apply the change before calling zookeeperClient.registerSchema:
- Check that Fluss can apply the schema change: SchemaUpdate.applySchemaChanges(table, schemaChanges).
- Check whether the schema change can be applied to the lake.
- zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1). This rarely fails because it only touches ZooKeeper.
- syncSchemaChangesToLake.
For example, the Iceberg catalog currently does not support adding a column. If zookeeperClient.registerSchema succeeds first, a problem will occur. A sketch of this ordering is shown below.
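A rough sketch of the suggested ordering; canApplyToLake, syncSchemaChangesToLake, and the return type of SchemaUpdate.applySchemaChanges are placeholders here, not existing Fluss APIs:

```java
// Hypothetical sketch of the suggested ordering in alterTableSchema.
private void alterTableSchema(TablePath tablePath, TableInfo table, List<TableChange> changes)
        throws Exception {
    // 1. Validate that Fluss itself can apply the change (no side effects yet).
    Schema newSchema = SchemaUpdate.applySchemaChanges(table, changes);

    // 2. Validate that the lake catalog supports the change
    //    (e.g. an Iceberg catalog that cannot add columns should fail here).
    if (!lakeCatalog.canApplyToLake(tablePath, changes)) {
        throw new UnsupportedOperationException("Lake catalog cannot apply this schema change");
    }

    // 3. Persist the new schema in ZooKeeper; this step rarely fails.
    zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);

    // 4. Finally, sync the change to the lake table.
    syncSchemaChangesToLake(tablePath, changes);
}
```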
Another open problem: what if zookeeperClient.registerSchema succeeds but syncSchemaChangesToLake fails? Assume the following:
- Add column A
  - zookeeperClient.registerSchema succeeds
  - syncSchemaChangesToLake fails
- Add column B
  - zookeeperClient.registerSchema succeeds
  - syncSchemaChangesToLake succeeds
- Re-add column A
  - SchemaUpdate.applySchemaChanges is idempotent
  - syncSchemaChangesToLake succeeds

Finally, the Fluss table is: column A, column B.
Finally, the Lake table is: column B, column A.
I suggest that each time before zookeeperClient.registerSchema, we check whether the column count in Fluss and in the lake is the same (see the sketch after this list):
- Add column A
  - zookeeperClient.registerSchema succeeds
  - syncSchemaChangesToLake fails
- Add column B
  - The column counts no longer match, so we simply refuse it.
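A minimal sketch of such a guard, assuming we can obtain the current column counts on both sides; the method name and message are illustrative only:

```java
// Illustrative guard: refuse further schema changes once Fluss (ZK) and the lake table
// have diverged, which indicates a previously failed lake sync.
static void checkColumnCountInSync(int flussColumnCount, int lakeBusinessColumnCount) {
    if (flussColumnCount != lakeBusinessColumnCount) {
        throw new IllegalStateException(
                "Fluss schema has " + flussColumnCount + " columns but the lake table has "
                        + lakeBusinessColumnCount
                        + " (excluding system columns); refusing the schema change until they are re-synced");
    }
}
```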
@wuchong , WDYT?
ZK success but Lake sync fails:
This is already handled by the idempotent design:
- SchemaUpdate.addColumn() treats an existing column with the same type/comment as a no-op
- PaimonLakeCatalog.alterTable() catches ColumnAlreadyExistException as success
So when the user retries, both Fluss and Paimon will handle it gracefully.
@buvb Yes, if no other add-column happens in between, it will be handled gracefully. However, as I mentioned before, assume that:
- Add column A
  - zookeeperClient.registerSchema succeeds
  - syncSchemaChangesToLake fails
- Add column B
  - zookeeperClient.registerSchema succeeds
  - syncSchemaChangesToLake succeeds
- Re-add column A
  - SchemaUpdate.applySchemaChanges is idempotent
  - syncSchemaChangesToLake succeeds

Finally, the Fluss table is: column A, column B.
Finally, the Lake table is: column B, column A.
I have talked with Jark. He advises adding the column in the lakehouse before Fluss. Maybe each time before applying the schema change to Paimon, we can get the Paimon rowType and check whether it is safe:
RowType rowType = paimonCatalog.getTable(new Identifier(tablePath.getDatabaseName(), tablePath.getTableName())).rowType();
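Expanding on that, a hedged sketch of what such a check could look like; expectedBusinessColumns and the trailing-system-columns assumption are illustrative, not existing code:

```java
// Illustrative pre-check before syncing a schema change to Paimon: compare the lake's
// current business columns with the columns Fluss is about to register.
RowType lakeRowType =
        paimonCatalog
                .getTable(new Identifier(tablePath.getDatabaseName(), tablePath.getTableName()))
                .rowType();
// Assumes the three system columns (__bucket, __offset, __timestamp) are appended at the end.
List<String> lakeBusinessColumns =
        lakeRowType.getFieldNames().subList(0, lakeRowType.getFieldCount() - 3);
if (!lakeBusinessColumns.equals(expectedBusinessColumns)) {
    throw new IllegalStateException(
            "Lake table columns " + lakeBusinessColumns
                    + " do not match the expected Fluss columns " + expectedBusinessColumns);
}
```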
fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@luoyuxia @wuchong The main changes are as follows
wuchong left a comment
Could you please rebase your branch to trigger a fresh CI run? The base branch is quite outdated, and running against the latest changes will help uncover any potential issues that might otherwise be hidden.
    assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
    assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
    assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
    assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
should assert getTimestamp because this is a timestamp type.
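For example, something along these lines; the precision argument is an assumption and should match the declared precision of the __timestamp column:

```java
assertThat(flussRecordAsPaimonRow.getTimestamp(4, 3).getMillisecond()).isEqualTo(timeStamp);
```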
    - } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
    -     // shouldn't happen before we support schema change
    + } catch (Catalog.ColumnAlreadyExistException e) {
    +     // Column already exists, treat as idempotent success for retry scenarios.
Given that we may execute multiple TableChange operations in a single statement (e.g., adding several columns at once), blindly ignoring ColumnAlreadyExistException could silently skip the addition of some columns, leading to an incomplete schema update.
A simpler and safer approach, in my view, is to compare the current Paimon table schema with the expected target schema before performing any ALTER TABLE (like what you did in org.apache.fluss.server.coordinator.MetadataManager#alterTableSchema):
- If the schemas differ, proceed with the ALTER TABLE and report any errors faithfully to the user (who can then re-execute if needed).
- If the schemas already match, log a clear message (e.g., "Column(s) already exist; skipping ALTER TABLE") and skip the operation.
This ensures correctness, avoids silent failures, and provides transparent feedback to the user.
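A hedged sketch of that comparison inside the Paimon lake catalog; variable names such as targetRowType are illustrative, not the actual PaimonLakeCatalog fields:

```java
// Illustrative: compare the current Paimon schema with the expected target schema, skip the
// ALTER TABLE only when they already match, otherwise surface errors to the caller.
RowType currentRowType = paimonCatalog.getTable(identifier).rowType();
if (currentRowType.equals(targetRowType)) {
    LOG.info("Column(s) already exist; skipping ALTER TABLE for {}", identifier);
} else {
    paimonCatalog.alterTable(identifier, schemaChanges, false);
}
```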
    // Update Fluss schema (ZK) after Lake sync succeeds
    if (!newSchema.equals(table.getSchema())) {
        zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);
    }
Log a clear message (e.g., "Column(s) already exist; skipping ALTER TABLE") and skip the operation.
    if (existingColumn != null) {
        // Allow idempotent retries: if column name/type/comment match existing, treat as no-op
        if (!existingColumn.getDataType().equals(addColumn.getDataType())
                || !Objects.equals(existingColumn.getComment(), addColumn.getComment())) {
Suggested change:

    -         || !Objects.equals(existingColumn.getComment(), addColumn.getComment())) {
    +         || !Objects.equals(existingColumn.getComment().orElse(null), addColumn.getComment())) {

The return type of existingColumn.getComment() is Optional<String>, which is not comparable with addColumn.getComment().
    List<InternalRow> allRows = new ArrayList<>();
    allRows.addAll(initialRows);
    allRows.addAll(newRows);
    checkDataInPaimonAppendOnlyTable(tablePath, allRows, 0);
I suggest constructing an expected list of rows that includes all six columns (the new column, original user columns, and relevant system columns), and then asserting that this expected list exactly matches the rows read from Paimon. This will ensure comprehensive validation of both schema evolution and data correctness.
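Purely as an illustration (the types, values, and row layout here are placeholders, not the actual test API), the expected rows would enumerate every column and pad NULL for the new column in rows written before the ADD COLUMN:

```java
// Illustrative expected rows covering all six columns: the original user columns,
// the new column c3 (NULL for rows written before ADD COLUMN), and the system columns.
List<Object[]> expectedRows = new ArrayList<>();
expectedRows.add(new Object[] {1, "v1", null, bucket, 0L, timestamp});    // written before ADD COLUMN
expectedRows.add(new Object[] {2, "v2", "c3-v2", bucket, 1L, timestamp}); // written after ADD COLUMN
```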
    List<String> fieldNames = paimonTable.rowType().getFieldNames();

    // Should have: a, b, c3, __bucket, __offset, __timestamp
    assertThat(fieldNames).contains("a", "b", "c3");
assert exactly all the field names and order.
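For example, an AssertJ assertion using the field names and order from the comment in the snippet above:

```java
assertThat(fieldNames).containsExactly("a", "b", "c3", "__bucket", "__offset", "__timestamp");
```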
Purpose
Linked issue: close #2128
This PR enables schema evolution for datalake-enabled tables, specifically supporting ADD COLUMN ... LAST with NULLABLE columns. When a user executes ALTER TABLE ADD COLUMN on a lake-enabled table, the schema change is first applied to Fluss (source of truth), then synchronized to Paimon.

Brief change log
- Pass LakeCatalog and LakeCatalogContext to alterTableSchema() for Paimon synchronization
- Treat ColumnAlreadyExistException as idempotent success for retry scenarios
- Translate AddColumn to a Paimon SchemaChange, inserting the new column before the system columns
- Read at most min(internalRow.getFieldCount(), businessFieldCount) fields from the Fluss row during tiering

Tests
- LakeEnabledTableCreateITCase#testAlterLakeEnabledTableSchema - Verify ADD COLUMN syncs to Paimon with the correct type and comment
- FlussRecordAsPaimonRowTest#testFlussRecordWiderThanPaimonSchema - Verify tiering doesn't crash when the Fluss record has more fields than the Paimon schema
- FlussRecordAsPaimonRowTest#testPaimonSchemaWiderThanFlussRecord - Verify NULL padding for missing fields

API and Format
No API or storage format changes.
Documentation
No documentation changes required for this MVP. Future documentation may be needed when more schema evolution operations are supported.