[VL] Support multiple segments per partition in columnar shuffle #11722
guowangy wants to merge 23 commits into apache:main
Conversation
> `arrow::Status LocalPartitionWriter::writeIndexFile() {`

Can you add some C++ unit tests for the multi-segment partition write?
backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
```cpp
if (usePartitionMultipleSegments_) {
  // If multiple segments per partition is enabled, write directly to the final data file.
  RETURN_NOT_OK(writeMemoryPayload(partitionId, std::move(inMemoryPayload)));
```
Can you explain a bit more on how this can reduce the memory usage? It looks like the memory still only gets reclaimed by OOM and spilling.
The memory-usage reduction does not apply to sortEvict, since it is usually triggered by a spill. But it does apply to hashEvict, because payloads no longer need to be cached in memory.
```cpp
  RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
}
if (usePartitionMultipleSegments_) {
  RETURN_NOT_OK(flushCachedPayloads());
```
hashEvict is not only called for spilling. When the evictType is kCache, it tries to cache as many payloads in memory as possible to reduce spilling. And when the evictType is kSpill, the data is written to a spilled data file. Both evict types can occur in the same job. Is evictType == kSpill properly handled for multi-segment writes?
Yes, evictType == kSpill is properly handled.
When usePartitionMultipleSegments_ is enabled, the cache mechanism of payloadCache_ is not used: payloads are flushed directly to disk, so we don't need to distinguish between kCache and kSpill.
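The dispatch described above can be sketched as follows. This is a hypothetical simplification, not the actual Gluten code: `Writer`, `Payload`, and the in-memory "files" are stand-ins, and in reality the kSpill path writes an interim spill file while the multi-segment path appends to the final data file.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Illustrative evict types, mirroring the kCache/kSpill distinction above.
enum class EvictType { kCache, kSpill };

struct Payload {
  int partitionId;
  std::vector<char> data;
};

struct Writer {
  bool usePartitionMultipleSegments = false;
  std::vector<Payload> payloadCache;   // stands in for payloadCache_
  std::vector<Payload> finalDataFile;  // stands in for the final data file

  void evictPayload(Payload p, EvictType type) {
    if (usePartitionMultipleSegments) {
      // With multi-segment partitions, kCache vs. kSpill no longer matters:
      // every evicted payload goes straight to the final data file as a new
      // segment, so nothing accumulates in the payload cache.
      finalDataFile.push_back(std::move(p));
      return;
    }
    if (type == EvictType::kCache) {
      // Classic path: buffer in memory to reduce spilling.
      payloadCache.push_back(std::move(p));
    } else {
      // Classic spill path (an interim spill file in the real implementation).
      finalDataFile.push_back(std::move(p));
    }
  }
};
```

This illustrates why the two evict types need no special handling in multi-segment mode: both branches collapse into a direct write.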
Can you explain where this improvement mainly comes from? Currently we follow the same file layout as vanilla Spark, keeping each partition's output contiguous. One major benefit of that design is reducing small random disk I/O on the shuffle reader side. If memory is tight, spills will be triggered more frequently, making it more likely that each partition produces small output blocks. In that case this design will not be I/O friendly.
In the existing design, when a spill happens, interim blocks of partition data are saved to disk; at the end of the shuffle write, these interim blocks are reloaded from disk and written out as the final partition data. This optimization avoids those interim write/reload operations: when a spill happens, data is saved directly as final partition blocks. As for I/O friendliness: in the existing design, frequent spills also produce many small interim blocks on disk, which is not I/O friendly when reloading them and packing them into the final partition data. This PR does not make I/O friendliness worse; it just moves that cost from the shuffle writer to the shuffle reader.
@guowangy I drew a graph based on my understanding. Please correct me if I'm wrong: the existing design needs only one random I/O access per reducer to each mapper output, but the new design incurs more random I/O accesses when a reducer reads all the segments from one mapper.
Yes, that's true for the shuffle reader.
@guowangy In general, random I/O is considered a bottleneck in shuffle, which is why there are so many remote shuffle service projects; solutions like Celeborn and Uniffle are aimed at exactly this. A remote shuffle service usually coalesces the shuffle outputs on the mapper side to reduce random I/O. The design in this PR, however, seems to go in the opposite direction, since it may introduce more random I/O during reads. Directly writing the segments to the data file would make the partition writer logic simpler, but we intentionally didn't choose that approach for the above reason. I'm not sure whether your test was run on a single node or on a cluster; if it was on a single node and disk I/O was not the bottleneck, the solution may not be practical in real use cases. Besides, in our experience, the external shuffle service is usually enabled in real production environments because it provides better stability when an executor process goes down, and it's more of a must-have feature that the shuffle framework should support.
cc: @FelixYBW |

What changes are proposed in this pull request?

Introduces multi-segment-per-partition support in the Velox backend columnar shuffle writer, enabling incremental flushing of partition data to the final data file during processing. This reduces peak memory usage without requiring full in-memory buffering or temporary spill files. The implementation can reduce the total latency of TPC-H (SF6T) by ~16% with sort-based shuffle under low memory capacity on a 2-socket Xeon 6960P system.
New index file format (`ColumnarIndexShuffleBlockResolver`)

Extends `IndexShuffleBlockResolver` with a new index format supporting multiple `(offset, length)` segments per partition. `ColumnarShuffleManager` now uses this resolver. Multi-segment mode activates only when external shuffle service, push-based shuffle, and dictionary encoding are all disabled (dictionary encoding requires all batches to be complete before writing).
New I/O abstractions

- `FileSegmentsInputStream`: an `InputStream` over non-contiguous `(offset, size)` file segments; supports zero-copy native reads via `read(destAddress, maxSize)`
- `FileSegmentsManagedBuffer`: a `ManagedBuffer` backed by discontiguous segments; supports `nioByteBuffer()`, `createInputStream()`, and `convertToNetty()`
- `DiscontiguousFileRegion`: a Netty `FileRegion` mapping a logical range to multiple physical segments for zero-copy network transfer
- `LowCopyFileSegmentsJniByteInputStream`: a zero-copy JNI wrapper over `FileSegmentsInputStream`; wired into `JniByteInputStreams.create()`
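The `FileSegmentsInputStream` class itself lives on the JVM side; the core idea (presenting non-contiguous file ranges as one logical stream) can be sketched in C++ like this. The class and an in-memory buffer standing in for a real file are illustrative, not the PR's actual implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// One (offset, size) range within the backing "file".
struct Seg { size_t offset; size_t size; };

class SegmentsStream {
 public:
  SegmentsStream(const std::vector<char>& file, std::vector<Seg> segs)
      : file_(file), segs_(std::move(segs)) {}

  // Read up to maxSize bytes into dest, crossing segment boundaries as
  // needed; returns the number of bytes actually read (0 at end of stream).
  size_t read(char* dest, size_t maxSize) {
    size_t done = 0;
    while (done < maxSize && cur_ < segs_.size()) {
      const Seg& s = segs_[cur_];
      size_t avail = s.size - pos_;
      size_t n = std::min(avail, maxSize - done);
      std::memcpy(dest + done, file_.data() + s.offset + pos_, n);
      done += n;
      pos_ += n;
      if (pos_ == s.size) { ++cur_; pos_ = 0; }  // advance to next segment
    }
    return done;
  }

 private:
  const std::vector<char>& file_;
  std::vector<Seg> segs_;
  size_t cur_ = 0;  // index of the current segment
  size_t pos_ = 0;  // position within the current segment
};
```

A caller sees a single contiguous stream even though the bytes come from scattered ranges, which is exactly what the reader needs once one partition's data is split across multiple segments of the data file.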
C++ `LocalPartitionWriter` changes

- `usePartitionMultipleSegments_` flag plus a `partitionSegments_` vector tracking `(start, length)` per partition
- `flushCachedPayloads()`: incremental flush after each `hashEvict`
- `writeMemoryPayload()`: direct write to the final data file during `sortEvict`
- `writeIndexFile()`: serializes the new index at stop time
- `PayloadCache::writeIncremental()`: flushes completed (non-active) partitions without touching the in-use partition
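The bookkeeping implied by the list above can be sketched minimally: each direct write records a `(start, length)` entry for its partition, and `stop()` would later serialize these via `writeIndexFile()`. Method names mirror the PR's description, but the bodies are hypothetical and the real code writes actual bytes rather than just counting them.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct Segment { int64_t start; int64_t length; };

class PartitionWriterSketch {
 public:
  explicit PartitionWriterSketch(int numPartitions)
      : partitionSegments_(numPartitions) {}

  // Direct write to the final data file: record where this payload landed.
  // In the real writer this also performs the disk write; here we only
  // advance a byte counter standing in for the file position.
  void writeMemoryPayload(int partitionId, int64_t payloadBytes) {
    partitionSegments_[partitionId].push_back({bytesWritten_, payloadBytes});
    bytesWritten_ += payloadBytes;
  }

  // What writeIndexFile() would serialize for one partition at stop time.
  const std::vector<Segment>& segmentsFor(int partitionId) const {
    return partitionSegments_[partitionId];
  }

 private:
  int64_t bytesWritten_ = 0;  // current end of the final data file
  std::vector<std::vector<Segment>> partitionSegments_;
};
```

Because payloads for different partitions interleave as they are evicted, one partition naturally accumulates several discontiguous segments, which is why the index must hold a list per partition.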
JNI/JVM wiring

`LocalPartitionWriterJniWrapper` and `JniWrapper.cc` accept a new optional `indexFile` parameter; `ColumnarShuffleWriter` passes the temp index file path when multi-segment mode is active.

How was this patch tested?
New unit test suites:

- `ColumnarIndexShuffleBlockResolverSuite`: index format read/write, format detection, multi-segment block lookup
- `FileSegmentsInputStreamSuite`: sequential reads, multi-segment traversal, skip, zero-copy native reads
- `FileSegmentsManagedBufferSuite`: `nioByteBuffer`, `createInputStream`, `convertToNetty`, EOF and mmap edge cases
- `DiscontiguousFileRegionSuite`: Netty transfer across discontiguous segments, lazy open
- `LowCopyFileSegmentsJniByteInputStreamTest`: JNI wrapper correctness for ByteInputStream

Was this patch authored or co-authored using generative AI tooling?