diff --git a/src/miner.rs b/src/miner.rs index f3945c4..18e9331 100644 --- a/src/miner.rs +++ b/src/miner.rs @@ -15,6 +15,7 @@ use crate::blockchain::hash_utils::*; use crate::keys::check_public_key_strength; use crate::event::Event; use blakeout::Blakeout; +use std::thread::sleep; #[derive(Clone)] pub struct MineJob { @@ -23,6 +24,20 @@ pub struct MineJob { keystore: Keystore } +impl MineJob { + fn is_full(&self) -> bool { + self.block.transaction.is_some() + } + + fn is_signing(&self) -> bool { + self.block.transaction.is_none() + } + + fn is_due(&self) -> bool { + self.start == 0 || self.start < Utc::now().timestamp() + } +} + #[derive(Clone, Debug)] pub struct MinerState { pub mining: bool, @@ -67,34 +82,101 @@ impl Miner { pub fn start_mining_thread(&mut self) { let context = Arc::clone(&self.context); - let blocks = self.jobs.clone(); + let jobs = self.jobs.clone(); let running = self.running.clone(); let mining = self.mining.clone(); let cond_var = self.cond_var.clone(); thread::spawn(move || { - running.store(true, Ordering::SeqCst); - let delay = Duration::from_secs(30); - while running.load(Ordering::SeqCst) { - // If some transaction is being mined now, we yield - if mining.load(Ordering::SeqCst) { - thread::sleep(delay); + Miner::run_main_loop(&context, jobs, running, mining, cond_var); + }); + + // Add events listener to a [Bus] + let running = self.running.clone(); + let mining = self.mining.clone(); + self.context.lock().unwrap().bus.register(move |_uuid, e| { + match e { + Event::ActionQuit => { running.store(false, Ordering::Relaxed); } + Event::NewBlockReceived => {} + Event::BlockchainChanged {..} => {} + Event::ActionStopMining => { + mining.store(false, Ordering::SeqCst); + } + _ => {} + } + true + }); + } + + fn run_main_loop(context: &Arc>, jobs: Arc>>, running: Arc, mining: Arc, cond_var: Arc) { + running.store(true, Ordering::SeqCst); + let delay = Duration::from_secs(30); + let mut current_job: Option = None; + while running.load(Ordering::SeqCst) { + if let Some(ref cur_job) = current_job { + // If we are mining signing block + if mining.load(Ordering::Relaxed) && cur_job.is_signing() { + sleep(delay); continue; } - let mut jobs = blocks.lock().unwrap(); + // If we are mining something ours + if mining.load(Ordering::Relaxed) && cur_job.is_full() { + let mut signing_waits = false; + let mut jobs = jobs.lock().unwrap(); + if jobs.len() > 0 { + debug!("Got new job to mine"); + let job = jobs.remove(0); + // If we have some signing job + if job.is_signing() && job.is_due() { + info!("Replacing current mining job with signing job!"); + // We cancel current job, waiting for threads to finish + mining.store(false, Ordering::SeqCst); + thread::sleep(Duration::from_millis(100)); + // Return current job to queue + jobs.insert(0, current_job.take().unwrap()); + + mining.store(true, Ordering::SeqCst); + current_job = Some(job.clone()); + Miner::mine_internal(Arc::clone(&context), job, mining.clone()); + continue; + } else { + debug!("This job will wait for now"); + signing_waits = job.is_signing(); + jobs.insert(0, job); + } + } + + if !signing_waits { + if let Ok(context) = context.try_lock() { + let keystore = context.get_keystore(); + // Ask the blockchain if we have to sign something + if let Some(block) = context.chain.get_sign_block(&keystore) { + info!("Got signing job, adding to queue"); + // We start mining sign block after some time, not everyone in the same time + let start = Utc::now().timestamp() + (rand::random::() % BLOCK_SIGNERS_START_RANDOM); + jobs.push(MineJob { start, block, keystore: keystore.unwrap() }); + } + } + } + } + } else { + let mut jobs = jobs.lock().unwrap(); if jobs.len() > 0 { debug!("Got new job to mine"); let job = jobs.remove(0); - if job.start == 0 || job.start < Utc::now().timestamp() { + if job.is_due() { mining.store(true, Ordering::SeqCst); + current_job = Some(job.clone()); Miner::mine_internal(Arc::clone(&context), job, mining.clone()); } else { debug!("This job will wait for now"); jobs.insert(0, job); } - } else if !mining.load(Ordering::SeqCst) { + } else { + // If our queue is empty if let Ok(context) = context.try_lock() { let keystore = context.get_keystore(); + // Ask the blockchain if we have to sign something if let Some(block) = context.chain.get_sign_block(&keystore) { info!("Got signing job, adding to queue"); // We start mining sign block after some time, not everyone in the same time @@ -105,19 +187,12 @@ impl Miner { } let _ = cond_var.wait_timeout(jobs, delay).expect("Error in wait lock!"); } - }); - let mining = self.mining.clone(); - self.context.lock().unwrap().bus.register(move |_uuid, e| { - match e { - Event::NewBlockReceived => {} - Event::BlockchainChanged {..} => {} - Event::ActionStopMining => { - mining.store(false, Ordering::SeqCst); - } - _ => {} + + if !mining.load(Ordering::Relaxed) { + current_job = None; } - true - }); + } + info!("Stopped mining queue thread"); } pub fn is_mining(&self) -> bool {