diff --git a/Cargo.lock b/Cargo.lock index 38e61c3..2f97a7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ dependencies = [ [[package]] name = "alfis" -version = "0.6.2" +version = "0.6.3" dependencies = [ "base64", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 4d0425c..005fe2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alfis" -version = "0.6.2" +version = "0.6.3" authors = ["Revertron "] edition = "2018" build = "build.rs" diff --git a/src/blockchain/chain.rs b/src/blockchain/chain.rs index 10908f5..782edea 100644 --- a/src/blockchain/chain.rs +++ b/src/blockchain/chain.rs @@ -265,7 +265,7 @@ impl Chain { trace!("We can't sign blocks without keys"); return None; } - if self.get_height() < self.max_height() { + if self.get_height() < self.get_max_height() { trace!("No signing while syncing"); return None; } @@ -702,7 +702,7 @@ impl Chain { } } - pub fn max_height(&self) -> u64 { + pub fn get_max_height(&self) -> u64 { self.max_height } @@ -727,7 +727,7 @@ impl Chain { } if let Some(last) = last_block { if block.index > last.index + 1 { - info!("Ignoring future block:\n{:?}", &block); + info!("Got future block {}", block.index); return Future; } } diff --git a/src/commons/constants.rs b/src/commons/constants.rs index c804545..6bf94da 100644 --- a/src/commons/constants.rs +++ b/src/commons/constants.rs @@ -40,6 +40,8 @@ pub const UI_REFRESH_DELAY_MS: u128 = 500; pub const LOG_REFRESH_DELAY_SEC: u64 = 60; pub const POLL_TIMEOUT: Option = Some(Duration::from_millis(250)); +/// We start syncing blocks only when we got 8 and more connected nodes +pub const MIN_CONNECTED_NODES_START_SYNC: usize = 8; pub const MAX_RECONNECTS: u32 = 5; pub const MAX_IDLE_SECONDS: u64 = 180; pub const MAX_NODES: usize = 15; diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 1588346..22a07b6 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -24,6 +24,7 @@ use crate::blockchain::types::BlockQuality; use crate::commons::*; use crate::eventbus::{register, post}; use crate::crypto::Chacha; +use std::collections::HashMap; const SERVER: Token = Token(0); @@ -34,6 +35,8 @@ pub struct Network { token: Token, // States of peer connections, and some data to send when sockets become writable peers: Peers, + // Orphan blocks from future + blocks: HashMap, } impl Network { @@ -43,7 +46,7 @@ impl Network { let secret_key = StaticSecret::new(&mut thread_rng); let public_key = PublicKey::from(&secret_key); let peers = Peers::new(); - Network { context, secret_key, public_key, token: Token(1), peers } + Network { context, secret_key, public_key, token: Token(1), peers, blocks: HashMap::new() } } pub fn start(&mut self) { @@ -155,9 +158,10 @@ impl Network { if ui_timer.elapsed().as_millis() > UI_REFRESH_DELAY_MS { // Send pings to idle peers - let (height, hash) = { + let (height, max_height, hash) = { let context = self.context.lock().unwrap(); let blocks = context.chain.get_height(); + let max_height = context.chain.get_max_height(); let nodes = self.peers.get_peers_active_count(); let banned = self.peers.get_peers_banned_count(); @@ -177,9 +181,9 @@ impl Network { self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only); connect_timer = Instant::now(); } - (blocks, context.chain.get_last_hash()) + (blocks, max_height, context.chain.get_last_hash()) }; - self.peers.update(poll.registry(), height, hash); + self.peers.update(poll.registry(), height, max_height, hash); ui_timer = Instant::now(); } } @@ -561,15 +565,27 @@ impl Network { peer.set_received_block(block.index); let mut context = self.context.lock().unwrap(); - let max_height = context.chain.max_height(); + let max_height = context.chain.get_max_height(); match context.chain.check_new_block(&block) { BlockQuality::Good => { + let mut next_index = block.index + 1; context.chain.add_block(block); + // If we have some consequent blocks in a bucket of 'future blocks', we add them + while let Some(block) = self.blocks.remove(&next_index) { + if context.chain.check_new_block(&block) == BlockQuality::Good { + info!("Added block {} from future blocks", next_index); + context.chain.add_block(block); + } else { + break; + } + next_index += 1; + } let my_height = context.chain.get_height(); post(crate::event::Event::BlockchainChanged { index: my_height }); // If it was the last block to sync if my_height == max_height { post(crate::event::Event::SyncFinished); + self.blocks.clear(); } else { let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; post(event); @@ -577,21 +593,21 @@ impl Network { let domains = context.chain.get_domains_count(); let keys = context.chain.get_users_count(); post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count }); - // To load blocks from different nodes we randomize requests of new blocks - // TODO rethink this approach - if max_height > my_height && random::() < 200 { - return State::message(Message::GetBlock { index: my_height + 1 }); - } } BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } - BlockQuality::Future => { debug!("Ignoring future block {}", block.index); } + BlockQuality::Future => { + debug!("Got future block {}", block.index); + self.blocks.insert(block.index, block); + } BlockQuality::Bad => { // TODO save bad public keys to banned table debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block); let height = context.chain.get_height(); - context.chain.update_max_height(height); - post(crate::event::Event::SyncFinished); - return State::Banned; + if height + 1 == block.index { + context.chain.update_max_height(height); + post(crate::event::Event::SyncFinished); + return State::Banned; + } } BlockQuality::Rewind => { debug!("Got some orphan block, requesting its parent"); diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index dbe3558..b28b917 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -13,6 +13,7 @@ use crate::{Bytes, commons}; use crate::commons::*; use crate::p2p::{Message, Peer, State}; use std::io; +use std::cmp::min; const PING_PERIOD: u64 = 30; @@ -245,7 +246,7 @@ impl Peers { false } - pub fn update(&mut self, registry: &Registry, height: u64, hash: Bytes) { + pub fn update(&mut self, registry: &Registry, height: u64, max_height: u64, hash: Bytes) { let nodes = self.get_peers_active_count(); let random_time = random::() % PING_PERIOD; @@ -270,18 +271,10 @@ impl Peers { } // If someone has more blocks we sync - { - let mut rng = rand::thread_rng(); - match self.peers - .iter_mut() - .filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None }) - .choose(&mut rng) { - None => {} - Some((token, peer)) => { - debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), height + 1); - registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap(); - peer.set_state(State::message(Message::GetBlock { index: height + 1 })); - } + if nodes >= MIN_CONNECTED_NODES_START_SYNC { + if height < max_height { + let count = min(max_height - height, (nodes - 5) as u64); + self.ask_blocks_from_peers(registry, height, height + count); } } @@ -331,6 +324,40 @@ impl Peers { } } + #[allow(dead_code)] + fn ask_block_from_peer(&mut self, registry: &Registry, height: u64) { + let mut rng = rand::thread_rng(); + match self.peers + .iter_mut() + .filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None }) + .choose(&mut rng) { + None => {} + Some((token, peer)) => { + debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), height + 1); + registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap(); + peer.set_state(State::message(Message::GetBlock { index: height + 1 })); + } + } + } + + fn ask_blocks_from_peers(&mut self, registry: &Registry, height: u64, max_height: u64) { + let mut rng = rand::thread_rng(); + let peers = self.peers + .iter_mut() + .filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None }) + .choose_multiple(&mut rng, (max_height - height) as usize); + let mut index = height + 1; + for (token, peer) in peers { + debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), index); + registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap(); + peer.set_state(State::message(Message::GetBlock { index })); + index += 1; + if index > max_height { + break; + } + } + } + pub fn connect_new_peers(&mut self, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) { if self.new_peers.is_empty() { return;