Skip to content

Commit d0282c6

Browse files
authored
Update substreams encoding (#3972)
1 parent 3fd82bb commit d0282c6

File tree

10 files changed

+255
-276
lines changed

10 files changed

+255
-276
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/substreams/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ dirs-next = "2.0"
2222
anyhow = "1.0"
2323
tiny-keccak = "1.5.0"
2424
hex = "0.4.3"
25-
semver = "1.0.14"
25+
semver = "1.0.12"
26+
base64 = "0.13.0"
2627

2728
itertools = "0.10.5"
2829

chain/substreams/examples/substreams.rs

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,7 @@ use graph::blockchain::block_stream::BlockStreamEvent;
33
use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
44
use graph::prelude::{info, tokio, DeploymentHash, Registry};
55
use graph::tokio_stream::StreamExt;
6-
use graph::{
7-
env::env_var,
8-
firehose::FirehoseEndpoint,
9-
log::logger,
10-
substreams::{self},
11-
};
6+
use graph::{env::env_var, firehose::FirehoseEndpoint, log::logger, substreams};
127
use graph_chain_substreams::mapper::Mapper;
138
use graph_core::MetricsRegistry;
149
use prost::Message;
@@ -27,7 +22,7 @@ async fn main() -> Result<(), Error> {
2722

2823
let endpoint = env_var(
2924
"SUBSTREAMS_ENDPOINT",
30-
"https://api-dev.streamingfast.io".to_string(),
25+
"https://api.streamingfast.io".to_string(),
3126
);
3227

3328
let package_file = env_var("SUBSTREAMS_PACKAGE", "".to_string());
@@ -79,17 +74,9 @@ async fn main() -> Result<(), Error> {
7974
Ok(block_stream_event) => match block_stream_event {
8075
BlockStreamEvent::Revert(_, _) => {}
8176
BlockStreamEvent::ProcessBlock(block_with_trigger, _) => {
82-
let changes = block_with_trigger.block;
83-
for change in changes.entity_changes {
84-
info!(&logger, "----- Entity -----");
85-
info!(
86-
&logger,
87-
"name: {} operation: {}", change.entity, change.operation
88-
);
77+
for change in block_with_trigger.block.changes.entity_changes {
8978
for field in change.fields {
90-
info!(&logger, "field: {}, type: {}", field.name, field.value_type);
91-
info!(&logger, "new value: {}", hex::encode(field.new_value));
92-
info!(&logger, "old value: {}", hex::encode(field.old_value));
79+
info!(&logger, "field: {:?}", field);
9380
}
9481
}
9582
}

chain/substreams/proto/codec.proto

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@ syntax = "proto3";
22

33
package substreams.entity.v1;
44

5-
message EntitiesChanges {
6-
bytes block_id = 1;
7-
uint64 block_number = 2;
8-
bytes prev_block_id = 3;
9-
uint64 prev_block_number = 4;
10-
repeated EntityChange entityChanges = 5;
5+
message EntityChanges {
6+
repeated EntityChange entity_changes = 5;
117
}
128

139
message EntityChange {
1410
string entity = 1;
15-
bytes id = 2;
11+
string id = 2;
1612
uint64 ordinal = 3;
1713
enum Operation {
18-
UNSET = 0; // Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
14+
UNSET = 0; // Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
1915
CREATE = 1;
2016
UPDATE = 2;
2117
DELETE = 3;
@@ -24,19 +20,27 @@ message EntityChange {
2420
repeated Field fields = 5;
2521
}
2622

23+
message Value {
24+
oneof typed {
25+
int32 int32 = 1;
26+
string bigdecimal = 2;
27+
string bigint = 3;
28+
string string = 4;
29+
bytes bytes = 5;
30+
bool bool = 6;
31+
32+
//reserved 7 to 9; // For future types
33+
34+
Array array = 10;
35+
}
36+
}
37+
38+
message Array {
39+
repeated Value value = 1;
40+
}
41+
2742
message Field {
2843
string name = 1;
29-
enum Type {
30-
UNSET = 0; // Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
31-
BIGDECIMAL = 1;
32-
BIGINT = 2;
33-
INT = 3; // int32
34-
BYTES = 4;
35-
STRING = 5;
36-
}
37-
Type value_type = 2;
38-
bytes new_value = 3;
39-
bool new_value_null = 4;
40-
bytes old_value = 5;
41-
bool old_value_null = 6;
44+
optional Value new_value = 3;
45+
optional Value old_value = 5;
4246
}

chain/substreams/src/chain.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{data_source::*, Block, TriggerData, TriggerFilter, TriggersAdapter};
1+
use crate::{data_source::*, EntityChanges, TriggerData, TriggerFilter, TriggersAdapter};
22
use anyhow::Error;
33
use core::fmt;
44
use graph::firehose::FirehoseEndpoints;
@@ -17,19 +17,23 @@ use graph::{
1717
};
1818
use std::{str::FromStr, sync::Arc};
1919

20+
#[derive(Default, Debug, Clone)]
21+
pub struct Block {
22+
pub hash: BlockHash,
23+
pub number: BlockNumber,
24+
pub changes: EntityChanges,
25+
}
26+
2027
impl blockchain::Block for Block {
2128
fn ptr(&self) -> BlockPtr {
22-
return BlockPtr {
23-
hash: BlockHash(Box::from(self.block_id.clone())),
24-
number: self.block_number as i32,
25-
};
29+
BlockPtr {
30+
hash: self.hash.clone(),
31+
number: self.number,
32+
}
2633
}
2734

2835
fn parent_ptr(&self) -> Option<BlockPtr> {
29-
Some(BlockPtr {
30-
hash: BlockHash(Box::from(self.prev_block_id.clone())),
31-
number: self.prev_block_number as i32,
32-
})
36+
None
3337
}
3438
}
3539

chain/substreams/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ pub mod mapper;
88

99
pub use block_stream::BlockStreamBuilder;
1010
pub use chain::*;
11-
pub use codec::EntitiesChanges as Block;
11+
pub use codec::EntityChanges;
1212
pub use data_source::*;
1313
pub use trigger::*;
1414

15-
pub use codec::field::Type as FieldType;
1615
pub use codec::Field;

chain/substreams/src/mapper.rs

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
use crate::{Block, Chain, TriggerData};
1+
use crate::{Block, Chain, EntityChanges, TriggerData};
22
use graph::blockchain::block_stream::SubstreamsError::{
33
MultipleModuleOutputError, UnexpectedStoreDeltaOutput,
44
};
55
use graph::blockchain::block_stream::{
66
BlockStreamEvent, BlockWithTriggers, FirehoseCursor, SubstreamsError, SubstreamsMapper,
77
};
8-
use graph::prelude::{async_trait, BlockNumber, BlockPtr, Logger};
8+
use graph::prelude::{async_trait, BlockHash, BlockNumber, BlockPtr, Logger};
99
use graph::substreams::module_output::Data;
10-
use graph::substreams::{BlockScopedData, ForkStep};
10+
use graph::substreams::{BlockScopedData, Clock, ForkStep};
1111
use prost::Message;
1212

1313
pub struct Mapper {}
@@ -19,28 +19,50 @@ impl SubstreamsMapper<Chain> for Mapper {
1919
_logger: &Logger,
2020
block_scoped_data: &BlockScopedData,
2121
) -> Result<Option<BlockStreamEvent<Chain>>, SubstreamsError> {
22-
let step = ForkStep::from_i32(block_scoped_data.step).unwrap_or_else(|| {
22+
let BlockScopedData {
23+
outputs,
24+
clock,
25+
step,
26+
cursor: _,
27+
} = block_scoped_data;
28+
29+
let step = ForkStep::from_i32(*step).unwrap_or_else(|| {
2330
panic!(
2431
"unknown step i32 value {}, maybe you forgot update & re-regenerate the protobuf definitions?",
25-
block_scoped_data.step
32+
step
2633
)
2734
});
2835

29-
if block_scoped_data.outputs.len() == 0 {
36+
if outputs.len() == 0 {
3037
return Ok(None);
3138
}
3239

33-
if block_scoped_data.outputs.len() > 1 {
34-
return Err(MultipleModuleOutputError());
40+
if outputs.len() > 1 {
41+
return Err(MultipleModuleOutputError);
3542
}
3643

3744
//todo: handle step
3845
let module_output = &block_scoped_data.outputs[0];
3946
let cursor = &block_scoped_data.cursor;
4047

41-
match module_output.data.as_ref().unwrap() {
42-
Data::MapOutput(msg) => {
43-
let changes: Block = Message::decode(msg.value.as_slice()).unwrap();
48+
let clock = match clock {
49+
Some(clock) => clock,
50+
None => return Err(SubstreamsError::MissingClockError),
51+
};
52+
53+
let Clock {
54+
id: hash,
55+
number,
56+
timestamp: _,
57+
} = clock;
58+
59+
let hash: BlockHash = hash.as_str().try_into()?;
60+
let number: BlockNumber = *number as BlockNumber;
61+
62+
match module_output.data.as_ref() {
63+
Some(Data::MapOutput(msg)) => {
64+
let changes: EntityChanges = Message::decode(msg.value.as_slice())
65+
.map_err(SubstreamsError::DecodingError)?;
4466

4567
use ForkStep::*;
4668
match step {
@@ -52,14 +74,18 @@ impl SubstreamsMapper<Chain> for Mapper {
5274

5375
// TODO(filipe): Fix once either trigger data can be empty
5476
// or we move the changes into trigger data.
55-
BlockWithTriggers::new(changes, vec![TriggerData {}]),
77+
BlockWithTriggers::new(
78+
Block {
79+
hash,
80+
number,
81+
changes,
82+
},
83+
vec![TriggerData {}],
84+
),
5685
FirehoseCursor::from(cursor.clone()),
5786
))),
5887
StepUndo => {
59-
let parent_ptr = BlockPtr {
60-
hash: changes.prev_block_id.clone().into(),
61-
number: changes.prev_block_number as BlockNumber,
62-
};
88+
let parent_ptr = BlockPtr { hash, number };
6389

6490
Ok(Some(BlockStreamEvent::Revert(
6591
parent_ptr,
@@ -71,7 +97,8 @@ impl SubstreamsMapper<Chain> for Mapper {
7197
}
7298
}
7399
}
74-
Data::StoreDeltas(_) => Err(UnexpectedStoreDeltaOutput()),
100+
Some(Data::StoreDeltas(_)) => Err(UnexpectedStoreDeltaOutput),
101+
_ => Err(SubstreamsError::ModuleOutputNotPresentOrUnexpected),
75102
}
76103
}
77104
}
Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,14 @@
11
#[derive(Clone, PartialEq, ::prost::Message)]
2-
pub struct EntitiesChanges {
3-
#[prost(bytes="vec", tag="1")]
4-
pub block_id: ::prost::alloc::vec::Vec<u8>,
5-
#[prost(uint64, tag="2")]
6-
pub block_number: u64,
7-
#[prost(bytes="vec", tag="3")]
8-
pub prev_block_id: ::prost::alloc::vec::Vec<u8>,
9-
#[prost(uint64, tag="4")]
10-
pub prev_block_number: u64,
2+
pub struct EntityChanges {
113
#[prost(message, repeated, tag="5")]
124
pub entity_changes: ::prost::alloc::vec::Vec<EntityChange>,
135
}
146
#[derive(Clone, PartialEq, ::prost::Message)]
157
pub struct EntityChange {
168
#[prost(string, tag="1")]
179
pub entity: ::prost::alloc::string::String,
18-
#[prost(bytes="vec", tag="2")]
19-
pub id: ::prost::alloc::vec::Vec<u8>,
10+
#[prost(string, tag="2")]
11+
pub id: ::prost::alloc::string::String,
2012
#[prost(uint64, tag="3")]
2113
pub ordinal: u64,
2214
#[prost(enumeration="entity_change::Operation", tag="4")]
@@ -37,32 +29,43 @@ pub mod entity_change {
3729
}
3830
}
3931
#[derive(Clone, PartialEq, ::prost::Message)]
32+
pub struct Value {
33+
#[prost(oneof="value::Typed", tags="1, 2, 3, 4, 5, 6, 10")]
34+
pub typed: ::core::option::Option<value::Typed>,
35+
}
36+
/// Nested message and enum types in `Value`.
37+
pub mod value {
38+
#[derive(Clone, PartialEq, ::prost::Oneof)]
39+
pub enum Typed {
40+
#[prost(int32, tag="1")]
41+
Int32(i32),
42+
#[prost(string, tag="2")]
43+
Bigdecimal(::prost::alloc::string::String),
44+
#[prost(string, tag="3")]
45+
Bigint(::prost::alloc::string::String),
46+
#[prost(string, tag="4")]
47+
String(::prost::alloc::string::String),
48+
#[prost(bytes, tag="5")]
49+
Bytes(::prost::alloc::vec::Vec<u8>),
50+
#[prost(bool, tag="6")]
51+
Bool(bool),
52+
//reserved 7 to 9; // For future types
53+
54+
#[prost(message, tag="10")]
55+
Array(super::Array),
56+
}
57+
}
58+
#[derive(Clone, PartialEq, ::prost::Message)]
59+
pub struct Array {
60+
#[prost(message, repeated, tag="1")]
61+
pub value: ::prost::alloc::vec::Vec<Value>,
62+
}
63+
#[derive(Clone, PartialEq, ::prost::Message)]
4064
pub struct Field {
4165
#[prost(string, tag="1")]
4266
pub name: ::prost::alloc::string::String,
43-
#[prost(enumeration="field::Type", tag="2")]
44-
pub value_type: i32,
45-
#[prost(bytes="vec", tag="3")]
46-
pub new_value: ::prost::alloc::vec::Vec<u8>,
47-
#[prost(bool, tag="4")]
48-
pub new_value_null: bool,
49-
#[prost(bytes="vec", tag="5")]
50-
pub old_value: ::prost::alloc::vec::Vec<u8>,
51-
#[prost(bool, tag="6")]
52-
pub old_value_null: bool,
53-
}
54-
/// Nested message and enum types in `Field`.
55-
pub mod field {
56-
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
57-
#[repr(i32)]
58-
pub enum Type {
59-
/// Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
60-
Unset = 0,
61-
Bigdecimal = 1,
62-
Bigint = 2,
63-
/// int32
64-
Int = 3,
65-
Bytes = 4,
66-
String = 5,
67-
}
67+
#[prost(message, optional, tag="3")]
68+
pub new_value: ::core::option::Option<Value>,
69+
#[prost(message, optional, tag="5")]
70+
pub old_value: ::core::option::Option<Value>,
6871
}

0 commit comments

Comments
 (0)