From 6ce092f184edfc70e6525359528e7f3bd6b1f9be Mon Sep 17 00:00:00 2001 From: Revertron Date: Sun, 2 Jan 2022 15:10:33 +0100 Subject: [PATCH] Some network refactoring, updated dependencies. --- Cargo.lock | 50 +++++- Cargo.toml | 4 +- src/p2p/network.rs | 426 +++++++++++++++++++++++---------------------- src/p2p/peers.rs | 31 ++-- 4 files changed, 277 insertions(+), 234 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dfc2f30..6f37384 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aead" version = "0.3.2" @@ -323,6 +329,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba" +[[package]] +name = "crc32fast" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "738c290dfaea84fc1ca15ad9c168d083b05a714e1efddd8edaab678dc28d2836" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.1" @@ -447,6 +462,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "flate2" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -720,6 +747,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.8.0" @@ -783,9 +820,9 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" dependencies = [ "hermit-abi", "libc", @@ -996,9 +1033,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.0" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95" +checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84" dependencies = [ "log", "ring", @@ -1374,12 +1411,13 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "ureq" -version = "2.3.1" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5c448dcb78ec38c7d59ec61f87f70a98ea19171e06c139357e012ee226fec90" +checksum = "9399fa2f927a3d327187cbd201480cee55bee6ac5d3c77dd27f0c6814cff16d5" dependencies = [ "base64", "chunked_transfer", + "flate2", "log", "once_cell", "rustls", diff --git a/Cargo.toml b/Cargo.toml index b98bc0e..e80d857 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ ecies-ed25519 = "0.5" chacha20poly1305 = "0.9" signature = "1.3.1" blakeout = "0.3.0" -num_cpus = "1.13.0" +num_cpus = "1.13.1" byteorder = "1.4.3" serde = { version = "1.0.132", features = ["derive"] } serde_json = "1.0.73" @@ -38,7 +38,7 @@ rand-old = { package = "rand", version = "0.7.0" } # For ed25519-dalek sqlite = "0.26.0" uuid = { version = "0.8.2", features = ["serde", "v4"] } mio = { version = "0.8.0", features = ["os-poll", "net"] } -ureq = { version = "2.3.1", optional = true } +ureq = { version = "2.4", optional = true } lru = "0.7.1" derive_more = "0.99.17" lazy_static = "1.4.0" diff --git a/src/p2p/network.rs b/src/p2p/network.rs index b28b8af..365e6a5 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -101,8 +101,7 @@ impl Network { SERVER => { //debug!("Event for server socket {} is {:?}", event.token().0, &event); // If this is an event for the server, it means a connection is ready to be accepted. - let connection = server.accept(); - if let Ok((mut stream, mut address)) = connection { + while let Ok((mut stream, mut address)) = server.accept() { // Checking if it is an ipv4-mapped ipv6 if yes convert to ipv4 if address.is_ipv6() { if let IpAddr::V6(ipv6) = address.ip() { @@ -210,222 +209,229 @@ impl Network { } if event.is_readable() { - let data = { - let token = event.token(); - match self.peers.get_mut_peer(&token) { - None => { - error!("Error getting peer for connection {}", token.0); - return false; - } - Some(peer) => { - if event.is_read_closed() { - debug!("Node from {} disconnected", peer.get_addr().ip()); - return false; - } - match peer.get_state().clone() { - State::Connected => { - let stream = peer.get_stream(); - return match read_client_handshake(stream) { - Ok(key) => { - let mut buf = [0u8; 32]; - buf.copy_from_slice(key.as_slice()); - let public_key: PublicKey = PublicKey::from(buf); - let shared = self.secret_key.diffie_hellman(&public_key); - let mut nonce = [0u8; 12]; - let mut rng = rand::thread_rng(); - rng.fill(&mut nonce); - let chacha = Chacha::new(shared.as_bytes(), &nonce); - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); - peer.set_cipher(chacha); - peer.set_state(State::ServerHandshake); - //trace!("Client hello read successfully"); - true - } - Err(_) => { - debug!("Error reading client handshake from {}.", peer.get_addr()); - false - } - }; - } - State::ServerHandshake => { - let stream = peer.get_stream(); - return match read_server_handshake(stream) { - Ok(data) => { - if data.len() != 32 + 12 { - warn!("Server handshake of {} bytes instead of {}", data.len(), 32 + 12); - return false; - } - let mut buf = [0u8; 32]; - buf.copy_from_slice(&data.as_slice()[0..32]); - let public_key: PublicKey = PublicKey::from(buf); - let mut nonce = [0u8; 12]; - nonce.copy_from_slice(&data.as_slice()[32..]); - let shared = self.secret_key.diffie_hellman(&public_key); - let chacha = Chacha::new(shared.as_bytes(), &nonce); - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); - peer.set_cipher(chacha); - peer.set_state(State::HandshakeFinished); - //trace!("Server hello read successfully"); - true - } - Err(_) => { - debug!("Error reading client handshake from {}", peer.get_addr()); - false - } - }; - } - _ => { - let stream = peer.get_stream(); - read_message(stream) - } - } - } - } - }; - - if let Ok(data) = data { - let data = { - match self.peers.get_peer(&event.token()) { - Some(peer) => { - match decode_message(&data, peer.get_cipher()) { - Ok(data) => data, - Err(_) => { - vec![] - } - } - } - None => { - vec![] - } - } - }; - match Message::from_bytes(data) { - Ok(message) => { - //let m = format!("{:?}", &message); - let new_state = self.handle_message(message, &event.token()); - let peer = self.peers.get_mut_peer(&event.token()).unwrap(); - //debug!("Got message from {}: {:?}", &peer.get_addr(), &m); - let stream = peer.get_stream(); - match new_state { - State::Message { data } => { - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); - peer.set_state(State::Message { data }); - } - State::Connecting => {} - State::Connected => {} - State::ServerHandshake => {} - State::HandshakeFinished => {} - State::Idle { .. } => { - peer.set_state(State::idle()); - } - State::Error => {} - State::Banned => { - self.peers.ignore_peer(registry, &event.token()); - } - State::Offline { .. } => { - peer.set_state(State::offline()); - } - State::Loop => { - peer.set_state(State::Loop); - self.peers.ignore_peer(registry, &event.token()); - } - State::SendLoop => { - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); - peer.set_state(State::SendLoop); - } - State::Twin => { - peer.set_state(State::Twin); - // TODO set something in [Peers], maybe ignore this IP? - return false; - } - } - } - Err(e) => { - let peer = self.peers.get_peer(&event.token()).unwrap(); - warn!("Error deserializing message from {}: {}", &peer.get_addr(), e.to_string()); - return false; - } - } - } else { - let error = data.err().unwrap(); - let addr = match self.peers.get_peer(&event.token()) { - None => String::from("unknown"), - Some(peer) => peer.get_addr().to_string() - }; - debug!("Error reading message from {}, error = {}", addr, error); - return false; - } + return self.process_readable(registry, event); } if event.is_writable() { - let my_id = self.peers.get_my_id().to_owned(); - match self.peers.get_mut_peer(&event.token()) { - None => {} - Some(peer) => { - match peer.get_state().clone() { - State::Connecting => { - if send_client_handshake(peer.get_stream(), self.public_key.as_bytes()).is_err() { - return false; - } - peer.set_state(State::ServerHandshake); - } - State::ServerHandshake => { - if send_server_handshake(peer, self.public_key.as_bytes()).is_err() { - return false; - } - peer.set_state(State::HandshakeFinished); - //trace!("Server handshake sent"); - } - State::HandshakeFinished => { - //debug!("Connected to peer {}, sending hello...", &peer.get_addr()); - let data: Vec = { - let c = self.context.lock().unwrap(); - let message = Message::hand(&c.app_version, &c.settings.origin, CHAIN_VERSION, c.settings.net.public, &my_id); - //info!("Sending: {:?}", &message); - encode_message(&message, peer.get_cipher()).unwrap() - }; - send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending hello {}", e)); - //debug!("Sent hello to {}", &peer.get_addr()); - } - State::Connected => {} - State::Message { data } => { - //debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap()); - if let Ok(data) = encode_bytes(&data, peer.get_cipher()) { - send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending message {}", e)); - } - } - State::Idle { from } => { - debug!("Odd version of pings :)"); - if from.elapsed().as_secs() >= 30 { - let data: Vec = { - let c = self.context.lock().unwrap(); - let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); - encode_message(&message, peer.get_cipher()).unwrap() - }; - send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e)); - } - } - State::Error => {} - State::Banned => {} - State::Offline { .. } => {} - State::Loop => {} - State::SendLoop => { - let data = encode_message(&Message::Loop, peer.get_cipher()).unwrap(); - send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e)); - } - State::Twin => { - let data = encode_message(&Message::Twin, peer.get_cipher()).unwrap(); - send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e)); - } - } - registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap(); - } - } + return self.process_writable(registry, event); } true } + fn process_readable(&mut self, registry: &Registry, event: &Event) -> bool { + let data = { + let token = event.token(); + match self.peers.get_mut_peer(&token) { + None => { + error!("Error getting peer for connection {}", token.0); + return false; + } + Some(peer) => { + if event.is_read_closed() { + debug!("Node from {} disconnected", peer.get_addr().ip()); + return false; + } + match peer.get_state().clone() { + State::Connected => { + let stream = peer.get_stream(); + return match read_client_handshake(stream) { + Ok(key) => { + let mut buf = [0u8; 32]; + buf.copy_from_slice(key.as_slice()); + let public_key: PublicKey = PublicKey::from(buf); + let shared = self.secret_key.diffie_hellman(&public_key); + let mut nonce = [0u8; 12]; + let mut rng = rand::thread_rng(); + rng.fill(&mut nonce); + let chacha = Chacha::new(shared.as_bytes(), &nonce); + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + peer.set_cipher(chacha); + peer.set_state(State::ServerHandshake); + //trace!("Client hello read successfully"); + true + } + Err(_) => { + debug!("Error reading client handshake from {}.", peer.get_addr()); + false + } + }; + } + State::ServerHandshake => { + let stream = peer.get_stream(); + return match read_server_handshake(stream) { + Ok(data) => { + if data.len() != 32 + 12 { + warn!("Server handshake of {} bytes instead of {}", data.len(), 32 + 12); + return false; + } + let mut buf = [0u8; 32]; + buf.copy_from_slice(&data.as_slice()[0..32]); + let public_key: PublicKey = PublicKey::from(buf); + let mut nonce = [0u8; 12]; + nonce.copy_from_slice(&data.as_slice()[32..]); + let shared = self.secret_key.diffie_hellman(&public_key); + let chacha = Chacha::new(shared.as_bytes(), &nonce); + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + peer.set_cipher(chacha); + peer.set_state(State::HandshakeFinished); + //trace!("Server hello read successfully"); + true + } + Err(_) => { + debug!("Error reading client handshake from {}", peer.get_addr()); + false + } + }; + } + _ => { + let stream = peer.get_stream(); + read_message(stream) + } + } + } + } + }; + + if let Ok(data) = data { + let data = { + match self.peers.get_peer(&event.token()) { + Some(peer) => { + match decode_message(&data, peer.get_cipher()) { + Ok(data) => data, + Err(_) => { + vec![] + } + } + } + None => { + vec![] + } + } + }; + match Message::from_bytes(data) { + Ok(message) => { + //let m = format!("{:?}", &message); + let new_state = self.handle_message(message, &event.token()); + let peer = self.peers.get_mut_peer(&event.token()).unwrap(); + //debug!("Got message from {}: {:?}", &peer.get_addr(), &m); + let stream = peer.get_stream(); + match new_state { + State::Message { data } => { + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + peer.set_state(State::Message { data }); + } + State::Connecting => {} + State::Connected => {} + State::ServerHandshake => {} + State::HandshakeFinished => {} + State::Idle { .. } => { + peer.set_state(State::idle()); + } + State::Error => {} + State::Banned => { + self.peers.ignore_peer(registry, &event.token()); + } + State::Offline { .. } => { + peer.set_state(State::offline()); + } + State::Loop => { + peer.set_state(State::Loop); + self.peers.ignore_peer(registry, &event.token()); + } + State::SendLoop => { + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + peer.set_state(State::SendLoop); + } + State::Twin => { + peer.set_state(State::Twin); + // TODO set something in [Peers], maybe ignore this IP? + return false; + } + } + } + Err(e) => { + let peer = self.peers.get_peer(&event.token()).unwrap(); + warn!("Error deserializing message from {}: {}", &peer.get_addr(), e.to_string()); + return false; + } + } + } else { + let error = data.err().unwrap(); + let addr = match self.peers.get_peer(&event.token()) { + None => String::from("unknown"), + Some(peer) => peer.get_addr().to_string() + }; + debug!("Error reading message from {}, error = {}", addr, error); + return false; + } + true + } + + fn process_writable(&mut self, registry: &Registry, event: &Event) -> bool { + let my_id = self.peers.get_my_id().to_owned(); + if let Some(peer) = self.peers.get_mut_peer(&event.token()) { + match peer.get_state().clone() { + State::Connecting => { + if send_client_handshake(peer.get_stream(), self.public_key.as_bytes()).is_err() { + return false; + } + peer.set_state(State::ServerHandshake); + } + State::ServerHandshake => { + if send_server_handshake(peer, self.public_key.as_bytes()).is_err() { + return false; + } + peer.set_state(State::HandshakeFinished); + //trace!("Server handshake sent"); + } + State::HandshakeFinished => { + //debug!("Connected to peer {}, sending hello...", &peer.get_addr()); + let data: Vec = { + let c = self.context.lock().unwrap(); + let message = Message::hand(&c.app_version, &c.settings.origin, CHAIN_VERSION, c.settings.net.public, &my_id); + //info!("Sending: {:?}", &message); + encode_message(&message, peer.get_cipher()).unwrap() + }; + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending hello {}", e)); + //debug!("Sent hello to {}", &peer.get_addr()); + } + State::Connected => {} + State::Message { data } => { + //debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap()); + if let Ok(data) = encode_bytes(&data, peer.get_cipher()) { + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending message {}", e)); + } + } + State::Idle { from } => { + debug!("Odd version of pings :)"); + if from.elapsed().as_secs() >= 30 { + let data: Vec = { + let c = self.context.lock().unwrap(); + let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); + encode_message(&message, peer.get_cipher()).unwrap() + }; + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e)); + } + } + State::Error => {} + State::Banned => {} + State::Offline { .. } => {} + State::Loop => {} + State::SendLoop => { + let data = encode_message(&Message::Loop, peer.get_cipher()).unwrap(); + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e)); + } + State::Twin => { + let data = encode_message(&Message::Twin, peer.get_cipher()).unwrap(); + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e)); + } + } + registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap(); + } + true + } + fn handle_message(&mut self, message: Message, token: &Token) -> State { let (my_height, my_hash, my_origin, my_version, me_public) = { let context = self.context.lock().unwrap(); diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 59a880b..39dbb6f 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -52,49 +52,48 @@ impl Peers { } pub fn close_peer(&mut self, registry: &Registry, token: &Token) { - let peer = self.peers.get_mut(token); - if let Some(peer) = peer { + let peer = self.peers.remove(token); + if let Some(mut peer) = peer { let stream = peer.get_stream(); let _ = stream.shutdown(Shutdown::Both); let _ = registry.deregister(stream); + let addr = peer.get_addr(); match peer.get_state() { State::Connecting => { - debug!("Peer connection {} to {:?} has timed out", &token.0, &peer.get_addr()); + debug!("Peer connection {} to {:?} has timed out", &token.0, &addr); } State::Connected => { - debug!("Peer connection {} to {:?} disconnected", &token.0, &peer.get_addr()); + debug!("Peer connection {} to {:?} disconnected", &token.0, &addr); } State::Idle { .. } | State::Message { .. } => { - debug!("Peer connection {} to {:?} disconnected", &token.0, &peer.get_addr()); + debug!("Peer connection {} to {:?} disconnected", &token.0, &addr); } State::Error => { - debug!("Peer connection {} to {:?} has shut down on error", &token.0, &peer.get_addr()); + debug!("Peer connection {} to {:?} has shut down on error", &token.0, &addr); } State::Banned => { - debug!("Peer connection {} to {:?} has shut down, banned", &token.0, &peer.get_addr()); - self.ignored.insert(peer.get_addr().ip()); + debug!("Peer connection {} to {:?} has shut down, banned", &token.0, &addr); + self.ignored.insert(addr.ip()); } State::Offline { .. } => { - debug!("Peer connection {} to {:?} is offline", &token.0, &peer.get_addr()); + debug!("Peer connection {} to {:?} is offline", &token.0, &addr); } State::SendLoop => { - debug!("Peer connection {} from {:?} is a loop", &token.0, &peer.get_addr()); + debug!("Peer connection {} from {:?} is a loop", &token.0, &addr); } State::Loop => { - debug!("Peer connection {} to {:?} is a loop", &token.0, &peer.get_addr()); + debug!("Peer connection {} to {:?} is a loop", &token.0, &addr); } State::Twin => { - debug!("Peer connection {} to {:?} is a twin", &token.0, &peer.get_addr()); + debug!("Peer connection {} to {:?} is a twin", &token.0, &addr); } State::ServerHandshake => { - debug!("Peer connection {} from {:?} didn't shake hands", &token.0, &peer.get_addr()); + debug!("Peer connection {} from {:?} didn't shake hands", &token.0, &addr); } State::HandshakeFinished => { - debug!("Peer connection {} from {:?} shook hands, but then failed", &token.0, &peer.get_addr()); + debug!("Peer connection {} from {:?} shook hands, but then failed", &token.0, &addr); } } - - self.peers.remove(token); } }