diff --git a/src/commons/constants.rs b/src/commons/constants.rs index c5a4293..85b99c0 100644 --- a/src/commons/constants.rs +++ b/src/commons/constants.rs @@ -42,6 +42,7 @@ pub const LOG_REFRESH_DELAY_SEC: u64 = 60; pub const POLL_TIMEOUT: Option = Some(Duration::from_millis(500)); /// We start syncing blocks only when we got 4 and more connected nodes pub const MIN_CONNECTED_NODES_START_SYNC: usize = 4; +pub const MAX_READ_BLOCK_TIME: u128 = 100; pub const MAX_RECONNECTS: u32 = 5; pub const MAX_IDLE_SECONDS: u64 = 180; pub const MAX_NODES: usize = 15; diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 7f01c25..b28b8af 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -7,7 +7,7 @@ use std::io::{Error, ErrorKind, Read, Write}; use std::net::{IpAddr, Shutdown, SocketAddr, SocketAddrV4}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::{io, thread}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; @@ -692,14 +692,47 @@ fn decode_message(data: &[u8], cipher: &Option) -> Result, chach } fn read_message(stream: &mut TcpStream) -> Result, Error> { + let instant = Instant::now(); let data_size = (stream.read_u16::()? ^ 0xAAAA) as usize; if data_size == 0 { return Err(io::Error::from(ErrorKind::InvalidInput)); } let mut buf = vec![0u8; data_size]; - stream.read_exact(&mut buf)?; - Ok(buf) + let mut bytes_read = 0; + let delay = Duration::from_millis(2); + loop { + match stream.read(&mut buf[bytes_read..]) { + Ok(bytes) => { + bytes_read += bytes; + if bytes_read == data_size { + break; + } + } + // Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation. + Err(ref err) if would_block(err) => { + // We give every connection no more than 100ms to read a message + if instant.elapsed().as_millis() < MAX_READ_BLOCK_TIME { + // We need to sleep a bit, otherwise it can eat CPU + thread::sleep(delay); + continue; + } else { + break; + } + }, + Err(ref err) if interrupted(err) => continue, + // Other errors we'll consider fatal. + Err(e) => { + debug!("Error reading message, only {}/{} bytes read", bytes_read, data_size); + return Err(e) + }, + } + } + if buf.len() == data_size { + Ok(buf) + } else { + Err(io::Error::from(ErrorKind::WouldBlock)) + } } /// Sends one byte [garbage_size], [random bytes], and [public_key] @@ -803,3 +836,11 @@ fn send_message(connection: &mut TcpStream, data: &[u8]) -> io::Result<()> { connection.write_all(&buf)?; connection.flush() } + +fn would_block(err: &io::Error) -> bool { + err.kind() == io::ErrorKind::WouldBlock +} + +fn interrupted(err: &io::Error) -> bool { + err.kind() == io::ErrorKind::Interrupted +} \ No newline at end of file