Fixed new sync.
This commit is contained in:
+35
-19
@@ -345,6 +345,7 @@ impl Network {
|
||||
warn!("Last network events time {} seconds ago", elapsed);
|
||||
}
|
||||
log_timer = Instant::now();
|
||||
seen_blocks.clear();
|
||||
}
|
||||
if nodes < MAX_NODES && connect_timer.elapsed().as_secs() >= 2 {
|
||||
self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only);
|
||||
@@ -798,13 +799,6 @@ impl Network {
|
||||
let rtt_ms = request_time.elapsed().as_secs_f64() * 1000.0;
|
||||
self.peer_rtt.record_success(peer_token, rtt_ms);
|
||||
}
|
||||
// Skip blocks we already have in the chain
|
||||
let current_height = self.context.lock().unwrap().chain.get_height();
|
||||
if block.index <= current_height {
|
||||
let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
|
||||
debug!("Skipping stale block {} from {} (height is {})", block.index, peer_addr, current_height);
|
||||
return State::idle();
|
||||
}
|
||||
if !seen_blocks.contains(&block.hash) {
|
||||
seen_blocks.insert(block.hash.clone());
|
||||
// Send block to validation worker threads for parallel processing
|
||||
@@ -848,7 +842,10 @@ impl Network {
|
||||
/// Returns None if all blocks in the window are already requested or received.
|
||||
fn next_block_to_request(&self, my_height: u64, max_height: u64) -> Option<u64> {
|
||||
const SYNC_WINDOW: u64 = 500;
|
||||
let end = max_height.min(my_height + SYNC_WINDOW);
|
||||
// Don't pipeline until genesis block is added — other blocks would just
|
||||
// queue as "future" and spam warnings since last_block is None.
|
||||
let window = if my_height == 0 { 1 } else { SYNC_WINDOW };
|
||||
let end = max_height.min(my_height + window);
|
||||
for idx in (my_height + 1)..=end {
|
||||
if !self.future_blocks.contains_key(&idx) && !self.pending_requests.contains_key(&idx) {
|
||||
return Some(idx);
|
||||
@@ -975,16 +972,8 @@ impl Network {
|
||||
|
||||
// Lock context only for DB operations
|
||||
let mut context = self.context.lock().unwrap();
|
||||
let my_height = context.chain.get_height();
|
||||
let max_height = context.chain.get_max_height();
|
||||
|
||||
// Skip stale blocks that are at or below current height (late pipeline responses)
|
||||
if block.index <= my_height {
|
||||
debug!("Ignoring stale block {} (height is {})", block.index, my_height);
|
||||
self.pending_requests.remove(&block.index);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Do remaining DB-dependent validation and add to chain
|
||||
match context.chain.check_new_block(&block) {
|
||||
BlockQuality::Good => {
|
||||
@@ -1057,9 +1046,13 @@ impl Network {
|
||||
}
|
||||
BlockQuality::Rewind => {
|
||||
debug!("Got orphan block {}, requesting parent", block.index);
|
||||
// Save the block so it can be processed after the rewind resolves
|
||||
let block_index = block.index;
|
||||
self.future_blocks.insert(block.index, block);
|
||||
self.pending_requests.remove(&block_index);
|
||||
if let Some(peer) = self.peers.get_mut_peer(&token) {
|
||||
peer.set_state(State::message(Message::GetBlock {
|
||||
index: block.index - 1
|
||||
index: block_index - 1
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -1070,10 +1063,33 @@ impl Network {
|
||||
|
||||
if let Some(our_block) = context.chain.get_block(block.index) {
|
||||
if block.is_better_than(&our_block) || lagged {
|
||||
let fork_index = block.index;
|
||||
context.chain.replace_block(block)
|
||||
.expect("Error replacing block with fork");
|
||||
let index = context.chain.get_height();
|
||||
post(crate::event::Event::BlockchainChanged { index });
|
||||
let mut next_index = fork_index + 1;
|
||||
// Process future blocks that may now be valid after fork switch
|
||||
while let Some(fb) = self.future_blocks.remove(&next_index) {
|
||||
if context.chain.check_new_block(&fb) == BlockQuality::Good {
|
||||
debug!("Added block {} from future blocks after fork", next_index);
|
||||
context.chain.add_block(fb);
|
||||
self.pending_requests.remove(&next_index);
|
||||
} else {
|
||||
debug!("Future block {} not good after fork", next_index);
|
||||
break;
|
||||
}
|
||||
next_index += 1;
|
||||
}
|
||||
let my_height = context.chain.get_height();
|
||||
post(crate::event::Event::BlockchainChanged { index: my_height });
|
||||
if my_height >= max_height {
|
||||
post(crate::event::Event::SyncFinished);
|
||||
self.future_blocks.clear();
|
||||
} else {
|
||||
post(crate::event::Event::Syncing {
|
||||
have: my_height,
|
||||
height: max(max_height, my_height)
|
||||
});
|
||||
}
|
||||
} else {
|
||||
debug!("Fork is not better than our block, dropping");
|
||||
if let Some(peer) = self.peers.get_mut_peer(&token) {
|
||||
|
||||
Reference in New Issue
Block a user