diff --git a/Cargo.toml b/Cargo.toml index 7ef4d49..906be11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ lto = true strip = true # Automatically strip symbols from the binary. [profile.dev] -opt-level = 2 +opt-level = 1 [profile.test] opt-level = 2 diff --git a/src/blockchain/chain.rs b/src/blockchain/chain.rs index 41bd72f..e80ddd7 100644 --- a/src/blockchain/chain.rs +++ b/src/blockchain/chain.rs @@ -975,13 +975,23 @@ impl Chain { return false; } // If this signers' public key has already locked/signed that block we return error + // Use signing_keys cache to avoid repeated DB loads for the same blocks + let mut cache = self.signers.borrow_mut(); for i in (full_block.index + 1)..block.index { - let signer = self.get_block(i).expect("Error in DB!"); - if signer.pub_key == block.pub_key { + let pub_key = if let Some(cached) = cache.signing_keys.get(&i) { + cached.clone() + } else { + let signer = self.get_block(i).expect("Error in DB!"); + cache.signing_keys.insert(i, signer.pub_key.clone()); + signer.pub_key + }; + if pub_key == block.pub_key { warn!("Ignoring block {} from '{:?}', already signed by this key", block.index, &block.pub_key); return false; } } + // Cache this block's pub_key too for future checks + cache.signing_keys.insert(block.index, block.pub_key.clone()); true } @@ -1066,6 +1076,7 @@ impl Chain { let mut signers = self.signers.borrow_mut(); signers.index = block.index; signers.signers = result.clone(); + signers.signing_keys.clear(); result } @@ -1087,12 +1098,15 @@ impl Chain { struct SignersCache { index: u64, - signers: Vec + signers: Vec, + /// Cache of block_index → pub_key for signing blocks since last full_block. + /// Avoids repeated DB loads in is_good_signer_for_block duplicate check. + signing_keys: HashMap, } impl SignersCache { pub fn new() -> RefCell { - let cache = SignersCache { index: 0, signers: Vec::new() }; + let cache = SignersCache { index: 0, signers: Vec::new(), signing_keys: HashMap::new() }; RefCell::new(cache) } @@ -1103,6 +1117,7 @@ impl SignersCache { pub fn clear(&mut self) { self.index = 0; self.signers.clear(); + self.signing_keys.clear(); } } diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index b7056ba..6ee3d96 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -3,6 +3,7 @@ pub mod network; pub mod peer; pub mod peers; pub mod state; +pub mod version; pub use message::Message; pub use network::Network; diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 6ffae2b..36b1d25 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -10,17 +10,22 @@ use std::sync::{Arc, mpsc, Mutex}; use std::time::{Duration, Instant}; use std::{io, thread}; +use crossbeam_channel::{bounded, Receiver, Sender}; + use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; #[allow(unused_imports)] use log::{debug, error, info, trace, warn}; use mio::event::Event; use mio::net::{TcpListener, TcpStream}; use mio::{Events, Interest, Poll, Registry, Token}; +use crate::commons::rtt_tracker::RttTracker; +use crate::p2p::version::Version; use rand::{random, Rng, RngCore}; use rand::prelude::thread_rng; use x25519_dalek::{PublicKey, ReusableSecret}; use crate::blockchain::types::BlockQuality; +use crate::blockchain::hash_utils::{check_block_hash, check_block_signature, hash_difficulty}; use crate::commons::*; use crate::crypto::Chacha; use crate::eventbus::{post, register}; @@ -29,6 +34,20 @@ use crate::{Block, Bytes, Context}; const SERVER: Token = Token(0); +/// Job sent to validation worker threads +struct ValidationJob { + token: Token, + block: Block, +} + +/// Result from validation worker threads after CPU-intensive checks +enum PreValidationResult { + /// Block passed hash and signature checks, needs DB validation + NeedsDbValidation(Token, Block), + /// Block failed basic validation + Invalid(Token, Block), +} + pub struct Network { context: Arc>, secret_key: ReusableSecret, @@ -37,7 +56,14 @@ pub struct Network { // States of peer connections, and some data to send when sockets become writable peers: Peers, // Orphan blocks from future - future_blocks: HashMap + future_blocks: HashMap, + // Validation thread pool channels + validation_sender: Sender, + validation_receiver: Receiver, + // Track pending block requests: block_index -> (request_time, peer_token) + pending_requests: HashMap, + // Track peer response times for adaptive selection + peer_rtt: RttTracker, } impl Network { @@ -47,7 +73,88 @@ impl Network { let secret_key = ReusableSecret::random_from_rng(&mut thread_rng); let public_key = PublicKey::from(&secret_key); let peers = Peers::new(); - Network { context, secret_key, public_key, token: Token(1), peers, future_blocks: HashMap::new() } + + // Create validation thread pool + let cpus = num_cpus::get(); + let num_workers = cpus.min((cpus / 2).max(1)); // At most half cpus + info!("Starting {num_workers} validation threads"); + let channel_capacity = (num_workers * 4).max(100); + let (job_sender, job_receiver) = bounded::(channel_capacity); + let (result_sender, result_receiver) = bounded::(channel_capacity); + + // Spawn validation worker threads + for i in 0..num_workers { + let job_rx = job_receiver.clone(); + let result_tx = result_sender.clone(); + + thread::Builder::new() + .name(format!("block-validator-{}", i)) + .spawn(move || { + Self::validation_worker(job_rx, result_tx); + }) + .expect("Failed to spawn validation worker thread"); + } + + // Drop the extra senders/receivers we don't need + drop(result_sender); + + Network { + context, + secret_key, + public_key, + token: Token(1), + peers, + future_blocks: HashMap::new(), + validation_sender: job_sender, + validation_receiver: result_receiver, + pending_requests: HashMap::new(), + peer_rtt: RttTracker::new(), + } + } + + /// Worker thread that performs CPU-intensive block validation + fn validation_worker( + job_receiver: Receiver, + result_sender: Sender, + ) { + loop { + match job_receiver.recv() { + Ok(job) => { + let ValidationJob { token, block } = job; + + // Perform CPU-intensive validation without holding any locks + // These checks don't require database access + + // Check 1: Verify block hash + if !check_block_hash(&block) { + debug!("Block {} failed hash validation", block.index); + let _ = result_sender.send(PreValidationResult::Invalid(token, block)); + continue; + } + + // Check 2: Verify block signature + if !check_block_signature(&block) { + debug!("Block {} failed signature validation", block.index); + let _ = result_sender.send(PreValidationResult::Invalid(token, block)); + continue; + } + + // Check 3: Verify hash difficulty matches claimed difficulty + if hash_difficulty(&block.hash) < block.difficulty { + debug!("Block {} hash difficulty doesn't match claimed difficulty", block.index); + let _ = result_sender.send(PreValidationResult::Invalid(token, block)); + continue; + } + + // Block passed CPU-intensive checks, send for DB validation + let _ = result_sender.send(PreValidationResult::NeedsDbValidation(token, block)); + } + Err(_) => { + // Channel closed, exit worker thread + break; + } + } + } } pub fn start(&mut self) { @@ -175,6 +282,9 @@ impl Network { }; let _ = debug_send.send(format!("Handle connection event: {:?} for peer {}", &event, &peer)); if !self.handle_connection_event(poll.registry(), event, &mut seen_blocks, &mut buffer) { + // Record failure and remove pending requests for this peer + self.peer_rtt.record_failure(&token); + self.pending_requests.retain(|_index, (_time, t)| *t != token); let _ = self.peers.close_peer(poll.registry(), &token); let blocks = self.context.lock().unwrap().chain.get_height(); let keys = self.context.lock().unwrap().chain.get_users_count(); @@ -185,6 +295,11 @@ impl Network { } } let _ = debug_send.send(String::from("After events iter")); + + // Process validation results from worker threads + let _ = debug_send.send(String::from("Process validation results")); + self.process_validation_results(); + if last_events_time.elapsed().as_secs() > MAX_IDLE_SECONDS { if self.peers.get_peers_count() > 0 { warn!("Something is wrong with swarm connections, closing all."); @@ -230,7 +345,6 @@ impl Network { warn!("Last network events time {} seconds ago", elapsed); } log_timer = Instant::now(); - seen_blocks.clear(); } if nodes < MAX_NODES && connect_timer.elapsed().as_secs() >= 2 { self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only); @@ -239,8 +353,20 @@ impl Network { (blocks, max_height, context.chain.get_last_hash()) }; + // Periodic sync maintenance: gap retries and idle peer kicks + if height < max_height { + self.sync_maintain(height, max_height); + } else if height >= max_height && !self.future_blocks.is_empty() { + // We've caught up but have stale future_blocks — clean them up + self.future_blocks.clear(); + self.pending_requests.clear(); + post(crate::event::Event::SyncFinished); + } + let _ = debug_send.send(String::from("Peers update")); - let have_blocks: HashSet = self.future_blocks.values().map(|block| block.index).collect(); + let mut have_blocks: HashSet = self.future_blocks.keys().copied().collect(); + // Also include blocks that are pending validation to avoid re-requesting them + have_blocks.extend(self.pending_requests.keys().copied()); self.peers.update(poll.registry(), hash, height, max_height, have_blocks); ui_timer = Instant::now(); } @@ -454,14 +580,23 @@ impl Network { peer.set_state(State::idle()); } State::Idle { from } => { - debug!("Odd version of pings for {}", peer.get_addr().ip()); - if from.elapsed().as_secs() >= 120 { - let data: Vec = { - let c = self.context.lock().unwrap(); - let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); - encode_message(&message, peer.get_cipher()).unwrap() - }; - send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e)); + if peer.has_queued_messages() { + // Send ONE queued message at a time to avoid flooding remote peer + if let Some(queued_data) = peer.pop_message() { + if let Ok(data) = encode_bytes(&queued_data, peer.get_cipher()) { + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending queued message {}", e)); + } + } + } else { + debug!("Odd version of pings for {}", peer.get_addr().ip()); + if from.elapsed().as_secs() >= 120 { + let data: Vec = { + let c = self.context.lock().unwrap(); + let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); + encode_message(&message, peer.get_cipher()).unwrap() + }; + send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e)); + } } } State::Error => {} @@ -504,6 +639,7 @@ impl Network { } else if origin.eq(my_origin) { let peer = self.peers.get_mut_peer(token).unwrap(); debug!("Incoming v{} on {}", &app_version, peer.get_addr().ip()); + peer.set_version(Version::parse(&app_version)); let app_version = self.context.lock().unwrap().app_version.clone(); if version == my_version { peer.set_public(public); @@ -538,6 +674,7 @@ impl Network { let peer = self.peers.get_mut_peer(token).unwrap(); // TODO check rand_id whether we have this peers connection already debug!("Outgoing v{} on {}", &app_version, peer.get_addr().ip()); + peer.set_version(Version::parse(&app_version)); peer.set_height(height); peer.set_active(true); peer.set_public(public); @@ -564,10 +701,20 @@ impl Network { return State::message(Message::pong(my_height, my_hash)); } if peer.is_higher(my_height) { - let mut context = self.context.lock().unwrap(); - context.chain.update_max_height(height); - info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip()); - State::message(Message::GetBlock { index: my_height + 1 }) + let max_height = { + let mut context = self.context.lock().unwrap(); + context.chain.update_max_height(height); + context.chain.get_max_height() + }; + // Start pipeline: queue a block request, then send pong + if let Some(idx) = self.next_block_to_request(my_height, max_height) { + let peer = self.peers.get_mut_peer(token).unwrap(); + if peer.can_send() { + peer.queue_message(Message::GetBlock { index: idx }); + self.pending_requests.insert(idx, (Instant::now(), *token)); + } + } + State::message(Message::pong(my_height, my_hash)) } else if my_height == height && hash.ne(&my_hash) { info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); @@ -585,10 +732,17 @@ impl Network { return State::idle(); } if peer.is_higher(my_height) { - let mut context = self.context.lock().unwrap(); - context.chain.update_max_height(height); - info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip()); - State::message(Message::GetBlock { index: my_height + 1 }) + let max_height = { + let mut context = self.context.lock().unwrap(); + context.chain.update_max_height(height); + context.chain.get_max_height() + }; + // Start pipeline: request next needed block from this peer + if let Some(idx) = self.next_block_to_request(my_height, max_height) { + self.pending_requests.insert(idx, (Instant::now(), *token)); + return State::message(Message::GetBlock { index: idx }); + } + State::idle() } else if my_height == height && hash.ne(&my_hash) { info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); @@ -636,12 +790,53 @@ impl Network { if index != block.index { return State::Banned; } - debug!("Received block {} with hash {:?}", block.index, &block.hash); - if !seen_blocks.contains(&block.hash) { - self.handle_block(token, block, seen_blocks) - } else { - State::idle() + let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string()); + debug!("Received block {} with hash {:?} from {}", block.index, &block.hash, &peer_addr); + // Record RTT but keep in pending_requests until validation completes + // (prevents re-requesting while block is in validation channel) + if let Some((request_time, peer_token)) = self.pending_requests.get(&block.index) { + let rtt_ms = request_time.elapsed().as_secs_f64() * 1000.0; + self.peer_rtt.record_success(peer_token, rtt_ms); } + // Skip blocks we already have in the chain + let current_height = self.context.lock().unwrap().chain.get_height(); + if block.index <= current_height { + let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string()); + debug!("Skipping stale block {} from {} (height is {})", block.index, peer_addr, current_height); + return State::idle(); + } + if !seen_blocks.contains(&block.hash) { + seen_blocks.insert(block.hash.clone()); + // Send block to validation worker threads for parallel processing + match self.validation_sender.try_send(ValidationJob { + token: *token, + block, + }) { + Ok(_) => {}, + Err(crossbeam_channel::TrySendError::Full(job)) => { + debug!("Validation queue full, deferring block {}", job.block.index); + self.future_blocks.insert(job.block.index, job.block); + }, + Err(crossbeam_channel::TrySendError::Disconnected(_)) => { + warn!("Validation worker threads have stopped"); + }, + } + } + + // Pipeline: immediately request the next needed block from this peer + let (my_height, max_height) = { + let c = self.context.lock().unwrap(); + (c.chain.get_height(), c.chain.get_max_height()) + }; + if my_height < max_height { + if let Some(next_idx) = self.next_block_to_request(my_height, max_height) { + let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string()); + debug!("Requesting block {next_idx} from {}", &peer_addr); + self.pending_requests.insert(next_idx, (Instant::now(), *token)); + return State::message(Message::GetBlock { index: next_idx }); + } + } + State::idle() } Message::Twin => State::Twin, Message::Loop => State::Loop @@ -649,79 +844,260 @@ impl Network { answer } - fn handle_block(&mut self, token: &Token, block: Block, seen_blocks: &mut HashSet) -> State { - seen_blocks.insert(block.hash.clone()); - let peers_count = self.peers.get_peers_active_count(); - let peer = self.peers.get_mut_peer(token).unwrap(); - peer.set_received_block(block.index); - trace!("New block from {}", &peer.get_addr()); + /// Find the next block that needs to be requested within the sync window. + /// Returns None if all blocks in the window are already requested or received. + fn next_block_to_request(&self, my_height: u64, max_height: u64) -> Option { + const SYNC_WINDOW: u64 = 500; + let end = max_height.min(my_height + SYNC_WINDOW); + for idx in (my_height + 1)..=end { + if !self.future_blocks.contains_key(&idx) && !self.pending_requests.contains_key(&idx) { + return Some(idx); + } + } + None + } - let mut context = self.context.lock().unwrap(); - let max_height = context.chain.get_max_height(); - match context.chain.check_new_block(&block) { - BlockQuality::Good => { - let mut next_index = block.index + 1; - context.chain.add_block(block); - // If we have some consequent blocks in a bucket of 'future blocks', we add them - while let Some(block) = self.future_blocks.remove(&next_index) { - if context.chain.check_new_block(&block) == BlockQuality::Good { - debug!("Added block {} from future blocks", next_index); - context.chain.add_block(block); - } else { - warn!("Block {} in future blocks is bad!", block.index); + /// Periodic sync maintenance: retry gap blocks, kick idle peers into the pipeline. + fn sync_maintain(&mut self, my_height: u64, max_height: u64) { + const GAP_TIMEOUT_MSECS: u128 = 1500; + + let now = Instant::now(); + + // Record failures for timed-out requests before cleanup + let timed_out: Vec = self.pending_requests.iter() + .filter(|(_, (t, _))| t.elapsed().as_millis() >= GAP_TIMEOUT_MSECS) + .map(|(_, (_, tok))| *tok) + .collect(); + for tok in &timed_out { + self.peer_rtt.record_failure(tok); + } + + // Clean up stale requests: timed out or peer no longer active + self.pending_requests.retain(|_index, (request_time, token)| { + if request_time.elapsed().as_millis() >= GAP_TIMEOUT_MSECS * 2 { + return false; + } + match self.peers.get_peer(token) { + Some(peer) => peer.active(), + None => false, + } + }); + + let raw_peers = self.peers.get_active_peer_tokens(); + if raw_peers.is_empty() { + return; + } + let active_peers = self.peer_rtt.select_ordered(&raw_peers); + + // Retry gap blocks with shorter timeout — push to FRONT of fastest peer's queue + let min_future_block = self.future_blocks.keys().min().copied(); + if let Some(min_future) = min_future_block { + // active_peers is already ranked by RTT (fastest first) + let mut gap_peer_idx = 0; + for block_index in (my_height + 1)..min_future { + if self.future_blocks.contains_key(&block_index) { + continue; + } + if let Some((req_time, old_token)) = self.pending_requests.get(&block_index) { + if req_time.elapsed().as_millis() < GAP_TIMEOUT_MSECS { + continue; + } + self.peer_rtt.record_failure(old_token); + // Skip the failed peer + if let Some(pos) = active_peers.iter().position(|t| t == old_token) { + if pos == gap_peer_idx { + gap_peer_idx = (gap_peer_idx + 1) % active_peers.len(); + } + } + debug!("Gap block {} timed out, re-requesting (have future from {})", block_index, min_future); + } + + // Find a peer that can accept a message (old nodes only allow 1 in flight) + let mut sent = false; + for _ in 0..active_peers.len() { + let peer_token = active_peers[gap_peer_idx % active_peers.len()]; + gap_peer_idx = (gap_peer_idx + 1) % active_peers.len(); + if let Some(peer) = self.peers.get_mut_peer(&peer_token) { + if !peer.can_send() { + continue; + } + peer.queue_priority_message(Message::GetBlock { index: block_index }); + self.pending_requests.insert(block_index, (now, peer_token)); + debug!("Requesting gap block {} from {} (priority)", block_index, peer.get_addr().ip()); + sent = true; break; } - next_index += 1; } - let my_height = context.chain.get_height(); - post(crate::event::Event::BlockchainChanged { index: my_height }); - // If it was the last block to sync - if my_height == max_height { - post(crate::event::Event::SyncFinished); - self.future_blocks.clear(); - } else { - let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; - post(event); - } - let domains = context.chain.get_domains_count(); - let keys = context.chain.get_users_count(); - post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count }); - } - BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } - BlockQuality::Future => { - debug!("Got future block {}", block.index); - self.future_blocks.insert(block.index, block); - } - BlockQuality::Bad => { - // TODO save bad public keys to banned table - debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block); - let height = context.chain.get_height(); - if height + 1 == block.index { - context.chain.update_max_height(height); - post(crate::event::Event::SyncFinished); - } - return State::Banned; - } - BlockQuality::Rewind => { - debug!("Got some orphan block, requesting its parent"); - return State::message(Message::GetBlock { index: block.index - 1 }); - } - BlockQuality::Fork => { - debug!("Got forked block {} with hash {:?}", block.index, block.hash); - // If we are very much behind of blockchain - let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height; - let our_block = context.chain.get_block(block.index).unwrap(); - if block.is_better_than(&our_block) || lagged { - context.chain.replace_block(block).expect("Error replacing block with fork"); - let index = context.chain.get_height(); - post(crate::event::Event::BlockchainChanged { index }); - } else { - debug!("Fork in not better than our block, dropping."); - return State::message(Message::block(our_block.index, our_block.as_bytes())); + if !sent { + break; // All peers busy, try next cycle + } + } + } + + // Kick idle peers into the pipeline (peers with no pending requests) + for t in &active_peers { + let has_pending = self.pending_requests.values().any(|(_, tk)| tk == t); + if has_pending { + continue; + } + if let Some(peer) = self.peers.get_peer(t) { + if !peer.get_state().is_idle() || peer.has_queued_messages() { + continue; + } + } + if let Some(idx) = self.next_block_to_request(my_height, max_height) { + if let Some(peer) = self.peers.get_mut_peer(t) { + peer.queue_message(Message::GetBlock { index: idx }); + self.pending_requests.insert(idx, (now, *t)); + debug!("Kicking idle peer {} with block {}", peer.get_addr().ip(), idx); + } + } + } + } + + /// Process validation results from worker threads and add validated blocks to chain + fn process_validation_results(&mut self) { + // Process all available validation results without blocking + while let Ok(result) = self.validation_receiver.try_recv() { + match result { + PreValidationResult::NeedsDbValidation(token, block) => { + // CPU-intensive validation passed, now do DB-dependent validation + let peers_count = self.peers.get_peers_active_count(); + + // Update peer state + if let Some(peer) = self.peers.get_mut_peer(&token) { + peer.set_received_block(block.index); + trace!("Validated block {} from {}", block.index, peer.get_addr()); + } else { + // Peer disconnected, but we can still process the block + trace!("Validated block {} from disconnected peer", block.index); + } + + // Lock context only for DB operations + let mut context = self.context.lock().unwrap(); + let my_height = context.chain.get_height(); + let max_height = context.chain.get_max_height(); + + // Skip stale blocks that are at or below current height (late pipeline responses) + if block.index <= my_height { + debug!("Ignoring stale block {} (height is {})", block.index, my_height); + self.pending_requests.remove(&block.index); + continue; + } + + // Do remaining DB-dependent validation and add to chain + match context.chain.check_new_block(&block) { + BlockQuality::Good => { + let block_index = block.index; + let mut next_index = block.index + 1; + context.chain.add_block(block); + + // Clean up pending request for this block + self.pending_requests.remove(&block_index); + + // Process future blocks that are now ready + while let Some(block) = self.future_blocks.remove(&next_index) { + if context.chain.check_new_block(&block) == BlockQuality::Good { + debug!("Added block {} from future blocks", next_index); + context.chain.add_block(block); + self.pending_requests.remove(&next_index); + } else { + warn!("Block {} in future blocks is bad!", block.index); + break; + } + next_index += 1; + } + + let my_height = context.chain.get_height(); + post(crate::event::Event::BlockchainChanged { index: my_height }); + + // Check if sync is finished + if my_height >= max_height { + post(crate::event::Event::SyncFinished); + self.future_blocks.clear(); + } else { + let event = crate::event::Event::Syncing { + have: my_height, + height: max(max_height, my_height) + }; + post(event); + } + + let domains = context.chain.get_domains_count(); + let keys = context.chain.get_users_count(); + post(crate::event::Event::NetworkStatus { + blocks: my_height, + domains, + keys, + nodes: peers_count + }); + } + BlockQuality::Twin => { + debug!("Ignoring duplicate block {}", block.index); + } + BlockQuality::Future => { + debug!("Got future block {}", block.index); + let block_index = block.index; + self.future_blocks.insert(block.index, block); + // Clean up pending request since we have this block now + self.pending_requests.remove(&block_index); + } + BlockQuality::Bad => { + debug!("Block {} failed DB validation", block.index); + if let Some(peer) = self.peers.get_mut_peer(&token) { + debug!("Banning peer {} for bad block", peer.get_addr()); + // Mark peer for banning + peer.set_state(State::Banned); + } + let height = context.chain.get_height(); + if height + 1 == block.index { + context.chain.update_max_height(height); + post(crate::event::Event::SyncFinished); + } + } + BlockQuality::Rewind => { + debug!("Got orphan block {}, requesting parent", block.index); + if let Some(peer) = self.peers.get_mut_peer(&token) { + peer.set_state(State::message(Message::GetBlock { + index: block.index - 1 + })); + } + } + BlockQuality::Fork => { + debug!("Got forked block {} with hash {:?}", block.index, block.hash); + let lagged = block.index == context.chain.get_height() + && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height; + + if let Some(our_block) = context.chain.get_block(block.index) { + if block.is_better_than(&our_block) || lagged { + context.chain.replace_block(block) + .expect("Error replacing block with fork"); + let index = context.chain.get_height(); + post(crate::event::Event::BlockchainChanged { index }); + } else { + debug!("Fork is not better than our block, dropping"); + if let Some(peer) = self.peers.get_mut_peer(&token) { + peer.set_state(State::message(Message::block( + our_block.index, + our_block.as_bytes() + ))); + } + } + } + } + } + // Context lock is dropped here + } + PreValidationResult::Invalid(token, block) => { + // Block failed CPU-intensive validation + debug!("Block {} failed pre-validation (hash/signature)", block.index); + if let Some(peer) = self.peers.get_mut_peer(&token) { + debug!("Banning peer {} for invalid block", peer.get_addr()); + peer.set_state(State::Banned); + } } } } - State::idle() } /// Gets new token from old token, mutating the last diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs index 86d6030..e9117fe 100644 --- a/src/p2p/peer.rs +++ b/src/p2p/peer.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::fmt::Display; use std::net::SocketAddr; use std::time::Instant; @@ -5,13 +6,16 @@ use std::time::Instant; use mio::net::TcpStream; use crate::crypto::Chacha; +use crate::p2p::message::Message; use crate::p2p::State; +use crate::p2p::version::Version; #[derive(Debug)] pub struct Peer { addr: SocketAddr, stream: TcpStream, state: State, + outgoing: VecDeque>, id: String, height: u64, inbound: bool, @@ -21,7 +25,8 @@ pub struct Peer { reconnects: u32, received_block: u64, sent_height: u64, - cipher: Option + cipher: Option, + version: Version, } impl Peer { @@ -30,6 +35,7 @@ impl Peer { addr, stream, state, + outgoing: VecDeque::new(), id: String::new(), height: 0, inbound, @@ -39,7 +45,8 @@ impl Peer { reconnects: 0, received_block: 0, sent_height: 0, - cipher: None + cipher: None, + version: Version::default(), } } @@ -78,6 +85,52 @@ impl Peer { self.state = state; } + /// Queue a message for sending. Does not change peer state. + pub fn queue_message(&mut self, msg: Message) { + let data = serde_cbor::to_vec(&msg).unwrap(); + self.outgoing.push_back(data); + } + + /// Queue a high-priority message at the front (e.g. gap block requests) + pub fn queue_priority_message(&mut self, msg: Message) { + let data = serde_cbor::to_vec(&msg).unwrap(); + self.outgoing.push_front(data); + } + + pub fn has_queued_messages(&self) -> bool { + !self.outgoing.is_empty() + } + + pub fn queued_count(&self) -> usize { + self.outgoing.len() + } + + pub fn pop_message(&mut self) -> Option> { + self.outgoing.pop_front() + } + + pub fn set_version(&mut self, version: Version) { + self.version = version; + } + + pub fn get_version(&self) -> &Version { + &self.version + } + + /// Old nodes (< 0.8.9) overwrite outgoing messages, can only have 1 in flight + pub fn supports_queue(&self) -> bool { + self.version >= Version { major: 0, minor: 8, patch: 9 } + } + + /// Check if we can send another message to this peer + pub fn can_send(&self) -> bool { + if self.supports_queue() { + true + } else { + self.outgoing.is_empty() + } + } + pub fn get_id(&self) -> &str { &self.id } diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 5e1a342..d97a249 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -202,6 +202,13 @@ impl Peers { count } + pub fn get_active_peer_tokens(&self) -> Vec { + self.peers.iter() + .filter(|(_, peer)| peer.active()) + .map(|(token, _)| *token) + .collect() + } + pub fn is_tween_connect(&self, id: &str) -> bool { for (_, peer) in self.peers.iter() { if peer.active() && peer.get_id() == id { @@ -255,7 +262,10 @@ impl Peers { let mut stale_tokens = Vec::new(); for (token, peer) in self.peers.iter_mut() { if let State::Idle { from } = peer.get_state() { - if from.elapsed().as_secs() >= PING_PERIOD + random_time { + if peer.has_queued_messages() { + let stream = peer.get_stream(); + registry.reregister(stream, *token, Interest::WRITABLE).unwrap(); + } else if from.elapsed().as_secs() >= PING_PERIOD + random_time { // Sometimes we check for new peers instead of pinging let message = if nodes < MAX_NODES && random::() { Message::GetPeers @@ -273,7 +283,7 @@ impl Peers { stale_tokens.push((token.clone(), peer.get_addr())); continue; } - if matches!(peer.get_state(), State::Message {..}) { + if matches!(peer.get_state(), State::Message {..}) || peer.has_queued_messages() { let stream = peer.get_stream(); registry.reregister(stream, *token, Interest::WRITABLE).unwrap(); } @@ -288,7 +298,7 @@ impl Peers { self.ignored.retain(|_addr, time| { time.elapsed().as_secs() < 600 }); // If someone has more blocks we sync - if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height { + if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height && have_blocks.is_empty() { // Give some opportunity to get more peers instead of requests for blocks only let request_blocks = nodes >= 10 || random::(); if request_blocks {