From 5e63bf5a7781b1b65b4ac7fc74135f056f1e0c16 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 24 Feb 2018 17:42:27 +0100 Subject: [PATCH 01/11] src: tighten handle scopes for stream operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Put `HandleScope`s and `Context::Scope`s where they are used, and don’t create one for native stream callbacks automatically. This is slightly less convenient but means that stream listeners that don’t actually call back into JS don’t have to pay the (small) cost of setting these up. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- src/node_http2.cc | 7 +++++-- src/stream_base-inl.h | 42 ++++++++++++++++-------------------------- src/stream_base.cc | 2 ++ src/stream_base.h | 13 ++++--------- src/tls_wrap.cc | 11 +++++++++++ 5 files changed, 38 insertions(+), 37 deletions(-) diff --git a/src/node_http2.cc b/src/node_http2.cc index d683b075bc2ed3..939e0011bdfa42 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1132,6 +1132,8 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { Http2Stream* stream = static_cast(stream_); Http2Session* session = stream->session(); Environment* env = stream->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); if (nread < 0) { PassReadErrorToPreviousListener(nread); @@ -1422,6 +1424,7 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { void Http2Session::MaybeScheduleWrite() { CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0); if (session_ != nullptr && nghttp2_session_want_write(session_)) { + HandleScope handle_scope(env()->isolate()); DEBUG_HTTP2SESSION(this, "scheduling write"); flags_ |= SESSION_STATE_WRITE_SCHEDULED; env()->SetImmediate([](Environment* env, void* data) { @@ -1632,6 +1635,8 @@ inline Http2Stream* Http2Session::SubmitRequest( // Callback used to receive inbound data from the i/o stream void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); Http2Scope h2scope(this); CHECK_NE(stream_, nullptr); DEBUG_HTTP2SESSION2(this, "receiving %d bytes", nread); @@ -1661,8 +1666,6 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_LE(static_cast(nread), stream_buf_.len); Isolate* isolate = env()->isolate(); - HandleScope scope(isolate); - Context::Scope context_scope(env()->context()); // Create an array buffer for the read data. DATA frames will be emitted // as slices of this array buffer to avoid having to copy memory. diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 1534dcd1d53359..c87393e6fc1c72 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -106,22 +106,33 @@ inline void StreamResource::RemoveStreamListener(StreamListener* listener) { listener->previous_listener_ = nullptr; } - inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif return listener_->OnStreamAlloc(suggested_size); } inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif if (nread > 0) bytes_read_ += static_cast(nread); listener_->OnStreamRead(nread, buf); } inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif listener_->OnStreamAfterWrite(w, status); } inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif listener_->OnStreamAfterShutdown(w, status); } @@ -133,29 +144,6 @@ inline Environment* StreamBase::stream_env() const { return env_; } -inline void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { - AfterRequest(req_wrap, [&]() { - EmitAfterWrite(req_wrap, status); - }); -} - -inline void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { - AfterRequest(req_wrap, [&]() { - EmitAfterShutdown(req_wrap, status); - }); -} - -template -inline void StreamBase::AfterRequest(Wrap* req_wrap, EmitEvent emit) { - Environment* env = stream_env(); - - v8::HandleScope handle_scope(env->isolate()); - v8::Context::Scope context_scope(env->context()); - - emit(); - req_wrap->Dispose(); -} - inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { Environment* env = stream_env(); if (req_wrap_obj.IsEmpty()) { @@ -387,7 +375,8 @@ void StreamBase::JSMethod(const FunctionCallbackInfo& args) { inline void ShutdownWrap::OnDone(int status) { - stream()->AfterShutdown(this, status); + stream()->EmitAfterShutdown(this, status); + Dispose(); } inline void WriteWrap::SetAllocatedStorage(char* data, size_t size) { @@ -405,7 +394,8 @@ inline size_t WriteWrap::StorageSize() const { } inline void WriteWrap::OnDone(int status) { - stream()->AfterWrite(this, status); + stream()->EmitAfterWrite(this, status); + Dispose(); } inline void StreamReq::Done(int status, const char* error_str) { diff --git a/src/stream_base.cc b/src/stream_base.cc index 1d1d324841537f..8838a1a6dfb6b3 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -387,6 +387,8 @@ void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); Local req_wrap_obj = async_wrap->object(); Local argv[] = { diff --git a/src/stream_base.h b/src/stream_base.h index 8af05059f49e47..6962648650e1a6 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -61,7 +61,8 @@ class ShutdownWrap : public StreamReq { v8::Local req_wrap_obj) : StreamReq(stream, req_wrap_obj) { } - void OnDone(int status) override; // Just calls stream()->AfterShutdown() + // Call stream()->EmitAfterShutdown() and dispose of this request wrap. + void OnDone(int status) override; }; class WriteWrap : public StreamReq { @@ -78,7 +79,8 @@ class WriteWrap : public StreamReq { free(storage_); } - void OnDone(int status) override; // Just calls stream()->AfterWrite() + // Call stream()->EmitAfterWrite() and dispose of this request wrap. + void OnDone(int status) override; private: char* storage_ = nullptr; @@ -306,13 +308,6 @@ class StreamBase : public StreamResource { Environment* env_; EmitToJSStreamListener default_listener_; - // These are called by the respective {Write,Shutdown}Wrap class. - void AfterShutdown(ShutdownWrap* req, int status); - void AfterWrite(WriteWrap* req, int status); - - template - void AfterRequest(Wrap* req_wrap, EmitEvent emit); - friend class WriteWrap; friend class ShutdownWrap; }; diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 1cc5478bb57296..cddef66c44a8e5 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -220,6 +220,8 @@ void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) { SSL* ssl = const_cast(ssl_); TLSWrap* c = static_cast(SSL_get_app_data(ssl)); Environment* env = c->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); Local object = c->object(); if (where & SSL_CB_HANDSHAKE_START) { @@ -289,6 +291,8 @@ void TLSWrap::EncOut() { NODE_COUNT_NET_BYTES_SENT(write_size_); if (!res.async) { + HandleScope handle_scope(env()->isolate()); + // Simulate asynchronous finishing, TLS cannot handle this at the moment. env()->SetImmediate([](Environment* env, void* data) { static_cast(data)->OnStreamAfterWrite(nullptr, 0); @@ -427,6 +431,7 @@ void TLSWrap::ClearOut() { // shutdown cleanly (SSL_ERROR_ZERO_RETURN) even when read == 0. // See node#1642 and SSL_read(3SSL) for details. if (read <= 0) { + HandleScope handle_scope(env()->isolate()); int err; Local arg = GetSSLError(read, &err, nullptr); @@ -477,6 +482,9 @@ bool TLSWrap::ClearIn() { } // Error or partial write + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + int err; std::string error_str; Local arg = GetSSLError(written, &err, &error_str); @@ -814,6 +822,9 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { if (servername == nullptr) return SSL_TLSEXT_ERR_OK; + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + // Call the SNI callback and use its return value as context Local object = p->object(); Local ctx = object->Get(env->sni_context_string()); From 68a6d157db7a9001d769518edf2850d6698c36c6 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 12 Feb 2018 22:33:20 +0100 Subject: [PATCH 02/11] src: make `FileHandle` a (readonly) `StreamBase` This enables accessing files using a more standard pattern. Once some more refactoring has been performed on the other existing `StreamBase` streams, this could also be used to implement `fs` streams in a more standard manner. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- src/env-inl.h | 5 ++ src/env.cc | 1 + src/env.h | 11 +++ src/node_file.cc | 202 ++++++++++++++++++++++++++++++++++++++++-- src/node_file.h | 62 ++++++++++++- src/stream_base-inl.h | 5 ++ src/stream_base.h | 5 +- src/util.h | 2 +- 8 files changed, 279 insertions(+), 14 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index 31f2af402417dc..05c7a90c52fee7 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -484,6 +484,11 @@ Environment::fs_stats_field_array() { return &fs_stats_field_array_; } +inline std::vector>& +Environment::file_handle_read_wrap_freelist() { + return file_handle_read_wrap_freelist_; +} + void Environment::CreateImmediate(native_immediate_callback cb, void* data, v8::Local obj, diff --git a/src/env.cc b/src/env.cc index 36a3612a780731..d361262e22ba09 100644 --- a/src/env.cc +++ b/src/env.cc @@ -2,6 +2,7 @@ #include "async_wrap.h" #include "node_buffer.h" #include "node_platform.h" +#include "node_file.h" #include #include diff --git a/src/env.h b/src/env.h index c778e7ba112340..e0f6856f8d0b1d 100644 --- a/src/env.h +++ b/src/env.h @@ -48,6 +48,10 @@ struct nghttp2_rcbuf; namespace node { +namespace fs { +class FileHandleReadWrap; +} + namespace performance { class performance_state; } @@ -297,6 +301,7 @@ struct PackageConfig { V(context, v8::Context) \ V(domain_callback, v8::Function) \ V(fd_constructor_template, v8::ObjectTemplate) \ + V(filehandlereadwrap_template, v8::ObjectTemplate) \ V(fsreqpromise_constructor_template, v8::ObjectTemplate) \ V(fdclose_constructor_template, v8::ObjectTemplate) \ V(host_import_module_dynamically_callback, v8::Function) \ @@ -642,6 +647,9 @@ class Environment { inline AliasedBuffer* fs_stats_field_array(); + inline std::vector>& + file_handle_read_wrap_freelist(); + inline performance::performance_state* performance_state(); inline std::map* performance_marks(); @@ -822,6 +830,9 @@ class Environment { static const int kFsStatsFieldsLength = 2 * 14; AliasedBuffer fs_stats_field_array_; + std::vector> + file_handle_read_wrap_freelist_; + struct ExitCallback { void (*cb_)(void* arg); void* arg_; diff --git a/src/node_file.cc b/src/node_file.cc index fe3b0e1383e8cb..36bb326aa51517 100644 --- a/src/node_file.cc +++ b/src/node_file.cc @@ -26,6 +26,7 @@ #include "node_file.h" #include "req_wrap-inl.h" +#include "stream_base-inl.h" #include "string_bytes.h" #include "string_search.h" @@ -41,7 +42,6 @@ #endif #include -#include namespace node { @@ -115,11 +115,13 @@ using v8::Value; // The FileHandle object wraps a file descriptor and will close it on garbage // collection if necessary. If that happens, a process warning will be // emitted (or a fatal exception will occur if the fd cannot be closed.) -FileHandle::FileHandle(Environment* env, int fd) +FileHandle::FileHandle(Environment* env, int fd, Local obj) : AsyncWrap(env, - env->fd_constructor_template() - ->NewInstance(env->context()).ToLocalChecked(), - AsyncWrap::PROVIDER_FILEHANDLE), fd_(fd) { + obj.IsEmpty() ? env->fd_constructor_template() + ->NewInstance(env->context()).ToLocalChecked() : obj, + AsyncWrap::PROVIDER_FILEHANDLE), + StreamBase(env), + fd_(fd) { MakeWeak(this); v8::PropertyAttribute attr = static_cast(v8::ReadOnly | v8::DontDelete); @@ -129,6 +131,19 @@ FileHandle::FileHandle(Environment* env, int fd) attr).FromJust(); } +void FileHandle::New(const v8::FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args.IsConstructCall()); + CHECK(args[0]->IsInt32()); + + FileHandle* handle = + new FileHandle(env, args[0].As()->Value(), args.This()); + if (args[1]->IsNumber()) + handle->read_offset_ = args[1]->IntegerValue(env->context()).FromJust(); + if (args[2]->IsNumber()) + handle->read_length_ = args[2]->IntegerValue(env->context()).FromJust(); +} + FileHandle::~FileHandle() { CHECK(!closing_); // We should not be deleting while explicitly closing! Close(); // Close synchronously and emit warning @@ -142,10 +157,10 @@ FileHandle::~FileHandle() { // will crash the process immediately. inline void FileHandle::Close() { if (closed_) return; - closed_ = true; uv_fs_t req; int ret = uv_fs_close(env()->event_loop(), &req, fd_, nullptr); uv_fs_req_cleanup(&req); + AfterClose(); struct err_detail { int ret; int fd; }; @@ -219,18 +234,18 @@ inline MaybeLocal FileHandle::ClosePromise() { CHECK(!maybe_resolver.IsEmpty()); Local resolver = maybe_resolver.ToLocalChecked(); Local promise = resolver.As(); + CHECK(!reading_); if (!closed_ && !closing_) { closing_ = true; CloseReq* req = new CloseReq(env(), promise, object()); auto AfterClose = [](uv_fs_t* req) { CloseReq* close = static_cast(req->data); CHECK_NE(close, nullptr); - close->file_handle()->closing_ = false; + close->file_handle()->AfterClose(); Isolate* isolate = close->env()->isolate(); if (req->result < 0) { close->Reject(UVException(isolate, req->result, "close")); } else { - close->file_handle()->closed_ = true; close->Resolve(); } delete close; @@ -256,6 +271,162 @@ void FileHandle::Close(const FunctionCallbackInfo& args) { } +void FileHandle::ReleaseFD(const FunctionCallbackInfo& args) { + FileHandle* fd; + ASSIGN_OR_RETURN_UNWRAP(&fd, args.Holder()); + // Just act as if this FileHandle has been closed. + fd->AfterClose(); +} + + +void FileHandle::AfterClose() { + closing_ = false; + closed_ = true; + if (reading_ && !persistent().IsEmpty()) + EmitRead(UV_EOF); +} + + +FileHandleReadWrap::FileHandleReadWrap(FileHandle* handle, Local obj) + : ReqWrap(handle->env(), obj, AsyncWrap::PROVIDER_FSREQWRAP), + file_handle_(handle) {} + +int FileHandle::ReadStart() { + if (!IsAlive() || IsClosing()) + return UV_EOF; + + reading_ = true; + + if (current_read_) + return 0; + + std::unique_ptr read_wrap; + + if (read_length_ == 0) { + EmitRead(UV_EOF); + return 0; + } + + { + // Create a new FileHandleReadWrap or re-use one. + // Either way, we need these two scopes for AsyncReset() or otherwise + // for creating the new instance. + HandleScope handle_scope(env()->isolate()); + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this); + + auto& freelist = env()->file_handle_read_wrap_freelist(); + if (freelist.size() > 0) { + read_wrap = std::move(freelist.back()); + freelist.pop_back(); + read_wrap->AsyncReset(); + read_wrap->file_handle_ = this; + } else { + Local wrap_obj = env()->filehandlereadwrap_template() + ->NewInstance(env()->context()).ToLocalChecked(); + read_wrap.reset(new FileHandleReadWrap(this, wrap_obj)); + } + } + int64_t recommended_read = 65536; + if (read_length_ >= 0 && read_length_ <= recommended_read) + recommended_read = read_length_; + + read_wrap->buffer_ = EmitAlloc(recommended_read); + read_wrap->Dispatched(); + + current_read_ = std::move(read_wrap); + + uv_fs_read(env()->event_loop(), + current_read_->req(), + fd_, + ¤t_read_->buffer_, + 1, + read_offset_, + [](uv_fs_t* req) { + FileHandle* handle; + { + FileHandleReadWrap* req_wrap = FileHandleReadWrap::from_req(req); + handle = req_wrap->file_handle_; + CHECK_EQ(handle->current_read_.get(), req_wrap); + } + + // ReadStart() checks whether current_read_ is set to determine whether + // a read is in progress. Moving it into a local variable makes sure that + // the ReadStart() call below doesn’t think we’re still actively reading. + std::unique_ptr read_wrap = + std::move(handle->current_read_); + + int result = req->result; + uv_buf_t buffer = read_wrap->buffer_; + + uv_fs_req_cleanup(req); + + // Push the read wrap back to the freelist, or let it be destroyed + // once we’re exiting the current scope. + constexpr size_t wanted_freelist_fill = 100; + auto& freelist = handle->env()->file_handle_read_wrap_freelist(); + if (freelist.size() < wanted_freelist_fill) + freelist.emplace_back(std::move(read_wrap)); + + if (result >= 0) { + // Read at most as many bytes as we originally planned to. + if (handle->read_length_ >= 0 && handle->read_length_ < result) + result = handle->read_length_; + + // If we read data and we have an expected length, decrease it by + // how much we have read. + if (handle->read_length_ >= 0) + handle->read_length_ -= result; + + // If we have an offset, increase it by how much we have read. + if (handle->read_offset_ >= 0) + handle->read_offset_ += result; + } + + // Reading 0 bytes from a file always means EOF, or that we reached + // the end of the requested range. + if (result == 0) + result = UV_EOF; + + handle->EmitRead(result, buffer); + + // Start over, if EmitRead() didn’t tell us to stop. + if (handle->reading_) + handle->ReadStart(); + }); + + return 0; +} + +int FileHandle::ReadStop() { + reading_ = false; + return 0; +} + +typedef SimpleShutdownWrap> FileHandleCloseWrap; + +ShutdownWrap* FileHandle::CreateShutdownWrap(Local object) { + return new FileHandleCloseWrap(this, object); +} + +int FileHandle::DoShutdown(ShutdownWrap* req_wrap) { + FileHandleCloseWrap* wrap = static_cast(req_wrap); + closing_ = true; + wrap->Dispatched(); + uv_fs_close(env()->event_loop(), wrap->req(), fd_, [](uv_fs_t* req) { + FileHandleCloseWrap* wrap = static_cast( + FileHandleCloseWrap::from_req(req)); + FileHandle* handle = static_cast(wrap->stream()); + handle->AfterClose(); + + int result = req->result; + uv_fs_req_cleanup(req); + wrap->Done(result); + }); + + return 0; +} + + void FSReqWrap::Reject(Local reject) { MakeCallback(env()->oncomplete_string(), 1, &reject); } @@ -1730,6 +1901,17 @@ void InitFs(Local target, fst->SetClassName(wrapString); target->Set(context, wrapString, fst->GetFunction()).FromJust(); + // Create FunctionTemplate for FileHandleReadWrap. There’s no need + // to do anything in the constructor, so we only store the instance template. + Local fh_rw = FunctionTemplate::New(env->isolate()); + fh_rw->InstanceTemplate()->SetInternalFieldCount(1); + AsyncWrap::AddWrapMethods(env, fh_rw); + Local fhWrapString = + FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandleReqWrap"); + fh_rw->SetClassName(fhWrapString); + env->set_filehandlereadwrap_template( + fst->InstanceTemplate()); + // Create Function Template for FSReqPromise Local fpt = FunctionTemplate::New(env->isolate()); AsyncWrap::AddWrapMethods(env, fpt); @@ -1741,14 +1923,16 @@ void InitFs(Local target, env->set_fsreqpromise_constructor_template(fpo); // Create FunctionTemplate for FileHandle - Local fd = FunctionTemplate::New(env->isolate()); + Local fd = env->NewFunctionTemplate(FileHandle::New); AsyncWrap::AddWrapMethods(env, fd); env->SetProtoMethod(fd, "close", FileHandle::Close); + env->SetProtoMethod(fd, "releaseFD", FileHandle::ReleaseFD); Local fdt = fd->InstanceTemplate(); fdt->SetInternalFieldCount(1); Local handleString = FIXED_ONE_BYTE_STRING(env->isolate(), "FileHandle"); fd->SetClassName(handleString); + StreamBase::AddMethods(env, fd, StreamBase::kFlagNone); target->Set(context, handleString, fd->GetFunction()).FromJust(); env->set_fd_constructor_template(fdt); diff --git a/src/node_file.h b/src/node_file.h index bf277a0e433525..fa373d46ad0003 100644 --- a/src/node_file.h +++ b/src/node_file.h @@ -4,6 +4,7 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "node.h" +#include "stream_base.h" #include "req_wrap-inl.h" namespace node { @@ -20,6 +21,7 @@ using v8::Value; namespace fs { + class FSReqBase : public ReqWrap { public: FSReqBase(Environment* env, Local req, AsyncWrap::ProviderType type) @@ -120,13 +122,38 @@ class FSReqAfterScope { Context::Scope context_scope_; }; +class FileHandle; + +// A request wrap specifically for uv_fs_read()s scheduled for reading +// from a FileHandle. +class FileHandleReadWrap : public ReqWrap { + public: + FileHandleReadWrap(FileHandle* handle, v8::Local obj); + + static inline FileHandleReadWrap* from_req(uv_fs_t* req) { + return static_cast(ReqWrap::from_req(req)); + } + + size_t self_size() const override { return sizeof(*this); } + + private: + FileHandle* file_handle_; + uv_buf_t buffer_; + + friend class FileHandle; +}; + // A wrapper for a file descriptor that will automatically close the fd when // the object is garbage collected -class FileHandle : public AsyncWrap { +class FileHandle : public AsyncWrap, public StreamBase { public: - FileHandle(Environment* env, int fd); + FileHandle(Environment* env, + int fd, + v8::Local obj = v8::Local()); virtual ~FileHandle(); + static void New(const v8::FunctionCallbackInfo& args); + int fd() const { return fd_; } size_t self_size() const override { return sizeof(*this); } @@ -134,9 +161,32 @@ class FileHandle : public AsyncWrap { // be resolved once closing is complete. static void Close(const FunctionCallbackInfo& args); + // Releases ownership of the FD. + static void ReleaseFD(const FunctionCallbackInfo& args); + + // StreamBase interface: + int ReadStart() override; + int ReadStop() override; + + bool IsAlive() override { return !closed_; } + bool IsClosing() override { return closing_; } + AsyncWrap* GetAsyncWrap() override { return this; } + + // In the case of file streams, shutting down corresponds to closing. + ShutdownWrap* CreateShutdownWrap(v8::Local object) override; + int DoShutdown(ShutdownWrap* req_wrap) override; + + int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) override { + return UV_ENOSYS; // Not implemented (yet). + } + private: // Synchronous close that emits a warning - inline void Close(); + void Close(); + void AfterClose(); class CloseReq : public ReqWrap { public: @@ -176,6 +226,12 @@ class FileHandle : public AsyncWrap { int fd_; bool closing_ = false; bool closed_ = false; + int64_t read_offset_ = -1; + int64_t read_length_ = -1; + + bool reading_ = false; + std::unique_ptr current_read_ = nullptr; + DISALLOW_COPY_AND_ASSIGN(FileHandle); }; diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index c87393e6fc1c72..b7495a80ac63e0 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -146,6 +146,9 @@ inline Environment* StreamBase::stream_env() const { inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { Environment* env = stream_env(); + + HandleScope handle_scope(env->isolate()); + if (req_wrap_obj.IsEmpty()) { req_wrap_obj = env->shutdown_wrap_constructor_function() @@ -183,6 +186,8 @@ inline StreamWriteResult StreamBase::Write( } } + HandleScope handle_scope(env->isolate()); + if (req_wrap_obj.IsEmpty()) { req_wrap_obj = env->write_wrap_constructor_function() diff --git a/src/stream_base.h b/src/stream_base.h index 6962648650e1a6..f3e010d5bc94c9 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -95,7 +95,7 @@ class StreamListener { public: virtual ~StreamListener(); - // This is called when a stream wants to allocate memory immediately before + // This is called when a stream wants to allocate memory before // reading data into the freshly allocated buffer (i.e. it is always followed // by a `OnStreamRead()` call). // This memory may be statically or dynamically allocated; for example, @@ -105,6 +105,9 @@ class StreamListener { // The returned buffer does not need to contain `suggested_size` bytes. // The default implementation of this method returns a buffer that has exactly // the suggested size and is allocated using malloc(). + // It is not valid to return a zero-length buffer from this method. + // It is not guaranteed that the corresponding `OnStreamRead()` call + // happens in the same event loop turn as this call. virtual uv_buf_t OnStreamAlloc(size_t suggested_size); // `OnStreamRead()` is called when data is available on the socket and has diff --git a/src/util.h b/src/util.h index 7c679952d5fb1f..c822390ec56f8b 100644 --- a/src/util.h +++ b/src/util.h @@ -34,6 +34,7 @@ #include #include +#include // std::function #include // std::remove_reference namespace node { @@ -433,7 +434,6 @@ class BufferValue : public MaybeStackBuffer { // Use this when a variable or parameter is unused in order to explicitly // silence a compiler warning about that. template inline void USE(T&&) {} - } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS From 80d958025b81eb43fa1b8ec4173396c04f50f89a Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 22 Feb 2018 14:37:58 +0100 Subject: [PATCH 03/11] src: give StreamBases the capability to ask for data Add a `OnStreamWantsWrite()` event that allows streams to ask for more input data if they want some. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- src/node_http2.cc | 5 +++++ src/node_http2.h | 2 ++ src/stream_base-inl.h | 7 +++++++ src/stream_base.h | 12 ++++++++++++ 4 files changed, 26 insertions(+) diff --git a/src/node_http2.cc b/src/node_http2.cc index 939e0011bdfa42..8dd222a692a52c 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -2219,6 +2219,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle, if (amount == 0 && stream->IsWritable()) { CHECK(stream->queue_.empty()); DEBUG_HTTP2SESSION2(session, "deferring stream %d", id); + stream->EmitWantsWrite(length); + if (stream->available_outbound_length_ > 0 || !stream->IsWritable()) { + // EmitWantsWrite() did something interesting synchronously, restart: + return OnRead(handle, id, buf, length, flags, source, user_data); + } return NGHTTP2_ERR_DEFERRED; } diff --git a/src/node_http2.h b/src/node_http2.h index 08109dcf046ba4..2d55989fd7d2e7 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -573,6 +573,8 @@ class Http2Stream : public AsyncWrap, // Required for StreamBase int DoShutdown(ShutdownWrap* req_wrap) override; + bool HasWantsWrite() const override { return true; } + // Initiate a response on this stream. inline int SubmitResponse(nghttp2_nv* nva, size_t len, diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index b7495a80ac63e0..f0d522a7b06b6c 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -136,6 +136,13 @@ inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { listener_->OnStreamAfterShutdown(w, status); } +inline void StreamResource::EmitWantsWrite(size_t suggested_size) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif + listener_->OnStreamWantsWrite(suggested_size); +} + inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); } diff --git a/src/stream_base.h b/src/stream_base.h index f3e010d5bc94c9..96a7787e5bb41c 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -131,6 +131,13 @@ class StreamListener { // (and raises an assertion if there is none). virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); + // This is called by the stream if it determines that it wants more data + // to be written to it. Not all streams support this. + // This callback will not be called as long as there are active writes. + // It is not supported by all streams; `stream->HasWantsWrite()` returns + // true if it is supported by a stream. + virtual void OnStreamWantsWrite(size_t suggested_size) {} + // This is called immediately before the stream is destroyed. virtual void OnStreamDestroy() {} @@ -199,6 +206,9 @@ class StreamResource { size_t count, uv_stream_t* send_handle) = 0; + // Returns true if the stream supports the `OnStreamWantsWrite()` interface. + virtual bool HasWantsWrite() const { return false; } + // Optionally, this may provide an error message to be used for // failing writes. virtual const char* Error() const; @@ -222,6 +232,8 @@ class StreamResource { void EmitAfterWrite(WriteWrap* w, int status); // Call the current listener's OnStreamAfterShutdown() method. void EmitAfterShutdown(ShutdownWrap* w, int status); + // Call the current listener's OnStreamWantsWrite() method. + void EmitWantsWrite(size_t suggested_size); StreamListener* listener_ = nullptr; uint64_t bytes_read_ = 0; From 21587bafc09a28060f0a1c774f0b26b2fad18112 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 7 Mar 2018 17:55:24 +0100 Subject: [PATCH 04/11] src: add helper for before/after scope without JS calls Add `AsyncScope` for cases where the async_hooks `before` and `after` callbacks should be called, to track async context, but no actual JS is called in between and we can therefore skip things like draining the microtask or `nextTick` queues. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- src/async_wrap-inl.h | 16 ++++++++++++++++ src/async_wrap.h | 12 ++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/async_wrap-inl.h b/src/async_wrap-inl.h index 21b1f9cee9f0e8..c9f12333243092 100644 --- a/src/async_wrap-inl.h +++ b/src/async_wrap-inl.h @@ -45,6 +45,22 @@ inline double AsyncWrap::get_trigger_async_id() const { } +inline AsyncWrap::AsyncScope::AsyncScope(AsyncWrap* wrap) + : wrap_(wrap) { + Environment* env = wrap->env(); + if (env->async_hooks()->fields()[Environment::AsyncHooks::kBefore] == 0) + return; + EmitBefore(env, wrap->get_async_id()); +} + +inline AsyncWrap::AsyncScope::~AsyncScope() { + Environment* env = wrap_->env(); + if (env->async_hooks()->fields()[Environment::AsyncHooks::kAfter] == 0) + return; + EmitAfter(env, wrap_->get_async_id()); +} + + inline v8::MaybeLocal AsyncWrap::MakeCallback( const v8::Local symbol, int argc, diff --git a/src/async_wrap.h b/src/async_wrap.h index b7aed5d789754b..608764bab5361c 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -169,6 +169,18 @@ class AsyncWrap : public BaseObject { static void WeakCallback(const v8::WeakCallbackInfo &info); + // This is a simplified version of InternalCallbackScope that only runs + // the `before` and `after` hooks. Only use it when not actually calling + // back into JS; otherwise, use InternalCallbackScope. + class AsyncScope { + public: + explicit inline AsyncScope(AsyncWrap* wrap); + ~AsyncScope(); + + private: + AsyncWrap* wrap_ = nullptr; + }; + private: friend class PromiseWrap; From 0414d8866cceb660c275bb6b645c843e50d41a25 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 13 Feb 2018 01:23:50 +0100 Subject: [PATCH 05/11] src: introduce native-layer stream piping Provide a way to create pipes between native `StreamBase` instances that acts more directly than a `.pipe()` call would. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- node.gyp | 2 + src/async_wrap.h | 1 + src/env.h | 5 + src/node_http2.cc | 4 +- src/node_internals.h | 1 + src/stream_base-inl.h | 10 +- src/stream_base.h | 3 + src/stream_pipe.cc | 265 ++++++++++++++++++ src/stream_pipe.h | 68 +++++ test/sequential/test-async-wrap-getasyncid.js | 1 + 10 files changed, 357 insertions(+), 3 deletions(-) create mode 100644 src/stream_pipe.cc create mode 100644 src/stream_pipe.h diff --git a/node.gyp b/node.gyp index 5efe2323599cff..1b047fe9ac52f2 100644 --- a/node.gyp +++ b/node.gyp @@ -338,6 +338,7 @@ 'src/string_decoder.cc', 'src/string_search.cc', 'src/stream_base.cc', + 'src/stream_pipe.cc', 'src/stream_wrap.cc', 'src/tcp_wrap.cc', 'src/timer_wrap.cc', @@ -394,6 +395,7 @@ 'src/string_decoder-inl.h', 'src/stream_base.h', 'src/stream_base-inl.h', + 'src/stream_pipe.h', 'src/stream_wrap.h', 'src/tracing/agent.h', 'src/tracing/node_trace_buffer.h', diff --git a/src/async_wrap.h b/src/async_wrap.h index 608764bab5361c..f0689d32f3c69f 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -58,6 +58,7 @@ namespace node { V(SHUTDOWNWRAP) \ V(SIGNALWRAP) \ V(STATWATCHER) \ + V(STREAMPIPE) \ V(TCPCONNECTWRAP) \ V(TCPSERVERWRAP) \ V(TCPWRAP) \ diff --git a/src/env.h b/src/env.h index e0f6856f8d0b1d..4fc6b31ffebd21 100644 --- a/src/env.h +++ b/src/env.h @@ -222,6 +222,7 @@ struct PackageConfig { V(onstop_string, "onstop") \ V(onstreamclose_string, "onstreamclose") \ V(ontrailers_string, "ontrailers") \ + V(onunpipe_string, "onunpipe") \ V(onwrite_string, "onwrite") \ V(openssl_error_stack, "opensslErrorStack") \ V(output_string, "output") \ @@ -233,6 +234,8 @@ struct PackageConfig { V(pbkdf2_error_string, "PBKDF2 Error") \ V(pid_string, "pid") \ V(pipe_string, "pipe") \ + V(pipe_target_string, "pipeTarget") \ + V(pipe_source_string, "pipeSource") \ V(port_string, "port") \ V(preference_string, "preference") \ V(priority_string, "priority") \ @@ -255,9 +258,11 @@ struct PackageConfig { V(session_id_string, "sessionId") \ V(shell_string, "shell") \ V(signal_string, "signal") \ + V(sink_string, "sink") \ V(size_string, "size") \ V(sni_context_err_string, "Invalid SNI context") \ V(sni_context_string, "sni_context") \ + V(source_string, "source") \ V(stack_string, "stack") \ V(status_string, "status") \ V(stdio_string, "stdio") \ diff --git a/src/node_http2.cc b/src/node_http2.cc index 8dd222a692a52c..d6df93cf3804a7 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1813,7 +1813,9 @@ inline void Http2Stream::Close(int32_t code) { } int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) { - CHECK(!this->IsDestroyed()); + if (IsDestroyed()) + return UV_EPIPE; + { Http2Scope h2scope(this); flags_ |= NGHTTP2_STREAM_FLAG_SHUT; diff --git a/src/node_internals.h b/src/node_internals.h index 2faa6f93475ad7..79c2ce553200f3 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -120,6 +120,7 @@ struct sockaddr; V(serdes) \ V(signal_wrap) \ V(spawn_sync) \ + V(stream_pipe) \ V(stream_wrap) \ V(string_decoder) \ V(tcp_wrap) \ diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index f0d522a7b06b6c..7523b3a545355f 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -67,8 +67,14 @@ inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { inline StreamResource::~StreamResource() { while (listener_ != nullptr) { - listener_->OnStreamDestroy(); - RemoveStreamListener(listener_); + StreamListener* listener = listener_; + listener->OnStreamDestroy(); + // Remove the listener if it didn’t remove itself. This makes the logic + // logic in `OnStreamDestroy()` implementations easier, because they + // may call generic cleanup functions which can just remove the + // listener unconditionally. + if (listener == listener_) + RemoveStreamListener(listener_); } } diff --git a/src/stream_base.h b/src/stream_base.h index 96a7787e5bb41c..7264824265a579 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -141,6 +141,9 @@ class StreamListener { // This is called immediately before the stream is destroyed. virtual void OnStreamDestroy() {} + // The stream this is currently associated with, or nullptr if there is none. + inline StreamResource* stream() { return stream_; } + protected: // Pass along a read error to the `StreamListener` instance that was active // before this one. For example, a protocol parser does not care about read diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc new file mode 100644 index 00000000000000..9ad90610b361f4 --- /dev/null +++ b/src/stream_pipe.cc @@ -0,0 +1,265 @@ +#include "stream_pipe.h" +#include "stream_base-inl.h" +#include "node_buffer.h" +#include "node_internals.h" + +using v8::Context; +using v8::External; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::Local; +using v8::Object; +using v8::Value; + +namespace node { + +StreamPipe::StreamPipe(StreamBase* source, + StreamBase* sink, + Local obj) + : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) { + MakeWeak(this); + + CHECK_NE(sink, nullptr); + CHECK_NE(source, nullptr); + + source->PushStreamListener(&readable_listener_); + sink->PushStreamListener(&writable_listener_); + + CHECK(sink->HasWantsWrite()); + + // Set up links between this object and the source/sink objects. + // In particular, this makes sure that they are garbage collected as a group, + // if that applies to the given streams (for example, Http2Streams use + // weak references). + obj->Set(env()->context(), env()->source_string(), source->GetObject()) + .FromJust(); + source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj) + .FromJust(); + obj->Set(env()->context(), env()->sink_string(), sink->GetObject()) + .FromJust(); + sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj) + .FromJust(); +} + +StreamPipe::~StreamPipe() { + CHECK(is_closed_); +} + +StreamBase* StreamPipe::source() { + return static_cast(readable_listener_.stream()); +} + +StreamBase* StreamPipe::sink() { + return static_cast(writable_listener_.stream()); +} + +void StreamPipe::Unpipe() { + if (is_closed_) + return; + + // Note that we cannot use virtual methods on `source` and `sink` here, + // because this function can be called from their destructors via + // `OnStreamDestroy()`. + + is_closed_ = true; + is_reading_ = false; + source()->RemoveStreamListener(&readable_listener_); + sink()->RemoveStreamListener(&writable_listener_); + + // Delay the JS-facing part with SetImmediate, because this might be from + // inside the garbage collector, so we can’t run JS here. + env()->SetImmediate([](Environment* env, void* data) { + StreamPipe* pipe = static_cast(data); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + Local object = pipe->object(); + + if (object->Has(env->context(), env->onunpipe_string()).FromJust()) { + pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked(); + } + + // Set all the links established in the constructor to `null`. + Local null = Null(env->isolate()); + + Local source_v; + Local sink_v; + source_v = object->Get(env->context(), env->source_string()) + .ToLocalChecked(); + sink_v = object->Get(env->context(), env->sink_string()) + .ToLocalChecked(); + CHECK(source_v->IsObject()); + CHECK(sink_v->IsObject()); + + object->Set(env->context(), env->source_string(), null).FromJust(); + object->Set(env->context(), env->sink_string(), null).FromJust(); + source_v.As()->Set(env->context(), + env->pipe_target_string(), + null).FromJust(); + sink_v.As()->Set(env->context(), + env->pipe_source_string(), + null).FromJust(); + }, static_cast(this), object()); +} + +uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + size_t size = std::min(suggested_size, pipe->wanted_data_); + CHECK_GT(size, 0); + return uv_buf_init(Malloc(size), size); +} + +void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf) { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + AsyncScope async_scope(pipe); + if (nread < 0) { + // EOF or error; stop reading and pass the error to the previous listener + // (which might end up in JS). + free(buf.base); + pipe->is_eof_ = true; + stream()->ReadStop(); + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); + // If we’re not writing, close now. Otherwise, we’ll do that in + // `OnStreamAfterWrite()`. + if (!pipe->is_writing_) { + pipe->ShutdownWritable(); + pipe->Unpipe(); + } + return; + } + + pipe->ProcessData(nread, buf); +} + +void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) { + uv_buf_t buffer = uv_buf_init(buf.base, nread); + StreamWriteResult res = sink()->Write(&buffer, 1); + if (!res.async) { + free(buf.base); + writable_listener_.OnStreamAfterWrite(nullptr, res.err); + } else { + is_writing_ = true; + is_reading_ = false; + res.wrap->SetAllocatedStorage(buf.base, buf.len); + source()->ReadStop(); + } +} + +void StreamPipe::ShutdownWritable() { + sink()->Shutdown(); +} + +void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w, + int status) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->is_writing_ = false; + if (pipe->is_eof_) { + AsyncScope async_scope(pipe); + pipe->ShutdownWritable(); + pipe->Unpipe(); + return; + } + + if (status != 0) { + CHECK_NE(previous_listener_, nullptr); + StreamListener* prev = previous_listener_; + pipe->Unpipe(); + prev->OnStreamAfterWrite(w, status); + return; + } +} + +void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w, + int status) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + CHECK_NE(previous_listener_, nullptr); + StreamListener* prev = previous_listener_; + pipe->Unpipe(); + prev->OnStreamAfterShutdown(w, status); +} + +void StreamPipe::ReadableListener::OnStreamDestroy() { + StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this); + if (!pipe->is_eof_) { + OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0)); + } +} + +void StreamPipe::WritableListener::OnStreamDestroy() { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->is_eof_ = true; + pipe->Unpipe(); +} + +void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) { + StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); + pipe->wanted_data_ = suggested_size; + if (pipe->is_reading_ || pipe->is_closed_) + return; + AsyncScope async_scope(pipe); + pipe->is_reading_ = true; + pipe->source()->ReadStart(); +} + +uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) { + CHECK_NE(previous_listener_, nullptr); + return previous_listener_->OnStreamAlloc(suggested_size); +} + +void StreamPipe::WritableListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf) { + CHECK_NE(previous_listener_, nullptr); + return previous_listener_->OnStreamRead(nread, buf); +} + +void StreamPipe::New(const FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + CHECK(args[0]->IsExternal()); + CHECK(args[1]->IsExternal()); + auto source = static_cast(args[0].As()->Value()); + auto sink = static_cast(args[1].As()->Value()); + + new StreamPipe(source, sink, args.This()); +} + +void StreamPipe::Start(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + pipe->is_closed_ = false; + if (pipe->wanted_data_ > 0) + pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_); +} + +void StreamPipe::Unpipe(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + pipe->Unpipe(); +} + +namespace { + +void InitializeStreamPipe(Local target, + Local unused, + Local context) { + Environment* env = Environment::GetCurrent(context); + + // Create FunctionTemplate for FileHandle::CloseReq + Local pipe = env->NewFunctionTemplate(StreamPipe::New); + Local stream_pipe_string = + FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe"); + env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe); + env->SetProtoMethod(pipe, "start", StreamPipe::Start); + AsyncWrap::AddWrapMethods(env, pipe); + pipe->SetClassName(stream_pipe_string); + pipe->InstanceTemplate()->SetInternalFieldCount(1); + target->Set(context, stream_pipe_string, pipe->GetFunction()).FromJust(); +} + +} // anonymous namespace + +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe, + node::InitializeStreamPipe) diff --git a/src/stream_pipe.h b/src/stream_pipe.h new file mode 100644 index 00000000000000..98d6dae11be841 --- /dev/null +++ b/src/stream_pipe.h @@ -0,0 +1,68 @@ +#ifndef SRC_STREAM_PIPE_H_ +#define SRC_STREAM_PIPE_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "stream_base.h" + +namespace node { + +class StreamPipe : public AsyncWrap { + public: + StreamPipe(StreamBase* source, StreamBase* sink, v8::Local obj); + ~StreamPipe(); + + void Unpipe(); + + static void New(const v8::FunctionCallbackInfo& args); + static void Start(const v8::FunctionCallbackInfo& args); + static void Unpipe(const v8::FunctionCallbackInfo& args); + + size_t self_size() const override { return sizeof(*this); } + + private: + StreamBase* source(); + StreamBase* sink(); + + void ShutdownWritable(); + void FlushToWritable(); + + bool is_reading_ = false; + bool is_writing_ = false; + bool is_eof_ = false; + bool is_closed_ = true; + + // Set a default value so that when we’re coming from Start(), we know + // that we don’t want to read just yet. + // This will likely need to be changed when supporting streams without + // `OnStreamWantsWrite()` support. + size_t wanted_data_ = 0; + + void ProcessData(size_t nread, const uv_buf_t& buf); + + class ReadableListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamDestroy() override; + }; + + class WritableListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamAfterWrite(WriteWrap* w, int status) override; + void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; + void OnStreamWantsWrite(size_t suggested_size) override; + void OnStreamDestroy() override; + }; + + ReadableListener readable_listener_; + WritableListener writable_listener_; +}; + +} // namespace node + +#endif + +#endif // SRC_STREAM_PIPE_H_ diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 66eaabec25d977..64c4fd5cd8ab50 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -35,6 +35,7 @@ common.crashOnUnhandledRejection(); delete providers.HTTP2STREAM; delete providers.HTTP2PING; delete providers.HTTP2SETTINGS; + delete providers.STREAMPIPE; const objKeys = Object.keys(providers); if (objKeys.length > 0) From 551db4dc5afa047812899117df69cdc2bf271f9a Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 14 Mar 2018 22:58:29 +0100 Subject: [PATCH 06/11] [squash] fix debug build --- src/stream_pipe.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc index 9ad90610b361f4..8f0263cd9ae99b 100644 --- a/src/stream_pipe.cc +++ b/src/stream_pipe.cc @@ -68,6 +68,7 @@ void StreamPipe::Unpipe() { // Delay the JS-facing part with SetImmediate, because this might be from // inside the garbage collector, so we can’t run JS here. + HandleScope handle_scope(env()->isolate()); env()->SetImmediate([](Environment* env, void* data) { StreamPipe* pipe = static_cast(data); From a91ab8ddc916b158d9b27f74b7f28ebeaa9839e1 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 13 Feb 2018 01:32:27 +0100 Subject: [PATCH 07/11] http2: use native pipe instead of synchronous I/O This resolves the issue of using synchronous I/O for `respondWithFile()` and `respondWithFD()`, and enables scenarios in which the underlying file does not need to be a regular file. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- lib/internal/http2/core.js | 60 +++++++-- src/node_http2.cc | 125 ------------------ src/node_http2.h | 24 ---- .../test-http2-respond-with-fd-errors.js | 4 +- 4 files changed, 51 insertions(+), 162 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 2a4ef0421f7c84..607ae3fd2d297d 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -4,9 +4,13 @@ require('internal/util').assertCrypto(); +const { internalBinding } = require('internal/bootstrap_loaders'); const { async_id_symbol } = require('internal/async_hooks').symbols; +const { UV_EOF } = process.binding('uv'); const http = require('http'); const binding = process.binding('http2'); +const { FileHandle } = process.binding('fs'); +const { StreamPipe } = internalBinding('stream_pipe'); const assert = require('assert'); const { Buffer } = require('buffer'); const EventEmitter = require('events'); @@ -65,6 +69,7 @@ const { onServerStream, const { utcDate } = require('internal/http'); const { promisify } = require('internal/util'); const { isArrayBufferView } = require('internal/util/types'); +const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { _connectionListener: httpConnectionListener } = require('http'); const { createPromise, promiseResolve } = process.binding('util'); const debug = util.debuglog('http2'); @@ -345,9 +350,7 @@ function onStreamClose(code) { stream.end(); } - if (state.fd !== undefined) - tryClose(state.fd); - + state.fd = -1; // Defer destroy we actually emit end. if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. @@ -1928,6 +1931,26 @@ function processHeaders(headers) { return headers; } +function onFileCloseError(stream, err) { + stream.emit(err); +} + +function onFileUnpipe() { + const stream = this.sink[kOwner]; + if (stream.ownsFd) + this.source.close().catch(onFileCloseError.bind(stream)); + else + this.source.releaseFD(); +} + +// This is only called once the pipe has returned back control, so +// it only has to handle errors and End-of-File. +function onPipedFileHandleRead(err) { + if (err < 0 && err !== UV_EOF) { + this.stream.close(NGHTTP2_INTERNAL_ERROR); + } +} + function processRespondWithFD(self, fd, headers, offset = 0, length = -1, streamOptions = 0) { const state = self[kState]; @@ -1940,18 +1963,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1, return; } - - // Close the writable side of the stream + // Close the writable side of the stream, but only as far as the writable + // stream implementation is concerned. + self._final = null; self.end(); - const ret = self[kHandle].respondFD(fd, headersList, - offset, length, - streamOptions); + const ret = self[kHandle].respond(headersList, streamOptions); if (ret < 0) { self.destroy(new NghttpError(ret)); return; } + + defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe, + self, fd, offset, length); +} + +function startFilePipe(self, fd, offset, length) { + const handle = new FileHandle(fd, offset, length); + handle.onread = onPipedFileHandleRead; + handle.stream = self; + + const pipe = new StreamPipe(handle._externalStream, + self[kHandle]._externalStream); + pipe.onunpipe = onFileUnpipe; + pipe.start(); + // exact length of the file doesn't matter here, since the // stream is closing anyway - just use 1 to signify that // a write does exist @@ -2270,8 +2307,9 @@ class ServerHttp2Stream extends Http2Stream { throw new ERR_INVALID_ARG_TYPE('fd', 'number'); debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: initiating response`); + `${sessionName(session[kType])}]: initiating response from fd`); this[kUpdateTimer](); + this.ownsFd = false; headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; @@ -2333,9 +2371,9 @@ class ServerHttp2Stream extends Http2Stream { const session = this[kSession]; debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: initiating response`); + `${sessionName(session[kType])}]: initiating response from file`); this[kUpdateTimer](); - + this.ownsFd = true; headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; diff --git a/src/node_http2.cc b/src/node_http2.cc index d6df93cf3804a7..68a684025ce4a0 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1888,28 +1888,6 @@ inline int Http2Stream::SubmitResponse(nghttp2_nv* nva, } -// Initiate a response that contains data read from a file descriptor. -inline int Http2Stream::SubmitFile(int fd, - nghttp2_nv* nva, size_t len, - int64_t offset, - int64_t length, - int options) { - CHECK(!this->IsDestroyed()); - Http2Scope h2scope(this); - DEBUG_HTTP2STREAM(this, "submitting file"); - if (options & STREAM_OPTION_GET_TRAILERS) - flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS; - - if (offset > 0) fd_offset_ = offset; - if (length > -1) fd_length_ = length; - - Http2Stream::Provider::FD prov(this, options, fd); - int ret = nghttp2_submit_response(session_->session(), id_, nva, len, *prov); - CHECK_NE(ret, NGHTTP2_ERR_NOMEM); - return ret; -} - - // Submit informational headers for a stream. inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { CHECK(!this->IsDestroyed()); @@ -2085,87 +2063,6 @@ Http2Stream::Provider::~Provider() { provider_.source.ptr = nullptr; } -// The FD Provider pulls data from a file descriptor using libuv. All of the -// data transfer occurs in C++, without any chunks being passed through JS -// land. -Http2Stream::Provider::FD::FD(Http2Stream* stream, int options, int fd) - : Http2Stream::Provider(stream, options) { - CHECK(!stream->IsDestroyed()); - provider_.source.fd = fd; - provider_.read_callback = Http2Stream::Provider::FD::OnRead; -} - -Http2Stream::Provider::FD::FD(int options, int fd) - : Http2Stream::Provider(options) { - provider_.source.fd = fd; - provider_.read_callback = Http2Stream::Provider::FD::OnRead; -} - -ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle, - int32_t id, - uint8_t* buf, - size_t length, - uint32_t* flags, - nghttp2_data_source* source, - void* user_data) { - Http2Session* session = static_cast(user_data); - Http2Stream* stream = session->FindStream(id); - if (stream->statistics_.first_byte_sent == 0) - stream->statistics_.first_byte_sent = uv_hrtime(); - - DEBUG_HTTP2SESSION2(session, "reading outbound file data for stream %d", id); - CHECK_EQ(id, stream->id()); - - int fd = source->fd; - int64_t offset = stream->fd_offset_; - ssize_t numchars = 0; - - if (stream->fd_length_ >= 0 && - stream->fd_length_ < static_cast(length)) - length = stream->fd_length_; - - uv_buf_t data; - data.base = reinterpret_cast(buf); - data.len = length; - - uv_fs_t read_req; - - if (length > 0) { - // TODO(addaleax): Never use synchronous I/O on the main thread. - numchars = uv_fs_read(session->event_loop(), - &read_req, - fd, &data, 1, - offset, nullptr); - uv_fs_req_cleanup(&read_req); - } - - // Close the stream with an error if reading fails - if (numchars < 0) - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - - // Update the read offset for the next read - stream->fd_offset_ += numchars; - stream->fd_length_ -= numchars; - - DEBUG_HTTP2SESSION2(session, "sending %d bytes", numchars); - - // if numchars < length, assume that we are done. - if (static_cast(numchars) < length || length <= 0) { - DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id); - *flags |= NGHTTP2_DATA_FLAG_EOF; - session->GetTrailers(stream, flags); - // If the stream or session gets destroyed during the GetTrailers - // callback, check that here and close down the stream - if (stream->IsDestroyed()) - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - if (session->IsDestroyed()) - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - - stream->statistics_.sent_bytes += numchars; - return numchars; -} - // The Stream Provider pulls data from a linked list of uv_buf_t structs // built via the StreamBase API and the Streams js API. Http2Stream::Provider::Stream::Stream(int options) @@ -2508,27 +2405,6 @@ void Http2Stream::Respond(const FunctionCallbackInfo& args) { DEBUG_HTTP2STREAM(stream, "response submitted"); } -// Initiates a response on the Http2Stream using a file descriptor to provide -// outbound DATA frames. -void Http2Stream::RespondFD(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - Local context = env->context(); - Isolate* isolate = env->isolate(); - Http2Stream* stream; - ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); - - int fd = args[0]->Int32Value(context).ToChecked(); - Local headers = args[1].As(); - - int64_t offset = args[2]->IntegerValue(context).ToChecked(); - int64_t length = args[3]->IntegerValue(context).ToChecked(); - int options = args[4]->IntegerValue(context).ToChecked(); - - Headers list(isolate, context, headers); - args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(), - offset, length, options)); - DEBUG_HTTP2STREAM2(stream, "file response submitted for fd %d", fd); -} // Submits informational headers on the Http2Stream void Http2Stream::Info(const FunctionCallbackInfo& args) { @@ -2891,7 +2767,6 @@ void Initialize(Local target, env->SetProtoMethod(stream, "priority", Http2Stream::Priority); env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise); env->SetProtoMethod(stream, "info", Http2Stream::Info); - env->SetProtoMethod(stream, "respondFD", Http2Stream::RespondFD); env->SetProtoMethod(stream, "respond", Http2Stream::Respond); env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream); env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState); diff --git a/src/node_http2.h b/src/node_http2.h index 2d55989fd7d2e7..0fac6cca00f4db 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -580,13 +580,6 @@ class Http2Stream : public AsyncWrap, size_t len, int options); - // Send data read from a file descriptor as the response on this stream. - inline int SubmitFile(int fd, - nghttp2_nv* nva, size_t len, - int64_t offset, - int64_t length, - int options); - // Submit informational headers for this stream inline int SubmitInfo(nghttp2_nv* nva, size_t len); @@ -709,7 +702,6 @@ class Http2Stream : public AsyncWrap, static void PushPromise(const FunctionCallbackInfo& args); static void RefreshState(const FunctionCallbackInfo& args); static void Info(const FunctionCallbackInfo& args); - static void RespondFD(const FunctionCallbackInfo& args); static void Respond(const FunctionCallbackInfo& args); static void RstStream(const FunctionCallbackInfo& args); @@ -753,8 +745,6 @@ class Http2Stream : public AsyncWrap, // waiting to be written out to the socket. std::queue queue_; size_t available_outbound_length_ = 0; - int64_t fd_offset_ = 0; - int64_t fd_length_ = -1; Http2StreamListener stream_listener_; @@ -780,20 +770,6 @@ class Http2Stream::Provider { bool empty_ = false; }; -class Http2Stream::Provider::FD : public Http2Stream::Provider { - public: - FD(int options, int fd); - FD(Http2Stream* stream, int options, int fd); - - static ssize_t OnRead(nghttp2_session* session, - int32_t id, - uint8_t* buf, - size_t length, - uint32_t* flags, - nghttp2_data_source* source, - void* user_data); -}; - class Http2Stream::Provider::Stream : public Http2Stream::Provider { public: Stream(Http2Stream* stream, int options); diff --git a/test/parallel/test-http2-respond-with-fd-errors.js b/test/parallel/test-http2-respond-with-fd-errors.js index 0eccd231c63a2e..3a671a3e36490a 100644 --- a/test/parallel/test-http2-respond-with-fd-errors.js +++ b/test/parallel/test-http2-respond-with-fd-errors.js @@ -46,8 +46,8 @@ const tests = specificTests.concat(genericTests); let currentError; -// mock respondFD because we only care about testing error handling -Http2Stream.prototype.respondFD = () => currentError.ngError; +// mock `respond` because we only care about testing error handling +Http2Stream.prototype.respond = () => currentError.ngError; const server = http2.createServer(); server.on('stream', common.mustCall((stream, headers) => { From 3be8a42a9d223668eece88447a1b816ffc57fdc9 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 14 Feb 2018 00:18:18 +0100 Subject: [PATCH 08/11] http2: remove regular-file-only restriction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Requiring `respondWithFile()` to only work with regular files is an artificial restriction on Node’s side and has become unnecessary. Offsets or lengths cannot be specified for those files, but that is an inherent property of other file types. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- doc/api/errors.md | 9 ++- doc/api/http2.md | 10 +++ lib/internal/errors.js | 4 +- lib/internal/http2/core.js | 34 +++++++---- .../test-http2-respond-file-error-dir.js | 2 +- ...st-http2-respond-file-error-pipe-offset.js | 61 +++++++++++++++++++ .../test-http2-respond-file-with-pipe.js | 58 ++++++++++++++++++ 7 files changed, 164 insertions(+), 14 deletions(-) create mode 100644 test/parallel/test-http2-respond-file-error-pipe-offset.js create mode 100644 test/parallel/test-http2-respond-file-with-pipe.js diff --git a/doc/api/errors.md b/doc/api/errors.md index 7d6e238f93bdaa..f5190dc52bffa9 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -971,7 +971,14 @@ client. ### ERR_HTTP2_SEND_FILE An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to -send something other than a regular file. +send a directory. + + +### ERR_HTTP2_SEND_FILE_NOSEEK + +An attempt was made to use the `Http2Stream.prototype.responseWithFile()` API to +send something other than a regular file, but `offset` or `length` options were +provided. ### ERR_HTTP2_SESSION_ERROR diff --git a/doc/api/http2.md b/doc/api/http2.md index 94f25377d0acbf..13aa4d1e2103d1 100644 --- a/doc/api/http2.md +++ b/doc/api/http2.md @@ -1223,6 +1223,11 @@ if the `getTrailers` callback attempts to set such header fields. #### http2stream.respondWithFD(fd[, headers[, options]]) * `fd` {number} A readable file descriptor. @@ -1313,6 +1318,11 @@ if the `getTrailers` callback attempts to set such header fields. #### http2stream.respondWithFile(path[, headers[, options]]) * `path` {string|Buffer|URL} diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 8ec72dd647b1b2..3d0b6cf9b39177 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -714,7 +714,9 @@ E('ERR_HTTP2_PING_LENGTH', 'HTTP2 ping payload must be 8 bytes', RangeError); E('ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED', 'Cannot set HTTP/2 pseudo-headers', Error); E('ERR_HTTP2_PUSH_DISABLED', 'HTTP/2 client has disabled push streams', Error); -E('ERR_HTTP2_SEND_FILE', 'Only regular files can be sent', Error); +E('ERR_HTTP2_SEND_FILE', 'Directories cannot be sent', Error); +E('ERR_HTTP2_SEND_FILE_NOSEEK', + 'Offset or length can only be specified for regular files', Error); E('ERR_HTTP2_SESSION_ERROR', 'Session closed with error code %s', Error); E('ERR_HTTP2_SOCKET_BOUND', 'The socket is already bound to an Http2Session', Error); diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 607ae3fd2d297d..7137e527df07aa 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -42,6 +42,7 @@ const { ERR_HTTP2_PING_LENGTH, ERR_HTTP2_PUSH_DISABLED, ERR_HTTP2_SEND_FILE, + ERR_HTTP2_SEND_FILE_NOSEEK, ERR_HTTP2_SESSION_ERROR, ERR_HTTP2_SETTINGS_CANCEL, ERR_HTTP2_SOCKET_BOUND, @@ -2045,12 +2046,21 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) { } if (!stat.isFile()) { - const err = new ERR_HTTP2_SEND_FILE(); - if (onError) - onError(err); - else - this.destroy(err); - return; + const isDirectory = stat.isDirectory(); + if (options.offset !== undefined || options.offset > 0 || + options.length !== undefined || options.length >= 0 || + isDirectory) { + const err = isDirectory ? + new ERR_HTTP2_SEND_FILE() : new ERR_HTTP2_SEND_FILE_NOSEEK(); + if (onError) + onError(err); + else + this.destroy(err); + return; + } + + options.offset = -1; + options.length = -1; } if (this.destroyed || this.closed) { @@ -2075,12 +2085,14 @@ function doSendFileFD(session, options, fd, headers, streamOptions, err, stat) { return; } - statOptions.length = - statOptions.length < 0 ? stat.size - (+statOptions.offset) : - Math.min(stat.size - (+statOptions.offset), - statOptions.length); + if (stat.isFile()) { + statOptions.length = + statOptions.length < 0 ? stat.size - (+statOptions.offset) : + Math.min(stat.size - (+statOptions.offset), + statOptions.length); - headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length; + headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length; + } processRespondWithFD(this, fd, headers, options.offset | 0, diff --git a/test/parallel/test-http2-respond-file-error-dir.js b/test/parallel/test-http2-respond-file-error-dir.js index 6818616227df89..24a6d2dc96597e 100644 --- a/test/parallel/test-http2-respond-file-error-dir.js +++ b/test/parallel/test-http2-respond-file-error-dir.js @@ -15,7 +15,7 @@ server.on('stream', (stream) => { common.expectsError({ code: 'ERR_HTTP2_SEND_FILE', type: Error, - message: 'Only regular files can be sent' + message: 'Directories cannot be sent' })(err); stream.respond({ ':status': 404 }); diff --git a/test/parallel/test-http2-respond-file-error-pipe-offset.js b/test/parallel/test-http2-respond-file-error-pipe-offset.js new file mode 100644 index 00000000000000..eb782e2dea66c4 --- /dev/null +++ b/test/parallel/test-http2-respond-file-error-pipe-offset.js @@ -0,0 +1,61 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +if (common.isWindows) + common.skip('no mkfifo on Windows'); +const child_process = require('child_process'); +const path = require('path'); +const fs = require('fs'); +const http2 = require('http2'); +const assert = require('assert'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +const pipeName = path.join(tmpdir.path, 'pipe'); + +const mkfifo = child_process.spawnSync('mkfifo', [ pipeName ]); +if (mkfifo.error && mkfifo.error.code === 'ENOENT') { + common.skip('missing mkfifo'); +} + +process.on('exit', () => fs.unlinkSync(pipeName)); + +const server = http2.createServer(); +server.on('stream', (stream) => { + stream.respondWithFile(pipeName, { + 'content-type': 'text/plain' + }, { + offset: 10, + onError(err) { + common.expectsError({ + code: 'ERR_HTTP2_SEND_FILE_NOSEEK', + type: Error, + message: 'Offset or length can only be specified for regular files' + })(err); + + stream.respond({ ':status': 404 }); + stream.end(); + }, + statCheck: common.mustNotCall() + }); +}); +server.listen(0, () => { + + const client = http2.connect(`http://localhost:${server.address().port}`); + const req = client.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 404); + })); + req.on('data', common.mustNotCall()); + req.on('end', common.mustCall(() => { + client.close(); + server.close(); + })); + req.end(); +}); + +fs.writeFile(pipeName, 'Hello, world!\n', common.mustCall()); diff --git a/test/parallel/test-http2-respond-file-with-pipe.js b/test/parallel/test-http2-respond-file-with-pipe.js new file mode 100644 index 00000000000000..2b7ca3fc9a51ee --- /dev/null +++ b/test/parallel/test-http2-respond-file-with-pipe.js @@ -0,0 +1,58 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +if (common.isWindows) + common.skip('no mkfifo on Windows'); +const child_process = require('child_process'); +const path = require('path'); +const fs = require('fs'); +const http2 = require('http2'); +const assert = require('assert'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +const pipeName = path.join(tmpdir.path, 'pipe'); + +const mkfifo = child_process.spawnSync('mkfifo', [ pipeName ]); +if (mkfifo.error && mkfifo.error.code === 'ENOENT') { + common.skip('missing mkfifo'); +} + +process.on('exit', () => fs.unlinkSync(pipeName)); + +const server = http2.createServer(); +server.on('stream', (stream) => { + stream.respondWithFile(pipeName, { + 'content-type': 'text/plain' + }, { + onError: common.mustNotCall(), + statCheck: common.mustCall() + }); +}); + +server.listen(0, () => { + const client = http2.connect(`http://localhost:${server.address().port}`); + const req = client.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 200); + })); + let body = ''; + req.setEncoding('utf8'); + req.on('data', (chunk) => body += chunk); + req.on('end', common.mustCall(() => { + assert.strictEqual(body, 'Hello, world!\n'); + client.close(); + server.close(); + })); + req.end(); +}); + +fs.open(pipeName, 'w', common.mustCall((err, fd) => { + assert.ifError(err); + fs.writeSync(fd, 'Hello, world!\n'); + fs.closeSync(fd); +})); From cf72772498b952b85f6fa88446ec72b7cfd2ab17 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Fri, 23 Feb 2018 16:53:54 +0100 Subject: [PATCH 09/11] src: use ObjectTemplate for creating stream req objs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This allows V8 to avoid preparing a execution context for the constructor, to give a (kinda) small but noticeable perf gain. Benchmarks (only this commit): $ ./node benchmark/compare.js --new ./node --old ./node-master --filter net-c2s.js --set len=10 --set type=asc --runs 360 net | Rscript benchmark/compare.R [01:15:27|% 100| 1/1 files | 720/720 runs | 1/1 configs]: Done confidence improvement accuracy (*) (**) (***) net/net-c2s.js dur=5 type='asc' len=10 *** 0.69 % ±0.31% ±0.41% ±0.53% PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- src/env.h | 4 ++-- src/stream_base-inl.h | 15 +++++++++++++-- src/stream_base.h | 7 +++++++ src/stream_wrap.cc | 8 +++----- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/env.h b/src/env.h index 4fc6b31ffebd21..868f3e9f212f67 100644 --- a/src/env.h +++ b/src/env.h @@ -329,7 +329,7 @@ struct PackageConfig { V(script_context_constructor_template, v8::FunctionTemplate) \ V(script_data_constructor_function, v8::Function) \ V(secure_context_constructor_template, v8::FunctionTemplate) \ - V(shutdown_wrap_constructor_function, v8::Function) \ + V(shutdown_wrap_template, v8::ObjectTemplate) \ V(tcp_constructor_template, v8::FunctionTemplate) \ V(tick_callback_function, v8::Function) \ V(timers_callback_function, v8::Function) \ @@ -338,7 +338,7 @@ struct PackageConfig { V(udp_constructor_function, v8::Function) \ V(vm_parsing_context_symbol, v8::Symbol) \ V(url_constructor_function, v8::Function) \ - V(write_wrap_constructor_function, v8::Function) \ + V(write_wrap_template, v8::ObjectTemplate) \ V(fs_use_promises_symbol, v8::Symbol) class Environment; diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 7523b3a545355f..f4c228d7c5956f 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -164,8 +164,9 @@ inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { if (req_wrap_obj.IsEmpty()) { req_wrap_obj = - env->shutdown_wrap_constructor_function() + env->shutdown_wrap_template() ->NewInstance(env->context()).ToLocalChecked(); + StreamReq::ResetObject(req_wrap_obj); } AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); @@ -203,8 +204,9 @@ inline StreamWriteResult StreamBase::Write( if (req_wrap_obj.IsEmpty()) { req_wrap_obj = - env->write_wrap_constructor_function() + env->write_wrap_template() ->NewInstance(env->context()).ToLocalChecked(); + StreamReq::ResetObject(req_wrap_obj); } AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); @@ -427,6 +429,15 @@ inline void StreamReq::Done(int status, const char* error_str) { OnDone(status); } +inline void StreamReq::ResetObject(v8::Local obj) { +#ifdef DEBUG + CHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField); +#endif + ClearWrap(obj); + obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr); +} + + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/stream_base.h b/src/stream_base.h index 7264824265a579..3267e544a04195 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -46,6 +46,13 @@ class StreamReq { static StreamReq* FromObject(v8::Local req_wrap_obj); + // Sets all internal fields of `req_wrap_obj` to `nullptr`. + // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do, + // and what we use in C++ after creating these objects from their + // v8::ObjectTemplates, to avoid the overhead of calling the + // constructor explicitly. + static inline void ResetObject(v8::Local req_wrap_obj); + protected: virtual void OnDone(int status) = 0; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 27fe48d1165c75..c696404849c9cc 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -60,9 +60,7 @@ void LibuvStreamWrap::Initialize(Local target, auto is_construct_call_callback = [](const FunctionCallbackInfo& args) { CHECK(args.IsConstructCall()); - ClearWrap(args.This()); - args.This()->SetAlignedPointerInInternalField( - StreamReq::kStreamReqField, nullptr); + StreamReq::ResetObject(args.This()); }; Local sw = FunctionTemplate::New(env->isolate(), is_construct_call_callback); @@ -72,7 +70,7 @@ void LibuvStreamWrap::Initialize(Local target, sw->SetClassName(wrapString); AsyncWrap::AddWrapMethods(env, sw); target->Set(wrapString, sw->GetFunction()); - env->set_shutdown_wrap_constructor_function(sw->GetFunction()); + env->set_shutdown_wrap_template(sw->InstanceTemplate()); Local ww = FunctionTemplate::New(env->isolate(), is_construct_call_callback); @@ -82,7 +80,7 @@ void LibuvStreamWrap::Initialize(Local target, ww->SetClassName(writeWrapString); AsyncWrap::AddWrapMethods(env, ww); target->Set(writeWrapString, ww->GetFunction()); - env->set_write_wrap_constructor_function(ww->GetFunction()); + env->set_write_wrap_template(ww->InstanceTemplate()); } From 4199388f6c463d7a93468c3db8c05441f9da1b25 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 7 Mar 2018 18:45:21 +0100 Subject: [PATCH 10/11] benchmark: remove excessive value from http2 benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `requests = 1000000` took about 10 minutes per run for me and doesn’t seem to add much value on its own. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- benchmark/http2/respond-with-fd.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/http2/respond-with-fd.js b/benchmark/http2/respond-with-fd.js index fa7b2fbd16b3e6..cc9992e8ca0bd5 100644 --- a/benchmark/http2/respond-with-fd.js +++ b/benchmark/http2/respond-with-fd.js @@ -7,7 +7,7 @@ const fs = require('fs'); const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html'); const bench = common.createBenchmark(main, { - requests: [100, 1000, 10000, 100000, 1000000], + requests: [100, 1000, 10000, 100000], streams: [100, 200, 1000], clients: [1, 2], benchmarker: ['h2load'] From d8eb113a65555d3f688a897b1681e59b73b1f763 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 7 Mar 2018 14:16:34 +0100 Subject: [PATCH 11/11] http: align parser with StreamBase interface changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `StreamBase` interface changed, so that `OnStreamRead()` and `OnStreamAlloc()` are not guaranteed to be emitted in the same tick any more. This means that, while it isn’t causing any trouble right now, we should not assume that it’s safe to return a static buffer in the HTTP parser’s `OnStreamAlloc()` method. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- src/env-inl.h | 8 ++++++++ src/env.h | 3 +++ src/node_http_parser.cc | 17 +++++++++++++++++ src/util.h | 8 ++++++++ 4 files changed, 36 insertions(+) diff --git a/src/env-inl.h b/src/env-inl.h index 05c7a90c52fee7..3fe57f808cfec7 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -469,6 +469,14 @@ inline void Environment::set_http_parser_buffer(char* buffer) { http_parser_buffer_ = buffer; } +inline bool Environment::http_parser_buffer_in_use() const { + return http_parser_buffer_in_use_; +} + +inline void Environment::set_http_parser_buffer_in_use(bool in_use) { + http_parser_buffer_in_use_ = in_use; +} + inline http2::http2_state* Environment::http2_state() const { return http2_state_.get(); } diff --git a/src/env.h b/src/env.h index 868f3e9f212f67..4b874677935edb 100644 --- a/src/env.h +++ b/src/env.h @@ -646,6 +646,8 @@ class Environment { inline char* http_parser_buffer() const; inline void set_http_parser_buffer(char* buffer); + inline bool http_parser_buffer_in_use() const; + inline void set_http_parser_buffer_in_use(bool in_use); inline http2::http2_state* http2_state() const; inline void set_http2_state(std::unique_ptr state); @@ -828,6 +830,7 @@ class Environment { double* heap_space_statistics_buffer_ = nullptr; char* http_parser_buffer_; + bool http_parser_buffer_in_use_ = false; std::unique_ptr http2_state_; // stat fields contains twice the number of entries because `fs.StatWatcher` diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 207310f4068f43..8ab13e07340929 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -525,6 +525,14 @@ class Parser : public AsyncWrap, public StreamListener { static const size_t kAllocBufferSize = 64 * 1024; uv_buf_t OnStreamAlloc(size_t suggested_size) override { + // For most types of streams, OnStreamRead will be immediately after + // OnStreamAlloc, and will consume all data, so using a static buffer for + // reading is more efficient. For other streams, just use the default + // allocator, which uses Malloc(). + if (env()->http_parser_buffer_in_use()) + return StreamListener::OnStreamAlloc(suggested_size); + env()->set_http_parser_buffer_in_use(true); + if (env()->http_parser_buffer() == nullptr) env()->set_http_parser_buffer(new char[kAllocBufferSize]); @@ -534,6 +542,15 @@ class Parser : public AsyncWrap, public StreamListener { void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { HandleScope scope(env()->isolate()); + // Once we’re done here, either indicate that the HTTP parser buffer + // is free for re-use, or free() the data if it didn’t come from there + // in the first place. + OnScopeLeave on_scope_leave([&]() { + if (buf.base == env()->http_parser_buffer()) + env()->set_http_parser_buffer_in_use(false); + else + free(buf.base); + }); if (nread < 0) { PassReadErrorToPreviousListener(nread); diff --git a/src/util.h b/src/util.h index c822390ec56f8b..e871fc63a5c46a 100644 --- a/src/util.h +++ b/src/util.h @@ -436,6 +436,14 @@ class BufferValue : public MaybeStackBuffer { template inline void USE(T&&) {} } // namespace node +// Run a function when exiting the current scope. +struct OnScopeLeave { + std::function fn_; + + explicit OnScopeLeave(std::function fn) : fn_(fn) {} + ~OnScopeLeave() { fn_(); } +}; + #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #endif // SRC_UTIL_H_