@@ -48,6 +48,7 @@
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.DataTypes;
@@ -454,6 +455,181 @@ void testInvalidPrefixLookup() throws Exception {
+ "because the lookup columns [b, a] must contain all bucket keys [a, b] in order.");
}

@Test
void testSingleBucketPutAutoIncColumnAndLookup() throws Exception {
Schema schema =
Schema.newBuilder()
.column("col1", DataTypes.STRING())
.withComment("col1 is first column")
.column("col2", DataTypes.BIGINT())
.withComment("col2 is second column, auto increment column")
.column("col3", DataTypes.STRING())
.withComment("col3 is third column")
.enableAutoIncrement("col2")
.primaryKey("col1")
.build();
TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(1, "col1").build();
// create the table
TablePath tablePath =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
createTable(tablePath, tableDescriptor, true);
Table autoIncTable = conn.getTable(tablePath);
Object[][] records = {
{"a", null, "batch1"},
{"b", null, "batch1"},
{"c", null, "batch1"},
{"d", null, "batch1"},
{"e", null, "batch1"},
{"d", null, "batch2"},
{"e", null, "batch2"}
};
partialUpdateRecords(new String[] {"col1", "col3"}, records, autoIncTable);

Object[][] expectedRecords = {
{"a", 0L, "batch1"},
{"b", 1L, "batch1"},
{"c", 2L, "batch1"},
{"d", 3L, "batch2"},
{"e", 4L, "batch2"}
};
verifyRecords(expectedRecords, autoIncTable, schema);

admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.addColumn(
"col4",
DataTypes.INT(),
null,
TableChange.ColumnPosition.last())),
false)
.get();
Table newSchemaTable = conn.getTable(tablePath);
Schema newSchema = newSchemaTable.getTableInfo().getSchema();

// schema change case1: read existing data with the old schema after the schema change.
Object[][] expectedRecordsWithOldSchema = {
{"a", 0L, "batch1"},
{"b", 1L, "batch1"},
{"c", 2L, "batch1"},
{"d", 3L, "batch2"},
{"e", 4L, "batch2"}
};
verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema);

// schema change case2: update new data with new schema.

Object[][] recordsWithNewSchema = {
{"a", null, "batch3", 10},
{"b", null, "batch3", 11}
};
partialUpdateRecords(
new String[] {"col1", "col3", "col4"}, recordsWithNewSchema, newSchemaTable);

// schema change case3: read data with old schema.
expectedRecordsWithOldSchema[0][2] = "batch3";
expectedRecordsWithOldSchema[1][2] = "batch3";
verifyRecords(expectedRecordsWithOldSchema, autoIncTable, schema);

// schema change case4: read data with new schema.
Object[][] expectedRecordsWithNewSchema = {
{"a", 0L, "batch3", 10},
{"b", 1L, "batch3", 11},
{"c", 2L, "batch1", null},
{"d", 3L, "batch2", null},
{"e", 4L, "batch2", null}
};
verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema);

// kill and restart all tablet servers
for (int i = 0; i < 3; i++) {
FLUSS_CLUSTER_EXTENSION.stopTabletServer(i);
FLUSS_CLUSTER_EXTENSION.startTabletServer(i);
}

// reconnect to the fluss cluster
conn = ConnectionFactory.createConnection(clientConf);
newSchemaTable = conn.getTable(tablePath);
verifyRecords(expectedRecordsWithNewSchema, newSchemaTable, newSchema);

Object[][] restartWriteRecords = {{"f", null, "batch4", 12}};
partialUpdateRecords(
new String[] {"col1", "col3", "col4"}, restartWriteRecords, newSchemaTable);

// After the restart, the auto-increment IDs should start from a new segment, since the
// locally cached IDs have been discarded; "f" therefore gets the first ID of the next
// segment (100000 with the default table.auto-inc.batch-size).
Object[][] expectedRestartWriteRecords = {{"f", 100000L, "batch4", 12}};
verifyRecords(expectedRestartWriteRecords, newSchemaTable, newSchema);
}

@Test
void testPutAutoIncColumnAndLookup() throws Exception {
Schema schema =
Schema.newBuilder()
.column("dt", DataTypes.STRING())
.column("col1", DataTypes.STRING())
.withComment("col1 is first column")
.column("col2", DataTypes.BIGINT())
.withComment("col2 is second column, auto increment column")
.column("col3", DataTypes.STRING())
.withComment("col3 is third column")
.enableAutoIncrement("col2")
.primaryKey("dt", "col1")
.build();
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(schema)
.partitionedBy("dt")
.distributedBy(2, "col1")
.build();
// create the table
TablePath tablePath =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
createTable(tablePath, tableDescriptor, true);
Table autoIncTable = conn.getTable(tablePath);
Object[][] records = {
{"2026-01-06", "a", null, "batch1"},
{"2026-01-06", "b", null, "batch1"},
{"2026-01-06", "c", null, "batch1"},
{"2026-01-06", "d", null, "batch1"},
{"2026-01-07", "e", null, "batch1"},
{"2026-01-06", "a", null, "batch2"},
{"2026-01-06", "b", null, "batch2"},
};

// upsert records with null values for the auto-increment column col2
partialUpdateRecords(new String[] {"dt", "col1", "col3"}, records, autoIncTable);
Object[][] expectedRecords = {
{"2026-01-06", "a", 0L, "batch2"},
{"2026-01-06", "b", 100000L, "batch2"},
{"2026-01-06", "c", 1L, "batch1"},
{"2026-01-06", "d", 100001L, "batch1"},
{"2026-01-07", "e", 200000L, "batch1"}
};
verifyRecords(expectedRecords, autoIncTable, schema);
}

private void partialUpdateRecords(String[] targetColumns, Object[][] records, Table table) {
UpsertWriter upsertWriter = table.newUpsert().partialUpdate(targetColumns).createWriter();
for (Object[] record : records) {
upsertWriter.upsert(row(record));
// flush immediately to ensure auto-increment values are assigned sequentially across
// multiple buckets.
upsertWriter.flush();
}
}

private void verifyRecords(Object[][] records, Table table, Schema schema) throws Exception {
Lookuper lookuper = table.newLookup().createLookuper();
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
for (Object[] record : records) {
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row(record))))
.withSchema(schema.getRowType())
.isEqualTo(row(record));
}
}

@Test
void testLookupForNotReadyTable() throws Exception {
TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1");
@@ -1456,7 +1456,18 @@ public class ConfigOptions {
+ "The auto increment column can only be used in primary-key table."
+ "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence."
+ "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT."
+ "Currently a table can have only one auto-increment column.");
+ "Currently a table can have only one auto-increment column."
+ "Adding an auto increment column to an existing table is not supported.");

public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
key("table.auto-inc.batch-size")
.longType()
.defaultValue(100000L)
.withDescription(
"The batch size of auto-increment IDs fetched from the distributed counter each time. "
+ "This value determines the length of the locally cached ID segment. Default: 100000. "
+ "A larger batch size may cause significant auto-increment ID gaps, especially when unused cached ID segments are discarded due to TabletServer restarts or abnormal terminations. "
+ "Conversely, a smaller batch size increases the frequency of ID fetch requests to the distributed counter, introducing extra network overhead and reducing write throughput and performance.");

public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE =
key("table.changelog.image")
@@ -1468,7 +1479,7 @@ public class ConfigOptions {
+ "The supported modes are `FULL` (default) and `WAL`. "
+ "The `FULL` mode produces both UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing complete information about updates and allowing tracking of previous values. "
+ "The `WAL` mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) records are emitted. "
+ "When WAL mode is enabled with default merge engine (no merge engine configured) and full row updates (not partial update), an optimization is applied to skip looking up old values, "
+ "When WAL mode is enabled, the default merge engine is used (no merge engine configured), updates are full row updates (not partial update), and there is no auto-increment column, an optimization is applied to skip looking up old values, "
+ "and in this case INSERT operations are converted to UPDATE_AFTER events. "
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
+ "This option only affects primary key tables.");
@@ -36,11 +36,12 @@ public enum ChangelogImage {

/**
* WAL mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if
* allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge
* engine configured) and full row updates (not partial update), an optimization is applied to
* skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER
* events, similar to database WAL (Write-Ahead Log) behavior. This mode reduces storage and
* transmission costs but loses the ability to track previous values.
* allowed) records are emitted. When WAL mode is enabled, the default merge engine (no merge
* engine configured) is used, updates are full-row updates (not partial updates), and there is
* no auto-increment column, an optimization is applied to skip looking up old values, and in
* this case INSERT operations are converted to UPDATE_AFTER events, similar to database WAL
* (Write-Ahead Log) behavior. This mode reduces storage and transmission costs but loses the
* ability to track previous values.
*/
WAL;

18 changes: 15 additions & 3 deletions fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -128,6 +128,15 @@ public int[] getPrimaryKeyIndexes() {
.orElseGet(() -> new int[0]);
}

/** Returns the auto-increment column IDs, if any, otherwise returns an empty array. */
public int[] getAutoIncColumnIds() {
Set<String> autoIncColSet = new HashSet<>(getAutoIncrementColumnNames());
return getColumns().stream()
.filter(column -> autoIncColSet.contains(column.getName()))
.mapToInt(Column::getColumnId)
.toArray();
}

/** Returns the primary key column names, if any, otherwise returns an empty array. */
public List<String> getPrimaryKeyColumnNames() {
return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList());
@@ -170,6 +179,11 @@ public List<String> getColumnNames(int[] columnIndexes) {
return columnNames;
}

/** Returns the column name at the given column index. */
public String getColumnName(int columnIndex) {
return columns.get(columnIndex).columnName;
}

/** Returns the indexes of the fields in the schema. */
public int[] getColumnIndexes(List<String> keyNames) {
int[] keyIndexes = new int[keyNames.size()];
@@ -696,9 +710,7 @@ private static List<Column> normalizeColumns(
}

// primary key columns should not be nullable
if ((pkSet.contains(column.getName())
|| autoIncrementColumnNames.contains(column.getName()))
&& column.getDataType().isNullable()) {
if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) {
newColumns.add(
new Column(
column.getName(),
@@ -88,7 +88,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> {
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}";

static final String SCHEMA_JSON_4 =
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";

static final Schema SCHEMA_WITH_AGG =
Schema.newBuilder()
@@ -1453,4 +1453,62 @@ void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
// Collect results with timeout
assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults);
}

@Test
void testWalModeWithAutoInc() throws Exception {
// use single parallelism to make result ordering stable
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);

String tableName = "wal_mode_pk_table";
// Create a table with WAL mode and auto increment column
tEnv.executeSql(
String.format(
"create table %s ("
+ " id int not null,"
+ " auto_inc_id bigint,"
+ " amount bigint,"
+ " primary key (id) not enforced"
+ ") with ('table.changelog.image' = 'wal', 'table.auto-increment.fields'='auto_inc_id')",
tableName));

// Insert initial data
tEnv.executeSql(
String.format(
"INSERT INTO %s (id, amount) VALUES "
+ "(1, 100), "
+ "(2, 200), "
+ "(3, 150), "
+ "(4, 250)",
tableName))
.await();

// Batch-mode UPDATE/DELETE is not supported yet for tables with an auto-increment column,
// so the existing rows are modified via upsert instead.
// TODO: Support Batch Update
tEnv.executeSql(
String.format(
"INSERT INTO %s (id, amount) VALUES " + "(1, 120), " + "(3, 180)",
tableName))
.await();

List<String> expectedResults =
Arrays.asList(
"+I[1, 0, 100]",
"+I[2, 1, 200]",
"+I[3, 2, 150]",
"+I[4, 3, 250]",
"-U[1, 0, 100]",
"+U[1, 0, 120]",
"-U[3, 2, 150]",
"+U[3, 2, 180]");

// Collect results with timeout
assertQueryResultExactOrder(
tEnv,
String.format(
"SELECT id, auto_inc_id, amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */",
tableName),
expectedResults);
}
}
@@ -26,4 +26,12 @@ public interface SequenceIDCounter {
* @return The previous sequence ID
*/
long getAndIncrement() throws Exception;

/**
* Atomically adds the given delta to the sequence ID.
*
* @param delta The delta to add
* @return The previous sequence ID
*/
long getAndAdd(Long delta) throws Exception;
}
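
A minimal in-memory sketch of the contract above (illustrative only, not part of this change; real implementations would be backed by the shared distributed counter):

class InMemorySequenceIDCounter implements SequenceIDCounter {
    private final java.util.concurrent.atomic.AtomicLong value =
            new java.util.concurrent.atomic.AtomicLong();

    @Override
    public long getAndIncrement() {
        return value.getAndIncrement();
    }

    @Override
    public long getAndAdd(Long delta) {
        // Returns the previous value; the caller owns IDs [previous, previous + delta).
        return value.getAndAdd(delta);
    }
}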