Skip to content

Commit 5e20033

Browse files
committed
fetch network resource on worker thread
1 parent b8f2aa9 commit 5e20033

16 files changed

+505
-242
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
const { addFallThroughListener } = internalBinding('inspector');
3+
const { InternalWorker } = require('internal/worker');
4+
5+
function spawnLoadNetworkResourceWorker() {
6+
const worker = new InternalWorker('internal/inspector/load_network_resource_worker', {
7+
stderr: false,
8+
stdin: false,
9+
stdout: false,
10+
trackUnmanagedFds: false,
11+
});
12+
13+
worker.unref();
14+
15+
addFallThroughListener((sessionId, callId, method, message) => {
16+
if (method === 'Network.loadNetworkResource') {
17+
worker.postMessage({
18+
sessionId,
19+
callId,
20+
method,
21+
message,
22+
});
23+
}
24+
});
25+
}
26+
27+
module.exports = {
28+
spawnLoadNetworkResourceWorker,
29+
};
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict';
2+
const {
3+
JSONParse,
4+
JSONStringify,
5+
} = primordials;
6+
const {
7+
emitProtocolResponseInParent,
8+
addIoData,
9+
} = internalBinding('inspector');
10+
const { AbortController } = require('internal/abort_controller');
11+
const { setTimeout, clearTimeout } = require('timers');
12+
const { fetch } = require('internal/deps/undici/undici');
13+
14+
module.exports = function setupInspectorWorker(_, port) {
15+
function emitResponse(callId, sessionId, success, streamId = null) {
16+
const result = {
17+
id: callId,
18+
result: {
19+
resource: {
20+
success,
21+
},
22+
},
23+
};
24+
if (success && streamId !== null) {
25+
result.result.resource.stream = streamId.toString();
26+
}
27+
emitProtocolResponseInParent(callId, JSONStringify(result), sessionId);
28+
}
29+
30+
port.on('message', (msg) => {
31+
const { sessionId, callId, message } = msg;
32+
const url = JSONParse(message).params.url;
33+
34+
const controller = new AbortController();
35+
const timeoutId = setTimeout(() => controller.abort(), 2000);
36+
37+
fetch(url, { signal: controller.signal })
38+
.then((response) => {
39+
clearTimeout(timeoutId);
40+
if (!response.ok) {
41+
emitResponse(callId, sessionId, false);
42+
return null;
43+
}
44+
return response.text().then((text) => {
45+
const streamId = addIoData(text);
46+
emitResponse(callId, sessionId, true, streamId);
47+
});
48+
})
49+
.catch(() => {
50+
clearTimeout(timeoutId);
51+
emitResponse(callId, sessionId, false);
52+
});
53+
});
54+
};

lib/internal/modules/run_main.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ function runEntryPointWithESMLoader(callback) {
136136
* @param {string} main - First positional CLI argument, such as `'entry.js'` from `node entry.js`
137137
*/
138138
function executeUserEntryPoint(main = process.argv[1]) {
139+
if (getOptionValue('--experimental-inspector-network-resource')) {
140+
require('internal/inspector/fall_through_handle').spawnLoadNetworkResourceWorker();
141+
}
142+
139143
let useESMLoader;
140144
let resolvedMain;
141145
if (getOptionValue('--entry-url')) {

src/inspector/io_agent.cc

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "io_agent.h"
2+
#include <string>
23
#include "crdtp/dispatch.h"
4+
#include "node_mutex.h"
35

46
namespace node::inspector::protocol {
57

@@ -8,17 +10,47 @@ void IoAgent::Wire(UberDispatcher* dispatcher) {
810
IO::Dispatcher::wire(dispatcher, this);
911
}
1012

13+
std::unordered_map<int, int> IoAgent::offset_map_;
14+
std::unordered_map<int, std::string> IoAgent::data_map_;
15+
std::atomic<int> IoAgent::stream_counter_{1};
16+
Mutex IoAgent::data_mutex_;
17+
18+
int IoAgent::setData(const std::string& value) {
19+
int key = getNextStreamId();
20+
Mutex::ScopedLock lock(data_mutex_);
21+
data_map_[key] = value;
22+
23+
return key;
24+
}
25+
26+
int IoAgent::getNextStreamId() {
27+
return stream_counter_++;
28+
}
29+
1130
DispatchResponse IoAgent::read(const String& in_handle,
1231
Maybe<int> in_offset,
1332
Maybe<int> in_size,
1433
String* out_data,
1534
bool* out_eof) {
16-
std::string txt = data_map_[in_handle];
35+
Mutex::ScopedReadLock lock(data_mutex_);
36+
std::string in_handle_str = in_handle;
37+
int stream_id = 0;
38+
bool is_number = std::all_of(in_handle_str.begin(),
39+
in_handle_str.end(),
40+
::isdigit);
41+
if (is_number) {
42+
stream_id = std::stoi(in_handle_str);
43+
} else {
44+
out_data = new String("");
45+
*out_eof = true;
46+
}
47+
48+
std::string txt = data_map_[stream_id];
1749
int offset = 0;
1850
if (in_offset.isJust()) {
1951
offset = in_offset.fromJust();
20-
} else if (offset_map_.find(in_handle) != offset_map_.end()) {
21-
offset = offset_map_[in_handle];
52+
} else if (offset_map_.find(stream_id) != offset_map_.end()) {
53+
offset = offset_map_[stream_id];
2254
}
2355
int size = 1 << 20;
2456
if (in_size.isJust()) {
@@ -28,18 +60,28 @@ DispatchResponse IoAgent::read(const String& in_handle,
2860
if (static_cast<std::size_t>(offset) < txt.length()) {
2961
std::string out_txt = txt.substr(offset, size);
3062
out_data->assign(out_txt);
63+
*out_eof = false;
3164
} else {
3265
*out_eof = true;
3366
}
3467

35-
offset_map_[in_handle] = offset + size;
68+
offset_map_[stream_id] = offset + size;
3669

3770
return DispatchResponse::Success();
3871
}
3972

4073
DispatchResponse IoAgent::close(const String& in_handle) {
41-
offset_map_.erase(in_handle);
42-
data_map_.erase(in_handle);
74+
Mutex::ScopedWriteLock lock(data_mutex_);
75+
std::string in_handle_str = in_handle;
76+
int stream_id = 0;
77+
bool is_number = std::all_of(in_handle_str.begin(),
78+
in_handle_str.end(),
79+
::isdigit);
80+
if (is_number) {
81+
stream_id = std::stoi(in_handle_str);
82+
offset_map_.erase(stream_id);
83+
data_map_.erase(stream_id);
84+
}
4385
return DispatchResponse::Success();
4486
}
4587
} // namespace node::inspector::protocol

src/inspector/io_agent.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <unordered_map>
55
#include "node/inspector/protocol/IO.h"
6+
#include "node_mutex.h"
67

78
namespace node::inspector::protocol {
89

@@ -17,15 +18,16 @@ class IoAgent : public IO::Backend {
1718
bool* out_eof) override;
1819
DispatchResponse close(const String& in_handle) override;
1920

20-
void setData(const std::string& key, const std::string value) {
21-
data_map_[key] = value;
22-
}
21+
static int setData(const std::string& value);
2322

2423
private:
2524
std::shared_ptr<IO::Frontend> frontend_;
26-
std::unordered_map<std::string, int> offset_map_;
27-
std::unordered_map<std::string, std::string> data_map_;
25+
static int getNextStreamId();
26+
27+
static std::unordered_map<int, std::string> data_map_;
28+
static std::unordered_map<int, int> offset_map_;
29+
static std::atomic<int> stream_counter_;
30+
static Mutex data_mutex_;
2831
};
2932
} // namespace node::inspector::protocol
30-
3133
#endif // SRC_INSPECTOR_IO_AGENT_H_

src/inspector/network_agent.cc

Lines changed: 3 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,10 @@ std::unique_ptr<protocol::Network::Response> createResponseFromObject(
183183

184184
NetworkAgent::NetworkAgent(NetworkInspector* inspector,
185185
v8_inspector::V8Inspector* v8_inspector,
186-
Environment* env,
187-
std::shared_ptr<protocol::IoAgent> io_agent)
186+
Environment* env)
188187
: inspector_(inspector),
189188
v8_inspector_(v8_inspector),
190-
env_(env),
191-
io_agent_(io_agent) {
189+
env_(env) {
192190
event_notifier_map_["requestWillBeSent"] = &NetworkAgent::requestWillBeSent;
193191
event_notifier_map_["responseReceived"] = &NetworkAgent::responseReceived;
194192
event_notifier_map_["loadingFailed"] = &NetworkAgent::loadingFailed;
@@ -221,144 +219,11 @@ protocol::DispatchResponse NetworkAgent::disable() {
221219
return protocol::DispatchResponse::Success();
222220
}
223221

224-
std::tuple<int, std::string, std::string> NetworkAgent::spawnFetchProcess(
225-
std::string_view code, Environment* env, std::string_view url) {
226-
std::string stdout_result;
227-
std::string stderr_result;
228-
uv_loop_t* loop = new uv_loop_t;
229-
uv_loop_init(loop);
230-
uv_process_t child;
231-
uv_pipe_t stdout_pipe;
232-
uv_pipe_init(loop, &stdout_pipe, 0);
233-
uv_pipe_t stderr_pipe;
234-
uv_pipe_init(loop, &stderr_pipe, 0);
235-
236-
uv_process_options_t uv_process_options;
237-
std::string command =
238-
env->exec_path() + " --eval \"" + code.data() + "\" -- " + url.data();
239-
240-
const char* file = env->exec_path().c_str();
241-
char* args[] = {const_cast<char*>(file),
242-
const_cast<char*>("--eval"),
243-
reinterpret_cast<char*>(const_cast<char*>(code.data())),
244-
reinterpret_cast<char*>(const_cast<char*>(url.data())),
245-
nullptr};
246-
247-
uv_stdio_container_t stdio[3];
248-
uv_process_options.file = file;
249-
uv_process_options.args = args;
250-
uv_process_options.flags = 0;
251-
uv_process_options.stdio_count = 3;
252-
uv_process_options.stdio = stdio;
253-
uv_process_options.cwd = nullptr;
254-
uv_process_options.env = nullptr;
255-
256-
uv_process_options.exit_cb =
257-
[](uv_process_t* req, int64_t exit_status, int term_signal) {
258-
uv_close(reinterpret_cast<uv_handle_t*>(req), nullptr);
259-
};
260-
261-
stdio[0].flags = UV_INHERIT_FD;
262-
stdio[0].data.fd = 0;
263-
stdio[1].flags =
264-
static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
265-
stdio[1].data.stream = reinterpret_cast<uv_stream_t*>(&stdout_pipe);
266-
stdio[2].flags =
267-
static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
268-
stdio[2].data.stream = reinterpret_cast<uv_stream_t*>(&stderr_pipe);
269-
270-
int r = uv_spawn(loop, &child, &uv_process_options);
271-
272-
if (r != 0) {
273-
uv_loop_close(loop);
274-
delete loop;
275-
return {r, stdout_result, stderr_result};
276-
}
277-
278-
auto alloc_cb =
279-
[](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
280-
buf->base = static_cast<char*>(malloc(suggested_size));
281-
buf->len = suggested_size;
282-
};
283-
284-
auto read_cb = [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
285-
auto* response = static_cast<std::string*>(stream->data);
286-
if (nread > 0) {
287-
response->append(buf->base, nread);
288-
} else if (nread < 0) {
289-
if (!response->empty() && response->back() == '\n') {
290-
response->pop_back();
291-
}
292-
uv_close(reinterpret_cast<uv_handle_t*>(stream), nullptr);
293-
}
294-
if (buf->base) free(buf->base);
295-
};
296-
297-
stdout_pipe.data = &stdout_result;
298-
uv_read_start(
299-
reinterpret_cast<uv_stream_t*>(&stdout_pipe), alloc_cb, read_cb);
300-
301-
stderr_pipe.data = &stderr_result;
302-
uv_read_start(
303-
reinterpret_cast<uv_stream_t*>(&stderr_pipe), alloc_cb, read_cb);
304-
305-
uv_run(loop, UV_RUN_DEFAULT);
306-
307-
uv_walk(
308-
loop,
309-
[](uv_handle_t* handle, void*) {
310-
if (!uv_is_closing(handle)) {
311-
uv_close(handle, nullptr);
312-
}
313-
},
314-
nullptr);
315-
316-
uv_run(loop, UV_RUN_DEFAULT);
317-
318-
uv_loop_close(loop);
319-
delete loop;
320-
return {r, stdout_result, stderr_result};
321-
}
322-
323222
protocol::DispatchResponse NetworkAgent::loadNetworkResource(
324223
const protocol::String& in_url,
325224
std::unique_ptr<protocol::Network::LoadNetworkResourcePageResult>*
326225
out_resource) {
327-
if (!env_->options()->experimental_inspector_network_resource) {
328-
return protocol::DispatchResponse::MethodNotFound(
329-
"Network.loadNetworkResource is not supported in this environment. "
330-
"Please enable the experimental-inspector-network-resource option.");
331-
}
332-
DCHECK(io_agent_);
333-
334-
std::string code = R"(
335-
fetch(process.argv[1], {signal: AbortSignal.timeout(2000) }).then(res => {
336-
if (res.ok) {
337-
res.text().then(console.log)
338-
} else {
339-
throw new Error('Network error: ' + res.status);
340-
}
341-
})
342-
)";
343-
344-
auto [r, response, err] = spawnFetchProcess(code, env_, in_url);
345-
if (r == 0 && err.empty()) {
346-
std::string uuid = std::to_string(load_id_counter_);
347-
load_id_counter_++;
348-
io_agent_->setData(uuid, response);
349-
auto result = protocol::Network::LoadNetworkResourcePageResult::create()
350-
.setSuccess(true)
351-
.setStream(uuid)
352-
.build();
353-
out_resource->reset(result.release());
354-
} else {
355-
auto result = protocol::Network::LoadNetworkResourcePageResult::create()
356-
.setSuccess(false)
357-
.build();
358-
out_resource->reset(result.release());
359-
}
360-
361-
return protocol::DispatchResponse::Success();
226+
return protocol::DispatchResponse::FallThrough();
362227
}
363228

364229
void NetworkAgent::requestWillBeSent(v8::Local<v8::Context> context,

src/inspector/network_agent.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ class NetworkAgent : public protocol::Network::Backend {
1616
public:
1717
explicit NetworkAgent(NetworkInspector* inspector,
1818
v8_inspector::V8Inspector* v8_inspector,
19-
Environment* env,
20-
std::shared_ptr<protocol::IoAgent> io_agent);
19+
Environment* env);
2120

2221
void Wire(protocol::UberDispatcher* dispatcher);
2322

@@ -56,8 +55,6 @@ class NetworkAgent : public protocol::Network::Backend {
5655
using EventNotifier = void (NetworkAgent::*)(v8::Local<v8::Context> context,
5756
v8::Local<v8::Object>);
5857
std::unordered_map<protocol::String, EventNotifier> event_notifier_map_;
59-
std::shared_ptr<protocol::IoAgent> io_agent_;
60-
int load_id_counter_ = 1;
6158
};
6259

6360
} // namespace inspector

0 commit comments

Comments
 (0)