Skip to content

Commit dc09f3c

Browse files
committed
main, instance_manager: Go back to running subgraphs with spawn_blocking
1 parent 67d742d commit dc09f3c

File tree

5 files changed

+29
-16
lines changed

5 files changed

+29
-16
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,10 @@ where
446446
// forward; this is easier than updating the existing block stream.
447447
//
448448
// This task has many calls to the store, so mark it as `blocking`.
449-
graph::spawn_thread(deployment_id.to_string(), move || {
450-
if let Err(e) = graph::block_on(run_subgraph(ctx)) {
449+
// This call is the reason why the size of the blocking thread pool
450+
// size must always be well above the number of deployed subgraphs.
451+
graph::spawn_blocking(async move {
452+
if let Err(e) = run_subgraph(ctx).await {
451453
error!(
452454
&logger,
453455
"Subgraph instance failed to run: {}",

docs/environment-variables.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,8 @@ those.
154154
given the other load management configuration settings, but never
155155
actually decline to run a query, instead log about load management
156156
decisions. Set to `true` to turn simulation on, defaults to `false`
157+
- `GRAPH_MAX_BLOCKING_THREADS`: Maximum number of blocking threads in the tokio blocking thread
158+
pool. In an index node this should always be well above the number of subgraphs deployed to it,
159+
because each subgraph permanently takes up one thread. Graphql queries are currently also run on
160+
the blocking thread pool, but the DB connection pool size is usually the limiting factor for
161+
queries. Defaults to 2000.

graph/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub mod mock {
2727
/// Wrapper for spawning tasks that abort on panic, which is our default.
2828
mod task_spawn;
2929
pub use task_spawn::{
30-
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, spawn_thread,
30+
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic,
3131
};
3232

3333
pub use bytes;

graph/src/task_spawn.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,3 @@ pub fn spawn_blocking_allow_panic<R: 'static + Send>(
5454
pub fn block_on<T>(f: impl Future03<Output = T>) -> T {
5555
tokio::runtime::Handle::current().block_on(f)
5656
}
57-
58-
/// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned.
59-
pub fn spawn_thread(name: String, f: impl 'static + FnOnce() + Send) {
60-
let conf = std::thread::Builder::new().name(name);
61-
let runtime = tokio::runtime::Handle::current();
62-
conf.spawn(move || {
63-
let _runtime_guard = runtime.enter();
64-
f()
65-
})
66-
.unwrap();
67-
}

node/src/main.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ lazy_static! {
5555
.map(|s| BlockNumber::from_str(&s)
5656
.unwrap_or_else(|_| panic!("failed to parse env var ETHEREUM_ANCESTOR_COUNT")))
5757
.unwrap_or(50);
58+
59+
// Maximum number of blocking threads in the tokio blocking thread pool. This should always be
60+
// well above the number of subgraphs deployed to an index node, because each subgraph takes up
61+
// one thread. Defaults to 2000.
62+
static ref MAX_BLOCKING_THREADS: usize = env::var("GRAPH_MAX_BLOCKING_THREADS")
63+
.ok()
64+
.map(|s| usize::from_str(&s)
65+
.unwrap_or_else(|_| panic!("failed to parse env var ETHEREUM_ANCESTOR_COUNT")))
66+
.unwrap_or(2000);
5867
}
5968

6069
/// How long we will hold up node startup to get the net version and genesis
@@ -93,8 +102,16 @@ fn read_expensive_queries() -> Result<Vec<Arc<q::Document>>, std::io::Error> {
93102
Ok(queries)
94103
}
95104

96-
#[tokio::main]
97-
async fn main() {
105+
fn main() {
106+
tokio::runtime::Builder::new_multi_thread()
107+
.enable_all()
108+
.max_blocking_threads(*MAX_BLOCKING_THREADS)
109+
.build()
110+
.unwrap()
111+
.block_on(async { async_main().await })
112+
}
113+
114+
async fn async_main() {
98115
env_logger::init();
99116

100117
// Allow configuring fail points on debug builds. Used for integration tests.

0 commit comments

Comments
 (0)