perf: Optimize contains expression with SIMD-based scalar pattern sea… #2991
Conversation
Codecov Report ✅ All modified and coverable lines are covered by tests.

@@             Coverage Diff              @@
##               main    #2991      +/-   ##
============================================
+ Coverage     56.12%   59.55%     +3.42%
- Complexity      976     1379      +403
============================================
  Files           119      167       +48
  Lines         11743    15495     +3752
  Branches       2251     2568      +317
============================================
+ Hits           6591     9228     +2637
- Misses         4012     4971      +959
- Partials       1140     1296      +156
I tested locally and see good performance now.
native/spark-expr/Cargo.toml (outdated)

[dependencies]
arrow = { workspace = true }
arrow-string = "57.0.0"
We already depend on arrow. Is contains not re-exported in the arrow crate?
Thanks - removed it in a449297.
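For reference, a sketch of what the dependency block looks like after dropping the direct arrow-string pin (assuming, as the review thread suggests, that the string kernels such as contains are reachable through the workspace arrow crate's re-exports):

```toml
[dependencies]
# arrow re-exports the arrow-string kernels (e.g. contains),
# so the workspace arrow dependency is sufficient on its own
# and avoids pinning a second crate version.
arrow = { workspace = true }
```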
Thanks @Shekharrajak, this is a really nice speedup! Could you fix the clippy errors (you can probably just run …)
Force-pushed from 54dc054 to 27929a3.
Co-authored-by: Andy Grove <agrove@apache.org>
 * Override waitForTasksToFinish to ensure SparkContext is active before checking tasks. This
 * fixes the issue where waitForTasksToFinish returns -1 when SparkContext is not active.
 */
override protected def waitForTasksToFinish(): Unit = {
//! scalar path in arrow-rs.
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::datatypes::DataType;
Fix for the fmt check error.
op.cmd match {
  case cmd: InsertIntoHadoopFsRelationCommand =>
    // Skip INSERT OVERWRITE DIRECTORY operations (catalogTable is None for directory writes)
    if (cmd.catalogTable.isEmpty) {
Fix for this error:
ERROR org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand: Failed to write to directory Some(file:/__w/datafusion-comet/datafusion-comet/apache-spark/target/tmp/spark-76b62d31-5bd6-4d4b-9770-262cb08e84f3)
org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `id` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2700)
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:151)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:86)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
[info] - SPARK-25389 INSERT OVERWRITE LOCAL DIRECTORY ... STORED AS with duplicated names(caseSensitivity=true, format=orc) (22 milliseconds)
18:44:25.173 ERROR org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand: Failed to write to directory Some(file:/__w/datafusion-comet/datafusion-comet/apache-spark/target/tmp/spark-76ef391d-5d5f-4997-afb4-97ac714c1697)
Which issue does this PR close?
Closes #2972.
Rationale for this change
The contains expression shows poor performance in Comet (0.2X vs Spark) because DataFusion's make_scalar_function wrapper expands scalar patterns to arrays, bypassing arrow-rs's optimized scalar path.
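The effect of the scalar fast path can be illustrated with a plain-std sketch (names here are illustrative, not the actual Comet or arrow-rs code): with a scalar pattern the kernel reuses one pattern for the whole batch, whereas the wrapper's expansion materializes a pattern array the length of the input and then does a row-by-row array/array comparison.

```rust
/// Scalar fast path: one pattern, reused for every row. Any per-pattern
/// precomputation inside the substring search is shared across the batch.
fn contains_scalar(values: &[&str], pattern: &str) -> Vec<bool> {
    values.iter().map(|v| v.contains(pattern)).collect()
}

/// What expanding the scalar effectively does: build a pattern array of
/// the input's length, then compare element-wise, losing the scalar path.
fn contains_expanded(values: &[&str], pattern: &str) -> Vec<bool> {
    let expanded: Vec<&str> = std::iter::repeat(pattern).take(values.len()).collect();
    values
        .iter()
        .zip(expanded.iter())
        .map(|(v, p)| v.contains(p))
        .collect()
}

fn main() {
    let values = ["spark", "datafusion", "comet"];
    let out = contains_scalar(&values, "fus");
    println!("{:?}", out); // [false, true, false]
    // Both paths agree on results; only the work per row differs.
    assert_eq!(out, contains_expanded(&values, "fus"));
}
```

Both functions return the same answers; the point of the PR is that the scalar form avoids the intermediate allocation and lets the arrow-rs kernel use its optimized (SIMD-friendly) single-pattern search.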
What changes are included in this PR?
How are these changes tested?