diff --git a/src/commons/constants.rs b/src/commons/constants.rs index 6bf94da..5339036 100644 --- a/src/commons/constants.rs +++ b/src/commons/constants.rs @@ -39,7 +39,7 @@ pub const LISTEN_PORT: u16 = 4244; pub const UI_REFRESH_DELAY_MS: u128 = 500; pub const LOG_REFRESH_DELAY_SEC: u64 = 60; -pub const POLL_TIMEOUT: Option = Some(Duration::from_millis(250)); +pub const POLL_TIMEOUT: Option = Some(Duration::from_millis(500)); /// We start syncing blocks only when we got 8 and more connected nodes pub const MIN_CONNECTED_NODES_START_SYNC: usize = 8; pub const MAX_RECONNECTS: u32 = 5; diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 22a07b6..9af533f 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -24,7 +24,7 @@ use crate::blockchain::types::BlockQuality; use crate::commons::*; use crate::eventbus::{register, post}; use crate::crypto::Chacha; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; const SERVER: Token = Token(0); @@ -36,7 +36,7 @@ pub struct Network { // States of peer connections, and some data to send when sockets become writable peers: Peers, // Orphan blocks from future - blocks: HashMap, + future_blocks: HashMap, } impl Network { @@ -46,7 +46,7 @@ impl Network { let secret_key = StaticSecret::new(&mut thread_rng); let public_key = PublicKey::from(&secret_key); let peers = Peers::new(); - Network { context, secret_key, public_key, token: Token(1), peers, blocks: HashMap::new() } + Network { context, secret_key, public_key, token: Token(1), peers, future_blocks: HashMap::new() } } pub fn start(&mut self) { @@ -183,7 +183,9 @@ impl Network { } (blocks, max_height, context.chain.get_last_hash()) }; - self.peers.update(poll.registry(), height, max_height, hash); + + let have_blocks: HashSet = self.future_blocks.values().map(|block| block.index).collect(); + self.peers.update(poll.registry(), hash, height, max_height, have_blocks); ui_timer = Instant::now(); } } @@ -571,11 +573,12 @@ impl Network { let mut next_index = block.index + 1; context.chain.add_block(block); // If we have some consequent blocks in a bucket of 'future blocks', we add them - while let Some(block) = self.blocks.remove(&next_index) { + while let Some(block) = self.future_blocks.remove(&next_index) { if context.chain.check_new_block(&block) == BlockQuality::Good { info!("Added block {} from future blocks", next_index); context.chain.add_block(block); } else { + warn!("Block {} in future blocks is bad!", block.index); break; } next_index += 1; @@ -585,7 +588,7 @@ impl Network { // If it was the last block to sync if my_height == max_height { post(crate::event::Event::SyncFinished); - self.blocks.clear(); + self.future_blocks.clear(); } else { let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; post(event); @@ -597,7 +600,7 @@ impl Network { BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } BlockQuality::Future => { debug!("Got future block {}", block.index); - self.blocks.insert(block.index, block); + self.future_blocks.insert(block.index, block); } BlockQuality::Bad => { // TODO save bad public keys to banned table diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index b28b917..46086c1 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -246,7 +246,7 @@ impl Peers { false } - pub fn update(&mut self, registry: &Registry, height: u64, max_height: u64, hash: Bytes) { + pub fn update(&mut self, registry: &Registry, hash: Bytes, height: u64, max_height: u64, have_blocks: HashSet) { let nodes = self.get_peers_active_count(); let random_time = random::() % PING_PERIOD; @@ -274,7 +274,7 @@ impl Peers { if nodes >= MIN_CONNECTED_NODES_START_SYNC { if height < max_height { let count = min(max_height - height, (nodes - 5) as u64); - self.ask_blocks_from_peers(registry, height, height + count); + self.ask_blocks_from_peers(registry, height, height + count, have_blocks); } } @@ -340,7 +340,7 @@ impl Peers { } } - fn ask_blocks_from_peers(&mut self, registry: &Registry, height: u64, max_height: u64) { + fn ask_blocks_from_peers(&mut self, registry: &Registry, height: u64, max_height: u64, have_blocks: HashSet) { let mut rng = rand::thread_rng(); let peers = self.peers .iter_mut() @@ -348,6 +348,11 @@ impl Peers { .choose_multiple(&mut rng, (max_height - height) as usize); let mut index = height + 1; for (token, peer) in peers { + if have_blocks.contains(&index) { + debug!("We have block {} in map, skipping request", index); + index += 1; + continue; + } debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), index); registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap(); peer.set_state(State::message(Message::GetBlock { index }));