Reworked DNS-resolver.
This commit is contained in:
+13
-27
@@ -1,12 +1,11 @@
|
||||
//! UDP and TCP server implementations for DNS
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::io::Write;
|
||||
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream, UdpSocket};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::mpsc::{channel, Sender};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::sync::Arc;
|
||||
use std::thread::Builder;
|
||||
|
||||
use derive_more::{Display, Error, From};
|
||||
@@ -162,14 +161,12 @@ pub fn execute_query(context: Arc<ServerContext>, request: &DnsPacket) -> DnsPac
|
||||
/// a new thread is spawned to service the request asynchronously.
|
||||
pub struct DnsUdpServer {
|
||||
context: Arc<ServerContext>,
|
||||
request_queue: Arc<Mutex<VecDeque<(SocketAddr, DnsPacket)>>>,
|
||||
request_cond: Arc<Condvar>,
|
||||
thread_count: usize
|
||||
}
|
||||
|
||||
impl DnsUdpServer {
|
||||
pub fn new(context: Arc<ServerContext>, thread_count: usize) -> DnsUdpServer {
|
||||
DnsUdpServer { context, request_queue: Arc::new(Mutex::new(VecDeque::new())), request_cond: Arc::new(Condvar::new()), thread_count }
|
||||
DnsUdpServer { context, thread_count }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,6 +178,8 @@ impl DnsServer for DnsUdpServer {
|
||||
// Bind the socket
|
||||
let socket = UdpSocket::bind(self.context.dns_listen.as_str())?;
|
||||
|
||||
let (mut sender, receiver) = spmc::channel::<(SocketAddr, DnsPacket)>();
|
||||
|
||||
// Spawn threads for handling requests
|
||||
for thread_id in 0..self.thread_count {
|
||||
let socket_clone = match socket.try_clone() {
|
||||
@@ -192,23 +191,16 @@ impl DnsServer for DnsUdpServer {
|
||||
};
|
||||
|
||||
let context = Arc::clone(&self.context);
|
||||
let request_cond = self.request_cond.clone();
|
||||
let request_queue = self.request_queue.clone();
|
||||
let receiver = receiver.clone();
|
||||
|
||||
let name = "DnsUdpServer-request-".to_string() + &thread_id.to_string();
|
||||
let _ = Builder::new().name(name).spawn(move || {
|
||||
loop {
|
||||
// Acquire lock, and wait on the condition until data is
|
||||
// available. Then proceed with popping an entry of the queue.
|
||||
let (src, request) = match request_queue
|
||||
.lock()
|
||||
.ok()
|
||||
.and_then(|x| request_cond.wait(x).ok())
|
||||
.and_then(|mut x| x.pop_front())
|
||||
{
|
||||
Some(x) => x,
|
||||
None => {
|
||||
debug!("Not expected to happen!");
|
||||
// Receive source address and packet from channel
|
||||
let (src, request) = match receiver.recv() {
|
||||
Ok((src, request)) => (src, request),
|
||||
Err(e) => {
|
||||
debug!("Not expected to happen! Error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -278,15 +270,9 @@ impl DnsServer for DnsUdpServer {
|
||||
}
|
||||
working_ids.put(key, cur_time);
|
||||
|
||||
// Acquire lock, add request to queue, and notify waiting threads using the condition.
|
||||
match self.request_queue.lock() {
|
||||
Ok(mut queue) => {
|
||||
queue.push_back((src, request));
|
||||
self.request_cond.notify_one();
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to send UDP request for processing: {}", e);
|
||||
}
|
||||
if let Err(e) = sender.send((src, request)) {
|
||||
warn!("Error sending work to DNS resolver threads! Error: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
Reference in New Issue
Block a user