diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 872fc04edbadc8..4c60a2158a0b20 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -51,10 +51,6 @@ const { Buffer } = require('buffer'); const common = require('_http_common'); const checkIsHttpToken = common._checkIsHttpToken; const checkInvalidHeaderChar = common._checkInvalidHeaderChar; -const { - defaultTriggerAsyncIdScope, - symbols: { async_id_symbol } -} = require('internal/async_hooks'); const { codes: { ERR_HTTP_HEADERS_SENT, @@ -75,6 +71,10 @@ const { } = require('internal/errors'); const { validateString } = require('internal/validators'); const { isUint8Array } = require('internal/util/types'); +const { + defaultTriggerAsyncIdScope, + symbols: { async_id_symbol } +} = require('internal/async_hooks'); const HIGH_WATER_MARK = getDefaultHighWaterMark(); const { CRLF, debug } = common; @@ -312,7 +312,6 @@ OutgoingMessage.prototype.destroy = function destroy(error) { return this; }; - // This abstract either writing directly to the socket or buffering it. OutgoingMessage.prototype._send = function _send(data, encoding, callback) { // This is a shameful hack to get the headers and first body chunk onto @@ -341,17 +340,21 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback) { OutgoingMessage.prototype._writeRaw = _writeRaw; function _writeRaw(data, encoding, callback) { const conn = this.socket; - if (conn && conn.destroyed) { - // The socket was destroyed. If we're still trying to write to it, - // then we haven't gotten the 'close' event yet. - return false; - } if (typeof encoding === 'function') { callback = encoding; encoding = null; } + if (conn?.destroyed) { + if (typeof callback === 'function') { + process.nextTick(callback, new ERR_STREAM_DESTROYED('write')); + } + // The socket was destroyed. If we're still trying to write to it, + // then we haven't gotten the 'close' event yet. + return false; + } + if (conn && conn._httpMessage === this && conn.writable) { // There might be pending data in the this.output buffer. if (this.outputData.length) { @@ -689,23 +692,6 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { return ret; }; -function onError(msg, err, callback) { - const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined; - defaultTriggerAsyncIdScope(triggerAsyncId, - process.nextTick, - emitErrorNt, - msg, - err, - callback); -} - -function emitErrorNt(msg, err, callback) { - callback(err); - if (typeof msg.emit === 'function' && !msg._closed) { - msg.emit('error', err); - } -} - function write_(msg, chunk, encoding, callback, fromEnd) { if (typeof callback !== 'function') callback = nop; @@ -735,6 +721,7 @@ function write_(msg, chunk, encoding, callback, fromEnd) { } else { process.nextTick(callback, err); } + msg.destroy(err); return false; } @@ -804,62 +791,91 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) { } }; -function onFinish(outmsg) { - if (outmsg && outmsg.socket && outmsg.socket._hadError) return; - outmsg.emit('finish'); +function onFinish(err) { + if (err || this.socket?._hadError) return; + this.emit('finish'); +} + +function onError(msg, err, callback) { + const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined; + defaultTriggerAsyncIdScope(triggerAsyncId, + process.nextTick, + emitErrorNt, + msg, + err, + callback); } -OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { +function emitErrorNt(msg, err, callback) { + callback?.(err); + if (typeof msg.emit === 'function' && !msg._closed) { + msg.emit('error', err); + } +} + +OutgoingMessage.prototype.end = function end(chunk, encoding, cb) { if (typeof chunk === 'function') { - callback = chunk; + cb = chunk; chunk = null; encoding = null; } else if (typeof encoding === 'function') { - callback = encoding; + cb = encoding; encoding = null; } - if (chunk) { + if (chunk !== null && chunk !== undefined) { if (this.finished) { - onError(this, - new ERR_STREAM_WRITE_AFTER_END(), - typeof callback !== 'function' ? nop : callback); + onError(this, new ERR_STREAM_WRITE_AFTER_END(), cb); return this; } if (this.socket) { - this.socket.cork(); + this.socket?.cork(); } write_(this, chunk, encoding, null, true); - } else if (this.finished) { - if (typeof callback === 'function') { - if (!this.writableFinished) { - this.on('finish', callback); - } else { - callback(new ERR_STREAM_ALREADY_FINISHED('end')); + } + + let err; + if (!this.finished) { + if (!this._header) { + if (this.socket) { + this.socket.cork(); } + this._contentLength = 0; + this._implicitHeader(); } - return this; - } else if (!this._header) { - if (this.socket) { - this.socket.cork(); + + const finish = FunctionPrototypeBind(onFinish, this); + + if (this._hasBody && this.chunkedEncoding) { + this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish); + } else { + // Force a flush, HACK. + this._send('', 'latin1', finish); } - this._contentLength = 0; - this._implicitHeader(); + this.finished = true; // aka. WritableState.ended + } else if (this.writableFinished) { + err = new ERR_STREAM_ALREADY_FINISHED('end'); + } else if (this.destroyed) { + err = new ERR_STREAM_DESTROYED('end'); } - if (typeof callback === 'function') - this.once('finish', callback); - - const finish = FunctionPrototypeBind(onFinish, undefined, this); + if (typeof cb === 'function') { + if (err || this.writableFinished) { + process.nextTick(cb, err); + } else { + // TODO (fix): What if error? See kOnFinished in writable.js. + this.once('finish', cb); + } + } - if (this._hasBody && this.chunkedEncoding) { - this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish); - } else { - // Force a flush, HACK. - this._send('', 'latin1', finish); + if (err) { + if (this.socket) { + this.socket.uncork(); + } + return this; } if (this.socket) { @@ -869,14 +885,10 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { } this[kCorked] = 0; - this.finished = true; - // There is the first message on the outgoing queue, and we've sent // everything to the socket. debug('outgoing message end.'); - if (this.outputData.length === 0 && - this.socket && - this.socket._httpMessage === this) { + if (this.outputData.length === 0 && this.socket?._httpMessage === this) { this._finish(); } diff --git a/test/parallel/test-http-outgoing-socket-destroyed.js b/test/parallel/test-http-outgoing-socket-destroyed.js new file mode 100644 index 00000000000000..b60e0bcd467026 --- /dev/null +++ b/test/parallel/test-http-outgoing-socket-destroyed.js @@ -0,0 +1,61 @@ +'use strict'; + +const common = require('../common'); +const { createServer, request } = require('http'); + +{ + const server = createServer((req, res) => { + server.close(); + + req.socket.destroy(); + + res.write('hello', common.expectsError({ + code: 'ERR_STREAM_DESTROYED' + })); + }); + + server.listen(0, common.mustCall(() => { + const req = request({ + host: 'localhost', + port: server.address().port + }); + + req.on('response', common.mustNotCall()); + req.on('error', common.expectsError({ + code: 'ECONNRESET' + })); + + req.end(); + })); +} + +{ + const server = createServer((req, res) => { + res.write('hello'); + req.resume(); + + const onError = common.expectsError({ + code: 'ERR_STREAM_DESTROYED' + }); + + res.on('close', () => { + res.write('world', common.mustCall((err) => { + onError(err); + server.close(); + })); + }); + }); + + server.listen(0, common.mustCall(() => { + const req = request({ + host: 'localhost', + port: server.address().port + }); + + req.on('response', common.mustCall((res) => { + res.socket.destroy(); + })); + + req.end(); + })); +}