Disabled full blockchain check on start.

Fixed non-working debug version.
Fixed p2p connections handling.
Lowered ping interval to 30-60 seconds.
Fixed stuck with lower number of blocks.
This commit is contained in:
Revertron
2021-04-21 23:11:10 +02:00
parent 92b03344eb
commit 864edab203
3 changed files with 80 additions and 66 deletions
+5 -3
View File
@@ -91,9 +91,11 @@ impl Chain {
self.last_full_block = self.get_last_full_block(u64::MAX, None);
}
}
self.check_chain();
// TODO Add env-var and commandline switches for full check
//self.check_chain();
}
#[allow(dead_code)]
fn check_chain(&mut self) {
let height = self.get_height();
info!("Local blockchain height is {}, starting full blockchain check...", height);
@@ -693,7 +695,7 @@ impl Chain {
}
}
pub fn last_hash(&self) -> Bytes {
pub fn get_last_hash(&self) -> Bytes {
match &self.last_block {
None => { Bytes::default() }
Some(block) => { block.hash.clone() }
@@ -988,7 +990,7 @@ impl Chain {
let mut count = 1;
let window = block.index - 1; // Without the last block
while set.len() < BLOCK_SIGNERS_ALL as usize {
let index = ((tail * count) % window) + 1; // We want it to start from 1
let index = (tail.wrapping_mul(count) % window) + 1; // We want it to start from 1
if let Some(b) = self.get_block(index) {
if b.pub_key != block.pub_key && !set.contains(&b.pub_key) {
result.push(b.pub_key.clone());
+30 -19
View File
@@ -21,7 +21,7 @@ use crate::blockchain::types::BlockQuality;
use crate::commons::*;
const SERVER: Token = Token(0);
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
const MAX_PACKET_SIZE: usize = 1 * 1024 * 1024; // 1 Mb
const MAX_READ_BLOCK_TIME: u128 = 500;
@@ -165,7 +165,7 @@ impl Network {
}
log_timer = Instant::now();
}
(height, context.chain.last_hash())
(height, context.chain.get_last_hash())
};
peers.update(poll.registry(), height, hash);
peers.connect_new_peers(poll.registry(), &mut unique_token, yggdrasil_only);
@@ -295,7 +295,7 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
if from.elapsed().as_secs() >= 30 {
let data: String = {
let c = context.lock().unwrap();
let message = Message::ping(c.chain.get_height(), c.chain.last_hash());
let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash());
serde_json::to_string(&message).unwrap()
};
send_message(peer.get_stream(), &data.into_bytes()).unwrap_or_else(|e| warn!("Error sending ping {}", e));
@@ -371,7 +371,7 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
let (my_height, my_hash, my_origin, my_version) = {
let context = context.lock().unwrap();
// TODO cache it somewhere
(context.chain.get_height(), context.chain.last_hash(), &context.settings.origin.clone(), CHAIN_VERSION)
(context.chain.get_height(), context.chain.get_last_hash(), &context.settings.origin.clone(), CHAIN_VERSION)
};
let answer = match message {
Message::Hand { app_version, origin, version, public, rand} => {
@@ -382,6 +382,7 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
if origin.eq(my_origin) && version == my_version {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_public(public);
peer.set_active(true);
debug!("Hello from v{} on {}", &app_version, peer.get_addr().ip());
State::message(Message::shake(&origin, version, true, my_height))
} else {
@@ -404,13 +405,12 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
if peer.is_higher(my_height) {
context.chain.update_max_height(height);
context.bus.post(crate::event::Event::Syncing { have: my_height, height});
if active_count > 3 {
State::idle()
} else {
State::message(Message::GetPeers)
}
} else {
}
if active_count < 5 || random::<u8>() < 10 {
debug!("Requesting more peers from {}", peer.get_addr().ip());
State::message(Message::GetPeers)
} else {
State::idle()
}
} else {
State::Banned
@@ -425,15 +425,16 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
let mut context = context.lock().unwrap();
context.chain.update_max_height(height);
}
if hash.ne(&my_hash) {
debug!("1Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
debug!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
if my_height == height && hash.ne(&my_hash) {
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
State::message(Message::GetBlock { index: my_height })
} else {
State::message(Message::pong(my_height, my_hash))
}
}
Message::Pong { height, hash } => {
let active_count = peers.get_peers_active_count();
let peer = peers.get_mut_peer(token).unwrap();
peer.set_height(height);
peer.set_active(true);
@@ -441,12 +442,12 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
let mut context = context.lock().unwrap();
context.chain.update_max_height(height);
}
if hash.ne(&my_hash) {
debug!("2Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
debug!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
if my_height == height && hash.ne(&my_hash) {
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
State::message(Message::GetBlock { index: my_height })
} else {
if random::<u8>() < 10 {
if active_count < 5 || random::<u8>() < 10 {
debug!("Requesting more peers from {}", peer.get_addr().ip());
State::message(Message::GetPeers)
} else {
@@ -455,14 +456,22 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
}
}
Message::GetPeers => {
let peer = peers.get_peer(token).unwrap();
State::message(Message::Peers { peers: peers.get_peers_for_exchange(&peer.get_addr()) })
let addr = {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_active(true);
peer.get_addr().clone()
};
State::message(Message::Peers { peers: peers.get_peers_for_exchange(&addr) })
}
Message::Peers { peers: new_peers } => {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_active(true);
peers.add_peers_from_exchange(new_peers);
State::idle()
}
Message::GetBlock { index } => {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_active(true);
let context = context.lock().unwrap();
match context.chain.get_block(index) {
Some(block) => State::message(Message::block(block.index, serde_json::to_string(&block).unwrap())),
@@ -470,6 +479,8 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
}
}
Message::Block { index, block } => {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_active(true);
info!("Received block {}", index);
let block: Block = match serde_json::from_str(&block) {
Ok(block) => block,
+45 -44
View File
@@ -13,19 +13,16 @@ use crate::{Bytes, commons};
use crate::commons::*;
use crate::p2p::{Message, Peer, State};
use crate::commons::next;
use std::time::Duration;
use std::io;
const PING_PERIOD: u64 = 60;
const TCP_TIMEOUT: Duration = Duration::from_millis(10000);
const PING_PERIOD: u64 = 30;
pub struct Peers {
peers: HashMap<Token, Peer>,
new_peers: Vec<SocketAddr>,
ignored: HashSet<IpAddr>,
my_id: String,
asked_block: u64,
asked_time: i64,
block_asked_time: i64,
}
impl Peers {
@@ -35,8 +32,7 @@ impl Peers {
new_peers: Vec::new(),
ignored: HashSet::new(),
my_id: commons::random_string(6),
asked_block: 0,
asked_time: 0
block_asked_time: 0
}
}
@@ -217,7 +213,6 @@ impl Peers {
}
pub fn update(&mut self, registry: &Registry, height: u64, hash: Bytes) {
let mut ping_sent = false;
for (token, peer) in self.peers.iter_mut() {
match peer.get_state() {
State::Idle { from } => {
@@ -225,7 +220,7 @@ impl Peers {
if from.elapsed().as_secs() >= PING_PERIOD + random_time {
// Sometimes we check for new peers instead of pinging
let random: u8 = random();
let message = if random < 16 {
let message = if random < 10 {
Message::GetPeers
} else {
Message::ping(height, hash.clone())
@@ -234,7 +229,6 @@ impl Peers {
peer.set_state(State::message(message));
let stream = peer.get_stream();
registry.reregister(stream, token.clone(), Interest::WRITABLE).unwrap();
ping_sent = true;
}
}
_ => {}
@@ -242,7 +236,7 @@ impl Peers {
}
// If someone has more blocks we sync
if self.need_ask_block(height + 1) {
if self.need_ask_block() {
let mut rng = rand::thread_rng();
let mut asked = false;
match self.peers
@@ -254,17 +248,16 @@ impl Peers {
debug!("Found some peer higher than we are, requesting block {}, from {}", height + 1, &peer.get_addr().ip());
registry.reregister(peer.get_stream(), token.clone(), Interest::WRITABLE).unwrap();
peer.set_state(State::message(Message::GetBlock { index: height + 1 }));
ping_sent = true;
asked = true;
}
}
if asked {
self.set_asked_block(height + 1);
self.update_asked_block_time();
}
}
// If someone has less blocks (we mined a new block) we send a ping with our height
if !ping_sent {
{
let mut rng = rand::thread_rng();
match self.peers
.iter_mut()
@@ -326,25 +319,36 @@ impl Peers {
pub fn connect_peers(&mut self, peers_addrs: &Vec<String>, registry: &Registry, unique_token: &mut Token, yggdrasil_only: bool) {
let mut set = HashSet::new();
for peer in peers_addrs.iter() {
info!("Resolving address {}", peer);
let mut addresses: Vec<SocketAddr> = match peer.to_socket_addrs() {
Ok(peers) => { peers.collect() }
Err(_) => { error!("Can't resolve address {}", &peer); continue; }
};
info!("Got addresses: {:?}", &addresses);
// At first we connect to one peer address from every "peer" or domain
let addr = addresses.remove(0);
if !set.contains(&addr) {
match self.connect_peer(&addr, registry, unique_token, yggdrasil_only) {
Ok(_) => {
set.insert(addr);
}
Err(_) => {
debug!("Could not connect to {}", &addr);
// At first we connect to 5 peer addresses
if set.len() >= 10 {
break;
}
while addresses.len() > 0 {
let addr = addresses.remove(0);
if !set.contains(&addr) {
match self.connect_peer(&addr, registry, unique_token, yggdrasil_only) {
Ok(_) => {
set.insert(addr);
}
Err(_) => {
debug!("Could not connect to {}", &addr);
}
}
}
}
// Copy others to new_peers, to connect later
self.new_peers.append(&mut addresses);
if addresses.len() > 0 {
self.new_peers.append(&mut addresses);
}
}
}
@@ -356,31 +360,28 @@ impl Peers {
debug!("Ignoring not Yggdrasil address '{}'", &addr.ip());
return Err(io::Error::from(io::ErrorKind::InvalidInput));
}
if let Ok(stream) = std::net::TcpStream::connect_timeout(&addr.clone(), TCP_TIMEOUT) {
stream.set_nodelay(true)?;
stream.set_read_timeout(Some(TCP_TIMEOUT))?;
stream.set_write_timeout(Some(TCP_TIMEOUT))?;
stream.set_nonblocking(true)?;
let mut stream = TcpStream::from_std(stream);
let token = next(unique_token);
trace!("Created connection {}, to peer {}", &token.0, &addr);
registry.register(&mut stream, token, Interest::WRITABLE)?;
let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false);
peer.set_public(true);
self.peers.insert(token, peer);
trace!("Connecting to peer {}", &addr);
match TcpStream::connect(addr.clone()) {
Ok(mut stream ) => {
//stream.set_nodelay(true)?;
let token = next(unique_token);
trace!("Created connection {}, to peer {}", &token.0, &addr);
registry.register(&mut stream, token, Interest::WRITABLE).unwrap();
let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false);
peer.set_public(true);
self.peers.insert(token, peer);
Ok(())
}
Err(e) => { Err(e) }
}
Ok(())
}
pub fn set_asked_block(&mut self, index: u64) {
self.asked_block = index;
self.asked_time = Utc::now().timestamp();
pub fn update_asked_block_time(&mut self) {
self.block_asked_time = Utc::now().timestamp();
}
pub fn need_ask_block(&self, index: u64) -> bool {
index > self.asked_block || self.asked_time + 3 < Utc::now().timestamp()
pub fn need_ask_block(&self) -> bool {
self.block_asked_time + 5 < Utc::now().timestamp()
}
}