Skip to content

Commit fef1e62

Browse files
committed
firehose connection loadbalance
1 parent d0282c6 commit fef1e62

File tree

6 files changed

+34
-33
lines changed

6 files changed

+34
-33
lines changed

chain/ethereum/examples/firehose.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ async fn main() -> Result<(), Error> {
2525
token,
2626
false,
2727
false,
28-
1,
2928
));
3029

3130
loop {

chain/substreams/examples/substreams.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ async fn main() -> Result<(), Error> {
4646
token,
4747
false,
4848
false,
49-
1,
5049
));
5150

5251
let mut stream: SubstreamsBlockStream<graph_chain_substreams::Chain> =

docker/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,5 @@ services:
4141
POSTGRES_PASSWORD: let-me-in
4242
POSTGRES_DB: graph-node
4343
PGDATA: "/data/postgres"
44-
volumes:
45-
- ./data/postgres:/var/lib/postgresql/data
44+
# volumes:
45+
# - ./data/postgres:/var/lib/postgresql/data

graph/src/firehose/endpoints.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ use crate::{
99
};
1010
use futures03::StreamExt;
1111
use http::uri::{Scheme, Uri};
12-
use rand::prelude::IteratorRandom;
1312
use slog::Logger;
14-
use std::{collections::BTreeMap, fmt::Display, iter, sync::Arc, time::Duration};
13+
use std::{collections::BTreeMap, fmt::Display, sync::Arc, time::Duration};
1514
use tonic::{
1615
metadata::MetadataValue,
1716
transport::{Channel, ClientTlsConfig},
@@ -42,7 +41,6 @@ impl FirehoseEndpoint {
4241
token: Option<String>,
4342
filters_enabled: bool,
4443
compression_enabled: bool,
45-
conn_pool_size: u16,
4644
) -> Self {
4745
let uri = url
4846
.as_ref()
@@ -75,8 +73,7 @@ impl FirehoseEndpoint {
7573
// Timeout on each request, so the timeout to estabilish each 'Blocks' stream.
7674
.timeout(Duration::from_secs(120));
7775

78-
// Load balancing on a same endpoint is useful because it creates a connection pool.
79-
let channel = Channel::balance_list(iter::repeat(endpoint).take(conn_pool_size as usize));
76+
let channel = Channel::balance_list(vec![endpoint].into_iter());
8077

8178
FirehoseEndpoint {
8279
provider: provider.as_ref().to_string(),
@@ -267,10 +264,12 @@ impl FirehoseEndpoints {
267264
self.0.len()
268265
}
269266

267+
// selects the FirehoseEndpoint with the lest amount of references, which will help with spliting
268+
// the load naively across the entire list.
270269
pub fn random(&self) -> Option<&Arc<FirehoseEndpoint>> {
271-
// Select from the matching adapters randomly
272-
let mut rng = rand::thread_rng();
273-
self.0.iter().choose(&mut rng)
270+
self.0
271+
.iter()
272+
.min_by(|x, y| Arc::strong_count(x).cmp(&Arc::strong_count(y)))
274273
}
275274

276275
pub fn remove(&mut self, provider: &str) {

node/src/chain.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -124,19 +124,22 @@ pub fn create_substreams_networks(
124124
"provider" => &provider.label,
125125
);
126126

127-
let endpoint = FirehoseEndpoint::new(
128-
&provider.label,
129-
&firehose.url,
130-
firehose.token.clone(),
131-
firehose.filters_enabled(),
132-
firehose.compression_enabled(),
133-
firehose.conn_pool_size,
134-
);
135-
136127
let parsed_networks = networks_by_kind
137128
.entry(chain.protocol)
138129
.or_insert_with(|| FirehoseNetworks::new());
139-
parsed_networks.insert(name.to_string(), Arc::new(endpoint));
130+
131+
for i in 0..firehose.conn_pool_size {
132+
parsed_networks.insert(
133+
name.to_string(),
134+
Arc::new(FirehoseEndpoint::new(
135+
&format!("{}-{}", provider.label, i),
136+
&firehose.url,
137+
firehose.token.clone(),
138+
firehose.filters_enabled(),
139+
firehose.compression_enabled(),
140+
)),
141+
);
142+
}
140143
}
141144
}
142145
}
@@ -166,19 +169,21 @@ pub fn create_firehose_networks(
166169
"provider" => &provider.label,
167170
);
168171

169-
let endpoint = FirehoseEndpoint::new(
170-
&provider.label,
171-
&firehose.url,
172-
firehose.token.clone(),
173-
firehose.filters_enabled(),
174-
firehose.compression_enabled(),
175-
firehose.conn_pool_size,
176-
);
177-
178172
let parsed_networks = networks_by_kind
179173
.entry(chain.protocol)
180174
.or_insert_with(|| FirehoseNetworks::new());
181-
parsed_networks.insert(name.to_string(), Arc::new(endpoint));
175+
for i in 0..firehose.conn_pool_size {
176+
parsed_networks.insert(
177+
name.to_string(),
178+
Arc::new(FirehoseEndpoint::new(
179+
&format!("{}-{}", provider.label, i),
180+
&firehose.url,
181+
firehose.token.clone(),
182+
firehose.filters_enabled(),
183+
firehose.compression_enabled(),
184+
)),
185+
);
186+
}
182187
}
183188
}
184189
}

tests/src/fixture/ethereum.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ pub async fn chain(blocks: Vec<BlockWithTriggers<Chain>>, stores: &Stores) -> Ch
3434
None,
3535
true,
3636
false,
37-
0,
3837
))]
3938
.into();
4039

0 commit comments

Comments
 (0)