From 8d7f1b2c6b9b9b742e8874ba3724b1a234a0a6c7 Mon Sep 17 00:00:00 2001 From: Revertron Date: Thu, 1 Apr 2021 14:44:37 +0200 Subject: [PATCH] Some optimization for P2P part. Added panic on error of reregistering server token - needed for investigation of stuck network. --- src/p2p/network.rs | 11 ++++-- src/p2p/peer.rs | 2 +- src/p2p/peers.rs | 83 ++++++++++++++++++---------------------------- 3 files changed, 42 insertions(+), 54 deletions(-) diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 43b0d88..d198c63 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -2,7 +2,7 @@ extern crate serde; extern crate serde_json; use std::{io, thread}; -use std::io::{Read, Write}; +use std::io::{Read, Write, Error}; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::{Duration, Instant}; @@ -111,7 +111,12 @@ impl Network { } Err(_) => {} } - let _ = poll.registry().reregister(&mut server, SERVER, Interest::READABLE); + match poll.registry().reregister(&mut server, SERVER, Interest::READABLE) { + Ok(_) => {} + Err(e) => { + panic!("Error reregistering server token!\n{}", e); + } + } } token => { if !handle_connection_event(Arc::clone(&context), &mut peers, &poll.registry(), &event) { @@ -449,7 +454,7 @@ fn mine_locker_block(context: Arc>) { if let Some(block) = context.chain.last_block() { if let Some(keystore) = &context.keystore { if block.index < context.chain.max_height() { - info!("No locker mining while syncing"); + trace!("No locker mining while syncing"); return; } let lockers: HashSet = context.chain.get_block_lockers(&block).into_iter().collect(); diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs index 8e00ab2..ebf0d67 100644 --- a/src/p2p/peer.rs +++ b/src/p2p/peer.rs @@ -109,7 +109,7 @@ impl Peer { } pub fn disabled(&self) -> bool { - self.state.disabled() + self.state.disabled() || self.reconnects > 2 } pub fn is_inbound(&self) -> bool { diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 0958b07..ccb419c 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -44,7 +44,7 @@ impl Peers { let stream = peer.get_stream(); let _ = stream.shutdown(Shutdown::Both); let _ = registry.deregister(stream); - info!("Peer connection {:?} has shut down", &peer.get_addr()); + info!("Peer connection {} to {:?} has shut down", &token.0, &peer.get_addr()); if !peer.disabled() && !peer.is_inbound() { peer.set_state(State::offline()); @@ -121,6 +121,9 @@ impl Peers { pub fn get_peers_for_exchange(&self, peer_address: &SocketAddr) -> Vec { let mut result: Vec = Vec::new(); for (_, peer) in self.peers.iter() { + if peer.disabled() { + continue; + } if peer.equals(peer_address) { continue; } @@ -236,18 +239,15 @@ impl Peers { for (token, peer) in self.peers.iter_mut() { if peer.get_state().need_reconnect() { let addr = peer.get_addr(); - match TcpStream::connect(addr.clone()) { - Ok(mut stream) => { - info!("Created connection to peer {}", &addr); - registry.register(&mut stream, token.clone(), Interest::WRITABLE).unwrap(); - peer.set_state(State::Connecting); - peer.inc_reconnects(); - peer.set_stream(stream); - } - Err(e) => { - error!("Error connecting to peer {}: {}", &addr, e); - } + if let Ok(mut stream) = TcpStream::connect(addr.clone()) { + info!("Created connection to peer {}", &addr); + registry.register(&mut stream, token.clone(), Interest::WRITABLE).unwrap(); + peer.set_state(State::Connecting); + peer.inc_reconnects(); + peer.set_stream(stream); } + // We make reconnects only to one at a time + break; } } } @@ -256,27 +256,8 @@ impl Peers { if self.new_peers.is_empty() { return; } - for addr in self.new_peers.iter() { - if self.ignored.contains(&addr.ip()) { - continue; - } - if yggdrasil_only && !is_yggdrasil(&addr.ip()) { - info!("Ignoring not Yggdrasil address '{}'", &addr.ip()); - continue; - } - match TcpStream::connect(addr.clone()) { - Ok(mut stream) => { - info!("Created connection to peer {}", &addr); - let token = next(unique_token); - registry.register(&mut stream, token, Interest::WRITABLE).unwrap(); - let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false); - peer.set_public(true); - self.peers.insert(token, peer); - } - Err(e) => { - error!("Error connecting to peer {}: {}", &addr, e); - } - } + for addr in &self.new_peers.clone() { + self.connect_peer(&addr, registry, unique_token, yggdrasil_only); } self.new_peers.clear(); } @@ -290,26 +271,28 @@ impl Peers { }; for addr in addresses { - if yggdrasil_only && !is_yggdrasil(&addr.ip()) { - info!("Ignoring not Yggdrasil address '{}'", &addr.ip()); - continue; - } - match TcpStream::connect(addr.clone()) { - Ok(mut stream) => { - info!("Created connection to peer {}", &addr); - let token = next(unique_token); - registry.register(&mut stream, token, Interest::WRITABLE).unwrap(); - let mut peer = Peer::new(addr, stream, State::Connecting, false); - peer.set_public(true); - self.add_peer(token, peer); - } - Err(e) => { - error!("Error connecting to peer {}: {}", &addr, e); - } - } + self.connect_peer(&addr, registry, unique_token, yggdrasil_only); } } } + + fn connect_peer(&mut self, addr: &SocketAddr, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) { + if self.ignored.contains(&addr.ip()) { + return; + } + if yggdrasil_only && !is_yggdrasil(&addr.ip()) { + info!("Ignoring not Yggdrasil address '{}'", &addr.ip()); + return; + } + if let Ok(mut stream) = TcpStream::connect(addr.clone()) { + let token = next(unique_token); + info!("Created connection {} to peer {}", &token.0, &addr); + registry.register(&mut stream, token, Interest::WRITABLE).unwrap(); + let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false); + peer.set_public(true); + self.peers.insert(token, peer); + } + } } fn skip_addr(addr: &SocketAddr) -> bool {