[spark] support batch write #2277
Conversation
wuchong left a comment
Thanks @YannByron, I left some comments. Besides, I pushed a commit to improve the javadoc a bit.
 * representation (see {@link TimestampNtz}).
 */
override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
  TimestampNtz.fromMillis(SparkDateTimeUtils.microsToMillis(arrayData.getLong(pos)))
I think we can introduce a method TimestampNtz.fromMicros, like TimestampLtz.fromEpochMicros, to support converting from micros to TimestampNtz. Converting from micros to millis loses the sub-millisecond precision.
 * representation (see {@link TimestampNtz}).
 */
override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
  TimestampNtz.fromMillis(SparkDateTimeUtils.microsToMillis(row.getLong(pos)))
ditto
import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, StructType}

/** Wraps a Spark [[SparkArrayData]] as a Fluss [[FlussInternalArray]]. */
class SparkAsFlussArray(arrayData: SparkArrayData, elementType: SparkDataType)
I saw SparkAsFlussRow extends the Serializable interface; do we need to make SparkAsFlussArray extend Serializable as well?
GenericRowBuilder(4)
  .setField(0, 600L)
  .setField(1, 21L)
  .setField(2, 601)
  .setField(3, BinaryString.fromString("addr1"))
  .builder(),
We can replace the builder with GenericRow.of(800L, 23L, 603, fromString("addr3")), which is more concise.
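For illustration, the builder chain above would then collapse to a single expression. This is only a sketch; the import paths are assumptions based on the other Fluss row classes in this PR:

```scala
// Assumed import paths; GenericRow.of is the factory the reviewer suggests.
import org.apache.fluss.row.BinaryString.fromString
import org.apache.fluss.row.GenericRow

object GenericRowExample extends App {
  // Equivalent to the GenericRowBuilder chain above, in one expression.
  val row = GenericRow.of(600L, 21L, 601, fromString("addr1"))
  println(row)
}
```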
| "+U", | ||
| GenericRowBuilder(4) | ||
| .setField(0, 800L) | ||
| .setField(1, 230L) | ||
| .setField(2, 603) | ||
| .setField(3, BinaryString.fromString("addr3")) | ||
| .builder()), |
nit: move this +U right after the corresponding -U message
    .builder())
)
assertThat(flussRows2.length).isEqualTo(4)
assertThat(flussRows2).containsAll(expectRows2.toIterable.asJava)
Since we set the bucket number to 1, the changelog is globally ordered, so we can assert with .containsExactlyElementsOf here to also check the changelog order.
OK. But Spark starts two tasks to write multiple records, and there is no way to guarantee the order of the change data across records. So to achieve this, I only insert one record at a time and validate the order of -U and +U using containsExactlyElementsOf.
OK, then the original code makes sense to me.
OK. Let me revert the latest change back to the original one here.
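For context, the two assertions under discussion differ only in whether order is checked. A small AssertJ sketch, using hypothetical string entries in place of the actual changelog rows:

```scala
import java.util.Arrays.asList
import org.assertj.core.api.Assertions.assertThat

object ChangelogAssertionSketch extends App {
  // Hypothetical entries standing in for the changelog rows in the test.
  val expected = asList("-U|addr1", "+U|addr3")
  val actual = asList("-U|addr1", "+U|addr3")

  // Order-insensitive: passes as long as every expected element is present.
  assertThat(actual).containsAll(expected)

  // Order-sensitive: also verifies that the -U arrives before the matching +U.
  assertThat(actual).containsExactlyElementsOf(expected)
}
```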
writer.append(flussRow.replace(record)).whenComplete {
  (_, exception) =>
    {
      if (exception != null) {
        // logError("Exception occurs while append row to fluss.", exception);
        throw new RuntimeException("Failed to append record", exception)
We must not throw exceptions directly in the completion callback, as they won’t propagate to the Spark writer and may be silently ignored.
Instead, we should capture any exception in a volatile field (e.g., asyncWriterException) within the Spark writer. Then, we can expose a checkAsyncException() method that throws the captured exception if it’s non-null.
This check should be invoked:
- At the beginning of DataWriter#write, to catch failures from prior async operations before processing new records, and
- After writer.flush() in DataWriter#commit, to ensure any failure during flush or finalization is surfaced during commit.
This pattern ensures async errors are properly reported through Spark’s writer lifecycle. You can take FlinkSinkWriter as an example.
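A minimal sketch of that pattern, assuming a hypothetical AsyncRowWriter interface standing in for the Fluss writer used in the PR; apart from Spark's DataWriter API, all names here are illustrative:

```scala
import java.util.concurrent.CompletableFuture

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Hypothetical minimal interface standing in for the Fluss append/upsert writer.
trait AsyncRowWriter {
  def append(row: InternalRow): CompletableFuture[Void]
  def flush(): Unit
  def close(): Unit
}

// Placeholder commit message for the sketch.
case class NoopCommitMessage() extends WriterCommitMessage

// Sketch of the suggested pattern: the completion callback only records the
// failure, and checkAsyncException() rethrows it from the writer lifecycle.
class FlussDataWriterSketch(writer: AsyncRowWriter) extends DataWriter[InternalRow] {

  // Written by the async completion callback, read by the Spark task thread.
  @volatile private var asyncWriterException: Throwable = _

  private def checkAsyncException(): Unit = {
    val e = asyncWriterException
    if (e != null) {
      // Central place to log the async failure before failing the task.
      throw new RuntimeException("Failed to write record to Fluss", e)
    }
  }

  override def write(record: InternalRow): Unit = {
    checkAsyncException() // surface failures from earlier async writes first
    writer.append(record).whenComplete { (_, exception) =>
      if (exception != null) {
        asyncWriterException = exception // do NOT throw inside the callback
      }
    }
  }

  override def commit(): WriterCommitMessage = {
    writer.flush()        // wait for in-flight writes to finish
    checkAsyncException() // fail the commit if any of them failed
    NoopCommitMessage()
  }

  override def abort(): Unit = writer.close()

  override def close(): Unit = writer.close()
}
```

This mirrors how FlinkSinkWriter handles async failures, and the log call from the callbacks can then move into checkAsyncException(), as suggested below.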
logError("Exception occurs while upsert row to fluss.", exception);
throw new RuntimeException("Failed to upsert record", exception)
Ditto. And we can move the logging into the checkAsyncException() method.
 * number is the number of microseconds before {@code 1970-01-01 00:00:00}
 */
public static TimestampNtz fromMicros(long microseconds) {
    return new TimestampNtz(Math.floorDiv(microseconds, MICROS_PER_MILLIS), 0);
Why not convert the microsecond component to nanoseconds? From my perspective, using zero would actually lose precision. The method org.apache.fluss.row.TimestampLtz#fromEpochMicros intentionally preserves the full microsecond resolution; maybe we can use the same implementation here?
You're right. I will modify this.
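For reference, a minimal Scala sketch of the conversion arithmetic that keeps the sub-millisecond part; the actual fix would go into the Java TimestampNtz.fromMicros, and the constant and helper names here are illustrative:

```scala
object TimestampNtzMicrosSketch {
  // Split epoch-based microseconds into (millis, nanos-of-milli) so the
  // sub-millisecond part is preserved instead of being dropped as 0.
  private val MicrosPerMilli = 1000L
  private val NanosPerMicro = 1000L

  def splitMicros(microseconds: Long): (Long, Int) = {
    val millis = Math.floorDiv(microseconds, MicrosPerMilli)
    val nanosOfMilli = (Math.floorMod(microseconds, MicrosPerMilli) * NanosPerMicro).toInt
    (millis, nanosOfMilli)
  }

  def main(args: Array[String]): Unit = {
    // -1 microsecond before the epoch -> (-1 ms, 999000 ns), not (-1 ms, 0)
    println(splitMicros(-1L))   // (-1,999000)
    println(splitMicros(1001L)) // (1,1000): 1 ms plus 1 microsecond
  }
}
```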
Purpose
To support Spark batch write.
Linked issue: close #xxx
Brief change log
Tests
org.apache.fluss.spark.row.SparkAsFlussRowTest
org.apache.fluss.spark.row.SparkAsFlussArrayTest
org.apache.fluss.spark.SparkWriteTest
API and Format
Documentation