From bb1f1acce5d0de7f8464a681e66a89a2e1570aed Mon Sep 17 00:00:00 2001 From: Revertron Date: Wed, 2 Jun 2021 13:12:54 +0200 Subject: [PATCH] Optimized read of network messages. --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/commons/constants.rs | 2 -- src/p2p/network.rs | 71 ++++++++-------------------------------- 4 files changed, 16 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea8a81a..38e61c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ dependencies = [ [[package]] name = "alfis" -version = "0.6.1" +version = "0.6.2" dependencies = [ "base64", "bincode", diff --git a/Cargo.toml b/Cargo.toml index c549eb5..4d0425c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alfis" -version = "0.6.1" +version = "0.6.2" authors = ["Revertron "] edition = "2018" build = "build.rs" diff --git a/src/commons/constants.rs b/src/commons/constants.rs index 74356d5..c804545 100644 --- a/src/commons/constants.rs +++ b/src/commons/constants.rs @@ -40,8 +40,6 @@ pub const UI_REFRESH_DELAY_MS: u128 = 500; pub const LOG_REFRESH_DELAY_SEC: u64 = 60; pub const POLL_TIMEOUT: Option = Some(Duration::from_millis(250)); -pub const MAX_PACKET_SIZE: usize = 1 * 1024 * 1024; // 1 Mb -pub const MAX_READ_BLOCK_TIME: u128 = 500; pub const MAX_RECONNECTS: u32 = 5; pub const MAX_IDLE_SECONDS: u64 = 180; pub const MAX_NODES: usize = 15; diff --git a/src/p2p/network.rs b/src/p2p/network.rs index bfbd22e..1588346 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -3,11 +3,11 @@ extern crate serde_json; use std::{io, thread}; use std::cmp::max; -use std::io::{Read, Write, Error}; +use std::io::{Read, Write, Error, ErrorKind}; use std::net::{IpAddr, Shutdown, SocketAddr, SocketAddrV4}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, Instant}; +use std::time::Instant; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; #[allow(unused_imports)] @@ -274,7 +274,6 @@ impl Network { match self.peers.get_peer(&event.token()) { Some(peer) => { let data = data.unwrap(); - //info!("Decoding message {:?}", to_hex(data.as_slice())); match decode_message(&data, peer.get_cipher()) { Ok(data) => { data @@ -337,6 +336,12 @@ impl Network { } } } else { + let error = data.err().unwrap(); + let addr = match self.peers.get_peer(&event.token()) { + None => { String::from("unknown") } + Some(peer) => { peer.get_addr().to_string() } + }; + debug!("Error reading message from {}, error = {}", addr, error); return false; } } @@ -674,55 +679,15 @@ fn decode_message(data: &Vec, cipher: &Option) -> Result, ch } } -fn read_message(stream: &mut TcpStream) -> Result, ()> { - let instant = Instant::now(); - let data_size = match stream.read_u16::() { - Ok(size) => { (size ^ 0xAAAA) as usize } - Err(e) => { - error!("Error reading from socket! {}", e); - 0 - } - }; - //trace!("Payload size is {}", data_size); - if data_size > MAX_PACKET_SIZE || data_size == 0 { - return Err(()); +fn read_message(stream: &mut TcpStream) -> Result, Error> { + let data_size = (stream.read_u16::()? ^ 0xAAAA) as usize; + if data_size == 0 { + return Err(io::Error::from(ErrorKind::InvalidInput)); } let mut buf = vec![0u8; data_size]; - let mut bytes_read = 0; - loop { - match stream.read(&mut buf[bytes_read..]) { - Ok(bytes) => { - bytes_read += bytes; - if bytes_read == data_size { - break; - } - } - // Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation. - Err(ref err) if would_block(err) => { - // We give every connection no more than 200ms to read a message - if instant.elapsed().as_millis() < MAX_READ_BLOCK_TIME { - // We need to sleep a bit, otherwise it can eat CPU - let delay = Duration::from_millis(2); - thread::sleep(delay); - continue; - } else { - break; - } - }, - Err(ref err) if interrupted(err) => continue, - // Other errors we'll consider fatal. - Err(_) => { - debug!("Error reading message, only {}/{} bytes read", bytes_read, data_size); - return Err(()) - }, - } - } - if buf.len() == data_size { - Ok(buf) - } else { - Err(()) - } + stream.read_exact(&mut buf)?; + Ok(buf) } /// Sends one byte [garbage_size], [random bytes], and [public_key] @@ -822,11 +787,3 @@ fn send_message(connection: &mut TcpStream, data: &Vec) -> io::Result<()> { connection.write_all(&buf)?; connection.flush() } - -fn would_block(err: &io::Error) -> bool { - err.kind() == io::ErrorKind::WouldBlock -} - -fn interrupted(err: &io::Error) -> bool { - err.kind() == io::ErrorKind::Interrupted -}