Skip to content

Commit 3b3b458

Browse files
authored
fix: Allow grafts to add data sources (#3989)
1 parent e747946 commit 3b3b458

File tree

23 files changed

+312
-104
lines changed

23 files changed

+312
-104
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use graph::blockchain::Blockchain;
88
use graph::blockchain::NodeCapabilities;
99
use graph::blockchain::{BlockchainKind, TriggerFilter};
1010
use graph::components::subgraph::ProofOfIndexingVersion;
11-
use graph::data::subgraph::SPEC_VERSION_0_0_6;
11+
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
1212
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
1313
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
1414
use graph_runtime_wasm::module::ToAscPtr;
@@ -176,47 +176,49 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
176176
.writable(logger.clone(), deployment.id)
177177
.await?;
178178

179+
let raw_yaml = serde_yaml::to_string(&manifest).unwrap();
180+
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;
181+
182+
// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
183+
self.subgraph_store
184+
.set_manifest_raw_yaml(&deployment.hash, raw_yaml)
185+
.await?;
186+
if let Some(graft) = &manifest.graft {
187+
let file_bytes = self
188+
.link_resolver
189+
.cat(&logger, &graft.base.to_ipfs_link())
190+
.await?;
191+
let yaml = String::from_utf8(file_bytes)?;
192+
self.subgraph_store
193+
.set_manifest_raw_yaml(&graft.base, yaml)
194+
.await?;
195+
}
196+
197+
info!(logger, "Resolve subgraph files using IPFS");
198+
199+
// Allow for infinite retries for subgraph definition files.
200+
let link_resolver = Arc::from(self.link_resolver.with_retries());
201+
let mut manifest = manifest
202+
.resolve(&link_resolver, &logger, ENV_VARS.max_spec_version.clone())
203+
.await?;
204+
205+
info!(logger, "Successfully resolved subgraph files using IPFS");
206+
207+
let manifest_idx_and_name: Vec<(u32, String)> = manifest.template_idx_and_name().collect();
208+
179209
// Start the subgraph deployment before reading dynamic data
180210
// sources; if the subgraph is a graft or a copy, starting it will
181211
// do the copying and dynamic data sources won't show up until after
182212
// that is done
183213
store.start_subgraph_deployment(&logger).await?;
184214

185-
let (manifest, manifest_idx_and_name, static_data_sources) = {
186-
info!(logger, "Resolve subgraph files using IPFS");
187-
188-
let mut manifest = SubgraphManifest::resolve_from_raw(
189-
deployment.hash.cheap_clone(),
190-
manifest,
191-
// Allow for infinite retries for subgraph definition files.
192-
&Arc::from(self.link_resolver.with_retries()),
193-
&logger,
194-
ENV_VARS.max_spec_version.clone(),
195-
)
196-
.await
197-
.context("Failed to resolve subgraph from IPFS")?;
198-
199-
// We cannot include static data sources in the map because a static data source and a
200-
// template may have the same name in the manifest.
201-
let ds_len = manifest.data_sources.len() as u32;
202-
let manifest_idx_and_name: Vec<(u32, String)> = manifest
203-
.templates
204-
.iter()
205-
.map(|t| t.name().to_owned())
206-
.enumerate()
207-
.map(|(idx, name)| (ds_len + idx as u32, name))
208-
.collect();
209-
210-
let data_sources = load_dynamic_data_sources(
211-
store.clone(),
212-
logger.clone(),
213-
&manifest,
214-
manifest_idx_and_name.clone(),
215-
)
216-
.await
217-
.context("Failed to load dynamic data sources")?;
218-
219-
info!(logger, "Successfully resolved subgraph files using IPFS");
215+
// Dynamic data sources are loaded by appending them to the manifest.
216+
//
217+
// Refactor: Preferably we'd avoid any mutation of the manifest.
218+
let (manifest, static_data_sources) = {
219+
let data_sources = load_dynamic_data_sources(store.clone(), logger.clone(), &manifest)
220+
.await
221+
.context("Failed to load dynamic data sources")?;
220222

221223
let static_data_sources = manifest.data_sources.clone();
222224

@@ -229,7 +231,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
229231
manifest.data_sources.len()
230232
);
231233

232-
(manifest, manifest_idx_and_name, static_data_sources)
234+
(manifest, static_data_sources)
233235
};
234236

235237
let static_filters =

core/src/subgraph/loader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ pub async fn load_dynamic_data_sources<C: Blockchain>(
99
store: Arc<dyn WritableStore>,
1010
logger: Logger,
1111
manifest: &SubgraphManifest<C>,
12-
manifest_idx_and_name: Vec<(u32, String)>,
1312
) -> Result<Vec<DataSource<C>>, Error> {
13+
let manifest_idx_and_name = manifest.template_idx_and_name().collect();
1414
let start_time = Instant::now();
1515

1616
let mut data_sources: Vec<DataSource<C>> = vec![];

core/src/subgraph/registrar.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
552552
version_switching_mode: SubgraphVersionSwitchingMode,
553553
resolver: &Arc<dyn LinkResolver>,
554554
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
555+
let raw_string = serde_yaml::to_string(&raw).unwrap();
555556
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
556557
deployment,
557558
raw,
@@ -618,7 +619,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
618619

619620
// Apply the subgraph versioning and deployment operations,
620621
// creating a new subgraph deployment if one doesn't exist.
621-
let deployment = DeploymentCreate::new(&manifest, start_block)
622+
let deployment = DeploymentCreate::new(raw_string, &manifest, start_block)
622623
.graft(base_block)
623624
.debug(debug_fork);
624625
deployment_store

graph/src/components/store/traits.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ pub trait SubgraphStore: Send + Sync + 'static {
152152

153153
/// Find the deployment locators for the subgraph with the given hash
154154
fn locators(&self, hash: &str) -> Result<Vec<DeploymentLocator>, StoreError>;
155+
156+
/// This migrates subgraphs that existed before the raw_yaml column was added.
157+
async fn set_manifest_raw_yaml(
158+
&self,
159+
hash: &DeploymentHash,
160+
raw_yaml: String,
161+
) -> Result<(), StoreError>;
155162
}
156163

157164
pub trait ReadStore: Send + Sync + 'static {

graph/src/data/subgraph/mod.rs

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,12 @@ pub mod status;
1010

1111
pub use features::{SubgraphFeature, SubgraphFeatureValidationError};
1212

13-
use anyhow::ensure;
1413
use anyhow::{anyhow, Error};
1514
use futures03::{future::try_join3, stream::FuturesOrdered, TryStreamExt as _};
1615
use semver::Version;
1716
use serde::{de, ser};
1817
use serde_yaml;
19-
use slog::{debug, info, Logger};
18+
use slog::{info, Logger};
2019
use stable_hash::{FieldAddress, StableHash};
2120
use stable_hash_legacy::SequenceNumber;
2221
use std::{collections::BTreeSet, marker::PhantomData};
@@ -25,6 +24,7 @@ use wasmparser;
2524
use web3::types::Address;
2625

2726
use crate::{
27+
bail,
2828
blockchain::{BlockPtr, Blockchain, DataSource as _},
2929
components::{
3030
link_resolver::LinkResolver,
@@ -41,6 +41,7 @@ use crate::{
4141
offchain::OFFCHAIN_KINDS, DataSource, DataSourceTemplate, UnresolvedDataSource,
4242
UnresolvedDataSourceTemplate,
4343
},
44+
ensure,
4445
prelude::{r, CheapClone, ENV_VARS},
4546
};
4647

@@ -356,7 +357,7 @@ pub enum SubgraphManifestResolveError {
356357
#[error("subgraph is not valid YAML")]
357358
InvalidFormat,
358359
#[error("resolve error: {0}")]
359-
ResolveError(anyhow::Error),
360+
ResolveError(#[from] anyhow::Error),
360361
}
361362

362363
/// Data source contexts are conveniently represented as entities.
@@ -496,7 +497,7 @@ pub struct BaseSubgraphManifest<C, S, D, T> {
496497
}
497498

498499
/// SubgraphManifest with IPFS links unresolved
499-
type UnresolvedSubgraphManifest<C> = BaseSubgraphManifest<
500+
pub type UnresolvedSubgraphManifest<C> = BaseSubgraphManifest<
500501
C,
501502
UnresolvedSchema,
502503
UnresolvedDataSource<C>,
@@ -614,35 +615,16 @@ impl<C: Blockchain> SubgraphManifest<C> {
614615
/// Entry point for resolving a subgraph definition.
615616
pub async fn resolve_from_raw(
616617
id: DeploymentHash,
617-
mut raw: serde_yaml::Mapping,
618+
raw: serde_yaml::Mapping,
618619
resolver: &Arc<dyn LinkResolver>,
619620
logger: &Logger,
620621
max_spec_version: semver::Version,
621622
) -> Result<Self, SubgraphManifestResolveError> {
622-
// Inject the IPFS hash as the ID of the subgraph into the definition.
623-
raw.insert("id".into(), id.to_string().into());
624-
625-
// Parse the YAML data into an UnresolvedSubgraphManifest
626-
let unresolved: UnresolvedSubgraphManifest<C> = serde_yaml::from_value(raw.into())?;
627-
628-
debug!(logger, "Features {:?}", unresolved.features);
623+
let unresolved = UnresolvedSubgraphManifest::parse(id, raw)?;
629624

630625
let resolved = unresolved
631626
.resolve(resolver, logger, max_spec_version)
632-
.await
633-
.map_err(SubgraphManifestResolveError::ResolveError)?;
634-
635-
if (resolved.spec_version < SPEC_VERSION_0_0_7)
636-
&& resolved
637-
.data_sources
638-
.iter()
639-
.any(|ds| OFFCHAIN_KINDS.contains(&ds.kind()))
640-
{
641-
return Err(SubgraphManifestResolveError::ResolveError(anyhow!(
642-
"Offchain data sources not supported prior to {}",
643-
SPEC_VERSION_0_0_7
644-
)));
645-
}
627+
.await?;
646628

647629
Ok(resolved)
648630
}
@@ -685,15 +667,37 @@ impl<C: Blockchain> SubgraphManifest<C> {
685667
) -> Result<UnifiedMappingApiVersion, DifferentMappingApiVersions> {
686668
UnifiedMappingApiVersion::try_from_versions(self.api_versions())
687669
}
670+
671+
pub fn template_idx_and_name(&self) -> impl Iterator<Item = (u32, String)> + '_ {
672+
// We cannot include static data sources in the map because a static data source and a
673+
// template may have the same name in the manifest. Duplicated with
674+
// `UnresolvedSubgraphManifest::template_idx_and_name`.
675+
let ds_len = self.data_sources.len() as u32;
676+
self.templates
677+
.iter()
678+
.map(|t| t.name().to_owned())
679+
.enumerate()
680+
.map(move |(idx, name)| (ds_len + idx as u32, name))
681+
}
688682
}
689683

690684
impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
685+
pub fn parse(
686+
id: DeploymentHash,
687+
mut raw: serde_yaml::Mapping,
688+
) -> Result<Self, SubgraphManifestResolveError> {
689+
// Inject the IPFS hash as the ID of the subgraph into the definition.
690+
raw.insert("id".into(), id.to_string().into());
691+
692+
serde_yaml::from_value(raw.into()).map_err(Into::into)
693+
}
694+
691695
pub async fn resolve(
692696
self,
693697
resolver: &Arc<dyn LinkResolver>,
694698
logger: &Logger,
695699
max_spec_version: semver::Version,
696-
) -> Result<SubgraphManifest<C>, anyhow::Error> {
700+
) -> Result<SubgraphManifest<C>, SubgraphManifestResolveError> {
697701
let UnresolvedSubgraphManifest {
698702
id,
699703
spec_version,
@@ -714,14 +718,14 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
714718
max_spec_version,
715719
id,
716720
spec_version
717-
));
721+
).into());
718722
}
719723

720724
let ds_count = data_sources.len();
721725
if ds_count as u64 + templates.len() as u64 > u32::MAX as u64 {
722-
return Err(anyhow!(
723-
"Subgraph has too many declared data sources and templates",
724-
));
726+
return Err(
727+
anyhow!("Subgraph has too many declared data sources and templates",).into(),
728+
);
725729
}
726730

727731
let (schema, data_sources, templates) = try_join3(
@@ -754,6 +758,17 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
754758
);
755759
}
756760

761+
if spec_version < SPEC_VERSION_0_0_7
762+
&& data_sources
763+
.iter()
764+
.any(|ds| OFFCHAIN_KINDS.contains(&ds.kind()))
765+
{
766+
bail!(
767+
"Offchain data sources not supported prior to {}",
768+
SPEC_VERSION_0_0_7
769+
);
770+
}
771+
757772
Ok(SubgraphManifest {
758773
id,
759774
spec_version,

graph/src/data/subgraph/schema.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Entity types that contain the graph-node state.
22
3-
use anyhow::{anyhow, Error};
3+
use anyhow::{anyhow, bail, Error};
44
use hex;
55
use lazy_static::lazy_static;
66
use rand::rngs::OsRng;
@@ -110,11 +110,12 @@ pub struct DeploymentCreate {
110110

111111
impl DeploymentCreate {
112112
pub fn new(
113+
raw_manifest: String,
113114
source_manifest: &SubgraphManifest<impl Blockchain>,
114115
earliest_block: Option<BlockPtr>,
115116
) -> Self {
116117
Self {
117-
manifest: SubgraphManifestEntity::from(source_manifest),
118+
manifest: SubgraphManifestEntity::new(raw_manifest, source_manifest),
118119
earliest_block: earliest_block.cheap_clone(),
119120
graft_base: None,
120121
graft_block: None,
@@ -163,18 +164,52 @@ pub struct SubgraphManifestEntity {
163164
pub repository: Option<String>,
164165
pub features: Vec<String>,
165166
pub schema: String,
167+
pub raw_yaml: Option<String>,
166168
}
167169

168-
impl<'a, C: Blockchain> From<&'a super::SubgraphManifest<C>> for SubgraphManifestEntity {
169-
fn from(manifest: &'a super::SubgraphManifest<C>) -> Self {
170+
impl SubgraphManifestEntity {
171+
pub fn new(raw_yaml: String, manifest: &super::SubgraphManifest<impl Blockchain>) -> Self {
170172
Self {
171173
spec_version: manifest.spec_version.to_string(),
172174
description: manifest.description.clone(),
173175
repository: manifest.repository.clone(),
174176
features: manifest.features.iter().map(|f| f.to_string()).collect(),
175177
schema: manifest.schema.document.clone().to_string(),
178+
raw_yaml: Some(raw_yaml),
176179
}
177180
}
181+
182+
pub fn template_idx_and_name(&self) -> Result<Vec<(i32, String)>, Error> {
183+
#[derive(Debug, Deserialize)]
184+
struct MinimalDs {
185+
name: String,
186+
}
187+
#[derive(Debug, Deserialize)]
188+
#[serde(rename_all = "camelCase")]
189+
struct MinimalManifest {
190+
data_sources: Vec<MinimalDs>,
191+
#[serde(default)]
192+
templates: Vec<MinimalDs>,
193+
}
194+
195+
let raw_yaml = match &self.raw_yaml {
196+
Some(raw_yaml) => raw_yaml,
197+
None => bail!("raw_yaml not present"),
198+
};
199+
200+
let manifest: MinimalManifest = serde_yaml::from_str(raw_yaml)?;
201+
202+
let ds_len = manifest.data_sources.len() as i32;
203+
let template_idx_and_name = manifest
204+
.templates
205+
.iter()
206+
.map(|t| t.name.to_owned())
207+
.enumerate()
208+
.map(move |(idx, name)| (ds_len + idx as i32, name))
209+
.collect();
210+
211+
Ok(template_idx_and_name)
212+
}
178213
}
179214

180215
#[derive(Clone, Debug)]

graph/src/util/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,12 @@ macro_rules! ensure {
1717
}
1818
};
1919
}
20+
21+
// Like `bail!` from `anyhow`, but converts the error with `.into()` so it can be used in functions whose error type is not `anyhow::Error`.
22+
// For context see https://github.com/dtolnay/anyhow/issues/112#issuecomment-704549251.
23+
#[macro_export]
24+
macro_rules! bail {
25+
($($err:tt)*) => {
26+
return Err(anyhow::anyhow!($($err)*).into());
27+
};
28+
}

0 commit comments

Comments
 (0)