Eliminated connection loops.

Optimized network processes.
Removed some unnecessary logging.
Fixed status bar info inconsistency.
This commit is contained in:
Revertron
2021-03-21 00:19:09 +01:00
parent 4497dc515b
commit dcf5bb72b0
13 changed files with 178 additions and 85 deletions
+46 -53
View File
@@ -13,11 +13,11 @@ use mio::net::{TcpListener, TcpStream};
#[allow(unused_imports)]
use log::{trace, debug, info, warn, error};
use std::net::{SocketAddr, IpAddr, SocketAddrV4, Shutdown};
use std::collections::HashSet;
use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers, Bytes};
use std::net::{SocketAddr, IpAddr, SocketAddrV4, ToSocketAddrs};
use crate::blockchain::enums::BlockQuality;
use crate::blockchain::CHAIN_VERSION;
use std::collections::HashSet;
const SERVER: Token = Token(0);
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(3000));
@@ -57,7 +57,7 @@ impl Network {
// States of peer connections, and some data to send when sockets become writable
let mut peers = Peers::new();
// Starting peer connections to bootstrap nodes
connect_peers(peers_addrs, &mut poll, &mut peers, &mut unique_token);
peers.connect_peers(peers_addrs, &poll.registry(), &mut unique_token);
loop {
// Poll Mio for events, blocking until we get an event.
@@ -65,7 +65,7 @@ impl Network {
// Process each event.
for event in events.iter() {
trace!("Event for socket {} is {:?}", event.token().0, &event);
//trace!("Event for socket {} is {:?}", event.token().0, &event);
// We can use the token we previously provided to `register` to determine for which socket the event is.
match event.token() {
SERVER => {
@@ -82,10 +82,18 @@ impl Network {
}
}
info!("Accepted connection from: {}", address);
let token = next(&mut unique_token);
poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll");
peers.add_peer(token, Peer::new(address, stream, State::Connected, true));
// If connection is from the same IP and not from loopback we ignore it to avoid connection loops
let local_ip = stream.local_addr().unwrap_or("0.0.0.0:0".parse().unwrap());
if !local_ip.ip().is_loopback() && local_ip.ip() == address.ip() {
peers.ignore_ip(&address.ip());
stream.shutdown(Shutdown::Both);
warn!("Detected connection loop, ignoring IP: {}", &address.ip());
} else {
info!("Accepted connection from: {} to local IP: {}", address, local_ip);
let token = next(&mut unique_token);
poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll");
peers.add_peer(token, Peer::new(address, stream, State::Connected, true));
}
}
Err(_) => {}
}
@@ -105,8 +113,13 @@ impl Network {
// Send pings to idle peers
let (height, hash) = {
let context = context.lock().unwrap();
(context.chain.height(), context.chain.last_hash())
let mut context = context.lock().unwrap();
let height = context.chain.height();
let nodes = peers.get_peers_active_count();
if nodes > 0 {
context.bus.post(crate::event::Event::NetworkStatus { nodes, blocks: height });
}
(height, context.chain.last_hash())
};
mine_locker_block(context.clone());
peers.send_pings(poll.registry(), height, hash);
@@ -153,7 +166,9 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
State::Idle { .. } => {
peer.set_state(State::idle());
}
State::Error => {}
State::Error => {
peers.ignore_peer(registry, &event.token());
}
State::Banned => {}
State::Offline { .. } => {
peer.set_state(State::offline());
@@ -168,7 +183,7 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
}
if event.is_writable() {
trace!("Socket {} is writable", event.token().0);
//trace!("Socket {} is writable", event.token().0);
match peers.get_mut_peer(&event.token()) {
None => {}
Some(peer) => {
@@ -177,11 +192,11 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
debug!("Sending hello to {}", &peer.get_addr());
let data: String = {
let c = context.lock().unwrap();
let message = Message::hand(&c.settings.origin, CHAIN_VERSION, c.settings.public);
let message = Message::hand(&c.settings.origin, CHAIN_VERSION, c.settings.public, peer.get_rand());
serde_json::to_string(&message).unwrap()
};
send_message(peer.get_stream(), &data.into_bytes());
debug!("Sent hello to {}", &peer.get_addr());
//debug!("Sent hello to {}", &peer.get_addr());
}
State::Message { data } => {
debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap());
@@ -220,7 +235,7 @@ fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, ()> {
0
}
};
trace!("Payload size is {}", data_size);
//trace!("Payload size is {}", data_size);
if data_size > MAX_PACKET_SIZE || data_size == 0 {
return Err(());
}
@@ -256,11 +271,10 @@ fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, ()> {
}
}
fn send_message(connection: &mut TcpStream, data: &Vec<u8>) {
// TODO handle errors
connection.write_u32::<BigEndian>(data.len() as u32).expect("Error sending message");
connection.write_all(&data).expect("Error writing to socket");
connection.flush().expect("Error sending message");
fn send_message(connection: &mut TcpStream, data: &Vec<u8>) -> io::Result<()> {
connection.write_u32::<BigEndian>(data.len() as u32)?;
connection.write_all(&data)?;
connection.flush()
}
fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Peers, token: &Token) -> State {
@@ -270,14 +284,19 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
(context.chain.height(), context.chain.last_hash(), &context.settings.origin.clone(), CHAIN_VERSION)
};
match message {
Message::Hand { origin, version, public } => {
if origin.eq(my_origin) && version == my_version {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_public(public);
State::message(Message::shake(&origin, version, true, my_height))
} else {
warn!("Handshake from unsupported chain or version");
Message::Hand { origin, version, public, rand} => {
if peers.is_our_own_connect(&rand) {
warn!("Detected loop connect");
State::Error
} else {
if origin.eq(my_origin) && version == my_version {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_public(public);
State::message(Message::shake(&origin, version, true, my_height))
} else {
warn!("Handshake from unsupported chain or version");
State::Error
}
}
}
Message::Shake { origin, version, ok, height } => {
@@ -456,32 +475,6 @@ fn check_block(block: &Block, prev: &Block) -> bool {
prev.index == block.index - 1 && prev.hash == block.prev_block_hash
}
/// Connecting to configured (bootstrap) peers
fn connect_peers(peers_addrs: Vec<String>, poll: &mut Poll, peers: &mut Peers, unique_token: &mut Token) {
for peer in peers_addrs.iter() {
let addresses: Vec<SocketAddr> = match peer.to_socket_addrs() {
Ok(peers) => { peers.collect() }
Err(_) => { error!("Can't resolve address {}", &peer); continue; }
};
for addr in addresses {
match TcpStream::connect(addr.clone()) {
Ok(mut stream) => {
info!("Created connection to peer {}", &addr);
let token = next(unique_token);
poll.registry().register(&mut stream, token, Interest::WRITABLE).unwrap();
let mut peer = Peer::new(addr, stream, State::Connecting, false);
peer.set_public(true);
peers.add_peer(token, peer);
}
Err(e) => {
error!("Error connecting to peer {}: {}", &addr, e);
}
}
}
}
}
pub(crate) fn next(current: &mut Token) -> Token {
let next = current.0;
current.0 += 1;