diff --git a/Cargo.toml b/Cargo.toml index d83b729..a063b6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ sqlite = "0.26.0" uuid = { version = "0.8.2", features = ["serde", "v4"] } mio = { version = "0.7", features = ["os-poll", "net"] } derive_more = "0.99" # for DNS from hermes +lazy_static = "1.4.0" # Optional dependencies regulated by features web-view = { version = "0.7", features = [], optional = true } diff --git a/src/commons/eventbus.rs b/src/commons/eventbus.rs new file mode 100644 index 0000000..5eeb60f --- /dev/null +++ b/src/commons/eventbus.rs @@ -0,0 +1,21 @@ +use crate::event::Event; +use crate::simplebus::Bus; +use std::sync::Mutex; +use lazy_static::lazy_static; +use uuid::Uuid; + +lazy_static! { + static ref STATIC_BUS: Mutex> = Mutex::new(Bus::new()); +} + +pub fn register(closure: F) -> Uuid where F: FnMut(&Uuid, Event) -> bool + Send + Sync + 'static { + STATIC_BUS.lock().unwrap().register(Box::new(closure)) +} + +pub fn unregister(uuid: &Uuid) { + STATIC_BUS.lock().unwrap().unregister(uuid); +} + +pub fn post(event: Event) { + STATIC_BUS.lock().unwrap().post(event); +} \ No newline at end of file diff --git a/src/commons/mod.rs b/src/commons/mod.rs index 2b11ace..a463db1 100644 --- a/src/commons/mod.rs +++ b/src/commons/mod.rs @@ -11,6 +11,8 @@ pub use constants::*; use crate::dns::protocol::DnsRecord; pub mod constants; +pub mod simplebus; +pub mod eventbus; /// Convert bytes array to HEX format pub fn to_hex(buf: &[u8]) -> String { @@ -161,6 +163,7 @@ pub fn setup_miner_thread(cpu: u32) { #[cfg(test)] mod test { use std::net::IpAddr; + use crate::{check_domain, is_yggdrasil}; #[test] diff --git a/src/simplebus.rs b/src/commons/simplebus.rs similarity index 100% rename from src/simplebus.rs rename to src/commons/simplebus.rs diff --git a/src/context.rs b/src/context.rs index 1fe4e99..b1d630a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,5 +1,4 @@ -use crate::{Chain, Bus, Keystore, Settings}; -use crate::event::Event; +use crate::{Chain, Keystore, Settings}; #[allow(unused_imports)] use log::{trace, debug, info, warn, error}; use crate::miner::MinerState; @@ -9,7 +8,6 @@ pub struct Context { pub settings: Settings, pub keystore: Option, pub chain: Chain, - pub bus: Bus, pub miner_state: MinerState, } @@ -21,7 +19,6 @@ impl Context { settings, keystore, chain, - bus: Bus::new(), miner_state: MinerState { mining: false, full: false } } } diff --git a/src/keystore.rs b/src/keystore.rs index b70edc6..f76ab61 100644 --- a/src/keystore.rs +++ b/src/keystore.rs @@ -20,6 +20,7 @@ use crate::blockchain::hash_utils::*; use crate::{Context, setup_miner_thread, to_hex, from_hex}; use crate::event::Event; use crate::commons::KEYSTORE_DIFFICULTY; +use crate::eventbus::{register, post}; use crate::bytes::Bytes; use crate::crypto::CryptoBox; use blakeout::Blakeout; @@ -220,7 +221,7 @@ pub fn check_public_key_strength(key: &Bytes, strength: u32) -> bool { pub fn create_key(context: Arc>) { let mining = Arc::new(AtomicBool::new(true)); let miners_count = Arc::new(AtomicUsize::new(0)); - context.lock().unwrap().bus.post(Event::KeyGeneratorStarted); + post(Event::KeyGeneratorStarted); let lower = context.lock().unwrap().settings.mining.lower; let threads = context.lock().unwrap().settings.mining.threads; let threads = match threads { @@ -247,16 +248,16 @@ pub fn create_key(context: Arc>) { let public = keystore.get_public().to_string(); info!("Key mined successfully! Public key: {}, hash: {}", &public, &hash); context.set_keystore(Some(keystore)); - context.bus.post(Event::KeyCreated { path, public, hash }); + post(Event::KeyCreated { path, public, hash }); } } let miners = miners_count.fetch_sub(1, atomic::Ordering::SeqCst) - 1; if miners == 0 { - context.lock().unwrap().bus.post(Event::KeyGeneratorStopped); + post(Event::KeyGeneratorStopped); } }); } - context.lock().unwrap().bus.register(move |_uuid, e| { + register(move |_uuid, e| { if e == Event::ActionStopMining { info!("Stopping keystore miner"); mining.store(false, atomic::Ordering::SeqCst); diff --git a/src/lib.rs b/src/lib.rs index d8c9701..a035bdb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,18 @@ pub use blockchain::block::Block; pub use blockchain::transaction::Transaction; +pub use commons::simplebus::*; pub use crate::blockchain::Chain; +pub use crate::bytes::Bytes; +pub use crate::commons::*; pub use crate::context::Context; +pub use crate::keystore::Keystore; pub use crate::miner::Miner; pub use crate::p2p::Network; pub use crate::settings::Settings; -pub use crate::bytes::Bytes; -pub use crate::keystore::Keystore; -pub use crate::simplebus::*; -pub use crate::commons::*; pub mod blockchain; pub mod commons; -pub mod simplebus; pub mod keystore; pub mod miner; pub mod context; diff --git a/src/main.rs b/src/main.rs index 45590cf..79007e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ use log::{debug, error, info, trace, warn, LevelFilter}; use simplelog::*; #[cfg(windows)] use winapi::um::wincon::{ATTACH_PARENT_PROCESS, AttachConsole, FreeConsole}; +extern crate lazy_static; use alfis::{Block, Bytes, Chain, Miner, Context, Network, Settings, dns_utils, Keystore, ORIGIN_DIFFICULTY, ALFIS_DEBUG, DB_NAME, Transaction}; use alfis::event::Event; @@ -23,6 +24,7 @@ use std::process::exit; use std::io::{Seek, SeekFrom}; use std::sync::atomic::{AtomicBool, Ordering}; use alfis::keystore::create_key; +use alfis::eventbus::register; #[cfg(feature = "webgui")] mod web_ui; @@ -128,25 +130,23 @@ fn main() { let mining = Arc::new(AtomicBool::new(true)); let mining_copy = Arc::clone(&mining); let context_copy = Arc::clone(&context); - if let Ok(mut context) = context.lock() { - // Register key-mined event listener - context.bus.register(move |_uuid, e| { - if matches!(e, Event::KeyCreated {..}) { - let context_copy = Arc::clone(&context_copy); - let mining_copy = Arc::clone(&mining_copy); - let filename = filename.clone(); - thread::spawn(move || { - if let Some(mut keystore) = context_copy.lock().unwrap().get_keystore() { - keystore.save(&filename, ""); - mining_copy.store(false, Ordering::Relaxed); - } - }); - false - } else { - true - } - }); - } + // Register key-mined event listener + register(move |_uuid, e| { + if matches!(e, Event::KeyCreated {..}) { + let context_copy = Arc::clone(&context_copy); + let mining_copy = Arc::clone(&mining_copy); + let filename = filename.clone(); + thread::spawn(move || { + if let Some(mut keystore) = context_copy.lock().unwrap().get_keystore() { + keystore.save(&filename, ""); + mining_copy.store(false, Ordering::Relaxed); + } + }); + false + } else { + true + } + }); // Start key mining create_key(context); diff --git a/src/miner.rs b/src/miner.rs index b20d3be..797687c 100644 --- a/src/miner.rs +++ b/src/miner.rs @@ -16,6 +16,7 @@ use crate::keystore::check_public_key_strength; use crate::event::Event; use blakeout::Blakeout; use std::thread::sleep; +use crate::eventbus::{register, post}; #[derive(Clone)] pub struct MineJob { @@ -93,7 +94,7 @@ impl Miner { // Add events listener to a [Bus] let running = self.running.clone(); let mining = self.mining.clone(); - self.context.lock().unwrap().bus.register(move |_uuid, e| { + register(move |_uuid, e| { match e { Event::ActionQuit => { running.store(false, Ordering::Relaxed); } Event::NewBlockReceived => {} @@ -211,14 +212,14 @@ impl Miner { job.block.pub_key = job.keystore.get_public(); if !check_public_key_strength(&job.block.pub_key, KEYSTORE_DIFFICULTY) { warn!("Can not mine block with weak public key!"); - context.lock().unwrap().bus.post(Event::MinerStopped { success: false, full: false }); + post(Event::MinerStopped { success: false, full: false }); mining.store(false, Ordering::SeqCst); return; } match context.lock().unwrap().chain.update_sign_block_for_mining(job.block) { None => { warn!("We missed block to lock"); - context.lock().unwrap().bus.post(Event::MinerStopped { success: false, full: false }); + post(Event::MinerStopped { success: false, full: false }); mining.store(false, Ordering::SeqCst); return; } @@ -235,8 +236,8 @@ impl Miner { } let (lower, threads) = { + post(Event::MinerStarted); let mut context = context.lock().unwrap(); - context.bus.post(Event::MinerStarted); context.miner_state.mining = true; context.miner_state.full = job.block.transaction.is_some(); (context.settings.mining.lower, context.settings.mining.threads) @@ -266,9 +267,10 @@ impl Miner { let count = live_threads.fetch_sub(1, Ordering::SeqCst); // If this is the last thread, but mining was not stopped by another thread if count == 1 { - let mut context = context.lock().unwrap(); - context.miner_state.mining = false; - context.bus.post(Event::MinerStopped { success: false, full }); + if let Ok(mut context) = context.lock() { + context.miner_state.mining = false; + } + post(Event::MinerStopped { success: false, full }); } }, Some(mut block) => { @@ -290,7 +292,7 @@ impl Miner { success = true; } context.miner_state.mining = false; - context.bus.post(Event::MinerStopped { success, full }); + post(Event::MinerStopped { success, full }); mining.store(false, Ordering::SeqCst); }, } @@ -352,9 +354,7 @@ fn find_hash(context: Arc>, mut block: Block, running: Arc 10000 { let speed = (nonce - prev_nonce) / (elapsed as u64 / 1000); //debug!("Mining speed {} H/s, max difficulty {}", speed, max_diff); - if let Ok(mut context) = context.try_lock() { - context.bus.post(Event::MinerStats { thread, speed, max_diff, target_diff }) - } + post(Event::MinerStats { thread, speed, max_diff, target_diff }); time = Instant::now(); prev_nonce = nonce; } diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 5406aef..42f8125 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -20,6 +20,7 @@ use rand::random; use crate::{Block, Context, p2p::Message, p2p::Peer, p2p::Peers, p2p::State}; use crate::blockchain::types::BlockQuality; use crate::commons::*; +use crate::eventbus::{register, post}; const SERVER: Token = Token(0); @@ -39,7 +40,7 @@ impl Network { }; let running = Arc::new(AtomicBool::new(true)); - subscribe_to_bus(&mut self.context, Arc::clone(&running)); + subscribe_to_bus(Arc::clone(&running)); // Starting server socket let addr = listen_addr.parse().expect("Error parsing listen address"); @@ -128,9 +129,9 @@ impl Network { token => { if !handle_connection_event(Arc::clone(&context), &mut peers, &poll.registry(), &event) { let _ = peers.close_peer(poll.registry(), &token); - let mut context = context.lock().unwrap(); + let context = context.lock().unwrap(); let blocks_count = context.chain.get_height(); - context.bus.post(crate::event::Event::NetworkStatus { nodes: peers.get_peers_active_count(), blocks: blocks_count }); + post(crate::event::Event::NetworkStatus { nodes: peers.get_peers_active_count(), blocks: blocks_count }); } } } @@ -150,11 +151,11 @@ impl Network { if ui_timer.elapsed().as_millis() > UI_REFRESH_DELAY_MS { // Send pings to idle peers let (height, hash) = { - let mut context = context.lock().unwrap(); + let context = context.lock().unwrap(); let height = context.chain.get_height(); let nodes = peers.get_peers_active_count(); let banned = peers.get_peers_banned_count(); - context.bus.post(crate::event::Event::NetworkStatus { nodes, blocks: height }); + post(crate::event::Event::NetworkStatus { nodes, blocks: height }); if log_timer.elapsed().as_secs() > LOG_REFRESH_DELAY_SEC { info!("Active nodes count: {}, banned count: {}, blocks count: {}", nodes, banned, height); @@ -184,9 +185,9 @@ impl Network { } } -fn subscribe_to_bus(context: &mut Arc>, running: Arc) { +fn subscribe_to_bus(running: Arc) { use crate::event::Event; - context.lock().unwrap().bus.register(move |_uuid, e| { + register(move |_uuid, e| { match e { Event::ActionQuit => { running.store(false, Ordering::SeqCst); @@ -433,7 +434,7 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe if peer.is_higher(my_height) { context.chain.update_max_height(height); let event = crate::event::Event::Syncing { have: my_height, height: max(height, my_height) }; - context.bus.post(event); + post(event); } if nodes < MAX_NODES && random::() { debug!("Requesting more peers from {}", peer.get_addr().ip()); @@ -539,15 +540,15 @@ fn handle_block(context: Arc>, peers: &mut Peers, token: &Token, BlockQuality::Good => { context.chain.add_block(block); let my_height = context.chain.get_height(); - context.bus.post(crate::event::Event::BlockchainChanged { index: my_height }); + post(crate::event::Event::BlockchainChanged { index: my_height }); // If it was the last block to sync if my_height == max_height { - context.bus.post(crate::event::Event::SyncFinished); + post(crate::event::Event::SyncFinished); } else { let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; - context.bus.post(event); + post(event); } - context.bus.post(crate::event::Event::NetworkStatus { nodes: peers_count, blocks: my_height }); + post(crate::event::Event::NetworkStatus { nodes: peers_count, blocks: my_height }); } BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } BlockQuality::Future => { debug!("Ignoring future block {}", block.index); } @@ -556,7 +557,7 @@ fn handle_block(context: Arc>, peers: &mut Peers, token: &Token, debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block); let height = context.chain.get_height(); context.chain.update_max_height(height); - context.bus.post(crate::event::Event::SyncFinished); + post(crate::event::Event::SyncFinished); return State::Banned; } BlockQuality::Rewind => { @@ -571,7 +572,7 @@ fn handle_block(context: Arc>, peers: &mut Peers, token: &Token, if block.is_better_than(&last_block) || lagged { context.chain.replace_block(block).expect("Error replacing block with fork"); let index = context.chain.get_height(); - context.bus.post(crate::event::Event::BlockchainChanged { index }); + post(crate::event::Event::BlockchainChanged { index }); } else { debug!("Fork in not better than our block, dropping."); } diff --git a/src/web_ui.rs b/src/web_ui.rs index f70567b..7739b40 100644 --- a/src/web_ui.rs +++ b/src/web_ui.rs @@ -26,6 +26,7 @@ use Cmd::*; use self::web_view::{Handle, WebView}; use alfis::crypto::CryptoBox; +use alfis::eventbus::{register, post}; pub fn run_interface(context: Arc>, miner: Arc>) { let file_content = include_str!("webview/index.html"); @@ -57,7 +58,7 @@ pub fn run_interface(context: Arc>, miner: Arc>) { action_create_domain(Arc::clone(&context), Arc::clone(&miner), web_view, name, data, signing, encryption); } TransferDomain { .. } => {} - StopMining => { context.lock().unwrap().bus.post(Event::ActionStopMining); } + StopMining => { post(Event::ActionStopMining); } Open { link } => { if open::that(&link).is_err() { show_warning(web_view, "Something wrong, I can't open the link 😢"); @@ -69,12 +70,11 @@ pub fn run_interface(context: Arc>, miner: Arc>) { .build() .expect("Error building GUI"); - let mut context = Arc::clone(&context); - run_interface_loop(&mut context, &mut interface); + run_interface_loop(&mut interface); } /// Indefinitely loops through WebView steps -fn run_interface_loop(context: &mut Arc>, interface: &mut WebView<()>) { +fn run_interface_loop(interface: &mut WebView<()>) { // We use this ugly loop to lower CPU usage a lot. // If we use .run() or only .step() in a loop without sleeps it will try // to support 60FPS and uses more CPU than it should. @@ -84,7 +84,7 @@ fn run_interface_loop(context: &mut Arc>, interface: &mut WebView match interface.step() { None => { info!("Interface closed, exiting"); - context.lock().unwrap().bus.post(Event::ActionQuit); + post(Event::ActionQuit); thread::sleep(Duration::from_millis(100)); break; } @@ -140,14 +140,13 @@ fn action_save_key(context: &Arc>) { if !new_path.ends_with(".toml") { new_path.push_str(".toml"); } - let mut context = context.lock().unwrap(); let path = new_path.clone(); - if let Some(mut keystore) = context.get_keystore() { + if let Some(mut keystore) = context.lock().unwrap().get_keystore() { let public = keystore.get_public().to_string(); let hash = keystore.get_hash().to_string(); keystore.save(&new_path, ""); info!("Key file saved to {}", &path); - context.bus.post(Event::KeySaved { path, public, hash }); + post(Event::KeySaved { path, public, hash }); } } } @@ -166,12 +165,11 @@ fn action_load_key(context: &Arc>, web_view: &mut WebView<()>) { } Some(keystore) => { info!("Loaded keystore with keys: {:?}, {:?}", &keystore.get_public(), &keystore.get_encryption_public()); - let mut c = context.lock().unwrap(); let path = keystore.get_path().to_owned(); let public = keystore.get_public().to_string(); let hash = keystore.get_hash().to_string(); - c.bus.post(Event::KeyLoaded { path, public, hash }); - c.set_keystore(Some(keystore)); + post(Event::KeyLoaded { path, public, hash }); + context.lock().unwrap().set_keystore(Some(keystore)); } } } @@ -189,9 +187,9 @@ fn action_loaded(context: &Arc>, web_view: &mut WebView<()>) { }; let status = Arc::new(Mutex::new(Status::new(threads))); let context_copy = Arc::clone(&context); - let mut c = context.lock().unwrap(); + let c = context.lock().unwrap(); - c.bus.register(move |_uuid, e| { + register(move |_uuid, e| { //debug!("Got event from bus {:?}", &e); let status = Arc::clone(&status); let handle = handle.clone(); @@ -311,11 +309,11 @@ fn action_loaded(context: &Arc>, web_view: &mut WebView<()>) { let path = keystore.get_path().to_owned(); let public = keystore.get_public().to_string(); let hash = keystore.get_hash().to_string(); - c.bus.post(Event::KeyLoaded { path, public, hash }); + post(Event::KeyLoaded { path, public, hash }); } let index = c.chain.get_height(); if index > 0 { - c.bus.post(Event::BlockchainChanged { index }); + post(Event::BlockchainChanged { index }); } let zones = c.chain.get_zones(); info!("Loaded zones: {:?}", &zones);