Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use graph::blockchain::Blockchain;
use graph::blockchain::NodeCapabilities;
use graph::blockchain::{BlockchainKind, TriggerFilter};
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::SPEC_VERSION_0_0_6;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
use graph_runtime_wasm::module::ToAscPtr;
Expand Down Expand Up @@ -176,47 +176,49 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
.writable(logger.clone(), deployment.id)
.await?;

let raw_yaml = serde_yaml::to_string(&manifest).unwrap();
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;

// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
self.subgraph_store
.set_manifest_raw_yaml(&deployment.hash, raw_yaml)
.await?;
if let Some(graft) = &manifest.graft {
let file_bytes = self
.link_resolver
.cat(&logger, &graft.base.to_ipfs_link())
.await?;
let yaml = String::from_utf8(file_bytes)?;
self.subgraph_store
.set_manifest_raw_yaml(&graft.base, yaml)
.await?;
}

info!(logger, "Resolve subgraph files using IPFS");

// Allow for infinite retries for subgraph definition files.
let link_resolver = Arc::from(self.link_resolver.with_retries());
let mut manifest = manifest
.resolve(&link_resolver, &logger, ENV_VARS.max_spec_version.clone())
.await?;

info!(logger, "Successfully resolved subgraph files using IPFS");

let manifest_idx_and_name: Vec<(u32, String)> = manifest.template_idx_and_name().collect();

// Start the subgraph deployment before reading dynamic data
// sources; if the subgraph is a graft or a copy, starting it will
// do the copying and dynamic data sources won't show up until after
// that is done
store.start_subgraph_deployment(&logger).await?;

let (manifest, manifest_idx_and_name, static_data_sources) = {
info!(logger, "Resolve subgraph files using IPFS");

let mut manifest = SubgraphManifest::resolve_from_raw(
deployment.hash.cheap_clone(),
manifest,
// Allow for infinite retries for subgraph definition files.
&Arc::from(self.link_resolver.with_retries()),
&logger,
ENV_VARS.max_spec_version.clone(),
)
.await
.context("Failed to resolve subgraph from IPFS")?;

// We cannot include static data sources in the map because a static data source and a
// template may have the same name in the manifest.
let ds_len = manifest.data_sources.len() as u32;
let manifest_idx_and_name: Vec<(u32, String)> = manifest
.templates
.iter()
.map(|t| t.name().to_owned())
.enumerate()
.map(|(idx, name)| (ds_len + idx as u32, name))
.collect();

let data_sources = load_dynamic_data_sources(
store.clone(),
logger.clone(),
&manifest,
manifest_idx_and_name.clone(),
)
.await
.context("Failed to load dynamic data sources")?;

info!(logger, "Successfully resolved subgraph files using IPFS");
// Dynamic data sources are loaded by appending them to the manifest.
//
// Refactor: Preferrably we'd avoid any mutation of the manifest.
let (manifest, static_data_sources) = {
let data_sources = load_dynamic_data_sources(store.clone(), logger.clone(), &manifest)
.await
.context("Failed to load dynamic data sources")?;

let static_data_sources = manifest.data_sources.clone();

Expand All @@ -229,7 +231,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
manifest.data_sources.len()
);

(manifest, manifest_idx_and_name, static_data_sources)
(manifest, static_data_sources)
};

let static_filters =
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub async fn load_dynamic_data_sources<C: Blockchain>(
store: Arc<dyn WritableStore>,
logger: Logger,
manifest: &SubgraphManifest<C>,
manifest_idx_and_name: Vec<(u32, String)>,
) -> Result<Vec<DataSource<C>>, Error> {
let manifest_idx_and_name = manifest.template_idx_and_name().collect();
let start_time = Instant::now();

let mut data_sources: Vec<DataSource<C>> = vec![];
Expand Down
3 changes: 2 additions & 1 deletion core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
version_switching_mode: SubgraphVersionSwitchingMode,
resolver: &Arc<dyn LinkResolver>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
deployment,
raw,
Expand Down Expand Up @@ -618,7 +619,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(

// Apply the subgraph versioning and deployment operations,
// creating a new subgraph deployment if one doesn't exist.
let deployment = DeploymentCreate::new(&manifest, start_block)
let deployment = DeploymentCreate::new(raw_string, &manifest, start_block)
.graft(base_block)
.debug(debug_fork);
deployment_store
Expand Down
7 changes: 7 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ pub trait SubgraphStore: Send + Sync + 'static {

/// Find the deployment locators for the subgraph with the given hash
fn locators(&self, hash: &str) -> Result<Vec<DeploymentLocator>, StoreError>;

/// This migrates subgraphs that existed before the raw_yaml column was added.
async fn set_manifest_raw_yaml(
&self,
hash: &DeploymentHash,
raw_yaml: String,
) -> Result<(), StoreError>;
}

pub trait ReadStore: Send + Sync + 'static {
Expand Down
77 changes: 46 additions & 31 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ pub mod status;

pub use features::{SubgraphFeature, SubgraphFeatureValidationError};

use anyhow::ensure;
use anyhow::{anyhow, Error};
use futures03::{future::try_join3, stream::FuturesOrdered, TryStreamExt as _};
use semver::Version;
use serde::{de, ser};
use serde_yaml;
use slog::{debug, info, Logger};
use slog::{info, Logger};
use stable_hash::{FieldAddress, StableHash};
use stable_hash_legacy::SequenceNumber;
use std::{collections::BTreeSet, marker::PhantomData};
Expand All @@ -25,6 +24,7 @@ use wasmparser;
use web3::types::Address;

use crate::{
bail,
blockchain::{BlockPtr, Blockchain, DataSource as _},
components::{
link_resolver::LinkResolver,
Expand All @@ -41,6 +41,7 @@ use crate::{
offchain::OFFCHAIN_KINDS, DataSource, DataSourceTemplate, UnresolvedDataSource,
UnresolvedDataSourceTemplate,
},
ensure,
prelude::{r, CheapClone, ENV_VARS},
};

Expand Down Expand Up @@ -356,7 +357,7 @@ pub enum SubgraphManifestResolveError {
#[error("subgraph is not valid YAML")]
InvalidFormat,
#[error("resolve error: {0}")]
ResolveError(anyhow::Error),
ResolveError(#[from] anyhow::Error),
}

/// Data source contexts are conveniently represented as entities.
Expand Down Expand Up @@ -496,7 +497,7 @@ pub struct BaseSubgraphManifest<C, S, D, T> {
}

/// SubgraphManifest with IPFS links unresolved
type UnresolvedSubgraphManifest<C> = BaseSubgraphManifest<
pub type UnresolvedSubgraphManifest<C> = BaseSubgraphManifest<
C,
UnresolvedSchema,
UnresolvedDataSource<C>,
Expand Down Expand Up @@ -614,35 +615,16 @@ impl<C: Blockchain> SubgraphManifest<C> {
/// Entry point for resolving a subgraph definition.
pub async fn resolve_from_raw(
id: DeploymentHash,
mut raw: serde_yaml::Mapping,
raw: serde_yaml::Mapping,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
max_spec_version: semver::Version,
) -> Result<Self, SubgraphManifestResolveError> {
// Inject the IPFS hash as the ID of the subgraph into the definition.
raw.insert("id".into(), id.to_string().into());

// Parse the YAML data into an UnresolvedSubgraphManifest
let unresolved: UnresolvedSubgraphManifest<C> = serde_yaml::from_value(raw.into())?;

debug!(logger, "Features {:?}", unresolved.features);
let unresolved = UnresolvedSubgraphManifest::parse(id, raw)?;

let resolved = unresolved
.resolve(resolver, logger, max_spec_version)
.await
.map_err(SubgraphManifestResolveError::ResolveError)?;

if (resolved.spec_version < SPEC_VERSION_0_0_7)
&& resolved
.data_sources
.iter()
.any(|ds| OFFCHAIN_KINDS.contains(&ds.kind()))
{
return Err(SubgraphManifestResolveError::ResolveError(anyhow!(
"Offchain data sources not supported prior to {}",
SPEC_VERSION_0_0_7
)));
}
.await?;

Ok(resolved)
}
Expand Down Expand Up @@ -685,15 +667,37 @@ impl<C: Blockchain> SubgraphManifest<C> {
) -> Result<UnifiedMappingApiVersion, DifferentMappingApiVersions> {
UnifiedMappingApiVersion::try_from_versions(self.api_versions())
}

pub fn template_idx_and_name(&self) -> impl Iterator<Item = (u32, String)> + '_ {
// We cannot include static data sources in the map because a static data source and a
// template may have the same name in the manifest. Duplicated with
// `UnresolvedSubgraphManifest::template_idx_and_name`.
let ds_len = self.data_sources.len() as u32;
self.templates
.iter()
.map(|t| t.name().to_owned())
.enumerate()
.map(move |(idx, name)| (ds_len + idx as u32, name))
}
}

impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
pub fn parse(
id: DeploymentHash,
mut raw: serde_yaml::Mapping,
) -> Result<Self, SubgraphManifestResolveError> {
// Inject the IPFS hash as the ID of the subgraph into the definition.
raw.insert("id".into(), id.to_string().into());

serde_yaml::from_value(raw.into()).map_err(Into::into)
}

pub async fn resolve(
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
max_spec_version: semver::Version,
) -> Result<SubgraphManifest<C>, anyhow::Error> {
) -> Result<SubgraphManifest<C>, SubgraphManifestResolveError> {
let UnresolvedSubgraphManifest {
id,
spec_version,
Expand All @@ -714,14 +718,14 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
max_spec_version,
id,
spec_version
));
).into());
}

let ds_count = data_sources.len();
if ds_count as u64 + templates.len() as u64 > u32::MAX as u64 {
return Err(anyhow!(
"Subgraph has too many declared data sources and templates",
));
return Err(
anyhow!("Subgraph has too many declared data sources and templates",).into(),
);
}

let (schema, data_sources, templates) = try_join3(
Expand Down Expand Up @@ -754,6 +758,17 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
);
}

if spec_version < SPEC_VERSION_0_0_7
&& data_sources
.iter()
.any(|ds| OFFCHAIN_KINDS.contains(&ds.kind()))
{
bail!(
"Offchain data sources not supported prior to {}",
SPEC_VERSION_0_0_7
);
}

Ok(SubgraphManifest {
id,
spec_version,
Expand Down
43 changes: 39 additions & 4 deletions graph/src/data/subgraph/schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Entity types that contain the graph-node state.

use anyhow::{anyhow, Error};
use anyhow::{anyhow, bail, Error};
use hex;
use lazy_static::lazy_static;
use rand::rngs::OsRng;
Expand Down Expand Up @@ -110,11 +110,12 @@ pub struct DeploymentCreate {

impl DeploymentCreate {
pub fn new(
raw_manifest: String,
source_manifest: &SubgraphManifest<impl Blockchain>,
earliest_block: Option<BlockPtr>,
) -> Self {
Self {
manifest: SubgraphManifestEntity::from(source_manifest),
manifest: SubgraphManifestEntity::new(raw_manifest, source_manifest),
earliest_block: earliest_block.cheap_clone(),
graft_base: None,
graft_block: None,
Expand Down Expand Up @@ -163,18 +164,52 @@ pub struct SubgraphManifestEntity {
pub repository: Option<String>,
pub features: Vec<String>,
pub schema: String,
pub raw_yaml: Option<String>,
}

impl<'a, C: Blockchain> From<&'a super::SubgraphManifest<C>> for SubgraphManifestEntity {
fn from(manifest: &'a super::SubgraphManifest<C>) -> Self {
impl SubgraphManifestEntity {
pub fn new(raw_yaml: String, manifest: &super::SubgraphManifest<impl Blockchain>) -> Self {
Self {
spec_version: manifest.spec_version.to_string(),
description: manifest.description.clone(),
repository: manifest.repository.clone(),
features: manifest.features.iter().map(|f| f.to_string()).collect(),
schema: manifest.schema.document.clone().to_string(),
raw_yaml: Some(raw_yaml),
}
}

pub fn template_idx_and_name(&self) -> Result<Vec<(i32, String)>, Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand we're using i32 to simplify comparing these values with query results, but it would be nice to postpone the conversion and create a type alias for this tuple.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be but I'll be lazy and leave it for now

#[derive(Debug, Deserialize)]
struct MinimalDs {
name: String,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MinimalManifest {
data_sources: Vec<MinimalDs>,
#[serde(default)]
templates: Vec<MinimalDs>,
}

let raw_yaml = match &self.raw_yaml {
Some(raw_yaml) => raw_yaml,
None => bail!("raw_yaml not present"),
};

let manifest: MinimalManifest = serde_yaml::from_str(raw_yaml)?;

let ds_len = manifest.data_sources.len() as i32;
let template_idx_and_name = manifest
.templates
.iter()
.map(|t| t.name.to_owned())
.enumerate()
.map(move |(idx, name)| (ds_len + idx as i32, name))
.collect();

Ok(template_idx_and_name)
}
}

#[derive(Clone, Debug)]
Expand Down
9 changes: 9 additions & 0 deletions graph/src/util/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,12 @@ macro_rules! ensure {
}
};
}

// `bail!` from `anyhow`, but calling `from`.
// For context see https://github.com/dtolnay/anyhow/issues/112#issuecomment-704549251.
#[macro_export]
macro_rules! bail {
($($err:tt)*) => {
return Err(anyhow::anyhow!($($err)*).into());
};
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we just use the existing anyhow::bail! macro? If this is truly needed, maybe add a comment why.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't call .into(), yhea needs a comment.

Loading