Commit 15ffa54
[SPARK-56279][CORE] Enable zero-copy sendfile for FileRegion in native Netty transports
### What changes were proposed in this pull request?

This PR modifies `MessageEncoder` to emit the header `ByteBuf` and the `FileRegion` as **separate objects** in the outbound message list when the body is a `FileRegion` backed by `FileSegmentManagedBuffer`, instead of wrapping them together in a `MessageWithHeader`.

Previously, all messages with a body were unconditionally wrapped in `MessageWithHeader`. This caused native transports (EPOLL, KQUEUE) to fall into a generic `FileRegion.transferTo()` fallback path that copies data through user-space, bypassing the optimized `sendfile()` / `splice()` zero-copy path that Netty's native transports provide.

The split is applied only when the `ManagedBuffer` is a `FileSegmentManagedBuffer`, whose `release()` is a no-op, making it safe to emit the `FileRegion` independently of write lifecycle management. Other `ManagedBuffer` types (e.g., `BlockManagerManagedBuffer`) still use the `MessageWithHeader` wrapper because they perform resource cleanup in `release()` that must be tied to `MessageWithHeader.deallocate()`.

### Why are the changes needed?

When using native transports (AUTO/EPOLL on Linux), file-backed shuffle fetch performance was severely degraded compared to NIO mode. The root cause lies in how Netty's native transports dispatch `FileRegion` writes.
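The ownership rule that gates the split can be sketched with toy types (illustrative stand-ins, not Spark's real classes): a buffer whose `release()` is a no-op can be handed to Netty as a bare `FileRegion`, while a buffer with real cleanup must stay tied to a wrapper's `deallocate()`.

```java
// Toy model of the encoder's safety check. FileSegmentBuffer and
// BlockManagerBuffer are hypothetical stand-ins for Spark's
// FileSegmentManagedBuffer and BlockManagerManagedBuffer.
public class ReleaseSemanticsDemo {
    interface ManagedBuffer { void release(); }

    // Like FileSegmentManagedBuffer: release() does nothing, so the body can be
    // emitted standalone; Netty freeing the FileRegion is all the cleanup needed.
    static class FileSegmentBuffer implements ManagedBuffer {
        public void release() { /* no-op */ }
    }

    // Like BlockManagerManagedBuffer: release() performs real cleanup that must
    // fire exactly once when the write completes, via the wrapper's deallocate().
    static class BlockManagerBuffer implements ManagedBuffer {
        public void release() { /* unpin block, decrement ref counts, ... */ }
    }

    // The encoder's decision, reduced to its essence: split only when release()
    // is known to be a no-op.
    static boolean safeToSplit(ManagedBuffer buf) {
        return buf instanceof FileSegmentBuffer;
    }

    public static void main(String[] args) {
        System.out.println(safeToSplit(new FileSegmentBuffer()));  // true
        System.out.println(safeToSplit(new BlockManagerBuffer())); // false
    }
}
```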
In `AbstractEpollStreamChannel.doWriteSingle()` (and the analogous KQueue path), Netty uses an `instanceof` check to choose between two write strategies:

https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L474-L493

```java
} else if (msg instanceof DefaultFileRegion) {
    return writeDefaultFileRegion(in, (DefaultFileRegion) msg); // → socket.sendFile() (zero-copy)
} else if (msg instanceof FileRegion) {
    return writeFileRegion(in, (FileRegion) msg);               // → region.transferTo() (user-space copy)
}
```

- **`writeDefaultFileRegion()`** calls `socket.sendFile()`, which maps directly to the Linux `sendfile()` syscall: a true zero-copy path where data is transferred from the file page cache to the socket buffer entirely within the kernel, with no user-space copy.

https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L367-L386

```java
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
    final long offset = region.transferred();
    final long regionCount = region.count();
    if (offset >= regionCount) {
        in.remove();
        return 0;
    }

    final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
    if (flushedAmount > 0) {
        in.progress(flushedAmount);
        if (region.transferred() >= regionCount) {
            in.remove();
        }
        return 1;
    } else if (flushedAmount == 0) {
        validateFileRegion(region, offset);
    }
    return WRITE_STATUS_SNDBUF_FULL;
}
```

- **`writeFileRegion()`** falls back to `region.transferTo(WritableByteChannel)`, which writes data through a `SocketWritableByteChannel` wrapper, effectively a user-space copy path.
https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L402-L420

```java
private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
    if (region.transferred() >= region.count()) {
        in.remove();
        return 0;
    }

    if (byteChannel == null) {
        byteChannel = new EpollSocketWritableByteChannel();
    }
    final long flushedAmount = region.transferTo(byteChannel, region.transferred());
    if (flushedAmount > 0) {
        in.progress(flushedAmount);
        if (region.transferred() >= region.count()) {
            in.remove();
        }
        return 1;
    }
    return WRITE_STATUS_SNDBUF_FULL;
}
```

Spark's `MessageWithHeader` extends `AbstractFileRegion` (not `DefaultFileRegion`). When `MessageEncoder` wraps a `DefaultFileRegion` body inside `MessageWithHeader`, the resulting object is a generic `FileRegion` from Netty's perspective. This means Netty dispatches it to the `writeFileRegion()` fallback, which calls `MessageWithHeader.transferTo()`:

```java
// MessageWithHeader.java, line 121
if (body instanceof FileRegion fileRegion) {
  writtenBody = fileRegion.transferTo(target, totalBytesTransferred - headerLength);
}
```

Here, even though the inner body is a `DefaultFileRegion`, its `transferTo()` is invoked with a `WritableByteChannel` (not a file descriptor), so the data is read from the file into a user-space buffer and then written to the socket: **the zero-copy opportunity is lost**.

By emitting the `DefaultFileRegion` directly into Netty's outbound buffer (instead of wrapping it in `MessageWithHeader`), Netty's native transport recognizes it via `instanceof DefaultFileRegion` and routes it to `socket.sendFile()`, restoring the zero-copy `sendfile()` path.
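The dispatch pitfall above can be reproduced with a self-contained sketch (simplified stand-in types, not Netty's actual classes): an `instanceof DefaultFileRegion` check cannot see through a wrapper, even when the wrapped body is itself a `DefaultFileRegion`.

```java
// Minimal model of the instanceof dispatch in
// AbstractEpollStreamChannel.doWriteSingle(). These are simplified stand-ins,
// not the real Netty types.
public class DispatchDemo {
    interface FileRegion { }
    static class DefaultFileRegion implements FileRegion { }

    // Analogous to Spark's MessageWithHeader: a generic FileRegion that merely
    // holds a DefaultFileRegion body.
    static class MessageWithHeader implements FileRegion {
        final FileRegion body;
        MessageWithHeader(FileRegion body) { this.body = body; }
    }

    // Mirrors Netty's write-strategy selection: only an exact
    // DefaultFileRegion instance gets the zero-copy sendfile() path.
    static String dispatch(Object msg) {
        if (msg instanceof DefaultFileRegion) {
            return "sendfile";   // zero-copy: socket.sendFile()
        } else if (msg instanceof FileRegion) {
            return "transferTo"; // user-space copy fallback
        }
        return "bytebuf";
    }

    public static void main(String[] args) {
        DefaultFileRegion body = new DefaultFileRegion();
        // Wrapped: the wrapper's type hides the body, so it takes the fallback path.
        System.out.println(dispatch(new MessageWithHeader(body))); // prints transferTo
        // Emitted directly: recognized, so it takes the zero-copy path.
        System.out.println(dispatch(body));                        // prints sendfile
    }
}
```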
**Benchmark results (File-Backed Shuffle Fetch) show dramatic improvement:**

| Scenario | Before (ms) | After (ms) | Improvement |
|---|---|---|---|
| AUTO, sequential fetch (JDK17) | 2075 | 362 | **~5.7x faster** |
| AUTO, parallel fetch (JDK17) | 676 | 202 | **~3.3x faster** |
| AUTO, sequential fetch (JDK21) | 2093 | 360 | **~5.8x faster** |
| AUTO, parallel fetch (JDK21) | 677 | 204 | **~3.3x faster** |
| AUTO, sequential fetch (JDK25) | 1918 | 411 | **~4.7x faster** |
| AUTO, parallel fetch (JDK25) | 652 | 196 | **~3.3x faster** |

With this fix, AUTO (EPOLL) mode now matches or exceeds NIO performance for file-backed shuffle fetches, achieving the expected ~1.0X relative performance (previously ~0.2X).

### Does this PR introduce _any_ user-facing change?

No. This is an internal optimization to the Netty transport layer. Users benefit from improved shuffle fetch performance when using native transports (the default on Linux) without any configuration changes.

### How was this patch tested?

- Updated `MergedBlockMetaSuccessSuite` to validate the new encoder output format (2 separate objects: `ByteBuf` header + `DefaultFileRegion`, instead of a single `MessageWithHeader`).
- Re-ran `NettyTransportBenchmark` across JDK 17, JDK 21, and JDK 25 to confirm the performance improvement. Updated benchmark result files accordingly.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 4.6

Closes #55087 from LuciferYang/SPARK-56279.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent 1f9a0c7 commit 15ffa54

5 files changed: +119 / -90 lines

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java (20 additions, 3 deletions)

```diff
@@ -22,12 +22,14 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
 import io.netty.handler.codec.MessageToMessageEncoder;

 import org.apache.spark.internal.LogKeys;
 import org.apache.spark.internal.SparkLogger;
 import org.apache.spark.internal.SparkLoggerFactory;
 import org.apache.spark.internal.MDC;
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;

 /**
  * Encoder used by the server side to encode server-to-client responses.
@@ -89,9 +91,24 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
     assert header.writableBytes() == 0;

     if (body != null) {
-      // We transfer ownership of the reference on in.body() to MessageWithHeader.
-      // This reference will be freed when MessageWithHeader.deallocate() is called.
-      out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
+      if (body instanceof FileRegion && in.body() instanceof FileSegmentManagedBuffer) {
+        // Emit header and FileRegion as separate objects so that native transports
+        // (EPOLL, KQUEUE) can apply zero-copy sendfile/splice on the FileRegion directly.
+        // When wrapped in MessageWithHeader, native transports fall into a generic
+        // FileRegion.transferTo() fallback that copies data through user-space, bypassing
+        // the optimized sendfile() path.
+        //
+        // This split is only safe when the ManagedBuffer is FileSegmentManagedBuffer,
+        // whose release() is a no-op. Other ManagedBuffer types (e.g.,
+        // BlockManagerManagedBuffer) perform resource cleanup in release() that must be
+        // tied to the write lifecycle via MessageWithHeader.deallocate().
+        out.add(header);
+        out.add(body);
+      } else {
+        // We transfer ownership of the reference on in.body() to MessageWithHeader.
+        // This reference will be freed when MessageWithHeader.deallocate() is called.
+        out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
+      }
     } else {
       out.add(header);
     }
```
common/network-common/src/test/java/org/apache/spark/network/protocol/MergedBlockMetaSuccessSuite.java (18 additions, 6 deletions)

```diff
@@ -28,6 +28,7 @@
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultFileRegion;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.RoaringBitmap;
@@ -70,14 +71,25 @@ public void testMergedBlocksMetaEncodeDecode() throws Exception {
     when(context.alloc()).thenReturn(ByteBufAllocator.DEFAULT);

     MessageEncoder.INSTANCE.encode(context, expectedMeta, out);
-    Assertions.assertEquals(1, out.size());
-    MessageWithHeader msgWithHeader = (MessageWithHeader) out.remove(0);
+    // FileSegmentManagedBuffer.convertToNetty() returns a DefaultFileRegion,
+    // so MessageEncoder splits the output into header ByteBuf + FileRegion.
+    Assertions.assertEquals(2, out.size());
+    Assertions.assertInstanceOf(ByteBuf.class, out.get(0));
+    Assertions.assertInstanceOf(DefaultFileRegion.class, out.get(1));
+    ByteBuf headerBuf = (ByteBuf) out.remove(0);
+    DefaultFileRegion fileRegion = (DefaultFileRegion) out.remove(0);

-    ByteArrayWritableChannel writableChannel =
-      new ByteArrayWritableChannel((int) msgWithHeader.count());
-    while (msgWithHeader.transfered() < msgWithHeader.count()) {
-      msgWithHeader.transferTo(writableChannel, msgWithHeader.transfered());
+    int totalLen = headerBuf.readableBytes() + (int) fileRegion.count();
+    ByteArrayWritableChannel writableChannel = new ByteArrayWritableChannel(totalLen);
+    // Write header
+    writableChannel.write(headerBuf.nioBuffer());
+    // Write file region body
+    fileRegion.open();
+    while (fileRegion.transferred() < fileRegion.count()) {
+      fileRegion.transferTo(writableChannel, fileRegion.transferred());
     }
+    fileRegion.release();
+    headerBuf.release();
     ByteBuf messageBuf = Unpooled.wrappedBuffer(writableChannel.getData());
     messageBuf.readLong(); // frame length
     MessageDecoder.INSTANCE.decode(mock(ChannelHandlerContext.class), messageBuf, out);
```
core/benchmarks/NettyTransportBenchmark-jdk21-results.txt (27 additions, 27 deletions)

```diff
@@ -6,7 +6,7 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 RPC Latency (1 KB):  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-1 KB payload  493  539  31  0.0  98616.6  1.0X
+1 KB payload  495  512  13  0.0  99079.2  1.0X


 ================================================================================================
@@ -17,7 +17,7 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 RPC Latency (64 KB):  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-64 KB payload  737  758  18  0.0  147330.6  1.0X
+64 KB payload  690  715  15  0.0  138099.8  1.0X


 ================================================================================================
@@ -28,7 +28,7 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 RPC Latency (1 MB):  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-1 MB payload  829  899  90  0.0  828572.3  1.0X
+1 MB payload  884  944  57  0.0  883906.3  1.0X


 ================================================================================================
@@ -39,7 +39,7 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 RPC Latency (16 MB):  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-16 MB payload  1271  1351  72  0.0  12713142.1  1.0X
+16 MB payload  1275  1303  22  0.0  12752225.6  1.0X


 ================================================================================================
@@ -50,10 +50,10 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 Concurrent RPC Throughput:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-1 client(s)  1936  2030  85  0.0  96813.3  1.0X
-4 client(s)  726  748  32  0.0  36305.7  2.7X
-8 client(s)  548  550  1  0.0  27417.0  3.5X
-16 client(s)  445  458  14  0.0  22238.6  4.4X
+1 client(s)  1883  1927  46  0.0  94151.5  1.0X
+4 client(s)  726  742  16  0.0  36282.9  2.6X
+8 client(s)  523  530  9  0.0  26136.4  3.6X
+16 client(s)  424  437  12  0.0  21221.7  4.4X


 ================================================================================================
@@ -64,8 +64,8 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 IOMode Comparison:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-NIO (8 clients)  792  803  19  0.0  39590.9  1.0X
-AUTO (8 clients)  835  851  16  0.0  41747.2  0.9X
+NIO (8 clients)  811  829  17  0.0  40567.8  1.0X
+AUTO (8 clients)  848  874  45  0.0  42380.6  1.0X


 ================================================================================================
@@ -76,11 +76,11 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 Server Thread Scaling:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-2 server threads  481  491  12  0.0  24050.8  1.0X
-4 server threads  391  413  29  0.1  19537.1  1.2X
-8 server threads  416  434  15  0.0  20819.9  1.2X
-16 server threads  449  473  26  0.0  22455.0  1.1X
-32 server threads  461  499  33  0.0  23043.3  1.0X
+2 server threads  465  477  15  0.0  23261.5  1.0X
+4 server threads  386  397  14  0.1  19278.9  1.2X
+8 server threads  415  431  21  0.0  20738.6  1.1X
+16 server threads  435  472  34  0.0  21760.2  1.1X
+32 server threads  479  497  28  0.0  23925.6  1.0X


 ================================================================================================
@@ -91,9 +91,9 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 Multi-Connection Throughput:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-1 conn(s), 4 threads  1601  1667  108  0.0  320283.2  1.0X
-2 conn(s), 4 threads  1189  1505  274  0.0  237803.7  1.3X
-4 conn(s), 4 threads  1754  1793  34  0.0  350832.7  0.9X
+1 conn(s), 4 threads  1853  2013  167  0.0  370521.8  1.0X
+2 conn(s), 4 threads  1557  1588  28  0.0  311449.1  1.2X
+4 conn(s), 4 threads  1756  1822  57  0.0  351214.2  1.1X


 ================================================================================================
@@ -104,9 +104,9 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 Async Write Throughput:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-1 KB async burst  37  40  3  0.1  7439.9  1.0X
-64 KB async burst  145  155  13  0.0  29093.7  0.3X
-1 MB async burst  1539  1571  33  0.0  307765.1  0.0X
+1 KB async burst  34  48  21  0.1  6745.3  1.0X
+64 KB async burst  163  167  6  0.0  32507.6  0.2X
+1 MB async burst  1512  1574  75  0.0  302329.1  0.0X


 ================================================================================================
@@ -117,8 +117,8 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 16 MB Block Transfer:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-Sequential sends  627  834  304  0.0  6267294.2  1.0X
-4-thread parallel sends  452  457  7  0.0  4520142.4  1.4X
+Sequential sends  658  1087  373  0.0  6575971.7  1.0X
+4-thread parallel sends  419  477  88  0.0  4187438.8  1.6X


 ================================================================================================
@@ -129,9 +129,9 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.17.0-1008-azure
 AMD EPYC 7763 64-Core Processor
 File-Backed Shuffle Fetch:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
 ------------------------------------------------------------------------------------------------------------------------
-NIO, sequential fetch  370  384  13  0.0  3700370.3  1.0X
-NIO, parallel fetch (4 clients)  205  213  7  0.0  2054174.4  1.8X
-AUTO, sequential fetch  2093  2189  85  0.0  20929166.7  0.2X
-AUTO, parallel fetch (4 clients)  677  787  113  0.0  6769621.7  0.5X
+NIO, sequential fetch  366  372  9  0.0  3655386.3  1.0X
+NIO, parallel fetch (4 clients)  200  211  11  0.0  2002026.0  1.8X
+AUTO, sequential fetch  360  369  13  0.0  3600497.3  1.0X
+AUTO, parallel fetch (4 clients)  204  206  2  0.0  2041001.7  1.8X
```