Fix BloomFilter buffer incompatibility between Spark and Comet #3003
Conversation
Handle Spark's full serialization format (12-byte header + bits) in merge_filter() to support Spark partial / Comet final execution. The fix automatically detects the format and extracts bits data accordingly. Fixes apache#2889
Thanks @Shekharrajak. Looks like you need to run

Done. Please help trigger the workflow. Thanks!

@Shekharrajak there are compilation errors

@andygrove, it is now looking fine locally. Is there a way to run all the workflow checks locally, so we can make sure everything is fine before running the workflow on GitHub?
Codecov Report
❌ Patch coverage is

Additional details and impacted files

@@            Coverage Diff            @@
##              main    #3003    +/-   ##
============================================
- Coverage     56.12%   54.57%   -1.56%
- Complexity      976     1261     +285
============================================
  Files           119      167      +48
  Lines         11743    15556    +3813
  Branches       2251     2584     +333
============================================
+ Hits           6591     8490    +1899
- Misses         4012     5844    +1832
- Partials       1140     1222      +82

☔ View full report in Codecov by Sentry.
I think in the compilation error case that should be pretty reproducible locally. I definitely recommend running
In the absence of any new tests, it feels like we should be relaxing a fallback constraint in operators.scala or modifying existing tests to exercise this behavior. Otherwise I suspect we're still falling back. @andygrove do you recall where we might want to make changes to test this behavior? |
I think this is the condition: https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala#L1074 |
mbutrovich left a comment:
Thanks @Shekharrajak! Can you remove any relevant fallbacks and/or modify tests so we know that we're exercising this behavior?
Can we consider adding a pre-commit hook?

Done, please trigger the workflow.
// Check if the incoming bloom filter has compatible size
let incoming_bits_size = bits_end - bits_start;
if incoming_bits_size != expected_bits_size {
    panic!(
Can we use CometError::Internal(String) instead of panic!? (You'll need to return a Result)
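A minimal sketch of that suggestion, assuming the CometError::Internal(String) variant named above; the helper function and error message here are illustrative, not the PR's actual diff:

#[derive(Debug)]
enum CometError {
    Internal(String),
}

// Return an error to the caller instead of aborting the process with panic!.
fn check_bits_size(expected_bits_size: usize, incoming_bits_size: usize) -> Result<(), CometError> {
    if incoming_bits_size != expected_bits_size {
        return Err(CometError::Internal(format!(
            "BloomFilter merge: expected {expected_bits_size} bytes of bits, got {incoming_bits_size}"
        )));
    }
    Ok(())
}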
let expected_bits_size = self.bits.byte_size();
const SPARK_HEADER_SIZE: usize = 12; // version (4) + num_hash_functions (4) + num_words (4)

let bits_data = if other.len() >= SPARK_HEADER_SIZE {
Should this be strictly greater than SPARK_HEADER_SIZE?
let version = i32::from_be_bytes([
    other[0], other[1], other[2], other[3],
]);
if version == SPARK_BLOOM_FILTER_VERSION_1 {
Is this sufficient to ensure that this is a spark bloom filter? Isn't there a chance the starting 4 bytes of the Comet bloom filter might match the pattern?
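A stricter check could combine the version tag with the exact expected length, addressing both this comment and the length question above. A sketch, assuming Spark's V1 version tag is the integer 1 (the helper name is hypothetical):

const SPARK_HEADER_SIZE: usize = 12;
const SPARK_BLOOM_FILTER_VERSION_1: i32 = 1; // assumption: Spark's Version.V1

// Only treat the buffer as Spark's full format when the length is exactly
// header + bits AND the leading four bytes decode to the V1 tag, making an
// accidental match on a bits-only Comet buffer far less likely.
fn is_spark_format(other: &[u8], expected_bits_size: usize) -> bool {
    other.len() == SPARK_HEADER_SIZE + expected_bits_size
        && i32::from_be_bytes([other[0], other[1], other[2], other[3]])
            == SPARK_BLOOM_FILTER_VERSION_1
}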
Rationale for this change

- Spark's serialize() returns the full format: a 12-byte header (version + numHashFunctions + numWords) followed by the bits data.
- Comet's state_as_bytes() returns the bits data only.
- When a Spark partial aggregate sends the full format, Comet's merge_filter() expects bits-only data, causing a mismatch.

Ref: https://github.com/apache/spark/blob/master/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java#L99
Ref: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L219

Spark format: BloomFilterImpl.writeTo() writes the version and numHashFunctions (4+4 bytes), then BitArray.writeTo() writes numWords (4 bytes) followed by the bits; a header-reading sketch follows below.
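As a rough illustration of that layout, a sketch that reads the three big-endian header ints, per the linked Spark sources (the helper name is hypothetical):

// Read Spark's 12-byte header: version, numHashFunctions, and numWords,
// each a big-endian i32 written by Java's DataOutputStream, ahead of the bits.
fn parse_spark_header(buf: &[u8]) -> Option<(i32, i32, i32)> {
    if buf.len() < 12 {
        return None;
    }
    let be = |s: &[u8]| i32::from_be_bytes([s[0], s[1], s[2], s[3]]);
    Some((be(&buf[0..4]), be(&buf[4..8]), be(&buf[8..12])))
}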
What changes are included in this PR?

- Detects the Spark format (buffer size = 12 + expected_bits_size).
- Extracts the bits data by skipping the 12-byte header if it is the Spark format.
- Returns the bits as-is if it is the Comet format.

A condensed sketch of this detection logic follows below.
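A minimal, self-contained sketch of that logic; the function name and signature are illustrative rather than the PR's exact merge_filter() code:

// Return the bits portion of an incoming serialized bloom filter, stripping
// Spark's 12-byte header when the full serialization format is detected.
fn extract_bits(other: &[u8], expected_bits_size: usize) -> &[u8] {
    const SPARK_HEADER_SIZE: usize = 12; // version (4) + num_hash_functions (4) + num_words (4)
    if other.len() == SPARK_HEADER_SIZE + expected_bits_size {
        // Spark's full format: skip the header, keep only the bits.
        &other[SPARK_HEADER_SIZE..]
    } else {
        // Comet's state_as_bytes() format: bits only, use as-is.
        other
    }
}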
How are these changes tested?
Spark SQL test