More optimizations for initial blocks sync.

This commit is contained in:
Revertron
2021-06-06 00:03:53 +02:00
parent e82f0ee9b1
commit c8183e0e58
3 changed files with 19 additions and 11 deletions
+1 -1
View File
@@ -39,7 +39,7 @@ pub const LISTEN_PORT: u16 = 4244;
pub const UI_REFRESH_DELAY_MS: u128 = 500; pub const UI_REFRESH_DELAY_MS: u128 = 500;
pub const LOG_REFRESH_DELAY_SEC: u64 = 60; pub const LOG_REFRESH_DELAY_SEC: u64 = 60;
pub const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(250)); pub const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
/// We start syncing blocks only when we got 8 and more connected nodes /// We start syncing blocks only when we got 8 and more connected nodes
pub const MIN_CONNECTED_NODES_START_SYNC: usize = 8; pub const MIN_CONNECTED_NODES_START_SYNC: usize = 8;
pub const MAX_RECONNECTS: u32 = 5; pub const MAX_RECONNECTS: u32 = 5;
+10 -7
View File
@@ -24,7 +24,7 @@ use crate::blockchain::types::BlockQuality;
use crate::commons::*; use crate::commons::*;
use crate::eventbus::{register, post}; use crate::eventbus::{register, post};
use crate::crypto::Chacha; use crate::crypto::Chacha;
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
const SERVER: Token = Token(0); const SERVER: Token = Token(0);
@@ -36,7 +36,7 @@ pub struct Network {
// States of peer connections, and some data to send when sockets become writable // States of peer connections, and some data to send when sockets become writable
peers: Peers, peers: Peers,
// Orphan blocks from future // Orphan blocks from future
blocks: HashMap<u64, Block>, future_blocks: HashMap<u64, Block>,
} }
impl Network { impl Network {
@@ -46,7 +46,7 @@ impl Network {
let secret_key = StaticSecret::new(&mut thread_rng); let secret_key = StaticSecret::new(&mut thread_rng);
let public_key = PublicKey::from(&secret_key); let public_key = PublicKey::from(&secret_key);
let peers = Peers::new(); let peers = Peers::new();
Network { context, secret_key, public_key, token: Token(1), peers, blocks: HashMap::new() } Network { context, secret_key, public_key, token: Token(1), peers, future_blocks: HashMap::new() }
} }
pub fn start(&mut self) { pub fn start(&mut self) {
@@ -183,7 +183,9 @@ impl Network {
} }
(blocks, max_height, context.chain.get_last_hash()) (blocks, max_height, context.chain.get_last_hash())
}; };
self.peers.update(poll.registry(), height, max_height, hash);
let have_blocks: HashSet<u64> = self.future_blocks.values().map(|block| block.index).collect();
self.peers.update(poll.registry(), hash, height, max_height, have_blocks);
ui_timer = Instant::now(); ui_timer = Instant::now();
} }
} }
@@ -571,11 +573,12 @@ impl Network {
let mut next_index = block.index + 1; let mut next_index = block.index + 1;
context.chain.add_block(block); context.chain.add_block(block);
// If we have some consequent blocks in a bucket of 'future blocks', we add them // If we have some consequent blocks in a bucket of 'future blocks', we add them
while let Some(block) = self.blocks.remove(&next_index) { while let Some(block) = self.future_blocks.remove(&next_index) {
if context.chain.check_new_block(&block) == BlockQuality::Good { if context.chain.check_new_block(&block) == BlockQuality::Good {
info!("Added block {} from future blocks", next_index); info!("Added block {} from future blocks", next_index);
context.chain.add_block(block); context.chain.add_block(block);
} else { } else {
warn!("Block {} in future blocks is bad!", block.index);
break; break;
} }
next_index += 1; next_index += 1;
@@ -585,7 +588,7 @@ impl Network {
// If it was the last block to sync // If it was the last block to sync
if my_height == max_height { if my_height == max_height {
post(crate::event::Event::SyncFinished); post(crate::event::Event::SyncFinished);
self.blocks.clear(); self.future_blocks.clear();
} else { } else {
let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) };
post(event); post(event);
@@ -597,7 +600,7 @@ impl Network {
BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); }
BlockQuality::Future => { BlockQuality::Future => {
debug!("Got future block {}", block.index); debug!("Got future block {}", block.index);
self.blocks.insert(block.index, block); self.future_blocks.insert(block.index, block);
} }
BlockQuality::Bad => { BlockQuality::Bad => {
// TODO save bad public keys to banned table // TODO save bad public keys to banned table
+8 -3
View File
@@ -246,7 +246,7 @@ impl Peers {
false false
} }
pub fn update(&mut self, registry: &Registry, height: u64, max_height: u64, hash: Bytes) { pub fn update(&mut self, registry: &Registry, hash: Bytes, height: u64, max_height: u64, have_blocks: HashSet<u64>) {
let nodes = self.get_peers_active_count(); let nodes = self.get_peers_active_count();
let random_time = random::<u64>() % PING_PERIOD; let random_time = random::<u64>() % PING_PERIOD;
@@ -274,7 +274,7 @@ impl Peers {
if nodes >= MIN_CONNECTED_NODES_START_SYNC { if nodes >= MIN_CONNECTED_NODES_START_SYNC {
if height < max_height { if height < max_height {
let count = min(max_height - height, (nodes - 5) as u64); let count = min(max_height - height, (nodes - 5) as u64);
self.ask_blocks_from_peers(registry, height, height + count); self.ask_blocks_from_peers(registry, height, height + count, have_blocks);
} }
} }
@@ -340,7 +340,7 @@ impl Peers {
} }
} }
fn ask_blocks_from_peers(&mut self, registry: &Registry, height: u64, max_height: u64) { fn ask_blocks_from_peers(&mut self, registry: &Registry, height: u64, max_height: u64, have_blocks: HashSet<u64>) {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let peers = self.peers let peers = self.peers
.iter_mut() .iter_mut()
@@ -348,6 +348,11 @@ impl Peers {
.choose_multiple(&mut rng, (max_height - height) as usize); .choose_multiple(&mut rng, (max_height - height) as usize);
let mut index = height + 1; let mut index = height + 1;
for (token, peer) in peers { for (token, peer) in peers {
if have_blocks.contains(&index) {
debug!("We have block {} in map, skipping request", index);
index += 1;
continue;
}
debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), index); debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), index);
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap(); registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::message(Message::GetBlock { index })); peer.set_state(State::message(Message::GetBlock { index }));