diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index a65529648f..08c705ccb3 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -48,6 +48,7 @@ import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.ProjectedRow; import org.apache.fluss.row.indexed.IndexedRow; import org.apache.fluss.types.BigIntType; import org.apache.fluss.types.DataTypes; @@ -454,6 +455,181 @@ void testInvalidPrefixLookup() throws Exception { + "because the lookup columns [b, a] must contain all bucket keys [a, b] in order."); } + @Test + void testSingleBucketPutAutoIncColumnAndLookup() throws Exception { + Schema schema = + Schema.newBuilder() + .column("col1", DataTypes.STRING()) + .withComment("col1 is first column") + .column("col2", DataTypes.BIGINT()) + .withComment("col2 is second column, auto increment column") + .column("col3", DataTypes.STRING()) + .withComment("col3 is third column") + .enableAutoIncrement("col2") + .primaryKey("col1") + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(schema).distributedBy(1, "col1").build(); + // create the table + TablePath tablePath = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc"); + createTable(tablePath, tableDescriptor, true); + Table autoIncTable = conn.getTable(tablePath); + Object[][] records = { + {"a", null, "batch1"}, + {"b", null, "batch1"}, + {"c", null, "batch1"}, + {"d", null, "batch1"}, + {"e", null, "batch1"}, + {"d", null, "batch2"}, + {"e", null, "batch2"} + }; + partialUpdateRecords(new String[] {"col1", "col3"}, records, autoIncTable); + + Object[][] expectedRecords = { + {"a", 0L, "batch1"}, + {"b", 1L, "batch1"}, + {"c", 2L, "batch1"}, + {"d", 3L, "batch2"}, + {"e", 4L, "batch2"} + }; + verifyRecords(expectedRecords, autoIncTable, schema); + + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "col4", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last())), + false) + .get(); + Table newSchemaTable = conn.getTable(tablePath); + Schema newSchema = newSchemaTable.getTableInfo().getSchema(); + + // schema change case1: read new data with new schema. + Object[][] expectedRecordsWithOldSchema = { + {"a", 0L, "batch1"}, + {"b", 1L, "batch1"}, + {"c", 2L, "batch1"}, + {"d", 3L, "batch2"}, + {"e", 4L, "batch2"} + }; + verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema); + + // schema change case2: update new data with new schema. + + Object[][] recordsWithNewSchema = { + {"a", null, "batch3", 10}, + {"b", null, "batch3", 11} + }; + partialUpdateRecords( + new String[] {"col1", "col3", "col4"}, recordsWithNewSchema, newSchemaTable); + + // schema change case3: read data with old schema. + expectedRecordsWithOldSchema[0][2] = "batch3"; + expectedRecordsWithOldSchema[1][2] = "batch3"; + verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema); + + // schema change case4: read data with new schema. 
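+ // Rows 'a' and 'b' were re-upserted after col4 was added, so they carry col4 values; 'c', 'd' and 'e' were written before the schema change and read null for the new column.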
+ Object[][] expectedRecordsWithNewSchema = { + {"a", 0L, "batch3", 10}, + {"b", 1L, "batch3", 11}, + {"c", 2L, "batch1", null}, + {"d", 3L, "batch2", null}, + {"e", 4L, "batch2", null} + }; + verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema); + + // kill and restart all tablet server + for (int i = 0; i < 3; i++) { + FLUSS_CLUSTER_EXTENSION.stopTabletServer(i); + FLUSS_CLUSTER_EXTENSION.startTabletServer(i); + } + + // reconnect fluss server + conn = ConnectionFactory.createConnection(clientConf); + newSchemaTable = conn.getTable(tablePath); + verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema); + + Object[][] restartWriteRecords = {{"f", null, "batch4", 12}}; + partialUpdateRecords( + new String[] {"col1", "col3", "col4"}, restartWriteRecords, newSchemaTable); + + // The auto-increment column should start from a new segment for now, and local cached + // IDs have been discarded. + Object[][] expectedRestartWriteRecords = {{"f", 100000L, "batch4", 12}}; + verifyRecords(expectedRestartWriteRecords, newSchemaTable, newSchema); + } + + @Test + void testPutAutoIncColumnAndLookup() throws Exception { + Schema schema = + Schema.newBuilder() + .column("dt", DataTypes.STRING()) + .column("col1", DataTypes.STRING()) + .withComment("col1 is first column") + .column("col2", DataTypes.BIGINT()) + .withComment("col2 is second column, auto increment column") + .column("col3", DataTypes.STRING()) + .withComment("col3 is third column") + .enableAutoIncrement("col2") + .primaryKey("dt", "col1") + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .partitionedBy("dt") + .distributedBy(2, "col1") + .build(); + // create the table + TablePath tablePath = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc"); + createTable(tablePath, tableDescriptor, true); + Table autoIncTable = conn.getTable(tablePath); + Object[][] records = { + {"2026-01-06", "a", null, "batch1"}, + {"2026-01-06", "b", null, "batch1"}, + {"2026-01-06", "c", null, "batch1"}, + {"2026-01-06", "d", null, "batch1"}, + {"2026-01-07", "e", null, "batch1"}, + {"2026-01-06", "a", null, "batch2"}, + {"2026-01-06", "b", null, "batch2"}, + }; + + // upsert records with auto inc column col1 null value + partialUpdateRecords(new String[] {"dt", "col1", "col3"}, records, autoIncTable); + Object[][] expectedRecords = { + {"2026-01-06", "a", 0L, "batch2"}, + {"2026-01-06", "b", 100000L, "batch2"}, + {"2026-01-06", "c", 1L, "batch1"}, + {"2026-01-06", "d", 100001L, "batch1"}, + {"2026-01-07", "e", 200000L, "batch1"} + }; + verifyRecords(expectedRecords, autoIncTable, schema); + } + + private void partialUpdateRecords(String[] targetColumns, Object[][] records, Table table) { + UpsertWriter upsertWriter = table.newUpsert().partialUpdate(targetColumns).createWriter(); + for (Object[] record : records) { + upsertWriter.upsert(row(record)); + // flush immediately to ensure auto-increment values are assigned sequentially across + // multiple buckets. 
+ upsertWriter.flush(); + } + } + + private void verifyRecords(Object[][] records, Table table, Schema schema) throws Exception { + Lookuper lookuper = table.newLookup().createLookuper(); + ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes()); + for (Object[] record : records) { + assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record)))) + .withSchema(schema.getRowType()) + .isEqualTo(row(record)); + } + } + @Test void testLookupForNotReadyTable() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1"); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 9f4b603e98..1522c06352 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1456,7 +1456,18 @@ public class ConfigOptions { + "The auto increment column can only be used in primary-key table." + "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence." + "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT." - + "Currently a table can have only one auto-increment column."); + + "Currently a table can have only one auto-increment column." + + "Adding an auto increment column to an existing table is not supported."); + + public static final ConfigOption TABLE_AUTO_INC_BATCH_SIZE = + key("table.auto-inc.batch-size") + .longType() + .defaultValue(100000L) + .withDescription( + "The batch size of auto-increment IDs fetched from the distributed counter each time. " + + "This value determines the length of the locally cached ID segment. Default: 100000. " + + "A larger batch size may cause significant auto-increment ID gaps, especially when unused cached ID segments are discarded due to TabletServer restarts or abnormal terminations. " + + "Conversely, a smaller batch size increases the frequency of ID fetch requests to the distributed counter, introducing extra network overhead and reducing write throughput and performance."); public static final ConfigOption TABLE_CHANGELOG_IMAGE = key("table.changelog.image") @@ -1468,7 +1479,7 @@ public class ConfigOptions { + "The supported modes are `FULL` (default) and `WAL`. " + "The `FULL` mode produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values. " + "The `WAL` mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. " - + "When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, " + + "When WAL mode is enabled, the default merge engine is used (no merge engine configured), updates are full row updates (not partial update), and there is no auto-increment column, an optimization is applied to skip looking up old values, " + "and in this case INSERT operations are converted to UPDATE_AFTER events. " + "This mode reduces storage and transmission costs but loses the ability to track previous values. 
" + "This option only affects primary key tables."); diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java index 02f7a5750e..094b4601e5 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java @@ -36,11 +36,12 @@ public enum ChangelogImage { /** * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if - * allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge - * engine configured) and full row updates (not partial update), an optimization is applied to - * skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER - * events, similar to database WAL (Write-Ahead Log) behavior. This mode reduces storage and - * transmission costs but loses the ability to track previous values. + * allowed) records are emitted. When WAL mode is enabled, the default merge engine (no merge + * engine configured) us used, updates are full row (not partial update), and there is no + * auto-increment column, an optimization is applied to skip looking up old values, and in this + * case INSERT operations are converted to UPDATE_AFTER events, similar to database WAL + * (Write-Ahead Log) behavior. This mode reduces storage and transmission costs but loses the + * ability to track previous values. */ WAL; diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 63066849b5..5778287355 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -128,6 +128,15 @@ public int[] getPrimaryKeyIndexes() { .orElseGet(() -> new int[0]); } + /** Returns the auto-increment columnIds, if any, otherwise returns an empty array. */ + public int[] getAutoIncColumnIds() { + Set autoIncColSet = new HashSet<>(getAutoIncrementColumnNames()); + return getColumns().stream() + .filter(column -> autoIncColSet.contains(column.getName())) + .mapToInt(Column::getColumnId) + .toArray(); + } + /** Returns the primary key column names, if any, otherwise returns an empty array. */ public List getPrimaryKeyColumnNames() { return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList()); @@ -170,6 +179,11 @@ public List getColumnNames(int[] columnIndexes) { return columnNames; } + /** Returns the column name in given column index. */ + public String getColumnName(int columnIndex) { + return columns.get(columnIndex).columnName; + } + /** Returns the indexes of the fields in the schema. 
*/ public int[] getColumnIndexes(List keyNames) { int[] keyIndexes = new int[keyNames.size()]; @@ -696,9 +710,7 @@ private static List normalizeColumns( } // primary key and auto increment column should not nullable - if ((pkSet.contains(column.getName()) - || autoIncrementColumnNames.contains(column.getName())) - && column.getDataType().isNullable()) { + if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) { newColumns.add( new Column( column.getName(), diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java index 018214b3fb..39d27b4887 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java @@ -88,7 +88,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase { "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}"; static final String SCHEMA_JSON_4 = - "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}"; + "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}"; static final Schema SCHEMA_WITH_AGG = Schema.newBuilder() diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index 15e59295c7..6d6a20f4ca 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -1453,4 +1453,62 @@ void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception { // Collect results with timeout assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults); } + + @Test + void testWalModeWithAutoInc() throws Exception { + // use single parallelism to make result ordering stable + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + String tableName = "wal_mode_pk_table"; + // Create a table with WAL mode and auto increment column + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " auto_inc_id bigint," + + " amount bigint," + + " primary key (id) not enforced" + + ") with ('table.changelog.image' = 'wal', 
'table.auto-increment.fields'='auto_inc_id')", + tableName)); + + // Insert initial data + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, amount) VALUES " + + "(1, 100), " + + "(2, 200), " + + "(3, 150), " + + "(4, 250)", + tableName)) + .await(); + + // Use batch mode to update and delete records + + // Upsert data, not support update/delete rows in table with auto-inc column for now. + // TODO: Support Batch Update + tEnv.executeSql( + String.format( + "INSERT INTO %s (id, amount) VALUES " + "(1, 120), " + "(3, 180)", + tableName)) + .await(); + + List expectedResults = + Arrays.asList( + "+I[1, 0, 100]", + "+I[2, 1, 200]", + "+I[3, 2, 150]", + "+I[4, 3, 250]", + "-U[1, 0, 100]", + "+U[1, 0, 120]", + "-U[3, 2, 150]", + "+U[3, 2, 180]"); + + // Collect results with timeout + assertQueryResultExactOrder( + tEnv, + String.format( + "SELECT id, auto_inc_id, amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */", + tableName), + expectedResults); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java index 7145d9d7fc..a5d3ff0252 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java @@ -26,4 +26,12 @@ public interface SequenceIDCounter { * @return The previous sequence ID */ long getAndIncrement() throws Exception; + + /** + * Atomically adds the given delta to the sequence ID. + * + * @param delta The delta to add + * @return The previous sequence ID + */ + long getAndAdd(Long delta) throws Exception; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index 3162e4bed1..a8d91bb99b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -45,12 +45,14 @@ public static Schema applySchemaChanges(TableInfo tableInfo, List c private final AtomicInteger highestFieldId; private final List primaryKeys; private final Map existedColumns; + private final List autoIncColumns; public SchemaUpdate(TableInfo tableInfo) { this.columns = new ArrayList<>(); this.existedColumns = new HashMap<>(); this.highestFieldId = new AtomicInteger(tableInfo.getSchema().getHighestFieldId()); this.primaryKeys = tableInfo.getPrimaryKeys(); + this.autoIncColumns = tableInfo.getSchema().getAutoIncrementColumnNames(); this.columns.addAll(tableInfo.getSchema().getColumns()); for (Schema.Column column : columns) { existedColumns.put(column.getName(), column); @@ -65,6 +67,9 @@ public Schema getSchema() { if (!primaryKeys.isEmpty()) { builder.primaryKey(primaryKeys); } + for (String autoIncColumn : autoIncColumns) { + builder.enableAutoIncrement(autoIncColumn); + } return builder.build(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 1637af39d1..39a1b77fdb 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -36,6 +36,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.TabletManagerBase; +import org.apache.fluss.server.kv.autoinc.AutoIncManager; import 
org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.LogTablet; @@ -248,6 +249,10 @@ public KvTablet getOrCreateKv( File tabletDir = getOrCreateTabletDir(tablePath, tableBucket); RowMerger merger = RowMerger.create(tableConfig, kvFormat, schemaGetter); + AutoIncManager autoIncManager = + new AutoIncManager( + schemaGetter, tablePath.getTablePath(), conf, zkClient); + KvTablet tablet = KvTablet.create( tablePath, @@ -263,7 +268,8 @@ public KvTablet getOrCreateKv( arrowCompressionInfo, schemaGetter, tableConfig.getChangelogImage(), - sharedRocksDBRateLimiter); + sharedRocksDBRateLimiter, + autoIncManager); currentKvs.put(tableBucket, tablet); LOG.info( @@ -358,6 +364,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti TableConfig tableConfig = tableInfo.getTableConfig(); RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat(), schemaGetter); + AutoIncManager autoIncManager = + new AutoIncManager(schemaGetter, tablePath, tableInfo.getProperties(), zkClient); KvTablet kvTablet = KvTablet.create( physicalTablePath, @@ -373,7 +381,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti tableConfig.getArrowCompressionInfo(), schemaGetter, tableConfig.getChangelogImage(), - sharedRocksDBRateLimiter); + sharedRocksDBRateLimiter, + autoIncManager); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( String.format( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 0b43f0a316..07d25ba6bf 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -45,6 +45,8 @@ import org.apache.fluss.row.arrow.ArrowWriterPool; import org.apache.fluss.row.arrow.ArrowWriterProvider; import org.apache.fluss.row.encode.ValueDecoder; +import org.apache.fluss.server.kv.autoinc.AutoIncManager; +import org.apache.fluss.server.kv.autoinc.AutoIncUpdater; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; @@ -113,6 +115,7 @@ public final class KvTablet { // defines how to merge rows on the same primary key private final RowMerger rowMerger; private final ArrowCompressionInfo arrowCompressionInfo; + private final AutoIncManager autoIncManager; private final SchemaGetter schemaGetter; @@ -147,7 +150,8 @@ private KvTablet( ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, ChangelogImage changelogImage, - @Nullable RocksDBStatistics rocksDBStatistics) { + @Nullable RocksDBStatistics rocksDBStatistics, + AutoIncManager autoIncManager) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; @@ -164,6 +168,7 @@ private KvTablet( this.schemaGetter = schemaGetter; this.changelogImage = changelogImage; this.rocksDBStatistics = rocksDBStatistics; + this.autoIncManager = autoIncManager; } public static KvTablet create( @@ -180,7 +185,8 @@ public static KvTablet create( ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, ChangelogImage changelogImage, - RateLimiter sharedRateLimiter) + RateLimiter sharedRateLimiter, + AutoIncManager autoIncManager) throws IOException { RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, 
sharedRateLimiter); @@ -212,7 +218,8 @@ public static KvTablet create( arrowCompressionInfo, schemaGetter, changelogImage, - rocksDBStatistics); + rocksDBStatistics, + autoIncManager); } private static RocksDBKv buildRocksDBKv( @@ -300,6 +307,8 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target RowMerger currentMerger = rowMerger.configureTargetColumns( targetColumns, latestSchemaId, latestSchema); + AutoIncUpdater currentAutoIncUpdater = + autoIncManager.getUpdaterForSchema(kvFormat, latestSchemaId); RowType latestRowType = latestSchema.getRowType(); WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType); @@ -316,6 +325,7 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target kvRecords, kvRecords.schemaId(), currentMerger, + currentAutoIncUpdater, walBuilder, latestSchemaRow, logEndOffsetOfPrevBatch); @@ -369,6 +379,7 @@ private void processKvRecords( KvRecordBatch kvRecords, short schemaIdOfNewData, RowMerger currentMerger, + AutoIncUpdater currentAutoIncUpdater, WalBuilder walBuilder, PaddingRow latestSchemaRow, long startLogOffset) @@ -401,6 +412,7 @@ private void processKvRecords( key, currentValue, currentMerger, + currentAutoIncUpdater, valueDecoder, walBuilder, latestSchemaRow, @@ -450,22 +462,30 @@ private long processUpsert( KvPreWriteBuffer.Key key, BinaryValue currentValue, RowMerger currentMerger, + AutoIncUpdater currentAutoIncUpdater, ValueDecoder valueDecoder, WalBuilder walBuilder, PaddingRow latestSchemaRow, long logOffset) throws Exception { - // Optimization: when using WAL mode and merger is DefaultRowMerger (full update, not - // partial update), we can skip fetching old value for better performance since it - // always returns new value. In this case, both INSERT and UPDATE will produce - // UPDATE_AFTER. - if (changelogImage == ChangelogImage.WAL && currentMerger instanceof DefaultRowMerger) { + // Optimization: IN WAL mode,when using DefaultRowMerger (full update, not partial update) + // and there is no auto-increment column, we can skip fetching old value for better + // performance since the result always reflects the new value. In this case, both INSERT and + // UPDATE will produce UPDATE_AFTER. + if (changelogImage == ChangelogImage.WAL + && !currentAutoIncUpdater.hasAutoIncrement() + && currentMerger instanceof DefaultRowMerger) { return applyUpdate(key, null, currentValue, walBuilder, latestSchemaRow, logOffset); } byte[] oldValueBytes = getFromBufferOrKv(key); if (oldValueBytes == null) { - return applyInsert(key, currentValue, walBuilder, latestSchemaRow, logOffset); + return applyInsert( + key, + currentAutoIncUpdater.updateAutoInc(currentValue), + walBuilder, + latestSchemaRow, + logOffset); } BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncManager.java new file mode 100644 index 0000000000..47744d9501 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncManager.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZkSequenceIDCounter; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.time.Duration; + +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Manages auto-increment logic for tables, providing schema-specific updaters that handle + * auto-increment column assignment during row writes. + */ +@NotThreadSafe +public class AutoIncManager { + // No-op implementation that returns the input unchanged. + public static final AutoIncUpdater NO_OP_UPDATER = rowValue -> rowValue; + + private final SchemaGetter schemaGetter; + private final Cache autoIncUpdaters; + private final int autoIncColumnId; + private final SequenceGenerator sequenceGenerator; + + public AutoIncManager( + SchemaGetter schemaGetter, + TablePath tablePath, + Configuration properties, + ZooKeeperClient zkClient) { + this.autoIncUpdaters = + Caffeine.newBuilder() + .maximumSize(5) + .expireAfterAccess(Duration.ofMinutes(5)) + .build(); + this.schemaGetter = schemaGetter; + int schemaId = schemaGetter.getLatestSchemaInfo().getSchemaId(); + Schema schema = schemaGetter.getSchema(schemaId); + int[] autoIncColumnIds = schema.getAutoIncColumnIds(); + + checkState( + autoIncColumnIds.length <= 1, + "Only support one auto increment column for a table, but got %d.", + autoIncColumnIds.length); + + if (autoIncColumnIds.length == 1) { + autoIncColumnId = autoIncColumnIds[0]; + sequenceGenerator = + new SegmentSequenceGenerator( + tablePath, + autoIncColumnId, + schema.getColumnName(autoIncColumnId), + new ZkSequenceIDCounter( + zkClient.getCuratorClient(), + ZkData.AutoIncrementColumnZNode.path( + tablePath, autoIncColumnId)), + properties); + } else { + autoIncColumnId = -1; + sequenceGenerator = null; + } + } + + // Supports removing or reordering columns; does NOT support adding an auto-increment column to + // an existing table. 
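+ // Updaters are cached per schema id in a small, time-expiring Caffeine cache, so repeated writes against the same schema reuse the prepared encoder instead of rebuilding it.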
+ public AutoIncUpdater getUpdaterForSchema(KvFormat kvFormat, int latestSchemaId) { + return autoIncUpdaters.get(latestSchemaId, k -> createAutoIncUpdater(kvFormat, k)); + } + + private AutoIncUpdater createAutoIncUpdater(KvFormat kvFormat, int schemaId) { + Schema schema = schemaGetter.getSchema(schemaId); + int[] autoIncColumnIds = schema.getAutoIncColumnIds(); + if (autoIncColumnId == -1) { + checkState( + autoIncColumnIds.length == 0, + "Cannot add auto-increment column after table creation."); + } else { + checkState( + autoIncColumnIds.length == 1 && autoIncColumnIds[0] == autoIncColumnId, + "Auto-increment column cannot be changed after table creation."); + } + if (autoIncColumnIds.length == 1) { + return new PerSchemaAutoIncUpdater( + kvFormat, (short) schemaId, schema, autoIncColumnIds[0], sequenceGenerator); + } else { + return NO_OP_UPDATER; + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncUpdater.java new file mode 100644 index 0000000000..baf9c85df8 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncUpdater.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.record.BinaryValue; + +/** A updater to auto increment column . */ +public interface AutoIncUpdater { + + /** + * Updates the auto-increment column in the given row by replacing its value with a new sequence + * number. + * + *
<p>
This method may return a new {@link BinaryValue} instance or the same instance if no + * update is needed (e.g., in a no-op implementation). + * + * @param rowValue the input row in binary form, must not be {@code null} + * @return a {@link BinaryValue} representing the updated row; never {@code null} + */ + BinaryValue updateAutoInc(BinaryValue rowValue); + + /** + * Returns whether this updater actually performs auto-increment logic. + * + * @return {@code true} if auto-increment is active; {@code false} otherwise. + */ + default boolean hasAutoIncrement() { + return false; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncUpdater.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncUpdater.java new file mode 100644 index 0000000000..70e4241568 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/PerSchemaAutoIncUpdater.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.record.BinaryValue; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.RowEncoder; +import org.apache.fluss.types.DataType; + +import javax.annotation.concurrent.NotThreadSafe; + +/** + * An {@link AutoIncUpdater} implementation that assigns auto-increment values to a specific column + * based on a fixed schema. It is bound to a particular schema version and assumes the + * auto-increment column position remains constant within that schema. + * + *
<p>
This class is not thread-safe and is intended to be used within a single-threaded execution + * context. + */ +@NotThreadSafe +public class PerSchemaAutoIncUpdater implements AutoIncUpdater { + private final InternalRow.FieldGetter[] flussFieldGetters; + private final RowEncoder rowEncoder; + private final DataType[] fieldDataTypes; + private final int targetColumnIdx; + private final SequenceGenerator idGenerator; + private final short schemaId; + + public PerSchemaAutoIncUpdater( + KvFormat kvFormat, + short schemaId, + Schema schema, + int autoIncColumnId, + SequenceGenerator sequenceGenerator) { + DataType[] fieldDataTypes = schema.getRowType().getChildren().toArray(new DataType[0]); + + // getter for the fields in row + InternalRow.FieldGetter[] flussFieldGetters = + new InternalRow.FieldGetter[fieldDataTypes.length]; + for (int i = 0; i < fieldDataTypes.length; i++) { + flussFieldGetters[i] = InternalRow.createFieldGetter(fieldDataTypes[i], i); + } + this.idGenerator = sequenceGenerator; + this.schemaId = schemaId; + this.targetColumnIdx = schema.getColumnIds().indexOf(autoIncColumnId); + if (targetColumnIdx == -1) { + throw new IllegalStateException( + String.format( + "Auto-increment column ID %d not found in schema columns: %s", + autoIncColumnId, schema.getColumnIds())); + } + this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes); + this.fieldDataTypes = fieldDataTypes; + this.flussFieldGetters = flussFieldGetters; + } + + public BinaryValue updateAutoInc(BinaryValue rowValue) { + rowEncoder.startNewRow(); + for (int i = 0; i < fieldDataTypes.length; i++) { + if (targetColumnIdx == i) { + rowEncoder.encodeField(i, idGenerator.nextVal()); + } else { + // use the row value + rowEncoder.encodeField(i, flussFieldGetters[i].getFieldOrNull(rowValue.row)); + } + } + return new BinaryValue(schemaId, rowEncoder.finishRow()); + } + + @Override + public boolean hasAutoIncrement() { + return true; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGenerator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGenerator.java new file mode 100644 index 0000000000..ac522f7bca --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SegmentSequenceGenerator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.SequenceIDCounter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +/** Segment ID generator, fetch ID with a batch size. */ +@NotThreadSafe +public class SegmentSequenceGenerator implements SequenceGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SegmentSequenceGenerator.class); + + private final SequenceIDCounter sequenceIDCounter; + private final TablePath tablePath; + private final int columnIdx; + private final String columnName; + + private AutoIncIdSegment segment = new AutoIncIdSegment(0, 0); + + private final long batchSize; + + public SegmentSequenceGenerator( + TablePath tablePath, + int columnIdx, + String columnName, + SequenceIDCounter sequenceIDCounter, + Configuration properties) { + batchSize = properties.getLong(ConfigOptions.TABLE_AUTO_INC_BATCH_SIZE); + this.columnName = columnName; + this.tablePath = tablePath; + this.columnIdx = columnIdx; + this.sequenceIDCounter = sequenceIDCounter; + } + + private void fetchSegment() { + try { + long start = sequenceIDCounter.getAndAdd(batchSize); + LOG.info( + "Successfully fetch auto-increment values range [{}, {}), table_path={}, column_idx={}, column_name={}.", + start, + start + batchSize, + tablePath, + columnIdx, + columnName); + segment = new AutoIncIdSegment(start, batchSize); + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Failed to fetch auto-increment values, table_path=%s, column_idx=%d, column_name=%s.", + tablePath, columnIdx, columnName), + e); + } + } + + @Override + public long nextVal() { + if (segment.remaining() <= 0) { + fetchSegment(); + } + return segment.tryNextVal(); + } + + private static class AutoIncIdSegment { + private long current; + private final long end; + + public AutoIncIdSegment(long start, long length) { + this.end = start + length; + this.current = start; + } + + public long remaining() { + return end - current; + } + + public long tryNextVal() { + long id = current++; + if (id >= end) { + throw new RuntimeException("Segment is empty"); + } + return id; + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java new file mode 100644 index 0000000000..d448ffa4cb --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/SequenceGenerator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.autoinc; + +/** SequenceGenerator generates sequential values for auto-increment columns. */ +public interface SequenceGenerator { + + /** + * Retrieves the next sequential value for the auto-increment column. + * + *
<p>
This method provides the next available value for auto-increment columns. + * + * @return the next sequential value of the auto-increment column + */ + long nextVal(); +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java index acbecfa63b..7781340bc5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkSequenceIDCounter.java @@ -34,9 +34,11 @@ public class ZkSequenceIDCounter implements SequenceIDCounter { private static final int BASE_SLEEP_MS = 100; private static final int MAX_SLEEP_MS = 1000; + private final String sequenceIDPath; private final DistributedAtomicLong sequenceIdCounter; public ZkSequenceIDCounter(CuratorFramework curatorClient, String sequenceIDPath) { + this.sequenceIDPath = sequenceIDPath; sequenceIdCounter = new DistributedAtomicLong( curatorClient, @@ -56,7 +58,28 @@ public long getAndIncrement() throws Exception { if (incrementValue.succeeded()) { return incrementValue.preValue(); } else { - throw new Exception("Failed to increment sequence id counter."); + throw new Exception( + String.format( + "Failed to increment sequence id counter. ZooKeeper sequence ID path: %s.", + sequenceIDPath)); + } + } + + /** + * Atomically adds the given delta to the current sequence ID. + * + * @return The previous sequence ID + */ + @Override + public long getAndAdd(Long delta) throws Exception { + AtomicValue incrementValue = sequenceIdCounter.add(delta); + if (incrementValue.succeeded()) { + return incrementValue.preValue(); + } else { + throw new Exception( + String.format( + "Failed to increment sequence id counter. ZooKeeper sequence ID path: %s, Delta value: %d.", + sequenceIDPath, delta)); } } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 4798623a74..24781f922e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -252,6 +252,28 @@ public static String path() { } } + /** + * The znode for auto increment columns of a table. The znode path is: + * + *
<p>
/metadata/databases/[databaseName]/tables/[tableName]/auto_inc + */ + public static final class AutoIncrementColumnsZNode { + public static String path(TablePath tablePath) { + return TableZNode.path(tablePath) + "/auto_inc"; + } + } + + /** + * The znode for auto increment column. The znode path is: + * + *
<p>
/metadata/databases/[databaseName]/tables/[tableName]/auto_inc/col_[columnId] + */ + public static final class AutoIncrementColumnZNode { + public static String path(TablePath tablePath, int columnId) { + return AutoIncrementColumnsZNode.path(tablePath) + String.format("/col_%d", columnId); + } + } + /** * The znode used to generate a sequence unique id for a partition. The znode path is: * diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 52b1080ed1..abf2328fa6 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -45,6 +45,7 @@ import org.apache.fluss.record.bytesview.MultiBytesView; import org.apache.fluss.row.BinaryRow; import org.apache.fluss.row.encode.ValueEncoder; +import org.apache.fluss.server.kv.autoinc.AutoIncManager; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; @@ -181,6 +182,10 @@ private KvTablet createKvTablet( throws Exception { TableConfig tableConf = new TableConfig(Configuration.fromMap(tableConfig)); RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter); + AutoIncManager autoIncManager = + new AutoIncManager( + schemaGetter, tablePath.getTablePath(), new Configuration(), null); + return KvTablet.create( tablePath, tableBucket, @@ -195,7 +200,8 @@ private KvTablet createKvTablet( DEFAULT_COMPRESSION, schemaGetter, tableConf.getChangelogImage(), - KvManager.getDefaultRateLimiter()); + KvManager.getDefaultRateLimiter(), + autoIncManager); } @Test diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentIncIDGeneratorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentIncIDGeneratorTest.java new file mode 100644 index 0000000000..1b750b6a1a --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/autoinc/SegmentIncIDGeneratorTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.autoinc; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.SequenceIDCounter; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test class for {@link SegmentSequenceGenerator}. */ +class SegmentSequenceGeneratorTest { + + private static final TablePath TABLE_PATH = new TablePath("test_db", "test_table"); + private static final int COLUMN_IDX = 0; + private static final String COLUMN_NAME = "id"; + private static final long BATCH_SIZE = 100; + + private AtomicLong snapshotIdGenerator; + private Configuration configuration; + + @BeforeEach + void setUp() { + snapshotIdGenerator = new AtomicLong(0); + Map map = new HashMap<>(); + map.put(ConfigOptions.TABLE_AUTO_INC_BATCH_SIZE.key(), String.valueOf(BATCH_SIZE)); + configuration = Configuration.fromMap(map); + } + + @Test + void testNextValBasicContinuousId() { + SegmentSequenceGenerator generator = + new SegmentSequenceGenerator( + TABLE_PATH, + COLUMN_IDX, + COLUMN_NAME, + new TestingSnapshotIDCounter(snapshotIdGenerator), + configuration); + for (long i = 0; i < BATCH_SIZE; i++) { + assertThat(generator.nextVal()).isEqualTo(i); + } + + for (long i = BATCH_SIZE; i < 2 * BATCH_SIZE; i++) { + assertThat(generator.nextVal()).isEqualTo(i); + } + } + + @Test + void testMultiGenerator() { + ConcurrentLinkedDeque linkedDeque = new ConcurrentLinkedDeque<>(); + + for (int i = 0; i < 20; i++) { + new Thread( + () -> { + SegmentSequenceGenerator generator = + new SegmentSequenceGenerator( + new TablePath("test_db", "table1"), + COLUMN_IDX, + COLUMN_NAME + "_table1", + new TestingSnapshotIDCounter(snapshotIdGenerator), + configuration); + for (int j = 0; j < 130; j++) { + linkedDeque.add(generator.nextVal()); + } + }) + .start(); + } + + assertThat(linkedDeque.stream().mapToLong(Long::longValue).max().orElse(0)) + .isLessThanOrEqualTo(40 * BATCH_SIZE); + } + + @Test + void testFetchFailed() { + SegmentSequenceGenerator generator = + new SegmentSequenceGenerator( + new TablePath("test_db", "table1"), + COLUMN_IDX, + COLUMN_NAME + "_table1", + new TestingSnapshotIDCounter(snapshotIdGenerator, 2), + configuration); + for (int j = 0; j < BATCH_SIZE; j++) { + assertThat(generator.nextVal()).isEqualTo(j); + } + assertThatThrownBy(generator::nextVal) + .isInstanceOf(FlussRuntimeException.class) + .hasMessage( + String.format( + "Failed to fetch auto-increment values, table_path=%s, column_idx=%d, column_name=%s.", + "test_db.table1", COLUMN_IDX, COLUMN_NAME + "_table1")); + } + + private static class TestingSnapshotIDCounter implements SequenceIDCounter { + + private final AtomicLong snapshotIdGenerator; + private int fetchTime; + private final int failedTrigger; + + public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator) { + this(snapshotIdGenerator, Integer.MAX_VALUE); + } + + public TestingSnapshotIDCounter(AtomicLong snapshotIdGenerator, int failedTrigger) { + this.snapshotIdGenerator = snapshotIdGenerator; + fetchTime = 0; + this.failedTrigger = failedTrigger; + } + + @Override + public long getAndIncrement() { + 
return snapshotIdGenerator.getAndIncrement(); + } + + @Override + public long getAndAdd(Long delta) { + if (++fetchTime < failedTrigger) { + return snapshotIdGenerator.getAndAdd(delta); + } + throw new RuntimeException("Failed to get snapshot ID"); + } + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index 9f9ea078da..9f26bc7726 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -659,6 +659,11 @@ private class TestingSnapshotIDCounter implements SequenceIDCounter { public long getAndIncrement() { return snapshotIdGenerator.getAndIncrement(); } + + @Override + public long getAndAdd(Long delta) { + return snapshotIdGenerator.getAndAdd(delta); + } } private enum SnapshotFailType { diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index ad389b1f5b..cb435bb026 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -61,33 +61,34 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) ## Storage Options -| Option | Type | Default | Description | -|-----------------------------------------|----------|-------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| bucket.num | int | The bucket number of Fluss cluster. | The number of buckets of a Fluss table. | -| bucket.key | String | (None) | Specific the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table). If you specify multiple fields, delimiter is `,`. If the table has a primary key and a bucket key is not specified, the bucket key will be used as primary key(excluding the partition key). If the table has no primary key and the bucket key is not specified, the data will be distributed to each bucket randomly. | -| table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. | -| table.auto-partition.enabled | Boolean | false | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. 
| -| table.auto-partition.key | String | (None) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions. And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. | -| table.auto-partition.time-unit | ENUM | DAY | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. | -| table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY`(default), one precreated partition is for today and another one is for tomorrow. For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. | -| table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7, which means that 7 partitions will be retained. | -| table.auto-partition.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. | -| table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster. A value larger than the number of tablet servers in Fluss cluster will result in an error when the new table is created. | -| table.log.format | Enum | ARROW | The format of the log records in log store. The default value is `ARROW`. The supported formats are `ARROW`, `INDEXED` and `COMPACTED`. | -| table.log.arrow.compression.type | Enum | ZSTD | The compression type of the log records if the log format is set to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The default value is `ZSTD`. 
| -| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 22. The default value is 3. | -| table.kv.format | Enum | COMPACTED | The format of the kv records in kv store. The default value is `COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`. | -| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater that 0. The default is 2. | -| table.datalake.enabled | Boolean | false | Whether enable lakehouse storage for the table. Disabled by default. When this option is set to ture and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. | -| table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format. Currently, supported formats are `paimon`, `iceberg`, and `lance`. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Union Read** functionality across Fluss and Lakehouse. The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster. | -| table.datalake.freshness | Duration | 3min | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs. | -| table.datalake.auto-compaction | Boolean | false | If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default. | -| table.datalake.auto-expire-snapshot | Boolean | false | If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default. | -| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, primary key table uses the [default merge engine(last_row)](table-design/merge-engines/default.md). It also supports two merge engines are `first_row`, `versioned` and `aggregation`. The [first_row merge engine](table-design/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. The `aggregation` merge engine will aggregate rows with the same primary key using field-level aggregate functions. | -| table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. 
If the merge engine is set to `versioned`, the version column must be set. | -| table.delete.behavior | Enum | ALLOW | Controls the behavior of delete operations on primary key tables. Three modes are supported: `ALLOW` (default for default merge engine) - allows normal delete operations; `IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects delete requests and throws explicit errors. This configuration provides system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) that must not receive any delete events in the changelog of the table. For tables with `first_row` or `versioned` or `aggregation` merge engines, this option is automatically set to `IGNORE` and cannot be overridden. Note: For `aggregation` merge engine, when set to `allow`, delete operations will remove the entire record. This configuration only applicable to primary key tables. | -| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values; `WAL` - does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER events. This mode reduces storage and transmission costs but loses the ability to track previous values. Only applicable to primary key tables. | - +| Option | Type | Default | Description | +|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| bucket.num | int | The bucket number of Fluss cluster. | The number of buckets of a Fluss table. | +| bucket.key | String | (None) | Specific the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table). If you specify multiple fields, delimiter is `,`. If the table has a primary key and a bucket key is not specified, the bucket key will be used as primary key(excluding the partition key). 
If the table has no primary key and the bucket key is not specified, the data will be distributed to each bucket randomly. | +| table.log.ttl | Duration | 7 days | The time to live for log segments. This configuration controls the maximum time we retain a log before deleting old segments to free up space. If set to -1, the log will not be deleted. | +| table.auto-partition.enabled | Boolean | false | Whether to enable auto partitioning for the table. Disabled by default. When auto partitioning is enabled, the partitions of the table will be created automatically. | +| table.auto-partition.key | String | (None) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions. If the table has only one partition key, this config is not necessary; otherwise, it must be specified. | +| table.auto-partition.time-unit | ENUM | DAY | The time granularity for auto-created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the format of auto-created partitions is yyyyMMddHH. If the value is `DAY`, the format is yyyyMMdd. If the value is `MONTH`, the format is yyyyMM. If the value is `QUARTER`, the format is yyyyQ. If the value is `YEAR`, the format is yyyy. | +| table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create in each auto-partition check. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If a partition already exists, creating it is skipped. The default value is 2, which means 2 partitions will be pre-created. If `table.auto-partition.time-unit` is `DAY` (the default), one pre-created partition is for today and the other is for tomorrow. For a partitioned table with multiple partition keys, pre-creation is unsupported, and this value will be set to 0 automatically at table creation if it is not explicitly specified. | +| table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain in each auto-partition check. For example, if the current check time is 2024-11-11, the time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. Partitions earlier than 20241108 will be deleted. The default value is 7, which means that 7 partitions will be retained. | +| table.auto-partition.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. | +| table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster.
A value larger than the number of tablet servers in the Fluss cluster will result in an error when the new table is created. | +| table.log.format | Enum | ARROW | The format of the log records in the log store. The default value is `ARROW`. The supported formats are `ARROW`, `INDEXED` and `COMPACTED`. | +| table.log.arrow.compression.type | Enum | ZSTD | The compression type of the log records if the log format is set to `ARROW`. The candidate compression types are `NONE`, `LZ4_FRAME`, and `ZSTD`. The default value is `ZSTD`. | +| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 22. The default value is 3. | +| table.kv.format | Enum | COMPACTED | The format of the kv records in the kv store. The default value is `COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`. | +| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain locally for each table when log tiered storage is enabled. It must be greater than 0. The default is 2. | +| table.datalake.enabled | Boolean | false | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. | +| table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format. Currently, supported formats are `paimon`, `iceberg`, and `lance`. In the future, more data lake formats will be supported, such as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Union Read** functionality across Fluss and Lakehouse. The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster. | +| table.datalake.freshness | Duration | 3min | Defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs. | +| table.datalake.auto-compaction | Boolean | false | If true, compaction will be triggered automatically when the tiering service writes to the datalake. It is disabled by default. | +| table.datalake.auto-expire-snapshot | Boolean | false | If true, snapshot expiration will be triggered automatically when the tiering service commits to the datalake. It is disabled by default. | +| table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, a primary key table uses the [default merge engine (last_row)](table-design/merge-engines/default.md). It also supports three other merge engines: `first_row`, `versioned`, and `aggregation`.
The [first_row merge engine](table-design/merge-engines/first-row.md) keeps the first row with the same primary key. The [versioned merge engine](table-design/merge-engines/versioned.md) keeps the row with the largest version for the same primary key. The `aggregation` merge engine aggregates rows with the same primary key using field-level aggregate functions. | +| table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. | +| table.delete.behavior | Enum | ALLOW | Controls the behavior of delete operations on primary key tables. Three modes are supported: `ALLOW` (default for the default merge engine) - allows normal delete operations; `IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects delete requests and throws explicit errors. This configuration provides system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) that must not receive any delete events in the changelog of the table. For tables with the `first_row`, `versioned`, or `aggregation` merge engine, this option is automatically set to `IGNORE` and cannot be overridden. Note: for the `aggregation` merge engine, when set to `ALLOW`, delete operations will remove the entire record. This configuration is only applicable to primary key tables. | +| table.changelog.image | Enum | FULL | Defines the changelog image mode for primary key tables. This configuration is inspired by similar settings in database systems like MySQL's `binlog_row_image` and PostgreSQL's `replica identity`. Two modes are supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values; `WAL` - does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. When WAL mode is enabled with the default merge engine (no merge engine configured), full-row updates (not partial updates), and no auto-increment column, an optimization is applied to skip looking up old values; in this case INSERT operations are converted to UPDATE_AFTER events. This mode reduces storage and transmission costs but loses the ability to track previous values. Only applicable to primary key tables. | +| table.auto-increment.fields | String | (None) | Defines the auto-increment column. An auto-increment column can only be used in a primary-key table. Whenever a new row is inserted into the table, the auto-increment column of the new row is assigned the next available value from the auto-increment sequence. The data type of the auto-increment column must be INT or BIGINT. Currently a table can have only one auto-increment column, and adding an auto-increment column to an existing table is not supported. | +| table.auto-inc.batch-size | Long | 100000L | The number of auto-increment IDs fetched from the distributed counter in each request. This value determines the length of the locally cached ID segment. The default value is 100000. A larger batch size may cause significant auto-increment ID gaps, especially when unused cached ID segments are discarded due to TabletServer restarts or abnormal terminations.
Conversely, a smaller batch size increases the frequency of ID fetch requests to the distributed counter, introducing extra network overhead and reducing write throughput. |
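As an illustrative sketch (assuming the standard Fluss-on-Flink DDL where table options are set in the `WITH` clause; the table and column names such as `orders` and `seq_id` are hypothetical), the two new auto-increment options might be used like this:

```sql
-- 'seq_id' is the auto-increment column; it must be INT or BIGINT,
-- and a table can have at most one auto-increment column.
CREATE TABLE orders (
    order_key STRING,
    seq_id    BIGINT,
    payload   STRING,
    PRIMARY KEY (order_key) NOT ENFORCED
) WITH (
    'bucket.num' = '1',
    'table.auto-increment.fields' = 'seq_id',
    'table.auto-inc.batch-size' = '100000'
);
```

Per the option descriptions above, each TabletServer caches a segment of `table.auto-inc.batch-size` IDs locally, so a restart may discard an unused segment and leave a gap of up to one batch in the assigned values.

 ## Read Options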