Changed origin block index to 1. Added "locker" blocks - mining, exchange etc. Removed unnecesarry creation of 'zones' directory on startup. Changed bind port of DNS-UDP socket to random (fixes inability to start several copies of Alfis). Sped up block exchange by sending additional pings when we have more blocks than other peers. Fixed unnecesarry double requests of blocks. Totally reworked block checking on arrival. Added target tags for logging in main. Added a commandline flag to list all blocks in DB and exit.

This commit is contained in:
Revertron
2021-03-06 21:28:06 +01:00
parent 59df68d7c7
commit b0e78edb3d
12 changed files with 465 additions and 147 deletions
+89 -9
View File
@@ -3,7 +3,7 @@ extern crate serde_json;
use std::{io, thread};
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
@@ -17,6 +17,7 @@ use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers};
use std::net::{SocketAddr, IpAddr, SocketAddrV4, ToSocketAddrs};
use crate::blockchain::blockchain::BlockQuality;
use crate::blockchain::CHAIN_VERSION;
use chrono::Utc;
const SERVER: Token = Token(0);
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(3000));
@@ -107,6 +108,7 @@ impl Network {
let context = context.lock().unwrap();
(context.blockchain.height(), context.blockchain.last_hash())
};
mine_locker_block(context.clone());
peers.send_pings(poll.registry(), height, hash);
peers.connect_new_peers(poll.registry(), &mut unique_token);
}
@@ -293,7 +295,7 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
if peer.is_higher(my_height) {
context.blockchain.update_max_height(height);
context.bus.post(crate::event::Event::Syncing { have: my_height, height});
State::message(Message::GetBlock { index: my_height })
State::message(Message::GetBlock { index: my_height + 1 })
} else {
State::message(Message::GetPeers)
}
@@ -306,8 +308,8 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
let peer = peers.get_mut_peer(token).unwrap();
peer.set_height(height);
peer.set_active(true);
if peer.is_higher(my_height) || my_hash.ne(&hash) {
State::message(Message::GetBlock { index: my_height })
if peer.is_higher(my_height) || ( height == my_height && my_hash != hash) {
State::message(Message::GetBlock { index: my_height + 1 })
} else {
State::message(Message::pong(my_height, my_hash))
}
@@ -316,12 +318,17 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
let peer = peers.get_mut_peer(token).unwrap();
peer.set_height(height);
peer.set_active(true);
if peer.is_higher(my_height) || my_hash.ne(&hash) {
let is_higher = peer.is_higher(my_height);
let mut context = context.lock().unwrap();
let blocks_count = context.blockchain.height();
context.bus.post(crate::event::Event::NetworkStatus { nodes: peers.get_peers_active_count(), blocks: blocks_count });
if is_higher {
State::message(Message::GetBlock { index: my_height + 1 })
} else if my_hash != hash {
State::message(Message::GetBlock { index: my_height })
} else {
let mut context = context.lock().unwrap();
let blocks_count = context.blockchain.height();
context.bus.post(crate::event::Event::NetworkStatus { nodes: peers.get_peers_active_count(), blocks: blocks_count });
State::idle()
}
}
@@ -346,7 +353,10 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
Ok(block) => block,
Err(_) => return State::Error
};
let peer = peers.get_mut_peer(token).unwrap();
peer.set_received_block(block.index);
let context = context.clone();
let peers_count = peers.get_peers_active_count();
thread::spawn(move || {
let mut context = context.lock().unwrap();
let max_height = context.blockchain.max_height();
@@ -361,12 +371,17 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
} else {
context.bus.post(crate::event::Event::Syncing { have: my_height, height: max_height});
}
context.bus.post(crate::event::Event::NetworkStatus { nodes: peers_count, blocks: my_height });
}
BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); }
BlockQuality::Future => { debug!("Ignoring future block {}", block.index); }
BlockQuality::Bad => { debug!("Ignoring bad block {} with hash {:?}", block.index, block.hash); }
// TODO deal with forks
BlockQuality::Fork => { debug!("Ignoring forked block {} with hash {:?}", block.index, block.hash); }
BlockQuality::Fork => {
debug!("Ignoring forked block {} with hash {:?}", block.index, block.hash);
//let peer = peers.get_mut_peer(token).unwrap();
//deal_with_fork(context, peer, block);
}
}
});
State::idle()
@@ -374,6 +389,71 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
}
}
/// Sends an Event to miner to start mining locker block if "locker" is our public key
fn mine_locker_block(context: Arc<Mutex<Context>>) {
let mut context = context.lock().unwrap();
if let Some(block) = context.blockchain.last_block() {
if block.index < context.blockchain.max_height() {
info!("No locker mining while syncing");
return;
}
match context.blockchain.get_block_locker(&block, Utc::now().timestamp()) {
Some(key) => {
if key == context.keystore.get_public() {
info!("We have an honor to mine locker block!");
context.bus.post(crate::event::Event::ActionMineLocker { index: block.index + 1, hash: block.hash });
} else {
info!("Locker block must be mined by another node: {:?}", &key);
}
}
None => {}
}
}
}
#[allow(dead_code)]
fn deal_with_fork(context: MutexGuard<Context>, peer: &mut Peer, block: Block) {
peer.add_fork_block(block);
let mut vector: Vec<&Block> = peer.get_fork().values().collect();
vector.sort_by(|a, b| a.index.cmp(&b.index));
if vector[0].index == 0 {
return;
}
if let Some(prev_block) = context.blockchain.get_block(vector[0].index - 1) {
// If this block is not root of the fork (we need to go ~deeper~ more backwards)
if vector[0].prev_block_hash != prev_block.hash {
return;
}
// Okay, prev_block is the common root for our chain and the fork
let mut check_ok = true;
vector.insert(0, &prev_block);
let mut prev_block = &vector[0];
for block in &vector {
if block == prev_block {
continue;
}
if !check_block(block, prev_block) {
check_ok = false;
break;
}
prev_block = block;
}
match check_ok {
true => {
// TODO count fork chain "work" and decide which chain is "better"
}
false => {
warn!("Fork chain is wrong!");
peer.set_state(State::Banned);
}
}
};
}
fn check_block(block: &Block, prev: &Block) -> bool {
prev.index == block.index - 1 && prev.hash == block.prev_block_hash
}
/// Connecting to configured (bootstrap) peers
fn connect_peers(peers_addrs: Vec<String>, poll: &mut Poll, peers: &mut Peers, unique_token: &mut Token) {
for peer in peers_addrs.iter() {
+26 -2
View File
@@ -1,6 +1,8 @@
use crate::p2p::State;
use std::net::SocketAddr;
use std::collections::HashMap;
use mio::net::TcpStream;
use crate::p2p::State;
use crate::Block;
#[derive(Debug)]
pub struct Peer {
@@ -11,11 +13,13 @@ pub struct Peer {
inbound: bool,
public: bool,
active: bool,
received_block: u64,
fork: HashMap<u64, Block>
}
impl Peer {
pub fn new(addr: SocketAddr, stream: TcpStream, state: State, inbound: bool) -> Self {
Peer { addr, stream, state, height: 0, inbound, public: false, active: false }
Peer { addr, stream, state, height: 0, inbound, public: false, active: false, received_block: 0, fork: HashMap::new() }
}
pub fn get_addr(&self) -> SocketAddr {
@@ -46,6 +50,18 @@ impl Peer {
self.height > height
}
pub fn is_lower(&self, height: u64) -> bool {
self.height < height
}
pub fn set_received_block(&mut self, index: u64) {
self.received_block = index;
}
pub fn has_more_blocks(&self, height: u64) -> bool {
self.height > self.received_block && self.height > height && self.get_state().is_idle()
}
pub fn is_public(&self) -> bool {
self.public
}
@@ -70,6 +86,14 @@ impl Peer {
self.inbound
}
pub fn add_fork_block(&mut self, block: Block) {
self.fork.insert(block.index, block);
}
pub fn get_fork(&self) -> &HashMap<u64, Block> {
&self.fork
}
/// If loopback address then we care about ip and port.
/// If regular address then we only care about the ip and ignore the port.
pub fn equals(&self, addr: &SocketAddr) -> bool {
+20 -2
View File
@@ -157,17 +157,35 @@ impl Peers {
}
}
// If someone has more blocks we sync
if !ping_sent {
let mut rng = rand::thread_rng();
match self.peers
.iter_mut()
.filter_map(|(token, peer)| if peer.get_state().is_idle() && peer.is_higher(height) { Some((token, peer)) } else { None })
.filter_map(|(token, peer)| if peer.has_more_blocks(height) { Some((token, peer)) } else { None })
.choose(&mut rng) {
None => {}
Some((token, peer)) => {
debug!("Found some peer higher than we are, sending block request");
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::message(Message::GetBlock { index: height }));
peer.set_state(State::message(Message::GetBlock { index: height + 1 }));
ping_sent = true;
}
}
}
// If someone has less blocks (we mined a new block) we send a ping with our height
if !ping_sent {
let mut rng = rand::thread_rng();
match self.peers
.iter_mut()
.filter_map(|(token, peer)| if peer.is_lower(height) && peer.get_state().is_idle() { Some((token, peer)) } else { None })
.choose(&mut rng) {
None => {}
Some((token, peer)) => {
debug!("Found some peer lower than we are, sending ping");
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::message(Message::Ping { height, hash }));
}
}
}