diff --git a/Cargo.toml b/Cargo.toml index 13e1883..b969978 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alfis" -version = "0.4.4" +version = "0.4.5" authors = ["Revertron "] edition = "2018" build = "build.rs" diff --git a/src/blockchain/chain.rs b/src/blockchain/chain.rs index 8836d4a..27d40e9 100644 --- a/src/blockchain/chain.rs +++ b/src/blockchain/chain.rs @@ -563,7 +563,7 @@ impl Chain { return z.difficulty; } } - u32::max_value() + u32::MAX } pub fn last_block(&self) -> Option { @@ -816,16 +816,16 @@ impl Chain { return zone.difficulty; } } - u32::max_value() + u32::MAX } Err(_) => { warn!("Error parsing DomainData from {:?}", transaction); - u32::max_value() + u32::MAX } } } "zone" => { ZONE_DIFFICULTY } - _ => { u32::max_value() } + _ => { u32::MAX } } } diff --git a/src/p2p/network.rs b/src/p2p/network.rs index ba87d12..edb2ddb 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -18,9 +18,10 @@ use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers, is_ use crate::blockchain::types::BlockQuality; use crate::commons::CHAIN_VERSION; use std::sync::atomic::{AtomicBool, Ordering}; +use rand::random; const SERVER: Token = Token(0); -const POLL_TIMEOUT: Option = Some(Duration::from_millis(3000)); +const POLL_TIMEOUT: Option = Some(Duration::from_millis(1000)); pub const LISTEN_PORT: u16 = 4244; const MAX_PACKET_SIZE: usize = 1 * 1024 * 1024; // 1 Mb const MAX_READ_BLOCK_TIME: u128 = 500; @@ -60,10 +61,16 @@ impl Network { // States of peer connections, and some data to send when sockets become writable let mut peers = Peers::new(); // Starting peer connections to bootstrap nodes - peers.connect_peers(peers_addrs, &poll.registry(), &mut unique_token, yggdrasil_only); + peers.connect_peers(&peers_addrs, &poll.registry(), &mut unique_token, yggdrasil_only); let mut peers_timer = Instant::now(); + let mut bootstrap_timer = Instant::now(); loop { + if peers.get_peers_count() == 0 && bootstrap_timer.elapsed().as_secs() > 60 { + // Starting peer connections to bootstrap nodes + peers.connect_peers(&peers_addrs, &poll.registry(), &mut unique_token, yggdrasil_only); + bootstrap_timer = Instant::now(); + } // Poll Mio for events, blocking until we get an event. poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets"); if !running.load(Ordering::SeqCst) { @@ -131,7 +138,7 @@ impl Network { } events.clear(); - if peers_timer.elapsed().as_millis() > 500 { + if peers_timer.elapsed().as_millis() > 250 { // Send pings to idle peers let (height, hash) = { let mut context = context.lock().unwrap(); @@ -142,7 +149,7 @@ impl Network { } (height, context.chain.last_hash()) }; - peers.send_pings(poll.registry(), height, hash); + peers.update(poll.registry(), height, hash); peers.connect_new_peers(poll.registry(), &mut unique_token, yggdrasil_only); peers_timer = Instant::now(); } @@ -376,13 +383,11 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe peer.set_active(true); peer.reset_reconnects(); let mut context = context.lock().unwrap(); - let blocks_count = context.chain.height(); - context.bus.post(crate::event::Event::NetworkStatus { nodes: active_count + 1, blocks: blocks_count }); if peer.is_higher(my_height) { context.chain.update_max_height(height); context.bus.post(crate::event::Event::Syncing { have: my_height, height}); if active_count > 3 { - State::message(Message::GetBlock { index: my_height + 1 }) + State::idle() } else { State::message(Message::GetPeers) } @@ -398,10 +403,12 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe let peer = peers.get_mut_peer(token).unwrap(); peer.set_height(height); peer.set_active(true); - if peer.is_higher(my_height) || ( height == my_height && my_hash != hash) { + if peer.is_higher(my_height) { let mut context = context.lock().unwrap(); context.chain.update_max_height(height); - State::message(Message::GetBlock { index: my_height + 1 }) + } + if hash != my_hash { + State::message(Message::GetBlock { index: my_height }) } else { State::message(Message::pong(my_height, my_hash)) } @@ -410,19 +417,19 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe let peer = peers.get_mut_peer(token).unwrap(); peer.set_height(height); peer.set_active(true); - let is_higher = peer.is_higher(my_height); - - let mut context = context.lock().unwrap(); - let blocks_count = context.chain.height(); - context.bus.post(crate::event::Event::NetworkStatus { nodes: peers.get_peers_active_count(), blocks: blocks_count }); - - if is_higher { + if peer.is_higher(my_height) { + let mut context = context.lock().unwrap(); context.chain.update_max_height(height); - State::message(Message::GetBlock { index: my_height + 1 }) - } else if my_hash != hash { + } + if hash != my_hash { State::message(Message::GetBlock { index: my_height }) } else { - State::idle() + if random::() < 10 { + debug!("Requesting more peers"); + State::message(Message::GetPeers) + } else { + State::idle() + } } } Message::GetPeers => { diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 1d2d3cd..79e178f 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -11,19 +11,29 @@ use rand::seq::IteratorRandom; use log::{trace, debug, info, warn, error}; use crate::{Bytes, is_yggdrasil, commons}; use crate::commons::MAX_RECONNECTS; +use chrono::Utc; pub struct Peers { peers: HashMap, new_peers: Vec, ignored: HashSet, - my_id: String + my_id: String, + asked_block: u64, + asked_time: i64, } const PING_PERIOD: u64 = 60; impl Peers { pub fn new() -> Self { - Peers { peers: HashMap::new(), new_peers: Vec::new(), ignored: HashSet::new(), my_id: commons::random_string(6) } + Peers { + peers: HashMap::new(), + new_peers: Vec::new(), + ignored: HashSet::new(), + my_id: commons::random_string(6), + asked_block: 0, + asked_time: 0 + } } pub fn add_peer(&mut self, token: Token, peer: Peer) { @@ -60,18 +70,14 @@ impl Peers { } State::Banned => { trace!("Peer connection {} to {:?} has shut down, banned", &token.0, &peer.get_addr()); + self.ignored.insert(peer.get_addr().ip().clone()); } State::Offline { .. } => { trace!("Peer connection {} to {:?} is offline", &token.0, &peer.get_addr()); } } - if !peer.disabled() && !peer.is_inbound() { - peer.set_state(State::offline()); - peer.set_active(false); - } else { - self.peers.remove(token); - } + self.peers.remove(token); } None => {} } @@ -84,6 +90,7 @@ impl Peers { peers.insert(peer.to_owned()); peers }); + debug!("Got {} peers", peers.len()); //debug!("Got {} peers: {:?}", peers.len(), &peers); // TODO make it return error if these peers are wrong and seem like an attack for peer in peers.iter() { @@ -112,7 +119,7 @@ impl Peers { } if self.ignored.contains(&addr.ip()) { - trace!("Skipping address from exchange: {}", &addr); + info!("Skipping ignored address from exchange: {}", &addr); continue; } @@ -120,16 +127,6 @@ impl Peers { //debug!("Skipping address from exchange: {}", &addr); continue; // Return error in future } - let mut found = false; - for (_token, p) in self.peers.iter() { - if p.equals(&addr) { - found = true; - break; - } - } - if found { - continue; - } self.new_peers.push(addr); } } @@ -151,13 +148,20 @@ impl Peers { if peer.equals(peer_address) { continue; } - if peer.is_public() { + if peer.is_public() && peer.active() { result.push(SocketAddr::new(peer.get_addr().ip(), LISTEN_PORT).to_string()); } + if result.len() >= 10 { + break; + } } result } + pub fn get_peers_count(&self) -> usize { + self.peers.len() + } + pub fn get_peers_active_count(&self) -> usize { let mut count = 0; for (_, peer) in self.peers.iter() { @@ -187,6 +191,7 @@ impl Peers { } pub fn ignore_ip(&mut self, ip: &IpAddr) { + info!("Adding {} to ignored peers", &ip); self.ignored.insert(ip.clone()); } @@ -199,7 +204,7 @@ impl Peers { false } - pub fn send_pings(&mut self, registry: &Registry, height: u64, hash: Bytes) { + pub fn update(&mut self, registry: &Registry, height: u64, hash: Bytes) { let mut ping_sent = false; for (token, peer) in self.peers.iter_mut() { match peer.get_state() { @@ -225,20 +230,25 @@ impl Peers { } // If someone has more blocks we sync - if !ping_sent { + if self.need_ask_block(height + 1) { let mut rng = rand::thread_rng(); + let mut asked = false; match self.peers .iter_mut() .filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None }) .choose(&mut rng) { None => {} Some((token, peer)) => { - debug!("Found some peer higher than we are, sending block request"); + debug!("Found some peer higher than we are, requesting block {}", height + 1); registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap(); peer.set_state(State::message(Message::GetBlock { index: height + 1 })); ping_sent = true; + asked = true; } } + if asked { + self.set_asked_block(height + 1); + } } // If someone has less blocks (we mined a new block) we send a ping with our height @@ -274,7 +284,7 @@ impl Peers { if peer.get_state().need_reconnect() { let addr = peer.get_addr(); if let Ok(mut stream) = TcpStream::connect(addr.clone()) { - trace!("Trying to connect to peer {}", &addr); + debug!("Trying to reconnect to peer {}, count {}", &addr, peer.reconnects()); registry.register(&mut stream, token.clone(), Interest::WRITABLE).unwrap(); peer.set_state(State::Connecting); peer.inc_reconnects(); @@ -296,7 +306,7 @@ impl Peers { } /// Connecting to configured (bootstrap) peers - pub fn connect_peers(&mut self, peers_addrs: Vec, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) { + pub fn connect_peers(&mut self, peers_addrs: &Vec, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) { let mut set = HashSet::new(); for peer in peers_addrs.iter() { let mut addresses: Vec = match peer.to_socket_addrs() { @@ -332,6 +342,16 @@ impl Peers { self.peers.insert(token, peer); } } + + + pub fn set_asked_block(&mut self, index: u64) { + self.asked_block = index; + self.asked_time = Utc::now().timestamp(); + } + + pub fn need_ask_block(&self, index: u64) -> bool { + index > self.asked_block || self.asked_time + 3 < Utc::now().timestamp() + } } fn skip_private_addr(addr: &SocketAddr) -> bool {