Initial sync made a lot faster.

This commit is contained in:
Revertron
2026-04-05 12:13:30 +02:00
parent ef573436d9
commit 9bea173f21
6 changed files with 556 additions and 101 deletions
+19 -4
View File
@@ -975,13 +975,23 @@ impl Chain {
return false;
}
// If this signers' public key has already locked/signed that block we return error
// Use signing_keys cache to avoid repeated DB loads for the same blocks
let mut cache = self.signers.borrow_mut();
for i in (full_block.index + 1)..block.index {
let signer = self.get_block(i).expect("Error in DB!");
if signer.pub_key == block.pub_key {
let pub_key = if let Some(cached) = cache.signing_keys.get(&i) {
cached.clone()
} else {
let signer = self.get_block(i).expect("Error in DB!");
cache.signing_keys.insert(i, signer.pub_key.clone());
signer.pub_key
};
if pub_key == block.pub_key {
warn!("Ignoring block {} from '{:?}', already signed by this key", block.index, &block.pub_key);
return false;
}
}
// Cache this block's pub_key too for future checks
cache.signing_keys.insert(block.index, block.pub_key.clone());
true
}
@@ -1066,6 +1076,7 @@ impl Chain {
let mut signers = self.signers.borrow_mut();
signers.index = block.index;
signers.signers = result.clone();
signers.signing_keys.clear();
result
}
@@ -1087,12 +1098,15 @@ impl Chain {
struct SignersCache {
index: u64,
signers: Vec<Bytes>
signers: Vec<Bytes>,
/// Cache of block_index → pub_key for signing blocks since last full_block.
/// Avoids repeated DB loads in is_good_signer_for_block duplicate check.
signing_keys: HashMap<u64, Bytes>,
}
impl SignersCache {
pub fn new() -> RefCell<SignersCache> {
let cache = SignersCache { index: 0, signers: Vec::new() };
let cache = SignersCache { index: 0, signers: Vec::new(), signing_keys: HashMap::new() };
RefCell::new(cache)
}
@@ -1103,6 +1117,7 @@ impl SignersCache {
pub fn clear(&mut self) {
self.index = 0;
self.signers.clear();
self.signing_keys.clear();
}
}
+1
View File
@@ -3,6 +3,7 @@ pub mod network;
pub mod peer;
pub mod peers;
pub mod state;
pub mod version;
pub use message::Message;
pub use network::Network;
+467 -91
View File
@@ -10,17 +10,22 @@ use std::sync::{Arc, mpsc, Mutex};
use std::time::{Duration, Instant};
use std::{io, thread};
use crossbeam_channel::{bounded, Receiver, Sender};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use mio::event::Event;
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Registry, Token};
use crate::commons::rtt_tracker::RttTracker;
use crate::p2p::version::Version;
use rand::{random, Rng, RngCore};
use rand::prelude::thread_rng;
use x25519_dalek::{PublicKey, ReusableSecret};
use crate::blockchain::types::BlockQuality;
use crate::blockchain::hash_utils::{check_block_hash, check_block_signature, hash_difficulty};
use crate::commons::*;
use crate::crypto::Chacha;
use crate::eventbus::{post, register};
@@ -29,6 +34,20 @@ use crate::{Block, Bytes, Context};
const SERVER: Token = Token(0);
/// Job sent to validation worker threads
struct ValidationJob {
token: Token,
block: Block,
}
/// Result from validation worker threads after CPU-intensive checks
enum PreValidationResult {
/// Block passed hash and signature checks, needs DB validation
NeedsDbValidation(Token, Block),
/// Block failed basic validation
Invalid(Token, Block),
}
pub struct Network {
context: Arc<Mutex<Context>>,
secret_key: ReusableSecret,
@@ -37,7 +56,14 @@ pub struct Network {
// States of peer connections, and some data to send when sockets become writable
peers: Peers,
// Orphan blocks from future
future_blocks: HashMap<u64, Block>
future_blocks: HashMap<u64, Block>,
// Validation thread pool channels
validation_sender: Sender<ValidationJob>,
validation_receiver: Receiver<PreValidationResult>,
// Track pending block requests: block_index -> (request_time, peer_token)
pending_requests: HashMap<u64, (Instant, Token)>,
// Track peer response times for adaptive selection
peer_rtt: RttTracker<Token>,
}
impl Network {
@@ -47,7 +73,88 @@ impl Network {
let secret_key = ReusableSecret::random_from_rng(&mut thread_rng);
let public_key = PublicKey::from(&secret_key);
let peers = Peers::new();
Network { context, secret_key, public_key, token: Token(1), peers, future_blocks: HashMap::new() }
// Create validation thread pool
let cpus = num_cpus::get();
let num_workers = cpus.min((cpus / 2).max(1)); // At most half cpus
info!("Starting {num_workers} validation threads");
let channel_capacity = (num_workers * 4).max(100);
let (job_sender, job_receiver) = bounded::<ValidationJob>(channel_capacity);
let (result_sender, result_receiver) = bounded::<PreValidationResult>(channel_capacity);
// Spawn validation worker threads
for i in 0..num_workers {
let job_rx = job_receiver.clone();
let result_tx = result_sender.clone();
thread::Builder::new()
.name(format!("block-validator-{}", i))
.spawn(move || {
Self::validation_worker(job_rx, result_tx);
})
.expect("Failed to spawn validation worker thread");
}
// Drop the extra senders/receivers we don't need
drop(result_sender);
Network {
context,
secret_key,
public_key,
token: Token(1),
peers,
future_blocks: HashMap::new(),
validation_sender: job_sender,
validation_receiver: result_receiver,
pending_requests: HashMap::new(),
peer_rtt: RttTracker::new(),
}
}
/// Worker thread that performs CPU-intensive block validation
fn validation_worker(
job_receiver: Receiver<ValidationJob>,
result_sender: Sender<PreValidationResult>,
) {
loop {
match job_receiver.recv() {
Ok(job) => {
let ValidationJob { token, block } = job;
// Perform CPU-intensive validation without holding any locks
// These checks don't require database access
// Check 1: Verify block hash
if !check_block_hash(&block) {
debug!("Block {} failed hash validation", block.index);
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
continue;
}
// Check 2: Verify block signature
if !check_block_signature(&block) {
debug!("Block {} failed signature validation", block.index);
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
continue;
}
// Check 3: Verify hash difficulty matches claimed difficulty
if hash_difficulty(&block.hash) < block.difficulty {
debug!("Block {} hash difficulty doesn't match claimed difficulty", block.index);
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
continue;
}
// Block passed CPU-intensive checks, send for DB validation
let _ = result_sender.send(PreValidationResult::NeedsDbValidation(token, block));
}
Err(_) => {
// Channel closed, exit worker thread
break;
}
}
}
}
pub fn start(&mut self) {
@@ -175,6 +282,9 @@ impl Network {
};
let _ = debug_send.send(format!("Handle connection event: {:?} for peer {}", &event, &peer));
if !self.handle_connection_event(poll.registry(), event, &mut seen_blocks, &mut buffer) {
// Record failure and remove pending requests for this peer
self.peer_rtt.record_failure(&token);
self.pending_requests.retain(|_index, (_time, t)| *t != token);
let _ = self.peers.close_peer(poll.registry(), &token);
let blocks = self.context.lock().unwrap().chain.get_height();
let keys = self.context.lock().unwrap().chain.get_users_count();
@@ -185,6 +295,11 @@ impl Network {
}
}
let _ = debug_send.send(String::from("After events iter"));
// Process validation results from worker threads
let _ = debug_send.send(String::from("Process validation results"));
self.process_validation_results();
if last_events_time.elapsed().as_secs() > MAX_IDLE_SECONDS {
if self.peers.get_peers_count() > 0 {
warn!("Something is wrong with swarm connections, closing all.");
@@ -230,7 +345,6 @@ impl Network {
warn!("Last network events time {} seconds ago", elapsed);
}
log_timer = Instant::now();
seen_blocks.clear();
}
if nodes < MAX_NODES && connect_timer.elapsed().as_secs() >= 2 {
self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only);
@@ -239,8 +353,20 @@ impl Network {
(blocks, max_height, context.chain.get_last_hash())
};
// Periodic sync maintenance: gap retries and idle peer kicks
if height < max_height {
self.sync_maintain(height, max_height);
} else if height >= max_height && !self.future_blocks.is_empty() {
// We've caught up but have stale future_blocks — clean them up
self.future_blocks.clear();
self.pending_requests.clear();
post(crate::event::Event::SyncFinished);
}
let _ = debug_send.send(String::from("Peers update"));
let have_blocks: HashSet<u64> = self.future_blocks.values().map(|block| block.index).collect();
let mut have_blocks: HashSet<u64> = self.future_blocks.keys().copied().collect();
// Also include blocks that are pending validation to avoid re-requesting them
have_blocks.extend(self.pending_requests.keys().copied());
self.peers.update(poll.registry(), hash, height, max_height, have_blocks);
ui_timer = Instant::now();
}
@@ -454,14 +580,23 @@ impl Network {
peer.set_state(State::idle());
}
State::Idle { from } => {
debug!("Odd version of pings for {}", peer.get_addr().ip());
if from.elapsed().as_secs() >= 120 {
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash());
encode_message(&message, peer.get_cipher()).unwrap()
};
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e));
if peer.has_queued_messages() {
// Send ONE queued message at a time to avoid flooding remote peer
if let Some(queued_data) = peer.pop_message() {
if let Ok(data) = encode_bytes(&queued_data, peer.get_cipher()) {
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending queued message {}", e));
}
}
} else {
debug!("Odd version of pings for {}", peer.get_addr().ip());
if from.elapsed().as_secs() >= 120 {
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash());
encode_message(&message, peer.get_cipher()).unwrap()
};
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e));
}
}
}
State::Error => {}
@@ -504,6 +639,7 @@ impl Network {
} else if origin.eq(my_origin) {
let peer = self.peers.get_mut_peer(token).unwrap();
debug!("Incoming v{} on {}", &app_version, peer.get_addr().ip());
peer.set_version(Version::parse(&app_version));
let app_version = self.context.lock().unwrap().app_version.clone();
if version == my_version {
peer.set_public(public);
@@ -538,6 +674,7 @@ impl Network {
let peer = self.peers.get_mut_peer(token).unwrap();
// TODO check rand_id whether we have this peers connection already
debug!("Outgoing v{} on {}", &app_version, peer.get_addr().ip());
peer.set_version(Version::parse(&app_version));
peer.set_height(height);
peer.set_active(true);
peer.set_public(public);
@@ -564,10 +701,20 @@ impl Network {
return State::message(Message::pong(my_height, my_hash));
}
if peer.is_higher(my_height) {
let mut context = self.context.lock().unwrap();
context.chain.update_max_height(height);
info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip());
State::message(Message::GetBlock { index: my_height + 1 })
let max_height = {
let mut context = self.context.lock().unwrap();
context.chain.update_max_height(height);
context.chain.get_max_height()
};
// Start pipeline: queue a block request, then send pong
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
let peer = self.peers.get_mut_peer(token).unwrap();
if peer.can_send() {
peer.queue_message(Message::GetBlock { index: idx });
self.pending_requests.insert(idx, (Instant::now(), *token));
}
}
State::message(Message::pong(my_height, my_hash))
} else if my_height == height && hash.ne(&my_hash) {
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
@@ -585,10 +732,17 @@ impl Network {
return State::idle();
}
if peer.is_higher(my_height) {
let mut context = self.context.lock().unwrap();
context.chain.update_max_height(height);
info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip());
State::message(Message::GetBlock { index: my_height + 1 })
let max_height = {
let mut context = self.context.lock().unwrap();
context.chain.update_max_height(height);
context.chain.get_max_height()
};
// Start pipeline: request next needed block from this peer
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
self.pending_requests.insert(idx, (Instant::now(), *token));
return State::message(Message::GetBlock { index: idx });
}
State::idle()
} else if my_height == height && hash.ne(&my_hash) {
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
@@ -636,12 +790,53 @@ impl Network {
if index != block.index {
return State::Banned;
}
debug!("Received block {} with hash {:?}", block.index, &block.hash);
if !seen_blocks.contains(&block.hash) {
self.handle_block(token, block, seen_blocks)
} else {
State::idle()
let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
debug!("Received block {} with hash {:?} from {}", block.index, &block.hash, &peer_addr);
// Record RTT but keep in pending_requests until validation completes
// (prevents re-requesting while block is in validation channel)
if let Some((request_time, peer_token)) = self.pending_requests.get(&block.index) {
let rtt_ms = request_time.elapsed().as_secs_f64() * 1000.0;
self.peer_rtt.record_success(peer_token, rtt_ms);
}
// Skip blocks we already have in the chain
let current_height = self.context.lock().unwrap().chain.get_height();
if block.index <= current_height {
let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
debug!("Skipping stale block {} from {} (height is {})", block.index, peer_addr, current_height);
return State::idle();
}
if !seen_blocks.contains(&block.hash) {
seen_blocks.insert(block.hash.clone());
// Send block to validation worker threads for parallel processing
match self.validation_sender.try_send(ValidationJob {
token: *token,
block,
}) {
Ok(_) => {},
Err(crossbeam_channel::TrySendError::Full(job)) => {
debug!("Validation queue full, deferring block {}", job.block.index);
self.future_blocks.insert(job.block.index, job.block);
},
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
warn!("Validation worker threads have stopped");
},
}
}
// Pipeline: immediately request the next needed block from this peer
let (my_height, max_height) = {
let c = self.context.lock().unwrap();
(c.chain.get_height(), c.chain.get_max_height())
};
if my_height < max_height {
if let Some(next_idx) = self.next_block_to_request(my_height, max_height) {
let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
debug!("Requesting block {next_idx} from {}", &peer_addr);
self.pending_requests.insert(next_idx, (Instant::now(), *token));
return State::message(Message::GetBlock { index: next_idx });
}
}
State::idle()
}
Message::Twin => State::Twin,
Message::Loop => State::Loop
@@ -649,79 +844,260 @@ impl Network {
answer
}
fn handle_block(&mut self, token: &Token, block: Block, seen_blocks: &mut HashSet<Bytes>) -> State {
seen_blocks.insert(block.hash.clone());
let peers_count = self.peers.get_peers_active_count();
let peer = self.peers.get_mut_peer(token).unwrap();
peer.set_received_block(block.index);
trace!("New block from {}", &peer.get_addr());
/// Find the next block that needs to be requested within the sync window.
/// Returns None if all blocks in the window are already requested or received.
fn next_block_to_request(&self, my_height: u64, max_height: u64) -> Option<u64> {
const SYNC_WINDOW: u64 = 500;
let end = max_height.min(my_height + SYNC_WINDOW);
for idx in (my_height + 1)..=end {
if !self.future_blocks.contains_key(&idx) && !self.pending_requests.contains_key(&idx) {
return Some(idx);
}
}
None
}
let mut context = self.context.lock().unwrap();
let max_height = context.chain.get_max_height();
match context.chain.check_new_block(&block) {
BlockQuality::Good => {
let mut next_index = block.index + 1;
context.chain.add_block(block);
// If we have some consequent blocks in a bucket of 'future blocks', we add them
while let Some(block) = self.future_blocks.remove(&next_index) {
if context.chain.check_new_block(&block) == BlockQuality::Good {
debug!("Added block {} from future blocks", next_index);
context.chain.add_block(block);
} else {
warn!("Block {} in future blocks is bad!", block.index);
/// Periodic sync maintenance: retry gap blocks, kick idle peers into the pipeline.
fn sync_maintain(&mut self, my_height: u64, max_height: u64) {
const GAP_TIMEOUT_MSECS: u128 = 1500;
let now = Instant::now();
// Record failures for timed-out requests before cleanup
let timed_out: Vec<Token> = self.pending_requests.iter()
.filter(|(_, (t, _))| t.elapsed().as_millis() >= GAP_TIMEOUT_MSECS)
.map(|(_, (_, tok))| *tok)
.collect();
for tok in &timed_out {
self.peer_rtt.record_failure(tok);
}
// Clean up stale requests: timed out or peer no longer active
self.pending_requests.retain(|_index, (request_time, token)| {
if request_time.elapsed().as_millis() >= GAP_TIMEOUT_MSECS * 2 {
return false;
}
match self.peers.get_peer(token) {
Some(peer) => peer.active(),
None => false,
}
});
let raw_peers = self.peers.get_active_peer_tokens();
if raw_peers.is_empty() {
return;
}
let active_peers = self.peer_rtt.select_ordered(&raw_peers);
// Retry gap blocks with shorter timeout — push to FRONT of fastest peer's queue
let min_future_block = self.future_blocks.keys().min().copied();
if let Some(min_future) = min_future_block {
// active_peers is already ranked by RTT (fastest first)
let mut gap_peer_idx = 0;
for block_index in (my_height + 1)..min_future {
if self.future_blocks.contains_key(&block_index) {
continue;
}
if let Some((req_time, old_token)) = self.pending_requests.get(&block_index) {
if req_time.elapsed().as_millis() < GAP_TIMEOUT_MSECS {
continue;
}
self.peer_rtt.record_failure(old_token);
// Skip the failed peer
if let Some(pos) = active_peers.iter().position(|t| t == old_token) {
if pos == gap_peer_idx {
gap_peer_idx = (gap_peer_idx + 1) % active_peers.len();
}
}
debug!("Gap block {} timed out, re-requesting (have future from {})", block_index, min_future);
}
// Find a peer that can accept a message (old nodes only allow 1 in flight)
let mut sent = false;
for _ in 0..active_peers.len() {
let peer_token = active_peers[gap_peer_idx % active_peers.len()];
gap_peer_idx = (gap_peer_idx + 1) % active_peers.len();
if let Some(peer) = self.peers.get_mut_peer(&peer_token) {
if !peer.can_send() {
continue;
}
peer.queue_priority_message(Message::GetBlock { index: block_index });
self.pending_requests.insert(block_index, (now, peer_token));
debug!("Requesting gap block {} from {} (priority)", block_index, peer.get_addr().ip());
sent = true;
break;
}
next_index += 1;
}
let my_height = context.chain.get_height();
post(crate::event::Event::BlockchainChanged { index: my_height });
// If it was the last block to sync
if my_height == max_height {
post(crate::event::Event::SyncFinished);
self.future_blocks.clear();
} else {
let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) };
post(event);
}
let domains = context.chain.get_domains_count();
let keys = context.chain.get_users_count();
post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count });
}
BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); }
BlockQuality::Future => {
debug!("Got future block {}", block.index);
self.future_blocks.insert(block.index, block);
}
BlockQuality::Bad => {
// TODO save bad public keys to banned table
debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block);
let height = context.chain.get_height();
if height + 1 == block.index {
context.chain.update_max_height(height);
post(crate::event::Event::SyncFinished);
}
return State::Banned;
}
BlockQuality::Rewind => {
debug!("Got some orphan block, requesting its parent");
return State::message(Message::GetBlock { index: block.index - 1 });
}
BlockQuality::Fork => {
debug!("Got forked block {} with hash {:?}", block.index, block.hash);
// If we are very much behind of blockchain
let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height;
let our_block = context.chain.get_block(block.index).unwrap();
if block.is_better_than(&our_block) || lagged {
context.chain.replace_block(block).expect("Error replacing block with fork");
let index = context.chain.get_height();
post(crate::event::Event::BlockchainChanged { index });
} else {
debug!("Fork in not better than our block, dropping.");
return State::message(Message::block(our_block.index, our_block.as_bytes()));
if !sent {
break; // All peers busy, try next cycle
}
}
}
// Kick idle peers into the pipeline (peers with no pending requests)
for t in &active_peers {
let has_pending = self.pending_requests.values().any(|(_, tk)| tk == t);
if has_pending {
continue;
}
if let Some(peer) = self.peers.get_peer(t) {
if !peer.get_state().is_idle() || peer.has_queued_messages() {
continue;
}
}
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
if let Some(peer) = self.peers.get_mut_peer(t) {
peer.queue_message(Message::GetBlock { index: idx });
self.pending_requests.insert(idx, (now, *t));
debug!("Kicking idle peer {} with block {}", peer.get_addr().ip(), idx);
}
}
}
}
/// Process validation results from worker threads and add validated blocks to chain
fn process_validation_results(&mut self) {
// Process all available validation results without blocking
while let Ok(result) = self.validation_receiver.try_recv() {
match result {
PreValidationResult::NeedsDbValidation(token, block) => {
// CPU-intensive validation passed, now do DB-dependent validation
let peers_count = self.peers.get_peers_active_count();
// Update peer state
if let Some(peer) = self.peers.get_mut_peer(&token) {
peer.set_received_block(block.index);
trace!("Validated block {} from {}", block.index, peer.get_addr());
} else {
// Peer disconnected, but we can still process the block
trace!("Validated block {} from disconnected peer", block.index);
}
// Lock context only for DB operations
let mut context = self.context.lock().unwrap();
let my_height = context.chain.get_height();
let max_height = context.chain.get_max_height();
// Skip stale blocks that are at or below current height (late pipeline responses)
if block.index <= my_height {
debug!("Ignoring stale block {} (height is {})", block.index, my_height);
self.pending_requests.remove(&block.index);
continue;
}
// Do remaining DB-dependent validation and add to chain
match context.chain.check_new_block(&block) {
BlockQuality::Good => {
let block_index = block.index;
let mut next_index = block.index + 1;
context.chain.add_block(block);
// Clean up pending request for this block
self.pending_requests.remove(&block_index);
// Process future blocks that are now ready
while let Some(block) = self.future_blocks.remove(&next_index) {
if context.chain.check_new_block(&block) == BlockQuality::Good {
debug!("Added block {} from future blocks", next_index);
context.chain.add_block(block);
self.pending_requests.remove(&next_index);
} else {
warn!("Block {} in future blocks is bad!", block.index);
break;
}
next_index += 1;
}
let my_height = context.chain.get_height();
post(crate::event::Event::BlockchainChanged { index: my_height });
// Check if sync is finished
if my_height >= max_height {
post(crate::event::Event::SyncFinished);
self.future_blocks.clear();
} else {
let event = crate::event::Event::Syncing {
have: my_height,
height: max(max_height, my_height)
};
post(event);
}
let domains = context.chain.get_domains_count();
let keys = context.chain.get_users_count();
post(crate::event::Event::NetworkStatus {
blocks: my_height,
domains,
keys,
nodes: peers_count
});
}
BlockQuality::Twin => {
debug!("Ignoring duplicate block {}", block.index);
}
BlockQuality::Future => {
debug!("Got future block {}", block.index);
let block_index = block.index;
self.future_blocks.insert(block.index, block);
// Clean up pending request since we have this block now
self.pending_requests.remove(&block_index);
}
BlockQuality::Bad => {
debug!("Block {} failed DB validation", block.index);
if let Some(peer) = self.peers.get_mut_peer(&token) {
debug!("Banning peer {} for bad block", peer.get_addr());
// Mark peer for banning
peer.set_state(State::Banned);
}
let height = context.chain.get_height();
if height + 1 == block.index {
context.chain.update_max_height(height);
post(crate::event::Event::SyncFinished);
}
}
BlockQuality::Rewind => {
debug!("Got orphan block {}, requesting parent", block.index);
if let Some(peer) = self.peers.get_mut_peer(&token) {
peer.set_state(State::message(Message::GetBlock {
index: block.index - 1
}));
}
}
BlockQuality::Fork => {
debug!("Got forked block {} with hash {:?}", block.index, block.hash);
let lagged = block.index == context.chain.get_height()
&& block.index + LIMITED_CONFIDENCE_DEPTH <= max_height;
if let Some(our_block) = context.chain.get_block(block.index) {
if block.is_better_than(&our_block) || lagged {
context.chain.replace_block(block)
.expect("Error replacing block with fork");
let index = context.chain.get_height();
post(crate::event::Event::BlockchainChanged { index });
} else {
debug!("Fork is not better than our block, dropping");
if let Some(peer) = self.peers.get_mut_peer(&token) {
peer.set_state(State::message(Message::block(
our_block.index,
our_block.as_bytes()
)));
}
}
}
}
}
// Context lock is dropped here
}
PreValidationResult::Invalid(token, block) => {
// Block failed CPU-intensive validation
debug!("Block {} failed pre-validation (hash/signature)", block.index);
if let Some(peer) = self.peers.get_mut_peer(&token) {
debug!("Banning peer {} for invalid block", peer.get_addr());
peer.set_state(State::Banned);
}
}
}
}
State::idle()
}
/// Gets new token from old token, mutating the last
+55 -2
View File
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::fmt::Display;
use std::net::SocketAddr;
use std::time::Instant;
@@ -5,13 +6,16 @@ use std::time::Instant;
use mio::net::TcpStream;
use crate::crypto::Chacha;
use crate::p2p::message::Message;
use crate::p2p::State;
use crate::p2p::version::Version;
#[derive(Debug)]
pub struct Peer {
addr: SocketAddr,
stream: TcpStream,
state: State,
outgoing: VecDeque<Vec<u8>>,
id: String,
height: u64,
inbound: bool,
@@ -21,7 +25,8 @@ pub struct Peer {
reconnects: u32,
received_block: u64,
sent_height: u64,
cipher: Option<Chacha>
cipher: Option<Chacha>,
version: Version,
}
impl Peer {
@@ -30,6 +35,7 @@ impl Peer {
addr,
stream,
state,
outgoing: VecDeque::new(),
id: String::new(),
height: 0,
inbound,
@@ -39,7 +45,8 @@ impl Peer {
reconnects: 0,
received_block: 0,
sent_height: 0,
cipher: None
cipher: None,
version: Version::default(),
}
}
@@ -78,6 +85,52 @@ impl Peer {
self.state = state;
}
/// Queue a message for sending. Does not change peer state.
pub fn queue_message(&mut self, msg: Message) {
let data = serde_cbor::to_vec(&msg).unwrap();
self.outgoing.push_back(data);
}
/// Queue a high-priority message at the front (e.g. gap block requests)
pub fn queue_priority_message(&mut self, msg: Message) {
let data = serde_cbor::to_vec(&msg).unwrap();
self.outgoing.push_front(data);
}
pub fn has_queued_messages(&self) -> bool {
!self.outgoing.is_empty()
}
pub fn queued_count(&self) -> usize {
self.outgoing.len()
}
pub fn pop_message(&mut self) -> Option<Vec<u8>> {
self.outgoing.pop_front()
}
pub fn set_version(&mut self, version: Version) {
self.version = version;
}
pub fn get_version(&self) -> &Version {
&self.version
}
/// Old nodes (< 0.8.9) overwrite outgoing messages, can only have 1 in flight
pub fn supports_queue(&self) -> bool {
self.version >= Version { major: 0, minor: 8, patch: 9 }
}
/// Check if we can send another message to this peer
pub fn can_send(&self) -> bool {
if self.supports_queue() {
true
} else {
self.outgoing.is_empty()
}
}
pub fn get_id(&self) -> &str {
&self.id
}
+13 -3
View File
@@ -202,6 +202,13 @@ impl Peers {
count
}
pub fn get_active_peer_tokens(&self) -> Vec<Token> {
self.peers.iter()
.filter(|(_, peer)| peer.active())
.map(|(token, _)| *token)
.collect()
}
pub fn is_tween_connect(&self, id: &str) -> bool {
for (_, peer) in self.peers.iter() {
if peer.active() && peer.get_id() == id {
@@ -255,7 +262,10 @@ impl Peers {
let mut stale_tokens = Vec::new();
for (token, peer) in self.peers.iter_mut() {
if let State::Idle { from } = peer.get_state() {
if from.elapsed().as_secs() >= PING_PERIOD + random_time {
if peer.has_queued_messages() {
let stream = peer.get_stream();
registry.reregister(stream, *token, Interest::WRITABLE).unwrap();
} else if from.elapsed().as_secs() >= PING_PERIOD + random_time {
// Sometimes we check for new peers instead of pinging
let message = if nodes < MAX_NODES && random::<bool>() {
Message::GetPeers
@@ -273,7 +283,7 @@ impl Peers {
stale_tokens.push((token.clone(), peer.get_addr()));
continue;
}
if matches!(peer.get_state(), State::Message {..}) {
if matches!(peer.get_state(), State::Message {..}) || peer.has_queued_messages() {
let stream = peer.get_stream();
registry.reregister(stream, *token, Interest::WRITABLE).unwrap();
}
@@ -288,7 +298,7 @@ impl Peers {
self.ignored.retain(|_addr, time| { time.elapsed().as_secs() < 600 });
// If someone has more blocks we sync
if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height {
if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height && have_blocks.is_empty() {
// Give some opportunity to get more peers instead of requests for blocks only
let request_blocks = nodes >= 10 || random::<bool>();
if request_blocks {