[VL] Support multiple segments per partition in columnar shuffle#11722

Open
guowangy wants to merge 23 commits into apache:main from guowangy:partition-multi-segments

Conversation


@guowangy guowangy commented Mar 9, 2026

What changes are proposed in this pull request?

Introduces multi-segment-per-partition support in the Velox backend columnar shuffle writer, enabling incremental flushing of partition data to the final data file during processing. This reduces peak memory usage without requiring full in-memory buffering or temporary spill files. The implementation reduces total TPC-H (SF6T) latency by ~16% with sort-based shuffle under low memory capacity on a 2-socket Xeon 6960P system.

New index file format (ColumnarIndexShuffleBlockResolver)

Extends IndexShuffleBlockResolver with a new index format supporting multiple (offset, length) segments per partition:

[Partition Index: (N+1) × 8-byte big-endian offsets]
[Segment Data: per-partition list of (data_offset, length) pairs, each 8 bytes]
[1-byte end marker]  ← distinguishes from legacy format (size always multiple of 8)
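To make the layout concrete, here is a minimal Python sketch of a writer and reader for this index format. It assumes the partition-index entries are absolute file offsets delimiting each partition's pair list; that detail, and all function names, are assumptions for illustration — the real resolver is Java code in this PR.

```python
import os
import struct

def write_index(path, segments_per_partition):
    """Write a multi-segment index file in the layout sketched above.

    segments_per_partition: one list of (data_offset, length) pairs per
    partition. The exact field layout is an assumption based on the PR
    description, not the actual ColumnarIndexShuffleBlockResolver code.
    """
    n = len(segments_per_partition)
    with open(path, "wb") as f:
        # Partition index: (N+1) big-endian 8-byte offsets. Entry i points at
        # the start of partition i's segment list (absolute file offset), so
        # consecutive entries delimit each partition's pairs.
        offset = (n + 1) * 8
        offsets = [offset]
        for segs in segments_per_partition:
            offset += len(segs) * 16  # each (data_offset, length) pair is 2x8 bytes
            offsets.append(offset)
        for off in offsets:
            f.write(struct.pack(">q", off))
        # Segment data: per-partition list of (data_offset, length) pairs.
        for segs in segments_per_partition:
            for data_offset, length in segs:
                f.write(struct.pack(">qq", data_offset, length))
        # 1-byte end marker: the file size is no longer a multiple of 8,
        # which is how the new format is told apart from the legacy one.
        f.write(b"\x01")

def is_multi_segment_index(path):
    # Legacy index files are a pure sequence of 8-byte longs.
    return os.path.getsize(path) % 8 != 0

def read_partition_segments(path, partition_id):
    with open(path, "rb") as f:
        data = f.read()
    start, end = struct.unpack_from(">qq", data, partition_id * 8)
    return [struct.unpack_from(">qq", data, pos) for pos in range(start, end, 16)]
```

The end-marker trick works because both the legacy header and every (offset, length) entry are 8-byte aligned, so a legacy file's size is always a multiple of 8.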

ColumnarShuffleManager now uses this resolver. Multi-segment mode activates only when external shuffle service, push-based shuffle, and dictionary encoding are all disabled (dictionary encoding requires all batches to be complete before writing).

New I/O abstractions

  • FileSegmentsInputStream — InputStream over non-contiguous (offset, size) file segments; supports zero-copy native reads via read(destAddress, maxSize)
  • FileSegmentsManagedBuffer — ManagedBuffer backed by discontiguous segments; supports nioByteBuffer(), createInputStream(), convertToNetty()
  • DiscontiguousFileRegion — Netty FileRegion mapping a logical range to multiple physical segments for zero-copy network transfer
  • LowCopyFileSegmentsJniByteInputStream — zero-copy JNI wrapper over FileSegmentsInputStream; wired into JniByteInputStreams.create()
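The core idea of the first abstraction — one logical stream over scattered file ranges — can be sketched in Python. The real FileSegmentsInputStream is a Java class, so the name and read behavior here are only loosely mirrored, not the actual API:

```python
class FileSegmentsInputStream:
    """Sketch of a stream over non-contiguous (offset, size) file segments.

    Illustrative only: the real FileSegmentsInputStream in this PR is Java;
    this mirrors its described behavior, not its actual interface.
    """
    def __init__(self, path, segments):
        self._f = open(path, "rb")
        self._segments = list(segments)  # [(offset, size), ...] in read order
        self._idx = 0                    # index of the current segment
        self._pos = 0                    # bytes already consumed within it

    def read(self, max_size=-1):
        """Read up to max_size bytes (all remaining if negative),
        transparently crossing segment boundaries."""
        out = bytearray()
        while self._idx < len(self._segments):
            off, size = self._segments[self._idx]
            remaining = size - self._pos
            if remaining == 0:
                self._idx += 1           # current segment exhausted: advance
                self._pos = 0
                continue
            want = remaining if max_size < 0 else min(remaining, max_size - len(out))
            if want <= 0:
                break
            self._f.seek(off + self._pos)
            chunk = self._f.read(want)
            out += chunk
            self._pos += len(chunk)
        return bytes(out)

    def close(self):
        self._f.close()
```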

C++ LocalPartitionWriter changes

  • usePartitionMultipleSegments_ flag + partitionSegments_ vector tracking (start, length) per partition
  • flushCachedPayloads() — incremental flush after each hashEvict
  • writeMemoryPayload() — direct write to final data file during sortEvict
  • writeIndexFile() — serializes the new index at stop time
  • PayloadCache::writeIncremental() — flushes completed (non-active) partitions without touching the in-use partition
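The bookkeeping these pieces imply can be sketched in Python (all names illustrative; the real code is the C++ LocalPartitionWriter): each flushed payload is appended to the final data file and its (start, length) is recorded under its partition, which is exactly what the index file serializes at stop time.

```python
class MultiSegmentWriter:
    """Sketch of the (start, length) bookkeeping described above.

    Mirrors the idea of partitionSegments_ / flushCachedPayloads() in the
    C++ LocalPartitionWriter; names here are illustrative, not the real API.
    """
    def __init__(self, data_file, num_partitions):
        self._f = open(data_file, "wb")
        # One list of (start, length) segments per partition.
        self.partition_segments = [[] for _ in range(num_partitions)]

    def flush_payload(self, partition_id, payload):
        # Write the payload straight to the final data file and record
        # where it landed, instead of caching or spilling it.
        start = self._f.tell()
        self._f.write(payload)
        self.partition_segments[partition_id].append((start, len(payload)))

    def close(self):
        self._f.close()
```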

JNI/JVM wiring

LocalPartitionWriterJniWrapper and JniWrapper.cc accept a new optional indexFile parameter; ColumnarShuffleWriter passes the temp index file path when multi-segment mode is active.

How was this patch tested?

New unit test suites:

  • ColumnarIndexShuffleBlockResolverSuite — index format read/write, format detection, multi-segment block lookup
  • FileSegmentsInputStreamSuite — sequential reads, multi-segment traversal, skip, zero-copy native reads
  • FileSegmentsManagedBufferSuite — nioByteBuffer, createInputStream, convertToNetty, EOF and mmap edge cases
  • DiscontiguousFileRegionSuite — Netty transfer across discontiguous segments, lazy open
  • LowCopyFileSegmentsJniByteInputStreamTest — JNI wrapper correctness for ByteInputStream

Was this patch authored or co-authored using generative AI tooling?

@github-actions github-actions bot added CORE works for Gluten Core VELOX labels Mar 9, 2026

github-actions bot commented Mar 9, 2026

Run Gluten Clickhouse CI on x86

@zhouyuan zhouyuan requested a review from marin-ma March 10, 2026 15:43
@marin-ma (Contributor) left a comment:


@guowangy Thanks for contributing this feature. Please check my comments below.

#endif
}

arrow::Status LocalPartitionWriter::writeIndexFile() {

Can you add some C++ unit tests for the multi-segment partition write?


if (usePartitionMultipleSegments_) {
// If multiple segments per partition is enabled, write directly to the final data file.
RETURN_NOT_OK(writeMemoryPayload(partitionId, std::move(inMemoryPayload)));

Can you explain a bit more on how this reduces memory usage? It looks like the memory still only gets reclaimed on OOM and spilling.

@guowangy (Author) replied:

The memory-usage reduction does not apply to sortEvict, since sortEvict is usually triggered by a spill.
But it does apply to hashEvict, because payloads no longer need to be cached in memory.

RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
}
if (usePartitionMultipleSegments_) {
RETURN_NOT_OK(flushCachedPayloads());
@marin-ma (Contributor) replied:

hashEvict is not only called for spilling. When the evictType is kCache, it tries to cache as much payload in memory as possible to reduce spilling.

And when the evictType is kSpill, the data is written to a spilled data file. The two evict types can exist in the same job. Is evictType == kSpill properly handled for multi-segment writes?

@guowangy (Author) replied:

Yes, evictType == kSpill is properly handled.
When usePartitionMultipleSegments_ is enabled, the caching mechanism of payloadCache_ is not used: payloads are flushed directly to disk, so we don't need to distinguish between kCache and kSpill.
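The control flow described in this exchange can be sketched as follows; the function and flag names are assumptions mirroring the C++ identifiers, not real Gluten APIs:

```python
def evict(payload, evict_type, use_partition_multiple_segments,
          write_direct, cache, spill):
    """Illustrative evict dispatch: with multi-segment mode enabled,
    both kCache and kSpill evictions flush straight to the final file."""
    if use_partition_multiple_segments:
        return write_direct(payload)   # direct write; no interim spill file
    if evict_type == "kCache":
        return cache(payload)          # keep in memory to reduce spilling
    return spill(payload)              # kSpill: write a temporary spill file
```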

@marin-ma (Contributor):

The implementation reduces total TPC-H (SF6T) latency by ~16% with sort-based shuffle under low memory capacity on a 2-socket Xeon 6960P system.

Can you explain where this improvement mainly comes from?

Currently we follow the same file layout as vanilla Spark, keeping each partition's output contiguous. I think one major benefit of this design is that it reduces small random disk I/O on the shuffle reader side. If memory is tight, spills are triggered more frequently, making it more likely to produce small output blocks for each partition. In this case, this design will not be I/O friendly.

@guowangy (Author) replied:

The implementation reduces total TPC-H (SF6T) latency by ~16% with sort-based shuffle under low memory capacity on a 2-socket Xeon 6960P system.

Can you explain where this improvement mainly comes from?

Currently we follow the same file layout as vanilla Spark, keeping each partition's output contiguous. I think one major benefit of this design is that it reduces small random disk I/O on the shuffle reader side. If memory is tight, spills are triggered more frequently, making it more likely to produce small output blocks for each partition. In this case, this design will not be I/O friendly.

In the existing design, if a spill happens, interim blocks of partition data are saved to disk; at the end of the shuffle write, the interim data is reloaded from disk and written out as the final partition data. This optimization avoids those interim write/reload operations: when a spill happens, data is saved directly as final partition blocks.

As for I/O friendliness: in the existing design, if spills are triggered frequently, there are also many small interim blocks saved to disk, and reloading them and packing them into the final partition data is not I/O friendly either. This PR does not make I/O friendliness worse; it just moves that situation from the shuffle writer to the reader.
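Under the simplified assumption that, in the old design, every spilled byte is written once to an interim spill file and once more to the final data file (ignoring reads), the write-amplification difference argued above can be sketched as:

```python
def bytes_written_existing(spill_sizes):
    # Existing design (hypothetical accounting): each spilled byte goes to an
    # interim file, is reloaded, and is then rewritten into the final file.
    return 2 * sum(spill_sizes)

def bytes_written_multi_segment(spill_sizes):
    # Multi-segment design: each byte is written once, directly to the
    # final data file as its own (offset, length) segment.
    return sum(spill_sizes)
```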

@github-actions

Run Gluten Clickhouse CI on x86

@github-actions github-actions bot added the DOCS label Mar 11, 2026
@marin-ma (Contributor):

@guowangy I drew a graph based on my understanding. Please correct me if I'm wrong: the existing design has only one random I/O access to one mapper output per reducer, but the new design has more random I/O accesses when reading all the segments from one mapper per reducer.



guowangy commented Mar 11, 2026

@guowangy I drew a graph based on my understanding. Please correct me if I'm wrong: the existing design has only one random I/O access to one mapper output per reducer, but the new design has more random I/O accesses when reading all the segments from one mapper per reducer.

Yes, for shuffle reader, that's true.

@marin-ma (Contributor):

@guowangy In general, random I/O is considered a bottleneck in shuffle, and that is what so many remote shuffle service projects and solutions like Celeborn and Uniffle are aimed at. A remote shuffle service usually coalesces the shuffle outputs on the mapper side to reduce random I/O access. However, the design in this PR seems to go in the opposite direction, since it may introduce more random I/O during reads.

Directly writing the segments to the data file would make the partition writer logic simpler, but we intentionally didn't choose that approach based on the above consideration. I'm not sure whether your test ran on a single node or on a cluster. If it was on a single node where disk I/O is not a bottleneck, the solution may not be practical in real use cases.

Besides, based on our experience, an external shuffle service is usually enabled in real production environments because it provides better stability when an executor process goes down; it's more of a must-have feature that the shuffle framework should support.

@marin-ma (Contributor):

cc: @FelixYBW
