Miner queue refactoring.

This commit is contained in:
Revertron
2021-04-24 22:19:44 +02:00
parent c0e49bbab5
commit d95fb82067
+97 -22
View File
@@ -15,6 +15,7 @@ use crate::blockchain::hash_utils::*;
use crate::keys::check_public_key_strength;
use crate::event::Event;
use blakeout::Blakeout;
use std::thread::sleep;
#[derive(Clone)]
pub struct MineJob {
@@ -23,6 +24,20 @@ pub struct MineJob {
keystore: Keystore
}
impl MineJob {
fn is_full(&self) -> bool {
self.block.transaction.is_some()
}
fn is_signing(&self) -> bool {
self.block.transaction.is_none()
}
fn is_due(&self) -> bool {
self.start == 0 || self.start < Utc::now().timestamp()
}
}
#[derive(Clone, Debug)]
pub struct MinerState {
pub mining: bool,
@@ -67,34 +82,101 @@ impl Miner {
pub fn start_mining_thread(&mut self) {
let context = Arc::clone(&self.context);
let blocks = self.jobs.clone();
let jobs = self.jobs.clone();
let running = self.running.clone();
let mining = self.mining.clone();
let cond_var = self.cond_var.clone();
thread::spawn(move || {
running.store(true, Ordering::SeqCst);
let delay = Duration::from_secs(30);
while running.load(Ordering::SeqCst) {
// If some transaction is being mined now, we yield
if mining.load(Ordering::SeqCst) {
thread::sleep(delay);
Miner::run_main_loop(&context, jobs, running, mining, cond_var);
});
// Add events listener to a [Bus]
let running = self.running.clone();
let mining = self.mining.clone();
self.context.lock().unwrap().bus.register(move |_uuid, e| {
match e {
Event::ActionQuit => { running.store(false, Ordering::Relaxed); }
Event::NewBlockReceived => {}
Event::BlockchainChanged {..} => {}
Event::ActionStopMining => {
mining.store(false, Ordering::SeqCst);
}
_ => {}
}
true
});
}
fn run_main_loop(context: &Arc<Mutex<Context>>, jobs: Arc<Mutex<Vec<MineJob>>>, running: Arc<AtomicBool>, mining: Arc<AtomicBool>, cond_var: Arc<Condvar>) {
running.store(true, Ordering::SeqCst);
let delay = Duration::from_secs(30);
let mut current_job: Option<MineJob> = None;
while running.load(Ordering::SeqCst) {
if let Some(ref cur_job) = current_job {
// If we are mining signing block
if mining.load(Ordering::Relaxed) && cur_job.is_signing() {
sleep(delay);
continue;
}
let mut jobs = blocks.lock().unwrap();
// If we are mining something ours
if mining.load(Ordering::Relaxed) && cur_job.is_full() {
let mut signing_waits = false;
let mut jobs = jobs.lock().unwrap();
if jobs.len() > 0 {
debug!("Got new job to mine");
let job = jobs.remove(0);
// If we have some signing job
if job.is_signing() && job.is_due() {
info!("Replacing current mining job with signing job!");
// We cancel current job, waiting for threads to finish
mining.store(false, Ordering::SeqCst);
thread::sleep(Duration::from_millis(100));
// Return current job to queue
jobs.insert(0, current_job.take().unwrap());
mining.store(true, Ordering::SeqCst);
current_job = Some(job.clone());
Miner::mine_internal(Arc::clone(&context), job, mining.clone());
continue;
} else {
debug!("This job will wait for now");
signing_waits = job.is_signing();
jobs.insert(0, job);
}
}
if !signing_waits {
if let Ok(context) = context.try_lock() {
let keystore = context.get_keystore();
// Ask the blockchain if we have to sign something
if let Some(block) = context.chain.get_sign_block(&keystore) {
info!("Got signing job, adding to queue");
// We start mining sign block after some time, not everyone in the same time
let start = Utc::now().timestamp() + (rand::random::<i64>() % BLOCK_SIGNERS_START_RANDOM);
jobs.push(MineJob { start, block, keystore: keystore.unwrap() });
}
}
}
}
} else {
let mut jobs = jobs.lock().unwrap();
if jobs.len() > 0 {
debug!("Got new job to mine");
let job = jobs.remove(0);
if job.start == 0 || job.start < Utc::now().timestamp() {
if job.is_due() {
mining.store(true, Ordering::SeqCst);
current_job = Some(job.clone());
Miner::mine_internal(Arc::clone(&context), job, mining.clone());
} else {
debug!("This job will wait for now");
jobs.insert(0, job);
}
} else if !mining.load(Ordering::SeqCst) {
} else {
// If our queue is empty
if let Ok(context) = context.try_lock() {
let keystore = context.get_keystore();
// Ask the blockchain if we have to sign something
if let Some(block) = context.chain.get_sign_block(&keystore) {
info!("Got signing job, adding to queue");
// We start mining sign block after some time, not everyone in the same time
@@ -105,19 +187,12 @@ impl Miner {
}
let _ = cond_var.wait_timeout(jobs, delay).expect("Error in wait lock!");
}
});
let mining = self.mining.clone();
self.context.lock().unwrap().bus.register(move |_uuid, e| {
match e {
Event::NewBlockReceived => {}
Event::BlockchainChanged {..} => {}
Event::ActionStopMining => {
mining.store(false, Ordering::SeqCst);
}
_ => {}
if !mining.load(Ordering::Relaxed) {
current_job = None;
}
true
});
}
info!("Stopped mining queue thread");
}
pub fn is_mining(&self) -> bool {