Optimized initial blocks sync.
This commit is contained in:
Generated
+1
-1
@@ -65,7 +65,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alfis"
|
||||
version = "0.6.2"
|
||||
version = "0.6.3"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"bincode",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "alfis"
|
||||
version = "0.6.2"
|
||||
version = "0.6.3"
|
||||
authors = ["Revertron <alfis@revertron.com>"]
|
||||
edition = "2018"
|
||||
build = "build.rs"
|
||||
|
||||
@@ -265,7 +265,7 @@ impl Chain {
|
||||
trace!("We can't sign blocks without keys");
|
||||
return None;
|
||||
}
|
||||
if self.get_height() < self.max_height() {
|
||||
if self.get_height() < self.get_max_height() {
|
||||
trace!("No signing while syncing");
|
||||
return None;
|
||||
}
|
||||
@@ -702,7 +702,7 @@ impl Chain {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn max_height(&self) -> u64 {
|
||||
pub fn get_max_height(&self) -> u64 {
|
||||
self.max_height
|
||||
}
|
||||
|
||||
@@ -727,7 +727,7 @@ impl Chain {
|
||||
}
|
||||
if let Some(last) = last_block {
|
||||
if block.index > last.index + 1 {
|
||||
info!("Ignoring future block:\n{:?}", &block);
|
||||
info!("Got future block {}", block.index);
|
||||
return Future;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,8 @@ pub const UI_REFRESH_DELAY_MS: u128 = 500;
|
||||
pub const LOG_REFRESH_DELAY_SEC: u64 = 60;
|
||||
|
||||
pub const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(250));
|
||||
/// We start syncing blocks only when we got 8 and more connected nodes
|
||||
pub const MIN_CONNECTED_NODES_START_SYNC: usize = 8;
|
||||
pub const MAX_RECONNECTS: u32 = 5;
|
||||
pub const MAX_IDLE_SECONDS: u64 = 180;
|
||||
pub const MAX_NODES: usize = 15;
|
||||
|
||||
+30
-14
@@ -24,6 +24,7 @@ use crate::blockchain::types::BlockQuality;
|
||||
use crate::commons::*;
|
||||
use crate::eventbus::{register, post};
|
||||
use crate::crypto::Chacha;
|
||||
use std::collections::HashMap;
|
||||
|
||||
const SERVER: Token = Token(0);
|
||||
|
||||
@@ -34,6 +35,8 @@ pub struct Network {
|
||||
token: Token,
|
||||
// States of peer connections, and some data to send when sockets become writable
|
||||
peers: Peers,
|
||||
// Orphan blocks from future
|
||||
blocks: HashMap<u64, Block>,
|
||||
}
|
||||
|
||||
impl Network {
|
||||
@@ -43,7 +46,7 @@ impl Network {
|
||||
let secret_key = StaticSecret::new(&mut thread_rng);
|
||||
let public_key = PublicKey::from(&secret_key);
|
||||
let peers = Peers::new();
|
||||
Network { context, secret_key, public_key, token: Token(1), peers }
|
||||
Network { context, secret_key, public_key, token: Token(1), peers, blocks: HashMap::new() }
|
||||
}
|
||||
|
||||
pub fn start(&mut self) {
|
||||
@@ -155,9 +158,10 @@ impl Network {
|
||||
|
||||
if ui_timer.elapsed().as_millis() > UI_REFRESH_DELAY_MS {
|
||||
// Send pings to idle peers
|
||||
let (height, hash) = {
|
||||
let (height, max_height, hash) = {
|
||||
let context = self.context.lock().unwrap();
|
||||
let blocks = context.chain.get_height();
|
||||
let max_height = context.chain.get_max_height();
|
||||
let nodes = self.peers.get_peers_active_count();
|
||||
let banned = self.peers.get_peers_banned_count();
|
||||
|
||||
@@ -177,9 +181,9 @@ impl Network {
|
||||
self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only);
|
||||
connect_timer = Instant::now();
|
||||
}
|
||||
(blocks, context.chain.get_last_hash())
|
||||
(blocks, max_height, context.chain.get_last_hash())
|
||||
};
|
||||
self.peers.update(poll.registry(), height, hash);
|
||||
self.peers.update(poll.registry(), height, max_height, hash);
|
||||
ui_timer = Instant::now();
|
||||
}
|
||||
}
|
||||
@@ -561,15 +565,27 @@ impl Network {
|
||||
peer.set_received_block(block.index);
|
||||
|
||||
let mut context = self.context.lock().unwrap();
|
||||
let max_height = context.chain.max_height();
|
||||
let max_height = context.chain.get_max_height();
|
||||
match context.chain.check_new_block(&block) {
|
||||
BlockQuality::Good => {
|
||||
let mut next_index = block.index + 1;
|
||||
context.chain.add_block(block);
|
||||
// If we have some consequent blocks in a bucket of 'future blocks', we add them
|
||||
while let Some(block) = self.blocks.remove(&next_index) {
|
||||
if context.chain.check_new_block(&block) == BlockQuality::Good {
|
||||
info!("Added block {} from future blocks", next_index);
|
||||
context.chain.add_block(block);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
next_index += 1;
|
||||
}
|
||||
let my_height = context.chain.get_height();
|
||||
post(crate::event::Event::BlockchainChanged { index: my_height });
|
||||
// If it was the last block to sync
|
||||
if my_height == max_height {
|
||||
post(crate::event::Event::SyncFinished);
|
||||
self.blocks.clear();
|
||||
} else {
|
||||
let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) };
|
||||
post(event);
|
||||
@@ -577,21 +593,21 @@ impl Network {
|
||||
let domains = context.chain.get_domains_count();
|
||||
let keys = context.chain.get_users_count();
|
||||
post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count });
|
||||
// To load blocks from different nodes we randomize requests of new blocks
|
||||
// TODO rethink this approach
|
||||
if max_height > my_height && random::<u8>() < 200 {
|
||||
return State::message(Message::GetBlock { index: my_height + 1 });
|
||||
}
|
||||
}
|
||||
BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); }
|
||||
BlockQuality::Future => { debug!("Ignoring future block {}", block.index); }
|
||||
BlockQuality::Future => {
|
||||
debug!("Got future block {}", block.index);
|
||||
self.blocks.insert(block.index, block);
|
||||
}
|
||||
BlockQuality::Bad => {
|
||||
// TODO save bad public keys to banned table
|
||||
debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block);
|
||||
let height = context.chain.get_height();
|
||||
context.chain.update_max_height(height);
|
||||
post(crate::event::Event::SyncFinished);
|
||||
return State::Banned;
|
||||
if height + 1 == block.index {
|
||||
context.chain.update_max_height(height);
|
||||
post(crate::event::Event::SyncFinished);
|
||||
return State::Banned;
|
||||
}
|
||||
}
|
||||
BlockQuality::Rewind => {
|
||||
debug!("Got some orphan block, requesting its parent");
|
||||
|
||||
+40
-13
@@ -13,6 +13,7 @@ use crate::{Bytes, commons};
|
||||
use crate::commons::*;
|
||||
use crate::p2p::{Message, Peer, State};
|
||||
use std::io;
|
||||
use std::cmp::min;
|
||||
|
||||
const PING_PERIOD: u64 = 30;
|
||||
|
||||
@@ -245,7 +246,7 @@ impl Peers {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn update(&mut self, registry: &Registry, height: u64, hash: Bytes) {
|
||||
pub fn update(&mut self, registry: &Registry, height: u64, max_height: u64, hash: Bytes) {
|
||||
let nodes = self.get_peers_active_count();
|
||||
|
||||
let random_time = random::<u64>() % PING_PERIOD;
|
||||
@@ -270,18 +271,10 @@ impl Peers {
|
||||
}
|
||||
|
||||
// If someone has more blocks we sync
|
||||
{
|
||||
let mut rng = rand::thread_rng();
|
||||
match self.peers
|
||||
.iter_mut()
|
||||
.filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None })
|
||||
.choose(&mut rng) {
|
||||
None => {}
|
||||
Some((token, peer)) => {
|
||||
debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), height + 1);
|
||||
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
|
||||
peer.set_state(State::message(Message::GetBlock { index: height + 1 }));
|
||||
}
|
||||
if nodes >= MIN_CONNECTED_NODES_START_SYNC {
|
||||
if height < max_height {
|
||||
let count = min(max_height - height, (nodes - 5) as u64);
|
||||
self.ask_blocks_from_peers(registry, height, height + count);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,6 +324,40 @@ impl Peers {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn ask_block_from_peer(&mut self, registry: &Registry, height: u64) {
|
||||
let mut rng = rand::thread_rng();
|
||||
match self.peers
|
||||
.iter_mut()
|
||||
.filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None })
|
||||
.choose(&mut rng) {
|
||||
None => {}
|
||||
Some((token, peer)) => {
|
||||
debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), height + 1);
|
||||
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
|
||||
peer.set_state(State::message(Message::GetBlock { index: height + 1 }));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ask_blocks_from_peers(&mut self, registry: &Registry, height: u64, max_height: u64) {
|
||||
let mut rng = rand::thread_rng();
|
||||
let peers = self.peers
|
||||
.iter_mut()
|
||||
.filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None })
|
||||
.choose_multiple(&mut rng, (max_height - height) as usize);
|
||||
let mut index = height + 1;
|
||||
for (token, peer) in peers {
|
||||
debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), index);
|
||||
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
|
||||
peer.set_state(State::message(Message::GetBlock { index }));
|
||||
index += 1;
|
||||
if index > max_height {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connect_new_peers(&mut self, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) {
|
||||
if self.new_peers.is_empty() {
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user