Skip to content

Conversation

@zsxwing
Copy link
Member

@zsxwing zsxwing commented Jan 26, 2017

What changes were proposed in this pull request?

Right now Netty PRC serializes RequestMessage using Java serialization, and the size of a single message (e.g., RequestMessage(..., "hello")`) is almost 1KB.

This PR optimizes it by serializing RequestMessage manually (eliminate unnecessary information from most messages, e.g., class names of RequestMessage, NettyRpcEndpointRef, ...), and reduces the above message size to 100+ bytes.

How was this patch tested?

Jenkins

I did a simple test to measure the improvement:

Before

$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6830                                                                    
2       4353                                                                    
3       3322                                                                    
4       3107                                                                    
5       3235                                                                    
6       3139                                                                    
7       3156                                                                    
8       3166                                                                    
9       3091                                                                    
10      3029

After:

$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6431                                                                    
2       3643                                                                    
3       2913                                                                    
4       2679                                                                    
5       2760                                                                    
6       2710                                                                    
7       2747                                                                    
8       2793                                                                    
9       2679                                                                    
10      2651  

I also captured the TCP packets for this test. Before this patch, the total size of TCP packets is ~1.5GB. After it, it reduces to ~1.2GB.

*/
private[netty] case class RequestMessage(
senderAddress: RpcAddress, receiver: NettyRpcEndpointRef, content: Any)
private[netty] class RequestMessage(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed case to make RequestMessage non-serializable to avoid using Java serialization occasionally.


@transient @volatile var client: TransportClient = _

private val _address = if (endpointAddress.rpcAddress != null) endpointAddress else null
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed _address and _name to save some bytes.

out.writeUTF(senderAddress.host)
out.writeInt(senderAddress.port)
}
val receiverAddress = receiver.endpointAddress
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write receiver.endpointAddress rather than NettyRpcEndpointRef since we only need the address to recreate them.

@SparkQA
Copy link

SparkQA commented Jan 26, 2017

Test build #72008 has finished for PR 16706 at commit b373c10.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @param name Name of the endpoint.
*/
private[spark] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) {
private[spark] case class RpcEndpointAddress(@Nullable rpcAddress: RpcAddress, name: String) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Nullable here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I was working on this, I found I need to read codes to understand if this is nullable. Hence, I just added this annotation to improve the document.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that we don't use this otherwise in Spark and it's not actually a JDK class. It's probably not worth using it in a handful of places only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 we could add a comment there instead of pulling in a new annotation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. But since NettyRpcEnv already used it (added by @vanzin), I prefer to also use it in other places under the netty rpc package.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okey. I removed Nullable.

endpointAddress: RpcEndpointAddress,
@transient @volatile private var nettyEnv: NettyRpcEnv)
extends RpcEndpointRef(conf) with Serializable with Logging {
val endpointAddress: RpcEndpointAddress,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private while we're at it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

val receiver: NettyRpcEndpointRef, val content: Any) {

/** Manually serialize [[RequestMessage]] to minimize the size of bytes. */
def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't you just want to implement the standard Java serialization mechanism here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's different. If I just implement writeObject and call Java serialization APIs to write RequestMessage, it will write the full class name RequestMessage and a serialization id which are not needed by all RPC messages.

@zsxwing
Copy link
Member Author

zsxwing commented Jan 26, 2017

cc @vanzin

Copy link
Contributor

@shivaram shivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the change seems fine to me. However I wonder if a more general change is possible where we say do something like a 'closure cleaning' on the input or output to java serialization to reduce the class information written. But I guess we can discuss this in a separate JIRA

* @param name Name of the endpoint.
*/
private[spark] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) {
private[spark] case class RpcEndpointAddress(@Nullable rpcAddress: RpcAddress, name: String) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 we could add a comment there instead of pulling in a new annotation

@zsxwing
Copy link
Member Author

zsxwing commented Jan 26, 2017

However I wonder if a more general change is possible where we say do something like a 'closure cleaning' on the input or output to java serialization to reduce the class information written.

Sounds like implementing Kryo :)

@SparkQA
Copy link

SparkQA commented Jan 26, 2017

Test build #72036 has finished for PR 16706 at commit c4b8ff0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@vanzin vanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable given the improvements. It'd be nice to have a targeted unit test for the new code in RequestMessage, though.

senderAddress: RpcAddress, receiver: NettyRpcEndpointRef, content: Any)
private[netty] class RequestMessage(
val senderAddress: RpcAddress,
val receiver: NettyRpcEndpointRef, val content: Any) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move content to next line

val senderAddress: RpcAddress,
val receiver: NettyRpcEndpointRef, val content: Any) {

/** Manually serialize [[RequestMessage]] to minimize the size of bytes. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove "of bytes".

writeRpcAddress(out, senderAddress)
writeRpcAddress(out, receiver.address)
out.writeUTF(receiver.name)
val contentBytes = nettyEnv.serialize(content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... could you use JavaSerializerInstance.serializeStream here instead?

You avoid: extra object allocations in serialize, two copies of the serialized content in memory, and the extra copy operation below in out.write.

You could also use ObjectOutputStream directly (it implements DataOutput) but that makes it difficult to use Kryo later.

@SparkQA
Copy link

SparkQA commented Jan 27, 2017

Test build #72047 has finished for PR 16706 at commit 195adc7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 27, 2017

Test build #72065 has started for PR 16706 at commit 05e9a0c.

@zsxwing
Copy link
Member Author

zsxwing commented Jan 27, 2017

retest this please

2 similar comments
@zsxwing
Copy link
Member Author

zsxwing commented Jan 27, 2017

retest this please

@zsxwing
Copy link
Member Author

zsxwing commented Jan 27, 2017

retest this please

@SparkQA
Copy link

SparkQA commented Jan 27, 2017

Test build #72083 has finished for PR 16706 at commit 05e9a0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Copy link
Contributor

vanzin commented Jan 27, 2017

LGTM.

@zsxwing
Copy link
Member Author

zsxwing commented Jan 27, 2017

Thanks! Merging to master.

@asfgit asfgit closed this in 21aa8c3 Jan 27, 2017
@zsxwing zsxwing deleted the rpc-opt branch January 27, 2017 23:10
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
## What changes were proposed in this pull request?

Right now Netty PRC serializes `RequestMessage` using Java serialization, and the size of a single message (e.g., RequestMessage(..., "hello")`) is almost 1KB.

This PR optimizes it by serializing `RequestMessage` manually (eliminate unnecessary information from most messages, e.g., class names of `RequestMessage`, `NettyRpcEndpointRef`, ...), and reduces the above message size to 100+ bytes.

## How was this patch tested?

Jenkins

I did a simple test to measure the improvement:

Before
```
$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6830
2       4353
3       3322
4       3107
5       3235
6       3139
7       3156
8       3166
9       3091
10      3029
```
After:
```
$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6431
2       3643
3       2913
4       2679
5       2760
6       2710
7       2747
8       2793
9       2679
10      2651
```

I also captured the TCP packets for this test. Before this patch, the total size of TCP packets is ~1.5GB. After it, it reduces to ~1.2GB.

Author: Shixiong Zhu <[email protected]>

Closes apache#16706 from zsxwing/rpc-opt.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants