Fixed network message reading.
This commit is contained in:
@@ -42,6 +42,7 @@ pub const LOG_REFRESH_DELAY_SEC: u64 = 60;
|
||||
pub const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
|
||||
/// We start syncing blocks only when we got 4 and more connected nodes
|
||||
pub const MIN_CONNECTED_NODES_START_SYNC: usize = 4;
|
||||
pub const MAX_READ_BLOCK_TIME: u128 = 100;
|
||||
pub const MAX_RECONNECTS: u32 = 5;
|
||||
pub const MAX_IDLE_SECONDS: u64 = 180;
|
||||
pub const MAX_NODES: usize = 15;
|
||||
|
||||
+44
-3
@@ -7,7 +7,7 @@ use std::io::{Error, ErrorKind, Read, Write};
|
||||
use std::net::{IpAddr, Shutdown, SocketAddr, SocketAddrV4};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{io, thread};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
@@ -692,14 +692,47 @@ fn decode_message(data: &[u8], cipher: &Option<Chacha>) -> Result<Vec<u8>, chach
|
||||
}
|
||||
|
||||
fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, Error> {
|
||||
let instant = Instant::now();
|
||||
let data_size = (stream.read_u16::<BigEndian>()? ^ 0xAAAA) as usize;
|
||||
if data_size == 0 {
|
||||
return Err(io::Error::from(ErrorKind::InvalidInput));
|
||||
}
|
||||
|
||||
let mut buf = vec![0u8; data_size];
|
||||
stream.read_exact(&mut buf)?;
|
||||
Ok(buf)
|
||||
let mut bytes_read = 0;
|
||||
let delay = Duration::from_millis(2);
|
||||
loop {
|
||||
match stream.read(&mut buf[bytes_read..]) {
|
||||
Ok(bytes) => {
|
||||
bytes_read += bytes;
|
||||
if bytes_read == data_size {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation.
|
||||
Err(ref err) if would_block(err) => {
|
||||
// We give every connection no more than 100ms to read a message
|
||||
if instant.elapsed().as_millis() < MAX_READ_BLOCK_TIME {
|
||||
// We need to sleep a bit, otherwise it can eat CPU
|
||||
thread::sleep(delay);
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
},
|
||||
Err(ref err) if interrupted(err) => continue,
|
||||
// Other errors we'll consider fatal.
|
||||
Err(e) => {
|
||||
debug!("Error reading message, only {}/{} bytes read", bytes_read, data_size);
|
||||
return Err(e)
|
||||
},
|
||||
}
|
||||
}
|
||||
if buf.len() == data_size {
|
||||
Ok(buf)
|
||||
} else {
|
||||
Err(io::Error::from(ErrorKind::WouldBlock))
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends one byte [garbage_size], [random bytes], and [public_key]
|
||||
@@ -803,3 +836,11 @@ fn send_message(connection: &mut TcpStream, data: &[u8]) -> io::Result<()> {
|
||||
connection.write_all(&buf)?;
|
||||
connection.flush()
|
||||
}
|
||||
|
||||
fn would_block(err: &io::Error) -> bool {
|
||||
err.kind() == io::ErrorKind::WouldBlock
|
||||
}
|
||||
|
||||
fn interrupted(err: &io::Error) -> bool {
|
||||
err.kind() == io::ErrorKind::Interrupted
|
||||
}
|
||||
Reference in New Issue
Block a user