diff --git a/lib/dgram.js b/lib/dgram.js index aae2f51bc86b..684e77624c6a 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -110,6 +110,7 @@ function Socket(type, listener) { this._bindState = BIND_STATE_UNBOUND; this.type = type; this.fd = null; // compatibility hack + this.highWaterMark = 100; // If true - UV_UDP_REUSEADDR flag will be set this._reuseAddr = options && options.reuseAddr; @@ -150,6 +151,11 @@ function replaceHandle(self, newHandle) { self._handle = newHandle; } +function cleanupAndError(self, err) { + self._sendQueue = undefined; + self.emit('error', err); +} + Socket.prototype.bind = function(port /*, address, callback*/) { var self = this; @@ -186,7 +192,7 @@ Socket.prototype.bind = function(port /*, address, callback*/) { self._handle.lookup(address, function(err, ip) { if (err) { self._bindState = BIND_STATE_UNBOUND; - self.emit('error', err); + cleanupAndError(self, err); return; } @@ -196,7 +202,7 @@ Socket.prototype.bind = function(port /*, address, callback*/) { if (cluster.isWorker && !exclusive) { cluster._getServer(self, ip, port, self.type, -1, function(err, handle) { if (err) { - self.emit('error', errnoException(err, 'bind')); + cleanupAndError(self, errnoException(err, 'bind')); self._bindState = BIND_STATE_UNBOUND; return; } @@ -219,7 +225,7 @@ Socket.prototype.bind = function(port /*, address, callback*/) { var err = self._handle.bind(ip, port || 0, flags); if (err) { - self.emit('error', errnoException(err, 'bind')); + cleanupAndError(self, errnoException(err, 'bind')); self._bindState = BIND_STATE_UNBOUND; // Todo: close? return; @@ -247,6 +253,15 @@ Socket.prototype.sendto = function(buffer, this.send(buffer, offset, length, port, address, callback); }; +function SocketSendQueueItem(buffer, offset, length, port, address, callback) { + this.buffer = buffer; + this.offset = offset; + this.length = length; + this.port = port; + this.address = address; + this.callback = callback; +} + Socket.prototype.send = function(buffer, offset, @@ -293,6 +308,16 @@ Socket.prototype.send = function(buffer, if (self._bindState == BIND_STATE_UNBOUND) self.bind(0, null); + function onListening() { + // Flush the send queue. + for (var i = 0; i < self._sendQueue.length; i++) { + var item = self._sendQueue[i]; + self.send.call(self, item.buffer, item.offset, item.length, + item.port, item.address, item.callback); + } + self._sendQueue = undefined; + } + // If the socket hasn't been bound yet, push the outbound packet onto the // send queue and send after binding is complete. if (self._bindState != BIND_STATE_BOUND) { @@ -300,21 +325,22 @@ Socket.prototype.send = function(buffer, // event handler that flushes the send queue after binding is done. if (!self._sendQueue) { self._sendQueue = []; - self.once('listening', function() { - // Flush the send queue. - for (var i = 0; i < self._sendQueue.length; i++) - self.send.apply(self, self._sendQueue[i]); - self._sendQueue = undefined; - }); + self.once('listening', onListening); + } + + if (self._sendQueue.length <= self.highWaterMark) { + self._sendQueue.push(new SocketSendQueueItem( + buffer, offset, length, port, address, callback)); } - self._sendQueue.push([buffer, offset, length, port, address, callback]); return; } - self._handle.lookup(address, function(ex, ip) { + self._handle.lookup(address, dgramSocketSendLookupCallback); + + function dgramSocketSendLookupCallback(ex, ip) { if (ex) { if (callback) callback(ex); - self.emit('error', ex); + cleanupAndError(self, ex); } else if (self._handle) { var req = { buffer: buffer, length: length }; // Keep reference alive. @@ -336,7 +362,7 @@ Socket.prototype.send = function(buffer, }); } } - }); + } }; @@ -462,7 +488,7 @@ Socket.prototype._stopReceiving = function() { function onMessage(nread, handle, buf, rinfo) { var self = handle.owner; if (nread < 0) { - return self.emit('error', errnoException(nread, 'recvmsg')); + return cleanupAndError(self, errnoException(nread, 'recvmsg')); } rinfo.size = buf.length; // compatibility self.emit('message', buf, rinfo);