Optimized network connections (added write timeouts).

This commit is contained in:
Revertron
2023-01-30 14:40:59 +01:00
parent da7db8dfa4
commit c199a62a04
5 changed files with 112 additions and 26 deletions
+29 -4
View File
@@ -430,7 +430,7 @@ impl Network {
peer.set_state(State::idle());
}
State::Idle { from } => {
debug!("Odd version of pings :)");
debug!("Odd version of pings for {}", peer.get_addr().ip());
if from.elapsed().as_secs() >= 30 {
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
@@ -470,7 +470,8 @@ impl Network {
let answer = match message {
Message::Hand { app_version, origin, version, public, rand_id } => {
if !version_compatible(&app_version) {
info!("Banning peer with version {}", &app_version);
let peer = self.peers.get_peer(token).unwrap();
info!("Banning peer with version {}, at {}", &app_version, peer.get_addr().ip());
return State::Banned;
}
if self.peers.is_our_own_connect(&rand_id) {
@@ -505,7 +506,8 @@ impl Network {
return State::Twin;
}
if !version_compatible(&app_version) {
info!("Banning peer with version {}", &app_version);
let peer = self.peers.get_peer(token).unwrap();
info!("Banning peer with version {} at {}", &app_version, peer.get_addr().ip());
return State::Banned;
}
let nodes = self.peers.get_peers_active_count();
@@ -893,7 +895,7 @@ fn send_message(connection: &mut TcpStream, data: &[u8]) -> io::Result<()> {
let mut buf: Vec<u8> = Vec::with_capacity(data.len() + 2);
buf.write_u16::<BigEndian>(data_len ^ 0xAAAA)?;
buf.write_all(data)?;
connection.write_all(&buf)?;
write_all(connection, &buf)?;
connection.flush()
}
@@ -926,6 +928,29 @@ fn interrupted(err: &io::Error) -> bool {
err.kind() == io::ErrorKind::Interrupted
}
fn write_all(connection: &mut TcpStream, mut buf: &[u8]) -> io::Result<()> {
let start = Instant::now();
let timeout = Duration::from_secs(3);
let delay = Duration::from_millis(2);
while !buf.is_empty() {
match connection.write(buf) {
Ok(0) => {
return Err(io::Error::from(ErrorKind::WriteZero));
}
Ok(n) => buf = &buf[n..],
Err(ref e) if e.kind() == ErrorKind::Interrupted => thread::sleep(delay),
Err(e) => return Err(e),
}
if start.elapsed() > timeout {
warn!("Error writing data to {}", connection.peer_addr().unwrap());
return Err(io::Error::from(ErrorKind::BrokenPipe));
} else {
thread::sleep(delay);
}
}
Ok(())
}
fn version_compatible(version: &str) -> bool {
let my_version = env!("CARGO_PKG_VERSION");
let parts = my_version.split('.').collect::<Vec<&str>>();
+4
View File
@@ -141,6 +141,10 @@ impl Peer {
self.active && self.last_active.elapsed().as_secs() < 120
}
pub fn active_recently(&self) -> bool {
self.active && self.last_active.elapsed().as_secs() < 10
}
pub fn reconnects(&self) -> u32 {
self.reconnects
}
+1 -1
View File
@@ -303,7 +303,7 @@ impl Peers {
let mut rng = rand::thread_rng();
match self.peers
.iter_mut()
.filter_map(|(token, peer)| if peer.is_lower(height) && peer.get_state().is_idle() && peer.get_sent_height() < height { Some((token, peer)) } else { None })
.filter_map(|(token, peer)| if peer.is_lower(height) && peer.get_state().is_idle() && !peer.active_recently() && peer.get_sent_height() < height { Some((token, peer)) } else { None })
.choose(&mut rng) {
None => {}
Some((token, peer)) => {