diff --git a/benchmark/http2/respond-with-fd.js b/benchmark/http2/respond-with-fd.js
index fa7b2fbd16b3e6..cc9992e8ca0bd5 100644
--- a/benchmark/http2/respond-with-fd.js
+++ b/benchmark/http2/respond-with-fd.js
@@ -7,7 +7,7 @@ const fs = require('fs');
const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html');
const bench = common.createBenchmark(main, {
- requests: [100, 1000, 10000, 100000, 1000000],
+ requests: [100, 1000, 10000, 100000],
streams: [100, 200, 1000],
clients: [1, 2],
benchmarker: ['h2load']
diff --git a/doc/api/errors.md b/doc/api/errors.md
index 7d6e238f93bdaa..f5190dc52bffa9 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -971,7 +971,14 @@ client.
### ERR_HTTP2_SEND_FILE
An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to
-send something other than a regular file.
+send a directory.
+
+
+### ERR_HTTP2_SEND_FILE_NOSEEK
+
+An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to
+send something other than a regular file, but `offset` or `length` options were
+provided.
### ERR_HTTP2_SESSION_ERROR
diff --git a/doc/api/http2.md b/doc/api/http2.md
index 94f25377d0acbf..13aa4d1e2103d1 100644
--- a/doc/api/http2.md
+++ b/doc/api/http2.md
@@ -1223,6 +1223,11 @@ if the `getTrailers` callback attempts to set such header fields.
#### http2stream.respondWithFD(fd[, headers[, options]])
* `fd` {number} A readable file descriptor.
@@ -1313,6 +1318,11 @@ if the `getTrailers` callback attempts to set such header fields.
#### http2stream.respondWithFile(path[, headers[, options]])
* `path` {string|Buffer|URL}
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 8ec72dd647b1b2..3d0b6cf9b39177 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -714,7 +714,9 @@ E('ERR_HTTP2_PING_LENGTH', 'HTTP2 ping payload must be 8 bytes', RangeError);
E('ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED',
'Cannot set HTTP/2 pseudo-headers', Error);
E('ERR_HTTP2_PUSH_DISABLED', 'HTTP/2 client has disabled push streams', Error);
-E('ERR_HTTP2_SEND_FILE', 'Only regular files can be sent', Error);
+E('ERR_HTTP2_SEND_FILE', 'Directories cannot be sent', Error);
+E('ERR_HTTP2_SEND_FILE_NOSEEK',
+ 'Offset or length can only be specified for regular files', Error);
E('ERR_HTTP2_SESSION_ERROR', 'Session closed with error code %s', Error);
E('ERR_HTTP2_SOCKET_BOUND',
'The socket is already bound to an Http2Session', Error);
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 2a4ef0421f7c84..7137e527df07aa 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -4,9 +4,13 @@
require('internal/util').assertCrypto();
+const { internalBinding } = require('internal/bootstrap_loaders');
const { async_id_symbol } = require('internal/async_hooks').symbols;
+const { UV_EOF } = process.binding('uv');
const http = require('http');
const binding = process.binding('http2');
+const { FileHandle } = process.binding('fs');
+const { StreamPipe } = internalBinding('stream_pipe');
const assert = require('assert');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
@@ -38,6 +42,7 @@ const {
ERR_HTTP2_PING_LENGTH,
ERR_HTTP2_PUSH_DISABLED,
ERR_HTTP2_SEND_FILE,
+ ERR_HTTP2_SEND_FILE_NOSEEK,
ERR_HTTP2_SESSION_ERROR,
ERR_HTTP2_SETTINGS_CANCEL,
ERR_HTTP2_SOCKET_BOUND,
@@ -65,6 +70,7 @@ const { onServerStream,
const { utcDate } = require('internal/http');
const { promisify } = require('internal/util');
const { isArrayBufferView } = require('internal/util/types');
+const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { _connectionListener: httpConnectionListener } = require('http');
const { createPromise, promiseResolve } = process.binding('util');
const debug = util.debuglog('http2');
@@ -345,9 +351,7 @@ function onStreamClose(code) {
stream.end();
}
- if (state.fd !== undefined)
- tryClose(state.fd);
-
+ state.fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
@@ -1928,6 +1932,26 @@ function processHeaders(headers) {
return headers;
}
+function onFileCloseError(stream, err) {
+ stream.emit(err);
+}
+
+function onFileUnpipe() {
+ const stream = this.sink[kOwner];
+ if (stream.ownsFd)
+ this.source.close().catch(onFileCloseError.bind(stream));
+ else
+ this.source.releaseFD();
+}
+
+// This is only called once the pipe has returned back control, so
+// it only has to handle errors and End-of-File.
+function onPipedFileHandleRead(err) {
+ if (err < 0 && err !== UV_EOF) {
+ this.stream.close(NGHTTP2_INTERNAL_ERROR);
+ }
+}
+
function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
streamOptions = 0) {
const state = self[kState];
@@ -1940,18 +1964,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
return;
}
-
- // Close the writable side of the stream
+ // Close the writable side of the stream, but only as far as the writable
+ // stream implementation is concerned.
+ self._final = null;
self.end();
- const ret = self[kHandle].respondFD(fd, headersList,
- offset, length,
- streamOptions);
+ const ret = self[kHandle].respond(headersList, streamOptions);
if (ret < 0) {
self.destroy(new NghttpError(ret));
return;
}
+
+ defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe,
+ self, fd, offset, length);
+}
+
+function startFilePipe(self, fd, offset, length) {
+ const handle = new FileHandle(fd, offset, length);
+ handle.onread = onPipedFileHandleRead;
+ handle.stream = self;
+
+ const pipe = new StreamPipe(handle._externalStream,
+ self[kHandle]._externalStream);
+ pipe.onunpipe = onFileUnpipe;
+ pipe.start();
+
// exact length of the file doesn't matter here, since the
// stream is closing anyway - just use 1 to signify that
// a write does exist
@@ -2008,12 +2046,21 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) {
}
if (!stat.isFile()) {
- const err = new ERR_HTTP2_SEND_FILE();
- if (onError)
- onError(err);
- else
- this.destroy(err);
- return;
+ const isDirectory = stat.isDirectory();
+ if (options.offset !== undefined || options.offset > 0 ||
+ options.length !== undefined || options.length >= 0 ||
+ isDirectory) {
+ const err = isDirectory ?
+ new ERR_HTTP2_SEND_FILE() : new ERR_HTTP2_SEND_FILE_NOSEEK();
+ if (onError)
+ onError(err);
+ else
+ this.destroy(err);
+ return;
+ }
+
+ options.offset = -1;
+ options.length = -1;
}
if (this.destroyed || this.closed) {
@@ -2038,12 +2085,14 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) {
return;
}
- statOptions.length =
- statOptions.length < 0 ? stat.size - (+statOptions.offset) :
- Math.min(stat.size - (+statOptions.offset),
- statOptions.length);
+ if (stat.isFile()) {
+ statOptions.length =
+ statOptions.length < 0 ? stat.size - (+statOptions.offset) :
+ Math.min(stat.size - (+statOptions.offset),
+ statOptions.length);
- headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length;
+ headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length;
+ }
processRespondWithFD(this, fd, headers,
options.offset | 0,
@@ -2270,8 +2319,9 @@ class ServerHttp2Stream extends Http2Stream {
throw new ERR_INVALID_ARG_TYPE('fd', 'number');
debug(`Http2Stream ${this[kID]} [Http2Session ` +
- `${sessionName(session[kType])}]: initiating response`);
+ `${sessionName(session[kType])}]: initiating response from fd`);
this[kUpdateTimer]();
+ this.ownsFd = false;
headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
@@ -2333,9 +2383,9 @@ class ServerHttp2Stream extends Http2Stream {
const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
- `${sessionName(session[kType])}]: initiating response`);
+ `${sessionName(session[kType])}]: initiating response from file`);
this[kUpdateTimer]();
-
+ this.ownsFd = true;
headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
diff --git a/node.gyp b/node.gyp
index 5efe2323599cff..1b047fe9ac52f2 100644
--- a/node.gyp
+++ b/node.gyp
@@ -338,6 +338,7 @@
'src/string_decoder.cc',
'src/string_search.cc',
'src/stream_base.cc',
+ 'src/stream_pipe.cc',
'src/stream_wrap.cc',
'src/tcp_wrap.cc',
'src/timer_wrap.cc',
@@ -394,6 +395,7 @@
'src/string_decoder-inl.h',
'src/stream_base.h',
'src/stream_base-inl.h',
+ 'src/stream_pipe.h',
'src/stream_wrap.h',
'src/tracing/agent.h',
'src/tracing/node_trace_buffer.h',
diff --git a/src/async_wrap-inl.h b/src/async_wrap-inl.h
index 21b1f9cee9f0e8..c9f12333243092 100644
--- a/src/async_wrap-inl.h
+++ b/src/async_wrap-inl.h
@@ -45,6 +45,22 @@ inline double AsyncWrap::get_trigger_async_id() const {
}
+inline AsyncWrap::AsyncScope::AsyncScope(AsyncWrap* wrap)
+ : wrap_(wrap) {
+ Environment* env = wrap->env();
+ if (env->async_hooks()->fields()[Environment::AsyncHooks::kBefore] == 0)
+ return;
+ EmitBefore(env, wrap->get_async_id());
+}
+
+inline AsyncWrap::AsyncScope::~AsyncScope() {
+ Environment* env = wrap_->env();
+ if (env->async_hooks()->fields()[Environment::AsyncHooks::kAfter] == 0)
+ return;
+ EmitAfter(env, wrap_->get_async_id());
+}
+
+
inline v8::MaybeLocal AsyncWrap::MakeCallback(
const v8::Local symbol,
int argc,
diff --git a/src/async_wrap.h b/src/async_wrap.h
index b7aed5d789754b..f0689d32f3c69f 100644
--- a/src/async_wrap.h
+++ b/src/async_wrap.h
@@ -58,6 +58,7 @@ namespace node {
V(SHUTDOWNWRAP) \
V(SIGNALWRAP) \
V(STATWATCHER) \
+ V(STREAMPIPE) \
V(TCPCONNECTWRAP) \
V(TCPSERVERWRAP) \
V(TCPWRAP) \
@@ -169,6 +170,18 @@ class AsyncWrap : public BaseObject {
static void WeakCallback(const v8::WeakCallbackInfo &info);
+ // This is a simplified version of InternalCallbackScope that only runs
+ // the `before` and `after` hooks. Only use it when not actually calling
+ // back into JS; otherwise, use InternalCallbackScope.
+ class AsyncScope {
+ public:
+ explicit inline AsyncScope(AsyncWrap* wrap);
+ ~AsyncScope();
+
+ private:
+ AsyncWrap* wrap_ = nullptr;
+ };
+
private:
friend class PromiseWrap;
diff --git a/src/env-inl.h b/src/env-inl.h
index 31f2af402417dc..3fe57f808cfec7 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -469,6 +469,14 @@ inline void Environment::set_http_parser_buffer(char* buffer) {
http_parser_buffer_ = buffer;
}
+inline bool Environment::http_parser_buffer_in_use() const {
+ return http_parser_buffer_in_use_;
+}
+
+inline void Environment::set_http_parser_buffer_in_use(bool in_use) {
+ http_parser_buffer_in_use_ = in_use;
+}
+
inline http2::http2_state* Environment::http2_state() const {
return http2_state_.get();
}
@@ -484,6 +492,11 @@ Environment::fs_stats_field_array() {
return &fs_stats_field_array_;
}
+inline std::vector>&
+Environment::file_handle_read_wrap_freelist() {
+ return file_handle_read_wrap_freelist_;
+}
+
void Environment::CreateImmediate(native_immediate_callback cb,
void* data,
v8::Local obj,
diff --git a/src/env.cc b/src/env.cc
index 36a3612a780731..d361262e22ba09 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -2,6 +2,7 @@
#include "async_wrap.h"
#include "node_buffer.h"
#include "node_platform.h"
+#include "node_file.h"
#include
#include
diff --git a/src/env.h b/src/env.h
index c778e7ba112340..4b874677935edb 100644
--- a/src/env.h
+++ b/src/env.h
@@ -48,6 +48,10 @@ struct nghttp2_rcbuf;
namespace node {
+namespace fs {
+class FileHandleReadWrap;
+}
+
namespace performance {
class performance_state;
}
@@ -218,6 +222,7 @@ struct PackageConfig {
V(onstop_string, "onstop") \
V(onstreamclose_string, "onstreamclose") \
V(ontrailers_string, "ontrailers") \
+ V(onunpipe_string, "onunpipe") \
V(onwrite_string, "onwrite") \
V(openssl_error_stack, "opensslErrorStack") \
V(output_string, "output") \
@@ -229,6 +234,8 @@ struct PackageConfig {
V(pbkdf2_error_string, "PBKDF2 Error") \
V(pid_string, "pid") \
V(pipe_string, "pipe") \
+ V(pipe_target_string, "pipeTarget") \
+ V(pipe_source_string, "pipeSource") \
V(port_string, "port") \
V(preference_string, "preference") \
V(priority_string, "priority") \
@@ -251,9 +258,11 @@ struct PackageConfig {
V(session_id_string, "sessionId") \
V(shell_string, "shell") \
V(signal_string, "signal") \
+ V(sink_string, "sink") \
V(size_string, "size") \
V(sni_context_err_string, "Invalid SNI context") \
V(sni_context_string, "sni_context") \
+ V(source_string, "source") \
V(stack_string, "stack") \
V(status_string, "status") \
V(stdio_string, "stdio") \
@@ -297,6 +306,7 @@ struct PackageConfig {
V(context, v8::Context) \
V(domain_callback, v8::Function) \
V(fd_constructor_template, v8::ObjectTemplate) \
+ V(filehandlereadwrap_template, v8::ObjectTemplate) \
V(fsreqpromise_constructor_template, v8::ObjectTemplate) \
V(fdclose_constructor_template, v8::ObjectTemplate) \
V(host_import_module_dynamically_callback, v8::Function) \
@@ -319,7 +329,7 @@ struct PackageConfig {
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
- V(shutdown_wrap_constructor_function, v8::Function) \
+ V(shutdown_wrap_template, v8::ObjectTemplate) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tick_callback_function, v8::Function) \
V(timers_callback_function, v8::Function) \
@@ -328,7 +338,7 @@ struct PackageConfig {
V(udp_constructor_function, v8::Function) \
V(vm_parsing_context_symbol, v8::Symbol) \
V(url_constructor_function, v8::Function) \
- V(write_wrap_constructor_function, v8::Function) \
+ V(write_wrap_template, v8::ObjectTemplate) \
V(fs_use_promises_symbol, v8::Symbol)
class Environment;
@@ -636,12 +646,17 @@ class Environment {
inline char* http_parser_buffer() const;
inline void set_http_parser_buffer(char* buffer);
+ inline bool http_parser_buffer_in_use() const;
+ inline void set_http_parser_buffer_in_use(bool in_use);
inline http2::http2_state* http2_state() const;
inline void set_http2_state(std::unique_ptr state);
inline AliasedBuffer* fs_stats_field_array();
+ inline std::vector>&
+ file_handle_read_wrap_freelist();
+
inline performance::performance_state* performance_state();
inline std::map* performance_marks();
@@ -815,6 +830,7 @@ class Environment {
double* heap_space_statistics_buffer_ = nullptr;
char* http_parser_buffer_;
+ bool http_parser_buffer_in_use_ = false;
std::unique_ptr http2_state_;
// stat fields contains twice the number of entries because `fs.StatWatcher`
@@ -822,6 +838,9 @@ class Environment {
static const int kFsStatsFieldsLength = 2 * 14;
AliasedBuffer fs_stats_field_array_;
+ std::vector>
+ file_handle_read_wrap_freelist_;
+
struct ExitCallback {
void (*cb_)(void* arg);
void* arg_;
diff --git a/src/node_file.cc b/src/node_file.cc
index fe3b0e1383e8cb..36bb326aa51517 100644
--- a/src/node_file.cc
+++ b/src/node_file.cc
@@ -26,6 +26,7 @@
#include "node_file.h"
#include "req_wrap-inl.h"
+#include "stream_base-inl.h"
#include "string_bytes.h"
#include "string_search.h"
@@ -41,7 +42,6 @@
#endif
#include
-#include
namespace node {
@@ -115,11 +115,13 @@ using v8::Value;
// The FileHandle object wraps a file descriptor and will close it on garbage
// collection if necessary. If that happens, a process warning will be
// emitted (or a fatal exception will occur if the fd cannot be closed.)
-FileHandle::FileHandle(Environment* env, int fd)
+FileHandle::FileHandle(Environment* env, int fd, Local