Added DNS timeouts here and there.
Fixed macOS and Ubuntu pipelines.
This commit is contained in:
@@ -48,10 +48,11 @@ impl BlockchainFilter {
|
||||
let port = 10000 + (rand::random::<u16>() % 50000);
|
||||
let mut dns_client = DnsNetworkClient::new(port);
|
||||
dns_client.run().unwrap();
|
||||
let timeout = std::time::Duration::from_secs(5);
|
||||
|
||||
for server in servers {
|
||||
let addr = SocketAddr::new(server.to_owned(), 53);
|
||||
if let Ok(res) = dns_client.send_udp_query(qname, qtype, addr, false) {
|
||||
if let Ok(res) = dns_client.send_udp_query(qname, qtype, addr, false, timeout) {
|
||||
dns_client.stop();
|
||||
return Some(res);
|
||||
}
|
||||
|
||||
+25
-22
@@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex};
|
||||
#[cfg(feature = "doh")]
|
||||
use std::sync::RwLock;
|
||||
use std::thread::{sleep, Builder};
|
||||
use std::time::Duration as SleepDuration;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::*;
|
||||
use derive_more::{Display, Error, From};
|
||||
@@ -38,6 +38,8 @@ use ureq::http::Uri;
|
||||
use ureq::unversioned::resolver::{ArrayVec, ResolvedSocketAddrs, Resolver};
|
||||
use ureq::unversioned::transport::{DefaultConnector, NextTimeout};
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[derive(Debug, Display, From, Error)]
|
||||
pub enum ClientError {
|
||||
Protocol(crate::dns::protocol::ProtocolError),
|
||||
@@ -151,12 +153,12 @@ impl DnsNetworkClient {
|
||||
|
||||
/// Send a DNS query using UDP transport
|
||||
///
|
||||
/// This will construct a query packet, and fire it off to the specified server.
|
||||
/// This will construct a query packet and fire it off to the specified server.
|
||||
/// The query is sent from the callee thread, but responses are read on a
|
||||
/// worker thread, and returned to this thread through a channel. Thus this
|
||||
/// method is thread safe, and can be used from any number of threads in
|
||||
/// worker thread and returned to this thread through a channel. Thus, this
|
||||
/// method is thread-safe and can be used from any number of threads in
|
||||
/// parallel.
|
||||
pub fn send_udp_query<A: ToSocketAddrs>(&self, qname: &str, qtype: QueryType, server: A, recursive: bool) -> Result<DnsPacket> {
|
||||
pub fn send_udp_query<A: ToSocketAddrs>(&self, qname: &str, qtype: QueryType, server: A, recursive: bool, timeout: Duration) -> Result<DnsPacket> {
|
||||
let _ = self.total_sent.fetch_add(1, Ordering::Release);
|
||||
|
||||
// Prepare request
|
||||
@@ -172,16 +174,17 @@ impl DnsNetworkClient {
|
||||
|
||||
packet.questions.push(DnsQuestion::new(qname.to_string(), qtype));
|
||||
|
||||
// Create a return channel, and add a `PendingQuery` to the list of lookups in progress
|
||||
// Create a return channel and add a `PendingQuery` to the list of lookups in progress
|
||||
let (tx, rx) = channel();
|
||||
{
|
||||
let mut pending_queries = self.pending_queries.lock().map_err(|_| ClientError::PoisonedLock)?;
|
||||
pending_queries.push(PendingQuery { seq: packet.header.id, timestamp: Local::now(), tx });
|
||||
}
|
||||
|
||||
// Send query
|
||||
// Send a query
|
||||
let mut req_buffer = BytePacketBuffer::new();
|
||||
packet.write(&mut req_buffer, 512)?;
|
||||
let len = req_buffer.buf.len();
|
||||
packet.write(&mut req_buffer, len)?;
|
||||
let addr: SocketAddr = server.to_socket_addrs()?.next().expect("Wrong resolver address");
|
||||
match addr {
|
||||
SocketAddr::V4(addr) => {
|
||||
@@ -192,8 +195,8 @@ impl DnsNetworkClient {
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for response
|
||||
match rx.recv() {
|
||||
// Wait for response with timeout
|
||||
match rx.recv_timeout(timeout) {
|
||||
Ok(Some(qr)) => Ok(qr),
|
||||
Ok(None) => {
|
||||
let _ = self.total_failed.fetch_add(1, Ordering::Release);
|
||||
@@ -201,7 +204,7 @@ impl DnsNetworkClient {
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = self.total_failed.fetch_add(1, Ordering::Release);
|
||||
Err(ClientError::LookupFailed)
|
||||
Err(ClientError::TimeOut)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -219,7 +222,7 @@ impl DnsClient for DnsNetworkClient {
|
||||
/// The run method launches a worker thread. Unless this thread is running, no
|
||||
/// responses will ever be generated, and clients will just block indefinitely.
|
||||
fn run(&self) -> Result<()> {
|
||||
let timeout = Some(std::time::Duration::from_millis(500));
|
||||
let timeout = Some(Duration::from_millis(500));
|
||||
// Start the thread for handling incoming responses
|
||||
{
|
||||
let socket_copy = self.socket_ipv4.try_clone()?;
|
||||
@@ -253,7 +256,7 @@ impl DnsClient for DnsNetworkClient {
|
||||
}
|
||||
};
|
||||
|
||||
// Acquire a lock on the pending_queries list, and search for a
|
||||
// Acquire a lock on the pending_queries list and search for a
|
||||
// matching PendingQuery to which to deliver the response.
|
||||
if let Ok(mut pending_queries) = pending_queries_lock.lock() {
|
||||
let mut matched_query = None;
|
||||
@@ -346,7 +349,6 @@ impl DnsClient for DnsNetworkClient {
|
||||
Builder::new()
|
||||
.name("DnsNetworkClient-timeout-thread".into())
|
||||
.spawn(move || {
|
||||
let timeout = Duration::seconds(5);
|
||||
loop {
|
||||
if stopped.load(Ordering::SeqCst) {
|
||||
break;
|
||||
@@ -354,7 +356,7 @@ impl DnsClient for DnsNetworkClient {
|
||||
if let Ok(mut pending_queries) = pending_queries_lock.lock() {
|
||||
let mut finished_queries = Vec::new();
|
||||
for (i, pending_query) in pending_queries.iter().enumerate() {
|
||||
let expires = pending_query.timestamp + timeout;
|
||||
let expires = pending_query.timestamp + DEFAULT_TIMEOUT;
|
||||
if expires < Local::now() {
|
||||
let _ = pending_query.tx.send(None);
|
||||
finished_queries.push(i);
|
||||
@@ -367,7 +369,7 @@ impl DnsClient for DnsNetworkClient {
|
||||
}
|
||||
}
|
||||
|
||||
sleep(SleepDuration::from_millis(100));
|
||||
sleep(Duration::from_millis(100));
|
||||
}
|
||||
})?;
|
||||
}
|
||||
@@ -380,7 +382,7 @@ impl DnsClient for DnsNetworkClient {
|
||||
}
|
||||
|
||||
fn send_query(&self, qname: &str, qtype: QueryType, server: &str, recursive: bool) -> Result<DnsPacket> {
|
||||
let packet = self.send_udp_query(qname, qtype, server, recursive)?;
|
||||
let packet = self.send_udp_query(qname, qtype, server, recursive, DEFAULT_TIMEOUT)?;
|
||||
if !packet.header.truncated_message {
|
||||
return Ok(packet);
|
||||
}
|
||||
@@ -409,7 +411,7 @@ impl HttpsDnsClient {
|
||||
|
||||
let agent_config = Agent::config_builder()
|
||||
.user_agent(&client_name)
|
||||
.timeout_global(Some(std::time::Duration::from_secs(3)))
|
||||
.timeout_global(Some(Duration::from_secs(3)))
|
||||
.max_idle_connections_per_host(4)
|
||||
.max_idle_connections(16)
|
||||
.build();
|
||||
@@ -434,13 +436,14 @@ impl BootstrapResolver {
|
||||
|
||||
impl Resolver for BootstrapResolver {
|
||||
// TODO use timeout parameter
|
||||
fn resolve(&self, uri: &Uri, _config: &Config, _timeout: NextTimeout) -> std::result::Result<ResolvedSocketAddrs, ureq::Error> {
|
||||
fn resolve(&self, uri: &Uri, _config: &Config, timeout: NextTimeout) -> std::result::Result<ResolvedSocketAddrs, ureq::Error> {
|
||||
let domain = uri.host().unwrap_or("localhost");
|
||||
let port = uri.port_u16().unwrap_or(443);
|
||||
let addr = match domain.find(':') {
|
||||
Some(index) => domain[0..index].to_string(),
|
||||
None => domain.to_string()
|
||||
};
|
||||
let timeout_duration = Duration::from_millis(timeout.after.as_millis() as u64);
|
||||
trace!("Resolving {}", addr);
|
||||
if let Some(addrs) = self.cache.write().unwrap().get(&addr) {
|
||||
trace!("Found bootstrap ip in cache");
|
||||
@@ -458,7 +461,7 @@ impl Resolver for BootstrapResolver {
|
||||
let mut result: Vec<IpAddr> = Vec::new();
|
||||
let mut results: ResolvedSocketAddrs = ArrayVec::from_fn(|_| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0));
|
||||
for server in &self.servers {
|
||||
if let Ok(res) = dns_client.send_udp_query(&addr, QueryType::A, server, true) {
|
||||
if let Ok(res) = dns_client.send_udp_query(&addr, QueryType::A, server, true, timeout_duration) {
|
||||
for answer in &res.answers {
|
||||
if let DnsRecord::A { addr, .. } = answer {
|
||||
results.push(SocketAddr::new(IpAddr::V4(*addr), port));
|
||||
@@ -466,7 +469,7 @@ impl Resolver for BootstrapResolver {
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Ok(res) = dns_client.send_udp_query(&addr, QueryType::AAAA, server, true) {
|
||||
if let Ok(res) = dns_client.send_udp_query(&addr, QueryType::AAAA, server, true, timeout_duration) {
|
||||
for answer in &res.answers {
|
||||
if let DnsRecord::AAAA { addr, .. } = answer {
|
||||
results.push(SocketAddr::new(IpAddr::V6(*addr), port));
|
||||
@@ -613,7 +616,7 @@ pub mod tests {
|
||||
let client = DnsNetworkClient::new(31456);
|
||||
client.run().unwrap();
|
||||
|
||||
let res = client.send_udp_query("google.com", QueryType::A, ("8.8.8.8", 53), true).unwrap();
|
||||
let res = client.send_udp_query("google.com", QueryType::A, ("8.8.8.8", 53), true, DEFAULT_TIMEOUT).unwrap();
|
||||
|
||||
assert_eq!(res.questions[0].name, "google.com");
|
||||
assert!(res.answers.len() > 0);
|
||||
|
||||
Reference in New Issue
Block a user