diff --git a/Cargo.toml b/Cargo.toml index 3cb27c5..2109bc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alfis" -version = "0.5.9" +version = "0.6.0" authors = ["Revertron "] edition = "2018" build = "build.rs" @@ -18,7 +18,9 @@ toml = "0.5.8" digest = "0.9.0" sha2 = "0.9.5" ed25519-dalek = "1.0.1" +x25519-dalek = "1.1.1" ecies-ed25519 = "0.5.1" +chacha20poly1305 = "0.8.0" signature = "1.3.0" blakeout = "0.3.0" num_cpus = "1.13.0" @@ -26,6 +28,7 @@ byteorder = "1.4.3" serde = { version = "1.0.126", features = ["derive"] } serde_json = "1.0.64" bincode = "1.3.3" +serde_cbor = "0.11.1" base64 = "0.13.0" num-bigint = "0.4.0" num-traits = "0.2.14" diff --git a/src/blockchain/block.rs b/src/blockchain/block.rs index 8f64901..7decddb 100644 --- a/src/blockchain/block.rs +++ b/src/blockchain/block.rs @@ -61,13 +61,23 @@ impl Block { } } + pub fn from_bytes(data: &[u8]) -> serde_cbor::Result { + serde_cbor::from_slice(data) + } + pub fn is_genesis(&self) -> bool { self.index == 1 && matches!(Transaction::get_type(&self.transaction), TransactionType::Origin) && self.prev_block_hash == Bytes::default() } + /// Serializes block to CBOR for network pub fn as_bytes(&self) -> Vec { + serde_cbor::to_vec(&self).unwrap() + } + + /// Serializes block to bincode format for hashing. + pub fn as_bytes_compact(&self) -> Vec { bincode::serialize(&self).unwrap() } diff --git a/src/blockchain/chain.rs b/src/blockchain/chain.rs index 07a20f1..10908f5 100644 --- a/src/blockchain/chain.rs +++ b/src/blockchain/chain.rs @@ -1010,8 +1010,10 @@ impl SignersCache { pub mod tests { use log::LevelFilter; use simplelog::{ColorChoice, ConfigBuilder, TerminalMode, TermLogger}; + #[allow(unused_imports)] + use log::{debug, error, info, trace, warn}; - use crate::{Chain, Settings}; + use crate::{Chain, Settings, Block}; fn init_logger() { let config = ConfigBuilder::new() @@ -1035,4 +1037,28 @@ pub mod tests { chain.check_chain(u64::MAX); assert_eq!(chain.get_height(), 149); } + + #[test] + pub fn check_serde() { + let settings = Settings::default(); + let chain = Chain::new(&settings, "./tests/blockchain.db"); + + // Check the first block, its transaction doesn't have identity + let block = chain.get_block(1).unwrap(); + let buf = serde_cbor::to_vec(&block).unwrap(); + let block2: Block = serde_cbor::from_slice(&buf[..]).unwrap(); + assert_eq!(block, block2); + + // Check second block, it is common "full" block with domain + let block = chain.get_block(2).unwrap(); + let buf = serde_cbor::to_vec(&block).unwrap(); + let block2: Block = serde_cbor::from_slice(&buf[..]).unwrap(); + assert_eq!(block, block2); + + // Check block 36, it is an "empty" block, used to sign full blocks + let block = chain.get_block(36).unwrap(); + let buf = serde_cbor::to_vec(&block).unwrap(); + let block2: Block = serde_cbor::from_slice(&buf[..]).unwrap(); + assert_eq!(block, block2); + } } \ No newline at end of file diff --git a/src/blockchain/hash_utils.rs b/src/blockchain/hash_utils.rs index b731e62..e8d9251 100644 --- a/src/blockchain/hash_utils.rs +++ b/src/blockchain/hash_utils.rs @@ -9,7 +9,7 @@ pub fn check_block_hash(block: &Block) -> bool { let mut copy: Block = block.clone(); copy.hash = Bytes::default(); copy.signature = Bytes::default(); - blakeout_data(©.as_bytes()) == block.hash + blakeout_data(©.as_bytes_compact()) == block.hash } /// Hashes data by given hasher @@ -23,7 +23,7 @@ pub fn blakeout_data(data: &[u8]) -> Bytes { pub fn check_block_signature(block: &Block) -> bool { let mut copy = block.clone(); copy.signature = Bytes::default(); - Keystore::check(©.as_bytes(), ©.pub_key, &block.signature) + Keystore::check(©.as_bytes_compact(), ©.pub_key, &block.signature) } /// Hashes some identity (domain in case of DNS). If you give it a public key, it will hash with it as well. diff --git a/src/blockchain/transaction.rs b/src/blockchain/transaction.rs index 1897448..8712d32 100644 --- a/src/blockchain/transaction.rs +++ b/src/blockchain/transaction.rs @@ -49,11 +49,6 @@ impl Transaction { } } - pub fn get_bytes(&self) -> Vec { - // Let it panic if something is not okay - serde_json::to_vec(&self).unwrap() - } - pub fn to_string(&self) -> String { // Let it panic if something is not okay serde_json::to_string(&self).unwrap() diff --git a/src/commons/mod.rs b/src/commons/mod.rs index ec40f7a..f944411 100644 --- a/src/commons/mod.rs +++ b/src/commons/mod.rs @@ -1,7 +1,6 @@ use std::net::IpAddr; use std::num; -use mio::Token; use rand::Rng; #[cfg(not(target_os = "macos"))] use thread_priority::*; @@ -116,13 +115,6 @@ pub fn is_yggdrasil(addr: &IpAddr) -> bool { false } -/// Gets new token from old token, mutating the last -pub fn next(current: &mut Token) -> Token { - let next = current.0; - current.0 += 1; - Token(next) -} - /// Checks if this record has IP from Yggdrasil network /// https://yggdrasil-network.github.io pub fn is_yggdrasil_record(record: &DnsRecord) -> bool { diff --git a/src/crypto/chacha.rs b/src/crypto/chacha.rs new file mode 100644 index 0000000..0cca1c4 --- /dev/null +++ b/src/crypto/chacha.rs @@ -0,0 +1,66 @@ +use chacha20poly1305::{ChaCha20Poly1305, Key, Nonce}; +use chacha20poly1305::aead::{Aead, NewAead}; +use std::fmt::{Debug, Formatter}; +use std::fmt; + +pub const ZERO_NONCE: [u8; 12] = [0u8; 12]; +const FAILURE: &str = "encryption failure!"; + +/// A small wrap-up to use Chacha20 encryption for domain names. +#[derive(Clone)] +pub struct Chacha { + cipher: ChaCha20Poly1305, + nonce: [u8; 12] +} + +impl Chacha { + pub fn new(key: &[u8], nonce: &[u8]) -> Self { + let key = Key::from_slice(key); + let cipher = ChaCha20Poly1305::new(key); + let mut buf = [0u8; 12]; + buf.copy_from_slice(nonce); + Chacha { cipher, nonce: buf } + } + + pub fn encrypt(&self, data: &[u8]) -> Vec { + let nonce = Nonce::from(self.nonce.clone()); + self.cipher.encrypt(&nonce, data.as_ref()).expect(FAILURE) + } + + pub fn decrypt(&self, data: &[u8]) -> Vec { + let nonce = Nonce::from(self.nonce.clone()); + self.cipher.decrypt(&nonce, data.as_ref()).expect(FAILURE) + } + + pub fn get_nonce(&self) -> &[u8; 12] { + &self.nonce + } +} + +impl Debug for Chacha { + fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result { + fmt.write_str("ChaCha20Poly1305") + } +} + +#[cfg(test)] +mod tests { + use crate::crypto::Chacha; + use crate::{to_hex}; + + #[test] + pub fn test_chacha() { + let buf = b"178135D209C697625E3EC71DA5C760382E54936F824EE5083908DA66B14ECE18"; + let chacha1 = Chacha::new(b"178135D209C697625E3EC71DA5C76038", &buf[..12]); + let bytes1 = chacha1.encrypt(b"TEST"); + println!("{}", to_hex(&bytes1)); + + let chacha2 = Chacha::new(b"178135D209C697625E3EC71DA5C76038", &buf[..12]); + let bytes2 = chacha2.decrypt(&bytes1); + assert_eq!(String::from_utf8(bytes2).unwrap(), "TEST"); + + let bytes2 = chacha2.encrypt(b"TEST"); + + assert_eq!(bytes1, bytes2); + } +} \ No newline at end of file diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 25839bb..9ab04b9 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -1,3 +1,6 @@ mod crypto_box; +mod chacha; -pub use crypto_box::CryptoBox; \ No newline at end of file +pub use crypto_box::CryptoBox; +pub use chacha::Chacha; +pub use chacha::ZERO_NONCE; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 1dbf756..6098d96 100644 --- a/src/main.rs +++ b/src/main.rs @@ -184,7 +184,11 @@ fn main() { let miner: Arc> = Arc::new(Mutex::new(miner_obj)); let mut network = Network::new(Arc::clone(&context)); - network.start().expect("Error starting network component"); + thread::spawn(move || { + // Give UI some time to appear :) + thread::sleep(Duration::from_millis(1000)); + network.start(); + }); create_genesis_if_needed(&context, &miner); if no_gui { diff --git a/src/miner.rs b/src/miner.rs index 83dbeff..d00559d 100644 --- a/src/miner.rs +++ b/src/miner.rs @@ -280,7 +280,7 @@ impl Miner { Some(mut block) => { let index = block.index; let mut context = context.lock().unwrap(); - block.signature = Bytes::from_bytes(&job.keystore.sign(&block.as_bytes())); + block.signature = Bytes::from_bytes(&job.keystore.sign(&block.as_bytes_compact())); let mut success = false; if context.chain.check_new_block(&block) != BlockQuality::Good { warn!("Error adding mined block!"); @@ -341,7 +341,7 @@ fn find_hash(context: Arc>, mut block: Block, running: Arc= target_diff { block.hash = Bytes::from_bytes(digest.result()); diff --git a/src/p2p/message.rs b/src/p2p/message.rs index 9ec261e..67bf335 100644 --- a/src/p2p/message.rs +++ b/src/p2p/message.rs @@ -7,8 +7,8 @@ use crate::Bytes; #[derive(Debug, Serialize, Deserialize)] pub enum Message { Error, - Hand { #[serde(default = "default_version")] app_version: String, origin: String, version: u32, public: bool, #[serde(default)] rand: String }, - Shake { #[serde(default = "default_version")] app_version: String, origin: String, version: u32, ok: bool, height: u64 }, + Hand { app_version: String, origin: String, version: u32, public: bool, rand_id: String, }, + Shake { app_version: String, origin: String, version: u32, public: bool, rand_id: String, height: u64 }, Ping { height: u64, hash: Bytes }, Pong { height: u64, hash: Bytes }, Twin, @@ -16,24 +16,23 @@ pub enum Message { GetPeers, Peers { peers: Vec }, GetBlock { index: u64 }, - Block { index: u64, block: String }, + Block { index: u64, block: Vec }, } impl Message { pub fn from_bytes(bytes: Vec) -> Result { - let text = String::from_utf8(bytes).unwrap_or(String::from("Error{}")); - match serde_json::from_str(&text) { + match serde_cbor::from_slice(bytes.as_slice()) { Ok(cmd) => Ok(cmd), Err(_) => Err(()) } } - pub fn hand(app_version: &str, origin: &str, version: u32, public: bool, rand: &str) -> Self { - Message::Hand { app_version: app_version.to_owned(), origin: origin.to_owned(), version, public, rand: rand.to_owned() } + pub fn hand(app_version: &str, origin: &str, version: u32, public: bool, rand_id: &str) -> Self { + Message::Hand { app_version: app_version.to_owned(), origin: origin.to_owned(), version, public, rand_id: rand_id.to_owned() } } - pub fn shake(app_version: &str, origin: &str, version: u32, ok: bool, height: u64) -> Self { - Message::Shake { app_version: app_version.to_owned(), origin: origin.to_owned(), version, ok, height } + pub fn shake(app_version: &str, origin: &str, version: u32, public: bool, rand_id: &str, height: u64) -> Self { + Message::Shake { app_version: app_version.to_owned(), origin: origin.to_owned(), version, public, rand_id: rand_id.to_owned(), height } } pub fn ping(height: u64, hash: Bytes) -> Self { @@ -44,24 +43,7 @@ impl Message { Message::Pong { height, hash } } - pub fn block(height: u64, str: String) -> Self { - Message::Block { index: height, block: str } + pub fn block(height: u64, block: Vec) -> Self { + Message::Block { index: height, block } } -} - -fn default_version() -> String { - String::from("0.0.0") -} - -#[cfg(test)] -mod tests { - use crate::p2p::Message; - - #[test] - pub fn test_hand() { - assert!(serde_json::from_str::("\"Error\"").is_ok()); - assert!(serde_json::from_str::("{\"Hand\":{\"origin\":\"\",\"version\":1,\"public\":false,\"rand\":\"123\"}}").is_ok()); - assert!(serde_json::from_str::("{\"Hand\":{\"origin\":\"\",\"version\":1,\"public\":false}}").is_ok()); - } - } \ No newline at end of file diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 7886ab4..a149415 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -3,7 +3,7 @@ extern crate serde_json; use std::{io, thread}; use std::cmp::max; -use std::io::{Read, Write}; +use std::io::{Read, Write, Error}; use std::net::{IpAddr, Shutdown, SocketAddr, SocketAddrV4}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -15,25 +15,38 @@ use log::{debug, error, info, trace, warn}; use mio::{Events, Interest, Poll, Registry, Token}; use mio::event::Event; use mio::net::{TcpListener, TcpStream}; -use rand::random; +use rand::{random, RngCore, Rng}; +use rand_old::prelude::thread_rng; +use x25519_dalek::{StaticSecret, PublicKey}; use crate::{Block, Context, p2p::Message, p2p::Peer, p2p::Peers, p2p::State}; use crate::blockchain::types::BlockQuality; use crate::commons::*; use crate::eventbus::{register, post}; +use crate::crypto::Chacha; const SERVER: Token = Token(0); pub struct Network { - context: Arc> + context: Arc>, + secret_key: StaticSecret, + public_key: PublicKey, + token: Token, + // States of peer connections, and some data to send when sockets become writable + peers: Peers, } impl Network { pub fn new(context: Arc>) -> Self { - Network { context } + // P2P encryption primitives + let mut thread_rng = thread_rng(); + let secret_key = StaticSecret::new(&mut thread_rng); + let public_key = PublicKey::from(&secret_key); + let peers = Peers::new(); + Network { context, secret_key, public_key, token: Token(1), peers } } - pub fn start(&mut self) -> Result<(), String> { + pub fn start(&mut self) { let (listen_addr, peers_addrs, yggdrasil_only) = { let c = self.context.lock().unwrap(); (c.settings.net.listen.clone(), c.settings.net.peers.clone(), c.settings.net.yggdrasil_only) @@ -47,145 +60,564 @@ impl Network { let mut server = TcpListener::bind(addr).expect("Can't bind to address"); debug!("Started node listener on {}", server.local_addr().unwrap()); - let mut events = Events::with_capacity(1024); + let mut events = Events::with_capacity(64); let mut poll = Poll::new().expect("Unable to create poll"); poll.registry().register(&mut server, SERVER, Interest::READABLE).expect("Error registering poll"); - let context = Arc::clone(&self.context); - thread::spawn(move || { - // Give UI some time to appear :) - thread::sleep(Duration::from_millis(2000)); - // Unique token for each incoming connection. - let mut unique_token = Token(SERVER.0 + 1); - // States of peer connections, and some data to send when sockets become writable - let mut peers = Peers::new(); - // Starting peer connections to bootstrap nodes - peers.connect_peers(&peers_addrs, &poll.registry(), &mut unique_token, yggdrasil_only); - let mut ui_timer = Instant::now(); - let mut log_timer = Instant::now(); - let mut bootstrap_timer = Instant::now(); - let mut connect_timer = Instant::now(); - let mut last_events_time = Instant::now(); - loop { - if peers.get_peers_count() == 0 && bootstrap_timer.elapsed().as_secs() > 60 { - warn!("Restarting swarm connections..."); - // Starting peer connections to bootstrap nodes - peers.connect_peers(&peers_addrs, &poll.registry(), &mut unique_token, yggdrasil_only); - bootstrap_timer = Instant::now(); - last_events_time = Instant::now(); - } - // Poll Mio for events, blocking until we get an event. - poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets"); - if !running.load(Ordering::SeqCst) { - break; - } + // Starting peer connections to bootstrap nodes + self.peers.connect_peers(&peers_addrs, &poll.registry(), &mut self.token, yggdrasil_only); - // Process each event. - for event in events.iter() { - //trace!("Event for socket {} is {:?}", event.token().0, &event); - // We can use the token we previously provided to `register` to determine for which socket the event is. - match event.token() { - SERVER => { - //debug!("Event for server socket {} is {:?}", event.token().0, &event); - // If this is an event for the server, it means a connection is ready to be accepted. - let connection = server.accept(); - match connection { - Ok((mut stream, mut address)) => { - // Checking if it is an ipv4-mapped ipv6 if yes convert to ipv4 - if address.is_ipv6() { - if let IpAddr::V6(ipv6) = address.ip() { - if let Some(ipv4) = ipv6.to_ipv4() { - address = SocketAddr::V4(SocketAddrV4::new(ipv4, address.port())) - } + let mut ui_timer = Instant::now(); + let mut log_timer = Instant::now(); + let mut bootstrap_timer = Instant::now(); + let mut connect_timer = Instant::now(); + let mut last_events_time = Instant::now(); + loop { + if self.peers.get_peers_count() == 0 && bootstrap_timer.elapsed().as_secs() > 60 { + warn!("Restarting swarm connections..."); + // Starting peer connections to bootstrap nodes + self.peers.connect_peers(&peers_addrs, &poll.registry(), &mut self.token, yggdrasil_only); + bootstrap_timer = Instant::now(); + last_events_time = Instant::now(); + } + // Poll Mio for events, blocking until we get an event. + poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets"); + if !running.load(Ordering::SeqCst) { + break; + } + + // Process each event. + for event in events.iter() { + //trace!("Event for socket {} is {:?}", event.token().0, &event); + // We can use the token we previously provided to `register` to determine for which socket the event is. + match event.token() { + SERVER => { + //debug!("Event for server socket {} is {:?}", event.token().0, &event); + // If this is an event for the server, it means a connection is ready to be accepted. + let connection = server.accept(); + match connection { + Ok((mut stream, mut address)) => { + // Checking if it is an ipv4-mapped ipv6 if yes convert to ipv4 + if address.is_ipv6() { + if let IpAddr::V6(ipv6) = address.ip() { + if let Some(ipv4) = ipv6.to_ipv4() { + address = SocketAddr::V4(SocketAddrV4::new(ipv4, address.port())) } } - - if peers.is_ignored(&address.ip()) { - debug!("Ignoring connection from banned {:?}", &address.ip()); - continue; - } - - if yggdrasil_only && !is_yggdrasil(&address.ip()) { - debug!("Dropping connection from Internet"); - stream.shutdown(Shutdown::Both).unwrap_or_else(|e|{ warn!("Error in shutdown, {}", e); }); - let _ = poll.registry().reregister(&mut server, SERVER, Interest::READABLE); - continue; - } - - //debug!("Accepted connection from: {} to local IP: {}", address, local_ip); - let token = next(&mut unique_token); - poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll"); - peers.add_peer(token, Peer::new(address, stream, State::Connected, true)); } - Err(_) => {} - } - match poll.registry().reregister(&mut server, SERVER, Interest::READABLE) { - Ok(_) => {} - Err(e) => { - panic!("Error reregistering server token!\n{}", e); + + if self.peers.is_ignored(&address.ip()) { + debug!("Ignoring connection from banned {:?}", &address.ip()); + continue; } + + if yggdrasil_only && !is_yggdrasil(&address.ip()) { + debug!("Dropping connection from Internet"); + stream.shutdown(Shutdown::Both).unwrap_or_else(|e|{ warn!("Error in shutdown, {}", e); }); + let _ = poll.registry().reregister(&mut server, SERVER, Interest::READABLE); + continue; + } + + //debug!("Accepted connection from: {} to local IP: {}", address, local_ip); + let token = self.next_token(); + poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll"); + let peer = Peer::new(address, stream, State::Connected, true); + self.peers.add_peer(token, peer); } + Err(_) => {} } - token => { - if !handle_connection_event(Arc::clone(&context), &mut peers, &poll.registry(), &event) { - let _ = peers.close_peer(poll.registry(), &token); - let blocks = context.lock().unwrap().chain.get_height(); - let keys = context.lock().unwrap().chain.get_users_count(); - let domains = context.lock().unwrap().chain.get_domains_count(); - post(crate::event::Event::NetworkStatus { blocks, domains, keys, nodes: peers.get_peers_active_count() }); - } + if let Err(e) = poll.registry().reregister(&mut server, SERVER, Interest::READABLE) { + panic!("Error reregistering server token!\n{}", e); } } - } - if !events.is_empty() { - last_events_time = Instant::now(); - } else if last_events_time.elapsed().as_secs() > MAX_IDLE_SECONDS { - if peers.get_peers_count() > 0 { - warn!("Something is wrong with swarm connections, closing all."); - peers.close_all_peers(poll.registry()); - continue; - } else { - thread::sleep(POLL_TIMEOUT.unwrap()); + token => { + if !self.handle_connection_event(&poll.registry(), &event) { + let _ = self.peers.close_peer(poll.registry(), &token); + let blocks = self.context.lock().unwrap().chain.get_height(); + let keys = self.context.lock().unwrap().chain.get_users_count(); + let domains = self.context.lock().unwrap().chain.get_domains_count(); + post(crate::event::Event::NetworkStatus { blocks, domains, keys, nodes: self.peers.get_peers_active_count() }); + } } } - - if ui_timer.elapsed().as_millis() > UI_REFRESH_DELAY_MS { - // Send pings to idle peers - let (height, hash) = { - let context = context.lock().unwrap(); - let blocks = context.chain.get_height(); - let nodes = peers.get_peers_active_count(); - let banned = peers.get_peers_banned_count(); - - let keys = context.chain.get_users_count(); - let domains = context.chain.get_domains_count(); - post(crate::event::Event::NetworkStatus { blocks, domains, keys, nodes }); - - if log_timer.elapsed().as_secs() > LOG_REFRESH_DELAY_SEC { - info!("Active nodes count: {}, banned count: {}, blocks count: {}", nodes, banned, blocks); - let elapsed = last_events_time.elapsed().as_secs(); - if elapsed >= 10 { - warn!("Last network events time {} seconds ago", elapsed); - } - log_timer = Instant::now(); - } - if nodes < MAX_NODES && connect_timer.elapsed().as_secs() >= 5 { - peers.connect_new_peers(poll.registry(), &mut unique_token, yggdrasil_only); - connect_timer = Instant::now(); - } - (blocks, context.chain.get_last_hash()) - }; - peers.update(poll.registry(), height, hash); - ui_timer = Instant::now(); - } } - if !running.load(Ordering::SeqCst) { - info!("Network loop finished"); + if !events.is_empty() { + last_events_time = Instant::now(); + } else if last_events_time.elapsed().as_secs() > MAX_IDLE_SECONDS { + if self.peers.get_peers_count() > 0 { + warn!("Something is wrong with swarm connections, closing all."); + self.peers.close_all_peers(poll.registry()); + continue; + } else { + thread::sleep(POLL_TIMEOUT.unwrap()); + } + } + + if ui_timer.elapsed().as_millis() > UI_REFRESH_DELAY_MS { + // Send pings to idle peers + let (height, hash) = { + let context = self.context.lock().unwrap(); + let blocks = context.chain.get_height(); + let nodes = self.peers.get_peers_active_count(); + let banned = self.peers.get_peers_banned_count(); + + let keys = context.chain.get_users_count(); + let domains = context.chain.get_domains_count(); + post(crate::event::Event::NetworkStatus { blocks, domains, keys, nodes }); + + if log_timer.elapsed().as_secs() > LOG_REFRESH_DELAY_SEC { + info!("Active nodes count: {}, banned count: {}, blocks count: {}", nodes, banned, blocks); + let elapsed = last_events_time.elapsed().as_secs(); + if elapsed >= 10 { + warn!("Last network events time {} seconds ago", elapsed); + } + log_timer = Instant::now(); + } + if nodes < MAX_NODES && connect_timer.elapsed().as_secs() >= 5 { + self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only); + connect_timer = Instant::now(); + } + (blocks, context.chain.get_last_hash()) + }; + self.peers.update(poll.registry(), height, hash); + ui_timer = Instant::now(); + } + } + if !running.load(Ordering::SeqCst) { + info!("Network loop finished"); + } else { + panic!("Network loop has broken prematurely!"); + } + } + + fn handle_connection_event(&mut self, registry: &Registry, event: &Event) -> bool { + if event.is_error() || (event.is_read_closed() && event.is_write_closed()) { + return false; + } + + if event.is_readable() { + let data = { + let token = event.token(); + match self.peers.get_mut_peer(&token) { + None => { + error!("Error getting peer for connection {}", token.0); + return false; + } + Some(peer) => { + if event.is_read_closed() { + debug!("Node from {} disconnected", peer.get_addr().ip()); + return false; + } + match peer.get_state().clone() { + State::Connected => { + let mut stream = peer.get_stream(); + return match read_client_handshake(&mut stream) { + Ok(key) => { + let mut buf = [0u8; 32]; + buf.copy_from_slice(key.as_slice()); + let public_key: PublicKey = PublicKey::from(buf); + let shared = self.secret_key.diffie_hellman(&public_key); + let mut nonce = [0u8; 12]; + let mut rng = rand::thread_rng(); + rng.fill(&mut nonce); + let chacha = Chacha::new(shared.as_bytes(), &nonce); + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + std::mem::drop(stream); + peer.set_cipher(chacha); + peer.set_state(State::ServerHandshake); + info!("Client hello read successfully"); + true + } + Err(e) => { + warn!("Error reading client handshake. {}", e); + false + } + } + } + State::ServerHandshake => { + let mut stream = peer.get_stream(); + return match read_server_handshake(&mut stream) { + Ok(data) => { + if data.len() != 32 + 12 { + warn!("Server handshake of {} bytes instead of {}", data.len(), 32 + 12); + return false; + } + let mut buf = [0u8; 32]; + buf.copy_from_slice(&data.as_slice()[0..32]); + let public_key: PublicKey = PublicKey::from(buf); + let mut nonce = [0u8; 12]; + nonce.copy_from_slice(&data.as_slice()[32..]); + let shared = self.secret_key.diffie_hellman(&public_key); + let chacha = Chacha::new(shared.as_bytes(), &nonce); + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + std::mem::drop(stream); + peer.set_cipher(chacha); + peer.set_state(State::HandshakeFinished); + info!("Server hello read successfully"); + true + } + Err(e) => { + warn!("Error reading server handshake. {}", e); + false + } + } + } + _ => { + let mut stream = peer.get_stream(); + read_message(&mut stream) + } + } + } + } + }; + + if data.is_ok() { + let data = { + match self.peers.get_peer(&event.token()) { + Some(peer) => { + let data = data.unwrap(); + //info!("Decoding message {:?}", to_hex(data.as_slice())); + match decode_message(&data, peer.get_cipher()) { + Ok(data) => { + data + } + Err(_) => { + vec![] + } + } + } + None => { + vec![] + } + } + }; + match Message::from_bytes(data) { + Ok(message) => { + let m = format!("{:?}", &message); + let new_state = self.handle_message(message, &event.token()); + let peer = self.peers.get_mut_peer(&event.token()).unwrap(); + debug!("Got message from {}: {:?}", &peer.get_addr(), &m); + let stream = peer.get_stream(); + match new_state { + State::Message { data } => { + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + peer.set_state(State::Message { data }); + } + State::Connecting => {} + State::Connected => {} + State::ServerHandshake => {} + State::HandshakeFinished => {} + State::Idle { .. } => { + peer.set_state(State::idle()); + } + State::Error => {} + State::Banned => { + self.peers.ignore_peer(registry, &event.token()); + } + State::Offline { .. } => { + peer.set_state(State::offline()); + } + State::Loop => { + peer.set_state(State::Loop); + self.peers.ignore_peer(registry, &event.token()); + } + State::SendLoop => { + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + peer.set_state(State::SendLoop); + } + State::Twin => { + peer.set_state(State::Twin); + // TODO set something in [Peers], maybe ignore this IP? + return false; + } + } + } + Err(_) => { + let peer = self.peers.get_peer(&event.token()).unwrap(); + warn!("Error deserializing message from {}", &peer.get_addr()); + return false; + } + } } else { - panic!("Network loop has broken prematurely!"); + return false; } - }); - Ok(()) + } + + if event.is_writable() { + let my_id = self.peers.get_my_id().to_owned(); + match self.peers.get_mut_peer(&event.token()) { + None => {} + Some(peer) => { + match peer.get_state().clone() { + State::Connecting => { + if send_client_handshake(&mut peer.get_stream(), self.public_key.as_bytes()).is_err() { + return false; + } + peer.set_state(State::ServerHandshake); + } + State::ServerHandshake => { + if send_server_handshake(peer, self.public_key.as_bytes()).is_err() { + return false; + } + peer.set_state(State::HandshakeFinished); + info!("Server handshake sent"); + } + State::HandshakeFinished => { + //debug!("Connected to peer {}, sending hello...", &peer.get_addr()); + let data: Vec = { + let c = self.context.lock().unwrap(); + let message = Message::hand(&c.app_version, &c.settings.origin, CHAIN_VERSION, c.settings.net.public, &my_id); + info!("Sending: {:?}", &message); + encode_message(&message, peer.get_cipher()).unwrap() + }; + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending hello {}", e)); + //debug!("Sent hello to {}", &peer.get_addr()); + } + State::Connected => {} + State::Message { data } => { + //debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap()); + let data = encode_bytes(&data, peer.get_cipher()); + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending message {}", e)); + } + State::Idle { from } => { + debug!("Odd version of pings :)"); + if from.elapsed().as_secs() >= 30 { + let data: Vec = { + let c = self.context.lock().unwrap(); + let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); + encode_message(&message, peer.get_cipher()).unwrap() + }; + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e)); + } + } + State::Error => {} + State::Banned => {} + State::Offline { .. } => {} + State::Loop => {} + State::SendLoop => { + let data = encode_message(&Message::Loop, peer.get_cipher()).unwrap(); + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e)); + } + State::Twin => { + let data = encode_message(&Message::Twin, peer.get_cipher()).unwrap(); + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e)); + } + } + registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap(); + } + } + } + + true + } + + fn handle_message(&mut self, message: Message, token: &Token) -> State { + let (my_height, my_hash, my_origin, my_version, me_public) = { + let context = self.context.lock().unwrap(); + // TODO cache it somewhere + (context.chain.get_height(), context.chain.get_last_hash(), &context.settings.origin.clone(), CHAIN_VERSION, context.settings.net.public) + }; + let my_id = self.peers.get_my_id().to_owned(); + let answer = match message { + Message::Hand { app_version, origin, version, public, rand_id } => { + if self.peers.is_our_own_connect(&rand_id) { + warn!("Detected loop connect"); + State::SendLoop + } else { + if origin.eq(my_origin) && version == my_version { + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_public(public); + peer.set_active(true); + debug!("Incoming v{} on {}", &app_version, peer.get_addr().ip()); + let app_version = self.context.lock().unwrap().app_version.clone(); + State::message(Message::shake(&app_version, &origin, version, me_public, &my_id, my_height)) + } else { + warn!("Handshake from unsupported chain or version"); + State::Banned + } + } + } + Message::Shake { app_version, origin, version, public, rand_id, height } => { + if origin.ne(my_origin) || version != my_version { + return State::Banned; + } + if self.peers.is_tween_connect(&rand_id) { + return State::Twin; + } + let nodes = self.peers.get_peers_active_count(); + let peer = self.peers.get_mut_peer(token).unwrap(); + // TODO check rand_id whether we have this peers connection already + debug!("Outgoing v{} on {}", &app_version, peer.get_addr().ip()); + peer.set_height(height); + peer.set_active(true); + peer.set_public(public); + peer.reset_reconnects(); + let mut context = self.context.lock().unwrap(); + if peer.is_higher(my_height) { + context.chain.update_max_height(height); + let event = crate::event::Event::Syncing { have: my_height, height: max(height, my_height) }; + post(event); + } + if nodes < MAX_NODES && random::() { + debug!("Requesting more peers from {}", peer.get_addr().ip()); + State::message(Message::GetPeers) + } else { + State::idle() + } + } + Message::Error => { State::Error } + Message::Ping { height, hash } => { + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_height(height); + peer.set_active(true); + if peer.is_higher(my_height) { + let mut context = self.context.lock().unwrap(); + context.chain.update_max_height(height); + info!("Peer is higher, requesting block {} from {}", height, peer.get_addr().ip()); + State::message(Message::GetBlock { index: my_height + 1 }) + } else if my_height == height && hash.ne(&my_hash) { + info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); + info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); + State::message(Message::GetBlock { index: my_height }) + } else { + State::message(Message::pong(my_height, my_hash)) + } + } + Message::Pong { height, hash } => { + let active_count = self.peers.get_peers_active_count(); + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_height(height); + peer.set_active(true); + if peer.is_higher(my_height) { + let mut context = self.context.lock().unwrap(); + context.chain.update_max_height(height); + info!("Peer is higher, requesting block {} from {}", height, peer.get_addr().ip()); + State::message(Message::GetBlock { index: my_height + 1 }) + } else if my_height == height && hash.ne(&my_hash) { + info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); + info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); + State::message(Message::GetBlock { index: my_height }) + } else { + if active_count < MAX_NODES && random::() < 50 { + debug!("Requesting more peers from {}", peer.get_addr().ip()); + State::message(Message::GetPeers) + } else { + State::idle() + } + } + } + Message::GetPeers => { + let addr = { + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_active(true); + peer.get_addr().clone() + }; + State::message(Message::Peers { peers: self.peers.get_peers_for_exchange(&addr) }) + } + Message::Peers { peers: new_peers } => { + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_active(true); + self.peers.add_peers_from_exchange(new_peers); + State::idle() + } + Message::GetBlock { index } => { + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_active(true); + let context = self.context.lock().unwrap(); + match context.chain.get_block(index) { + Some(block) => State::message(Message::block(block.index, block.as_bytes())), + None => State::Error + } + } + Message::Block { index, block } => { + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_active(true); + let block: Block = match Block::from_bytes(block.as_slice()) { + Ok(block) => block, + Err(e) => { + warn!("Error deserializing block! {}", e); + return State::Banned + } + }; + if index != block.index { + return State::Banned; + } + info!("Received block {} with hash {:?}", block.index, &block.hash); + self.handle_block(token, block) + } + Message::Twin => { State::Twin } + Message::Loop => { State::Loop } + }; + answer + } + + fn handle_block(&mut self, token: &Token, block: Block) -> State { + let peers_count = self.peers.get_peers_active_count(); + let peer = self.peers.get_mut_peer(token).unwrap(); + peer.set_received_block(block.index); + + let mut context = self.context.lock().unwrap(); + let max_height = context.chain.max_height(); + match context.chain.check_new_block(&block) { + BlockQuality::Good => { + context.chain.add_block(block); + let my_height = context.chain.get_height(); + post(crate::event::Event::BlockchainChanged { index: my_height }); + // If it was the last block to sync + if my_height == max_height { + post(crate::event::Event::SyncFinished); + } else { + let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; + post(event); + } + let domains = context.chain.get_domains_count(); + let keys = context.chain.get_users_count(); + post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count }); + // To load blocks from different nodes we randomize requests of new blocks + // TODO rethink this approach + if max_height > my_height && random::() < 200 { + return State::message(Message::GetBlock { index: my_height + 1 }); + } + } + BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } + BlockQuality::Future => { debug!("Ignoring future block {}", block.index); } + BlockQuality::Bad => { + // TODO save bad public keys to banned table + debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block); + let height = context.chain.get_height(); + context.chain.update_max_height(height); + post(crate::event::Event::SyncFinished); + return State::Banned; + } + BlockQuality::Rewind => { + debug!("Got some orphan block, requesting its parent"); + return State::message(Message::GetBlock { index: block.index - 1 }); + } + BlockQuality::Fork => { + debug!("Got forked block {} with hash {:?}", block.index, block.hash); + // If we are very much behind of blockchain + let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height; + let last_block = context.chain.last_block().unwrap(); + if block.is_better_than(&last_block) || lagged { + context.chain.replace_block(block).expect("Error replacing block with fork"); + let index = context.chain.get_height(); + post(crate::event::Event::BlockchainChanged { index }); + } else { + debug!("Fork in not better than our block, dropping."); + if let Some(block) = context.chain.get_block(block.index) { + return State::message(Message::block(block.index, block.as_bytes())); + } + } + } + } + State::idle() + } + + /// Gets new token from old token, mutating the last + pub fn next_token(&mut self) -> Token { + let current = self.token.0; + self.token.0 += 1; + Token(current) } } @@ -203,141 +635,56 @@ fn subscribe_to_bus(running: Arc) { }); } -fn handle_connection_event(context: Arc>, peers: &mut Peers, registry: &Registry, event: &Event) -> bool { - if event.is_error() || (event.is_read_closed() && event.is_write_closed()) { - return false; - } - if event.is_readable() { - let data = { - let token = event.token(); - match peers.get_mut_peer(&token) { +fn encode_bytes(data: &Vec, cipher: &Option) -> Vec { + match cipher { + None => { data.clone() } + Some(chacha) => { + chacha.encrypt(data.as_slice()) + } + } +} + +fn encode_message(message: &Message, cipher: &Option) -> Result, ()> { + match serde_cbor::to_vec(message) { + Ok(vec) => { + match cipher { None => { - error!("Error getting peer for connection {}", token.0); - return false; + //info!("No cipher, not encoding message: {:?}", to_hex(&vec)); + Ok(vec) } - Some(peer) => { - if event.is_read_closed() { - debug!("Node from {} disconnected", peer.get_addr().ip()); - return false; - } - let mut stream = peer.get_stream(); - read_message(&mut stream) + Some(chacha) => { + //info!("Encoding message: {:?}", to_hex(&vec)); + Ok(chacha.encrypt(vec.as_slice())) } } - }; - - if data.is_ok() { - let data = data.unwrap(); - match Message::from_bytes(data) { - Ok(message) => { - //let m = format!("{:?}", &message); - let new_state = handle_message(Arc::clone(&context), message, peers, &event.token()); - let peer = peers.get_mut_peer(&event.token()).unwrap(); - //debug!("Got message from {}: {:?}", &peer.get_addr(), &m); - let stream = peer.get_stream(); - match new_state { - State::Message { data } => { - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); - peer.set_state(State::Message { data }); - } - State::Connecting => {} - State::Connected => {} - State::Idle { .. } => { - peer.set_state(State::idle()); - } - State::Error => {} - State::Banned => { - peers.ignore_peer(registry, &event.token()); - } - State::Offline { .. } => { - peer.set_state(State::offline()); - } - State::Loop => { - peer.set_state(State::Loop); - peers.ignore_peer(registry, &event.token()); - } - State::SendLoop => { - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); - peer.set_state(State::SendLoop); - } - State::Twin => { - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); - peer.set_state(State::Twin); - } - } - } - Err(_) => { return false; } - } - } else { - return false; - } - } - - if event.is_writable() { - //trace!("Socket {} is writable", event.token().0); - let my_id = peers.get_my_id().to_owned(); - match peers.get_mut_peer(&event.token()) { - None => {} - Some(peer) => { - match peer.get_state().clone() { - State::Connecting => { - //debug!("Connected to peer {}, sending hello...", &peer.get_addr()); - let data: String = { - let c = context.lock().unwrap(); - let message = Message::hand(&c.app_version, &c.settings.origin, CHAIN_VERSION, c.settings.net.public, &my_id); - serde_json::to_string(&message).unwrap() - }; - send_message(peer.get_stream(), &data.into_bytes()).unwrap_or_else(|e| warn!("Error sending hello {}", e)); - //debug!("Sent hello to {}", &peer.get_addr()); - } - State::Message { data } => { - //debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap()); - send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending message {}", e)); - } - State::Connected => {} - State::Idle { from } => { - debug!("Odd version of pings :)"); - if from.elapsed().as_secs() >= 30 { - let data: String = { - let c = context.lock().unwrap(); - let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); - serde_json::to_string(&message).unwrap() - }; - send_message(peer.get_stream(), &data.into_bytes()).unwrap_or_else(|e| warn!("Error sending ping {}", e)); - } - } - State::Error => {} - State::Banned => {} - State::Offline { .. } => {} - State::Loop => {} - State::SendLoop => { - let data = serde_json::to_string(&Message::Loop).unwrap(); - send_message(peer.get_stream(), &data.into_bytes()).unwrap_or_else(|e| warn!("Error sending loop {}", e)); - } - State::Twin => { - let data = serde_json::to_string(&Message::Twin).unwrap(); - send_message(peer.get_stream(), &data.into_bytes()).unwrap_or_else(|e| warn!("Error sending loop {}", e)); - } - } - registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap(); - } + } + Err(e) => { + warn!("Could not encode message! {}", e); + Err(()) } } +} - true +fn decode_message(data: &Vec, cipher: &Option) -> Result, Error> { + match cipher { + None => { Ok(data.clone()) } + Some(chacha) => { + Ok(chacha.decrypt(data.as_slice())) + } + } } fn read_message(stream: &mut TcpStream) -> Result, ()> { let instant = Instant::now(); - let data_size = match stream.read_u32::() { - Ok(size) => { size as usize } + let data_size = match stream.read_u16::() { + Ok(size) => { (size ^ 0xAAAA) as usize } Err(e) => { error!("Error reading from socket! {}", e); 0 } }; - //trace!("Payload size is {}", data_size); + trace!("Payload size is {}", data_size); if data_size > MAX_PACKET_SIZE || data_size == 0 { return Err(()); } @@ -379,207 +726,102 @@ fn read_message(stream: &mut TcpStream) -> Result, ()> { } } -fn send_message(connection: &mut TcpStream, data: &Vec) -> io::Result<()> { - connection.write_u32::(data.len() as u32)?; - connection.write_all(&data)?; - connection.flush() +/// Sends one byte [garbage_size], [random bytes], and [public_key] +fn send_client_handshake(stream: &mut TcpStream, public_key: &[u8]) -> io::Result<()> { + let mut rng = rand::thread_rng(); + let packet_size: usize = rng.gen_range(64..255); + let mut buf = vec![0u8; packet_size]; + rng.fill_bytes(&mut buf); + let garbage_size = packet_size - 33; + buf[0] = garbage_size as u8 ^ 0xA; // key length and 1 byte size + for i in 0..public_key.len() { + buf[i + garbage_size + 1] = public_key[i]; + } + stream.write_all(buf.as_slice())?; + stream.flush() } -fn handle_message(context: Arc>, message: Message, peers: &mut Peers, token: &Token) -> State { - let (my_height, my_hash, my_origin, my_version) = { - let context = context.lock().unwrap(); - // TODO cache it somewhere - (context.chain.get_height(), context.chain.get_last_hash(), &context.settings.origin.clone(), CHAIN_VERSION) +fn read_client_handshake(stream: &mut TcpStream) -> Result, Error> { + // First, we read garbage size + let data_size = match stream.read_u8() { + Ok(size) => { (size ^ 0xA) as usize } + Err(e) => { + error!("Error reading from socket! {}", e); + return Err(e) + } }; - let answer = match message { - Message::Hand { app_version, origin, version, public, rand} => { - if peers.is_our_own_connect(&rand) { - warn!("Detected loop connect"); - State::SendLoop - } else { - if origin.eq(my_origin) && version == my_version { - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_public(public); - peer.set_active(true); - debug!("Incoming v{} on {}", &app_version, peer.get_addr().ip()); - let app_version = context.lock().unwrap().app_version.clone(); - State::message(Message::shake(&app_version, &origin, version, true, my_height)) - } else { - warn!("Handshake from unsupported chain or version"); - State::Banned - } - } - } - Message::Shake { app_version, origin, version, ok, height } => { - if origin.ne(my_origin) || version != my_version { - return State::Banned; - } - if ok { - let nodes = peers.get_peers_active_count(); - let peer = peers.get_mut_peer(token).unwrap(); - debug!("Outgoing v{} on {}", &app_version, peer.get_addr().ip()); - peer.set_height(height); - peer.set_active(true); - peer.reset_reconnects(); - let mut context = context.lock().unwrap(); - if peer.is_higher(my_height) { - context.chain.update_max_height(height); - let event = crate::event::Event::Syncing { have: my_height, height: max(height, my_height) }; - post(event); - } - if nodes < MAX_NODES && random::() { - debug!("Requesting more peers from {}", peer.get_addr().ip()); - State::message(Message::GetPeers) - } else { - State::idle() - } - } else { - State::Banned - } - } - Message::Error => { State::Error } - Message::Ping { height, hash } => { - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_height(height); - peer.set_active(true); - if peer.is_higher(my_height) { - let mut context = context.lock().unwrap(); - context.chain.update_max_height(height); - info!("Peer is higher, requesting block {} from {}", height, peer.get_addr().ip()); - State::message(Message::GetBlock { index: my_height + 1 }) - } else if my_height == height && hash.ne(&my_hash) { - info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); - info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); - State::message(Message::GetBlock { index: my_height }) - } else { - State::message(Message::pong(my_height, my_hash)) - } - } - Message::Pong { height, hash } => { - let active_count = peers.get_peers_active_count(); - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_height(height); - peer.set_active(true); - if peer.is_higher(my_height) { - let mut context = context.lock().unwrap(); - context.chain.update_max_height(height); - info!("Peer is higher, requesting block {} from {}", height, peer.get_addr().ip()); - State::message(Message::GetBlock { index: my_height + 1 }) - } else if my_height == height && hash.ne(&my_hash) { - info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); - info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); - State::message(Message::GetBlock { index: my_height }) - } else { - if active_count < MAX_NODES && random::() < 50 { - debug!("Requesting more peers from {}", peer.get_addr().ip()); - State::message(Message::GetPeers) - } else { - State::idle() - } - } - } - Message::GetPeers => { - let addr = { - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_active(true); - peer.get_addr().clone() - }; - State::message(Message::Peers { peers: peers.get_peers_for_exchange(&addr) }) - } - Message::Peers { peers: new_peers } => { - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_active(true); - peers.add_peers_from_exchange(new_peers); - State::idle() - } - Message::GetBlock { index } => { - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_active(true); - let context = context.lock().unwrap(); - match context.chain.get_block(index) { - Some(block) => State::message(Message::block(block.index, serde_json::to_string(&block).unwrap())), - None => State::Error - } - } - Message::Block { index, block } => { - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_active(true); - let block: Block = match serde_json::from_str(&block) { - Ok(block) => block, - Err(_) => return State::Banned - }; - if index != block.index { - return State::Banned; - } - info!("Received block {} with hash {:?}", block.index, &block.hash); - handle_block(context, peers, token, block) - } - Message::Twin => { State::Twin } - Message::Loop => { State::Loop } - }; - answer -} - -fn handle_block(context: Arc>, peers: &mut Peers, token: &Token, block: Block) -> State { - let peers_count = peers.get_peers_active_count(); - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_received_block(block.index); - - let mut context = context.lock().unwrap(); - let max_height = context.chain.max_height(); - match context.chain.check_new_block(&block) { - BlockQuality::Good => { - context.chain.add_block(block); - let my_height = context.chain.get_height(); - post(crate::event::Event::BlockchainChanged { index: my_height }); - // If it was the last block to sync - if my_height == max_height { - post(crate::event::Event::SyncFinished); - } else { - let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; - post(event); - } - let domains = context.chain.get_domains_count(); - let keys = context.chain.get_users_count(); - post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count }); - // To load blocks from different nodes we randomize requests of new blocks - // TODO rethink this approach - if max_height > my_height && random::() < 200 { - return State::message(Message::GetBlock { index: my_height + 1 }); - } - } - BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } - BlockQuality::Future => { debug!("Ignoring future block {}", block.index); } - BlockQuality::Bad => { - // TODO save bad public keys to banned table - debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block); - let height = context.chain.get_height(); - context.chain.update_max_height(height); - post(crate::event::Event::SyncFinished); - return State::Banned; - } - BlockQuality::Rewind => { - debug!("Got some orphan block, requesting its parent"); - return State::message(Message::GetBlock { index: block.index - 1 }); - } - BlockQuality::Fork => { - debug!("Got forked block {} with hash {:?}", block.index, block.hash); - // If we are very much behind of blockchain - let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height; - let last_block = context.chain.last_block().unwrap(); - if block.is_better_than(&last_block) || lagged { - context.chain.replace_block(block).expect("Error replacing block with fork"); - let index = context.chain.get_height(); - post(crate::event::Event::BlockchainChanged { index }); - } else { - debug!("Fork in not better than our block, dropping."); - if let Some(block) = context.chain.get_block(block.index) { - return State::message(Message::block(block.index, serde_json::to_string(&block).unwrap())); - } - } + // Read the garbage + let mut buf = vec![0u8; data_size]; + match stream.read_exact(&mut buf) { + Ok(_) => {} + Err(e) => { return Err(e); } + } + // Then we have public key for ECDH + let mut buf = vec![0u8; 32]; + match stream.read_exact(&mut buf) { + Ok(_) => { Ok(buf) } + Err(e) => { + warn!("Error reading handshake!"); + Err(e) } } - State::idle() +} + +fn send_server_handshake(peer: &mut Peer, public_key: &[u8]) -> io::Result<()> { + let mut rng = rand::thread_rng(); + let packet_size: usize = rng.gen_range(64..255); + let mut buf = vec![0u8; packet_size]; + rng.fill_bytes(&mut buf); + let nonce = peer.get_nonce(); + // We will write 1 byte size, garbage, public key, nonce + let garbage_size = packet_size - 1 - 32 - 12; + buf[0] = garbage_size as u8 ^ 0xA; + for i in 0..public_key.len() { + buf[i + garbage_size + 1] = public_key[i]; + } + for i in 0..nonce.len() { + buf[i + garbage_size + 32 + 1] = nonce[i]; + } + let stream = peer.get_stream(); + stream.write_all(buf.as_slice())?; + stream.flush() +} + +fn read_server_handshake(stream: &mut TcpStream) -> Result, Error> { + // First, we read garbage size + let data_size = match stream.read_u8() { + Ok(size) => { (size ^ 0xA) as usize } + Err(e) => { + error!("Error reading from socket! {}", e); + return Err(e) + } + }; + // Read the garbage + let mut buf = vec![0u8; data_size]; + match stream.read_exact(&mut buf) { + Ok(_) => {} + Err(e) => { return Err(e); } + } + // Then we have public key for ECDH, plus nonce 12 bytes + let mut buf = vec![0u8; 32 + 12]; + match stream.read_exact(&mut buf) { + Ok(_) => { Ok(buf) } + Err(e) => { + warn!("Error reading handshake!"); + Err(e) + } + } +} + +fn send_message(connection: &mut TcpStream, data: &Vec) -> io::Result<()> { + let data_len = data.len() as u16; + //debug!("Sending {} bytes", data_len); + //debug!("Message: {:?}", to_hex(&data)); + let mut buf: Vec = Vec::with_capacity(data.len() + 2); + buf.write_u16::(data_len ^ 0xAAAA)?; + buf.write_all(&data)?; + connection.write_all(&buf)?; + connection.flush() } fn would_block(err: &io::Error) -> bool { diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs index 82f279a..b4ec42f 100644 --- a/src/p2p/peer.rs +++ b/src/p2p/peer.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use mio::net::TcpStream; use crate::p2p::State; use crate::Block; +use crate::crypto::Chacha; #[derive(Debug)] pub struct Peer { @@ -16,6 +17,7 @@ pub struct Peer { active: bool, reconnects: u32, received_block: u64, + cipher: Option, fork: HashMap } @@ -32,10 +34,26 @@ impl Peer { active: false, reconnects: 0, received_block: 0, + cipher: None, fork: HashMap::new() } } + pub fn set_cipher(&mut self, cipher: Chacha) { + self.cipher = Some(cipher); + } + + pub fn get_cipher(&self) -> &Option { + &self.cipher + } + + pub fn get_nonce(&self) -> &[u8; 12] { + match &self.cipher { + None => { &crate::crypto::ZERO_NONCE } + Some(chacha) => { chacha.get_nonce() } + } + } + pub fn get_addr(&self) -> SocketAddr { self.addr.clone() } diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 917a158..dbe3558 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -12,7 +12,6 @@ use rand::seq::IteratorRandom; use crate::{Bytes, commons}; use crate::commons::*; use crate::p2p::{Message, Peer, State}; -use crate::commons::next; use std::io; const PING_PERIOD: u64 = 30; @@ -84,6 +83,12 @@ impl Peers { State::Twin => { info!("Peer connection {} to {:?} is a twin", &token.0, &peer.get_addr()); } + State::ServerHandshake => { + info!("Peer connection {} from {:?} didn't shake hands", &token.0, &peer.get_addr()); + } + State::HandshakeFinished => { + info!("Peer connection {} from {:?} shaked hands, but then failed", &token.0, &peer.get_addr()); + } } self.peers.remove(token); @@ -193,6 +198,15 @@ impl Peers { count } + pub fn is_tween_connect(&self, id: &str) -> bool { + for (_, peer) in self.peers.iter() { + if peer.active() && peer.get_id() == id { + return true; + } + } + false + } + pub fn get_peers_banned_count(&self) -> usize { self.ignored.len() } @@ -401,6 +415,13 @@ impl Peers { } } +/// Gets new token from old token, mutating the last +pub fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) +} + fn skip_private_addr(addr: &SocketAddr) -> bool { if addr.ip().is_loopback() { return true; diff --git a/src/p2p/state.rs b/src/p2p/state.rs index 1d79c4d..a2d9579 100644 --- a/src/p2p/state.rs +++ b/src/p2p/state.rs @@ -5,6 +5,8 @@ use crate::p2p::Message; pub enum State { Connecting, Connected, + ServerHandshake, + HandshakeFinished, Idle { from: Instant }, Message { data: Vec }, Error, @@ -25,8 +27,8 @@ impl State { } pub fn message(message: Message) -> Self { - let response = serde_json::to_string(&message).unwrap(); - State::Message {data: Vec::from(response.as_bytes()) } + let data = serde_cbor::to_vec(&message).unwrap(); + State::Message { data } } pub fn is_idle(&self) -> bool {