Optimized read of network messages.
This commit is contained in:
Generated
+1
-1
@@ -65,7 +65,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "alfis"
|
name = "alfis"
|
||||||
version = "0.6.1"
|
version = "0.6.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"bincode",
|
"bincode",
|
||||||
|
|||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "alfis"
|
name = "alfis"
|
||||||
version = "0.6.1"
|
version = "0.6.2"
|
||||||
authors = ["Revertron <alfis@revertron.com>"]
|
authors = ["Revertron <alfis@revertron.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
build = "build.rs"
|
build = "build.rs"
|
||||||
|
|||||||
@@ -40,8 +40,6 @@ pub const UI_REFRESH_DELAY_MS: u128 = 500;
|
|||||||
pub const LOG_REFRESH_DELAY_SEC: u64 = 60;
|
pub const LOG_REFRESH_DELAY_SEC: u64 = 60;
|
||||||
|
|
||||||
pub const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(250));
|
pub const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(250));
|
||||||
pub const MAX_PACKET_SIZE: usize = 1 * 1024 * 1024; // 1 Mb
|
|
||||||
pub const MAX_READ_BLOCK_TIME: u128 = 500;
|
|
||||||
pub const MAX_RECONNECTS: u32 = 5;
|
pub const MAX_RECONNECTS: u32 = 5;
|
||||||
pub const MAX_IDLE_SECONDS: u64 = 180;
|
pub const MAX_IDLE_SECONDS: u64 = 180;
|
||||||
pub const MAX_NODES: usize = 15;
|
pub const MAX_NODES: usize = 15;
|
||||||
|
|||||||
+14
-57
@@ -3,11 +3,11 @@ extern crate serde_json;
|
|||||||
|
|
||||||
use std::{io, thread};
|
use std::{io, thread};
|
||||||
use std::cmp::max;
|
use std::cmp::max;
|
||||||
use std::io::{Read, Write, Error};
|
use std::io::{Read, Write, Error, ErrorKind};
|
||||||
use std::net::{IpAddr, Shutdown, SocketAddr, SocketAddrV4};
|
use std::net::{IpAddr, Shutdown, SocketAddr, SocketAddrV4};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::Instant;
|
||||||
|
|
||||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
@@ -274,7 +274,6 @@ impl Network {
|
|||||||
match self.peers.get_peer(&event.token()) {
|
match self.peers.get_peer(&event.token()) {
|
||||||
Some(peer) => {
|
Some(peer) => {
|
||||||
let data = data.unwrap();
|
let data = data.unwrap();
|
||||||
//info!("Decoding message {:?}", to_hex(data.as_slice()));
|
|
||||||
match decode_message(&data, peer.get_cipher()) {
|
match decode_message(&data, peer.get_cipher()) {
|
||||||
Ok(data) => {
|
Ok(data) => {
|
||||||
data
|
data
|
||||||
@@ -337,6 +336,12 @@ impl Network {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
let error = data.err().unwrap();
|
||||||
|
let addr = match self.peers.get_peer(&event.token()) {
|
||||||
|
None => { String::from("unknown") }
|
||||||
|
Some(peer) => { peer.get_addr().to_string() }
|
||||||
|
};
|
||||||
|
debug!("Error reading message from {}, error = {}", addr, error);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -674,55 +679,15 @@ fn decode_message(data: &Vec<u8>, cipher: &Option<Chacha>) -> Result<Vec<u8>, ch
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, ()> {
|
fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, Error> {
|
||||||
let instant = Instant::now();
|
let data_size = (stream.read_u16::<BigEndian>()? ^ 0xAAAA) as usize;
|
||||||
let data_size = match stream.read_u16::<BigEndian>() {
|
if data_size == 0 {
|
||||||
Ok(size) => { (size ^ 0xAAAA) as usize }
|
return Err(io::Error::from(ErrorKind::InvalidInput));
|
||||||
Err(e) => {
|
|
||||||
error!("Error reading from socket! {}", e);
|
|
||||||
0
|
|
||||||
}
|
|
||||||
};
|
|
||||||
//trace!("Payload size is {}", data_size);
|
|
||||||
if data_size > MAX_PACKET_SIZE || data_size == 0 {
|
|
||||||
return Err(());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut buf = vec![0u8; data_size];
|
let mut buf = vec![0u8; data_size];
|
||||||
let mut bytes_read = 0;
|
stream.read_exact(&mut buf)?;
|
||||||
loop {
|
Ok(buf)
|
||||||
match stream.read(&mut buf[bytes_read..]) {
|
|
||||||
Ok(bytes) => {
|
|
||||||
bytes_read += bytes;
|
|
||||||
if bytes_read == data_size {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation.
|
|
||||||
Err(ref err) if would_block(err) => {
|
|
||||||
// We give every connection no more than 200ms to read a message
|
|
||||||
if instant.elapsed().as_millis() < MAX_READ_BLOCK_TIME {
|
|
||||||
// We need to sleep a bit, otherwise it can eat CPU
|
|
||||||
let delay = Duration::from_millis(2);
|
|
||||||
thread::sleep(delay);
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(ref err) if interrupted(err) => continue,
|
|
||||||
// Other errors we'll consider fatal.
|
|
||||||
Err(_) => {
|
|
||||||
debug!("Error reading message, only {}/{} bytes read", bytes_read, data_size);
|
|
||||||
return Err(())
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if buf.len() == data_size {
|
|
||||||
Ok(buf)
|
|
||||||
} else {
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends one byte [garbage_size], [random bytes], and [public_key]
|
/// Sends one byte [garbage_size], [random bytes], and [public_key]
|
||||||
@@ -822,11 +787,3 @@ fn send_message(connection: &mut TcpStream, data: &Vec<u8>) -> io::Result<()> {
|
|||||||
connection.write_all(&buf)?;
|
connection.write_all(&buf)?;
|
||||||
connection.flush()
|
connection.flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn would_block(err: &io::Error) -> bool {
|
|
||||||
err.kind() == io::ErrorKind::WouldBlock
|
|
||||||
}
|
|
||||||
|
|
||||||
fn interrupted(err: &io::Error) -> bool {
|
|
||||||
err.kind() == io::ErrorKind::Interrupted
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user