Some network refactoring, updated dependencies.

This commit is contained in:
Revertron
2022-01-02 15:10:33 +01:00
parent 0f73741433
commit 6ce092f184
4 changed files with 277 additions and 234 deletions
+216 -210
View File
@@ -101,8 +101,7 @@ impl Network {
SERVER => {
//debug!("Event for server socket {} is {:?}", event.token().0, &event);
// If this is an event for the server, it means a connection is ready to be accepted.
let connection = server.accept();
if let Ok((mut stream, mut address)) = connection {
while let Ok((mut stream, mut address)) = server.accept() {
// Checking if it is an ipv4-mapped ipv6 if yes convert to ipv4
if address.is_ipv6() {
if let IpAddr::V6(ipv6) = address.ip() {
@@ -210,222 +209,229 @@ impl Network {
}
if event.is_readable() {
let data = {
let token = event.token();
match self.peers.get_mut_peer(&token) {
None => {
error!("Error getting peer for connection {}", token.0);
return false;
}
Some(peer) => {
if event.is_read_closed() {
debug!("Node from {} disconnected", peer.get_addr().ip());
return false;
}
match peer.get_state().clone() {
State::Connected => {
let stream = peer.get_stream();
return match read_client_handshake(stream) {
Ok(key) => {
let mut buf = [0u8; 32];
buf.copy_from_slice(key.as_slice());
let public_key: PublicKey = PublicKey::from(buf);
let shared = self.secret_key.diffie_hellman(&public_key);
let mut nonce = [0u8; 12];
let mut rng = rand::thread_rng();
rng.fill(&mut nonce);
let chacha = Chacha::new(shared.as_bytes(), &nonce);
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_cipher(chacha);
peer.set_state(State::ServerHandshake);
//trace!("Client hello read successfully");
true
}
Err(_) => {
debug!("Error reading client handshake from {}.", peer.get_addr());
false
}
};
}
State::ServerHandshake => {
let stream = peer.get_stream();
return match read_server_handshake(stream) {
Ok(data) => {
if data.len() != 32 + 12 {
warn!("Server handshake of {} bytes instead of {}", data.len(), 32 + 12);
return false;
}
let mut buf = [0u8; 32];
buf.copy_from_slice(&data.as_slice()[0..32]);
let public_key: PublicKey = PublicKey::from(buf);
let mut nonce = [0u8; 12];
nonce.copy_from_slice(&data.as_slice()[32..]);
let shared = self.secret_key.diffie_hellman(&public_key);
let chacha = Chacha::new(shared.as_bytes(), &nonce);
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_cipher(chacha);
peer.set_state(State::HandshakeFinished);
//trace!("Server hello read successfully");
true
}
Err(_) => {
debug!("Error reading client handshake from {}", peer.get_addr());
false
}
};
}
_ => {
let stream = peer.get_stream();
read_message(stream)
}
}
}
}
};
if let Ok(data) = data {
let data = {
match self.peers.get_peer(&event.token()) {
Some(peer) => {
match decode_message(&data, peer.get_cipher()) {
Ok(data) => data,
Err(_) => {
vec![]
}
}
}
None => {
vec![]
}
}
};
match Message::from_bytes(data) {
Ok(message) => {
//let m = format!("{:?}", &message);
let new_state = self.handle_message(message, &event.token());
let peer = self.peers.get_mut_peer(&event.token()).unwrap();
//debug!("Got message from {}: {:?}", &peer.get_addr(), &m);
let stream = peer.get_stream();
match new_state {
State::Message { data } => {
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_state(State::Message { data });
}
State::Connecting => {}
State::Connected => {}
State::ServerHandshake => {}
State::HandshakeFinished => {}
State::Idle { .. } => {
peer.set_state(State::idle());
}
State::Error => {}
State::Banned => {
self.peers.ignore_peer(registry, &event.token());
}
State::Offline { .. } => {
peer.set_state(State::offline());
}
State::Loop => {
peer.set_state(State::Loop);
self.peers.ignore_peer(registry, &event.token());
}
State::SendLoop => {
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_state(State::SendLoop);
}
State::Twin => {
peer.set_state(State::Twin);
// TODO set something in [Peers], maybe ignore this IP?
return false;
}
}
}
Err(e) => {
let peer = self.peers.get_peer(&event.token()).unwrap();
warn!("Error deserializing message from {}: {}", &peer.get_addr(), e.to_string());
return false;
}
}
} else {
let error = data.err().unwrap();
let addr = match self.peers.get_peer(&event.token()) {
None => String::from("unknown"),
Some(peer) => peer.get_addr().to_string()
};
debug!("Error reading message from {}, error = {}", addr, error);
return false;
}
return self.process_readable(registry, event);
}
if event.is_writable() {
let my_id = self.peers.get_my_id().to_owned();
match self.peers.get_mut_peer(&event.token()) {
None => {}
Some(peer) => {
match peer.get_state().clone() {
State::Connecting => {
if send_client_handshake(peer.get_stream(), self.public_key.as_bytes()).is_err() {
return false;
}
peer.set_state(State::ServerHandshake);
}
State::ServerHandshake => {
if send_server_handshake(peer, self.public_key.as_bytes()).is_err() {
return false;
}
peer.set_state(State::HandshakeFinished);
//trace!("Server handshake sent");
}
State::HandshakeFinished => {
//debug!("Connected to peer {}, sending hello...", &peer.get_addr());
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
let message = Message::hand(&c.app_version, &c.settings.origin, CHAIN_VERSION, c.settings.net.public, &my_id);
//info!("Sending: {:?}", &message);
encode_message(&message, peer.get_cipher()).unwrap()
};
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending hello {}", e));
//debug!("Sent hello to {}", &peer.get_addr());
}
State::Connected => {}
State::Message { data } => {
//debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap());
if let Ok(data) = encode_bytes(&data, peer.get_cipher()) {
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending message {}", e));
}
}
State::Idle { from } => {
debug!("Odd version of pings :)");
if from.elapsed().as_secs() >= 30 {
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash());
encode_message(&message, peer.get_cipher()).unwrap()
};
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e));
}
}
State::Error => {}
State::Banned => {}
State::Offline { .. } => {}
State::Loop => {}
State::SendLoop => {
let data = encode_message(&Message::Loop, peer.get_cipher()).unwrap();
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e));
}
State::Twin => {
let data = encode_message(&Message::Twin, peer.get_cipher()).unwrap();
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e));
}
}
registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap();
}
}
return self.process_writable(registry, event);
}
true
}
fn process_readable(&mut self, registry: &Registry, event: &Event) -> bool {
let data = {
let token = event.token();
match self.peers.get_mut_peer(&token) {
None => {
error!("Error getting peer for connection {}", token.0);
return false;
}
Some(peer) => {
if event.is_read_closed() {
debug!("Node from {} disconnected", peer.get_addr().ip());
return false;
}
match peer.get_state().clone() {
State::Connected => {
let stream = peer.get_stream();
return match read_client_handshake(stream) {
Ok(key) => {
let mut buf = [0u8; 32];
buf.copy_from_slice(key.as_slice());
let public_key: PublicKey = PublicKey::from(buf);
let shared = self.secret_key.diffie_hellman(&public_key);
let mut nonce = [0u8; 12];
let mut rng = rand::thread_rng();
rng.fill(&mut nonce);
let chacha = Chacha::new(shared.as_bytes(), &nonce);
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_cipher(chacha);
peer.set_state(State::ServerHandshake);
//trace!("Client hello read successfully");
true
}
Err(_) => {
debug!("Error reading client handshake from {}.", peer.get_addr());
false
}
};
}
State::ServerHandshake => {
let stream = peer.get_stream();
return match read_server_handshake(stream) {
Ok(data) => {
if data.len() != 32 + 12 {
warn!("Server handshake of {} bytes instead of {}", data.len(), 32 + 12);
return false;
}
let mut buf = [0u8; 32];
buf.copy_from_slice(&data.as_slice()[0..32]);
let public_key: PublicKey = PublicKey::from(buf);
let mut nonce = [0u8; 12];
nonce.copy_from_slice(&data.as_slice()[32..]);
let shared = self.secret_key.diffie_hellman(&public_key);
let chacha = Chacha::new(shared.as_bytes(), &nonce);
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_cipher(chacha);
peer.set_state(State::HandshakeFinished);
//trace!("Server hello read successfully");
true
}
Err(_) => {
debug!("Error reading client handshake from {}", peer.get_addr());
false
}
};
}
_ => {
let stream = peer.get_stream();
read_message(stream)
}
}
}
}
};
if let Ok(data) = data {
let data = {
match self.peers.get_peer(&event.token()) {
Some(peer) => {
match decode_message(&data, peer.get_cipher()) {
Ok(data) => data,
Err(_) => {
vec![]
}
}
}
None => {
vec![]
}
}
};
match Message::from_bytes(data) {
Ok(message) => {
//let m = format!("{:?}", &message);
let new_state = self.handle_message(message, &event.token());
let peer = self.peers.get_mut_peer(&event.token()).unwrap();
//debug!("Got message from {}: {:?}", &peer.get_addr(), &m);
let stream = peer.get_stream();
match new_state {
State::Message { data } => {
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_state(State::Message { data });
}
State::Connecting => {}
State::Connected => {}
State::ServerHandshake => {}
State::HandshakeFinished => {}
State::Idle { .. } => {
peer.set_state(State::idle());
}
State::Error => {}
State::Banned => {
self.peers.ignore_peer(registry, &event.token());
}
State::Offline { .. } => {
peer.set_state(State::offline());
}
State::Loop => {
peer.set_state(State::Loop);
self.peers.ignore_peer(registry, &event.token());
}
State::SendLoop => {
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
peer.set_state(State::SendLoop);
}
State::Twin => {
peer.set_state(State::Twin);
// TODO set something in [Peers], maybe ignore this IP?
return false;
}
}
}
Err(e) => {
let peer = self.peers.get_peer(&event.token()).unwrap();
warn!("Error deserializing message from {}: {}", &peer.get_addr(), e.to_string());
return false;
}
}
} else {
let error = data.err().unwrap();
let addr = match self.peers.get_peer(&event.token()) {
None => String::from("unknown"),
Some(peer) => peer.get_addr().to_string()
};
debug!("Error reading message from {}, error = {}", addr, error);
return false;
}
true
}
fn process_writable(&mut self, registry: &Registry, event: &Event) -> bool {
let my_id = self.peers.get_my_id().to_owned();
if let Some(peer) = self.peers.get_mut_peer(&event.token()) {
match peer.get_state().clone() {
State::Connecting => {
if send_client_handshake(peer.get_stream(), self.public_key.as_bytes()).is_err() {
return false;
}
peer.set_state(State::ServerHandshake);
}
State::ServerHandshake => {
if send_server_handshake(peer, self.public_key.as_bytes()).is_err() {
return false;
}
peer.set_state(State::HandshakeFinished);
//trace!("Server handshake sent");
}
State::HandshakeFinished => {
//debug!("Connected to peer {}, sending hello...", &peer.get_addr());
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
let message = Message::hand(&c.app_version, &c.settings.origin, CHAIN_VERSION, c.settings.net.public, &my_id);
//info!("Sending: {:?}", &message);
encode_message(&message, peer.get_cipher()).unwrap()
};
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending hello {}", e));
//debug!("Sent hello to {}", &peer.get_addr());
}
State::Connected => {}
State::Message { data } => {
//debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap());
if let Ok(data) = encode_bytes(&data, peer.get_cipher()) {
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending message {}", e));
}
}
State::Idle { from } => {
debug!("Odd version of pings :)");
if from.elapsed().as_secs() >= 30 {
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash());
encode_message(&message, peer.get_cipher()).unwrap()
};
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e));
}
}
State::Error => {}
State::Banned => {}
State::Offline { .. } => {}
State::Loop => {}
State::SendLoop => {
let data = encode_message(&Message::Loop, peer.get_cipher()).unwrap();
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e));
}
State::Twin => {
let data = encode_message(&Message::Twin, peer.get_cipher()).unwrap();
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending loop {}", e));
}
}
registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap();
}
true
}
fn handle_message(&mut self, message: Message, token: &Token) -> State {
let (my_height, my_hash, my_origin, my_version, me_public) = {
let context = self.context.lock().unwrap();
+15 -16
View File
@@ -52,49 +52,48 @@ impl Peers {
}
pub fn close_peer(&mut self, registry: &Registry, token: &Token) {
let peer = self.peers.get_mut(token);
if let Some(peer) = peer {
let peer = self.peers.remove(token);
if let Some(mut peer) = peer {
let stream = peer.get_stream();
let _ = stream.shutdown(Shutdown::Both);
let _ = registry.deregister(stream);
let addr = peer.get_addr();
match peer.get_state() {
State::Connecting => {
debug!("Peer connection {} to {:?} has timed out", &token.0, &peer.get_addr());
debug!("Peer connection {} to {:?} has timed out", &token.0, &addr);
}
State::Connected => {
debug!("Peer connection {} to {:?} disconnected", &token.0, &peer.get_addr());
debug!("Peer connection {} to {:?} disconnected", &token.0, &addr);
}
State::Idle { .. } | State::Message { .. } => {
debug!("Peer connection {} to {:?} disconnected", &token.0, &peer.get_addr());
debug!("Peer connection {} to {:?} disconnected", &token.0, &addr);
}
State::Error => {
debug!("Peer connection {} to {:?} has shut down on error", &token.0, &peer.get_addr());
debug!("Peer connection {} to {:?} has shut down on error", &token.0, &addr);
}
State::Banned => {
debug!("Peer connection {} to {:?} has shut down, banned", &token.0, &peer.get_addr());
self.ignored.insert(peer.get_addr().ip());
debug!("Peer connection {} to {:?} has shut down, banned", &token.0, &addr);
self.ignored.insert(addr.ip());
}
State::Offline { .. } => {
debug!("Peer connection {} to {:?} is offline", &token.0, &peer.get_addr());
debug!("Peer connection {} to {:?} is offline", &token.0, &addr);
}
State::SendLoop => {
debug!("Peer connection {} from {:?} is a loop", &token.0, &peer.get_addr());
debug!("Peer connection {} from {:?} is a loop", &token.0, &addr);
}
State::Loop => {
debug!("Peer connection {} to {:?} is a loop", &token.0, &peer.get_addr());
debug!("Peer connection {} to {:?} is a loop", &token.0, &addr);
}
State::Twin => {
debug!("Peer connection {} to {:?} is a twin", &token.0, &peer.get_addr());
debug!("Peer connection {} to {:?} is a twin", &token.0, &addr);
}
State::ServerHandshake => {
debug!("Peer connection {} from {:?} didn't shake hands", &token.0, &peer.get_addr());
debug!("Peer connection {} from {:?} didn't shake hands", &token.0, &addr);
}
State::HandshakeFinished => {
debug!("Peer connection {} from {:?} shook hands, but then failed", &token.0, &peer.get_addr());
debug!("Peer connection {} from {:?} shook hands, but then failed", &token.0, &addr);
}
}
self.peers.remove(token);
}
}