Compare commits
10 Commits
c8fa174ac0
...
071567fa73
| Author | SHA1 | Date | |
|---|---|---|---|
| 071567fa73 | |||
| 489f16e462 | |||
| f4c8b1fe42 | |||
| b01ade19b1 | |||
| 9bea173f21 | |||
| ef573436d9 | |||
| d86fb6916f | |||
| 5044064f6c | |||
| c90458eaaf | |||
| cfb3cf6cf8 |
@@ -17,7 +17,7 @@ jobs:
|
||||
project_version: ${{ env.VERSION }}
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
- name: Getting version
|
||||
run: echo "VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
|
||||
|
||||
@@ -25,57 +25,50 @@ jobs:
|
||||
name: Create Release
|
||||
runs-on: ubuntu-latest
|
||||
needs: get_version
|
||||
outputs:
|
||||
upload_url: ${{ steps.create_release.outputs.upload_url }}
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
- name: Create Release
|
||||
id: create_release
|
||||
uses: actions/create-release@v1
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
VERSION: ${{ needs.get_version.outputs.project_version }}
|
||||
with:
|
||||
tag_name: ${{ env.VERSION }}
|
||||
release_name: ${{ env.VERSION }}
|
||||
body: |
|
||||
## New
|
||||
run: |
|
||||
gh release create "$VERSION" \
|
||||
--title "$VERSION" \
|
||||
--draft \
|
||||
--notes "## New
|
||||
* Added new features.
|
||||
## Bug Fixes & Improvements
|
||||
* Various fixes and stability improvements.
|
||||
## Documentation & others
|
||||
* Updated documentation.
|
||||
draft: true
|
||||
prerelease: false
|
||||
* Updated documentation."
|
||||
|
||||
linux-nogui:
|
||||
name: Create and upload builds
|
||||
needs: [ create_release, get_version ]
|
||||
name: Build Linux nogui (${{ matrix.arch }})
|
||||
needs: [create_release, get_version]
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
arch: [ amd64, i686, armhf, armel, arm64 ]
|
||||
arch: [amd64, i686, armhf, armel, arm64]
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: install dependencies
|
||||
- name: Install Rust toolchain
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
sudo apt update
|
||||
sudo apt upgrade
|
||||
sudo apt install libwebkit2gtk-4.1-dev libxdo-dev libsoup-3.0-dev upx-ucl
|
||||
cargo install cross
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y upx-ucl
|
||||
cargo install cross --git https://github.com/cross-rs/cross
|
||||
|
||||
- name: Build and package deb packages
|
||||
run: PKGARCH=${{ matrix.arch }} contrib/deb/generate.sh
|
||||
|
||||
- name: Upload bins & debs
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
tag_name="${{ needs.get_version.outputs.project_version }}"
|
||||
ls -lh ./bin/
|
||||
@@ -83,62 +76,68 @@ jobs:
|
||||
gh release upload "$tag_name" *.deb --clobber
|
||||
|
||||
build-and-upload-gui-zips:
|
||||
name: Create and upload builds
|
||||
needs: [ create_release, get_version ]
|
||||
name: Build GUI (${{ matrix.name }})
|
||||
needs: [create_release, get_version]
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ windows-latest, ubuntu-latest, macOS-latest ]
|
||||
include:
|
||||
- os: ubuntu-latest
|
||||
name: linux-amd64
|
||||
bin_path: ./target/release/alfis
|
||||
archive_cmd: zip
|
||||
- os: windows-latest
|
||||
name: windows-amd64
|
||||
bin_path: target/release/alfis.exe
|
||||
archive_cmd: 7z
|
||||
- os: macos-latest
|
||||
name: darwin-arm64
|
||||
bin_path: ./target/release/alfis
|
||||
archive_cmd: zip
|
||||
- os: macos-latest
|
||||
name: darwin-amd64
|
||||
target: x86_64-apple-darwin
|
||||
bin_path: ./target/x86_64-apple-darwin/release/alfis
|
||||
archive_cmd: zip
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: install dependencies
|
||||
- name: Install Rust toolchain
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
with:
|
||||
targets: ${{ matrix.target || '' }}
|
||||
|
||||
- name: Install Linux dependencies
|
||||
if: contains(matrix.os, 'ubuntu')
|
||||
run: sudo apt update && sudo apt install --no-install-recommends libwebkit2gtk-4.1-dev libxdo-dev libsoup-3.0-dev upx-ucl
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y --no-install-recommends \
|
||||
libwebkit2gtk-4.1-dev \
|
||||
libgtk-3-dev \
|
||||
libxdo-dev \
|
||||
libsoup-3.0-dev \
|
||||
libayatana-appindicator3-dev
|
||||
|
||||
- name: Build release binaries
|
||||
run: cargo build --release
|
||||
run: cargo build --release ${{ matrix.target && format('--target {0}', matrix.target) || '' }}
|
||||
|
||||
- name: windows
|
||||
if: contains(matrix.os, 'windows')
|
||||
run: echo "BIN_ARCH=windows-amd64" >> $GITHUB_ENV
|
||||
|
||||
- name: linux
|
||||
if: contains(matrix.os, 'ubuntu')
|
||||
run: echo "BIN_ARCH=linux-amd64" >> $GITHUB_ENV
|
||||
|
||||
- name: macos
|
||||
if: contains(matrix.os, 'mac')
|
||||
run: echo "BIN_ARCH=darwin-amd64" >> $GITHUB_ENV
|
||||
|
||||
- name: Fill variables
|
||||
- name: Package zip
|
||||
env:
|
||||
ZIP_NAME: alfis-${{ matrix.name }}-${{ needs.get_version.outputs.project_version }}.zip
|
||||
run: |
|
||||
echo "BIN_PATH=./target/release/alfis" >> $GITHUB_ENV
|
||||
echo "ZIP_NAME=alfis-${{env.BIN_ARCH}}-${{ needs.get_version.outputs.project_version }}.zip" >> $GITHUB_ENV
|
||||
- name: Windows variables
|
||||
if: contains(matrix.os, 'windows')
|
||||
run: |
|
||||
echo "BIN_PATH=target/release/alfis.exe" >> $GITHUB_ENV
|
||||
echo "ZIP_NAME=alfis-${{env.BIN_ARCH}}-${{ needs.get_version.outputs.project_version }}.zip" >> $GITHUB_ENV
|
||||
|
||||
- name: Packaging
|
||||
uses: papeloto/action-zip@v1
|
||||
with:
|
||||
files: ${{ env.BIN_PATH }} alfis.toml README.md LICENSE adblock.txt
|
||||
dest: ${{ env.ZIP_NAME }}
|
||||
if [ "${{ matrix.archive_cmd }}" = "7z" ]; then
|
||||
7z a "$ZIP_NAME" "${{ matrix.bin_path }}" alfis.toml README.md LICENSE adblock.txt
|
||||
else
|
||||
zip "$ZIP_NAME" "${{ matrix.bin_path }}" alfis.toml README.md LICENSE adblock.txt
|
||||
fi
|
||||
|
||||
- name: Upload zip
|
||||
id: upload-zip
|
||||
uses: actions/upload-release-asset@v1
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
with:
|
||||
upload_url: ${{ needs.create_release.outputs.upload_url }}
|
||||
asset_path: ${{ env.ZIP_NAME }}
|
||||
asset_name: ${{ env.ZIP_NAME }}
|
||||
asset_content_type: application/zip
|
||||
TAG: ${{ needs.get_version.outputs.project_version }}
|
||||
ZIP_NAME: alfis-${{ matrix.name }}-${{ needs.get_version.outputs.project_version }}.zip
|
||||
run: gh release upload "$TAG" "$ZIP_NAME" --clobber
|
||||
Generated
+460
-438
File diff suppressed because it is too large
Load Diff
+5
-5
@@ -15,7 +15,7 @@ getopts = "0.2.24"
|
||||
log = "0.4.28"
|
||||
simplelog = "0.12.2"
|
||||
toml = "1.0.7"
|
||||
sha2 = "0.10.9"
|
||||
sha2 = "0.11.0"
|
||||
ed25519-dalek = "2.2.0"
|
||||
x25519-dalek = { version = "2.0.1", features = ["reusable_secrets"] }
|
||||
ecies-ed25519-ng = { git = "https://github.com/Revertron/ecies-ed25519-ng", rev = "554ca29", version = "0.5.3" }
|
||||
@@ -43,9 +43,9 @@ thread-priority = "3.0.0"
|
||||
crossbeam-channel = "0.5.13"
|
||||
|
||||
# Optional dependencies regulated by features
|
||||
wry = { version = "0.53", optional = true }
|
||||
tao = { version = "0.34", optional = true }
|
||||
tray-icon = { version = "0.21.2", optional = true }
|
||||
wry = { version = "0.55.0", optional = true }
|
||||
tao = { version = "0.35.0", optional = true }
|
||||
tray-icon = { version = "0.22.0", optional = true }
|
||||
tinyfiledialogs = { version = "3.9.1", optional = true }
|
||||
open = { version = "5.3.0", optional = true }
|
||||
|
||||
@@ -65,7 +65,7 @@ lto = true
|
||||
strip = true # Automatically strip symbols from the binary.
|
||||
|
||||
[profile.dev]
|
||||
opt-level = 2
|
||||
opt-level = 1
|
||||
|
||||
[profile.test]
|
||||
opt-level = 2
|
||||
|
||||
@@ -18,18 +18,30 @@
|
||||
"i686-windows"
|
||||
"x86_64-windows"
|
||||
];
|
||||
|
||||
in flake-utils.lib.eachSystem systems (system:
|
||||
let
|
||||
|
||||
pkgs = nixpkgs.legacyPackages.${system};
|
||||
|
||||
lib = pkgs.lib;
|
||||
naersk-lib = naersk.lib.${system};
|
||||
isLinux = pkgs.stdenv.hostPlatform.isLinux;
|
||||
|
||||
alfis = { webgui ? true, doh ? true, edge ? false }:
|
||||
guiBuildInputs = lib.optionals isLinux (with pkgs; [
|
||||
gtk3
|
||||
webkitgtk_4_1
|
||||
xdotool
|
||||
libayatana-appindicator
|
||||
]);
|
||||
|
||||
guiNativeBuildInputs = [ pkgs.pkg-config ]
|
||||
++ lib.optionals isLinux [ pkgs.makeWrapper pkgs.wrapGAppsHook ];
|
||||
|
||||
guiRuntimeTools = lib.optionals isLinux [ pkgs.kdePackages.kdialog ];
|
||||
guiRuntimeLibPath = lib.optionalString isLinux (lib.makeLibraryPath guiBuildInputs);
|
||||
|
||||
alfis = { webgui ? true, doh ? true }:
|
||||
let
|
||||
features = builtins.concatStringsSep " " (builtins.concatMap
|
||||
({ option, features }: pkgs.lib.optionals option features) [
|
||||
({ option, features }: lib.optionals option features) [
|
||||
{
|
||||
option = webgui;
|
||||
features = [ "webgui" ];
|
||||
@@ -38,55 +50,40 @@
|
||||
option = doh;
|
||||
features = [ "doh" ];
|
||||
}
|
||||
{
|
||||
option = edge;
|
||||
features = [ "edge" ];
|
||||
}
|
||||
]);
|
||||
in naersk-lib.buildPackage {
|
||||
pname = "alfis";
|
||||
nativeBuildInputs = with pkgs; [ pkg-config webkitgtk kdialog ];
|
||||
dontWrapQtApps = true;
|
||||
root = ./.;
|
||||
nativeBuildInputs = guiNativeBuildInputs;
|
||||
buildInputs = guiBuildInputs;
|
||||
cargoBuildOptions = opts:
|
||||
opts ++ [ "--no-default-features" ]
|
||||
++ [ "--features" ''"${features}"'' ];
|
||||
root = ./.;
|
||||
++ lib.optionals (features != "") [ "--features" features ];
|
||||
preFixup = lib.optionalString isLinux ''
|
||||
gappsWrapperArgs+=(--prefix PATH : "${lib.makeBinPath guiRuntimeTools}")
|
||||
gappsWrapperArgs+=(--prefix LD_LIBRARY_PATH : "${guiRuntimeLibPath}")
|
||||
'';
|
||||
};
|
||||
|
||||
isWindows = builtins.elem system [ "i686-windows" "x86_64-windows" ];
|
||||
in rec {
|
||||
|
||||
packages = {
|
||||
alfis = alfis {
|
||||
webgui = true;
|
||||
doh = true;
|
||||
edge = false;
|
||||
};
|
||||
alfisWithoutGUI = alfis {
|
||||
webgui = false;
|
||||
doh = true;
|
||||
edge = false;
|
||||
};
|
||||
} // pkgs.lib.optionalAttrs isWindows {
|
||||
alfisEdge = alfis {
|
||||
webgui = false;
|
||||
doh = true;
|
||||
edge = true;
|
||||
};
|
||||
};
|
||||
|
||||
defaultPackage = packages.alfis;
|
||||
|
||||
apps = with flake-utils.lib;
|
||||
{
|
||||
apps = with flake-utils.lib; {
|
||||
alfis = mkApp { drv = packages.alfis; };
|
||||
alfisWithoutGUI = mkApp { drv = packages.alfisWithoutGUI; };
|
||||
} // pkgs.lib.optionalAttrs isWindows {
|
||||
alfisEdge = mkApp { drv = packages.alfisEdge; };
|
||||
};
|
||||
|
||||
defaultApp = apps.alfis;
|
||||
|
||||
devShell = import ./shell.nix { inherit pkgs; };
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,6 +1,21 @@
|
||||
{ pkgs ? import <nixpkgs> { } }:
|
||||
|
||||
let
|
||||
runtimeLibs = with pkgs; [
|
||||
gtk3
|
||||
webkitgtk_4_1
|
||||
xdotool
|
||||
libayatana-appindicator
|
||||
];
|
||||
|
||||
packages = with pkgs; [
|
||||
cargo
|
||||
rustc
|
||||
pkg-config
|
||||
kdePackages.kdialog
|
||||
] ++ runtimeLibs;
|
||||
in
|
||||
pkgs.mkShell {
|
||||
buildInputs =
|
||||
[ pkgs.cargo pkgs.rustc pkgs.webkitgtk pkgs.pkg-config pkgs.kdialog ];
|
||||
buildInputs = packages;
|
||||
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath runtimeLibs;
|
||||
}
|
||||
|
||||
+18
-3
@@ -975,13 +975,23 @@ impl Chain {
|
||||
return false;
|
||||
}
|
||||
// If this signers' public key has already locked/signed that block we return error
|
||||
// Use signing_keys cache to avoid repeated DB loads for the same blocks
|
||||
let mut cache = self.signers.borrow_mut();
|
||||
for i in (full_block.index + 1)..block.index {
|
||||
let pub_key = if let Some(cached) = cache.signing_keys.get(&i) {
|
||||
cached.clone()
|
||||
} else {
|
||||
let signer = self.get_block(i).expect("Error in DB!");
|
||||
if signer.pub_key == block.pub_key {
|
||||
cache.signing_keys.insert(i, signer.pub_key.clone());
|
||||
signer.pub_key
|
||||
};
|
||||
if pub_key == block.pub_key {
|
||||
warn!("Ignoring block {} from '{:?}', already signed by this key", block.index, &block.pub_key);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// Cache this block's pub_key too for future checks
|
||||
cache.signing_keys.insert(block.index, block.pub_key.clone());
|
||||
true
|
||||
}
|
||||
|
||||
@@ -1066,6 +1076,7 @@ impl Chain {
|
||||
let mut signers = self.signers.borrow_mut();
|
||||
signers.index = block.index;
|
||||
signers.signers = result.clone();
|
||||
signers.signing_keys.clear();
|
||||
result
|
||||
}
|
||||
|
||||
@@ -1087,12 +1098,15 @@ impl Chain {
|
||||
|
||||
struct SignersCache {
|
||||
index: u64,
|
||||
signers: Vec<Bytes>
|
||||
signers: Vec<Bytes>,
|
||||
/// Cache of block_index → pub_key for signing blocks since last full_block.
|
||||
/// Avoids repeated DB loads in is_good_signer_for_block duplicate check.
|
||||
signing_keys: HashMap<u64, Bytes>,
|
||||
}
|
||||
|
||||
impl SignersCache {
|
||||
pub fn new() -> RefCell<SignersCache> {
|
||||
let cache = SignersCache { index: 0, signers: Vec::new() };
|
||||
let cache = SignersCache { index: 0, signers: Vec::new(), signing_keys: HashMap::new() };
|
||||
RefCell::new(cache)
|
||||
}
|
||||
|
||||
@@ -1103,6 +1117,7 @@ impl SignersCache {
|
||||
pub fn clear(&mut self) {
|
||||
self.index = 0;
|
||||
self.signers.clear();
|
||||
self.signing_keys.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -51,7 +51,8 @@ impl BlockchainFilter {
|
||||
}
|
||||
|
||||
fn lookup_from_ns(qname: &str, qtype: QueryType, servers: &[IpAddr], tracker: &RttTracker<IpAddr>) -> Option<DnsPacket> {
|
||||
let mut dns_client = DnsNetworkClient::new();
|
||||
// Disable 0x20 encoding for NS queries - external NS servers may not preserve case
|
||||
let mut dns_client = DnsNetworkClient::new_with_0x20(false);
|
||||
dns_client.run().unwrap();
|
||||
let timeout = std::time::Duration::from_secs(2);
|
||||
|
||||
|
||||
@@ -166,6 +166,9 @@ impl VectorPacketBuffer {
|
||||
|
||||
impl PacketBuffer for VectorPacketBuffer {
|
||||
fn read(&mut self) -> Result<u8> {
|
||||
if self.pos >= self.buffer.len() {
|
||||
return Err(BufferError::EndOfBuffer);
|
||||
}
|
||||
let res = self.buffer[self.pos];
|
||||
self.pos += 1;
|
||||
|
||||
@@ -173,10 +176,16 @@ impl PacketBuffer for VectorPacketBuffer {
|
||||
}
|
||||
|
||||
fn get(&mut self, pos: usize) -> Result<u8> {
|
||||
if pos >= self.buffer.len() {
|
||||
return Err(BufferError::EndOfBuffer);
|
||||
}
|
||||
Ok(self.buffer[pos])
|
||||
}
|
||||
|
||||
fn get_range(&mut self, start: usize, len: usize) -> Result<&[u8]> {
|
||||
if start + len > self.buffer.len() {
|
||||
return Err(BufferError::EndOfBuffer);
|
||||
}
|
||||
Ok(&self.buffer[start..start + len as usize])
|
||||
}
|
||||
|
||||
|
||||
+1
-1
@@ -617,7 +617,7 @@ impl DnsClient for HttpsDnsClient {
|
||||
Ok(size) => {
|
||||
let mut bytes: Vec<u8> = Vec::with_capacity(size);
|
||||
response.into_body().into_reader()
|
||||
.take(4096)
|
||||
.take(65535)
|
||||
.read_to_end(&mut bytes)?;
|
||||
let mut buffer = VectorPacketBuffer::new();
|
||||
buffer.buffer.extend_from_slice(bytes.as_slice());
|
||||
|
||||
+9
-5
@@ -109,17 +109,21 @@ impl DnsResolver for ForwardingDnsResolver {
|
||||
Ok(mut result) => {
|
||||
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
|
||||
self.context.forwarder_tracker.record_success(upstream, elapsed);
|
||||
self.context.cache.store(&result.answers)?;
|
||||
|
||||
// Fix domain names in answers to match original query case
|
||||
// Fix domain names to match original query case before caching
|
||||
let qname_lower = qname.to_lowercase();
|
||||
for answer in &mut result.answers {
|
||||
if let Some(domain) = answer.get_domain() {
|
||||
for rec in result.answers.iter_mut()
|
||||
.chain(result.authorities.iter_mut())
|
||||
.chain(result.resources.iter_mut())
|
||||
{
|
||||
if let Some(domain) = rec.get_domain() {
|
||||
if domain.to_lowercase() == qname_lower {
|
||||
answer.set_domain(qname.to_string());
|
||||
rec.set_domain(qname.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.context.cache.store(&result.answers)?;
|
||||
return Ok(result);
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
+13
-1
@@ -44,7 +44,19 @@ fn create_server_context(context: Arc<Mutex<Context>>, settings: &Settings) -> A
|
||||
server_context.allow_recursive = true;
|
||||
server_context.resolve_strategy = match settings.dns.forwarders.is_empty() {
|
||||
true => ResolveStrategy::Recursive,
|
||||
false => ResolveStrategy::Forward { upstreams: settings.dns.forwarders.clone() }
|
||||
false => {
|
||||
let upstreams = settings.dns.forwarders.iter().map(|s| {
|
||||
if s.starts_with("https://") || s.parse::<std::net::SocketAddr>().is_ok() {
|
||||
s.clone()
|
||||
} else if let Ok(ip) = s.parse::<std::net::IpAddr>() {
|
||||
std::net::SocketAddr::new(ip, 53).to_string()
|
||||
} else {
|
||||
warn!("Cannot parse forwarder address: {}", s);
|
||||
s.clone()
|
||||
}
|
||||
}).collect();
|
||||
ResolveStrategy::Forward { upstreams }
|
||||
}
|
||||
};
|
||||
// Add host filters
|
||||
for host in &settings.dns.hosts {
|
||||
|
||||
+14
@@ -148,6 +148,20 @@ fn main() {
|
||||
no_gui = true;
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "webgui", target_os = "linux"))]
|
||||
if !no_gui {
|
||||
let running_via_sudo = env::var_os("SUDO_UID").is_some();
|
||||
let has_graphical_session = env::var_os("DISPLAY").is_some() || env::var_os("WAYLAND_DISPLAY").is_some();
|
||||
|
||||
if running_via_sudo {
|
||||
warn!(target: LOG_TARGET_MAIN, "Running GUI via sudo is not supported on Linux, starting without GUI");
|
||||
no_gui = true;
|
||||
} else if !has_graphical_session {
|
||||
warn!(target: LOG_TARGET_MAIN, "No graphical session detected, starting without GUI");
|
||||
no_gui = true;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
if opt_matches.opt_present("service") {
|
||||
let appdata = env::var("PROGRAMDATA").expect("Failed to get APPDATA directory");
|
||||
|
||||
@@ -3,6 +3,7 @@ pub mod network;
|
||||
pub mod peer;
|
||||
pub mod peers;
|
||||
pub mod state;
|
||||
pub mod version;
|
||||
|
||||
pub use message::Message;
|
||||
pub use network::Network;
|
||||
|
||||
+430
-38
@@ -10,17 +10,22 @@ use std::sync::{Arc, mpsc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{io, thread};
|
||||
|
||||
use crossbeam_channel::{bounded, Receiver, Sender};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
#[allow(unused_imports)]
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use mio::event::Event;
|
||||
use mio::net::{TcpListener, TcpStream};
|
||||
use mio::{Events, Interest, Poll, Registry, Token};
|
||||
use crate::commons::rtt_tracker::RttTracker;
|
||||
use crate::p2p::version::Version;
|
||||
use rand::{random, Rng, RngCore};
|
||||
use rand::prelude::thread_rng;
|
||||
use x25519_dalek::{PublicKey, ReusableSecret};
|
||||
|
||||
use crate::blockchain::types::BlockQuality;
|
||||
use crate::blockchain::hash_utils::{check_block_hash, check_block_signature, hash_difficulty};
|
||||
use crate::commons::*;
|
||||
use crate::crypto::Chacha;
|
||||
use crate::eventbus::{post, register};
|
||||
@@ -29,6 +34,20 @@ use crate::{Block, Bytes, Context};
|
||||
|
||||
const SERVER: Token = Token(0);
|
||||
|
||||
/// Job sent to validation worker threads
|
||||
struct ValidationJob {
|
||||
token: Token,
|
||||
block: Block,
|
||||
}
|
||||
|
||||
/// Result from validation worker threads after CPU-intensive checks
|
||||
enum PreValidationResult {
|
||||
/// Block passed hash and signature checks, needs DB validation
|
||||
NeedsDbValidation(Token, Block),
|
||||
/// Block failed basic validation
|
||||
Invalid(Token, Block),
|
||||
}
|
||||
|
||||
pub struct Network {
|
||||
context: Arc<Mutex<Context>>,
|
||||
secret_key: ReusableSecret,
|
||||
@@ -37,7 +56,14 @@ pub struct Network {
|
||||
// States of peer connections, and some data to send when sockets become writable
|
||||
peers: Peers,
|
||||
// Orphan blocks from future
|
||||
future_blocks: HashMap<u64, Block>
|
||||
future_blocks: HashMap<u64, Block>,
|
||||
// Validation thread pool channels
|
||||
validation_sender: Sender<ValidationJob>,
|
||||
validation_receiver: Receiver<PreValidationResult>,
|
||||
// Track pending block requests: block_index -> (request_time, peer_token)
|
||||
pending_requests: HashMap<u64, (Instant, Token)>,
|
||||
// Track peer response times for adaptive selection
|
||||
peer_rtt: RttTracker<Token>,
|
||||
}
|
||||
|
||||
impl Network {
|
||||
@@ -47,7 +73,88 @@ impl Network {
|
||||
let secret_key = ReusableSecret::random_from_rng(&mut thread_rng);
|
||||
let public_key = PublicKey::from(&secret_key);
|
||||
let peers = Peers::new();
|
||||
Network { context, secret_key, public_key, token: Token(1), peers, future_blocks: HashMap::new() }
|
||||
|
||||
// Create validation thread pool
|
||||
let cpus = num_cpus::get();
|
||||
let num_workers = cpus.min((cpus / 2).max(1)); // At most half cpus
|
||||
info!("Starting {num_workers} validation threads");
|
||||
let channel_capacity = (num_workers * 4).max(100);
|
||||
let (job_sender, job_receiver) = bounded::<ValidationJob>(channel_capacity);
|
||||
let (result_sender, result_receiver) = bounded::<PreValidationResult>(channel_capacity);
|
||||
|
||||
// Spawn validation worker threads
|
||||
for i in 0..num_workers {
|
||||
let job_rx = job_receiver.clone();
|
||||
let result_tx = result_sender.clone();
|
||||
|
||||
thread::Builder::new()
|
||||
.name(format!("block-validator-{}", i))
|
||||
.spawn(move || {
|
||||
Self::validation_worker(job_rx, result_tx);
|
||||
})
|
||||
.expect("Failed to spawn validation worker thread");
|
||||
}
|
||||
|
||||
// Drop the extra senders/receivers we don't need
|
||||
drop(result_sender);
|
||||
|
||||
Network {
|
||||
context,
|
||||
secret_key,
|
||||
public_key,
|
||||
token: Token(1),
|
||||
peers,
|
||||
future_blocks: HashMap::new(),
|
||||
validation_sender: job_sender,
|
||||
validation_receiver: result_receiver,
|
||||
pending_requests: HashMap::new(),
|
||||
peer_rtt: RttTracker::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Worker thread that performs CPU-intensive block validation
|
||||
fn validation_worker(
|
||||
job_receiver: Receiver<ValidationJob>,
|
||||
result_sender: Sender<PreValidationResult>,
|
||||
) {
|
||||
loop {
|
||||
match job_receiver.recv() {
|
||||
Ok(job) => {
|
||||
let ValidationJob { token, block } = job;
|
||||
|
||||
// Perform CPU-intensive validation without holding any locks
|
||||
// These checks don't require database access
|
||||
|
||||
// Check 1: Verify block hash
|
||||
if !check_block_hash(&block) {
|
||||
debug!("Block {} failed hash validation", block.index);
|
||||
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check 2: Verify block signature
|
||||
if !check_block_signature(&block) {
|
||||
debug!("Block {} failed signature validation", block.index);
|
||||
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check 3: Verify hash difficulty matches claimed difficulty
|
||||
if hash_difficulty(&block.hash) < block.difficulty {
|
||||
debug!("Block {} hash difficulty doesn't match claimed difficulty", block.index);
|
||||
let _ = result_sender.send(PreValidationResult::Invalid(token, block));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Block passed CPU-intensive checks, send for DB validation
|
||||
let _ = result_sender.send(PreValidationResult::NeedsDbValidation(token, block));
|
||||
}
|
||||
Err(_) => {
|
||||
// Channel closed, exit worker thread
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(&mut self) {
|
||||
@@ -175,6 +282,9 @@ impl Network {
|
||||
};
|
||||
let _ = debug_send.send(format!("Handle connection event: {:?} for peer {}", &event, &peer));
|
||||
if !self.handle_connection_event(poll.registry(), event, &mut seen_blocks, &mut buffer) {
|
||||
// Record failure and remove pending requests for this peer
|
||||
self.peer_rtt.record_failure(&token);
|
||||
self.pending_requests.retain(|_index, (_time, t)| *t != token);
|
||||
let _ = self.peers.close_peer(poll.registry(), &token);
|
||||
let blocks = self.context.lock().unwrap().chain.get_height();
|
||||
let keys = self.context.lock().unwrap().chain.get_users_count();
|
||||
@@ -185,6 +295,11 @@ impl Network {
|
||||
}
|
||||
}
|
||||
let _ = debug_send.send(String::from("After events iter"));
|
||||
|
||||
// Process validation results from worker threads
|
||||
let _ = debug_send.send(String::from("Process validation results"));
|
||||
self.process_validation_results();
|
||||
|
||||
if last_events_time.elapsed().as_secs() > MAX_IDLE_SECONDS {
|
||||
if self.peers.get_peers_count() > 0 {
|
||||
warn!("Something is wrong with swarm connections, closing all.");
|
||||
@@ -239,8 +354,20 @@ impl Network {
|
||||
(blocks, max_height, context.chain.get_last_hash())
|
||||
};
|
||||
|
||||
// Periodic sync maintenance: gap retries and idle peer kicks
|
||||
if height < max_height {
|
||||
self.sync_maintain(height, max_height);
|
||||
} else if height >= max_height && !self.future_blocks.is_empty() {
|
||||
// We've caught up but have stale future_blocks — clean them up
|
||||
self.future_blocks.clear();
|
||||
self.pending_requests.clear();
|
||||
post(crate::event::Event::SyncFinished);
|
||||
}
|
||||
|
||||
let _ = debug_send.send(String::from("Peers update"));
|
||||
let have_blocks: HashSet<u64> = self.future_blocks.values().map(|block| block.index).collect();
|
||||
let mut have_blocks: HashSet<u64> = self.future_blocks.keys().copied().collect();
|
||||
// Also include blocks that are pending validation to avoid re-requesting them
|
||||
have_blocks.extend(self.pending_requests.keys().copied());
|
||||
self.peers.update(poll.registry(), hash, height, max_height, have_blocks);
|
||||
ui_timer = Instant::now();
|
||||
}
|
||||
@@ -454,6 +581,14 @@ impl Network {
|
||||
peer.set_state(State::idle());
|
||||
}
|
||||
State::Idle { from } => {
|
||||
if peer.has_queued_messages() {
|
||||
// Send ONE queued message at a time to avoid flooding remote peer
|
||||
if let Some(queued_data) = peer.pop_message() {
|
||||
if let Ok(data) = encode_bytes(&queued_data, peer.get_cipher()) {
|
||||
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending queued message {}", e));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("Odd version of pings for {}", peer.get_addr().ip());
|
||||
if from.elapsed().as_secs() >= 120 {
|
||||
let data: Vec<u8> = {
|
||||
@@ -464,6 +599,7 @@ impl Network {
|
||||
send_message(peer.get_stream(), &data).unwrap_or_else(|e| warn!("Error sending ping {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
State::Error => {}
|
||||
State::Banned => {}
|
||||
State::Offline { .. } => {}
|
||||
@@ -504,6 +640,7 @@ impl Network {
|
||||
} else if origin.eq(my_origin) {
|
||||
let peer = self.peers.get_mut_peer(token).unwrap();
|
||||
debug!("Incoming v{} on {}", &app_version, peer.get_addr().ip());
|
||||
peer.set_version(Version::parse(&app_version));
|
||||
let app_version = self.context.lock().unwrap().app_version.clone();
|
||||
if version == my_version {
|
||||
peer.set_public(public);
|
||||
@@ -538,6 +675,7 @@ impl Network {
|
||||
let peer = self.peers.get_mut_peer(token).unwrap();
|
||||
// TODO check rand_id whether we have this peers connection already
|
||||
debug!("Outgoing v{} on {}", &app_version, peer.get_addr().ip());
|
||||
peer.set_version(Version::parse(&app_version));
|
||||
peer.set_height(height);
|
||||
peer.set_active(true);
|
||||
peer.set_public(public);
|
||||
@@ -564,10 +702,20 @@ impl Network {
|
||||
return State::message(Message::pong(my_height, my_hash));
|
||||
}
|
||||
if peer.is_higher(my_height) {
|
||||
let max_height = {
|
||||
let mut context = self.context.lock().unwrap();
|
||||
context.chain.update_max_height(height);
|
||||
info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip());
|
||||
State::message(Message::GetBlock { index: my_height + 1 })
|
||||
context.chain.get_max_height()
|
||||
};
|
||||
// Start pipeline: queue a block request, then send pong
|
||||
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
|
||||
let peer = self.peers.get_mut_peer(token).unwrap();
|
||||
if peer.can_send() {
|
||||
peer.queue_message(Message::GetBlock { index: idx });
|
||||
self.pending_requests.insert(idx, (Instant::now(), *token));
|
||||
}
|
||||
}
|
||||
State::message(Message::pong(my_height, my_hash))
|
||||
} else if my_height == height && hash.ne(&my_hash) {
|
||||
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
|
||||
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
|
||||
@@ -585,10 +733,17 @@ impl Network {
|
||||
return State::idle();
|
||||
}
|
||||
if peer.is_higher(my_height) {
|
||||
let max_height = {
|
||||
let mut context = self.context.lock().unwrap();
|
||||
context.chain.update_max_height(height);
|
||||
info!("Peer is higher, requesting block {} from {}", my_height + 1, peer.get_addr().ip());
|
||||
State::message(Message::GetBlock { index: my_height + 1 })
|
||||
context.chain.get_max_height()
|
||||
};
|
||||
// Start pipeline: request next needed block from this peer
|
||||
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
|
||||
self.pending_requests.insert(idx, (Instant::now(), *token));
|
||||
return State::message(Message::GetBlock { index: idx });
|
||||
}
|
||||
State::idle()
|
||||
} else if my_height == height && hash.ne(&my_hash) {
|
||||
info!("Hashes are different, requesting block {} from {}", my_height, peer.get_addr().ip());
|
||||
info!("My hash: {:?}, their hash: {:?}", &my_hash, &hash);
|
||||
@@ -636,12 +791,46 @@ impl Network {
|
||||
if index != block.index {
|
||||
return State::Banned;
|
||||
}
|
||||
debug!("Received block {} with hash {:?}", block.index, &block.hash);
|
||||
if !seen_blocks.contains(&block.hash) {
|
||||
self.handle_block(token, block, seen_blocks)
|
||||
} else {
|
||||
State::idle()
|
||||
let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
|
||||
debug!("Received block {} with hash {:?} from {}", block.index, &block.hash, &peer_addr);
|
||||
// Record RTT but keep in pending_requests until validation completes
|
||||
// (prevents re-requesting while block is in validation channel)
|
||||
if let Some((request_time, peer_token)) = self.pending_requests.get(&block.index) {
|
||||
let rtt_ms = request_time.elapsed().as_secs_f64() * 1000.0;
|
||||
self.peer_rtt.record_success(peer_token, rtt_ms);
|
||||
}
|
||||
if !seen_blocks.contains(&block.hash) {
|
||||
seen_blocks.insert(block.hash.clone());
|
||||
// Send block to validation worker threads for parallel processing
|
||||
match self.validation_sender.try_send(ValidationJob {
|
||||
token: *token,
|
||||
block,
|
||||
}) {
|
||||
Ok(_) => {},
|
||||
Err(crossbeam_channel::TrySendError::Full(job)) => {
|
||||
debug!("Validation queue full, deferring block {}", job.block.index);
|
||||
self.future_blocks.insert(job.block.index, job.block);
|
||||
},
|
||||
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
|
||||
warn!("Validation worker threads have stopped");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Pipeline: immediately request the next needed block from this peer
|
||||
let (my_height, max_height) = {
|
||||
let c = self.context.lock().unwrap();
|
||||
(c.chain.get_height(), c.chain.get_max_height())
|
||||
};
|
||||
if my_height < max_height {
|
||||
if let Some(next_idx) = self.next_block_to_request(my_height, max_height) {
|
||||
let peer_addr = self.peers.get_peer(token).map_or("unknown".to_string(), |p| p.get_addr().ip().to_string());
|
||||
debug!("Requesting block {next_idx} from {}", &peer_addr);
|
||||
self.pending_requests.insert(next_idx, (Instant::now(), *token));
|
||||
return State::message(Message::GetBlock { index: next_idx });
|
||||
}
|
||||
}
|
||||
State::idle()
|
||||
}
|
||||
Message::Twin => State::Twin,
|
||||
Message::Loop => State::Loop
|
||||
@@ -649,79 +838,282 @@ impl Network {
|
||||
answer
|
||||
}
|
||||
|
||||
fn handle_block(&mut self, token: &Token, block: Block, seen_blocks: &mut HashSet<Bytes>) -> State {
|
||||
seen_blocks.insert(block.hash.clone());
|
||||
let peers_count = self.peers.get_peers_active_count();
|
||||
let peer = self.peers.get_mut_peer(token).unwrap();
|
||||
peer.set_received_block(block.index);
|
||||
trace!("New block from {}", &peer.get_addr());
|
||||
/// Find the next block that needs to be requested within the sync window.
|
||||
/// Returns None if all blocks in the window are already requested or received.
|
||||
fn next_block_to_request(&self, my_height: u64, max_height: u64) -> Option<u64> {
|
||||
const SYNC_WINDOW: u64 = 500;
|
||||
// Don't pipeline until genesis block is added — other blocks would just
|
||||
// queue as "future" and spam warnings since last_block is None.
|
||||
let window = if my_height == 0 { 1 } else { SYNC_WINDOW };
|
||||
let end = max_height.min(my_height + window);
|
||||
for idx in (my_height + 1)..=end {
|
||||
if !self.future_blocks.contains_key(&idx) && !self.pending_requests.contains_key(&idx) {
|
||||
return Some(idx);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Periodic sync maintenance: retry gap blocks, kick idle peers into the pipeline.
|
||||
fn sync_maintain(&mut self, my_height: u64, max_height: u64) {
|
||||
const GAP_TIMEOUT_MSECS: u128 = 1500;
|
||||
|
||||
let now = Instant::now();
|
||||
|
||||
// Record failures for timed-out requests before cleanup
|
||||
let timed_out: Vec<Token> = self.pending_requests.iter()
|
||||
.filter(|(_, (t, _))| t.elapsed().as_millis() >= GAP_TIMEOUT_MSECS)
|
||||
.map(|(_, (_, tok))| *tok)
|
||||
.collect();
|
||||
for tok in &timed_out {
|
||||
self.peer_rtt.record_failure(tok);
|
||||
}
|
||||
|
||||
// Clean up stale requests: timed out or peer no longer active
|
||||
self.pending_requests.retain(|_index, (request_time, token)| {
|
||||
if request_time.elapsed().as_millis() >= GAP_TIMEOUT_MSECS * 2 {
|
||||
return false;
|
||||
}
|
||||
match self.peers.get_peer(token) {
|
||||
Some(peer) => peer.active(),
|
||||
None => false,
|
||||
}
|
||||
});
|
||||
|
||||
let raw_peers = self.peers.get_active_peer_tokens();
|
||||
if raw_peers.is_empty() {
|
||||
return;
|
||||
}
|
||||
let active_peers = self.peer_rtt.select_ordered(&raw_peers);
|
||||
|
||||
// Retry gap blocks with shorter timeout — push to FRONT of fastest peer's queue
|
||||
let min_future_block = self.future_blocks.keys().min().copied();
|
||||
if let Some(min_future) = min_future_block {
|
||||
// active_peers is already ranked by RTT (fastest first)
|
||||
let mut gap_peer_idx = 0;
|
||||
for block_index in (my_height + 1)..min_future {
|
||||
if self.future_blocks.contains_key(&block_index) {
|
||||
continue;
|
||||
}
|
||||
if let Some((req_time, old_token)) = self.pending_requests.get(&block_index) {
|
||||
if req_time.elapsed().as_millis() < GAP_TIMEOUT_MSECS {
|
||||
continue;
|
||||
}
|
||||
self.peer_rtt.record_failure(old_token);
|
||||
// Skip the failed peer
|
||||
if let Some(pos) = active_peers.iter().position(|t| t == old_token) {
|
||||
if pos == gap_peer_idx {
|
||||
gap_peer_idx = (gap_peer_idx + 1) % active_peers.len();
|
||||
}
|
||||
}
|
||||
debug!("Gap block {} timed out, re-requesting (have future from {})", block_index, min_future);
|
||||
}
|
||||
|
||||
// Find a peer that can accept a message (old nodes only allow 1 in flight)
|
||||
let mut sent = false;
|
||||
for _ in 0..active_peers.len() {
|
||||
let peer_token = active_peers[gap_peer_idx % active_peers.len()];
|
||||
gap_peer_idx = (gap_peer_idx + 1) % active_peers.len();
|
||||
if let Some(peer) = self.peers.get_mut_peer(&peer_token) {
|
||||
if !peer.can_send() {
|
||||
continue;
|
||||
}
|
||||
peer.queue_priority_message(Message::GetBlock { index: block_index });
|
||||
self.pending_requests.insert(block_index, (now, peer_token));
|
||||
debug!("Requesting gap block {} from {} (priority)", block_index, peer.get_addr().ip());
|
||||
sent = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !sent {
|
||||
break; // All peers busy, try next cycle
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Kick idle peers into the pipeline (peers with no pending requests)
|
||||
for t in &active_peers {
|
||||
let has_pending = self.pending_requests.values().any(|(_, tk)| tk == t);
|
||||
if has_pending {
|
||||
continue;
|
||||
}
|
||||
if let Some(peer) = self.peers.get_peer(t) {
|
||||
if !peer.get_state().is_idle() || peer.has_queued_messages() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Some(idx) = self.next_block_to_request(my_height, max_height) {
|
||||
if let Some(peer) = self.peers.get_mut_peer(t) {
|
||||
peer.queue_message(Message::GetBlock { index: idx });
|
||||
self.pending_requests.insert(idx, (now, *t));
|
||||
debug!("Kicking idle peer {} with block {}", peer.get_addr().ip(), idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Process validation results from worker threads and add validated blocks to chain
|
||||
fn process_validation_results(&mut self) {
|
||||
// Process all available validation results without blocking
|
||||
while let Ok(result) = self.validation_receiver.try_recv() {
|
||||
match result {
|
||||
PreValidationResult::NeedsDbValidation(token, block) => {
|
||||
// CPU-intensive validation passed, now do DB-dependent validation
|
||||
let peers_count = self.peers.get_peers_active_count();
|
||||
|
||||
// Update peer state
|
||||
if let Some(peer) = self.peers.get_mut_peer(&token) {
|
||||
peer.set_received_block(block.index);
|
||||
trace!("Validated block {} from {}", block.index, peer.get_addr());
|
||||
} else {
|
||||
// Peer disconnected, but we can still process the block
|
||||
trace!("Validated block {} from disconnected peer", block.index);
|
||||
}
|
||||
|
||||
// Lock context only for DB operations
|
||||
let mut context = self.context.lock().unwrap();
|
||||
let max_height = context.chain.get_max_height();
|
||||
|
||||
// Do remaining DB-dependent validation and add to chain
|
||||
match context.chain.check_new_block(&block) {
|
||||
BlockQuality::Good => {
|
||||
let block_index = block.index;
|
||||
let mut next_index = block.index + 1;
|
||||
context.chain.add_block(block);
|
||||
// If we have some consequent blocks in a bucket of 'future blocks', we add them
|
||||
|
||||
// Clean up pending request for this block
|
||||
self.pending_requests.remove(&block_index);
|
||||
|
||||
// Process future blocks that are now ready
|
||||
while let Some(block) = self.future_blocks.remove(&next_index) {
|
||||
if context.chain.check_new_block(&block) == BlockQuality::Good {
|
||||
debug!("Added block {} from future blocks", next_index);
|
||||
context.chain.add_block(block);
|
||||
self.pending_requests.remove(&next_index);
|
||||
} else {
|
||||
warn!("Block {} in future blocks is bad!", block.index);
|
||||
break;
|
||||
}
|
||||
next_index += 1;
|
||||
}
|
||||
|
||||
let my_height = context.chain.get_height();
|
||||
post(crate::event::Event::BlockchainChanged { index: my_height });
|
||||
// If it was the last block to sync
|
||||
if my_height == max_height {
|
||||
|
||||
// Check if sync is finished
|
||||
if my_height >= max_height {
|
||||
post(crate::event::Event::SyncFinished);
|
||||
self.future_blocks.clear();
|
||||
} else {
|
||||
let event = crate::event::Event::Syncing { have: my_height, height: max(max_height, my_height) };
|
||||
let event = crate::event::Event::Syncing {
|
||||
have: my_height,
|
||||
height: max(max_height, my_height)
|
||||
};
|
||||
post(event);
|
||||
}
|
||||
|
||||
let domains = context.chain.get_domains_count();
|
||||
let keys = context.chain.get_users_count();
|
||||
post(crate::event::Event::NetworkStatus { blocks: my_height, domains, keys, nodes: peers_count });
|
||||
post(crate::event::Event::NetworkStatus {
|
||||
blocks: my_height,
|
||||
domains,
|
||||
keys,
|
||||
nodes: peers_count
|
||||
});
|
||||
}
|
||||
BlockQuality::Twin => {
|
||||
debug!("Ignoring duplicate block {}", block.index);
|
||||
}
|
||||
BlockQuality::Twin => { debug!("Ignoring duplicate block {}", block.index); }
|
||||
BlockQuality::Future => {
|
||||
debug!("Got future block {}", block.index);
|
||||
let block_index = block.index;
|
||||
self.future_blocks.insert(block.index, block);
|
||||
// Clean up pending request since we have this block now
|
||||
self.pending_requests.remove(&block_index);
|
||||
}
|
||||
BlockQuality::Bad => {
|
||||
// TODO save bad public keys to banned table
|
||||
debug!("Ignoring bad block from {}:\n{:?}", peer.get_addr(), &block);
|
||||
debug!("Block {} failed DB validation", block.index);
|
||||
if let Some(peer) = self.peers.get_mut_peer(&token) {
|
||||
debug!("Banning peer {} for bad block", peer.get_addr());
|
||||
// Mark peer for banning
|
||||
peer.set_state(State::Banned);
|
||||
}
|
||||
let height = context.chain.get_height();
|
||||
if height + 1 == block.index {
|
||||
context.chain.update_max_height(height);
|
||||
post(crate::event::Event::SyncFinished);
|
||||
}
|
||||
return State::Banned;
|
||||
}
|
||||
BlockQuality::Rewind => {
|
||||
debug!("Got some orphan block, requesting its parent");
|
||||
return State::message(Message::GetBlock { index: block.index - 1 });
|
||||
debug!("Got orphan block {}, requesting parent", block.index);
|
||||
// Save the block so it can be processed after the rewind resolves
|
||||
let block_index = block.index;
|
||||
self.future_blocks.insert(block.index, block);
|
||||
self.pending_requests.remove(&block_index);
|
||||
if let Some(peer) = self.peers.get_mut_peer(&token) {
|
||||
peer.set_state(State::message(Message::GetBlock {
|
||||
index: block_index - 1
|
||||
}));
|
||||
}
|
||||
}
|
||||
BlockQuality::Fork => {
|
||||
debug!("Got forked block {} with hash {:?}", block.index, block.hash);
|
||||
// If we are very much behind of blockchain
|
||||
let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height;
|
||||
let our_block = context.chain.get_block(block.index).unwrap();
|
||||
let lagged = block.index == context.chain.get_height()
|
||||
&& block.index + LIMITED_CONFIDENCE_DEPTH <= max_height;
|
||||
|
||||
if let Some(our_block) = context.chain.get_block(block.index) {
|
||||
if block.is_better_than(&our_block) || lagged {
|
||||
context.chain.replace_block(block).expect("Error replacing block with fork");
|
||||
let index = context.chain.get_height();
|
||||
post(crate::event::Event::BlockchainChanged { index });
|
||||
let fork_index = block.index;
|
||||
context.chain.replace_block(block)
|
||||
.expect("Error replacing block with fork");
|
||||
let mut next_index = fork_index + 1;
|
||||
// Process future blocks that may now be valid after fork switch
|
||||
while let Some(fb) = self.future_blocks.remove(&next_index) {
|
||||
if context.chain.check_new_block(&fb) == BlockQuality::Good {
|
||||
debug!("Added block {} from future blocks after fork", next_index);
|
||||
context.chain.add_block(fb);
|
||||
self.pending_requests.remove(&next_index);
|
||||
} else {
|
||||
debug!("Fork in not better than our block, dropping.");
|
||||
return State::message(Message::block(our_block.index, our_block.as_bytes()));
|
||||
debug!("Future block {} not good after fork", next_index);
|
||||
break;
|
||||
}
|
||||
next_index += 1;
|
||||
}
|
||||
let my_height = context.chain.get_height();
|
||||
post(crate::event::Event::BlockchainChanged { index: my_height });
|
||||
if my_height >= max_height {
|
||||
post(crate::event::Event::SyncFinished);
|
||||
self.future_blocks.clear();
|
||||
} else {
|
||||
post(crate::event::Event::Syncing {
|
||||
have: my_height,
|
||||
height: max(max_height, my_height)
|
||||
});
|
||||
}
|
||||
} else {
|
||||
debug!("Fork is not better than our block, dropping");
|
||||
if let Some(peer) = self.peers.get_mut_peer(&token) {
|
||||
peer.set_state(State::message(Message::block(
|
||||
our_block.index,
|
||||
our_block.as_bytes()
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Context lock is dropped here
|
||||
}
|
||||
PreValidationResult::Invalid(token, block) => {
|
||||
// Block failed CPU-intensive validation
|
||||
debug!("Block {} failed pre-validation (hash/signature)", block.index);
|
||||
if let Some(peer) = self.peers.get_mut_peer(&token) {
|
||||
debug!("Banning peer {} for invalid block", peer.get_addr());
|
||||
peer.set_state(State::Banned);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
State::idle()
|
||||
}
|
||||
|
||||
/// Gets new token from old token, mutating the last
|
||||
|
||||
+55
-2
@@ -1,3 +1,4 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::Display;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Instant;
|
||||
@@ -5,13 +6,16 @@ use std::time::Instant;
|
||||
use mio::net::TcpStream;
|
||||
|
||||
use crate::crypto::Chacha;
|
||||
use crate::p2p::message::Message;
|
||||
use crate::p2p::State;
|
||||
use crate::p2p::version::Version;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Peer {
|
||||
addr: SocketAddr,
|
||||
stream: TcpStream,
|
||||
state: State,
|
||||
outgoing: VecDeque<Vec<u8>>,
|
||||
id: String,
|
||||
height: u64,
|
||||
inbound: bool,
|
||||
@@ -21,7 +25,8 @@ pub struct Peer {
|
||||
reconnects: u32,
|
||||
received_block: u64,
|
||||
sent_height: u64,
|
||||
cipher: Option<Chacha>
|
||||
cipher: Option<Chacha>,
|
||||
version: Version,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
@@ -30,6 +35,7 @@ impl Peer {
|
||||
addr,
|
||||
stream,
|
||||
state,
|
||||
outgoing: VecDeque::new(),
|
||||
id: String::new(),
|
||||
height: 0,
|
||||
inbound,
|
||||
@@ -39,7 +45,8 @@ impl Peer {
|
||||
reconnects: 0,
|
||||
received_block: 0,
|
||||
sent_height: 0,
|
||||
cipher: None
|
||||
cipher: None,
|
||||
version: Version::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +85,52 @@ impl Peer {
|
||||
self.state = state;
|
||||
}
|
||||
|
||||
/// Queue a message for sending. Does not change peer state.
|
||||
pub fn queue_message(&mut self, msg: Message) {
|
||||
let data = serde_cbor::to_vec(&msg).unwrap();
|
||||
self.outgoing.push_back(data);
|
||||
}
|
||||
|
||||
/// Queue a high-priority message at the front (e.g. gap block requests)
|
||||
pub fn queue_priority_message(&mut self, msg: Message) {
|
||||
let data = serde_cbor::to_vec(&msg).unwrap();
|
||||
self.outgoing.push_front(data);
|
||||
}
|
||||
|
||||
pub fn has_queued_messages(&self) -> bool {
|
||||
!self.outgoing.is_empty()
|
||||
}
|
||||
|
||||
pub fn queued_count(&self) -> usize {
|
||||
self.outgoing.len()
|
||||
}
|
||||
|
||||
pub fn pop_message(&mut self) -> Option<Vec<u8>> {
|
||||
self.outgoing.pop_front()
|
||||
}
|
||||
|
||||
pub fn set_version(&mut self, version: Version) {
|
||||
self.version = version;
|
||||
}
|
||||
|
||||
pub fn get_version(&self) -> &Version {
|
||||
&self.version
|
||||
}
|
||||
|
||||
/// Old nodes (< 0.8.9) overwrite outgoing messages, can only have 1 in flight
|
||||
pub fn supports_queue(&self) -> bool {
|
||||
self.version >= Version { major: 0, minor: 8, patch: 9 }
|
||||
}
|
||||
|
||||
/// Check if we can send another message to this peer
|
||||
pub fn can_send(&self) -> bool {
|
||||
if self.supports_queue() {
|
||||
true
|
||||
} else {
|
||||
self.outgoing.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
|
||||
+13
-3
@@ -202,6 +202,13 @@ impl Peers {
|
||||
count
|
||||
}
|
||||
|
||||
pub fn get_active_peer_tokens(&self) -> Vec<Token> {
|
||||
self.peers.iter()
|
||||
.filter(|(_, peer)| peer.active())
|
||||
.map(|(token, _)| *token)
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn is_tween_connect(&self, id: &str) -> bool {
|
||||
for (_, peer) in self.peers.iter() {
|
||||
if peer.active() && peer.get_id() == id {
|
||||
@@ -255,7 +262,10 @@ impl Peers {
|
||||
let mut stale_tokens = Vec::new();
|
||||
for (token, peer) in self.peers.iter_mut() {
|
||||
if let State::Idle { from } = peer.get_state() {
|
||||
if from.elapsed().as_secs() >= PING_PERIOD + random_time {
|
||||
if peer.has_queued_messages() {
|
||||
let stream = peer.get_stream();
|
||||
registry.reregister(stream, *token, Interest::WRITABLE).unwrap();
|
||||
} else if from.elapsed().as_secs() >= PING_PERIOD + random_time {
|
||||
// Sometimes we check for new peers instead of pinging
|
||||
let message = if nodes < MAX_NODES && random::<bool>() {
|
||||
Message::GetPeers
|
||||
@@ -273,7 +283,7 @@ impl Peers {
|
||||
stale_tokens.push((token.clone(), peer.get_addr()));
|
||||
continue;
|
||||
}
|
||||
if matches!(peer.get_state(), State::Message {..}) {
|
||||
if matches!(peer.get_state(), State::Message {..}) || peer.has_queued_messages() {
|
||||
let stream = peer.get_stream();
|
||||
registry.reregister(stream, *token, Interest::WRITABLE).unwrap();
|
||||
}
|
||||
@@ -288,7 +298,7 @@ impl Peers {
|
||||
self.ignored.retain(|_addr, time| { time.elapsed().as_secs() < 600 });
|
||||
|
||||
// If someone has more blocks we sync
|
||||
if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height {
|
||||
if nodes >= MIN_CONNECTED_NODES_START_SYNC && height < max_height && have_blocks.is_empty() {
|
||||
// Give some opportunity to get more peers instead of requests for blocks only
|
||||
let request_blocks = nodes >= 10 || random::<bool>();
|
||||
if request_blocks {
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct Version {
|
||||
pub major: u16,
|
||||
pub minor: u16,
|
||||
pub patch: u16,
|
||||
}
|
||||
|
||||
impl Version {
|
||||
pub fn parse(s: &str) -> Version {
|
||||
let parts: Vec<&str> = s.split('.').collect();
|
||||
Version {
|
||||
major: parts.get(0).and_then(|s| s.parse().ok()).unwrap_or(0),
|
||||
minor: parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0),
|
||||
patch: parts.get(2).and_then(|s| s.parse().ok()).unwrap_or(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Version {
|
||||
fn default() -> Self {
|
||||
Version { major: 0, minor: 0, patch: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for Version {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for Version {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
self.major.cmp(&other.major)
|
||||
.then(self.minor.cmp(&other.minor))
|
||||
.then(self.patch.cmp(&other.patch))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Version {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_version_comparison() {
|
||||
assert!(Version::parse("0.8.9") >= Version::parse("0.8.9"));
|
||||
assert!(Version::parse("0.8.10") > Version::parse("0.8.9"));
|
||||
assert!(Version::parse("0.8.8") < Version::parse("0.8.9"));
|
||||
assert!(Version::parse("0.9.0") > Version::parse("0.8.99"));
|
||||
assert!(Version::parse("1.0.0") > Version::parse("0.99.99"));
|
||||
}
|
||||
}
|
||||
+43
-8
@@ -3,6 +3,7 @@ extern crate serde;
|
||||
extern crate serde_json;
|
||||
extern crate tinyfiledialogs as tfd;
|
||||
|
||||
use std::panic::{self, AssertUnwindSafe};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::thread;
|
||||
@@ -61,13 +62,8 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let icon = load_icon_from_png();
|
||||
|
||||
let tray_icon = TrayIconBuilder::new()
|
||||
.with_menu(Box::new(tray_menu))
|
||||
.with_tooltip(&title)
|
||||
.with_icon(icon)
|
||||
.with_menu_on_left_click(false)
|
||||
.build()
|
||||
.unwrap();
|
||||
let tray_icon = build_tray_icon(&title, tray_menu, icon);
|
||||
let tray_available = tray_icon.is_some();
|
||||
|
||||
let window_size = tao::dpi::LogicalSize::new(1024, 720);
|
||||
// Get primary monitor and calculate center position
|
||||
@@ -90,7 +86,7 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
.with_inner_size(window_size)
|
||||
.with_min_inner_size(tao::dpi::LogicalSize::new(773, 350))
|
||||
.with_resizable(true)
|
||||
.with_visible(!hide);
|
||||
.with_visible(!hide || !tray_available);
|
||||
|
||||
if let Some(position) = position {
|
||||
builder = builder.with_position(position);
|
||||
@@ -319,6 +315,7 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
true
|
||||
});
|
||||
|
||||
if tray_available {
|
||||
let proxy = event_loop.create_proxy();
|
||||
TrayIconEvent::set_event_handler(Some(move |event| {
|
||||
let _ = proxy.send_event(UserEvent::TrayIconEvent(event));
|
||||
@@ -328,6 +325,7 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
MenuEvent::set_event_handler(Some(move |event| {
|
||||
let _ = proxy.send_event(UserEvent::MenuEvent(event));
|
||||
}));
|
||||
}
|
||||
|
||||
let proxy = event_loop.create_proxy();
|
||||
|
||||
@@ -340,7 +338,14 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
event: WindowEvent::CloseRequested,
|
||||
..
|
||||
} => {
|
||||
if tray_available {
|
||||
window.set_visible(false);
|
||||
} else {
|
||||
info!("Interface closed, exiting");
|
||||
post(Event::ActionQuit);
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
*control_flow = ControlFlow::Exit;
|
||||
}
|
||||
}
|
||||
TaoEvent::UserEvent(user_event) => {
|
||||
let wv = webview_clone.lock().unwrap();
|
||||
@@ -364,6 +369,7 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
show_warning(&wv, &text);
|
||||
}
|
||||
UserEvent::TrayIconEvent(event) => {
|
||||
if let Some(tray_icon) = tray_icon.as_ref() {
|
||||
match event {
|
||||
TrayIconEvent::DoubleClick { button, .. } => {
|
||||
if button == tray_icon::MouseButton::Left {
|
||||
@@ -379,6 +385,7 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
UserEvent::MenuEvent(event) => {
|
||||
if event.id == show_item.id() {
|
||||
window.set_visible(true);
|
||||
@@ -396,6 +403,34 @@ pub fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>, hid
|
||||
});
|
||||
}
|
||||
|
||||
fn build_tray_icon(title: &str, tray_menu: Menu, icon: tray_icon::Icon) -> Option<tray_icon::TrayIcon> {
|
||||
let previous_hook = panic::take_hook();
|
||||
panic::set_hook(Box::new(|_| {}));
|
||||
|
||||
let result = panic::catch_unwind(AssertUnwindSafe(|| {
|
||||
TrayIconBuilder::new()
|
||||
.with_menu(Box::new(tray_menu))
|
||||
.with_tooltip(title)
|
||||
.with_icon(icon)
|
||||
.with_menu_on_left_click(false)
|
||||
.build()
|
||||
}));
|
||||
|
||||
panic::set_hook(previous_hook);
|
||||
|
||||
match result {
|
||||
Ok(Ok(tray_icon)) => Some(tray_icon),
|
||||
Ok(Err(error)) => {
|
||||
warn!("Tray icon is unavailable: {error}");
|
||||
None
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Tray icon is unavailable: failed to load appindicator library, continuing without tray support");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum UserEvent {
|
||||
EvalJs(String),
|
||||
|
||||
Reference in New Issue
Block a user