
Conversation

@YannByron (Contributor)

Purpose

To support Spark batch write.

Linked issue: close #xxx

Brief change log

Tests

org.apache.fluss.spark.row.SparkAsFlussRowTest
org.apache.fluss.spark.row.SparkAsFlussArrayTest
org.apache.fluss.spark.SparkWriteTest

API and Format

Documentation

@wuchong (Member) left a comment:

Thanks @YannByron, I left some comments. Besides, I pushed a commit to improve the javadoc a bit.

* representation (see {@link TimestampNtz}).
*/
override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
TimestampNtz.fromMillis(SparkDateTimeUtils.microsToMillis(arrayData.getLong(pos)))
Member:

I think we can introduce a method TimestampNtz.fromMicros, like TimestampLtz.fromEpochMicros, to support converting from microseconds to TimestampNtz. Converting from microseconds to milliseconds loses the nanosecond part.
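For illustration, the array accessor would then delegate to that factory directly; a minimal sketch, assuming a TimestampNtz.fromMicros(long) with the same semantics as TimestampLtz.fromEpochMicros is introduced:

// Sketch only: relies on the proposed TimestampNtz.fromMicros(long), which does not exist yet.
override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
  TimestampNtz.fromMicros(arrayData.getLong(pos))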

* representation (see {@link TimestampNtz}).
*/
override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
TimestampNtz.fromMillis(SparkDateTimeUtils.microsToMillis(row.getLong(pos)))
Member:

ditto

import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, StructType}

/** Wraps a Spark [[SparkArrayData]] as a Fluss [[FlussInternalArray]]. */
class SparkAsFlussArray(arrayData: SparkArrayData, elementType: SparkDataType)
Member:

I see that SparkAsFlussRow extends the Serializable interface; do we need to make SparkAsFlussArray extend Serializable as well?

Comment on lines 95 to 100
GenericRowBuilder(4)
.setField(0, 600L)
.setField(1, 21L)
.setField(2, 601)
.setField(3, BinaryString.fromString("addr1"))
.builder(),
Member:

We can replace the builder with GenericRow.of(800L, 23L, 603, fromString("addr3")), which is more concise.
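For instance, the first quoted row above would then read (a sketch, assuming GenericRow.of accepts the field values as varargs as in the example):

// Equivalent of the quoted builder chain, using the suggested factory method.
GenericRow.of(600L, 21L, 601, BinaryString.fromString("addr1"))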

Comment on lines 216 to 222
"+U",
GenericRowBuilder(4)
.setField(0, 800L)
.setField(1, 230L)
.setField(2, 603)
.setField(3, BinaryString.fromString("addr3"))
.builder()),
Member:

nit: move this +U right after the corresponding -U message

.builder())
)
assertThat(flussRows2.length).isEqualTo(4)
assertThat(flussRows2).containsAll(expectRows2.toIterable.asJava)
Member:

Since we set the bucket number to 1, the changelog is globally ordered, so we can assert with .containsExactlyElementsOf here to also check the changelog order.
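Concretely, the suggested assertion would read (a sketch, assuming AssertJ as used elsewhere in this test):

// Also checks the changelog order, which is global here because the bucket number is 1.
assertThat(flussRows2).containsExactlyElementsOf(expectRows2.toIterable.asJava)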

Contributor (Author):

OK. But Spark starts two tasks to write multiple records, and there is no way to guarantee the order of the change data across records. So to achieve this, I insert only one record at a time and validate the order of -U and +U using containsExactlyElementsOf.

Member:

OK, then the original code makes sense to me.

Contributor (Author):

OK. Let me revert the latest change back to the original for this.

Comment on lines 81 to 86
writer.append(flussRow.replace(record)).whenComplete {
(_, exception) =>
{
if (exception != null) {
// logError("Exception occurs while append row to fluss.", exception);
throw new RuntimeException("Failed to append record", exception)
Member:

We must not throw exceptions directly in the completion callback, as they won’t propagate to the Spark writer and may be silently ignored.

Instead, we should capture any exception in a volatile field (e.g., asyncWriterException) within the Spark writer. Then, we can expose a checkAsyncException() method that throws the captured exception if it’s non-null.

This check should be invoked:

  • At the beginning of DataWriter#write, to catch failures from prior async operations before processing new records, and
  • After writer.flush() in DataWriter#commit, to ensure any failure during flush or finalization is surfaced during commit.

This pattern ensures async errors are properly reported through Spark’s writer lifecycle. You can take FlinkSinkWriter as an example.
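For illustration, here is a rough sketch of the pattern; the class name and the appendAsync/flush parameters are placeholders standing in for the actual Fluss writer calls in this PR:

import java.util.concurrent.CompletableFuture

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Sketch only: appendAsync and flush stand in for the Fluss append/upsert writer calls.
class AsyncErrorAwareDataWriter(
    appendAsync: InternalRow => CompletableFuture[Void],
    flush: () => Unit)
  extends DataWriter[InternalRow] {

  // Holds the first failure reported by an async completion callback, if any.
  @volatile private var asyncWriterException: Throwable = _

  private def checkAsyncException(): Unit = {
    val t = asyncWriterException
    if (t != null) {
      throw new RuntimeException("Failed to write record to Fluss", t)
    }
  }

  override def write(record: InternalRow): Unit = {
    // Surface failures from earlier async operations before processing new records.
    checkAsyncException()
    appendAsync(record).whenComplete { (_, exception) =>
      // Only record the failure here; never throw inside the completion callback.
      if (exception != null && asyncWriterException == null) {
        asyncWriterException = exception
      }
    }
  }

  override def commit(): WriterCommitMessage = {
    flush()
    // Ensure any failure during flush or finalization is surfaced during commit.
    checkAsyncException()
    new WriterCommitMessage {}
  }

  override def abort(): Unit = ()

  override def close(): Unit = ()
}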

Comment on lines 107 to 108
logError("Exception occurs while upsert row to fluss.", exception);
throw new RuntimeException("Failed to upsert record", exception)
Member:

Ditto. And we can move the logging into the checkAsyncException() method.

* number is the number of microseconds before {@code 1970-01-01 00:00:00}
*/
public static TimestampNtz fromMicros(long microseconds) {
return new TimestampNtz(Math.floorDiv(microseconds, MICROS_PER_MILLIS), 0);
Member:

Why not convert the microsecond component to nanoseconds? From my perspective, using zero would actually lose precision. The method org.apache.fluss.row.TimestampLtz#fromEpochMicros intentionally preserves the full microsecond resolution; maybe we can use the same implementation here?
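For reference, a minimal sketch of the arithmetic that splits a microsecond value into milliseconds plus a nanosecond-of-millisecond remainder without dropping anything (the constants are written out literally here, not the actual Fluss fields, and the example value is arbitrary):

val micros = 1696000123456L
val epochMillis = Math.floorDiv(micros, 1000L)                 // 1696000123
val nanoOfMilli = (Math.floorMod(micros, 1000L) * 1000L).toInt // 456000 ns instead of 0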

Contributor (Author):

You're right. I will modify this.

@wuchong merged commit 4e49f2d into apache:main on Jan 4, 2026; 6 checks passed.
@wuchong linked an issue on Jan 4, 2026 that may be closed by this pull request.

Development

Successfully merging this pull request may close these issues.

[Feature] Support batch write of spark
