Some optimization for P2P part. Added panic on error of reregistering server token - needed for investigation of stuck network.

This commit is contained in:
Revertron
2021-04-01 14:44:37 +02:00
parent d70b107955
commit 8d7f1b2c6b
3 changed files with 42 additions and 54 deletions
+8 -3
View File
@@ -2,7 +2,7 @@ extern crate serde;
extern crate serde_json;
use std::{io, thread};
use std::io::{Read, Write};
use std::io::{Read, Write, Error};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
@@ -111,7 +111,12 @@ impl Network {
}
Err(_) => {}
}
let _ = poll.registry().reregister(&mut server, SERVER, Interest::READABLE);
match poll.registry().reregister(&mut server, SERVER, Interest::READABLE) {
Ok(_) => {}
Err(e) => {
panic!("Error reregistering server token!\n{}", e);
}
}
}
token => {
if !handle_connection_event(Arc::clone(&context), &mut peers, &poll.registry(), &event) {
@@ -449,7 +454,7 @@ fn mine_locker_block(context: Arc<Mutex<Context>>) {
if let Some(block) = context.chain.last_block() {
if let Some(keystore) = &context.keystore {
if block.index < context.chain.max_height() {
info!("No locker mining while syncing");
trace!("No locker mining while syncing");
return;
}
let lockers: HashSet<Bytes> = context.chain.get_block_lockers(&block).into_iter().collect();
+1 -1
View File
@@ -109,7 +109,7 @@ impl Peer {
}
pub fn disabled(&self) -> bool {
self.state.disabled()
self.state.disabled() || self.reconnects > 2
}
pub fn is_inbound(&self) -> bool {
+33 -50
View File
@@ -44,7 +44,7 @@ impl Peers {
let stream = peer.get_stream();
let _ = stream.shutdown(Shutdown::Both);
let _ = registry.deregister(stream);
info!("Peer connection {:?} has shut down", &peer.get_addr());
info!("Peer connection {} to {:?} has shut down", &token.0, &peer.get_addr());
if !peer.disabled() && !peer.is_inbound() {
peer.set_state(State::offline());
@@ -121,6 +121,9 @@ impl Peers {
pub fn get_peers_for_exchange(&self, peer_address: &SocketAddr) -> Vec<String> {
let mut result: Vec<String> = Vec::new();
for (_, peer) in self.peers.iter() {
if peer.disabled() {
continue;
}
if peer.equals(peer_address) {
continue;
}
@@ -236,18 +239,15 @@ impl Peers {
for (token, peer) in self.peers.iter_mut() {
if peer.get_state().need_reconnect() {
let addr = peer.get_addr();
match TcpStream::connect(addr.clone()) {
Ok(mut stream) => {
info!("Created connection to peer {}", &addr);
registry.register(&mut stream, token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::Connecting);
peer.inc_reconnects();
peer.set_stream(stream);
}
Err(e) => {
error!("Error connecting to peer {}: {}", &addr, e);
}
if let Ok(mut stream) = TcpStream::connect(addr.clone()) {
info!("Created connection to peer {}", &addr);
registry.register(&mut stream, token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::Connecting);
peer.inc_reconnects();
peer.set_stream(stream);
}
// We make reconnects only to one at a time
break;
}
}
}
@@ -256,27 +256,8 @@ impl Peers {
if self.new_peers.is_empty() {
return;
}
for addr in self.new_peers.iter() {
if self.ignored.contains(&addr.ip()) {
continue;
}
if yggdrasil_only && !is_yggdrasil(&addr.ip()) {
info!("Ignoring not Yggdrasil address '{}'", &addr.ip());
continue;
}
match TcpStream::connect(addr.clone()) {
Ok(mut stream) => {
info!("Created connection to peer {}", &addr);
let token = next(unique_token);
registry.register(&mut stream, token, Interest::WRITABLE).unwrap();
let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false);
peer.set_public(true);
self.peers.insert(token, peer);
}
Err(e) => {
error!("Error connecting to peer {}: {}", &addr, e);
}
}
for addr in &self.new_peers.clone() {
self.connect_peer(&addr, registry, unique_token, yggdrasil_only);
}
self.new_peers.clear();
}
@@ -290,26 +271,28 @@ impl Peers {
};
for addr in addresses {
if yggdrasil_only && !is_yggdrasil(&addr.ip()) {
info!("Ignoring not Yggdrasil address '{}'", &addr.ip());
continue;
}
match TcpStream::connect(addr.clone()) {
Ok(mut stream) => {
info!("Created connection to peer {}", &addr);
let token = next(unique_token);
registry.register(&mut stream, token, Interest::WRITABLE).unwrap();
let mut peer = Peer::new(addr, stream, State::Connecting, false);
peer.set_public(true);
self.add_peer(token, peer);
}
Err(e) => {
error!("Error connecting to peer {}: {}", &addr, e);
}
}
self.connect_peer(&addr, registry, unique_token, yggdrasil_only);
}
}
}
fn connect_peer(&mut self, addr: &SocketAddr, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) {
if self.ignored.contains(&addr.ip()) {
return;
}
if yggdrasil_only && !is_yggdrasil(&addr.ip()) {
info!("Ignoring not Yggdrasil address '{}'", &addr.ip());
return;
}
if let Ok(mut stream) = TcpStream::connect(addr.clone()) {
let token = next(unique_token);
info!("Created connection {} to peer {}", &token.0, &addr);
registry.register(&mut stream, token, Interest::WRITABLE).unwrap();
let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false);
peer.set_public(true);
self.peers.insert(token, peer);
}
}
}
fn skip_addr(addr: &SocketAddr) -> bool {