- 
                Notifications
    You must be signed in to change notification settings 
- Fork 28.9k
[SPARK-19365][Core]Optimize RequestMessage serialization #16706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -37,8 +37,8 @@ import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap | |
| import org.apache.spark.network.netty.SparkTransportConf | ||
| import org.apache.spark.network.server._ | ||
| import org.apache.spark.rpc._ | ||
| import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance} | ||
| import org.apache.spark.util.{ThreadUtils, Utils} | ||
| import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance, SerializationStream} | ||
| import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, ThreadUtils, Utils} | ||
|  | ||
| private[netty] class NettyRpcEnv( | ||
| val conf: SparkConf, | ||
|  | @@ -189,7 +189,7 @@ private[netty] class NettyRpcEnv( | |
| } | ||
| } else { | ||
| // Message to a remote RPC endpoint. | ||
| postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message))) | ||
| postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this))) | ||
| } | ||
| } | ||
|  | ||
|  | @@ -224,7 +224,7 @@ private[netty] class NettyRpcEnv( | |
| }(ThreadUtils.sameThread) | ||
| dispatcher.postLocalMessage(message, p) | ||
| } else { | ||
| val rpcMessage = RpcOutboxMessage(serialize(message), | ||
| val rpcMessage = RpcOutboxMessage(message.serialize(this), | ||
| onFailure, | ||
| (client, response) => onSuccess(deserialize[Any](client, response))) | ||
| postToOutbox(message.receiver, rpcMessage) | ||
|  | @@ -253,6 +253,13 @@ private[netty] class NettyRpcEnv( | |
| javaSerializerInstance.serialize(content) | ||
| } | ||
|  | ||
| /** | ||
| * Returns [[SerializationStream]] that forwards the serialized bytes to `out`. | ||
| */ | ||
| private[netty] def serializeStream(out: OutputStream): SerializationStream = { | ||
| javaSerializerInstance.serializeStream(out) | ||
| } | ||
|  | ||
| private[netty] def deserialize[T: ClassTag](client: TransportClient, bytes: ByteBuffer): T = { | ||
| NettyRpcEnv.currentClient.withValue(client) { | ||
| deserialize { () => | ||
|  | @@ -480,16 +487,13 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { | |
| */ | ||
| private[netty] class NettyRpcEndpointRef( | ||
| @transient private val conf: SparkConf, | ||
| endpointAddress: RpcEndpointAddress, | ||
| @transient @volatile private var nettyEnv: NettyRpcEnv) | ||
| extends RpcEndpointRef(conf) with Serializable with Logging { | ||
| private val endpointAddress: RpcEndpointAddress, | ||
| @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) { | ||
|  | ||
| @transient @volatile var client: TransportClient = _ | ||
|  | ||
| private val _address = if (endpointAddress.rpcAddress != null) endpointAddress else null | ||
| private val _name = endpointAddress.name | ||
|  | ||
| override def address: RpcAddress = if (_address != null) _address.rpcAddress else null | ||
| override def address: RpcAddress = | ||
| if (endpointAddress.rpcAddress != null) endpointAddress.rpcAddress else null | ||
|  | ||
| private def readObject(in: ObjectInputStream): Unit = { | ||
| in.defaultReadObject() | ||
|  | @@ -501,34 +505,103 @@ private[netty] class NettyRpcEndpointRef( | |
| out.defaultWriteObject() | ||
| } | ||
|  | ||
| override def name: String = _name | ||
| override def name: String = endpointAddress.name | ||
|  | ||
| override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { | ||
| nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout) | ||
| nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout) | ||
| } | ||
|  | ||
| override def send(message: Any): Unit = { | ||
| require(message != null, "Message is null") | ||
| nettyEnv.send(RequestMessage(nettyEnv.address, this, message)) | ||
| nettyEnv.send(new RequestMessage(nettyEnv.address, this, message)) | ||
| } | ||
|  | ||
| override def toString: String = s"NettyRpcEndpointRef(${_address})" | ||
|  | ||
| def toURI: URI = new URI(_address.toString) | ||
| override def toString: String = s"NettyRpcEndpointRef(${endpointAddress})" | ||
|  | ||
| final override def equals(that: Any): Boolean = that match { | ||
| case other: NettyRpcEndpointRef => _address == other._address | ||
| case other: NettyRpcEndpointRef => endpointAddress == other.endpointAddress | ||
| case _ => false | ||
| } | ||
|  | ||
| final override def hashCode(): Int = if (_address == null) 0 else _address.hashCode() | ||
| final override def hashCode(): Int = | ||
| if (endpointAddress == null) 0 else endpointAddress.hashCode() | ||
| } | ||
|  | ||
| /** | ||
| * The message that is sent from the sender to the receiver. | ||
| * | ||
| * @param senderAddress the sender address. It's `null` if this message is from a client | ||
| * `NettyRpcEnv`. | ||
| * @param receiver the receiver of this message. | ||
| * @param content the message content. | ||
| */ | ||
| private[netty] case class RequestMessage( | ||
| senderAddress: RpcAddress, receiver: NettyRpcEndpointRef, content: Any) | ||
| private[netty] class RequestMessage( | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed  | ||
| val senderAddress: RpcAddress, | ||
| val receiver: NettyRpcEndpointRef, | ||
| val content: Any) { | ||
|  | ||
| /** Manually serialize [[RequestMessage]] to minimize the size. */ | ||
| def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't you just want to implement the standard Java serialization mechanism here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's different. If I just implement  | ||
| val bos = new ByteBufferOutputStream() | ||
| val out = new DataOutputStream(bos) | ||
| try { | ||
| writeRpcAddress(out, senderAddress) | ||
| writeRpcAddress(out, receiver.address) | ||
| out.writeUTF(receiver.name) | ||
| val s = nettyEnv.serializeStream(out) | ||
| try { | ||
| s.writeObject(content) | ||
| } finally { | ||
| s.close() | ||
| } | ||
| } finally { | ||
| out.close() | ||
| } | ||
| bos.toByteBuffer | ||
| } | ||
|  | ||
| private def writeRpcAddress(out: DataOutputStream, rpcAddress: RpcAddress): Unit = { | ||
| if (rpcAddress == null) { | ||
| out.writeBoolean(false) | ||
| } else { | ||
| out.writeBoolean(true) | ||
| out.writeUTF(rpcAddress.host) | ||
| out.writeInt(rpcAddress.port) | ||
| } | ||
| } | ||
|  | ||
| override def toString: String = s"RequestMessage($senderAddress, $receiver, $content)" | ||
| } | ||
|  | ||
| private[netty] object RequestMessage { | ||
|  | ||
| private def readRpcAddress(in: DataInputStream): RpcAddress = { | ||
| val hasRpcAddress = in.readBoolean() | ||
| if (hasRpcAddress) { | ||
| RpcAddress(in.readUTF(), in.readInt()) | ||
| } else { | ||
| null | ||
| } | ||
| } | ||
|  | ||
| def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = { | ||
| val bis = new ByteBufferInputStream(bytes) | ||
| val in = new DataInputStream(bis) | ||
| try { | ||
| val senderAddress = readRpcAddress(in) | ||
| val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF()) | ||
| val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv) | ||
| ref.client = client | ||
| new RequestMessage( | ||
| senderAddress, | ||
| ref, | ||
| // The remaining bytes in `bytes` are the message content. | ||
| nettyEnv.deserialize(client, bytes)) | ||
| } finally { | ||
| in.close() | ||
| } | ||
| } | ||
| } | ||
|  | ||
| /** | ||
| * A response that indicates some failure happens in the receiver side. | ||
|  | @@ -574,10 +647,10 @@ private[netty] class NettyRpcHandler( | |
| val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress] | ||
| assert(addr != null) | ||
| val clientAddr = RpcAddress(addr.getHostString, addr.getPort) | ||
| val requestMessage = nettyEnv.deserialize[RequestMessage](client, message) | ||
| val requestMessage = RequestMessage(nettyEnv, client, message) | ||
| if (requestMessage.senderAddress == null) { | ||
| // Create a new message with the socket address of the client as the sender. | ||
| RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) | ||
| new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) | ||
| } else { | ||
| // The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for | ||
| // the listening address | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
_addressand_nameto save some bytes.