Made peer connections more reliable.
This commit is contained in:
+20
-5
@@ -4,7 +4,7 @@ extern crate serde_json;
|
||||
use std::{io, thread};
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
use mio::{Events, Interest, Poll, Registry, Token};
|
||||
@@ -17,6 +17,8 @@ use std::net::{SocketAddr, IpAddr, SocketAddrV4, Shutdown};
|
||||
const SERVER: Token = Token(0);
|
||||
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(3000));
|
||||
pub const LISTEN_PORT: u16 = 4244;
|
||||
const MAX_PACKET_SIZE: usize = 10 * 1024 * 1024; // 10 Mb
|
||||
const MAX_READ_BLOCK_TIME: u128 = 500;
|
||||
|
||||
pub struct Network {
|
||||
context: Arc<Mutex<Context>>
|
||||
@@ -220,7 +222,8 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn read_message(stream: &mut &mut TcpStream) -> Result<Vec<u8>, Vec<u8>> {
|
||||
fn read_message(stream: &mut TcpStream) -> Result<Vec<u8>, Vec<u8>> {
|
||||
let instant = Instant::now();
|
||||
let data_size = match stream.read_u32::<BigEndian>() {
|
||||
Ok(size) => { size as usize }
|
||||
Err(e) => {
|
||||
@@ -229,8 +232,10 @@ fn read_message(stream: &mut &mut TcpStream) -> Result<Vec<u8>, Vec<u8>> {
|
||||
}
|
||||
};
|
||||
println!("Payload size is {}", data_size);
|
||||
if data_size > MAX_PACKET_SIZE {
|
||||
return Err(Vec::new());
|
||||
}
|
||||
|
||||
// TODO check for very big buffer, make it no more 10Mb
|
||||
let mut buf = vec![0u8; data_size];
|
||||
let mut bytes_read = 0;
|
||||
loop {
|
||||
@@ -239,10 +244,20 @@ fn read_message(stream: &mut &mut TcpStream) -> Result<Vec<u8>, Vec<u8>> {
|
||||
bytes_read += bytes;
|
||||
}
|
||||
// Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation.
|
||||
Err(ref err) if would_block(err) => break,
|
||||
Err(ref err) if would_block(err) => {
|
||||
// We give every connection no more than 500ms to read a message
|
||||
if instant.elapsed().as_millis() < MAX_READ_BLOCK_TIME {
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
},
|
||||
Err(ref err) if interrupted(err) => continue,
|
||||
// Other errors we'll consider fatal.
|
||||
Err(_) => return Err(buf),
|
||||
Err(_) => {
|
||||
println!("Error reading message, only {} bytes read", bytes_read);
|
||||
return Err(buf)
|
||||
},
|
||||
}
|
||||
}
|
||||
if buf.len() == data_size {
|
||||
|
||||
Reference in New Issue
Block a user