From 864edab20339c95d7669af0ca028432585a845eb Mon Sep 17 00:00:00 2001 From: Revertron Date: Wed, 21 Apr 2021 23:11:10 +0200 Subject: [PATCH] Disabled full blockchain check on start. Fixed non-working debug version. Fixed p2p connections handling. Lowered ping interval to 30-60 seconds. Fixed stuck with lower number of blocks. --- src/blockchain/chain.rs | 8 ++-- src/p2p/network.rs | 49 ++++++++++++++--------- src/p2p/peers.rs | 89 +++++++++++++++++++++-------------------- 3 files changed, 80 insertions(+), 66 deletions(-) diff --git a/src/blockchain/chain.rs b/src/blockchain/chain.rs index ffa6131..00173fa 100644 --- a/src/blockchain/chain.rs +++ b/src/blockchain/chain.rs @@ -91,9 +91,11 @@ impl Chain { self.last_full_block = self.get_last_full_block(u64::MAX, None); } } - self.check_chain(); + // TODO Add env-var and commandline switches for full check + //self.check_chain(); } + #[allow(dead_code)] fn check_chain(&mut self) { let height = self.get_height(); info!("Local blockchain height is {}, starting full blockchain check...", height); @@ -693,7 +695,7 @@ impl Chain { } } - pub fn last_hash(&self) -> Bytes { + pub fn get_last_hash(&self) -> Bytes { match &self.last_block { None => { Bytes::default() } Some(block) => { block.hash.clone() } @@ -988,7 +990,7 @@ impl Chain { let mut count = 1; let window = block.index - 1; // Without the last block while set.len() < BLOCK_SIGNERS_ALL as usize { - let index = ((tail * count) % window) + 1; // We want it to start from 1 + let index = (tail.wrapping_mul(count) % window) + 1; // We want it to start from 1 if let Some(b) = self.get_block(index) { if b.pub_key != block.pub_key && !set.contains(&b.pub_key) { result.push(b.pub_key.clone()); diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 964e265..9f624f8 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -21,7 +21,7 @@ use crate::blockchain::types::BlockQuality; use crate::commons::*; const SERVER: Token = Token(0); -const POLL_TIMEOUT: Option = Some(Duration::from_millis(1000)); +const POLL_TIMEOUT: Option = Some(Duration::from_millis(500)); const MAX_PACKET_SIZE: usize = 1 * 1024 * 1024; // 1 Mb const MAX_READ_BLOCK_TIME: u128 = 500; @@ -165,7 +165,7 @@ impl Network { } log_timer = Instant::now(); } - (height, context.chain.last_hash()) + (height, context.chain.get_last_hash()) }; peers.update(poll.registry(), height, hash); peers.connect_new_peers(poll.registry(), &mut unique_token, yggdrasil_only); @@ -295,7 +295,7 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi if from.elapsed().as_secs() >= 30 { let data: String = { let c = context.lock().unwrap(); - let message = Message::ping(c.chain.get_height(), c.chain.last_hash()); + let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); serde_json::to_string(&message).unwrap() }; send_message(peer.get_stream(), &data.into_bytes()).unwrap_or_else(|e| warn!("Error sending ping {}", e)); @@ -371,7 +371,7 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe let (my_height, my_hash, my_origin, my_version) = { let context = context.lock().unwrap(); // TODO cache it somewhere - (context.chain.get_height(), context.chain.last_hash(), &context.settings.origin.clone(), CHAIN_VERSION) + (context.chain.get_height(), context.chain.get_last_hash(), &context.settings.origin.clone(), CHAIN_VERSION) }; let answer = match message { Message::Hand { app_version, origin, version, public, rand} => { @@ -382,6 +382,7 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe if origin.eq(my_origin) && version == my_version { let peer = peers.get_mut_peer(token).unwrap(); peer.set_public(public); + peer.set_active(true); debug!("Hello from v{} on {}", &app_version, peer.get_addr().ip()); State::message(Message::shake(&origin, version, true, my_height)) } else { @@ -404,13 +405,12 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe if peer.is_higher(my_height) { context.chain.update_max_height(height); context.bus.post(crate::event::Event::Syncing { have: my_height, height}); - if active_count > 3 { - State::idle() - } else { - State::message(Message::GetPeers) - } - } else { + } + if active_count < 5 || random::() < 10 { + debug!("Requesting more peers from {}", peer.get_addr().ip()); State::message(Message::GetPeers) + } else { + State::idle() } } else { State::Banned @@ -425,15 +425,16 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe let mut context = context.lock().unwrap(); context.chain.update_max_height(height); } - if hash.ne(&my_hash) { - debug!("1Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); - debug!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); + if my_height == height && hash.ne(&my_hash) { + info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); + info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); State::message(Message::GetBlock { index: my_height }) } else { State::message(Message::pong(my_height, my_hash)) } } Message::Pong { height, hash } => { + let active_count = peers.get_peers_active_count(); let peer = peers.get_mut_peer(token).unwrap(); peer.set_height(height); peer.set_active(true); @@ -441,12 +442,12 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe let mut context = context.lock().unwrap(); context.chain.update_max_height(height); } - if hash.ne(&my_hash) { - debug!("2Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); - debug!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); + if my_height == height && hash.ne(&my_hash) { + info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); + info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); State::message(Message::GetBlock { index: my_height }) } else { - if random::() < 10 { + if active_count < 5 || random::() < 10 { debug!("Requesting more peers from {}", peer.get_addr().ip()); State::message(Message::GetPeers) } else { @@ -455,14 +456,22 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe } } Message::GetPeers => { - let peer = peers.get_peer(token).unwrap(); - State::message(Message::Peers { peers: peers.get_peers_for_exchange(&peer.get_addr()) }) + let addr = { + let peer = peers.get_mut_peer(token).unwrap(); + peer.set_active(true); + peer.get_addr().clone() + }; + State::message(Message::Peers { peers: peers.get_peers_for_exchange(&addr) }) } Message::Peers { peers: new_peers } => { + let peer = peers.get_mut_peer(token).unwrap(); + peer.set_active(true); peers.add_peers_from_exchange(new_peers); State::idle() } Message::GetBlock { index } => { + let peer = peers.get_mut_peer(token).unwrap(); + peer.set_active(true); let context = context.lock().unwrap(); match context.chain.get_block(index) { Some(block) => State::message(Message::block(block.index, serde_json::to_string(&block).unwrap())), @@ -470,6 +479,8 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe } } Message::Block { index, block } => { + let peer = peers.get_mut_peer(token).unwrap(); + peer.set_active(true); info!("Received block {}", index); let block: Block = match serde_json::from_str(&block) { Ok(block) => block, diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index b049822..ae265ba 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -13,19 +13,16 @@ use crate::{Bytes, commons}; use crate::commons::*; use crate::p2p::{Message, Peer, State}; use crate::commons::next; -use std::time::Duration; use std::io; -const PING_PERIOD: u64 = 60; -const TCP_TIMEOUT: Duration = Duration::from_millis(10000); +const PING_PERIOD: u64 = 30; pub struct Peers { peers: HashMap, new_peers: Vec, ignored: HashSet, my_id: String, - asked_block: u64, - asked_time: i64, + block_asked_time: i64, } impl Peers { @@ -35,8 +32,7 @@ impl Peers { new_peers: Vec::new(), ignored: HashSet::new(), my_id: commons::random_string(6), - asked_block: 0, - asked_time: 0 + block_asked_time: 0 } } @@ -217,7 +213,6 @@ impl Peers { } pub fn update(&mut self, registry: &Registry, height: u64, hash: Bytes) { - let mut ping_sent = false; for (token, peer) in self.peers.iter_mut() { match peer.get_state() { State::Idle { from } => { @@ -225,7 +220,7 @@ impl Peers { if from.elapsed().as_secs() >= PING_PERIOD + random_time { // Sometimes we check for new peers instead of pinging let random: u8 = random(); - let message = if random < 16 { + let message = if random < 10 { Message::GetPeers } else { Message::ping(height, hash.clone()) @@ -234,7 +229,6 @@ impl Peers { peer.set_state(State::message(message)); let stream = peer.get_stream(); registry.reregister(stream, token.clone(), Interest::WRITABLE).unwrap(); - ping_sent = true; } } _ => {} @@ -242,7 +236,7 @@ impl Peers { } // If someone has more blocks we sync - if self.need_ask_block(height + 1) { + if self.need_ask_block() { let mut rng = rand::thread_rng(); let mut asked = false; match self.peers @@ -254,17 +248,16 @@ impl Peers { debug!("Found some peer higher than we are, requesting block {}, from {}", height + 1, &peer.get_addr().ip()); registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap(); peer.set_state(State::message(Message::GetBlock { index: height + 1 })); - ping_sent = true; asked = true; } } if asked { - self.set_asked_block(height + 1); + self.update_asked_block_time(); } } // If someone has less blocks (we mined a new block) we send a ping with our height - if !ping_sent { + { let mut rng = rand::thread_rng(); match self.peers .iter_mut() @@ -326,25 +319,36 @@ impl Peers { pub fn connect_peers(&mut self, peers_addrs: &Vec, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) { let mut set = HashSet::new(); for peer in peers_addrs.iter() { + info!("Resolving address {}", peer); let mut addresses: Vec = match peer.to_socket_addrs() { Ok(peers) => { peers.collect() } Err(_) => { error!("Can't resolve address {}", &peer); continue; } }; + info!("Got addresses: {:?}", &addresses); - // At first we connect to one peer address from every "peer" or domain - let addr = addresses.remove(0); - if !set.contains(&addr) { - match self.connect_peer(&addr, registry, unique_token, yggdrasil_only) { - Ok(_) => { - set.insert(addr); - } - Err(_) => { - debug!("Could not connect to {}", &addr); + // At first we connect to 5 peer addresses + if set.len() >= 10 { + break; + } + + while addresses.len() > 0 { + let addr = addresses.remove(0); + if !set.contains(&addr) { + match self.connect_peer(&addr, registry, unique_token, yggdrasil_only) { + Ok(_) => { + set.insert(addr); + } + Err(_) => { + debug!("Could not connect to {}", &addr); + } } } } + // Copy others to new_peers, to connect later - self.new_peers.append(&mut addresses); + if addresses.len() > 0 { + self.new_peers.append(&mut addresses); + } } } @@ -356,31 +360,28 @@ impl Peers { debug!("Ignoring not Yggdrasil address '{}'", &addr.ip()); return Err(io::Error::from(io::ErrorKind::InvalidInput)); } - if let Ok(stream) = std::net::TcpStream::connect_timeout(&addr.clone(), TCP_TIMEOUT) { - stream.set_nodelay(true)?; - stream.set_read_timeout(Some(TCP_TIMEOUT))?; - stream.set_write_timeout(Some(TCP_TIMEOUT))?; - stream.set_nonblocking(true)?; - - let mut stream = TcpStream::from_std(stream); - let token = next(unique_token); - trace!("Created connection {}, to peer {}", &token.0, &addr); - registry.register(&mut stream, token, Interest::WRITABLE)?; - let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false); - peer.set_public(true); - self.peers.insert(token, peer); + trace!("Connecting to peer {}", &addr); + match TcpStream::connect(addr.clone()) { + Ok(mut stream ) => { + //stream.set_nodelay(true)?; + let token = next(unique_token); + trace!("Created connection {}, to peer {}", &token.0, &addr); + registry.register(&mut stream, token, Interest::WRITABLE).unwrap(); + let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false); + peer.set_public(true); + self.peers.insert(token, peer); + Ok(()) + } + Err(e) => { Err(e) } } - Ok(()) } - - pub fn set_asked_block(&mut self, index: u64) { - self.asked_block = index; - self.asked_time = Utc::now().timestamp(); + pub fn update_asked_block_time(&mut self) { + self.block_asked_time = Utc::now().timestamp(); } - pub fn need_ask_block(&self, index: u64) -> bool { - index > self.asked_block || self.asked_time + 3 < Utc::now().timestamp() + pub fn need_ask_block(&self) -> bool { + self.block_asked_time + 5 < Utc::now().timestamp() } }