diff --git a/benchmark/http2/respond-with-fd.js b/benchmark/http2/respond-with-fd.js index fa7b2fbd16b3e6..cc9992e8ca0bd5 100644 --- a/benchmark/http2/respond-with-fd.js +++ b/benchmark/http2/respond-with-fd.js @@ -7,7 +7,7 @@ const fs = require('fs'); const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html'); const bench = common.createBenchmark(main, { - requests: [100, 1000, 10000, 100000, 1000000], + requests: [100, 1000, 10000, 100000], streams: [100, 200, 1000], clients: [1, 2], benchmarker: ['h2load'] diff --git a/doc/api/errors.md b/doc/api/errors.md index 7d6e238f93bdaa..f5190dc52bffa9 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -971,7 +971,14 @@ client. ### ERR_HTTP2_SEND_FILE An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to -send something other than a regular file. +send a directory. + + +### ERR_HTTP2_SEND_FILE_NOSEEK + +An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to +send something other than a regular file, but `offset` or `length` options were +provided. ### ERR_HTTP2_SESSION_ERROR diff --git a/doc/api/http2.md b/doc/api/http2.md index 94f25377d0acbf..13aa4d1e2103d1 100644 --- a/doc/api/http2.md +++ b/doc/api/http2.md @@ -1223,6 +1223,11 @@ if the `getTrailers` callback attempts to set such header fields. #### http2stream.respondWithFD(fd[, headers[, options]]) * `fd` {number} A readable file descriptor. @@ -1313,6 +1318,11 @@ if the `getTrailers` callback attempts to set such header fields. #### http2stream.respondWithFile(path[, headers[, options]]) * `path` {string|Buffer|URL} diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 8ec72dd647b1b2..3d0b6cf9b39177 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -714,7 +714,9 @@ E('ERR_HTTP2_PING_LENGTH', 'HTTP2 ping payload must be 8 bytes', RangeError); E('ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED', 'Cannot set HTTP/2 pseudo-headers', Error); E('ERR_HTTP2_PUSH_DISABLED', 'HTTP/2 client has disabled push streams', Error); -E('ERR_HTTP2_SEND_FILE', 'Only regular files can be sent', Error); +E('ERR_HTTP2_SEND_FILE', 'Directories cannot be sent', Error); +E('ERR_HTTP2_SEND_FILE_NOSEEK', + 'Offset or length can only be specified for regular files', Error); E('ERR_HTTP2_SESSION_ERROR', 'Session closed with error code %s', Error); E('ERR_HTTP2_SOCKET_BOUND', 'The socket is already bound to an Http2Session', Error); diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 2a4ef0421f7c84..7137e527df07aa 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -4,9 +4,13 @@ require('internal/util').assertCrypto(); +const { internalBinding } = require('internal/bootstrap_loaders'); const { async_id_symbol } = require('internal/async_hooks').symbols; +const { UV_EOF } = process.binding('uv'); const http = require('http'); const binding = process.binding('http2'); +const { FileHandle } = process.binding('fs'); +const { StreamPipe } = internalBinding('stream_pipe'); const assert = require('assert'); const { Buffer } = require('buffer'); const EventEmitter = require('events'); @@ -38,6 +42,7 @@ const { ERR_HTTP2_PING_LENGTH, ERR_HTTP2_PUSH_DISABLED, ERR_HTTP2_SEND_FILE, + ERR_HTTP2_SEND_FILE_NOSEEK, ERR_HTTP2_SESSION_ERROR, ERR_HTTP2_SETTINGS_CANCEL, ERR_HTTP2_SOCKET_BOUND, @@ -65,6 +70,7 @@ const { onServerStream, const { utcDate } = require('internal/http'); const { promisify } = require('internal/util'); const { isArrayBufferView } = require('internal/util/types'); +const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { _connectionListener: httpConnectionListener } = require('http'); const { createPromise, promiseResolve } = process.binding('util'); const debug = util.debuglog('http2'); @@ -345,9 +351,7 @@ function onStreamClose(code) { stream.end(); } - if (state.fd !== undefined) - tryClose(state.fd); - + state.fd = -1; // Defer destroy we actually emit end. if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. @@ -1928,6 +1932,26 @@ function processHeaders(headers) { return headers; } +function onFileCloseError(stream, err) { + stream.emit(err); +} + +function onFileUnpipe() { + const stream = this.sink[kOwner]; + if (stream.ownsFd) + this.source.close().catch(onFileCloseError.bind(stream)); + else + this.source.releaseFD(); +} + +// This is only called once the pipe has returned back control, so +// it only has to handle errors and End-of-File. +function onPipedFileHandleRead(err) { + if (err < 0 && err !== UV_EOF) { + this.stream.close(NGHTTP2_INTERNAL_ERROR); + } +} + function processRespondWithFD(self, fd, headers, offset = 0, length = -1, streamOptions = 0) { const state = self[kState]; @@ -1940,18 +1964,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1, return; } - - // Close the writable side of the stream + // Close the writable side of the stream, but only as far as the writable + // stream implementation is concerned. + self._final = null; self.end(); - const ret = self[kHandle].respondFD(fd, headersList, - offset, length, - streamOptions); + const ret = self[kHandle].respond(headersList, streamOptions); if (ret < 0) { self.destroy(new NghttpError(ret)); return; } + + defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe, + self, fd, offset, length); +} + +function startFilePipe(self, fd, offset, length) { + const handle = new FileHandle(fd, offset, length); + handle.onread = onPipedFileHandleRead; + handle.stream = self; + + const pipe = new StreamPipe(handle._externalStream, + self[kHandle]._externalStream); + pipe.onunpipe = onFileUnpipe; + pipe.start(); + // exact length of the file doesn't matter here, since the // stream is closing anyway - just use 1 to signify that // a write does exist @@ -2008,12 +2046,21 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) { } if (!stat.isFile()) { - const err = new ERR_HTTP2_SEND_FILE(); - if (onError) - onError(err); - else - this.destroy(err); - return; + const isDirectory = stat.isDirectory(); + if (options.offset !== undefined || options.offset > 0 || + options.length !== undefined || options.length >= 0 || + isDirectory) { + const err = isDirectory ? + new ERR_HTTP2_SEND_FILE() : new ERR_HTTP2_SEND_FILE_NOSEEK(); + if (onError) + onError(err); + else + this.destroy(err); + return; + } + + options.offset = -1; + options.length = -1; } if (this.destroyed || this.closed) { @@ -2038,12 +2085,14 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) { return; } - statOptions.length = - statOptions.length < 0 ? stat.size - (+statOptions.offset) : - Math.min(stat.size - (+statOptions.offset), - statOptions.length); + if (stat.isFile()) { + statOptions.length = + statOptions.length < 0 ? stat.size - (+statOptions.offset) : + Math.min(stat.size - (+statOptions.offset), + statOptions.length); - headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length; + headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length; + } processRespondWithFD(this, fd, headers, options.offset | 0, @@ -2270,8 +2319,9 @@ class ServerHttp2Stream extends Http2Stream { throw new ERR_INVALID_ARG_TYPE('fd', 'number'); debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: initiating response`); + `${sessionName(session[kType])}]: initiating response from fd`); this[kUpdateTimer](); + this.ownsFd = false; headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; @@ -2333,9 +2383,9 @@ class ServerHttp2Stream extends Http2Stream { const session = this[kSession]; debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: initiating response`); + `${sessionName(session[kType])}]: initiating response from file`); this[kUpdateTimer](); - + this.ownsFd = true; headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; diff --git a/node.gyp b/node.gyp index 5efe2323599cff..1b047fe9ac52f2 100644 --- a/node.gyp +++ b/node.gyp @@ -338,6 +338,7 @@ 'src/string_decoder.cc', 'src/string_search.cc', 'src/stream_base.cc', + 'src/stream_pipe.cc', 'src/stream_wrap.cc', 'src/tcp_wrap.cc', 'src/timer_wrap.cc', @@ -394,6 +395,7 @@ 'src/string_decoder-inl.h', 'src/stream_base.h', 'src/stream_base-inl.h', + 'src/stream_pipe.h', 'src/stream_wrap.h', 'src/tracing/agent.h', 'src/tracing/node_trace_buffer.h', diff --git a/src/async_wrap-inl.h b/src/async_wrap-inl.h index 21b1f9cee9f0e8..c9f12333243092 100644 --- a/src/async_wrap-inl.h +++ b/src/async_wrap-inl.h @@ -45,6 +45,22 @@ inline double AsyncWrap::get_trigger_async_id() const { } +inline AsyncWrap::AsyncScope::AsyncScope(AsyncWrap* wrap) + : wrap_(wrap) { + Environment* env = wrap->env(); + if (env->async_hooks()->fields()[Environment::AsyncHooks::kBefore] == 0) + return; + EmitBefore(env, wrap->get_async_id()); +} + +inline AsyncWrap::AsyncScope::~AsyncScope() { + Environment* env = wrap_->env(); + if (env->async_hooks()->fields()[Environment::AsyncHooks::kAfter] == 0) + return; + EmitAfter(env, wrap_->get_async_id()); +} + + inline v8::MaybeLocal AsyncWrap::MakeCallback( const v8::Local symbol, int argc, diff --git a/src/async_wrap.h b/src/async_wrap.h index b7aed5d789754b..f0689d32f3c69f 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -58,6 +58,7 @@ namespace node { V(SHUTDOWNWRAP) \ V(SIGNALWRAP) \ V(STATWATCHER) \ + V(STREAMPIPE) \ V(TCPCONNECTWRAP) \ V(TCPSERVERWRAP) \ V(TCPWRAP) \ @@ -169,6 +170,18 @@ class AsyncWrap : public BaseObject { static void WeakCallback(const v8::WeakCallbackInfo &info); + // This is a simplified version of InternalCallbackScope that only runs + // the `before` and `after` hooks. Only use it when not actually calling + // back into JS; otherwise, use InternalCallbackScope. + class AsyncScope { + public: + explicit inline AsyncScope(AsyncWrap* wrap); + ~AsyncScope(); + + private: + AsyncWrap* wrap_ = nullptr; + }; + private: friend class PromiseWrap; diff --git a/src/env-inl.h b/src/env-inl.h index 31f2af402417dc..3fe57f808cfec7 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -469,6 +469,14 @@ inline void Environment::set_http_parser_buffer(char* buffer) { http_parser_buffer_ = buffer; } +inline bool Environment::http_parser_buffer_in_use() const { + return http_parser_buffer_in_use_; +} + +inline void Environment::set_http_parser_buffer_in_use(bool in_use) { + http_parser_buffer_in_use_ = in_use; +} + inline http2::http2_state* Environment::http2_state() const { return http2_state_.get(); } @@ -484,6 +492,11 @@ Environment::fs_stats_field_array() { return &fs_stats_field_array_; } +inline std::vector>& +Environment::file_handle_read_wrap_freelist() { + return file_handle_read_wrap_freelist_; +} + void Environment::CreateImmediate(native_immediate_callback cb, void* data, v8::Local obj, diff --git a/src/env.cc b/src/env.cc index 36a3612a780731..d361262e22ba09 100644 --- a/src/env.cc +++ b/src/env.cc @@ -2,6 +2,7 @@ #include "async_wrap.h" #include "node_buffer.h" #include "node_platform.h" +#include "node_file.h" #include #include diff --git a/src/env.h b/src/env.h index c778e7ba112340..4b874677935edb 100644 --- a/src/env.h +++ b/src/env.h @@ -48,6 +48,10 @@ struct nghttp2_rcbuf; namespace node { +namespace fs { +class FileHandleReadWrap; +} + namespace performance { class performance_state; } @@ -218,6 +222,7 @@ struct PackageConfig { V(onstop_string, "onstop") \ V(onstreamclose_string, "onstreamclose") \ V(ontrailers_string, "ontrailers") \ + V(onunpipe_string, "onunpipe") \ V(onwrite_string, "onwrite") \ V(openssl_error_stack, "opensslErrorStack") \ V(output_string, "output") \ @@ -229,6 +234,8 @@ struct PackageConfig { V(pbkdf2_error_string, "PBKDF2 Error") \ V(pid_string, "pid") \ V(pipe_string, "pipe") \ + V(pipe_target_string, "pipeTarget") \ + V(pipe_source_string, "pipeSource") \ V(port_string, "port") \ V(preference_string, "preference") \ V(priority_string, "priority") \ @@ -251,9 +258,11 @@ struct PackageConfig { V(session_id_string, "sessionId") \ V(shell_string, "shell") \ V(signal_string, "signal") \ + V(sink_string, "sink") \ V(size_string, "size") \ V(sni_context_err_string, "Invalid SNI context") \ V(sni_context_string, "sni_context") \ + V(source_string, "source") \ V(stack_string, "stack") \ V(status_string, "status") \ V(stdio_string, "stdio") \ @@ -297,6 +306,7 @@ struct PackageConfig { V(context, v8::Context) \ V(domain_callback, v8::Function) \ V(fd_constructor_template, v8::ObjectTemplate) \ + V(filehandlereadwrap_template, v8::ObjectTemplate) \ V(fsreqpromise_constructor_template, v8::ObjectTemplate) \ V(fdclose_constructor_template, v8::ObjectTemplate) \ V(host_import_module_dynamically_callback, v8::Function) \ @@ -319,7 +329,7 @@ struct PackageConfig { V(script_context_constructor_template, v8::FunctionTemplate) \ V(script_data_constructor_function, v8::Function) \ V(secure_context_constructor_template, v8::FunctionTemplate) \ - V(shutdown_wrap_constructor_function, v8::Function) \ + V(shutdown_wrap_template, v8::ObjectTemplate) \ V(tcp_constructor_template, v8::FunctionTemplate) \ V(tick_callback_function, v8::Function) \ V(timers_callback_function, v8::Function) \ @@ -328,7 +338,7 @@ struct PackageConfig { V(udp_constructor_function, v8::Function) \ V(vm_parsing_context_symbol, v8::Symbol) \ V(url_constructor_function, v8::Function) \ - V(write_wrap_constructor_function, v8::Function) \ + V(write_wrap_template, v8::ObjectTemplate) \ V(fs_use_promises_symbol, v8::Symbol) class Environment; @@ -636,12 +646,17 @@ class Environment { inline char* http_parser_buffer() const; inline void set_http_parser_buffer(char* buffer); + inline bool http_parser_buffer_in_use() const; + inline void set_http_parser_buffer_in_use(bool in_use); inline http2::http2_state* http2_state() const; inline void set_http2_state(std::unique_ptr state); inline AliasedBuffer* fs_stats_field_array(); + inline std::vector>& + file_handle_read_wrap_freelist(); + inline performance::performance_state* performance_state(); inline std::map* performance_marks(); @@ -815,6 +830,7 @@ class Environment { double* heap_space_statistics_buffer_ = nullptr; char* http_parser_buffer_; + bool http_parser_buffer_in_use_ = false; std::unique_ptr http2_state_; // stat fields contains twice the number of entries because `fs.StatWatcher` @@ -822,6 +838,9 @@ class Environment { static const int kFsStatsFieldsLength = 2 * 14; AliasedBuffer fs_stats_field_array_; + std::vector> + file_handle_read_wrap_freelist_; + struct ExitCallback { void (*cb_)(void* arg); void* arg_; diff --git a/src/node_file.cc b/src/node_file.cc index fe3b0e1383e8cb..36bb326aa51517 100644 --- a/src/node_file.cc +++ b/src/node_file.cc @@ -26,6 +26,7 @@ #include "node_file.h" #include "req_wrap-inl.h" +#include "stream_base-inl.h" #include "string_bytes.h" #include "string_search.h" @@ -41,7 +42,6 @@ #endif #include -#include namespace node { @@ -115,11 +115,13 @@ using v8::Value; // The FileHandle object wraps a file descriptor and will close it on garbage // collection if necessary. If that happens, a process warning will be // emitted (or a fatal exception will occur if the fd cannot be closed.) -FileHandle::FileHandle(Environment* env, int fd) +FileHandle::FileHandle(Environment* env, int fd, Local obj) : AsyncWrap(env, - env->fd_constructor_template() - ->NewInstance(env->context()).ToLocalChecked(), - AsyncWrap::PROVIDER_FILEHANDLE), fd_(fd) { + obj.IsEmpty() ? env->fd_constructor_template() + ->NewInstance(env->context()).ToLocalChecked() : obj, + AsyncWrap::PROVIDER_FILEHANDLE), + StreamBase(env), + fd_(fd) { MakeWeak(this); v8::PropertyAttribute attr = static_cast(v8::ReadOnly | v8::DontDelete); @@ -129,6 +131,19 @@ FileHandle::FileHandle(Environment* env, int fd) attr).FromJust(); } +void FileHandle::New(const v8::FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args.IsConstructCall()); + CHECK(args[0]->IsInt32()); + + FileHandle* handle = + new FileHandle(env, args[0].As()->Value(), args.This()); + if (args[1]->IsNumber()) + handle->read_offset_ = args[1]->IntegerValue(env->context()).FromJust(); + if (args[2]->IsNumber()) + handle->read_length_ = args[2]->IntegerValue(env->context()).FromJust(); +} + FileHandle::~FileHandle() { CHECK(!closing_); // We should not be deleting while explicitly closing! Close(); // Close synchronously and emit warning @@ -142,10 +157,10 @@ FileHandle::~FileHandle() { // will crash the process immediately. inline void FileHandle::Close() { if (closed_) return; - closed_ = true; uv_fs_t req; int ret = uv_fs_close(env()->event_loop(), &req, fd_, nullptr); uv_fs_req_cleanup(&req); + AfterClose(); struct err_detail { int ret; int fd; }; @@ -219,18 +234,18 @@ inline MaybeLocal FileHandle::ClosePromise() { CHECK(!maybe_resolver.IsEmpty()); Local resolver = maybe_resolver.ToLocalChecked(); Local promise = resolver.As(); + CHECK(!reading_); if (!closed_ && !closing_) { closing_ = true; CloseReq* req = new CloseReq(env(), promise, object()); auto AfterClose = [](uv_fs_t* req) { CloseReq* close = static_cast(req->data); CHECK_NE(close, nullptr); - close->file_handle()->closing_ = false; + close->file_handle()->AfterClose(); Isolate* isolate = close->env()->isolate(); if (req->result < 0) { close->Reject(UVException(isolate, req->result, "close")); } else { - close->file_handle()->closed_ = true; close->Resolve(); } delete close; @@ -256,6 +271,162 @@ void FileHandle::Close(const FunctionCallbackInfo& args) { } +void FileHandle::ReleaseFD(const FunctionCallbackInfo& args) { + FileHandle* fd; + ASSIGN_OR_RETURN_UNWRAP(&fd, args.Holder()); + // Just act as if this FileHandle has been closed. + fd->AfterClose(); +} + + +void FileHandle::AfterClose() { + closing_ = false; + closed_ = true; + if (reading_ && !persistent().IsEmpty()) + EmitRead(UV_EOF); +} + + +FileHandleReadWrap::FileHandleReadWrap(FileHandle* handle, Local obj) + : ReqWrap(handle->env(), obj, AsyncWrap::PROVIDER_FSREQWRAP), + file_handle_(handle) {} + +int FileHandle::ReadStart() { + if (!IsAlive() || IsClosing()) + return UV_EOF; + + reading_ = true; + + if (current_read_) + return 0; + + std::unique_ptr read_wrap; + + if (read_length_ == 0) { + EmitRead(UV_EOF); + return 0; + } + + { + // Create a new FileHandleReadWrap or re-use one. + // Either way, we need these two scopes for AsyncReset() or otherwise + // for creating the new instance. + HandleScope handle_scope(env()->isolate()); + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this); + + auto& freelist = env()->file_handle_read_wrap_freelist(); + if (freelist.size() > 0) { + read_wrap = std::move(freelist.back()); + freelist.pop_back(); + read_wrap->AsyncReset(); + read_wrap->file_handle_ = this; + } else { + Local wrap_obj = env()->filehandlereadwrap_template() + ->NewInstance(env()->context()).ToLocalChecked(); + read_wrap.reset(new FileHandleReadWrap(this, wrap_obj)); + } + } + int64_t recommended_read = 65536; + if (read_length_ >= 0 && read_length_ <= recommended_read) + recommended_read = read_length_; + + read_wrap->buffer_ = EmitAlloc(recommended_read); + read_wrap->Dispatched(); + + current_read_ = std::move(read_wrap); + + uv_fs_read(env()->event_loop(), + current_read_->req(), + fd_, + ¤t_read_->buffer_, + 1, + read_offset_, + [](uv_fs_t* req) { + FileHandle* handle; + { + FileHandleReadWrap* req_wrap = FileHandleReadWrap::from_req(req); + handle = req_wrap->file_handle_; + CHECK_EQ(handle->current_read_.get(), req_wrap); + } + + // ReadStart() checks whether current_read_ is set to determine whether + // a read is in progress. Moving it into a local variable makes sure that + // the ReadStart() call below doesn’t think we’re still actively reading. + std::unique_ptr read_wrap = + std::move(handle->current_read_); + + int result = req->result; + uv_buf_t buffer = read_wrap->buffer_; + + uv_fs_req_cleanup(req); + + // Push the read wrap back to the freelist, or let it be destroyed + // once we’re exiting the current scope. + constexpr size_t wanted_freelist_fill = 100; + auto& freelist = handle->env()->file_handle_read_wrap_freelist(); + if (freelist.size() < wanted_freelist_fill) + freelist.emplace_back(std::move(read_wrap)); + + if (result >= 0) { + // Read at most as many bytes as we originally planned to. + if (handle->read_length_ >= 0 && handle->read_length_ < result) + result = handle->read_length_; + + // If we read data and we have an expected length, decrease it by + // how much we have read. + if (handle->read_length_ >= 0) + handle->read_length_ -= result; + + // If we have an offset, increase it by how much we have read. + if (handle->read_offset_ >= 0) + handle->read_offset_ += result; + } + + // Reading 0 bytes from a file always means EOF, or that we reached + // the end of the requested range. + if (result == 0) + result = UV_EOF; + + handle->EmitRead(result, buffer); + + // Start over, if EmitRead() didn’t tell us to stop. + if (handle->reading_) + handle->ReadStart(); + }); + + return 0; +} + +int FileHandle::ReadStop() { + reading_ = false; + return 0; +} + +typedef SimpleShutdownWrap> FileHandleCloseWrap; + +ShutdownWrap* FileHandle::CreateShutdownWrap(Local object) { + return new FileHandleCloseWrap(this, object); +} + +int FileHandle::DoShutdown(ShutdownWrap* req_wrap) { + FileHandleCloseWrap* wrap = static_cast(req_wrap); + closing_ = true; + wrap->Dispatched(); + uv_fs_close(env()->event_loop(), wrap->req(), fd_, [](uv_fs_t* req) { + FileHandleCloseWrap* wrap = static_cast( + FileHandleCloseWrap::from_req(req)); + FileHandle* handle = static_cast(wrap->stream()); + handle->AfterClose(); + + int result = req->result; + uv_fs_req_cleanup(req); + wrap->Done(result); + }); + + return 0; +} + + void FSReqWrap::Reject(Local reject) { MakeCallback(env()->oncomplete_string(), 1, &reject); } @@ -1730,6 +1901,17 @@ void InitFs(Local target, fst->SetClassName(wrapString); target->Set(context, wrapString, fst->GetFunction()).FromJust(); + // Create FunctionTemplate for FileHandleReadWrap. There’s no need + // to do anything in the constructor, so we only store the instance template. + Local fh_rw = FunctionTemplate::New(env->isolate()); + fh_rw->InstanceTemplate()->SetInternalFieldCount(1); + AsyncWrap::AddWrapMethods(env, fh_rw); + Local fhWrapString = + FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandleReqWrap"); + fh_rw->SetClassName(fhWrapString); + env->set_filehandlereadwrap_template( + fst->InstanceTemplate()); + // Create Function Template for FSReqPromise Local fpt = FunctionTemplate::New(env->isolate()); AsyncWrap::AddWrapMethods(env, fpt); @@ -1741,14 +1923,16 @@ void InitFs(Local target, env->set_fsreqpromise_constructor_template(fpo); // Create FunctionTemplate for FileHandle - Local fd = FunctionTemplate::New(env->isolate()); + Local fd = env->NewFunctionTemplate(FileHandle::New); AsyncWrap::AddWrapMethods(env, fd); env->SetProtoMethod(fd, "close", FileHandle::Close); + env->SetProtoMethod(fd, "releaseFD", FileHandle::ReleaseFD); Local fdt = fd->InstanceTemplate(); fdt->SetInternalFieldCount(1); Local handleString = FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandle"); fd->SetClassName(handleString); + StreamBase::AddMethods(env, fd, StreamBase::kFlagNone); target->Set(context, handleString, fd->GetFunction()).FromJust(); env->set_fd_constructor_template(fdt); diff --git a/src/node_file.h b/src/node_file.h index bf277a0e433525..fa373d46ad0003 100644 --- a/src/node_file.h +++ b/src/node_file.h @@ -4,6 +4,7 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "node.h" +#include "stream_base.h" #include "req_wrap-inl.h" namespace node { @@ -20,6 +21,7 @@ using v8::Value; namespace fs { + class FSReqBase : public ReqWrap { public: FSReqBase(Environment* env, Local req, AsyncWrap::ProviderType type) @@ -120,13 +122,38 @@ class FSReqAfterScope { Context::Scope context_scope_; }; +class FileHandle; + +// A request wrap specifically for uv_fs_read()s scheduled for reading +// from a FileHandle. +class FileHandleReadWrap : public ReqWrap { + public: + FileHandleReadWrap(FileHandle* handle, v8::Local obj); + + static inline FileHandleReadWrap* from_req(uv_fs_t* req) { + return static_cast(ReqWrap::from_req(req)); + } + + size_t self_size() const override { return sizeof(*this); } + + private: + FileHandle* file_handle_; + uv_buf_t buffer_; + + friend class FileHandle; +}; + // A wrapper for a file descriptor that will automatically close the fd when // the object is garbage collected -class FileHandle : public AsyncWrap { +class FileHandle : public AsyncWrap, public StreamBase { public: - FileHandle(Environment* env, int fd); + FileHandle(Environment* env, + int fd, + v8::Local obj = v8::Local()); virtual ~FileHandle(); + static void New(const v8::FunctionCallbackInfo& args); + int fd() const { return fd_; } size_t self_size() const override { return sizeof(*this); } @@ -134,9 +161,32 @@ class FileHandle : public AsyncWrap { // be resolved once closing is complete. static void Close(const FunctionCallbackInfo& args); + // Releases ownership of the FD. + static void ReleaseFD(const FunctionCallbackInfo& args); + + // StreamBase interface: + int ReadStart() override; + int ReadStop() override; + + bool IsAlive() override { return !closed_; } + bool IsClosing() override { return closing_; } + AsyncWrap* GetAsyncWrap() override { return this; } + + // In the case of file streams, shutting down corresponds to closing. + ShutdownWrap* CreateShutdownWrap(v8::Local object) override; + int DoShutdown(ShutdownWrap* req_wrap) override; + + int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) override { + return UV_ENOSYS; // Not implemented (yet). + } + private: // Synchronous close that emits a warning - inline void Close(); + void Close(); + void AfterClose(); class CloseReq : public ReqWrap { public: @@ -176,6 +226,12 @@ class FileHandle : public AsyncWrap { int fd_; bool closing_ = false; bool closed_ = false; + int64_t read_offset_ = -1; + int64_t read_length_ = -1; + + bool reading_ = false; + std::unique_ptr current_read_ = nullptr; + DISALLOW_COPY_AND_ASSIGN(FileHandle); }; diff --git a/src/node_http2.cc b/src/node_http2.cc index d683b075bc2ed3..68a684025ce4a0 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1132,6 +1132,8 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { Http2Stream* stream = static_cast(stream_); Http2Session* session = stream->session(); Environment* env = stream->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); if (nread < 0) { PassReadErrorToPreviousListener(nread); @@ -1422,6 +1424,7 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { void Http2Session::MaybeScheduleWrite() { CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0); if (session_ != nullptr && nghttp2_session_want_write(session_)) { + HandleScope handle_scope(env()->isolate()); DEBUG_HTTP2SESSION(this, "scheduling write"); flags_ |= SESSION_STATE_WRITE_SCHEDULED; env()->SetImmediate([](Environment* env, void* data) { @@ -1632,6 +1635,8 @@ inline Http2Stream* Http2Session::SubmitRequest( // Callback used to receive inbound data from the i/o stream void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); Http2Scope h2scope(this); CHECK_NE(stream_, nullptr); DEBUG_HTTP2SESSION2(this, "receiving %d bytes", nread); @@ -1661,8 +1666,6 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_LE(static_cast(nread), stream_buf_.len); Isolate* isolate = env()->isolate(); - HandleScope scope(isolate); - Context::Scope context_scope(env()->context()); // Create an array buffer for the read data. DATA frames will be emitted // as slices of this array buffer to avoid having to copy memory. @@ -1810,7 +1813,9 @@ inline void Http2Stream::Close(int32_t code) { } int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) { - CHECK(!this->IsDestroyed()); + if (IsDestroyed()) + return UV_EPIPE; + { Http2Scope h2scope(this); flags_ |= NGHTTP2_STREAM_FLAG_SHUT; @@ -1883,28 +1888,6 @@ inline int Http2Stream::SubmitResponse(nghttp2_nv* nva, } -// Initiate a response that contains data read from a file descriptor. -inline int Http2Stream::SubmitFile(int fd, - nghttp2_nv* nva, size_t len, - int64_t offset, - int64_t length, - int options) { - CHECK(!this->IsDestroyed()); - Http2Scope h2scope(this); - DEBUG_HTTP2STREAM(this, "submitting file"); - if (options & STREAM_OPTION_GET_TRAILERS) - flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS; - - if (offset > 0) fd_offset_ = offset; - if (length > -1) fd_length_ = length; - - Http2Stream::Provider::FD prov(this, options, fd); - int ret = nghttp2_submit_response(session_->session(), id_, nva, len, *prov); - CHECK_NE(ret, NGHTTP2_ERR_NOMEM); - return ret; -} - - // Submit informational headers for a stream. inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { CHECK(!this->IsDestroyed()); @@ -2080,87 +2063,6 @@ Http2Stream::Provider::~Provider() { provider_.source.ptr = nullptr; } -// The FD Provider pulls data from a file descriptor using libuv. All of the -// data transfer occurs in C++, without any chunks being passed through JS -// land. -Http2Stream::Provider::FD::FD(Http2Stream* stream, int options, int fd) - : Http2Stream::Provider(stream, options) { - CHECK(!stream->IsDestroyed()); - provider_.source.fd = fd; - provider_.read_callback = Http2Stream::Provider::FD::OnRead; -} - -Http2Stream::Provider::FD::FD(int options, int fd) - : Http2Stream::Provider(options) { - provider_.source.fd = fd; - provider_.read_callback = Http2Stream::Provider::FD::OnRead; -} - -ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle, - int32_t id, - uint8_t* buf, - size_t length, - uint32_t* flags, - nghttp2_data_source* source, - void* user_data) { - Http2Session* session = static_cast(user_data); - Http2Stream* stream = session->FindStream(id); - if (stream->statistics_.first_byte_sent == 0) - stream->statistics_.first_byte_sent = uv_hrtime(); - - DEBUG_HTTP2SESSION2(session, "reading outbound file data for stream %d", id); - CHECK_EQ(id, stream->id()); - - int fd = source->fd; - int64_t offset = stream->fd_offset_; - ssize_t numchars = 0; - - if (stream->fd_length_ >= 0 && - stream->fd_length_ < static_cast(length)) - length = stream->fd_length_; - - uv_buf_t data; - data.base = reinterpret_cast(buf); - data.len = length; - - uv_fs_t read_req; - - if (length > 0) { - // TODO(addaleax): Never use synchronous I/O on the main thread. - numchars = uv_fs_read(session->event_loop(), - &read_req, - fd, &data, 1, - offset, nullptr); - uv_fs_req_cleanup(&read_req); - } - - // Close the stream with an error if reading fails - if (numchars < 0) - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - - // Update the read offset for the next read - stream->fd_offset_ += numchars; - stream->fd_length_ -= numchars; - - DEBUG_HTTP2SESSION2(session, "sending %d bytes", numchars); - - // if numchars < length, assume that we are done. - if (static_cast(numchars) < length || length <= 0) { - DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id); - *flags |= NGHTTP2_DATA_FLAG_EOF; - session->GetTrailers(stream, flags); - // If the stream or session gets destroyed during the GetTrailers - // callback, check that here and close down the stream - if (stream->IsDestroyed()) - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - if (session->IsDestroyed()) - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - - stream->statistics_.sent_bytes += numchars; - return numchars; -} - // The Stream Provider pulls data from a linked list of uv_buf_t structs // built via the StreamBase API and the Streams js API. Http2Stream::Provider::Stream::Stream(int options) @@ -2216,6 +2118,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle, if (amount == 0 && stream->IsWritable()) { CHECK(stream->queue_.empty()); DEBUG_HTTP2SESSION2(session, "deferring stream %d", id); + stream->EmitWantsWrite(length); + if (stream->available_outbound_length_ > 0 || !stream->IsWritable()) { + // EmitWantsWrite() did something interesting synchronously, restart: + return OnRead(handle, id, buf, length, flags, source, user_data); + } return NGHTTP2_ERR_DEFERRED; } @@ -2498,27 +2405,6 @@ void Http2Stream::Respond(const FunctionCallbackInfo& args) { DEBUG_HTTP2STREAM(stream, "response submitted"); } -// Initiates a response on the Http2Stream using a file descriptor to provide -// outbound DATA frames. -void Http2Stream::RespondFD(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - Local context = env->context(); - Isolate* isolate = env->isolate(); - Http2Stream* stream; - ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); - - int fd = args[0]->Int32Value(context).ToChecked(); - Local headers = args[1].As(); - - int64_t offset = args[2]->IntegerValue(context).ToChecked(); - int64_t length = args[3]->IntegerValue(context).ToChecked(); - int options = args[4]->IntegerValue(context).ToChecked(); - - Headers list(isolate, context, headers); - args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(), - offset, length, options)); - DEBUG_HTTP2STREAM2(stream, "file response submitted for fd %d", fd); -} // Submits informational headers on the Http2Stream void Http2Stream::Info(const FunctionCallbackInfo& args) { @@ -2881,7 +2767,6 @@ void Initialize(Local target, env->SetProtoMethod(stream, "priority", Http2Stream::Priority); env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise); env->SetProtoMethod(stream, "info", Http2Stream::Info); - env->SetProtoMethod(stream, "respondFD", Http2Stream::RespondFD); env->SetProtoMethod(stream, "respond", Http2Stream::Respond); env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream); env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState); diff --git a/src/node_http2.h b/src/node_http2.h index 08109dcf046ba4..0fac6cca00f4db 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -573,18 +573,13 @@ class Http2Stream : public AsyncWrap, // Required for StreamBase int DoShutdown(ShutdownWrap* req_wrap) override; + bool HasWantsWrite() const override { return true; } + // Initiate a response on this stream. inline int SubmitResponse(nghttp2_nv* nva, size_t len, int options); - // Send data read from a file descriptor as the response on this stream. - inline int SubmitFile(int fd, - nghttp2_nv* nva, size_t len, - int64_t offset, - int64_t length, - int options); - // Submit informational headers for this stream inline int SubmitInfo(nghttp2_nv* nva, size_t len); @@ -707,7 +702,6 @@ class Http2Stream : public AsyncWrap, static void PushPromise(const FunctionCallbackInfo& args); static void RefreshState(const FunctionCallbackInfo& args); static void Info(const FunctionCallbackInfo& args); - static void RespondFD(const FunctionCallbackInfo& args); static void Respond(const FunctionCallbackInfo& args); static void RstStream(const FunctionCallbackInfo& args); @@ -751,8 +745,6 @@ class Http2Stream : public AsyncWrap, // waiting to be written out to the socket. std::queue queue_; size_t available_outbound_length_ = 0; - int64_t fd_offset_ = 0; - int64_t fd_length_ = -1; Http2StreamListener stream_listener_; @@ -778,20 +770,6 @@ class Http2Stream::Provider { bool empty_ = false; }; -class Http2Stream::Provider::FD : public Http2Stream::Provider { - public: - FD(int options, int fd); - FD(Http2Stream* stream, int options, int fd); - - static ssize_t OnRead(nghttp2_session* session, - int32_t id, - uint8_t* buf, - size_t length, - uint32_t* flags, - nghttp2_data_source* source, - void* user_data); -}; - class Http2Stream::Provider::Stream : public Http2Stream::Provider { public: Stream(Http2Stream* stream, int options); diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 207310f4068f43..8ab13e07340929 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -525,6 +525,14 @@ class Parser : public AsyncWrap, public StreamListener { static const size_t kAllocBufferSize = 64 * 1024; uv_buf_t OnStreamAlloc(size_t suggested_size) override { + // For most types of streams, OnStreamRead will be immediately after + // OnStreamAlloc, and will consume all data, so using a static buffer for + // reading is more efficient. For other streams, just use the default + // allocator, which uses Malloc(). + if (env()->http_parser_buffer_in_use()) + return StreamListener::OnStreamAlloc(suggested_size); + env()->set_http_parser_buffer_in_use(true); + if (env()->http_parser_buffer() == nullptr) env()->set_http_parser_buffer(new char[kAllocBufferSize]); @@ -534,6 +542,15 @@ class Parser : public AsyncWrap, public StreamListener { void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { HandleScope scope(env()->isolate()); + // Once we’re done here, either indicate that the HTTP parser buffer + // is free for re-use, or free() the data if it didn’t come from there + // in the first place. + OnScopeLeave on_scope_leave([&]() { + if (buf.base == env()->http_parser_buffer()) + env()->set_http_parser_buffer_in_use(false); + else + free(buf.base); + }); if (nread < 0) { PassReadErrorToPreviousListener(nread); diff --git a/src/node_internals.h b/src/node_internals.h index 2faa6f93475ad7..79c2ce553200f3 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -120,6 +120,7 @@ struct sockaddr; V(serdes) \ V(signal_wrap) \ V(spawn_sync) \ + V(stream_pipe) \ V(stream_wrap) \ V(string_decoder) \ V(tcp_wrap) \ diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 1534dcd1d53359..f4c228d7c5956f 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -67,8 +67,14 @@ inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { inline StreamResource::~StreamResource() { while (listener_ != nullptr) { - listener_->OnStreamDestroy(); - RemoveStreamListener(listener_); + StreamListener* listener = listener_; + listener->OnStreamDestroy(); + // Remove the listener if it didn’t remove itself. This makes the logic + // logic in `OnStreamDestroy()` implementations easier, because they + // may call generic cleanup functions which can just remove the + // listener unconditionally. + if (listener == listener_) + RemoveStreamListener(listener_); } } @@ -106,25 +112,43 @@ inline void StreamResource::RemoveStreamListener(StreamListener* listener) { listener->previous_listener_ = nullptr; } - inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif return listener_->OnStreamAlloc(suggested_size); } inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif if (nread > 0) bytes_read_ += static_cast(nread); listener_->OnStreamRead(nread, buf); } inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif listener_->OnStreamAfterWrite(w, status); } inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif listener_->OnStreamAfterShutdown(w, status); } +inline void StreamResource::EmitWantsWrite(size_t suggested_size) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif + listener_->OnStreamWantsWrite(suggested_size); +} + inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); } @@ -133,35 +157,16 @@ inline Environment* StreamBase::stream_env() const { return env_; } -inline void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { - AfterRequest(req_wrap, [&]() { - EmitAfterWrite(req_wrap, status); - }); -} - -inline void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { - AfterRequest(req_wrap, [&]() { - EmitAfterShutdown(req_wrap, status); - }); -} - -template -inline void StreamBase::AfterRequest(Wrap* req_wrap, EmitEvent emit) { +inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { Environment* env = stream_env(); - v8::HandleScope handle_scope(env->isolate()); - v8::Context::Scope context_scope(env->context()); + HandleScope handle_scope(env->isolate()); - emit(); - req_wrap->Dispose(); -} - -inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { - Environment* env = stream_env(); if (req_wrap_obj.IsEmpty()) { req_wrap_obj = - env->shutdown_wrap_constructor_function() + env->shutdown_wrap_template() ->NewInstance(env->context()).ToLocalChecked(); + StreamReq::ResetObject(req_wrap_obj); } AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); @@ -195,10 +200,13 @@ inline StreamWriteResult StreamBase::Write( } } + HandleScope handle_scope(env->isolate()); + if (req_wrap_obj.IsEmpty()) { req_wrap_obj = - env->write_wrap_constructor_function() + env->write_wrap_template() ->NewInstance(env->context()).ToLocalChecked(); + StreamReq::ResetObject(req_wrap_obj); } AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); @@ -387,7 +395,8 @@ void StreamBase::JSMethod(const FunctionCallbackInfo& args) { inline void ShutdownWrap::OnDone(int status) { - stream()->AfterShutdown(this, status); + stream()->EmitAfterShutdown(this, status); + Dispose(); } inline void WriteWrap::SetAllocatedStorage(char* data, size_t size) { @@ -405,7 +414,8 @@ inline size_t WriteWrap::StorageSize() const { } inline void WriteWrap::OnDone(int status) { - stream()->AfterWrite(this, status); + stream()->EmitAfterWrite(this, status); + Dispose(); } inline void StreamReq::Done(int status, const char* error_str) { @@ -419,6 +429,15 @@ inline void StreamReq::Done(int status, const char* error_str) { OnDone(status); } +inline void StreamReq::ResetObject(v8::Local obj) { +#ifdef DEBUG + CHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField); +#endif + ClearWrap(obj); + obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr); +} + + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/stream_base.cc b/src/stream_base.cc index 1d1d324841537f..8838a1a6dfb6b3 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -387,6 +387,8 @@ void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); Local req_wrap_obj = async_wrap->object(); Local argv[] = { diff --git a/src/stream_base.h b/src/stream_base.h index 8af05059f49e47..3267e544a04195 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -46,6 +46,13 @@ class StreamReq { static StreamReq* FromObject(v8::Local req_wrap_obj); + // Sets all internal fields of `req_wrap_obj` to `nullptr`. + // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do, + // and what we use in C++ after creating these objects from their + // v8::ObjectTemplates, to avoid the overhead of calling the + // constructor explicitly. + static inline void ResetObject(v8::Local req_wrap_obj); + protected: virtual void OnDone(int status) = 0; @@ -61,7 +68,8 @@ class ShutdownWrap : public StreamReq { v8::Local req_wrap_obj) : StreamReq(stream, req_wrap_obj) { } - void OnDone(int status) override; // Just calls stream()->AfterShutdown() + // Call stream()->EmitAfterShutdown() and dispose of this request wrap. + void OnDone(int status) override; }; class WriteWrap : public StreamReq { @@ -78,7 +86,8 @@ class WriteWrap : public StreamReq { free(storage_); } - void OnDone(int status) override; // Just calls stream()->AfterWrite() + // Call stream()->EmitAfterWrite() and dispose of this request wrap. + void OnDone(int status) override; private: char* storage_ = nullptr; @@ -93,7 +102,7 @@ class StreamListener { public: virtual ~StreamListener(); - // This is called when a stream wants to allocate memory immediately before + // This is called when a stream wants to allocate memory before // reading data into the freshly allocated buffer (i.e. it is always followed // by a `OnStreamRead()` call). // This memory may be statically or dynamically allocated; for example, @@ -103,6 +112,9 @@ class StreamListener { // The returned buffer does not need to contain `suggested_size` bytes. // The default implementation of this method returns a buffer that has exactly // the suggested size and is allocated using malloc(). + // It is not valid to return a zero-length buffer from this method. + // It is not guaranteed that the corresponding `OnStreamRead()` call + // happens in the same event loop turn as this call. virtual uv_buf_t OnStreamAlloc(size_t suggested_size); // `OnStreamRead()` is called when data is available on the socket and has @@ -126,9 +138,19 @@ class StreamListener { // (and raises an assertion if there is none). virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); + // This is called by the stream if it determines that it wants more data + // to be written to it. Not all streams support this. + // This callback will not be called as long as there are active writes. + // It is not supported by all streams; `stream->HasWantsWrite()` returns + // true if it is supported by a stream. + virtual void OnStreamWantsWrite(size_t suggested_size) {} + // This is called immediately before the stream is destroyed. virtual void OnStreamDestroy() {} + // The stream this is currently associated with, or nullptr if there is none. + inline StreamResource* stream() { return stream_; } + protected: // Pass along a read error to the `StreamListener` instance that was active // before this one. For example, a protocol parser does not care about read @@ -194,6 +216,9 @@ class StreamResource { size_t count, uv_stream_t* send_handle) = 0; + // Returns true if the stream supports the `OnStreamWantsWrite()` interface. + virtual bool HasWantsWrite() const { return false; } + // Optionally, this may provide an error message to be used for // failing writes. virtual const char* Error() const; @@ -217,6 +242,8 @@ class StreamResource { void EmitAfterWrite(WriteWrap* w, int status); // Call the current listener's OnStreamAfterShutdown() method. void EmitAfterShutdown(ShutdownWrap* w, int status); + // Call the current listener's OnStreamWantsWrite() method. + void EmitWantsWrite(size_t suggested_size); StreamListener* listener_ = nullptr; uint64_t bytes_read_ = 0; @@ -306,13 +333,6 @@ class StreamBase : public StreamResource { Environment* env_; EmitToJSStreamListener default_listener_; - // These are called by the respective {Write,Shutdown}Wrap class. - void AfterShutdown(ShutdownWrap* req, int status); - void AfterWrite(WriteWrap* req, int status); - - template - void AfterRequest(Wrap* req_wrap, EmitEvent emit); - friend class WriteWrap; friend class ShutdownWrap; }; diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc new file mode 100644 index 00000000000000..8f0263cd9ae99b --- /dev/null +++ b/src/stream_pipe.cc @@ -0,0 +1,266 @@ +#include "stream_pipe.h" +#include "stream_base-inl.h" +#include "node_buffer.h" +#include "node_internals.h" + +using v8::Context; +using v8::External; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::Local; +using v8::Object; +using v8::Value; + +namespace node { + +StreamPipe::StreamPipe(StreamBase* source, + StreamBase* sink, + Local obj) + : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) { + MakeWeak(this); + + CHECK_NE(sink, nullptr); + CHECK_NE(source, nullptr); + + source->PushStreamListener(&readable_listener_); + sink->PushStreamListener(&writable_listener_); + + CHECK(sink->HasWantsWrite()); + + // Set up links between this object and the source/sink objects. + // In particular, this makes sure that they are garbage collected as a group, + // if that applies to the given streams (for example, Http2Streams use + // weak references). + obj->Set(env()->context(), env()->source_string(), source->GetObject()) + .FromJust(); + source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj) + .FromJust(); + obj->Set(env()->context(), env()->sink_string(), sink->GetObject()) + .FromJust(); + sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj) + .FromJust(); +} + +StreamPipe::~StreamPipe() { + CHECK(is_closed_); +} + +StreamBase* StreamPipe::source() { + return static_cast(readable_listener_.stream()); +} + +StreamBase* StreamPipe::sink() { + return static_cast(writable_listener_.stream()); +} + +void StreamPipe::Unpipe() { + if (is_closed_) + return; + + // Note that we cannot use virtual methods on `source` and `sink` here, + // because this function can be called from their destructors via + // `OnStreamDestroy()`. + + is_closed_ = true; + is_reading_ = false; + source()->RemoveStreamListener(&readable_listener_); + sink()->RemoveStreamListener(&writable_listener_); + + // Delay the JS-facing part with SetImmediate, because this might be from + // inside the garbage collector, so we can’t run JS here. + HandleScope handle_scope(env()->isolate()); + env()->SetImmediate([](Environment* env, void* data) { + StreamPipe* pipe = static_cast(data); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + Local object = pipe->object(); + + if (object->Has(env->context(), env->onunpipe_string()).FromJust()) { + pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked(); + } + + // Set all the links established in the constructor to `null`. + Local null = Null(env->isolate()); + + Local source_v; + Local sink_v; + source_v = object->Get(env->context(), env->source_string()) + .ToLocalChecked(); + sink_v = object->Get(env->context(), env->sink_string()) + .ToLocalChecked(); + CHECK(source_v->IsObject()); + CHECK(sink_v->IsObject()); + + object->Set(env->context(), env->source_string(), null).FromJust(); + object->Set(env->context(), env->sink_string(), null).FromJust(); + source_v.As()->Set(env->context(), + env->pipe_target_string(), + null).FromJust(); + sink_v.As()->Set(env->context(), + env->pipe_source_string(), + null).FromJust(); + }, static_cast(this), object()); +} + +uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + size_t size = std::min(suggested_size, pipe->wanted_data_); + CHECK_GT(size, 0); + return uv_buf_init(Malloc(size), size); +} + +void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf) { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + AsyncScope async_scope(pipe); + if (nread < 0) { + // EOF or error; stop reading and pass the error to the previous listener + // (which might end up in JS). + free(buf.base); + pipe->is_eof_ = true; + stream()->ReadStop(); + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); + // If we’re not writing, close now. Otherwise, we’ll do that in + // `OnStreamAfterWrite()`. + if (!pipe->is_writing_) { + pipe->ShutdownWritable(); + pipe->Unpipe(); + } + return; + } + + pipe->ProcessData(nread, buf); +} + +void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) { + uv_buf_t buffer = uv_buf_init(buf.base, nread); + StreamWriteResult res = sink()->Write(&buffer, 1); + if (!res.async) { + free(buf.base); + writable_listener_.OnStreamAfterWrite(nullptr, res.err); + } else { + is_writing_ = true; + is_reading_ = false; + res.wrap->SetAllocatedStorage(buf.base, buf.len); + source()->ReadStop(); + } +} + +void StreamPipe::ShutdownWritable() { + sink()->Shutdown(); +} + +void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w, + int status) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->is_writing_ = false; + if (pipe->is_eof_) { + AsyncScope async_scope(pipe); + pipe->ShutdownWritable(); + pipe->Unpipe(); + return; + } + + if (status != 0) { + CHECK_NE(previous_listener_, nullptr); + StreamListener* prev = previous_listener_; + pipe->Unpipe(); + prev->OnStreamAfterWrite(w, status); + return; + } +} + +void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w, + int status) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + CHECK_NE(previous_listener_, nullptr); + StreamListener* prev = previous_listener_; + pipe->Unpipe(); + prev->OnStreamAfterShutdown(w, status); +} + +void StreamPipe::ReadableListener::OnStreamDestroy() { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + if (!pipe->is_eof_) { + OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0)); + } +} + +void StreamPipe::WritableListener::OnStreamDestroy() { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->is_eof_ = true; + pipe->Unpipe(); +} + +void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->wanted_data_ = suggested_size; + if (pipe->is_reading_ || pipe->is_closed_) + return; + AsyncScope async_scope(pipe); + pipe->is_reading_ = true; + pipe->source()->ReadStart(); +} + +uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) { + CHECK_NE(previous_listener_, nullptr); + return previous_listener_->OnStreamAlloc(suggested_size); +} + +void StreamPipe::WritableListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf) { + CHECK_NE(previous_listener_, nullptr); + return previous_listener_->OnStreamRead(nread, buf); +} + +void StreamPipe::New(const FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + CHECK(args[0]->IsExternal()); + CHECK(args[1]->IsExternal()); + auto source = static_cast(args[0].As()->Value()); + auto sink = static_cast(args[1].As()->Value()); + + new StreamPipe(source, sink, args.This()); +} + +void StreamPipe::Start(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + pipe->is_closed_ = false; + if (pipe->wanted_data_ > 0) + pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_); +} + +void StreamPipe::Unpipe(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + pipe->Unpipe(); +} + +namespace { + +void InitializeStreamPipe(Local target, + Local unused, + Local context) { + Environment* env = Environment::GetCurrent(context); + + // Create FunctionTemplate for FileHandle::CloseReq + Local pipe = env->NewFunctionTemplate(StreamPipe::New); + Local stream_pipe_string = + FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe"); + env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe); + env->SetProtoMethod(pipe, "start", StreamPipe::Start); + AsyncWrap::AddWrapMethods(env, pipe); + pipe->SetClassName(stream_pipe_string); + pipe->InstanceTemplate()->SetInternalFieldCount(1); + target->Set(context, stream_pipe_string, pipe->GetFunction()).FromJust(); +} + +} // anonymous namespace + +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe, + node::InitializeStreamPipe) diff --git a/src/stream_pipe.h b/src/stream_pipe.h new file mode 100644 index 00000000000000..98d6dae11be841 --- /dev/null +++ b/src/stream_pipe.h @@ -0,0 +1,68 @@ +#ifndef SRC_STREAM_PIPE_H_ +#define SRC_STREAM_PIPE_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "stream_base.h" + +namespace node { + +class StreamPipe : public AsyncWrap { + public: + StreamPipe(StreamBase* source, StreamBase* sink, v8::Local obj); + ~StreamPipe(); + + void Unpipe(); + + static void New(const v8::FunctionCallbackInfo& args); + static void Start(const v8::FunctionCallbackInfo& args); + static void Unpipe(const v8::FunctionCallbackInfo& args); + + size_t self_size() const override { return sizeof(*this); } + + private: + StreamBase* source(); + StreamBase* sink(); + + void ShutdownWritable(); + void FlushToWritable(); + + bool is_reading_ = false; + bool is_writing_ = false; + bool is_eof_ = false; + bool is_closed_ = true; + + // Set a default value so that when we’re coming from Start(), we know + // that we don’t want to read just yet. + // This will likely need to be changed when supporting streams without + // `OnStreamWantsWrite()` support. + size_t wanted_data_ = 0; + + void ProcessData(size_t nread, const uv_buf_t& buf); + + class ReadableListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamDestroy() override; + }; + + class WritableListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamAfterWrite(WriteWrap* w, int status) override; + void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; + void OnStreamWantsWrite(size_t suggested_size) override; + void OnStreamDestroy() override; + }; + + ReadableListener readable_listener_; + WritableListener writable_listener_; +}; + +} // namespace node + +#endif + +#endif // SRC_STREAM_PIPE_H_ diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 27fe48d1165c75..c696404849c9cc 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -60,9 +60,7 @@ void LibuvStreamWrap::Initialize(Local target, auto is_construct_call_callback = [](const FunctionCallbackInfo& args) { CHECK(args.IsConstructCall()); - ClearWrap(args.This()); - args.This()->SetAlignedPointerInInternalField( - StreamReq::kStreamReqField, nullptr); + StreamReq::ResetObject(args.This()); }; Local sw = FunctionTemplate::New(env->isolate(), is_construct_call_callback); @@ -72,7 +70,7 @@ void LibuvStreamWrap::Initialize(Local target, sw->SetClassName(wrapString); AsyncWrap::AddWrapMethods(env, sw); target->Set(wrapString, sw->GetFunction()); - env->set_shutdown_wrap_constructor_function(sw->GetFunction()); + env->set_shutdown_wrap_template(sw->InstanceTemplate()); Local ww = FunctionTemplate::New(env->isolate(), is_construct_call_callback); @@ -82,7 +80,7 @@ void LibuvStreamWrap::Initialize(Local target, ww->SetClassName(writeWrapString); AsyncWrap::AddWrapMethods(env, ww); target->Set(writeWrapString, ww->GetFunction()); - env->set_write_wrap_constructor_function(ww->GetFunction()); + env->set_write_wrap_template(ww->InstanceTemplate()); } diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 1cc5478bb57296..cddef66c44a8e5 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -220,6 +220,8 @@ void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) { SSL* ssl = const_cast(ssl_); TLSWrap* c = static_cast(SSL_get_app_data(ssl)); Environment* env = c->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); Local object = c->object(); if (where & SSL_CB_HANDSHAKE_START) { @@ -289,6 +291,8 @@ void TLSWrap::EncOut() { NODE_COUNT_NET_BYTES_SENT(write_size_); if (!res.async) { + HandleScope handle_scope(env()->isolate()); + // Simulate asynchronous finishing, TLS cannot handle this at the moment. env()->SetImmediate([](Environment* env, void* data) { static_cast(data)->OnStreamAfterWrite(nullptr, 0); @@ -427,6 +431,7 @@ void TLSWrap::ClearOut() { // shutdown cleanly (SSL_ERROR_ZERO_RETURN) even when read == 0. // See node#1642 and SSL_read(3SSL) for details. if (read <= 0) { + HandleScope handle_scope(env()->isolate()); int err; Local arg = GetSSLError(read, &err, nullptr); @@ -477,6 +482,9 @@ bool TLSWrap::ClearIn() { } // Error or partial write + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + int err; std::string error_str; Local arg = GetSSLError(written, &err, &error_str); @@ -814,6 +822,9 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { if (servername == nullptr) return SSL_TLSEXT_ERR_OK; + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + // Call the SNI callback and use its return value as context Local object = p->object(); Local ctx = object->Get(env->sni_context_string()); diff --git a/src/util.h b/src/util.h index 7c679952d5fb1f..e871fc63a5c46a 100644 --- a/src/util.h +++ b/src/util.h @@ -34,6 +34,7 @@ #include #include +#include // std::function #include // std::remove_reference namespace node { @@ -433,9 +434,16 @@ class BufferValue : public MaybeStackBuffer { // Use this when a variable or parameter is unused in order to explicitly // silence a compiler warning about that. template inline void USE(T&&) {} - } // namespace node +// Run a function when exiting the current scope. +struct OnScopeLeave { + std::function fn_; + + explicit OnScopeLeave(std::function fn) : fn_(fn) {} + ~OnScopeLeave() { fn_(); } +}; + #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #endif // SRC_UTIL_H_ diff --git a/test/parallel/test-http2-respond-file-error-dir.js b/test/parallel/test-http2-respond-file-error-dir.js index 6818616227df89..24a6d2dc96597e 100644 --- a/test/parallel/test-http2-respond-file-error-dir.js +++ b/test/parallel/test-http2-respond-file-error-dir.js @@ -15,7 +15,7 @@ server.on('stream', (stream) => { common.expectsError({ code: 'ERR_HTTP2_SEND_FILE', type: Error, - message: 'Only regular files can be sent' + message: 'Directories cannot be sent' })(err); stream.respond({ ':status': 404 }); diff --git a/test/parallel/test-http2-respond-file-error-pipe-offset.js b/test/parallel/test-http2-respond-file-error-pipe-offset.js new file mode 100644 index 00000000000000..eb782e2dea66c4 --- /dev/null +++ b/test/parallel/test-http2-respond-file-error-pipe-offset.js @@ -0,0 +1,61 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +if (common.isWindows) + common.skip('no mkfifo on Windows'); +const child_process = require('child_process'); +const path = require('path'); +const fs = require('fs'); +const http2 = require('http2'); +const assert = require('assert'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +const pipeName = path.join(tmpdir.path, 'pipe'); + +const mkfifo = child_process.spawnSync('mkfifo', [ pipeName ]); +if (mkfifo.error && mkfifo.error.code === 'ENOENT') { + common.skip('missing mkfifo'); +} + +process.on('exit', () => fs.unlinkSync(pipeName)); + +const server = http2.createServer(); +server.on('stream', (stream) => { + stream.respondWithFile(pipeName, { + 'content-type': 'text/plain' + }, { + offset: 10, + onError(err) { + common.expectsError({ + code: 'ERR_HTTP2_SEND_FILE_NOSEEK', + type: Error, + message: 'Offset or length can only be specified for regular files' + })(err); + + stream.respond({ ':status': 404 }); + stream.end(); + }, + statCheck: common.mustNotCall() + }); +}); +server.listen(0, () => { + + const client = http2.connect(`http://localhost:${server.address().port}`); + const req = client.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 404); + })); + req.on('data', common.mustNotCall()); + req.on('end', common.mustCall(() => { + client.close(); + server.close(); + })); + req.end(); +}); + +fs.writeFile(pipeName, 'Hello, world!\n', common.mustCall()); diff --git a/test/parallel/test-http2-respond-file-with-pipe.js b/test/parallel/test-http2-respond-file-with-pipe.js new file mode 100644 index 00000000000000..2b7ca3fc9a51ee --- /dev/null +++ b/test/parallel/test-http2-respond-file-with-pipe.js @@ -0,0 +1,58 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +if (common.isWindows) + common.skip('no mkfifo on Windows'); +const child_process = require('child_process'); +const path = require('path'); +const fs = require('fs'); +const http2 = require('http2'); +const assert = require('assert'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +const pipeName = path.join(tmpdir.path, 'pipe'); + +const mkfifo = child_process.spawnSync('mkfifo', [ pipeName ]); +if (mkfifo.error && mkfifo.error.code === 'ENOENT') { + common.skip('missing mkfifo'); +} + +process.on('exit', () => fs.unlinkSync(pipeName)); + +const server = http2.createServer(); +server.on('stream', (stream) => { + stream.respondWithFile(pipeName, { + 'content-type': 'text/plain' + }, { + onError: common.mustNotCall(), + statCheck: common.mustCall() + }); +}); + +server.listen(0, () => { + const client = http2.connect(`http://localhost:${server.address().port}`); + const req = client.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 200); + })); + let body = ''; + req.setEncoding('utf8'); + req.on('data', (chunk) => body += chunk); + req.on('end', common.mustCall(() => { + assert.strictEqual(body, 'Hello, world!\n'); + client.close(); + server.close(); + })); + req.end(); +}); + +fs.open(pipeName, 'w', common.mustCall((err, fd) => { + assert.ifError(err); + fs.writeSync(fd, 'Hello, world!\n'); + fs.closeSync(fd); +})); diff --git a/test/parallel/test-http2-respond-with-fd-errors.js b/test/parallel/test-http2-respond-with-fd-errors.js index 0eccd231c63a2e..3a671a3e36490a 100644 --- a/test/parallel/test-http2-respond-with-fd-errors.js +++ b/test/parallel/test-http2-respond-with-fd-errors.js @@ -46,8 +46,8 @@ const tests = specificTests.concat(genericTests); let currentError; -// mock respondFD because we only care about testing error handling -Http2Stream.prototype.respondFD = () => currentError.ngError; +// mock `respond` because we only care about testing error handling +Http2Stream.prototype.respond = () => currentError.ngError; const server = http2.createServer(); server.on('stream', common.mustCall((stream, headers) => { diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 66eaabec25d977..64c4fd5cd8ab50 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -35,6 +35,7 @@ common.crashOnUnhandledRejection(); delete providers.HTTP2STREAM; delete providers.HTTP2PING; delete providers.HTTP2SETTINGS; + delete providers.STREAMPIPE; const objKeys = Object.keys(providers); if (objKeys.length > 0)