Compare commits

...

10 Commits

Author SHA1 Message Date
Sweetbread 071567fa73 Fix NixOS GUI issues
Fixes #380, #414
2026-04-13 09:27:21 +03:00
Revertron 489f16e462 Added forgotten file, fixed CI. 2026-04-06 02:40:45 +02:00
Revertron f4c8b1fe42 Updated the CI for releases. 2026-04-06 02:28:56 +02:00
Revertron b01ade19b1 Fixed new sync. 2026-04-06 02:18:23 +02:00
Revertron 9bea173f21 Initial sync made a lot faster. 2026-04-05 12:13:30 +02:00
Revertron ef573436d9 Updated dependencies. 2026-04-04 16:24:21 +02:00
Revertron d86fb6916f Disabled 0x20 encoding for NS queries. 2026-04-04 16:19:58 +02:00
Revertron 5044064f6c Fixed 0x20 encoding in cache. 2026-04-04 15:49:16 +02:00
Revertron c90458eaaf Fixed #411 (very big DNS result over DoH). 2026-04-04 15:37:25 +02:00
Revertron cfb3cf6cf8 Added parsing of bare IPs for forwarders in config. 2026-03-29 18:55:59 +02:00
18 changed files with 1323 additions and 685 deletions
+70 -71
View File
@@ -17,7 +17,7 @@ jobs:
project_version: ${{ env.VERSION }} project_version: ${{ env.VERSION }}
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v4
- name: Getting version - name: Getting version
run: echo "VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV run: echo "VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
@@ -25,57 +25,50 @@ jobs:
name: Create Release name: Create Release
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: get_version needs: get_version
outputs:
upload_url: ${{ steps.create_release.outputs.upload_url }}
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v4
- name: Create Release - name: Create Release
id: create_release
uses: actions/create-release@v1
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
VERSION: ${{ needs.get_version.outputs.project_version }} VERSION: ${{ needs.get_version.outputs.project_version }}
with: run: |
tag_name: ${{ env.VERSION }} gh release create "$VERSION" \
release_name: ${{ env.VERSION }} --title "$VERSION" \
body: | --draft \
## New --notes "## New
* Added new features. * Added new features.
## Bug Fixes & Improvements ## Bug Fixes & Improvements
* Various fixes and stability improvements. * Various fixes and stability improvements.
## Documentation & others ## Documentation & others
* Updated documentation. * Updated documentation."
draft: true
prerelease: false
linux-nogui: linux-nogui:
name: Create and upload builds name: Build Linux nogui (${{ matrix.arch }})
needs: [ create_release, get_version ] needs: [create_release, get_version]
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
arch: [ amd64, i686, armhf, armel, arm64 ] arch: [amd64, i686, armhf, armel, arm64]
defaults: defaults:
run: run:
shell: bash shell: bash
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v4
- name: install dependencies - name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
- name: Install dependencies
run: | run: |
sudo apt update sudo apt-get update
sudo apt upgrade sudo apt-get install -y upx-ucl
sudo apt install libwebkit2gtk-4.1-dev libxdo-dev libsoup-3.0-dev upx-ucl cargo install cross --git https://github.com/cross-rs/cross
cargo install cross
- name: Build and package deb packages - name: Build and package deb packages
run: PKGARCH=${{ matrix.arch }} contrib/deb/generate.sh run: PKGARCH=${{ matrix.arch }} contrib/deb/generate.sh
- name: Upload bins & debs - name: Upload bins & debs
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: | run: |
tag_name="${{ needs.get_version.outputs.project_version }}" tag_name="${{ needs.get_version.outputs.project_version }}"
ls -lh ./bin/ ls -lh ./bin/
@@ -83,62 +76,68 @@ jobs:
gh release upload "$tag_name" *.deb --clobber gh release upload "$tag_name" *.deb --clobber
build-and-upload-gui-zips: build-and-upload-gui-zips:
name: Create and upload builds name: Build GUI (${{ matrix.name }})
needs: [ create_release, get_version ] needs: [create_release, get_version]
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
strategy: strategy:
matrix: matrix:
os: [ windows-latest, ubuntu-latest, macOS-latest ] include:
- os: ubuntu-latest
name: linux-amd64
bin_path: ./target/release/alfis
archive_cmd: zip
- os: windows-latest
name: windows-amd64
bin_path: target/release/alfis.exe
archive_cmd: 7z
- os: macos-latest
name: darwin-arm64
bin_path: ./target/release/alfis
archive_cmd: zip
- os: macos-latest
name: darwin-amd64
target: x86_64-apple-darwin
bin_path: ./target/x86_64-apple-darwin/release/alfis
archive_cmd: zip
defaults: defaults:
run: run:
shell: bash shell: bash
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v4
- name: install dependencies - name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
targets: ${{ matrix.target || '' }}
- name: Install Linux dependencies
if: contains(matrix.os, 'ubuntu') if: contains(matrix.os, 'ubuntu')
run: sudo apt update && sudo apt install --no-install-recommends libwebkit2gtk-4.1-dev libxdo-dev libsoup-3.0-dev upx-ucl run: |
sudo apt-get update
sudo apt-get install -y --no-install-recommends \
libwebkit2gtk-4.1-dev \
libgtk-3-dev \
libxdo-dev \
libsoup-3.0-dev \
libayatana-appindicator3-dev
- name: Build release binaries - name: Build release binaries
run: cargo build --release run: cargo build --release ${{ matrix.target && format('--target {0}', matrix.target) || '' }}
- name: windows - name: Package zip
if: contains(matrix.os, 'windows') env:
run: echo "BIN_ARCH=windows-amd64" >> $GITHUB_ENV ZIP_NAME: alfis-${{ matrix.name }}-${{ needs.get_version.outputs.project_version }}.zip
- name: linux
if: contains(matrix.os, 'ubuntu')
run: echo "BIN_ARCH=linux-amd64" >> $GITHUB_ENV
- name: macos
if: contains(matrix.os, 'mac')
run: echo "BIN_ARCH=darwin-amd64" >> $GITHUB_ENV
- name: Fill variables
run: | run: |
echo "BIN_PATH=./target/release/alfis" >> $GITHUB_ENV if [ "${{ matrix.archive_cmd }}" = "7z" ]; then
echo "ZIP_NAME=alfis-${{env.BIN_ARCH}}-${{ needs.get_version.outputs.project_version }}.zip" >> $GITHUB_ENV 7z a "$ZIP_NAME" "${{ matrix.bin_path }}" alfis.toml README.md LICENSE adblock.txt
- name: Windows variables else
if: contains(matrix.os, 'windows') zip "$ZIP_NAME" "${{ matrix.bin_path }}" alfis.toml README.md LICENSE adblock.txt
run: | fi
echo "BIN_PATH=target/release/alfis.exe" >> $GITHUB_ENV
echo "ZIP_NAME=alfis-${{env.BIN_ARCH}}-${{ needs.get_version.outputs.project_version }}.zip" >> $GITHUB_ENV
- name: Packaging
uses: papeloto/action-zip@v1
with:
files: ${{ env.BIN_PATH }} alfis.toml README.md LICENSE adblock.txt
dest: ${{ env.ZIP_NAME }}
- name: Upload zip - name: Upload zip
id: upload-zip
uses: actions/upload-release-asset@v1
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} TAG: ${{ needs.get_version.outputs.project_version }}
with: ZIP_NAME: alfis-${{ matrix.name }}-${{ needs.get_version.outputs.project_version }}.zip
upload_url: ${{ needs.create_release.outputs.upload_url }} run: gh release upload "$TAG" "$ZIP_NAME" --clobber
asset_path: ${{ env.ZIP_NAME }}
asset_name: ${{ env.ZIP_NAME }}
asset_content_type: application/zip
Generated
+460 -438
View File
File diff suppressed because it is too large Load Diff
+5 -5
View File
@@ -15,7 +15,7 @@ getopts = "0.2.24"
log = "0.4.28" log = "0.4.28"
simplelog = "0.12.2" simplelog = "0.12.2"
toml = "1.0.7" toml = "1.0.7"
sha2 = "0.10.9" sha2 = "0.11.0"
ed25519-dalek = "2.2.0" ed25519-dalek = "2.2.0"
x25519-dalek = { version = "2.0.1", features = ["reusable_secrets"] } x25519-dalek = { version = "2.0.1", features = ["reusable_secrets"] }
ecies-ed25519-ng = { git = "https://github.com/Revertron/ecies-ed25519-ng", rev = "554ca29", version = "0.5.3" } ecies-ed25519-ng = { git = "https://github.com/Revertron/ecies-ed25519-ng", rev = "554ca29", version = "0.5.3" }
@@ -43,9 +43,9 @@ thread-priority = "3.0.0"
crossbeam-channel = "0.5.13" crossbeam-channel = "0.5.13"
# Optional dependencies regulated by features # Optional dependencies regulated by features
wry = { version = "0.53", optional = true } wry = { version = "0.55.0", optional = true }
tao = { version = "0.34", optional = true } tao = { version = "0.35.0", optional = true }
tray-icon = { version = "0.21.2", optional = true } tray-icon = { version = "0.22.0", optional = true }
tinyfiledialogs = { version = "3.9.1", optional = true } tinyfiledialogs = { version = "3.9.1", optional = true }
open = { version = "5.3.0", optional = true } open = { version = "5.3.0", optional = true }
@@ -65,7 +65,7 @@ lto = true
strip = true # Automatically strip symbols from the binary. strip = true # Automatically strip symbols from the binary.
[profile.dev] [profile.dev]
opt-level = 2 opt-level = 1
[profile.test] [profile.test]
opt-level = 2 opt-level = 2
+30 -33
View File
@@ -18,18 +18,30 @@
"i686-windows" "i686-windows"
"x86_64-windows" "x86_64-windows"
]; ];
in flake-utils.lib.eachSystem systems (system: in flake-utils.lib.eachSystem systems (system:
let let
pkgs = nixpkgs.legacyPackages.${system}; pkgs = nixpkgs.legacyPackages.${system};
lib = pkgs.lib;
naersk-lib = naersk.lib.${system}; naersk-lib = naersk.lib.${system};
isLinux = pkgs.stdenv.hostPlatform.isLinux;
alfis = { webgui ? true, doh ? true, edge ? false }: guiBuildInputs = lib.optionals isLinux (with pkgs; [
gtk3
webkitgtk_4_1
xdotool
libayatana-appindicator
]);
guiNativeBuildInputs = [ pkgs.pkg-config ]
++ lib.optionals isLinux [ pkgs.makeWrapper pkgs.wrapGAppsHook ];
guiRuntimeTools = lib.optionals isLinux [ pkgs.kdePackages.kdialog ];
guiRuntimeLibPath = lib.optionalString isLinux (lib.makeLibraryPath guiBuildInputs);
alfis = { webgui ? true, doh ? true }:
let let
features = builtins.concatStringsSep " " (builtins.concatMap features = builtins.concatStringsSep " " (builtins.concatMap
({ option, features }: pkgs.lib.optionals option features) [ ({ option, features }: lib.optionals option features) [
{ {
option = webgui; option = webgui;
features = [ "webgui" ]; features = [ "webgui" ];
@@ -38,55 +50,40 @@
option = doh; option = doh;
features = [ "doh" ]; features = [ "doh" ];
} }
{
option = edge;
features = [ "edge" ];
}
]); ]);
in naersk-lib.buildPackage { in naersk-lib.buildPackage {
pname = "alfis"; pname = "alfis";
nativeBuildInputs = with pkgs; [ pkg-config webkitgtk kdialog ]; root = ./.;
dontWrapQtApps = true; nativeBuildInputs = guiNativeBuildInputs;
buildInputs = guiBuildInputs;
cargoBuildOptions = opts: cargoBuildOptions = opts:
opts ++ [ "--no-default-features" ] opts ++ [ "--no-default-features" ]
++ [ "--features" ''"${features}"'' ]; ++ lib.optionals (features != "") [ "--features" features ];
root = ./.; preFixup = lib.optionalString isLinux ''
gappsWrapperArgs+=(--prefix PATH : "${lib.makeBinPath guiRuntimeTools}")
gappsWrapperArgs+=(--prefix LD_LIBRARY_PATH : "${guiRuntimeLibPath}")
'';
}; };
isWindows = builtins.elem system [ "i686-windows" "x86_64-windows" ];
in rec { in rec {
packages = { packages = {
alfis = alfis { alfis = alfis {
webgui = true; webgui = true;
doh = true; doh = true;
edge = false;
}; };
alfisWithoutGUI = alfis { alfisWithoutGUI = alfis {
webgui = false; webgui = false;
doh = true; doh = true;
edge = false;
};
} // pkgs.lib.optionalAttrs isWindows {
alfisEdge = alfis {
webgui = false;
doh = true;
edge = true;
}; };
}; };
defaultPackage = packages.alfis; defaultPackage = packages.alfis;
apps = with flake-utils.lib; apps = with flake-utils.lib; {
{ alfis = mkApp { drv = packages.alfis; };
alfis = mkApp { drv = packages.alfis; }; alfisWithoutGUI = mkApp { drv = packages.alfisWithoutGUI; };
alfisWithoutGUI = mkApp { drv = packages.alfisWithoutGUI; }; };
} // pkgs.lib.optionalAttrs isWindows {
alfisEdge = mkApp { drv = packages.alfisEdge; };
};
defaultApp = apps.alfis; defaultApp = apps.alfis;
devShell = import ./shell.nix { inherit pkgs; }; devShell = import ./shell.nix { inherit pkgs; };
}); });
} }
+17 -2
View File
@@ -1,6 +1,21 @@
{ pkgs ? import <nixpkgs> { } }: { pkgs ? import <nixpkgs> { } }:
let
runtimeLibs = with pkgs; [
gtk3
webkitgtk_4_1
xdotool
libayatana-appindicator
];
packages = with pkgs; [
cargo
rustc
pkg-config
kdePackages.kdialog
] ++ runtimeLibs;
in
pkgs.mkShell { pkgs.mkShell {
buildInputs = buildInputs = packages;
[ pkgs.cargo pkgs.rustc pkgs.webkitgtk pkgs.pkg-config pkgs.kdialog ]; LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath runtimeLibs;
} }
+19 -4
View File
@@ -975,13 +975,23 @@ impl Chain {
return false; return false;
} }
// If this signers' public key has already locked/signed that block we return error // If this signers' public key has already locked/signed that block we return error
// Use signing_keys cache to avoid repeated DB loads for the same blocks
let mut cache = self.signers.borrow_mut();
for i in (full_block.index + 1)..block.index { for i in (full_block.index + 1)..block.index {
let signer = self.get_block(i).expect("Error in DB!"); let pub_key = if let Some(cached) = cache.signing_keys.get(&i) {
if signer.pub_key == block.pub_key { cached.clone()
} else {
let signer = self.get_block(i).expect("Error in DB!");
cache.signing_keys.insert(i, signer.pub_key.clone());
signer.pub_key
};
if pub_key == block.pub_key {
warn!("Ignoring block {} from '{:?}', already signed by this key", block.index, &block.pub_key); warn!("Ignoring block {} from '{:?}', already signed by this key", block.index, &block.pub_key);
return false; return false;
} }
} }
// Cache this block's pub_key too for future checks
cache.signing_keys.insert(block.index, block.pub_key.clone());
true true
} }
@@ -1066,6 +1076,7 @@ impl Chain {
let mut signers = self.signers.borrow_mut(); let mut signers = self.signers.borrow_mut();
signers.index = block.index; signers.index = block.index;
signers.signers = result.clone(); signers.signers = result.clone();
signers.signing_keys.clear();
result result
} }
@@ -1087,12 +1098,15 @@ impl Chain {
struct SignersCache { struct SignersCache {
index: u64, index: u64,
signers: Vec<Bytes> signers: Vec<Bytes>,
/// Cache of block_index → pub_key for signing blocks since last full_block.
/// Avoids repeated DB loads in is_good_signer_for_block duplicate check.
signing_keys: HashMap<u64, Bytes>,
} }
impl SignersCache { impl SignersCache {
pub fn new() -> RefCell<SignersCache> { pub fn new() -> RefCell<SignersCache> {
let cache = SignersCache { index: 0, signers: Vec::new() }; let cache = SignersCache { index: 0, signers: Vec::new(), signing_keys: HashMap::new() };
RefCell::new(cache) RefCell::new(cache)
} }
@@ -1103,6 +1117,7 @@ impl SignersCache {
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.index = 0; self.index = 0;
self.signers.clear(); self.signers.clear();
self.signing_keys.clear();
} }
} }
+2 -1
View File
@@ -51,7 +51,8 @@ impl BlockchainFilter {
} }
fn lookup_from_ns(qname: &str, qtype: QueryType, servers: &[IpAddr], tracker: &RttTracker<IpAddr>) -> Option<DnsPacket> { fn lookup_from_ns(qname: &str, qtype: QueryType, servers: &[IpAddr], tracker: &RttTracker<IpAddr>) -> Option<DnsPacket> {
let mut dns_client = DnsNetworkClient::new(); // Disable 0x20 encoding for NS queries - external NS servers may not preserve case
let mut dns_client = DnsNetworkClient::new_with_0x20(false);
dns_client.run().unwrap(); dns_client.run().unwrap();
let timeout = std::time::Duration::from_secs(2); let timeout = std::time::Duration::from_secs(2);
+9
View File
@@ -166,6 +166,9 @@ impl VectorPacketBuffer {
impl PacketBuffer for VectorPacketBuffer { impl PacketBuffer for VectorPacketBuffer {
fn read(&mut self) -> Result<u8> { fn read(&mut self) -> Result<u8> {
if self.pos >= self.buffer.len() {
return Err(BufferError::EndOfBuffer);
}
let res = self.buffer[self.pos]; let res = self.buffer[self.pos];
self.pos += 1; self.pos += 1;
@@ -173,10 +176,16 @@ impl PacketBuffer for VectorPacketBuffer {
} }
fn get(&mut self, pos: usize) -> Result<u8> { fn get(&mut self, pos: usize) -> Result<u8> {
if pos >= self.buffer.len() {
return Err(BufferError::EndOfBuffer);
}
Ok(self.buffer[pos]) Ok(self.buffer[pos])
} }
fn get_range(&mut self, start: usize, len: usize) -> Result<&[u8]> { fn get_range(&mut self, start: usize, len: usize) -> Result<&[u8]> {
if start + len > self.buffer.len() {
return Err(BufferError::EndOfBuffer);
}
Ok(&self.buffer[start..start + len as usize]) Ok(&self.buffer[start..start + len as usize])
} }
+1 -1
View File
@@ -617,7 +617,7 @@ impl DnsClient for HttpsDnsClient {
Ok(size) => { Ok(size) => {
let mut bytes: Vec<u8> = Vec::with_capacity(size); let mut bytes: Vec<u8> = Vec::with_capacity(size);
response.into_body().into_reader() response.into_body().into_reader()
.take(4096) .take(65535)
.read_to_end(&mut bytes)?; .read_to_end(&mut bytes)?;
let mut buffer = VectorPacketBuffer::new(); let mut buffer = VectorPacketBuffer::new();
buffer.buffer.extend_from_slice(bytes.as_slice()); buffer.buffer.extend_from_slice(bytes.as_slice());
+9 -5
View File
@@ -109,17 +109,21 @@ impl DnsResolver for ForwardingDnsResolver {
Ok(mut result) => { Ok(mut result) => {
let elapsed = start.elapsed().as_secs_f64() * 1000.0; let elapsed = start.elapsed().as_secs_f64() * 1000.0;
self.context.forwarder_tracker.record_success(upstream, elapsed); self.context.forwarder_tracker.record_success(upstream, elapsed);
self.context.cache.store(&result.answers)?;
// Fix domain names in answers to match original query case // Fix domain names to match original query case before caching
let qname_lower = qname.to_lowercase(); let qname_lower = qname.to_lowercase();
for answer in &mut result.answers { for rec in result.answers.iter_mut()
if let Some(domain) = answer.get_domain() { .chain(result.authorities.iter_mut())
.chain(result.resources.iter_mut())
{
if let Some(domain) = rec.get_domain() {
if domain.to_lowercase() == qname_lower { if domain.to_lowercase() == qname_lower {
answer.set_domain(qname.to_string()); rec.set_domain(qname.to_string());
} }
} }
} }
self.context.cache.store(&result.answers)?;
return Ok(result); return Ok(result);
} }
Err(e) => { Err(e) => {
+13 -1
View File
@@ -44,7 +44,19 @@ fn create_server_context(context: Arc<Mutex<Context>>, settings: &Settings) -> A
server_context.allow_recursive = true; server_context.allow_recursive = true;
server_context.resolve_strategy = match settings.dns.forwarders.is_empty() { server_context.resolve_strategy = match settings.dns.forwarders.is_empty() {
true => ResolveStrategy::Recursive, true => ResolveStrategy::Recursive,
false => ResolveStrategy::Forward { upstreams: settings.dns.forwarders.clone() } false => {
let upstreams = settings.dns.forwarders.iter().map(|s| {
if s.starts_with("https://") || s.parse::<std::net::SocketAddr>().is_ok() {
s.clone()
} else if let Ok(ip) = s.parse::<std::net::IpAddr>() {
std::net::SocketAddr::new(ip, 53).to_string()
} else {
warn!("Cannot parse forwarder address: {}", s);
s.clone()
}
}).collect();
ResolveStrategy::Forward { upstreams }
}
}; };
// Add host filters // Add host filters
for host in &settings.dns.hosts { for host in &settings.dns.hosts {
+14
View File
@@ -148,6 +148,20 @@ fn main() {
no_gui = true; no_gui = true;
} }
#[cfg(all(feature = "webgui", target_os = "linux"))]
if !no_gui {
let running_via_sudo = env::var_os("SUDO_UID").is_some();
let has_graphical_session = env::var_os("DISPLAY").is_some() || env::var_os("WAYLAND_DISPLAY").is_some();
if running_via_sudo {
warn!(target: LOG_TARGET_MAIN, "Running GUI via sudo is not supported on Linux, starting without GUI");
no_gui = true;
} else if !has_graphical_session {
warn!(target: LOG_TARGET_MAIN, "No graphical session detected, starting without GUI");
no_gui = true;
}
}
#[cfg(windows)] #[cfg(windows)]
if opt_matches.opt_present("service") { if opt_matches.opt_present("service") {
let appdata = env::var("PROGRAMDATA").expect("Failed to get APPDATA directory"); let appdata = env::var("PROGRAMDATA").expect("Failed to get APPDATA directory");
+1
View File
@@ -3,6 +3,7 @@ pub mod network;
pub mod peer; pub mod peer;
pub mod peers; pub mod peers;
pub mod state; pub mod state;
pub mod version;
pub use message::Message; pub use message::Message;
pub use network::Network; pub use network::Network;
+482 -90
View File
@@ -10,17 +10,22 @@ use std::sync::{Arc, mpsc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{io, thread}; use std::{io, thread};
use crossbeam_channel::{bounded, Receiver, Sender};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
#[allow(unused_imports)] #[allow(unused_imports)]
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use mio::event::Event; use mio::event::Event;
use mio::net::{TcpListener, TcpStream}; use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Registry, Token}; use mio::{Events, Interest, Poll, Registry, Token};
use crate::commons::rtt_tracker::RttTracker;
use crate::p2p::version::Version;
use rand::{random, Rng, RngCore}; use rand::{random, Rng, RngCore};
use rand::prelude::thread_rng; use rand::prelude::thread_rng;
use x25519_dalek::{PublicKey, ReusableSecret}; use x25519_dalek::{PublicKey, ReusableSecret};
use crate::blockchain::types::BlockQuality; use crate::blockchain::types::BlockQuality;
use crate::blockchain::hash_utils::{check_block_hash, check_block_signature, hash_difficulty};
use crate::commons::*; use crate::commons::*;
use crate::crypto::Chacha; use crate::crypto::Chacha;
use crate::eventbus::{post, register}; use crate::eventbus::{post, register};
@@ -29,6 +34,20 @@ use crate::{Block, Bytes, Context};
const SERVER: Token = Token(0); const SERVER: Token = Token(0);
/// Job sent to validation worker threads
struct ValidationJob {
token: Token,
block: Block,
}
/// Result from validation worker threads after CPU-intensive checks
enum PreValidationResult {
/// Block passed hash and signature checks, needs DB validation
NeedsDbValidation(Token, Block),
/// Block failed basic validation
Invalid(Token, Block),
}
pub struct Network { pub struct Network {
context: Arc<Mutex<Context>>, context: Arc<Mutex<Context>>,
secret_key: ReusableSecret, secret_key: ReusableSecret,
@@ -37,7 +56,14 @@ pub struct Network {
// States of peer connections, and some data to send when sockets become writable // States of peer connections, and some data to send when sockets become writable
peers: Peers, peers: Peers,
// Orphan blocks from future // Orphan blocks from future
future_blocks: HashMap<u64, Block> future_blocks: HashMap<u64, Block>,
// Validation thread pool channels
validation_sender: Sender<ValidationJob>,
validation_receiver: Receiver<PreValidationResult>,
// Track pending block requests: block_index -> (request_time, peer_token)
pending_requests: HashMap<u64, (Instant, Token)>,
// Track peer response times for adaptive selection
peer_rtt: RttTracker<Token>,
} }
impl Network { impl Network {
@@ -47,7 +73,88 @@ impl Network {
let secret_key = ReusableSecret::random_from_rng(&mut thread_rng); let secret_key = ReusableSecret::random_from_rng(&mut thread_rng);
let public_key = PublicKey::from(&secret_key); let public_key = PublicKey::from(&secret_key);
let peers = Peers::new(); let peers = Peers::new();
Network { context, secret_key, public_key, token: Token(1), peers, future_blocks: HashMap::new() }
// Create validation thread pool
let cpus = num_cpus::get();
let num_workers = cpus.min((cpus / 2).max(1)); // At most half cpus
info!("Starting {num_workers} validation threads");
let channel_capacity = (num_workers * 4).max(100);
let (job_sender, job_receiver) = bounded::<ValidationJob>(channel_capacity);
let (result_sender, result_receiver) = bounded::<PreValidationResult>(channel_capacity);
// Spawn validation worker threads
for i in 0..num_workers {
let job_rx = job_receiver.clone();
let result_tx = result_sender.clone();
thread::Builder::new()
.name(format!("block-validator-{}", i))
.spawn(move || {
Self::validation_worker(job_rx, result_tx);
})
.expect("Failed to spawn validation worker thread");
}
// Drop the extra senders/receivers we don't need
drop(result_sender);
Network {
context,
secret_key,
public_key,
token: Token(1),
peers,
future_blocks: HashMap::new(),
validation_sender: job_sender,
validation_receiver: result_receiver,
pending_requests: HashMap::new(),
peer_rtt: RttTracker::new(),
}
}
/// Worker thread that performs CPU-intensive block validation
fn validation_worker(
job_receiver: Receiver<ValidationJob>,
result_sender: Sender<PreValidationResult>,
) {
loop {
match job_receiver.recv() {
Ok(job) => {
let ValidationJob { token, block } = job;
// Perform CPU-intensive validation without holding any locks
// These checks don't require database access
// Check 1: Verify block hash
if !check_block_hash(&block) {
debug!("Block {} failed hash validation", block.index);
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
continue;
}
// Check 2: Verify block signature
if !check_block_signature(&block) {
debug!("Block {} failed signature validation", block.index);
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
continue;
}
// Check 3: Verify hash difficulty matches claimed difficulty
if hash_difficulty(&block.hash) < block.difficulty {
debug!("Block {} hash difficulty doesn't match claimed difficulty", block.index);
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
continue;
}
// Block passed CPU-intensive checks, send for DB validation
let _ = result_sender.send(PreValidationResult::NeedsDbValidation(token, block));
}
Err(_) => {
// Channel closed, exit worker thread
break;
}
}
}
} }
pub fn start(&mut self) { pub fn start(&mut self) {
@@ -175,6 +282,9 @@ impl Network {
}; };
let _ = debug_send.send(format!("Handle connection event: {:?} for peer {}", &event, &peer)); let _ = debug_send.send(format!("Handle connection event: {:?} for peer {}", &event, &peer));
if !self.handle_connection_event(poll.registry(), event, &mut seen_blocks, &mut buffer) { if !self.handle_connection_event(poll.registry(), event, &mut seen_blocks, &mut buffer) {
// Record failure and remove pending requests for this peer
self.peer_rtt.record_failure(&token);
self.pending_requests.retain(|_index, (_time, t)| *t != token);
let _ = self.peers.close_peer(poll.registry(), &token); let _ = self.peers.close_peer(poll.registry(), &token);
let blocks = self.context.lock().unwrap().chain.get_height(); let blocks = self.context.lock().unwrap().chain.get_height();
let keys = self.context.lock().unwrap().chain.get_users_count(); let keys = self.context.lock().unwrap().chain.get_users_count();
@@ -185,6 +295,11 @@ impl Network {
} }
} }
let _ = debug_send.send(String::from("After events iter")); let _ = debug_send.send(String::from("After events iter"));
// Process validation results from worker threads
let _ = debug_send.send(String::from("Process validation results"));
self.process_validation_results();
if last_events_time.elapsed().as_secs() > MAX_IDLE_SECONDS { if last_events_time.elapsed().as_secs() > MAX_IDLE_SECONDS {
if self.peers.get_peers_count() > 0 { if self.peers.get_peers_count() > 0 {
warn!("Something is wrong with swarm connections, closing all."); warn!("Something is wrong with swarm connections, closing all.");
@@ -239,8 +354,20 @@ impl Network {
(blocks, max_height, context.chain.get_last_hash()) (blocks, max_height, context.chain.get_last_hash())
}; };
// Periodic sync maintenance: gap retries and idle peer kicks
if height < max_height {
self.sync_maintain(height, max_height);
} else if height >= max_height && !self.future_blocks.is_empty() {
// We've caught up but have stale future_blocks — clean them up
self.future_blocks.clear();
self.pending_requests.clear();
post(crate::event::Event::SyncFinished);
}
let _ = debug_send.send(String::from("Peers update")); let _ = debug_send.send(String::from("Peers update"));
let have_blocks: HashSet<u64> = self.future_blocks.values().map(|block| block.index).collect(); let mut have_blocks: HashSet<u64> = self.future_blocks.keys().copied().collect();
// Also include blocks that are pending validation to avoid re-requesting them
have_blocks.extend(self.pending_requests.keys().copied());
self.peers.update(poll.registry(), hash, height, max_height, have_blocks); self.peers.update(poll.registry(), hash, height, max_height, have_blocks);
ui_timer = Instant::now(); ui_timer = Instant::now();
} }
@@ -454,14 +581,23 @@ impl Network {
peer.set_state(State::idle()); peer.set_state(State::idle());
} }
State::Idle { from } => { State::Idle { from } => {
debug!("Odd version of pings for {}", peer.get_addr().ip()); if peer.has_queued_messages() {
if from.elapsed().as_secs() >= 120 { // Send ONE queued message at a time to avoid flooding remote peer
let data: Vec<u8> = { if let Some(queued_data) = peer.pop_message() {
let c = self.context.lock().unwrap(); if let Ok(data) = encode_bytes(&queued_data, peer.get_cipher()) {
let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash()); send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending queued message {}", e));
encode_message(&message, peer.get_cipher()).unwrap() }
}; }
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e)); } else {
debug!("Odd version of pings for {}", peer.get_addr().ip());
if from.elapsed().as_secs() >= 120 {
let data: Vec<u8> = {
let c = self.context.lock().unwrap();
let message = Message::ping(c.chain.get_height(), c.chain.get_last_hash());
encode_message(&message, peer.get_cipher()).unwrap()
};
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e));
}
} }
} }
State::Error => {} State::Error => {}
@@ -504,6 +640,7 @@ impl Network {
} else if origin.eq(my_origin) { } else if origin.eq(my_origin) {
let peer = self.peers.get_mut_peer(token).unwrap(); let peer = self.peers.get_mut_peer(token).unwrap();
debug!("Incoming v{} on {}", &app_version, peer.get_addr().ip()); debug!("Incoming v{} on {}", &app_version, peer.get_addr().ip());
peer.set_version(Version::parse(&app_version));
let app_version = self.context.lock().unwrap().app_version.clone(); let app_version = self.context.lock().unwrap().app_version.clone();
if version == my_version { if version == my_version {
peer.set_public(public); peer.set_public(public);
@@ -538,6 +675,7 @@ impl Network {
let peer = self.peers.get_mut_peer(token).unwrap(); let peer = self.peers.get_mut_peer(token).unwrap();
// TODO check rand_id whether we have this peers connection already // TODO check rand_id whether we have this peers connection already
debug!("Outgoing v{} on {}", &app_version, peer.get_addr().ip()); debug!("Outgoing v{} on {}", &app_version, peer.get_addr().ip());
peer.set_version(Version::parse(&app_version));
peer.set_height(height); peer.set_height(height);
peer.set_active(true); peer.set_active(true);
peer.set_public(public); peer.set_public(public);
@@ -564,10 +702,20 @@ impl Network {
return State::message(Message::pong(my_height, my_hash)); return State::message(Message::pong(my_height, my_hash));
} }
if peer.is_higher(my_height) { if peer.is_higher(my_height) {
let mut context = self.context.lock().unwrap(); let max_height = {
context.chain.update_max_height(height); let mut context = self.context.lock().unwrap();
info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip()); context.chain.update_max_height(height);
State::message(Message::GetBlock { index: my_height + 1 }) context.chain.get_max_height()
};
// Start pipeline: queue a block request, then send pong
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
let peer = self.peers.get_mut_peer(token).unwrap();
if peer.can_send() {
peer.queue_message(Message::GetBlock { index: idx });
self.pending_requests.insert(idx, (Instant::now(), *token));
}
}
State::message(Message::pong(my_height, my_hash))
} else if my_height == height && hash.ne(&my_hash) { } else if my_height == height && hash.ne(&my_hash) {
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
@@ -585,10 +733,17 @@ impl Network {
return State::idle(); return State::idle();
} }
if peer.is_higher(my_height) { if peer.is_higher(my_height) {
let mut context = self.context.lock().unwrap(); let max_height = {
context.chain.update_max_height(height); let mut context = self.context.lock().unwrap();
info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip()); context.chain.update_max_height(height);
State::message(Message::GetBlock { index: my_height + 1 }) context.chain.get_max_height()
};
// Start pipeline: request next needed block from this peer
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
self.pending_requests.insert(idx, (Instant::now(), *token));
return State::message(Message::GetBlock { index: idx });
}
State::idle()
} else if my_height == height && hash.ne(&my_hash) { } else if my_height == height && hash.ne(&my_hash) {
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip()); info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash); info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
@@ -636,12 +791,46 @@ impl Network {
if index != block.index { if index != block.index {
return State::Banned; return State::Banned;
} }
debug!("Received block {} with hash {:?}", block.index, &block.hash); let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
if !seen_blocks.contains(&block.hash) { debug!("Received block {} with hash {:?} from {}", block.index, &block.hash, &peer_addr);
self.handle_block(token, block, seen_blocks) // Record RTT but keep in pending_requests until validation completes
} else { // (prevents re-requesting while block is in validation channel)
State::idle() if let Some((request_time, peer_token)) = self.pending_requests.get(&block.index) {
let rtt_ms = request_time.elapsed().as_secs_f64() * 1000.0;
self.peer_rtt.record_success(peer_token, rtt_ms);
} }
if !seen_blocks.contains(&block.hash) {
seen_blocks.insert(block.hash.clone());
// Send block to validation worker threads for parallel processing
match self.validation_sender.try_send(ValidationJob {
token: *token,
block,
}) {
Ok(_) => {},
Err(crossbeam_channel::TrySendError::Full(job)) => {
debug!("Validation queue full, deferring block {}", job.block.index);
self.future_blocks.insert(job.block.index, job.block);
},
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
warn!("Validation worker threads have stopped");
},
}
}
// Pipeline: immediately request the next needed block from this peer
let (my_height, max_height) = {
let c = self.context.lock().unwrap();
(c.chain.get_height(), c.chain.get_max_height())
};
if my_height < max_height {
if let Some(next_idx) = self.next_block_to_request(my_height, max_height) {
let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
debug!("Requesting block {next_idx} from {}", &peer_addr);
self.pending_requests.insert(next_idx, (Instant::now(), *token));
return State::message(Message::GetBlock { index: next_idx });
}
}
State::idle()
} }
Message::Twin => State::Twin, Message::Twin => State::Twin,
Message::Loop => State::Loop Message::Loop => State::Loop
@@ -649,79 +838,282 @@ impl Network {
answer answer
} }
fn handle_block(&mut self, token: &Token, block: Block, seen_blocks: &mut HashSet<Bytes>) -> State { /// Find the next block that needs to be requested within the sync window.
seen_blocks.insert(block.hash.clone()); /// Returns None if all blocks in the window are already requested or received.
let peers_count = self.peers.get_peers_active_count(); fn next_block_to_request(&self, my_height: u64, max_height: u64) -> Option<u64> {
let peer = self.peers.get_mut_peer(token).unwrap(); const SYNC_WINDOW: u64 = 500;
peer.set_received_block(block.index); // Don't pipeline until genesis block is added — other blocks would just
trace!("New block from {}", &peer.get_addr()); // queue as "future" and spam warnings since last_block is None.
let window = if my_height == 0 { 1 } else { SYNC_WINDOW };
let end = max_height.min(my_height + window);
for idx in (my_height + 1)..=end {
if !self.future_blocks.contains_key(&idx) && !self.pending_requests.contains_key(&idx) {
return Some(idx);
}
}
None
}
let mut context = self.context.lock().unwrap(); /// Periodic sync maintenance: retry gap blocks, kick idle peers into the pipeline.
let max_height = context.chain.get_max_height(); fn sync_maintain(&mut self, my_height: u64, max_height: u64) {
match context.chain.check_new_block(&block) { const GAP_TIMEOUT_MSECS: u128 = 1500;
BlockQuality::Good => {
let mut next_index = block.index + 1; let now = Instant::now();
context.chain.add_block(block);
// If we have some consequent blocks in a bucket of 'future blocks', we add them // Record failures for timed-out requests before cleanup
while let Some(block) = self.future_blocks.remove(&next_index) { let timed_out: Vec<Token> = self.pending_requests.iter()
if context.chain.check_new_block(&block) == BlockQuality::Good { .filter(|(_, (t, _))| t.elapsed().as_millis() >= GAP_TIMEOUT_MSECS)
debug!("Added block {} from future blocks", next_index); .map(|(_, (_, tok))| *tok)
context.chain.add_block(block); .collect();
} else { for tok in &timed_out {
warn!("Block {} in future blocks is bad!", block.index); self.peer_rtt.record_failure(tok);
}
// Clean up stale requests: timed out or peer no longer active
self.pending_requests.retain(|_index, (request_time, token)| {
if request_time.elapsed().as_millis() >= GAP_TIMEOUT_MSECS * 2 {
return false;
}
match self.peers.get_peer(token) {
Some(peer) => peer.active(),
None => false,
}
});
let raw_peers = self.peers.get_active_peer_tokens();
if raw_peers.is_empty() {
return;
}
let active_peers = self.peer_rtt.select_ordered(&raw_peers);
// Retry gap blocks with shorter timeout — push to FRONT of fastest peer's queue
let min_future_block = self.future_blocks.keys().min().copied();
if let Some(min_future) = min_future_block {
// active_peers is already ranked by RTT (fastest first)
let mut gap_peer_idx = 0;
for block_index in (my_height + 1)..min_future {
if self.future_blocks.contains_key(&block_index) {
continue;
}
if let Some((req_time, old_token)) = self.pending_requests.get(&block_index) {
if req_time.elapsed().as_millis() < GAP_TIMEOUT_MSECS {
continue;
}
self.peer_rtt.record_failure(old_token);
// Skip the failed peer
if let Some(pos) = active_peers.iter().position(|t| t == old_token) {
if pos == gap_peer_idx {
gap_peer_idx = (gap_peer_idx + 1) % active_peers.len();
}
}
debug!("Gap block {} timed out, re-requesting (have future from {})", block_index, min_future);
}
// Find a peer that can accept a message (old nodes only allow 1 in flight)
let mut sent = false;
for _ in 0..active_peers.len() {
let peer_token = active_peers[gap_peer_idx % active_peers.len()];
gap_peer_idx = (gap_peer_idx + 1) % active_peers.len();
if let Some(peer) = self.peers.get_mut_peer(&peer_token) {
if !peer.can_send() {
continue;
}
peer.queue_priority_message(Message::GetBlock { index: block_index });
self.pending_requests.insert(block_index, (now, peer_token));
debug!("Requesting gap block {} from {} (priority)", block_index, peer.get_addr().ip());
sent = true;
break; break;
} }
next_index += 1;
} }
let my_height = context.chain.get_height(); if !sent {
post(crate::event::Event::BlockchainChanged { index: my_height }); break; // All peers busy, try next cycle
// If it was the last block to sync }
if my_height == max_height { }
post(crate::event::Event::SyncFinished); }
self.future_blocks.clear();
} else { // Kick idle peers into the pipeline (peers with no pending requests)
let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) }; for t in &active_peers {
post(event); let has_pending = self.pending_requests.values().any(|(_, tk)| tk == t);
} if has_pending {
let domains = context.chain.get_domains_count(); continue;
let keys = context.chain.get_users_count(); }
post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count }); if let Some(peer) = self.peers.get_peer(t) {
} if !peer.get_state().is_idle() || peer.has_queued_messages() {
BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); } continue;
BlockQuality::Future => { }
debug!("Got future block {}", block.index); }
self.future_blocks.insert(block.index, block); if let Some(idx) = self.next_block_to_request(my_height, max_height) {
} if let Some(peer) = self.peers.get_mut_peer(t) {
BlockQuality::Bad => { peer.queue_message(Message::GetBlock { index: idx });
// TODO save bad public keys to banned table self.pending_requests.insert(idx, (now, *t));
debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block); debug!("Kicking idle peer {} with block {}", peer.get_addr().ip(), idx);
let height = context.chain.get_height(); }
if height + 1 == block.index { }
context.chain.update_max_height(height); }
post(crate::event::Event::SyncFinished); }
}
return State::Banned; /// Process validation results from worker threads and add validated blocks to chain
} fn process_validation_results(&mut self) {
BlockQuality::Rewind => { // Process all available validation results without blocking
debug!("Got some orphan block, requesting its parent"); while let Ok(result) = self.validation_receiver.try_recv() {
return State::message(Message::GetBlock { index: block.index - 1 }); match result {
} PreValidationResult::NeedsDbValidation(token, block) => {
BlockQuality::Fork => { // CPU-intensive validation passed, now do DB-dependent validation
debug!("Got forked block {} with hash {:?}", block.index, block.hash); let peers_count = self.peers.get_peers_active_count();
// If we are very much behind of blockchain
let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height; // Update peer state
let our_block = context.chain.get_block(block.index).unwrap(); if let Some(peer) = self.peers.get_mut_peer(&token) {
if block.is_better_than(&our_block) || lagged { peer.set_received_block(block.index);
context.chain.replace_block(block).expect("Error replacing block with fork"); trace!("Validated block {} from {}", block.index, peer.get_addr());
let index = context.chain.get_height(); } else {
post(crate::event::Event::BlockchainChanged { index }); // Peer disconnected, but we can still process the block
} else { trace!("Validated block {} from disconnected peer", block.index);
debug!("Fork in not better than our block, dropping."); }
return State::message(Message::block(our_block.index, our_block.as_bytes()));
// Lock context only for DB operations
let mut context = self.context.lock().unwrap();
let max_height = context.chain.get_max_height();
// Do remaining DB-dependent validation and add to chain
match context.chain.check_new_block(&block) {
BlockQuality::Good => {
let block_index = block.index;
let mut next_index = block.index + 1;
context.chain.add_block(block);
// Clean up pending request for this block
self.pending_requests.remove(&block_index);
// Process future blocks that are now ready
while let Some(block) = self.future_blocks.remove(&next_index) {
if context.chain.check_new_block(&block) == BlockQuality::Good {
debug!("Added block {} from future blocks", next_index);
context.chain.add_block(block);
self.pending_requests.remove(&next_index);
} else {
warn!("Block {} in future blocks is bad!", block.index);
break;
}
next_index += 1;
}
let my_height = context.chain.get_height();
post(crate::event::Event::BlockchainChanged { index: my_height });
// Check if sync is finished
if my_height >= max_height {
post(crate::event::Event::SyncFinished);
self.future_blocks.clear();
} else {
let event = crate::event::Event::Syncing {
have: my_height,
height: max(max_height, my_height)
};
post(event);
}
let domains = context.chain.get_domains_count();
let keys = context.chain.get_users_count();
post(crate::event::Event::NetworkStatus {
blocks: my_height,
domains,
keys,
nodes: peers_count
});
}
BlockQuality::Twin => {
debug!("Ignoring duplicate block {}", block.index);
}
BlockQuality::Future => {
debug!("Got future block {}", block.index);
let block_index = block.index;
self.future_blocks.insert(block.index, block);
// Clean up pending request since we have this block now
self.pending_requests.remove(&block_index);
}
BlockQuality::Bad => {
debug!("Block {} failed DB validation", block.index);
if let Some(peer) = self.peers.get_mut_peer(&token) {
debug!("Banning peer {} for bad block", peer.get_addr());
// Mark peer for banning
peer.set_state(State::Banned);
}
let height = context.chain.get_height();
if height + 1 == block.index {
context.chain.update_max_height(height);
post(crate::event::Event::SyncFinished);
}
}
BlockQuality::Rewind => {
debug!("Got orphan block {}, requesting parent", block.index);
// Save the block so it can be processed after the rewind resolves
let block_index = block.index;
self.future_blocks.insert(block.index, block);
self.pending_requests.remove(&block_index);
if let Some(peer) = self.peers.get_mut_peer(&token) {
peer.set_state(State::message(Message::GetBlock {
index: block_index - 1
}));
}
}
BlockQuality::Fork => {
debug!("Got forked block {} with hash {:?}", block.index, block.hash);
let lagged = block.index == context.chain.get_height()
&& block.index + LIMITED_CONFIDENCE_DEPTH <= max_height;
if let Some(our_block) = context.chain.get_block(block.index) {
if block.is_better_than(&our_block) || lagged {
let fork_index = block.index;
context.chain.replace_block(block)
.expect("Error replacing block with fork");
let mut next_index = fork_index + 1;
// Process future blocks that may now be valid after fork switch
while let Some(fb) = self.future_blocks.remove(&next_index) {
if context.chain.check_new_block(&fb) == BlockQuality::Good {
debug!("Added block {} from future blocks after fork", next_index);
context.chain.add_block(fb);
self.pending_requests.remove(&next_index);
} else {
debug!("Future block {} not good after fork", next_index);
break;
}
next_index += 1;
}
let my_height = context.chain.get_height();
post(crate::event::Event::BlockchainChanged { index: my_height });
if my_height >= max_height {
post(crate::event::Event::SyncFinished);
self.future_blocks.clear();
} else {
post(crate::event::Event::Syncing {
have: my_height,
height: max(max_height, my_height)
});
}
} else {
debug!("Fork is not better than our block, dropping");
if let Some(peer) = self.peers.get_mut_peer(&token) {
peer.set_state(State::message(Message::block(
our_block.index,
our_block.as_bytes()
)));
}
}
}
}
}
// Context lock is dropped here
}
PreValidationResult::Invalid(token, block) => {
// Block failed CPU-intensive validation
debug!("Block {} failed pre-validation (hash/signature)", block.index);
if let Some(peer) = self.peers.get_mut_peer(&token) {
debug!("Banning peer {} for invalid block", peer.get_addr());
peer.set_state(State::Banned);
}
} }
} }
} }
State::idle()
} }
/// Gets new token from old token, mutating the last /// Gets new token from old token, mutating the last
+55 -2
View File
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::fmt::Display; use std::fmt::Display;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Instant; use std::time::Instant;
@@ -5,13 +6,16 @@ use std::time::Instant;
use mio::net::TcpStream; use mio::net::TcpStream;
use crate::crypto::Chacha; use crate::crypto::Chacha;
use crate::p2p::message::Message;
use crate::p2p::State; use crate::p2p::State;
use crate::p2p::version::Version;
#[derive(Debug)] #[derive(Debug)]
pub struct Peer { pub struct Peer {
addr: SocketAddr, addr: SocketAddr,
stream: TcpStream, stream: TcpStream,
state: State, state: State,
outgoing: VecDeque<Vec<u8>>,
id: String, id: String,
height: u64, height: u64,
inbound: bool, inbound: bool,
@@ -21,7 +25,8 @@ pub struct Peer {
reconnects: u32, reconnects: u32,
received_block: u64, received_block: u64,
sent_height: u64, sent_height: u64,
cipher: Option<Chacha> cipher: Option<Chacha>,
version: Version,
} }
impl Peer { impl Peer {
@@ -30,6 +35,7 @@ impl Peer {
addr, addr,
stream, stream,
state, state,
outgoing: VecDeque::new(),
id: String::new(), id: String::new(),
height: 0, height: 0,
inbound, inbound,
@@ -39,7 +45,8 @@ impl Peer {
reconnects: 0, reconnects: 0,
received_block: 0, received_block: 0,
sent_height: 0, sent_height: 0,
cipher: None cipher: None,
version: Version::default(),
} }
} }
@@ -78,6 +85,52 @@ impl Peer {
self.state = state; self.state = state;
} }
/// Queue a message for sending. Does not change peer state.
pub fn queue_message(&mut self, msg: Message) {
let data = serde_cbor::to_vec(&msg).unwrap();
self.outgoing.push_back(data);
}
/// Queue a high-priority message at the front (e.g. gap block requests)
pub fn queue_priority_message(&mut self, msg: Message) {
let data = serde_cbor::to_vec(&msg).unwrap();
self.outgoing.push_front(data);
}
pub fn has_queued_messages(&self) -> bool {
!self.outgoing.is_empty()
}
pub fn queued_count(&self) -> usize {
self.outgoing.len()
}
pub fn pop_message(&mut self) -> Option<Vec<u8>> {
self.outgoing.pop_front()
}
pub fn set_version(&mut self, version: Version) {
self.version = version;
}
pub fn get_version(&self) -> &Version {
&self.version
}
/// Old nodes (< 0.8.9) overwrite outgoing messages, can only have 1 in flight
pub fn supports_queue(&self) -> bool {
self.version >= Version { major: 0, minor: 8, patch: 9 }
}
/// Check if we can send another message to this peer
pub fn can_send(&self) -> bool {
if self.supports_queue() {
true
} else {
self.outgoing.is_empty()
}
}
pub fn get_id(&self) -> &str { pub fn get_id(&self) -> &str {
&self.id &self.id
} }
+13 -3
View File
@@ -202,6 +202,13 @@ impl Peers {
count count
} }
pub fn get_active_peer_tokens(&self) -> Vec<Token> {
self.peers.iter()
.filter(|(_, peer)| peer.active())
.map(|(token, _)| *token)
.collect()
}
pub fn is_tween_connect(&self, id: &str) -> bool { pub fn is_tween_connect(&self, id: &str) -> bool {
for (_, peer) in self.peers.iter() { for (_, peer) in self.peers.iter() {
if peer.active() && peer.get_id() == id { if peer.active() && peer.get_id() == id {
@@ -255,7 +262,10 @@ impl Peers {
let mut stale_tokens = Vec::new(); let mut stale_tokens = Vec::new();
for (token, peer) in self.peers.iter_mut() { for (token, peer) in self.peers.iter_mut() {
if let State::Idle { from } = peer.get_state() { if let State::Idle { from } = peer.get_state() {
if from.elapsed().as_secs() >= PING_PERIOD + random_time { if peer.has_queued_messages() {
let stream = peer.get_stream();
registry.reregister(stream, *token, Interest::WRITABLE).unwrap();
} else if from.elapsed().as_secs() >= PING_PERIOD + random_time {
// Sometimes we check for new peers instead of pinging // Sometimes we check for new peers instead of pinging
let message = if nodes < MAX_NODES && random::<bool>() { let message = if nodes < MAX_NODES && random::<bool>() {
Message::GetPeers Message::GetPeers
@@ -273,7 +283,7 @@ impl Peers {
stale_tokens.push((token.clone(), peer.get_addr())); stale_tokens.push((token.clone(), peer.get_addr()));
continue; continue;
} }
if matches!(peer.get_state(), State::Message {..}) { if matches!(peer.get_state(), State::Message {..}) || peer.has_queued_messages() {
let stream = peer.get_stream(); let stream = peer.get_stream();
registry.reregister(stream, *token, Interest::WRITABLE).unwrap(); registry.reregister(stream, *token, Interest::WRITABLE).unwrap();
} }
@@ -288,7 +298,7 @@ impl Peers {
self.ignored.retain(|_addr, time| { time.elapsed().as_secs() < 600 }); self.ignored.retain(|_addr, time| { time.elapsed().as_secs() < 600 });
// If someone has more blocks we sync // If someone has more blocks we sync
if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height { if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height && have_blocks.is_empty() {
// Give some opportunity to get more peers instead of requests for blocks only // Give some opportunity to get more peers instead of requests for blocks only
let request_blocks = nodes >= 10 || random::<bool>(); let request_blocks = nodes >= 10 || random::<bool>();
if request_blocks { if request_blocks {
+59
View File
@@ -0,0 +1,59 @@
use std::fmt;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Version {
pub major: u16,
pub minor: u16,
pub patch: u16,
}
impl Version {
pub fn parse(s: &str) -> Version {
let parts: Vec<&str> = s.split('.').collect();
Version {
major: parts.get(0).and_then(|s| s.parse().ok()).unwrap_or(0),
minor: parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0),
patch: parts.get(2).and_then(|s| s.parse().ok()).unwrap_or(0),
}
}
}
impl Default for Version {
fn default() -> Self {
Version { major: 0, minor: 0, patch: 0 }
}
}
impl PartialOrd for Version {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Version {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.major.cmp(&other.major)
.then(self.minor.cmp(&other.minor))
.then(self.patch.cmp(&other.patch))
}
}
impl fmt::Display for Version {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_version_comparison() {
assert!(Version::parse("0.8.9") >= Version::parse("0.8.9"));
assert!(Version::parse("0.8.10") > Version::parse("0.8.9"));
assert!(Version::parse("0.8.8") < Version::parse("0.8.9"));
assert!(Version::parse("0.9.0") > Version::parse("0.8.99"));
assert!(Version::parse("1.0.0") > Version::parse("0.99.99"));
}
}
+63 -28
View File
@@ -3,6 +3,7 @@ extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate tinyfiledialogs as tfd; extern crate tinyfiledialogs as tfd;
use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread; use std::thread;
@@ -61,13 +62,8 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
let icon = load_icon_from_png(); let icon = load_icon_from_png();
let tray_icon = TrayIconBuilder::new() let tray_icon = build_tray_icon(&title, tray_menu, icon);
.with_menu(Box::new(tray_menu)) let tray_available = tray_icon.is_some();
.with_tooltip(&title)
.with_icon(icon)
.with_menu_on_left_click(false)
.build()
.unwrap();
let window_size = tao::dpi::LogicalSize::new(1024, 720); let window_size = tao::dpi::LogicalSize::new(1024, 720);
// Get primary monitor and calculate center position // Get primary monitor and calculate center position
@@ -90,7 +86,7 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
.with_inner_size(window_size) .with_inner_size(window_size)
.with_min_inner_size(tao::dpi::LogicalSize::new(773, 350)) .with_min_inner_size(tao::dpi::LogicalSize::new(773, 350))
.with_resizable(true) .with_resizable(true)
.with_visible(!hide); .with_visible(!hide || !tray_available);
if let Some(position) = position { if let Some(position) = position {
builder = builder.with_position(position); builder = builder.with_position(position);
@@ -319,15 +315,17 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
true true
}); });
let proxy = event_loop.create_proxy(); if tray_available {
TrayIconEvent::set_event_handler(Some(move |event| { let proxy = event_loop.create_proxy();
let _ = proxy.send_event(UserEvent::TrayIconEvent(event)); TrayIconEvent::set_event_handler(Some(move |event| {
})); let _ = proxy.send_event(UserEvent::TrayIconEvent(event));
}));
let proxy = event_loop.create_proxy(); let proxy = event_loop.create_proxy();
MenuEvent::set_event_handler(Some(move |event| { MenuEvent::set_event_handler(Some(move |event| {
let _ = proxy.send_event(UserEvent::MenuEvent(event)); let _ = proxy.send_event(UserEvent::MenuEvent(event));
})); }));
}
let proxy = event_loop.create_proxy(); let proxy = event_loop.create_proxy();
@@ -340,7 +338,14 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
event: WindowEvent::CloseRequested, event: WindowEvent::CloseRequested,
.. ..
} => { } => {
window.set_visible(false); if tray_available {
window.set_visible(false);
} else {
info!("Interface closed, exiting");
post(Event::ActionQuit);
thread::sleep(Duration::from_millis(100));
*control_flow = ControlFlow::Exit;
}
} }
TaoEvent::UserEvent(user_event) => { TaoEvent::UserEvent(user_event) => {
let wv = webview_clone.lock().unwrap(); let wv = webview_clone.lock().unwrap();
@@ -364,19 +369,21 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
show_warning(&wv, &text); show_warning(&wv, &text);
} }
UserEvent::TrayIconEvent(event) => { UserEvent::TrayIconEvent(event) => {
match event { if let Some(tray_icon) = tray_icon.as_ref() {
TrayIconEvent::DoubleClick { button, .. } => { match event {
if button == tray_icon::MouseButton::Left { TrayIconEvent::DoubleClick { button, .. } => {
window.set_visible(true); if button == tray_icon::MouseButton::Left {
window.set_focus(); window.set_visible(true);
window.set_focus();
}
} }
TrayIconEvent::Enter { .. } => {
let nodes = connected_nodes.load(Ordering::SeqCst);
let title = format!("ALFIS {}\nConnected: {nodes}", env!("CARGO_PKG_VERSION"));
let _ = tray_icon.set_tooltip(Some(title));
}
_ => {}
} }
TrayIconEvent::Enter { .. } => {
let nodes = connected_nodes.load(Ordering::SeqCst);
let title = format!("ALFIS {}\nConnected: {nodes}", env!("CARGO_PKG_VERSION"));
let _ = tray_icon.set_tooltip(Some(title));
}
_ => {}
} }
} }
UserEvent::MenuEvent(event) => { UserEvent::MenuEvent(event) => {
@@ -396,6 +403,34 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
}); });
} }
fn build_tray_icon(title: &str, tray_menu: Menu, icon: tray_icon::Icon) -> Option<tray_icon::TrayIcon> {
let previous_hook = panic::take_hook();
panic::set_hook(Box::new(|_| {}));
let result = panic::catch_unwind(AssertUnwindSafe(|| {
TrayIconBuilder::new()
.with_menu(Box::new(tray_menu))
.with_tooltip(title)
.with_icon(icon)
.with_menu_on_left_click(false)
.build()
}));
panic::set_hook(previous_hook);
match result {
Ok(Ok(tray_icon)) => Some(tray_icon),
Ok(Err(error)) => {
warn!("Tray icon is unavailable: {error}");
None
}
Err(_) => {
warn!("Tray icon is unavailable: failed to load appindicator library, continuing without tray support");
None
}
}
}
#[derive(Debug)] #[derive(Debug)]
enum UserEvent { enum UserEvent {
EvalJs(String), EvalJs(String),