diff --git a/Cargo.lock b/Cargo.lock index 03b55a00f6eef..b3fa0137cf430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2876,6 +2876,7 @@ dependencies = [ "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-hex 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-executor 0.1.0", diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index bd7f3c5581a2e..623daeff3f3a0 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -568,7 +568,7 @@ mod tests { use test_client::{self, TestClient}; use test_client::client::BlockOrigin; use test_client::client::backend::Backend as TestBackend; - use test_client::runtime as test_runtime; + use test_client::{runtime as test_runtime, BlockBuilderExt}; use test_client::runtime::{Transfer, Extrinsic}; #[test] @@ -602,23 +602,18 @@ mod tests { assert_eq!(client.info().unwrap().chain.best_number, 1); } - fn sign_tx(tx: Transfer) -> Extrinsic { - let signature = Keyring::from_raw_public(tx.from.0.clone()).unwrap().sign(&tx.encode()).into(); - Extrinsic { transfer: tx, signature } - } - #[test] fn block_builder_works_with_transactions() { let client = test_client::new(); let mut builder = client.new_block().unwrap(); - builder.push(sign_tx(Transfer { + builder.push_transfer(Transfer { from: Keyring::Alice.to_raw_public().into(), to: Keyring::Ferdie.to_raw_public().into(), amount: 42, nonce: 0, - })).unwrap(); + }).unwrap(); client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); @@ -646,19 +641,19 @@ mod tests { let mut builder = client.new_block().unwrap(); - builder.push(sign_tx(Transfer { + builder.push_transfer(Transfer { from: Keyring::Alice.to_raw_public().into(), to: Keyring::Ferdie.to_raw_public().into(), amount: 42, nonce: 0, - })).unwrap(); + }).unwrap(); - assert!(builder.push(sign_tx(Transfer { + assert!(builder.push_transfer(Transfer { from: Keyring::Eve.to_raw_public().into(), to: Keyring::Alice.to_raw_public().into(), amount: 42, nonce: 0, - })).is_err()); + }).is_err()); client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); diff --git a/substrate/client/src/notifications.rs b/substrate/client/src/notifications.rs index a64ee0f4dd6ac..390513d37a8f1 100644 --- a/substrate/client/src/notifications.rs +++ b/substrate/client/src/notifications.rs @@ -103,6 +103,11 @@ impl StorageNotifications { } } + // Don't send empty notifications + if changes.is_empty() { + return; + } + let changes = Arc::new(changes); // Trigger the events for subscriber in subscribers { @@ -264,4 +269,21 @@ mod tests { assert_eq!(notifications.listeners.len(), 0); assert_eq!(notifications.wildcard_listeners.len(), 0); } + + #[test] + fn should_not_send_empty_notifications() { + // given + let mut recv = { + let mut notifications = StorageNotifications::::default(); + let recv = notifications.listen(None).wait(); + + // when + let changeset = vec![]; + notifications.trigger(&1.into(), changeset.into_iter()); + recv + }; + + // then + assert_eq!(recv.next(), None); + } } diff --git a/substrate/rpc/Cargo.toml b/substrate/rpc/Cargo.toml index fb73d9ef653f7..9f257364928ea 100644 --- a/substrate/rpc/Cargo.toml +++ b/substrate/rpc/Cargo.toml @@ -22,3 +22,4 @@ tokio = "0.1.7" [dev-dependencies] assert_matches = "1.1" substrate-test-client = { path = "../test-client" } +rustc-hex = "2.0" diff --git a/substrate/rpc/src/chain/error.rs b/substrate/rpc/src/chain/error.rs index 59035a030b14c..2c42e8c98956f 100644 --- a/substrate/rpc/src/chain/error.rs +++ b/substrate/rpc/src/chain/error.rs @@ -14,11 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use client; use rpc; use errors; error_chain! { + links { + Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"]; + } errors { /// Not implemented yet Unimplemented { diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index a7a8e8c0e046a..3c0491c4226e9 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -22,7 +22,7 @@ use client::{self, Client, BlockchainEvents}; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; use rpc::Result as RpcResult; -use rpc::futures::{Future, Sink, Stream}; +use rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::Block as BlockT; use tokio::runtime::TaskExecutor; @@ -33,7 +33,7 @@ mod error; #[cfg(test)] mod tests; -use self::error::{Result, ResultExt}; +use self::error::Result; build_rpc_trait! { /// Polkadot blockchain API @@ -86,22 +86,35 @@ impl ChainApi for Chain wh type Metadata = ::metadata::Metadata; fn header(&self, hash: Block::Hash) -> Result> { - self.client.header(&BlockId::Hash(hash)).chain_err(|| "Blockchain error") + Ok(self.client.header(&BlockId::Hash(hash))?) } fn head(&self) -> Result { - Ok(self.client.info().chain_err(|| "Blockchain error")?.chain.best_hash) + Ok(self.client.info()?.chain.best_hash) } fn subscribe_new_head(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber) { self.subscriptions.add(subscriber, |sink| { + // send current head right at the start. + let header = self.head() + .and_then(|hash| self.header(hash)) + .and_then(|header| { + header.ok_or_else(|| self::error::ErrorKind::Unimplemented.into()) + }) + .map_err(Into::into); + + // send further subscriptions let stream = self.client.import_notification_stream() .filter(|notification| notification.is_new_best) .map(|notification| Ok(notification.header)) .map_err(|e| warn!("Block notification stream error: {:?}", e)); + sink .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(stream) + .send_all( + stream::iter_result(vec![Ok(header)]) + .chain(stream) + ) // we ignore the resulting Stream (if the first stream is over we are unsubscribed) .map(|_| ()) }); diff --git a/substrate/rpc/src/chain/tests.rs b/substrate/rpc/src/chain/tests.rs index 0ac2506292e8c..1f0a3a9d0108c 100644 --- a/substrate/rpc/src/chain/tests.rs +++ b/substrate/rpc/src/chain/tests.rs @@ -67,9 +67,12 @@ fn should_notify_about_latest_block() { api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); } - // assert notification send to transport + // assert initial head sent. let (notification, next) = core.block_on(transport.into_future()).unwrap(); assert!(notification.is_some()); + // assert notification sent to transport + let (notification, next) = core.block_on(next.into_future()).unwrap(); + assert!(notification.is_some()); // no more notifications on this channel assert_eq!(core.block_on(next.into_future()).unwrap().0, None); } diff --git a/substrate/rpc/src/lib.rs b/substrate/rpc/src/lib.rs index 5dcc9d337aab3..2cd0023bb380a 100644 --- a/substrate/rpc/src/lib.rs +++ b/substrate/rpc/src/lib.rs @@ -41,6 +41,8 @@ extern crate log; extern crate assert_matches; #[cfg(test)] extern crate substrate_test_client as test_client; +#[cfg(test)] +extern crate rustc_hex; mod errors; mod subscriptions; diff --git a/substrate/rpc/src/state/mod.rs b/substrate/rpc/src/state/mod.rs index 6cf6907712f97..4fd08678272a0 100644 --- a/substrate/rpc/src/state/mod.rs +++ b/substrate/rpc/src/state/mod.rs @@ -25,7 +25,7 @@ use jsonrpc_pubsub::SubscriptionId; use primitives::hexdisplay::HexDisplay; use primitives::storage::{StorageKey, StorageData, StorageChangeSet}; use rpc::Result as RpcResult; -use rpc::futures::{Future, Sink, Stream}; +use rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::Block as BlockT; use tokio::runtime::TaskExecutor; @@ -141,6 +141,7 @@ impl StateApi for State where fn storage(&self, key: StorageKey) -> Result { self.storage_at(key, self.client.info()?.chain.best_hash) + } fn call(&self, method: String, data: Vec) -> Result> { @@ -162,6 +163,20 @@ impl StateApi for State where }, }; + // initial values + let initial = stream::iter_result(keys + .map(|keys| { + let changes = keys + .into_iter() + .map(|key| self.storage(key.clone()) + .map(|val| (key.clone(), Some(val))) + .unwrap_or_else(|_| (key, None)) + ) + .collect(); + let block = self.client.info().map(|info| info.chain.best_hash).unwrap_or_default(); + vec![Ok(Ok(StorageChangeSet { block, changes }))] + }).unwrap_or_default()); + self.subscriptions.add(subscriber, |sink| { let stream = stream .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) @@ -169,9 +184,10 @@ impl StateApi for State where block, changes: changes.iter().cloned().collect(), })); + sink .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(stream) + .send_all(initial.chain(stream)) // we ignore the resulting Stream (if the first stream is over we are unsubscribed) .map(|_| ()) }) diff --git a/substrate/rpc/src/state/tests.rs b/substrate/rpc/src/state/tests.rs index 37d86cb9266b9..774a16603d1b0 100644 --- a/substrate/rpc/src/state/tests.rs +++ b/substrate/rpc/src/state/tests.rs @@ -16,9 +16,12 @@ use super::*; use self::error::{Error, ErrorKind}; -use jsonrpc_macros::pubsub; + use client::BlockOrigin; -use test_client::{self, TestClient}; +use jsonrpc_macros::pubsub; +use rustc_hex::FromHex; +use test_client::{self, runtime, keyring::Keyring, TestClient, BlockBuilderExt}; + #[test] fn should_return_storage() { @@ -63,13 +66,58 @@ fn should_notify_about_storage_changes() { // assert id assigned assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); - let builder = api.client.new_block().unwrap(); + let mut builder = api.client.new_block().unwrap(); + builder.push_transfer(runtime::Transfer { + from: Keyring::Alice.to_raw_public().into(), + to: Keyring::Ferdie.to_raw_public().into(), + amount: 42, + nonce: 0, + }).unwrap(); api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); } - // assert notification send to transport + // assert notification sent to transport let (notification, next) = core.block_on(transport.into_future()).unwrap(); assert!(notification.is_some()); // no more notifications on this channel assert_eq!(core.block_on(next.into_future()).unwrap().0, None); } + +#[test] +fn should_send_initial_storage_changes_and_notifications() { + let mut core = ::tokio::runtime::Runtime::new().unwrap(); + let remote = core.executor(); + let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); + + { + let api = State { + client: Arc::new(test_client::new()), + subscriptions: Subscriptions::new(remote), + }; + + api.subscribe_storage(Default::default(), subscriber, Some(vec![ + StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), + ]).into()); + + // assert id assigned + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); + + let mut builder = api.client.new_block().unwrap(); + builder.push_transfer(runtime::Transfer { + from: Keyring::Alice.to_raw_public().into(), + to: Keyring::Ferdie.to_raw_public().into(), + amount: 42, + nonce: 0, + }).unwrap(); + api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + } + + // assert initial values sent to transport + let (notification, next) = core.block_on(transport.into_future()).unwrap(); + assert!(notification.is_some()); + // assert notification sent to transport + let (notification, next) = core.block_on(next.into_future()).unwrap(); + assert!(notification.is_some()); + // no more notifications on this channel + assert_eq!(core.block_on(next.into_future()).unwrap().0, None); +} diff --git a/substrate/test-client/src/block_builder_ext.rs b/substrate/test-client/src/block_builder_ext.rs new file mode 100644 index 0000000000000..17fdbeba0a537 --- /dev/null +++ b/substrate/test-client/src/block_builder_ext.rs @@ -0,0 +1,41 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Block Builder extensions for tests. + +use codec; +use client; +use keyring; +use runtime; + +use {Backend, Executor}; + +/// Extension trait for test block builder. +pub trait BlockBuilderExt { + /// Add transfer extrinsic to the block. + fn push_transfer(&mut self, transfer: runtime::Transfer) -> Result<(), client::error::Error>; +} + +impl BlockBuilderExt for client::block_builder::BlockBuilder { + fn push_transfer(&mut self, transfer: runtime::Transfer) -> Result<(), client::error::Error> { + self.push(sign_tx(transfer)) + } +} + +fn sign_tx(transfer: runtime::Transfer) -> runtime::Extrinsic { + let signature = keyring::Keyring::from_raw_public(transfer.from.0.clone()).unwrap().sign(&codec::Encode::encode(&transfer)).into(); + runtime::Extrinsic { transfer, signature } +} diff --git a/substrate/test-client/src/lib.rs b/substrate/test-client/src/lib.rs index d50b49b3c7101..a0709210ec843 100644 --- a/substrate/test-client/src/lib.rs +++ b/substrate/test-client/src/lib.rs @@ -21,18 +21,20 @@ extern crate rhododendron; extern crate substrate_bft as bft; extern crate substrate_codec as codec; -extern crate substrate_keyring as keyring; extern crate substrate_primitives as primitives; extern crate substrate_runtime_support as runtime_support; extern crate substrate_runtime_primitives as runtime_primitives; #[macro_use] extern crate substrate_executor as executor; -pub extern crate substrate_test_runtime as runtime; pub extern crate substrate_client as client; +pub extern crate substrate_keyring as keyring; +pub extern crate substrate_test_runtime as runtime; mod client_ext; +mod block_builder_ext; pub use client_ext::TestClient; +pub use block_builder_ext::BlockBuilderExt; mod local_executor { #![allow(missing_docs)]