Implemented P2P peer exchange. Refactored project structure.
This commit is contained in:
@@ -2,8 +2,12 @@
|
|||||||
"chain_name": "test",
|
"chain_name": "test",
|
||||||
"version_flags": 0,
|
"version_flags": 0,
|
||||||
"key_file": "default.key",
|
"key_file": "default.key",
|
||||||
"listen": "127.0.0.1:4442",
|
"listen": "127.0.0.1:4244",
|
||||||
|
"public": true,
|
||||||
"peers": [
|
"peers": [
|
||||||
"127.0.0.1:44421"
|
"127.0.0.1:44421",
|
||||||
|
"127.0.0.1:10000",
|
||||||
|
"127.0.0.1:10001",
|
||||||
|
"127.0.0.1:10002"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@@ -52,6 +52,7 @@ pub struct Settings {
|
|||||||
pub version_flags: u32,
|
pub version_flags: u32,
|
||||||
pub key_file: String,
|
pub key_file: String,
|
||||||
pub listen: String,
|
pub listen: String,
|
||||||
|
pub public: bool,
|
||||||
pub peers: Vec<String>
|
pub peers: Vec<String>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+5
-4
@@ -2,16 +2,17 @@ extern crate serde;
|
|||||||
extern crate serde_json;
|
extern crate serde_json;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use crate::p2p::peer::Peer;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
Error,
|
Error,
|
||||||
Hand { chain: String, version: u32 },
|
Hand { chain: String, version: u32, public: bool },
|
||||||
Shake { ok: bool, height: u64 },
|
Shake { ok: bool, height: u64 },
|
||||||
Ping { height: u64 },
|
Ping { height: u64 },
|
||||||
Pong { height: u64 },
|
Pong { height: u64 },
|
||||||
GetPeers,
|
GetPeers,
|
||||||
Peers,
|
Peers { peers: Vec<String> },
|
||||||
GetBlock { index: u64 },
|
GetBlock { index: u64 },
|
||||||
Block { index: u64, block: String },
|
Block { index: u64, block: String },
|
||||||
}
|
}
|
||||||
@@ -25,8 +26,8 @@ impl Message {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn hand(chain: &str, version: u32) -> Self {
|
pub fn hand(chain: &str, version: u32, public: bool) -> Self {
|
||||||
Message::Hand { chain: chain.to_owned(), version }
|
Message::Hand { chain: chain.to_owned(), version, public }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn shake(ok: bool, height: u64) -> Self {
|
pub fn shake(ok: bool, height: u64) -> Self {
|
||||||
|
|||||||
@@ -2,8 +2,11 @@ pub mod network;
|
|||||||
pub mod message;
|
pub mod message;
|
||||||
pub mod state;
|
pub mod state;
|
||||||
pub mod peer;
|
pub mod peer;
|
||||||
|
pub mod peers;
|
||||||
|
|
||||||
pub use network::Network;
|
pub use network::Network;
|
||||||
pub use message::Message;
|
pub use message::Message;
|
||||||
pub use state::State;
|
pub use state::State;
|
||||||
|
pub use peer::Peer;
|
||||||
|
pub use peers::Peers;
|
||||||
|
|
||||||
|
|||||||
+137
-108
@@ -13,13 +13,13 @@ use mio::event::Event;
|
|||||||
use mio::net::{TcpListener, TcpStream};
|
use mio::net::{TcpListener, TcpStream};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::{Context, Block};
|
use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers};
|
||||||
use crate::p2p::Message;
|
use std::net::{SocketAddr, IpAddr, SocketAddrV4};
|
||||||
use crate::p2p::State;
|
use std::borrow::BorrowMut;
|
||||||
use crate::p2p::peer::Peer;
|
|
||||||
|
|
||||||
const SERVER: Token = Token(0);
|
const SERVER: Token = Token(0);
|
||||||
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));
|
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(3000));
|
||||||
|
pub const LISTEN_PORT: u16 = 4244;
|
||||||
|
|
||||||
pub struct Network {
|
pub struct Network {
|
||||||
context: Arc<Mutex<Context>>
|
context: Arc<Mutex<Context>>
|
||||||
@@ -31,7 +31,7 @@ impl Network {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(&mut self) -> Result<(), String> {
|
pub fn start(&mut self) -> Result<(), String> {
|
||||||
let (listen_addr, peers) = {
|
let (listen_addr, peers_addrs) = {
|
||||||
let c = self.context.lock().unwrap();
|
let c = self.context.lock().unwrap();
|
||||||
(c.settings.listen.clone(), c.settings.peers.clone())
|
(c.settings.listen.clone(), c.settings.peers.clone())
|
||||||
};
|
};
|
||||||
@@ -46,63 +46,58 @@ impl Network {
|
|||||||
poll.registry().register(&mut server, SERVER, Interest::READABLE).expect("Error registering poll");
|
poll.registry().register(&mut server, SERVER, Interest::READABLE).expect("Error registering poll");
|
||||||
let context = self.context.clone();
|
let context = self.context.clone();
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
|
// Give UI some time to appear :)
|
||||||
|
thread::sleep(Duration::from_millis(2000));
|
||||||
// Unique token for each incoming connection.
|
// Unique token for each incoming connection.
|
||||||
let mut unique_token = Token(SERVER.0 + 1);
|
let mut unique_token = Token(SERVER.0 + 1);
|
||||||
// Map of `Token` -> `TcpStream`.
|
|
||||||
let mut connections = HashMap::new();
|
|
||||||
// States of peer connections, and some data to send when sockets become writable
|
// States of peer connections, and some data to send when sockets become writable
|
||||||
let mut peer_state: HashMap<Token, Peer> = HashMap::new();
|
let mut peers = Peers::new();
|
||||||
// Starting peer connections to bootstrap nodes
|
// Starting peer connections to bootstrap nodes
|
||||||
for peer in peers.iter() {
|
connect_peers(peers_addrs, &mut poll, &mut peers, &mut unique_token);
|
||||||
match TcpStream::connect(peer.parse().expect("Error parsing peer address")) {
|
|
||||||
Ok(mut stream) => {
|
|
||||||
println!("Created connection to peer {}", &peer);
|
|
||||||
let token = next(&mut unique_token);
|
|
||||||
poll.registry().register(&mut stream, token, Interest::WRITABLE).unwrap();
|
|
||||||
peer_state.insert(token, Peer::new(peer.clone(), State::Connecting));
|
|
||||||
connections.insert(token, stream);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
println!("Error connecting to peer {}: {}", &peer, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Poll Mio for events, blocking until we get an event.
|
// Poll Mio for events, blocking until we get an event.
|
||||||
poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets");
|
poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets");
|
||||||
//println!("Polling finished, got events: {}", !events.is_empty());
|
|
||||||
|
|
||||||
// Process each event.
|
// Process each event.
|
||||||
for event in events.iter() {
|
for event in events.iter() {
|
||||||
println!("Event for {} is {:?}", event.token().0, &event);
|
println!("Event for socket {} is {:?}", event.token().0, &event);
|
||||||
// We can use the token we previously provided to `register` to determine for which socket the event is.
|
// We can use the token we previously provided to `register` to determine for which socket the event is.
|
||||||
match event.token() {
|
match event.token() {
|
||||||
SERVER => {
|
SERVER => {
|
||||||
// If this is an event for the server, it means a connection is ready to be accepted.
|
// If this is an event for the server, it means a connection is ready to be accepted.
|
||||||
let connection = server.accept();
|
let connection = server.accept();
|
||||||
match connection {
|
match connection {
|
||||||
Ok((mut connection, address)) => {
|
Ok((mut stream, mut address)) => {
|
||||||
|
// Checking if it is an ipv4-mapped ipv6 if yes convert to ipv4
|
||||||
|
if address.is_ipv6() {
|
||||||
|
if let IpAddr::V6(ipv6) = address.ip() {
|
||||||
|
if let Some(ipv4) = ipv6.to_ipv4() {
|
||||||
|
address = SocketAddr::V4(SocketAddrV4::new(ipv4, address.port()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
println!("Accepted connection from: {}", address);
|
println!("Accepted connection from: {}", address);
|
||||||
let token = next(&mut unique_token);
|
let token = next(&mut unique_token);
|
||||||
poll.registry().register(&mut connection, token, Interest::READABLE).expect("Error registering poll");
|
poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll");
|
||||||
peer_state.insert(token, Peer::new(address.to_string(), State::Connected));
|
peers.add_peer(token, Peer::new(address, stream, State::Connected, true));
|
||||||
connections.insert(token, connection);
|
|
||||||
}
|
}
|
||||||
Err(_) => {}
|
Err(_) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
token => {
|
token => {
|
||||||
match connections.get_mut(&token) {
|
match peers.get_mut_peer(&token) {
|
||||||
Some(connection) => {
|
Some(peer) => {
|
||||||
match handle_connection_event(context.clone(), &mut peer_state, &poll.registry(), connection, &event) {
|
match handle_connection_event(context.clone(), &mut peers, &poll.registry(), &event) {
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
if !result {
|
if !result {
|
||||||
connections.remove(&token);
|
peers.remove_peer(&token);
|
||||||
peer_state.remove(&token);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {}
|
Err(_err) => {
|
||||||
|
peers.remove_peer(&token);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => { println!("Odd event from poll"); }
|
None => { println!("Odd event from poll"); }
|
||||||
@@ -110,88 +105,60 @@ impl Network {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send pings to idle peers
|
// Send pings to idle peers
|
||||||
for (token, peer) in peer_state.iter_mut() {
|
let height = { context.lock().unwrap().blockchain.height() };
|
||||||
match peer.get_state() {
|
peers.send_pings(poll.registry(), height);
|
||||||
State::Idle { from } => {
|
peers.connect_new_peers(poll.registry(), &mut unique_token);
|
||||||
if from.elapsed().as_secs() >= 30 {
|
|
||||||
let c = context.lock().unwrap();
|
|
||||||
peer.set_state(State::message(Message::ping(c.blockchain.height())));
|
|
||||||
let mut connection = connections.get_mut(&token).unwrap();
|
|
||||||
poll.registry().reregister(connection, token.clone(), Interest::WRITABLE).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_connection_event(context: Arc<Mutex<Context>>, peer_state: &mut HashMap<Token, Peer>, registry: &Registry, connection: &mut TcpStream, event: &Event) -> io::Result<bool> {
|
fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, registry: &Registry, event: &Event) -> io::Result<bool> {
|
||||||
if event.is_error() {
|
if event.is_error() || (event.is_read_closed() && event.is_write_closed()) {
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if event.is_readable() {
|
if event.is_readable() {
|
||||||
let data_size = match connection.read_u32::<BigEndian>() {
|
let data = {
|
||||||
Ok(size) => { size as usize }
|
let mut peer = peers.get_mut_peer(&event.token()).expect("Error getting peer for connection");
|
||||||
Err(e) => {
|
let mut stream = peer.get_stream();
|
||||||
println!("Error reading from socket! {}", e);
|
read_message(&mut stream)
|
||||||
0
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
println!("Payload size is {}", data_size);
|
|
||||||
|
|
||||||
// TODO check for very big buffer, make it no more 10Mb
|
if data.is_ok() {
|
||||||
let mut buf = vec![0u8; data_size];
|
let data = data.unwrap();
|
||||||
let mut bytes_read = 0;
|
match Message::from_bytes(data) {
|
||||||
loop {
|
|
||||||
match connection.read(&mut buf[bytes_read..]) {
|
|
||||||
Ok(bytes) => {
|
|
||||||
bytes_read += bytes;
|
|
||||||
}
|
|
||||||
// Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation.
|
|
||||||
Err(ref err) if would_block(err) => break,
|
|
||||||
Err(ref err) if interrupted(err) => continue,
|
|
||||||
// Other errors we'll consider fatal.
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if bytes_read == data_size {
|
|
||||||
match Message::from_bytes(buf) {
|
|
||||||
Ok(message) => {
|
Ok(message) => {
|
||||||
println!("Got message from socket {}: {:?}", &event.token().0, &message);
|
println!("Got message from socket {}: {:?}", &event.token().0, &message);
|
||||||
let new_state = handle_message(context.clone(), message);
|
let new_state = handle_message(context.clone(), message, peers, &event.token());
|
||||||
|
let mut peer = peers.get_mut_peer(&event.token()).unwrap();
|
||||||
|
let mut stream = peer.get_stream();
|
||||||
match new_state {
|
match new_state {
|
||||||
State::Message { data } => {
|
State::Message { data } => {
|
||||||
if event.is_writable() {
|
if event.is_writable() {
|
||||||
// TODO handle all errors and buffer data to send
|
// TODO handle all errors and buffer data to send
|
||||||
send_message(connection, &data);
|
send_message(stream, &data);
|
||||||
} else {
|
} else {
|
||||||
registry.reregister(connection, event.token(), Interest::WRITABLE).unwrap();
|
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
|
||||||
let mut peer = peer_state.get_mut(&event.token()).unwrap();
|
|
||||||
peer.set_state(State::Message { data });
|
peer.set_state(State::Message { data });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
State::Connecting => {}
|
State::Connecting => {}
|
||||||
State::Connected => {}
|
State::Connected => {}
|
||||||
State::Idle { .. } => {
|
State::Idle { .. } => {
|
||||||
let mut peer = peer_state.get_mut(&event.token()).unwrap();
|
|
||||||
peer.set_state(State::idle());
|
peer.set_state(State::idle());
|
||||||
}
|
}
|
||||||
State::Error => {}
|
State::Error => {}
|
||||||
State::Banned => {}
|
State::Banned => {}
|
||||||
State::Offline { .. } => {
|
State::Offline { .. } => {
|
||||||
let mut peer = peer_state.get_mut(&event.token()).unwrap();
|
|
||||||
peer.set_state(State::offline(1));
|
peer.set_state(State::offline(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {}
|
Err(_) => { return Ok(false); }
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Consider connection as unreliable
|
// Consider connection as unreliable
|
||||||
@@ -200,48 +167,81 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peer_state: &mut HashMa
|
|||||||
}
|
}
|
||||||
|
|
||||||
if event.is_writable() {
|
if event.is_writable() {
|
||||||
println!("Socket {} is writable", event.token().0);
|
//println!("Socket {} is writable", event.token().0);
|
||||||
match peer_state.get(&event.token()) {
|
match peers.get_mut_peer(&event.token()) {
|
||||||
None => {}
|
None => {}
|
||||||
Some(peer) => {
|
Some(peer) => {
|
||||||
match peer.get_state() {
|
match peer.get_state().clone() {
|
||||||
State::Connecting => {
|
State::Connecting => {
|
||||||
println!("Hello needed for socket {}", event.token().0);
|
println!("Sending hello to socket {}", event.token().0);
|
||||||
let data: String = {
|
let data: String = {
|
||||||
let mut c = context.lock().unwrap();
|
let mut c = context.lock().unwrap();
|
||||||
let message = Message::Hand { chain: c.settings.chain_name.clone(), version: c.settings.version_flags };
|
let message = Message::hand(&c.settings.chain_name, c.settings.version_flags, c.settings.public);
|
||||||
serde_json::to_string(&message).unwrap()
|
serde_json::to_string(&message).unwrap()
|
||||||
};
|
};
|
||||||
send_message(connection, &data.into_bytes());
|
send_message(peer.get_stream(), &data.into_bytes());
|
||||||
println!("Sent hello through socket {}", event.token().0);
|
//println!("Sent hello through socket {}", event.token().0);
|
||||||
}
|
}
|
||||||
State::Message { data } => {
|
State::Message { data } => {
|
||||||
println!("Sending data to socket {}: {}", event.token().0, &String::from_utf8(data.clone()).unwrap());
|
println!("Sending data to socket {}: {}", event.token().0, &String::from_utf8(data.clone()).unwrap());
|
||||||
send_message(connection, data);
|
send_message(peer.get_stream(), &data);
|
||||||
}
|
}
|
||||||
State::Connected => {}
|
State::Connected => {}
|
||||||
State::Idle { from } => {
|
State::Idle { from } => {
|
||||||
|
println!("Odd version of pings :)");
|
||||||
if from.elapsed().as_secs() >= 30 {
|
if from.elapsed().as_secs() >= 30 {
|
||||||
let data: String = {
|
let data: String = {
|
||||||
let mut c = context.lock().unwrap();
|
let mut c = context.lock().unwrap();
|
||||||
let message = Message::ping(c.blockchain.height());
|
let message = Message::ping(c.blockchain.height());
|
||||||
serde_json::to_string(&message).unwrap()
|
serde_json::to_string(&message).unwrap()
|
||||||
};
|
};
|
||||||
send_message(connection, &data.into_bytes());
|
send_message(peer.get_stream(), &data.into_bytes());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
State::Error => {}
|
State::Error => {}
|
||||||
State::Banned => {}
|
State::Banned => {}
|
||||||
State::Offline { .. } => {}
|
State::Offline { .. } => {}
|
||||||
}
|
}
|
||||||
|
registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
registry.reregister(connection, event.token(), Interest::READABLE).unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn read_message(stream: &mut &mut TcpStream) -> Result<Vec<u8>, Vec<u8>> {
|
||||||
|
let data_size = match stream.read_u32::<BigEndian>() {
|
||||||
|
Ok(size) => { size as usize }
|
||||||
|
Err(e) => {
|
||||||
|
println!("Error reading from socket! {}", e);
|
||||||
|
0
|
||||||
|
}
|
||||||
|
};
|
||||||
|
println!("Payload size is {}", data_size);
|
||||||
|
|
||||||
|
// TODO check for very big buffer, make it no more 10Mb
|
||||||
|
let mut buf = vec![0u8; data_size];
|
||||||
|
let mut bytes_read = 0;
|
||||||
|
loop {
|
||||||
|
match stream.read(&mut buf[bytes_read..]) {
|
||||||
|
Ok(bytes) => {
|
||||||
|
bytes_read += bytes;
|
||||||
|
}
|
||||||
|
// Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation.
|
||||||
|
Err(ref err) if would_block(err) => break,
|
||||||
|
Err(ref err) if interrupted(err) => continue,
|
||||||
|
// Other errors we'll consider fatal.
|
||||||
|
Err(err) => return Err(buf),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if buf.len() == data_size {
|
||||||
|
Ok(buf)
|
||||||
|
} else {
|
||||||
|
Err(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn send_message(connection: &mut TcpStream, data: &Vec<u8>) {
|
fn send_message(connection: &mut TcpStream, data: &Vec<u8>) {
|
||||||
// TODO handle errors
|
// TODO handle errors
|
||||||
connection.write_u32::<BigEndian>(data.len() as u32);
|
connection.write_u32::<BigEndian>(data.len() as u32);
|
||||||
@@ -249,11 +249,17 @@ fn send_message(connection: &mut TcpStream, data: &Vec<u8>) {
|
|||||||
connection.flush();
|
connection.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(context: Arc<Mutex<Context>>, message: Message) -> State {
|
fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Peers, token: &Token) -> State {
|
||||||
|
let my_height = {
|
||||||
|
let context = context.lock().unwrap();
|
||||||
|
context.blockchain.height()
|
||||||
|
};
|
||||||
match message {
|
match message {
|
||||||
Message::Hand { chain, version } => {
|
Message::Hand { chain, version, public } => {
|
||||||
let context = context.lock().unwrap();
|
let context = context.lock().unwrap();
|
||||||
if chain == context.settings.chain_name && version == context.settings.version_flags {
|
if chain == context.settings.chain_name && version == context.settings.version_flags {
|
||||||
|
let mut peer = peers.get_mut_peer(token).unwrap();
|
||||||
|
peer.set_public(public);
|
||||||
State::message(Message::shake(true, context.blockchain.height()))
|
State::message(Message::shake(true, context.blockchain.height()))
|
||||||
} else {
|
} else {
|
||||||
State::Error
|
State::Error
|
||||||
@@ -261,11 +267,10 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message) -> State {
|
|||||||
}
|
}
|
||||||
Message::Shake { ok, height } => {
|
Message::Shake { ok, height } => {
|
||||||
if ok {
|
if ok {
|
||||||
let context = context.lock().unwrap();
|
if height > my_height {
|
||||||
if height > context.blockchain.height() {
|
State::message(Message::GetBlock { index: my_height + 1u64 })
|
||||||
State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 })
|
|
||||||
} else {
|
} else {
|
||||||
State::idle()
|
State::message(Message::GetPeers)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
State::Error
|
State::Error
|
||||||
@@ -273,23 +278,27 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message) -> State {
|
|||||||
}
|
}
|
||||||
Message::Error => { State::Error }
|
Message::Error => { State::Error }
|
||||||
Message::Ping { height } => {
|
Message::Ping { height } => {
|
||||||
let context = context.lock().unwrap();
|
if height > my_height {
|
||||||
if height > context.blockchain.height() {
|
State::message(Message::GetBlock { index: my_height + 1u64 })
|
||||||
State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 })
|
|
||||||
} else {
|
} else {
|
||||||
State::message(Message::pong(context.blockchain.height()))
|
State::message(Message::pong(my_height))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Message::Pong { height } => {
|
Message::Pong { height } => {
|
||||||
let context = context.lock().unwrap();
|
if height > my_height {
|
||||||
if height > context.blockchain.height() {
|
State::message(Message::GetBlock { index: my_height + 1u64 })
|
||||||
State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 })
|
|
||||||
} else {
|
} else {
|
||||||
State::idle()
|
State::idle()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Message::GetPeers => { State::Error }
|
Message::GetPeers => {
|
||||||
Message::Peers => { State::Error }
|
let peer = peers.get_peer(token).unwrap();
|
||||||
|
State::message(Message::Peers { peers: peers.get_peers_for_exchange(&peer.get_addr()) })
|
||||||
|
}
|
||||||
|
Message::Peers { peers: new_peers } => {
|
||||||
|
peers.add_peers_from_exchange(new_peers);
|
||||||
|
State::idle()
|
||||||
|
}
|
||||||
Message::GetBlock { index } => {
|
Message::GetBlock { index } => {
|
||||||
let context = context.lock().unwrap();
|
let context = context.lock().unwrap();
|
||||||
match context.blockchain.get_block(index) {
|
match context.blockchain.get_block(index) {
|
||||||
@@ -314,7 +323,27 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message) -> State {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next(current: &mut Token) -> Token {
|
/// Connecting to configured (bootstrap) peers
|
||||||
|
fn connect_peers(peers_addrs: Vec<String>, poll: &mut Poll, peers: &mut Peers, unique_token: &mut Token) {
|
||||||
|
for peer in peers_addrs.iter() {
|
||||||
|
let addr: SocketAddr = peer.parse().expect(&format!("Error parsing peer address {}", &peer));
|
||||||
|
match TcpStream::connect(addr.clone()) {
|
||||||
|
Ok(mut stream) => {
|
||||||
|
println!("Created connection to peer {}", &peer);
|
||||||
|
let token = next(unique_token);
|
||||||
|
poll.registry().register(&mut stream, token, Interest::WRITABLE).unwrap();
|
||||||
|
let mut peer = Peer::new(addr, stream, State::Connecting, false);
|
||||||
|
peer.set_public(true);
|
||||||
|
peers.add_peer(token, peer);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("Error connecting to peer {}: {}", &peer, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn next(current: &mut Token) -> Token {
|
||||||
let next = current.0;
|
let next = current.0;
|
||||||
current.0 += 1;
|
current.0 += 1;
|
||||||
Token(next)
|
Token(next)
|
||||||
|
|||||||
+44
-3
@@ -1,13 +1,28 @@
|
|||||||
use crate::p2p::State;
|
use crate::p2p::State;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use mio::net::TcpStream;
|
||||||
|
use std::sync::RwLock;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct Peer {
|
pub struct Peer {
|
||||||
addr: String,
|
addr: SocketAddr,
|
||||||
|
stream: TcpStream,
|
||||||
state: State,
|
state: State,
|
||||||
|
inbound: bool,
|
||||||
|
public: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Peer {
|
impl Peer {
|
||||||
pub fn new(addr: String, state: State) -> Self {
|
pub fn new(addr: SocketAddr, stream: TcpStream, state: State, inbound: bool) -> Self {
|
||||||
Peer { addr, state }
|
Peer { addr, stream, state, inbound, public: false }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_addr(&self) -> SocketAddr {
|
||||||
|
self.addr.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_stream(&mut self) -> &mut TcpStream {
|
||||||
|
&mut self.stream
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_state(&self) -> &State {
|
pub fn get_state(&self) -> &State {
|
||||||
@@ -17,4 +32,30 @@ impl Peer {
|
|||||||
pub fn set_state(&mut self, state: State) {
|
pub fn set_state(&mut self, state: State) {
|
||||||
self.state = state;
|
self.state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_public(&self) -> bool {
|
||||||
|
self.public
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_public(&mut self, public: bool) {
|
||||||
|
self.public = public;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn active(&self) -> bool {
|
||||||
|
self.state.active()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn disabled(&self) -> bool {
|
||||||
|
self.state.disabled()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn equals(&self, addr: &SocketAddr) -> bool {
|
||||||
|
/// If loopback address then we care about ip and port.
|
||||||
|
/// If regular address then we only care about the ip and ignore the port.
|
||||||
|
if self.addr.ip().is_loopback() {
|
||||||
|
self.addr == *addr
|
||||||
|
} else {
|
||||||
|
self.addr.ip() == addr.ip()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,126 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use mio::{Token, Interest, Registry};
|
||||||
|
use mio::net::TcpStream;
|
||||||
|
use crate::p2p::{Peer, State, Message};
|
||||||
|
use crate::p2p::network::LISTEN_PORT;
|
||||||
|
use crate::p2p::network::next;
|
||||||
|
use rand::random;
|
||||||
|
|
||||||
|
pub struct Peers {
|
||||||
|
peers: HashMap<Token, Peer>,
|
||||||
|
new_peers: Vec<SocketAddr>
|
||||||
|
}
|
||||||
|
|
||||||
|
const PING_PERIOD: u64 = 30;
|
||||||
|
|
||||||
|
impl Peers {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Peers { peers: HashMap::new(), new_peers: Vec::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_peer(&mut self, token: Token, peer: Peer) {
|
||||||
|
self.peers.insert(token, peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_peer(&self, token: &Token) -> Option<&Peer> {
|
||||||
|
self.peers.get(token)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_mut_peer(&mut self, token: &Token) -> Option<&mut Peer> {
|
||||||
|
self.peers.get_mut(token)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_peer(&mut self, token: &Token) -> Option<Peer> {
|
||||||
|
self.peers.remove(token)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_peers_from_exchange(&mut self, peers: Vec<String>) {
|
||||||
|
println!("Got peers: {:?}", &peers);
|
||||||
|
// TODO make it return error if these peers are wrong and seem like an attack
|
||||||
|
for peer in peers.iter() {
|
||||||
|
let addr: SocketAddr = peer.parse().expect(&format!("Error parsing peer {}", peer));
|
||||||
|
if addr.ip().is_loopback() {
|
||||||
|
continue; // Return error in future
|
||||||
|
}
|
||||||
|
let mut found = false;
|
||||||
|
for (_token, p) in self.peers.iter() {
|
||||||
|
if p.equals(&addr) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if found {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.new_peers.push(addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_peers_for_exchange(&self, peer_address: &SocketAddr) -> Vec<String> {
|
||||||
|
let mut result: Vec<String> = Vec::new();
|
||||||
|
for (_, peer) in self.peers.iter() {
|
||||||
|
if peer.equals(peer_address) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if peer.is_public() {
|
||||||
|
result.push(SocketAddr::new(peer.get_addr().ip(), LISTEN_PORT).to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn skip_peer_connection(&self, addr: &SocketAddr) -> bool {
|
||||||
|
for (_, peer) in self.peers.iter() {
|
||||||
|
if peer.equals(addr) && (!peer.is_public() || peer.active() || peer.disabled()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_pings(&mut self, registry: &Registry, height: u64) {
|
||||||
|
for (token, mut peer) in self.peers.iter_mut() {
|
||||||
|
match peer.get_state() {
|
||||||
|
State::Idle { from } => {
|
||||||
|
if from.elapsed().as_secs() >= PING_PERIOD {
|
||||||
|
// Sometimes we check for new peers instead of pinging
|
||||||
|
let random: u8 = random();
|
||||||
|
let message = if random < 16 {
|
||||||
|
Message::GetPeers
|
||||||
|
} else {
|
||||||
|
Message::ping(height)
|
||||||
|
};
|
||||||
|
|
||||||
|
peer.set_state(State::message(message));
|
||||||
|
let mut stream = peer.get_stream();
|
||||||
|
registry.reregister(stream, token.clone(), Interest::WRITABLE).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn connect_new_peers(&mut self, registry: &Registry, unique_token: &mut Token) {
|
||||||
|
if self.new_peers.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for addr in self.new_peers.iter() {
|
||||||
|
match TcpStream::connect(addr.clone()) {
|
||||||
|
Ok(mut stream) => {
|
||||||
|
println!("Created connection to peer {}", &addr);
|
||||||
|
let token = next(unique_token);
|
||||||
|
registry.register(&mut stream, token, Interest::WRITABLE).unwrap();
|
||||||
|
let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false);
|
||||||
|
peer.set_public(true);
|
||||||
|
self.peers.insert(token, peer);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("Error connecting to peer {}: {}", &addr, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.new_peers.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use crate::p2p::Message;
|
use crate::p2p::Message;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub enum State {
|
pub enum State {
|
||||||
Connecting,
|
Connecting,
|
||||||
Connected,
|
Connected,
|
||||||
@@ -35,4 +36,25 @@ impl State {
|
|||||||
let response = serde_json::to_string(&message).unwrap();
|
let response = serde_json::to_string(&message).unwrap();
|
||||||
State::Message {data: Vec::from(response.as_bytes()) }
|
State::Message {data: Vec::from(response.as_bytes()) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn active(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
State::Connecting => { true }
|
||||||
|
State::Connected => { true }
|
||||||
|
State::Idle { .. } => { true }
|
||||||
|
State::Message { .. } => { true }
|
||||||
|
_ => { false }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn disabled(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
State::Error => { true }
|
||||||
|
State::Banned => { true }
|
||||||
|
State::Offline { from, attempts } => {
|
||||||
|
from.elapsed().as_secs() < 60 // We check offline peers to become online every 5 minutes
|
||||||
|
}
|
||||||
|
_ => { false }
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Vendored
+1
-1
@@ -10831,7 +10831,7 @@ label.panel-block:hover {
|
|||||||
}
|
}
|
||||||
/*# sourceMappingURL=bulma.css.map */
|
/*# sourceMappingURL=bulma.css.map */
|
||||||
|
|
||||||
// TODO move to another file
|
/* TODO move to another file */
|
||||||
.container {
|
.container {
|
||||||
margin: 10pt;
|
margin: 10pt;
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user