diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs index 28c26fe21ae98..fcdc36d480315 100644 --- a/substrate/bft/src/lib.rs +++ b/substrate/bft/src/lib.rs @@ -189,7 +189,7 @@ pub trait Proposer { /// Block import trait. pub trait BlockImport { /// Import a block alongside its corresponding justification. - fn import_block(&self, block: B, justification: Justification); + fn import_block(&self, block: B, justification: Justification, authorities: &[AuthorityId]); } /// Trait for getting the authorities at a given block. @@ -308,7 +308,8 @@ impl Future for BftFuture for FakeClient { - fn import_block(&self, block: TestBlock, _justification: Justification) { + fn import_block(&self, block: TestBlock, _justification: Justification, _authorities: &[AuthorityId]) { assert!(self.imported_heights.lock().insert(block.header.number)) } } diff --git a/substrate/client/db/src/cache.rs b/substrate/client/db/src/cache.rs new file mode 100644 index 0000000000000..c808027d9fb39 --- /dev/null +++ b/substrate/client/db/src/cache.rs @@ -0,0 +1,447 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! DB-backed cache of blockchain data. + +use std::sync::Arc; +use parking_lot::RwLock; + +use kvdb::{KeyValueDB, DBTransaction}; + +use client::blockchain::Cache as BlockchainCache; +use client::error::Result as ClientResult; +use codec::{Codec, Encode, Decode, Input, Output}; +use primitives::AuthorityId; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, As, NumberFor}; +use utils::{COLUMN_META, BlockKey, db_err, meta_keys, read_id, db_key_to_number, number_to_db_key}; + +/// Database-backed cache of blockchain data. +pub struct DbCache { + db: Arc, + block_index_column: Option, + authorities_at: DbCacheList>, +} + +impl DbCache + where + Block: BlockT, + NumberFor: As, +{ + /// Create new cache. + pub fn new( + db: Arc, + block_index_column: Option, + authorities_column: Option + ) -> ClientResult { + Ok(DbCache { + db: db.clone(), + block_index_column, + authorities_at: DbCacheList::new(db, meta_keys::BEST_AUTHORITIES, authorities_column)?, + }) + } + + /// Get authorities_cache. + pub fn authorities_at_cache(&self) -> &DbCacheList> { + &self.authorities_at + } +} + +impl BlockchainCache for DbCache + where + Block: BlockT, + NumberFor: As, +{ + fn authorities_at(&self, at: BlockId) -> Option> { + let authorities_at = read_id(&*self.db, self.block_index_column, at).and_then(|at| match at { + Some(at) => self.authorities_at.value_at_key(at), + None => Ok(None), + }); + + match authorities_at { + Ok(authorities) => authorities, + Err(error) => { + warn!("Trying to read authorities from db cache has failed with: {}", error); + None + }, + } + } +} + +/// Database-backed blockchain cache which holds its entries as a list. +/// The meta column holds the pointer to the best known cache entry and +/// every entry points to the previous entry. +/// New entry appears when the set of authorities changes in block, so the +/// best entry here means the entry that is valid for the best block (and +/// probably for its ascendants). +pub struct DbCacheList { + db: Arc, + meta_key: &'static [u8], + column: Option, + /// Best entry at the moment. None means that cache has no entries at all. + best_entry: RwLock, T>>>, +} + +/// Single cache entry. +#[derive(Clone)] +#[cfg_attr(test, derive(Debug, PartialEq))] +pub struct Entry { + /// first block, when this value became actual + valid_from: N, + /// None means that we do not know the value starting from `valid_from` block + value: Option, +} + +/// Internal representation of the single cache entry. The entry points to the +/// previous entry in the cache, allowing us to traverse back in time in list-style. +#[cfg_attr(test, derive(Debug, PartialEq))] +struct StorageEntry { + /// None if valid from the beginning + prev_valid_from: Option, + /// None means that we do not know the value starting from `valid_from` block + value: Option, +} + +impl DbCacheList + where + Block: BlockT, + NumberFor: As, + T: Clone + PartialEq + Codec, +{ + /// Creates new cache list. + fn new(db: Arc, meta_key: &'static [u8], column: Option) -> ClientResult { + let best_entry = RwLock::new(db.get(COLUMN_META, meta_key) + .map_err(db_err) + .and_then(|block| match block { + Some(block) => { + let valid_from = db_key_to_number(&block)?; + read_storage_entry::(&*db, column, valid_from) + .map(|entry| Some(Entry { + valid_from, + value: entry + .expect("meta entry references the entry at the block; storage entry at block exists when referenced; qed") + .value, + })) + }, + None => Ok(None), + })?); + + Ok(DbCacheList { + db, + column, + meta_key, + best_entry, + }) + } + + /// Gets the best known entry. + pub fn best_entry(&self) -> Option, T>> { + self.best_entry.read().clone() + } + + /// Commits the new best pending value to the database. Returns Some if best entry must + /// be updated after transaction is committed. + pub fn commit_best_entry( + &self, + transaction: &mut DBTransaction, + valid_from: NumberFor, + pending_value: Option + ) -> Option, T>> { + let best_entry = self.best_entry(); + let update_best_entry = match ( + best_entry.as_ref().and_then(|a| a.value.as_ref()), + pending_value.as_ref() + ) { + (Some(best_value), Some(pending_value)) => best_value != pending_value, + (None, Some(_)) | (Some(_), None) => true, + (None, None) => false, + }; + if !update_best_entry { + return None; + } + + let valid_from_key = number_to_db_key(valid_from); + transaction.put(COLUMN_META, self.meta_key, &valid_from_key); + transaction.put(self.column, &valid_from_key, &StorageEntry { + prev_valid_from: best_entry.map(|b| b.valid_from), + value: pending_value.clone(), + }.encode()); + + Some(Entry { + valid_from, + value: pending_value, + }) + } + + /// Updates the best in-memory cache entry. Must be called after transaction with changes + /// from commit_best_entry has been committed. + pub fn update_best_entry(&self, best_entry: Option, T>>) { + *self.best_entry.write() = best_entry; + } + + /// Prune all entries from the beginning up to the block (including entry at the number). Returns + /// the number of pruned entries. Pruning never deletes the latest entry in the cache. + pub fn prune_entries( + &self, + transaction: &mut DBTransaction, + last_to_prune: NumberFor + ) -> ClientResult { + // find the last entry we want to keep + let mut last_entry_to_keep = match self.best_entry() { + Some(best_entry) => best_entry.valid_from, + None => return Ok(0), + }; + let mut first_entry_to_remove = last_entry_to_keep; + while first_entry_to_remove > last_to_prune { + last_entry_to_keep = first_entry_to_remove; + + let entry = read_storage_entry::(&*self.db, self.column, first_entry_to_remove)? + .expect("entry referenced from the next entry; entry exists when referenced; qed"); + // if we have reached the first list entry + // AND all list entries are for blocks that are later than last_to_prune + // => nothing to prune + first_entry_to_remove = match entry.prev_valid_from { + Some(prev_valid_from) => prev_valid_from, + None => return Ok(0), + } + } + + // remove all entries, starting from entry_to_remove + let mut pruned = 0; + let mut entry_to_remove = Some(first_entry_to_remove); + while let Some(current_entry) = entry_to_remove { + let entry = read_storage_entry::(&*self.db, self.column, current_entry)? + .expect("referenced entry exists; entry_to_remove is a reference to the entry; qed"); + + if current_entry != last_entry_to_keep { + transaction.delete(self.column, &number_to_db_key(current_entry)); + pruned += 1; + } + entry_to_remove = entry.prev_valid_from; + } + + let mut entry = read_storage_entry::(&*self.db, self.column, last_entry_to_keep)? + .expect("last_entry_to_keep >= first_entry_to_remove; that means that we're leaving this entry in the db; qed"); + entry.prev_valid_from = None; + transaction.put(self.column, &number_to_db_key(last_entry_to_keep), &entry.encode()); + + Ok(pruned) + } + + /// Reads the cached value, actual at given block. Returns None if the value was not cached + /// or if it has been pruned. + fn value_at_key(&self, key: BlockKey) -> ClientResult> { + let at = db_key_to_number::>(&key)?; + let best_valid_from = match self.best_entry() { + // there are entries in cache + Some(best_entry) => { + // we're looking for the best value + if at >= best_entry.valid_from { + return Ok(best_entry.value); + } + + // we're looking for the value of older blocks + best_entry.valid_from + }, + // there are no entries in the cache + None => return Ok(None), + }; + + let mut entry = read_storage_entry::(&*self.db, self.column, best_valid_from)? + .expect("self.best_entry().is_some() if there's entry for best_valid_from; qed"); + loop { + let prev_valid_from = match entry.prev_valid_from { + Some(prev_valid_from) => prev_valid_from, + None => return Ok(None), + }; + + let prev_entry = read_storage_entry::(&*self.db, self.column, prev_valid_from)? + .expect("entry referenced from the next entry; entry exists when referenced; qed"); + if at >= prev_valid_from { + return Ok(prev_entry.value); + } + + entry = prev_entry; + } + } +} + +/// Reads the entry at the block with given number. +fn read_storage_entry( + db: &KeyValueDB, + column: Option, + number: NumberFor +) -> ClientResult, T>>> + where + Block: BlockT, + NumberFor: As, + T: Codec, +{ + db.get(column, &number_to_db_key(number)) + .and_then(|entry| match entry { + Some(entry) => Ok(StorageEntry::, T>::decode(&mut &entry[..])), + None => Ok(None), + }) + .map_err(db_err) +} + +impl Encode for StorageEntry { + fn encode_to(&self, dest: &mut O) { + dest.push(&self.prev_valid_from); + dest.push(&self.value); + } +} + +impl Decode for StorageEntry { + fn decode(input: &mut I) -> Option { + Some(StorageEntry { + prev_valid_from: Decode::decode(input)?, + value: Decode::decode(input)?, + }) + } +} + +#[cfg(test)] +mod tests { + use runtime_primitives::testing::Block as RawBlock; + use light::{AUTHORITIES_ENTRIES_TO_KEEP, columns, LightStorage}; + use light::tests::insert_block; + use super::*; + + type Block = RawBlock; + + #[test] + fn authorities_storage_entry_serialized() { + let test_cases: Vec>> = vec![ + StorageEntry { prev_valid_from: Some(42), value: Some(vec![[1u8; 32].into()]) }, + StorageEntry { prev_valid_from: None, value: Some(vec![[1u8; 32].into(), [2u8; 32].into()]) }, + StorageEntry { prev_valid_from: None, value: None }, + ]; + + for expected in test_cases { + let serialized = expected.encode(); + let deserialized = StorageEntry::decode(&mut &serialized[..]).unwrap(); + assert_eq!(expected, deserialized); + } + } + + #[test] + fn best_authorities_are_updated() { + let db = LightStorage::new_test(); + let authorities_at: Vec<(usize, Option>>)> = vec![ + (0, None), + (0, None), + (1, Some(Entry { valid_from: 1, value: Some(vec![[2u8; 32].into()]) })), + (1, Some(Entry { valid_from: 1, value: Some(vec![[2u8; 32].into()]) })), + (2, Some(Entry { valid_from: 3, value: Some(vec![[4u8; 32].into()]) })), + (2, Some(Entry { valid_from: 3, value: Some(vec![[4u8; 32].into()]) })), + (3, Some(Entry { valid_from: 5, value: None })), + (3, Some(Entry { valid_from: 5, value: None })), + ]; + + // before any block, there are no entries in cache + assert!(db.cache().authorities_at_cache().best_entry().is_none()); + assert_eq!(db.db().iter(columns::AUTHORITIES).count(), 0); + + // insert blocks and check that best_authorities() returns correct result + let mut prev_hash = Default::default(); + for number in 0..authorities_at.len() { + let authorities_at_number = authorities_at[number].1.clone().and_then(|e| e.value); + prev_hash = insert_block(&db, &prev_hash, number as u64, authorities_at_number); + assert_eq!(db.cache().authorities_at_cache().best_entry(), authorities_at[number].1); + assert_eq!(db.db().iter(columns::AUTHORITIES).count(), authorities_at[number].0); + } + + // check that authorities_at() returns correct results for all retrospective blocks + for number in 1..authorities_at.len() + 1 { + assert_eq!(db.cache().authorities_at(BlockId::Number(number as u64)), + authorities_at.get(number + 1) + .or_else(|| authorities_at.last()) + .unwrap().1.clone().and_then(|e| e.value)); + } + + // now check that cache entries are pruned when new blocks are inserted + let mut current_entries_count = authorities_at.last().unwrap().0; + let pruning_starts_at = AUTHORITIES_ENTRIES_TO_KEEP as usize; + for number in authorities_at.len()..authorities_at.len() + pruning_starts_at { + prev_hash = insert_block(&db, &prev_hash, number as u64, None); + if number > pruning_starts_at { + let prev_entries_count = authorities_at[number - pruning_starts_at].0; + let entries_count = authorities_at.get(number - pruning_starts_at + 1).map(|e| e.0) + .unwrap_or_else(|| authorities_at.last().unwrap().0); + current_entries_count -= entries_count - prev_entries_count; + } + + // there's always at least 1 entry in the cache (after first insertion) + assert_eq!(db.db().iter(columns::AUTHORITIES).count(), ::std::cmp::max(current_entries_count, 1)); + } + } + + #[test] + fn best_authorities_are_pruned() { + let db = LightStorage::::new_test(); + let mut transaction = DBTransaction::new(); + + // insert first entry at block#100 + db.cache().authorities_at_cache().update_best_entry( + db.cache().authorities_at_cache().commit_best_entry(&mut transaction, 100, Some(vec![[1u8; 32].into()]))); + db.db().write(transaction).unwrap(); + + // no entries are pruned, since there's only one entry in the cache + let mut transaction = DBTransaction::new(); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 50).unwrap(), 0); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 100).unwrap(), 0); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 150).unwrap(), 0); + + // insert second entry at block#200 + let mut transaction = DBTransaction::new(); + db.cache().authorities_at_cache().update_best_entry( + db.cache().authorities_at_cache().commit_best_entry(&mut transaction, 200, Some(vec![[2u8; 32].into()]))); + db.db().write(transaction).unwrap(); + + let mut transaction = DBTransaction::new(); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 50).unwrap(), 0); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 100).unwrap(), 1); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 150).unwrap(), 1); + // still only 1 entry is removed since pruning never deletes the last entry + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 200).unwrap(), 1); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 250).unwrap(), 1); + + // physically remove entry for block#100 from db + let mut transaction = DBTransaction::new(); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 150).unwrap(), 1); + db.db().write(transaction).unwrap(); + + assert_eq!(db.cache().authorities_at_cache().best_entry().unwrap().value, Some(vec![[2u8; 32].into()])); + assert_eq!(db.cache().authorities_at(BlockId::Number(50)), None); + assert_eq!(db.cache().authorities_at(BlockId::Number(100)), None); + assert_eq!(db.cache().authorities_at(BlockId::Number(150)), None); + assert_eq!(db.cache().authorities_at(BlockId::Number(200)), Some(vec![[2u8; 32].into()])); + assert_eq!(db.cache().authorities_at(BlockId::Number(250)), Some(vec![[2u8; 32].into()])); + + // try to delete last entry => failure (no entries are removed) + let mut transaction = DBTransaction::new(); + assert_eq!(db.cache().authorities_at_cache().prune_entries(&mut transaction, 300).unwrap(), 0); + db.db().write(transaction).unwrap(); + + assert_eq!(db.cache().authorities_at_cache().best_entry().unwrap().value, Some(vec![[2u8; 32].into()])); + assert_eq!(db.cache().authorities_at(BlockId::Number(50)), None); + assert_eq!(db.cache().authorities_at(BlockId::Number(100)), None); + assert_eq!(db.cache().authorities_at(BlockId::Number(150)), None); + assert_eq!(db.cache().authorities_at(BlockId::Number(200)), Some(vec![[2u8; 32].into()])); + assert_eq!(db.cache().authorities_at(BlockId::Number(250)), Some(vec![[2u8; 32].into()])); + } +} diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 741cd5cc9408f..b297341fa19f4 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -38,6 +38,7 @@ extern crate kvdb_memorydb; pub mod light; +mod cache; mod utils; use std::sync::Arc; @@ -47,7 +48,7 @@ use codec::{Decode, Encode}; use kvdb::{KeyValueDB, DBTransaction}; use memorydb::MemoryDB; use parking_lot::RwLock; -use primitives::H256; +use primitives::{H256, AuthorityId}; use runtime_primitives::generic::BlockId; use runtime_primitives::bft::Justification; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hash, HashFor, NumberFor, Zero}; @@ -204,6 +205,10 @@ impl client::blockchain::Backend for BlockchainDb { None => Ok(None), } } + + fn cache(&self) -> Option<&client::blockchain::Cache> { + None + } } /// Database transaction @@ -231,6 +236,10 @@ impl client::backend::BlockImportOperation for BlockImport Ok(()) } + fn update_authorities(&mut self, _authorities: Vec) { + // currently authorities are not cached on full nodes + } + fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> { self.updates = update; Ok(()) diff --git a/substrate/client/db/src/light.rs b/substrate/client/db/src/light.rs index 9ae3cefe61f18..a7e74a75711e3 100644 --- a/substrate/client/db/src/light.rs +++ b/substrate/client/db/src/light.rs @@ -21,14 +21,15 @@ use parking_lot::RwLock; use kvdb::{KeyValueDB, DBTransaction}; -use client::blockchain::{BlockStatus, HeaderBackend as BlockchainHeaderBackend, - Info as BlockchainInfo}; +use client::blockchain::{BlockStatus, Cache as BlockchainCache, + HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo}; use client::error::{ErrorKind as ClientErrorKind, Result as ClientResult}; use client::light::blockchain::Storage as LightBlockchainStorage; use codec::{Decode, Encode}; use primitives::AuthorityId; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, Zero}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, Zero, As}; +use cache::DbCache; use utils::{meta_keys, Meta, db_err, number_to_db_key, open_database, read_db, read_id, read_meta}; use DatabaseSettings; @@ -36,12 +37,17 @@ pub(crate) mod columns { pub const META: Option = ::utils::COLUMN_META; pub const BLOCK_INDEX: Option = Some(1); pub const HEADER: Option = Some(2); + pub const AUTHORITIES: Option = Some(3); } +/// Keep authorities for last 'AUTHORITIES_ENTRIES_TO_KEEP' blocks. +pub(crate) const AUTHORITIES_ENTRIES_TO_KEEP: u64 = 2048; + /// Light blockchain storage. Stores most recent headers + CHTs for older headers. pub struct LightStorage { db: Arc, meta: RwLock::Header as HeaderT>::Number, Block::Hash>>, + cache: DbCache, } #[derive(Clone, PartialEq, Debug)] @@ -73,14 +79,26 @@ impl LightStorage } fn from_kvdb(db: Arc) -> ClientResult { + let cache = DbCache::new(db.clone(), columns::BLOCK_INDEX, columns::AUTHORITIES)?; let meta = RwLock::new(read_meta::(&*db, columns::HEADER)?); Ok(LightStorage { db, meta, + cache, }) } + #[cfg(test)] + pub(crate) fn db(&self) -> &Arc { + &self.db + } + + #[cfg(test)] + pub(crate) fn cache(&self) -> &DbCache { + &self.cache + } + fn update_meta(&self, hash: Block::Hash, number: <::Header as HeaderT>::Number, is_best: bool) { if is_best { let mut meta = self.meta.write(); @@ -139,7 +157,7 @@ impl LightBlockchainStorage for LightStorage where Block: BlockT, { - fn import_header(&self, is_new_best: bool, header: Block::Header) -> ClientResult<()> { + fn import_header(&self, is_new_best: bool, header: Block::Header, authorities: Option>) -> ClientResult<()> { let mut transaction = DBTransaction::new(); let hash = header.hash(); @@ -149,16 +167,39 @@ impl LightBlockchainStorage for LightStorage transaction.put(columns::HEADER, &key, &header.encode()); transaction.put(columns::BLOCK_INDEX, hash.as_ref(), &key); - if is_new_best { + let best_authorities = if is_new_best { transaction.put(columns::META, meta_keys::BEST_BLOCK, &key); - } + + // cache authorities for previous block + let number: u64 = number.as_(); + let previous_number = number.checked_sub(1); + let best_authorities = previous_number + .and_then(|previous_number| self.cache.authorities_at_cache() + .commit_best_entry(&mut transaction, As::sa(previous_number), authorities)); + + // prune authorities from 'ancient' blocks + if let Some(ancient_number) = number.checked_sub(AUTHORITIES_ENTRIES_TO_KEEP) { + self.cache.authorities_at_cache().prune_entries(&mut transaction, As::sa(ancient_number))?; + } + + best_authorities + } else { + None + }; debug!("Light DB Commit {:?} ({})", hash, number); self.db.write(transaction).map_err(db_err)?; self.update_meta(hash, number, is_new_best); + if let Some(best_authorities) = best_authorities { + self.cache.authorities_at_cache().update_best_entry(Some(best_authorities)); + } Ok(()) } + + fn cache(&self) -> Option<&BlockchainCache> { + Some(&self.cache) + } } #[cfg(test)] @@ -168,7 +209,12 @@ pub(crate) mod tests { type Block = RawBlock; - pub fn insert_block(db: &LightStorage, parent: &Hash, number: u32) -> Hash { + pub fn insert_block( + db: &LightStorage, + parent: &Hash, + number: u64, + authorities: Option> + ) -> Hash { let header = Header { number: number.into(), parent_hash: *parent, @@ -178,14 +224,14 @@ pub(crate) mod tests { }; let hash = header.hash(); - db.import_header(true, header).unwrap(); + db.import_header(true, header, authorities).unwrap(); hash } #[test] fn returns_known_header() { let db = LightStorage::new_test(); - let known_hash = insert_block(&db, &Default::default(), 0); + let known_hash = insert_block(&db, &Default::default(), 0, None); let header_by_hash = db.header(BlockId::Hash(known_hash)).unwrap().unwrap(); let header_by_number = db.header(BlockId::Number(0)).unwrap().unwrap(); assert_eq!(header_by_hash, header_by_number); @@ -201,12 +247,12 @@ pub(crate) mod tests { #[test] fn returns_info() { let db = LightStorage::new_test(); - let genesis_hash = insert_block(&db, &Default::default(), 0); + let genesis_hash = insert_block(&db, &Default::default(), 0, None); let info = db.info().unwrap(); assert_eq!(info.best_hash, genesis_hash); assert_eq!(info.best_number, 0); assert_eq!(info.genesis_hash, genesis_hash); - let best_hash = insert_block(&db, &genesis_hash, 1); + let best_hash = insert_block(&db, &genesis_hash, 1, None); let info = db.info().unwrap(); assert_eq!(info.best_hash, best_hash); assert_eq!(info.best_number, 1); @@ -216,7 +262,7 @@ pub(crate) mod tests { #[test] fn returns_block_status() { let db = LightStorage::new_test(); - let genesis_hash = insert_block(&db, &Default::default(), 0); + let genesis_hash = insert_block(&db, &Default::default(), 0, None); assert_eq!(db.status(BlockId::Hash(genesis_hash)).unwrap(), BlockStatus::InChain); assert_eq!(db.status(BlockId::Number(0)).unwrap(), BlockStatus::InChain); assert_eq!(db.status(BlockId::Hash(1.into())).unwrap(), BlockStatus::Unknown); @@ -226,7 +272,7 @@ pub(crate) mod tests { #[test] fn returns_block_hash() { let db = LightStorage::new_test(); - let genesis_hash = insert_block(&db, &Default::default(), 0); + let genesis_hash = insert_block(&db, &Default::default(), 0, None); assert_eq!(db.hash(0).unwrap(), Some(genesis_hash)); assert_eq!(db.hash(1).unwrap(), None); } @@ -235,11 +281,11 @@ pub(crate) mod tests { fn import_header_works() { let db = LightStorage::new_test(); - let genesis_hash = insert_block(&db, &Default::default(), 0); + let genesis_hash = insert_block(&db, &Default::default(), 0, None); assert_eq!(db.db.iter(columns::HEADER).count(), 1); assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 1); - let _ = insert_block(&db, &genesis_hash, 1); + let _ = insert_block(&db, &genesis_hash, 1, None); assert_eq!(db.db.iter(columns::HEADER).count(), 2); assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 2); } diff --git a/substrate/client/db/src/utils.rs b/substrate/client/db/src/utils.rs index 46d2ec09739f9..75c86ae06e151 100644 --- a/substrate/client/db/src/utils.rs +++ b/substrate/client/db/src/utils.rs @@ -41,6 +41,8 @@ pub mod meta_keys { pub const TYPE: &[u8; 4] = b"type"; /// Best block key. pub const BEST_BLOCK: &[u8; 4] = b"best"; + /// Best authorities block key. + pub const BEST_AUTHORITIES: &[u8; 4] = b"auth"; } /// Database metadata. @@ -69,6 +71,17 @@ pub fn number_to_db_key(n: N) -> BlockKey where N: As { ] } +/// Convert block key into block number. +pub fn db_key_to_number(key: &[u8]) -> client::error::Result where N: As { + match key.len() { + 4 => Ok((key[0] as u64) << 24 + | (key[1] as u64) << 16 + | (key[2] as u64) << 8 + | (key[3] as u64)).map(As::sa), + _ => Err(client::error::ErrorKind::Backend("Invalid block key".into()).into()), + } +} + /// Maps database error to client error pub fn db_err(err: kvdb::Error) -> client::error::Error { use std::error::Error; diff --git a/substrate/client/src/backend.rs b/substrate/client/src/backend.rs index 39146fc9feb21..0686e1e47aadd 100644 --- a/substrate/client/src/backend.rs +++ b/substrate/client/src/backend.rs @@ -18,6 +18,7 @@ use state_machine::backend::Backend as StateBackend; use error; +use primitives::AuthorityId; use runtime_primitives::bft::Justification; use runtime_primitives::traits::{Block as BlockT, NumberFor}; use runtime_primitives::generic::BlockId; @@ -38,6 +39,9 @@ pub trait BlockImportOperation { is_new_best: bool ) -> error::Result<()>; + /// Append authorities set to the transaction. This is a set of parent block (set which + /// has been used to check justification of this block). + fn update_authorities(&mut self, authorities: Vec); /// Inject storage data into the database. fn update_storage(&mut self, update: ::Transaction) -> error::Result<()>; /// Inject storage data into the database replacing any existing data. diff --git a/substrate/client/src/blockchain.rs b/substrate/client/src/blockchain.rs index 18aed482b8c4d..333bc8f72e156 100644 --- a/substrate/client/src/blockchain.rs +++ b/substrate/client/src/blockchain.rs @@ -16,6 +16,7 @@ //! Polkadot blockchain trait +use primitives::AuthorityId; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use runtime_primitives::generic::BlockId; use runtime_primitives::bft::Justification; @@ -40,6 +41,15 @@ pub trait Backend: HeaderBackend { fn body(&self, id: BlockId) -> Result::Extrinsic>>>; /// Get block justification. Returns `None` if justification does not exist. fn justification(&self, id: BlockId) -> Result>>; + + /// Returns data cache reference, if it is enabled on this backend. + fn cache(&self) -> Option<&Cache>; +} + +/// Blockchain optional data cache. +pub trait Cache: Send + Sync { + /// Returns the set of authorities, that was active at given block or None if there's no entry in the cache. + fn authorities_at(&self, block: BlockId) -> Option>; } /// Block import outcome diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index c4b6ff15b5792..c125a1aa4919b 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -133,12 +133,13 @@ pub struct BlockImportNotification { pub struct JustifiedHeader { header: ::Header, justification: ::bft::Justification, + authorities: Vec, } impl JustifiedHeader { /// Deconstruct the justified header into parts. - pub fn into_inner(self) -> (::Header, ::bft::Justification) { - (self.header, self.justification) + pub fn into_inner(self) -> (::Header, ::bft::Justification, Vec) { + (self.header, self.justification, self.authorities) } } @@ -213,9 +214,12 @@ impl Client where /// Get the set of authorities at a given block. pub fn authorities_at(&self, id: &BlockId) -> error::Result> { - self.executor.call(id, "authorities", &[]) - .and_then(|r| Vec::::decode(&mut &r.return_data[..]) - .ok_or(error::ErrorKind::AuthLenInvalid.into())) + match self.backend.blockchain().cache().and_then(|cache| cache.authorities_at(*id)) { + Some(cached_value) => Ok(cached_value), + None => self.executor.call(id, "authorities",&[]) + .and_then(|r| Vec::::decode(&mut &r.return_data[..]) + .ok_or(error::ErrorKind::AuthLenInvalid.into())) + } } /// Get the RuntimeVersion at a given block. @@ -281,6 +285,7 @@ impl Client where Ok(JustifiedHeader { header, justification: just, + authorities, }) } @@ -291,7 +296,7 @@ impl Client where header: JustifiedHeader, body: Option::Extrinsic>>, ) -> error::Result { - let (header, justification) = header.into_inner(); + let (header, justification, authorities) = header.into_inner(); let parent_hash = header.parent_hash().clone(); match self.backend.blockchain().status(BlockId::Hash(parent_hash))? { blockchain::BlockStatus::InChain => {}, @@ -301,7 +306,7 @@ impl Client where let _import_lock = self.import_lock.lock(); let height: u64 = header.number().as_(); *self.importing_block.write() = Some(hash); - let result = self.execute_and_import_block(origin, hash, header, justification, body); + let result = self.execute_and_import_block(origin, hash, header, justification, body, authorities); *self.importing_block.write() = None; telemetry!("block.import"; "height" => height, @@ -318,6 +323,7 @@ impl Client where header: Block::Header, justification: bft::Justification, body: Option>, + authorities: Vec, ) -> error::Result { let parent_hash = header.parent_hash().clone(); match self.backend.blockchain().status(BlockId::Hash(hash))? { @@ -362,6 +368,7 @@ impl Client where trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number(), is_new_best, origin); let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into(); transaction.set_block_data(header.clone(), body, Some(unchecked.into()), is_new_best)?; + transaction.update_authorities(authorities); if let Some(storage_update) = storage_update { transaction.update_storage(storage_update)?; } @@ -467,11 +474,17 @@ impl bft::BlockImport for Client E: CallExecutor, Block: BlockT, { - fn import_block(&self, block: Block, justification: ::bft::Justification) { + fn import_block( + &self, + block: Block, + justification: ::bft::Justification, + authorities: &[AuthorityId] + ) { let (header, extrinsics) = block.deconstruct(); let justified_header = JustifiedHeader { header: header, justification, + authorities: authorities.to_vec(), }; let _ = self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(extrinsics)); @@ -533,6 +546,7 @@ mod tests { use keyring::Keyring; use test_client::{self, TestClient}; use test_client::client::BlockOrigin; + use test_client::client::backend::Backend as TestBackend; use test_client::runtime as test_runtime; use test_client::runtime::{Transfer, Extrinsic}; @@ -593,6 +607,18 @@ mod tests { assert_eq!(client.using_environment(|| test_runtime::system::balance_of(Keyring::Ferdie.to_raw_public().into())).unwrap(), 42); } + #[test] + fn client_uses_authorities_from_blockchain_cache() { + let client = test_client::new(); + test_client::client::in_mem::cache_authorities_at( + client.backend().blockchain(), + Default::default(), + Some(vec![[1u8; 32].into()])); + assert_eq!(client.authorities_at( + &BlockId::Hash(Default::default())).unwrap(), + vec![[1u8; 32].into()]); + } + #[test] fn block_builder_does_not_include_invalid() { let client = test_client::new(); diff --git a/substrate/client/src/in_mem.rs b/substrate/client/src/in_mem.rs index 2da0f4e6d0c38..a12323610a723 100644 --- a/substrate/client/src/in_mem.rs +++ b/substrate/client/src/in_mem.rs @@ -22,6 +22,7 @@ use parking_lot::RwLock; use error; use backend; use light; +use primitives::AuthorityId; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, NumberFor, As}; use runtime_primitives::bft::Justification; @@ -88,9 +89,27 @@ struct BlockchainStorage { } /// In-memory blockchain. Supports concurrent reads. -#[derive(Clone)] pub struct Blockchain { storage: Arc>>, + cache: Cache, +} + +struct Cache { + storage: Arc>>, + authorities_at: RwLock>>>, +} + +impl Clone for Blockchain { + fn clone(&self) -> Self { + let storage = Arc::new(RwLock::new(self.storage.read().clone())); + Blockchain { + storage: storage.clone(), + cache: Cache { + storage, + authorities_at: RwLock::new(self.cache.authorities_at.read().clone()), + }, + } + } } impl Blockchain { @@ -113,7 +132,11 @@ impl Blockchain { genesis_hash: Default::default(), })); Blockchain { - storage: storage, + storage: storage.clone(), + cache: Cache { + storage: storage, + authorities_at: Default::default(), + }, } } @@ -197,19 +220,37 @@ impl blockchain::Backend for Blockchain { b.justification().map(|x| x.clone())) )) } + + fn cache(&self) -> Option<&blockchain::Cache> { + Some(&self.cache) + } } impl light::blockchain::Storage for Blockchain { - fn import_header(&self, is_new_best: bool, header: Block::Header) -> error::Result<()> { + fn import_header( + &self, + is_new_best: bool, + header: Block::Header, + authorities: Option> + ) -> error::Result<()> { let hash = header.hash(); + let parent_hash = *header.parent_hash(); self.insert(hash, header, None, None, is_new_best); + if is_new_best { + self.cache.insert(parent_hash, authorities); + } Ok(()) } + + fn cache(&self) -> Option<&blockchain::Cache> { + Some(&self.cache) + } } /// In-memory operation. pub struct BlockImportOperation { pending_block: Option>, + pending_authorities: Option>, old_state: InMemory, new_state: Option, } @@ -236,6 +277,10 @@ impl backend::BlockImportOperation for BlockImportOperatio Ok(()) } + fn update_authorities(&mut self, authorities: Vec) { + self.pending_authorities = Some(authorities); + } + fn update_storage(&mut self, update: ::Transaction) -> error::Result<()> { self.new_state = Some(self.old_state.update(update)); Ok(()) @@ -282,6 +327,7 @@ impl backend::Backend for Backend where Ok(BlockImportOperation { pending_block: None, + pending_authorities: None, old_state: state, new_state: None, }) @@ -292,9 +338,14 @@ impl backend::Backend for Backend where let old_state = &operation.old_state; let (header, body, justification) = pending_block.block.into_inner(); let hash = header.hash(); + let parent_hash = *header.parent_hash(); self.states.write().insert(hash, operation.new_state.unwrap_or_else(|| old_state.clone())); self.blockchain.insert(hash, header, justification, body, pending_block.is_best); + // dumb implementation - store value for each block + if pending_block.is_best { + self.blockchain.cache.insert(parent_hash, operation.pending_authorities); + } } Ok(()) } @@ -316,3 +367,29 @@ impl backend::Backend for Backend where } impl backend::LocalBackend for Backend {} + +impl Cache { + fn insert(&self, at: Block::Hash, authorities: Option>) { + self.authorities_at.write().insert(at, authorities); + } +} + +impl blockchain::Cache for Cache { + fn authorities_at(&self, block: BlockId) -> Option> { + let hash = match block { + BlockId::Hash(hash) => hash, + BlockId::Number(number) => self.storage.read().hashes.get(&number).cloned()?, + }; + + self.authorities_at.read().get(&hash).cloned().unwrap_or(None) + } +} + +/// Insert authorities entry into in-memory blockchain cache. Extracted as a separate function to use it in tests. +pub fn cache_authorities_at( + blockchain: &Blockchain, + at: Block::Hash, + authorities: Option> +) { + blockchain.cache.insert(at, authorities); +} diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index a9eb4eb09b564..d06aef80bf5ab 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -19,6 +19,7 @@ use std::sync::{Arc, Weak}; +use primitives::AuthorityId; use runtime_primitives::{bft::Justification, generic::BlockId}; use runtime_primitives::traits::{Block as BlockT, NumberFor}; use state_machine::{Backend as StateBackend, TrieBackend as StateTrieBackend, @@ -39,6 +40,7 @@ pub struct Backend { pub struct ImportOperation { is_new_best: bool, header: Option, + authorities: Option>, _phantom: ::std::marker::PhantomData, } @@ -69,13 +71,14 @@ impl ClientBackend for Backend where Block: BlockT, S: Ok(ImportOperation { is_new_best: false, header: None, + authorities: None, _phantom: Default::default(), }) } fn commit_operation(&self, operation: Self::BlockImportOperation) -> ClientResult<()> { let header = operation.header.expect("commit is called after set_block_data; set_block_data sets header; qed"); - self.blockchain.storage().import_header(operation.is_new_best, header) + self.blockchain.storage().import_header(operation.is_new_best, header, operation.authorities) } fn blockchain(&self) -> &Blockchain { @@ -121,6 +124,10 @@ impl BlockImportOperation for ImportOperation where B Ok(()) } + fn update_authorities(&mut self, authorities: Vec) { + self.authorities = Some(authorities); + } + fn update_storage(&mut self, _update: ::Transaction) -> ClientResult<()> { // we're not storing anything locally => ignore changes Ok(()) diff --git a/substrate/client/src/light/blockchain.rs b/substrate/client/src/light/blockchain.rs index 9655f91baa616..4d9e7ca38dca7 100644 --- a/substrate/client/src/light/blockchain.rs +++ b/substrate/client/src/light/blockchain.rs @@ -20,10 +20,11 @@ use std::sync::Weak; use parking_lot::Mutex; +use primitives::AuthorityId; use runtime_primitives::{bft::Justification, generic::BlockId}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; -use blockchain::{Backend as BlockchainBackend, BlockStatus, +use blockchain::{Backend as BlockchainBackend, BlockStatus, Cache as BlockchainCache, HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo}; use error::Result as ClientResult; use light::fetcher::Fetcher; @@ -31,7 +32,15 @@ use light::fetcher::Fetcher; /// Light client blockchain storage. pub trait Storage: BlockchainHeaderBackend { /// Store new header. - fn import_header(&self, is_new_best: bool, header: Block::Header) -> ClientResult<()>; + fn import_header( + &self, + is_new_best: bool, + header: Block::Header, + authorities: Option> + ) -> ClientResult<()>; + + /// Get storage cache. + fn cache(&self) -> Option<&BlockchainCache>; } /// Light client blockchain. @@ -92,4 +101,8 @@ impl BlockchainBackend for Blockchain where Block: Blo fn justification(&self, _id: BlockId) -> ClientResult>> { Ok(None) } + + fn cache(&self) -> Option<&BlockchainCache> { + self.storage.cache() + } }