Skip to content

Commit 465c665

Browse files
committed
[SPARK-13652][CORE] Copy ByteBuffer in sendRpcSync as it will be recycled
## What changes were proposed in this pull request? `sendRpcSync` should copy the response content because the underlying buffer will be recycled and reused. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <[email protected]> Closes #11499 from zsxwing/SPARK-13652.
1 parent f6ac7c3 commit 465c665

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
* failure.
2525
*/
2626
public interface RpcResponseCallback {
27-
/** Successful serialized result from server. */
27+
/**
28+
* Successful serialized result from server.
29+
*
30+
* After `onSuccess` returns, `response` will be recycled and its content will become invalid.
31+
* Please copy the content of `response` if you want to use it after `onSuccess` returns.
32+
*/
2833
void onSuccess(ByteBuffer response);
2934

3035
/** Exception either propagated from server or raised on client side. */

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,11 @@ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
257257
sendRpc(message, new RpcResponseCallback() {
258258
@Override
259259
public void onSuccess(ByteBuffer response) {
260-
result.set(response);
260+
ByteBuffer copy = ByteBuffer.allocate(response.remaining());
261+
copy.put(response);
262+
// flip "copy" to make it readable
263+
copy.flip();
264+
result.set(copy);
261265
}
262266

263267
@Override

0 commit comments

Comments
 (0)