Reworked p2p code, optimized a lot.

This commit is contained in:
Revertron
2021-04-16 16:53:31 +02:00
parent 1242cbac87
commit 6a1aa6c0d5
4 changed files with 76 additions and 49 deletions
+45 -25
View File
@@ -11,19 +11,29 @@ use rand::seq::IteratorRandom;
use log::{trace, debug, info, warn, error};
use crate::{Bytes, is_yggdrasil, commons};
use crate::commons::MAX_RECONNECTS;
use chrono::Utc;
pub struct Peers {
peers: HashMap<Token, Peer>,
new_peers: Vec<SocketAddr>,
ignored: HashSet<IpAddr>,
my_id: String
my_id: String,
asked_block: u64,
asked_time: i64,
}
const PING_PERIOD: u64 = 60;
impl Peers {
pub fn new() -> Self {
Peers { peers: HashMap::new(), new_peers: Vec::new(), ignored: HashSet::new(), my_id: commons::random_string(6) }
Peers {
peers: HashMap::new(),
new_peers: Vec::new(),
ignored: HashSet::new(),
my_id: commons::random_string(6),
asked_block: 0,
asked_time: 0
}
}
pub fn add_peer(&mut self, token: Token, peer: Peer) {
@@ -60,18 +70,14 @@ impl Peers {
}
State::Banned => {
trace!("Peer connection {} to {:?} has shut down, banned", &token.0, &peer.get_addr());
self.ignored.insert(peer.get_addr().ip().clone());
}
State::Offline { .. } => {
trace!("Peer connection {} to {:?} is offline", &token.0, &peer.get_addr());
}
}
if !peer.disabled() && !peer.is_inbound() {
peer.set_state(State::offline());
peer.set_active(false);
} else {
self.peers.remove(token);
}
self.peers.remove(token);
}
None => {}
}
@@ -84,6 +90,7 @@ impl Peers {
peers.insert(peer.to_owned());
peers
});
debug!("Got {} peers", peers.len());
//debug!("Got {} peers: {:?}", peers.len(), &peers);
// TODO make it return error if these peers are wrong and seem like an attack
for peer in peers.iter() {
@@ -112,7 +119,7 @@ impl Peers {
}
if self.ignored.contains(&addr.ip()) {
trace!("Skipping address from exchange: {}", &addr);
info!("Skipping ignored address from exchange: {}", &addr);
continue;
}
@@ -120,16 +127,6 @@ impl Peers {
//debug!("Skipping address from exchange: {}", &addr);
continue; // Return error in future
}
let mut found = false;
for (_token, p) in self.peers.iter() {
if p.equals(&addr) {
found = true;
break;
}
}
if found {
continue;
}
self.new_peers.push(addr);
}
}
@@ -151,13 +148,20 @@ impl Peers {
if peer.equals(peer_address) {
continue;
}
if peer.is_public() {
if peer.is_public() && peer.active() {
result.push(SocketAddr::new(peer.get_addr().ip(), LISTEN_PORT).to_string());
}
if result.len() >= 10 {
break;
}
}
result
}
pub fn get_peers_count(&self) -> usize {
self.peers.len()
}
pub fn get_peers_active_count(&self) -> usize {
let mut count = 0;
for (_, peer) in self.peers.iter() {
@@ -187,6 +191,7 @@ impl Peers {
}
pub fn ignore_ip(&mut self, ip: &IpAddr) {
info!("Adding {} to ignored peers", &ip);
self.ignored.insert(ip.clone());
}
@@ -199,7 +204,7 @@ impl Peers {
false
}
pub fn send_pings(&mut self, registry: &Registry, height: u64, hash: Bytes) {
pub fn update(&mut self, registry: &Registry, height: u64, hash: Bytes) {
let mut ping_sent = false;
for (token, peer) in self.peers.iter_mut() {
match peer.get_state() {
@@ -225,20 +230,25 @@ impl Peers {
}
// If someone has more blocks we sync
if !ping_sent {
if self.need_ask_block(height + 1) {
let mut rng = rand::thread_rng();
let mut asked = false;
match self.peers
.iter_mut()
.filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None })
.choose(&mut rng) {
None => {}
Some((token, peer)) => {
debug!("Found some peer higher than we are, sending block request");
debug!("Found some peer higher than we are, requesting block {}", height + 1);
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::message(Message::GetBlock { index: height + 1 }));
ping_sent = true;
asked = true;
}
}
if asked {
self.set_asked_block(height + 1);
}
}
// If someone has less blocks (we mined a new block) we send a ping with our height
@@ -274,7 +284,7 @@ impl Peers {
if peer.get_state().need_reconnect() {
let addr = peer.get_addr();
if let Ok(mut stream) = TcpStream::connect(addr.clone()) {
trace!("Trying to connect to peer {}", &addr);
debug!("Trying to reconnect to peer {}, count {}", &addr, peer.reconnects());
registry.register(&mut stream, token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::Connecting);
peer.inc_reconnects();
@@ -296,7 +306,7 @@ impl Peers {
}
/// Connecting to configured (bootstrap) peers
pub fn connect_peers(&mut self, peers_addrs: Vec<String>, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) {
pub fn connect_peers(&mut self, peers_addrs: &Vec<String>, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) {
let mut set = HashSet::new();
for peer in peers_addrs.iter() {
let mut addresses: Vec<SocketAddr> = match peer.to_socket_addrs() {
@@ -332,6 +342,16 @@ impl Peers {
self.peers.insert(token, peer);
}
}
pub fn set_asked_block(&mut self, index: u64) {
self.asked_block = index;
self.asked_time = Utc::now().timestamp();
}
pub fn need_ask_block(&self, index: u64) -> bool {
index > self.asked_block || self.asked_time + 3 < Utc::now().timestamp()
}
}
fn skip_private_addr(addr: &SocketAddr) -> bool {