Implemented DNS on blockchain. Beautified a lot of code, fixed some things.

This commit is contained in:
Revertron
2021-02-19 16:41:43 +01:00
parent 4b5e5112da
commit d135204af7
24 changed files with 539 additions and 295 deletions
+11 -49
View File
@@ -39,25 +39,15 @@ impl PartialEq<RecordEntry> for RecordEntry {
}
impl Hash for RecordEntry {
fn hash<H>(&self, state: &mut H)
where
H: Hasher,
{
fn hash<H>(&self, state: &mut H) where H: Hasher {
self.record.hash(state);
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RecordSet {
NoRecords {
qtype: QueryType,
ttl: u32,
timestamp: DateTime<Local>,
},
Records {
qtype: QueryType,
records: HashSet<RecordEntry>,
},
NoRecords { qtype: QueryType, ttl: u32, timestamp: DateTime<Local> },
Records { qtype: QueryType, records: HashSet<RecordEntry> },
}
#[derive(Clone, Debug)]
@@ -70,22 +60,13 @@ pub struct DomainEntry {
impl DomainEntry {
pub fn new(domain: String) -> DomainEntry {
DomainEntry {
domain: domain,
record_types: HashMap::new(),
hits: 0,
updates: 0,
}
DomainEntry { domain, record_types: HashMap::new(), hits: 0, updates: 0 }
}
pub fn store_nxdomain(&mut self, qtype: QueryType, ttl: u32) {
self.updates += 1;
let new_set = RecordSet::NoRecords {
qtype: qtype,
ttl: ttl,
timestamp: Local::now(),
};
let new_set = RecordSet::NoRecords { qtype, ttl, timestamp: Local::now() };
self.record_types.insert(qtype, new_set);
}
@@ -93,15 +74,9 @@ impl DomainEntry {
pub fn store_record(&mut self, rec: &DnsRecord) {
self.updates += 1;
let entry = RecordEntry {
record: rec.clone(),
timestamp: Local::now(),
};
let entry = RecordEntry { record: rec.clone(), timestamp: Local::now() };
if let Some(&mut RecordSet::Records {
ref mut records, ..
}) = self.record_types.get_mut(&rec.get_querytype())
{
if let Some(&mut RecordSet::Records { ref mut records, .. }) = self.record_types.get_mut(&rec.get_querytype()) {
if records.contains(&entry) {
records.remove(&entry);
}
@@ -113,10 +88,7 @@ impl DomainEntry {
let mut records = HashSet::new();
records.insert(entry);
let new_set = RecordSet::Records {
qtype: rec.get_querytype(),
records: records,
};
let new_set = RecordSet::Records { qtype: rec.get_querytype(), records };
self.record_types.insert(rec.get_querytype(), new_set);
}
@@ -191,9 +163,7 @@ pub struct Cache {
impl Cache {
pub fn new() -> Cache {
Cache {
domain_entries: BTreeMap::new(),
}
Cache { domain_entries: BTreeMap::new() }
}
fn get_cache_state(&mut self, qname: &str, qtype: QueryType) -> CacheState {
@@ -203,13 +173,7 @@ impl Cache {
}
}
fn fill_queryresult(
&mut self,
qname: &str,
qtype: QueryType,
result_vec: &mut Vec<DnsRecord>,
increment_stats: bool,
) {
fn fill_queryresult(&mut self,qname: &str, qtype: QueryType, result_vec: &mut Vec<DnsRecord>, increment_stats: bool) {
if let Some(domain_entry) = self.domain_entries.get_mut(qname).and_then(Arc::get_mut) {
if increment_stats {
domain_entry.hits += 1
@@ -275,9 +239,7 @@ pub struct SynchronizedCache {
impl SynchronizedCache {
pub fn new() -> SynchronizedCache {
SynchronizedCache {
cache: RwLock::new(Cache::new()),
}
SynchronizedCache { cache: RwLock::new(Cache::new()) }
}
pub fn list(&self) -> Result<Vec<Arc<DomainEntry>>> {
+14 -56
View File
@@ -32,13 +32,7 @@ pub trait DnsClient {
fn get_failed_count(&self) -> usize;
fn run(&self) -> Result<()>;
fn send_query(
&self,
qname: &str,
qtype: QueryType,
server: (&str, u16),
recursive: bool,
) -> Result<DnsPacket>;
fn send_query(&self, qname: &str, qtype: QueryType, server: (&str, u16), recursive: bool) -> Result<DnsPacket>;
}
/// The UDP client
@@ -72,6 +66,7 @@ struct PendingQuery {
}
unsafe impl Send for DnsNetworkClient {}
unsafe impl Sync for DnsNetworkClient {}
impl DnsNetworkClient {
@@ -89,13 +84,7 @@ impl DnsNetworkClient {
///
/// This is much simpler than using UDP, since the kernel will take care of
/// packet ordering, connection state, timeouts etc.
pub fn send_tcp_query(
&self,
qname: &str,
qtype: QueryType,
server: (&str, u16),
recursive: bool,
) -> Result<DnsPacket> {
pub fn send_tcp_query(&self, qname: &str, qtype: QueryType, server: (&str, u16), recursive: bool) -> Result<DnsPacket> {
let _ = self.total_sent.fetch_add(1, Ordering::Release);
// Prepare request
@@ -135,14 +124,8 @@ impl DnsNetworkClient {
/// The query is sent from the callee thread, but responses are read on a
/// worker thread, and returned to this thread through a channel. Thus this
/// method is thread safe, and can be used from any number of threads in
/// parallell.
pub fn send_udp_query(
&self,
qname: &str,
qtype: QueryType,
server: (&str, u16),
recursive: bool,
) -> Result<DnsPacket> {
/// parallel.
pub fn send_udp_query(&self, qname: &str, qtype: QueryType, server: (&str, u16), recursive: bool) -> Result<DnsPacket> {
let _ = self.total_sent.fetch_add(1, Ordering::Release);
// Prepare request
@@ -156,30 +139,20 @@ impl DnsNetworkClient {
packet.header.questions = 1;
packet.header.recursion_desired = recursive;
packet
.questions
.push(DnsQuestion::new(qname.to_string(), qtype));
packet.questions.push(DnsQuestion::new(qname.to_string(), qtype));
// Create a return channel, and add a `PendingQuery` to the list of lookups
// in progress
let (tx, rx) = channel();
{
let mut pending_queries = self
.pending_queries
.lock()
.map_err(|_| ClientError::PoisonedLock)?;
pending_queries.push(PendingQuery {
seq: packet.header.id,
timestamp: Local::now(),
tx: tx,
});
let mut pending_queries = self.pending_queries.lock().map_err(|_| ClientError::PoisonedLock)?;
pending_queries.push(PendingQuery { seq: packet.header.id, timestamp: Local::now(), tx });
}
// Send query
let mut req_buffer = BytePacketBuffer::new();
packet.write(&mut req_buffer, 512)?;
self.socket
.send_to(&req_buffer.buf[0..req_buffer.pos], server)?;
self.socket.send_to(&req_buffer.buf[0..req_buffer.pos], server)?;
// Wait for response
match rx.recv() {
@@ -231,10 +204,7 @@ impl DnsClient for DnsNetworkClient {
let packet = match DnsPacket::from_buffer(&mut res_buffer) {
Ok(packet) => packet,
Err(err) => {
println!(
"DnsNetworkClient failed to parse packet with error: {}",
err
);
println!("DnsNetworkClient failed to parse packet with error: {:?}", err);
continue;
}
};
@@ -298,13 +268,7 @@ impl DnsClient for DnsNetworkClient {
Ok(())
}
fn send_query(
&self,
qname: &str,
qtype: QueryType,
server: (&str, u16),
recursive: bool,
) -> Result<DnsPacket> {
fn send_query(&self,qname: &str, qtype: QueryType, server: (&str, u16), recursive: bool) -> Result<DnsPacket> {
let packet = self.send_udp_query(qname, qtype, server, recursive)?;
if !packet.header.truncated_message {
return Ok(packet);
@@ -317,7 +281,6 @@ impl DnsClient for DnsNetworkClient {
#[cfg(test)]
pub mod tests {
use super::*;
use crate::dns::protocol::{DnsPacket, DnsRecord, QueryType};
@@ -329,11 +292,12 @@ pub mod tests {
impl<'a> DnsStubClient {
pub fn new(callback: Box<StubCallback>) -> DnsStubClient {
DnsStubClient { callback: callback }
DnsStubClient { callback }
}
}
unsafe impl Send for DnsStubClient {}
unsafe impl Sync for DnsStubClient {}
impl DnsClient for DnsStubClient {
@@ -349,13 +313,7 @@ pub mod tests {
Ok(())
}
fn send_query(
&self,
qname: &str,
qtype: QueryType,
server: (&str, u16),
recursive: bool,
) -> Result<DnsPacket> {
fn send_query(&self,qname: &str, qtype: QueryType, server: (&str, u16), recursive: bool) -> Result<DnsPacket> {
(self.callback)(qname, qtype, server, recursive)
}
}
+6 -2
View File
@@ -10,6 +10,7 @@ use crate::dns::authority::Authority;
use crate::dns::cache::SynchronizedCache;
use crate::dns::client::{DnsClient, DnsNetworkClient};
use crate::dns::resolve::{DnsResolver, ForwardingDnsResolver, RecursiveDnsResolver};
use crate::dns::filter::DnsFilter;
#[derive(Debug, Display, From, Error)]
pub enum ContextError {
@@ -43,6 +44,7 @@ pub enum ResolveStrategy {
pub struct ServerContext {
pub authority: Authority,
pub cache: SynchronizedCache,
pub filters: Vec<Box<dyn DnsFilter + Sync + Send>>,
pub client: Box<dyn DnsClient + Sync + Send>,
pub dns_port: u16,
pub api_port: u16,
@@ -66,6 +68,7 @@ impl ServerContext {
ServerContext {
authority: Authority::new(),
cache: SynchronizedCache::new(),
filters: Vec::new(),
client: Box::new(DnsNetworkClient::new(34255)),
dns_port: 53,
api_port: 5380,
@@ -73,7 +76,7 @@ impl ServerContext {
allow_recursive: true,
enable_udp: true,
enable_tcp: true,
enable_api: true,
enable_api: false,
statistics: ServerStatistics {
tcp_query_count: AtomicUsize::new(0),
udp_query_count: AtomicUsize::new(0),
@@ -122,6 +125,7 @@ pub mod tests {
Arc::new(ServerContext {
authority: Authority::new(),
cache: SynchronizedCache::new(),
filters: Vec::new(),
client: Box::new(DnsStubClient::new(callback)),
dns_port: 53,
api_port: 5380,
@@ -129,7 +133,7 @@ pub mod tests {
allow_recursive: true,
enable_udp: true,
enable_tcp: true,
enable_api: true,
enable_api: false,
statistics: ServerStatistics {
tcp_query_count: AtomicUsize::new(0),
udp_query_count: AtomicUsize::new(0),
+16
View File
@@ -0,0 +1,16 @@
use crate::dns::protocol::{QueryType, DnsPacket};
pub trait DnsFilter {
fn lookup(&self, qname: &str, qtype: QueryType) -> Option<DnsPacket>;
}
pub struct DummyFilter {
}
#[allow(unused_variables)]
impl DnsFilter for DummyFilter {
fn lookup(&self, qname: &str, qtype: QueryType) -> Option<DnsPacket> {
None
}
}
+1
View File
@@ -22,5 +22,6 @@ pub mod context;
pub mod protocol;
pub mod resolve;
pub mod server;
pub mod filter;
mod netutil;
+25 -28
View File
@@ -188,8 +188,8 @@ impl DnsRecord {
);
Ok(DnsRecord::A {
domain: domain,
addr: addr,
domain,
addr,
ttl: TransientTtl(ttl),
})
}
@@ -210,8 +210,8 @@ impl DnsRecord {
);
Ok(DnsRecord::AAAA {
domain: domain,
addr: addr,
domain,
addr,
ttl: TransientTtl(ttl),
})
}
@@ -220,7 +220,7 @@ impl DnsRecord {
buffer.read_qname(&mut ns)?;
Ok(DnsRecord::NS {
domain: domain,
domain,
host: ns,
ttl: TransientTtl(ttl),
})
@@ -230,7 +230,7 @@ impl DnsRecord {
buffer.read_qname(&mut cname)?;
Ok(DnsRecord::CNAME {
domain: domain,
domain,
host: cname,
ttl: TransientTtl(ttl),
})
@@ -244,10 +244,10 @@ impl DnsRecord {
buffer.read_qname(&mut srv)?;
Ok(DnsRecord::SRV {
domain: domain,
priority: priority,
weight: weight,
port: port,
domain,
priority,
weight,
port,
host: srv,
ttl: TransientTtl(ttl),
})
@@ -258,8 +258,8 @@ impl DnsRecord {
buffer.read_qname(&mut mx)?;
Ok(DnsRecord::MX {
domain: domain,
priority: priority,
domain,
priority,
host: mx,
ttl: TransientTtl(ttl),
})
@@ -278,14 +278,14 @@ impl DnsRecord {
let minimum = buffer.read_u32()?;
Ok(DnsRecord::SOA {
domain: domain,
m_name: m_name,
r_name: r_name,
serial: serial,
refresh: refresh,
retry: retry,
expire: expire,
minimum: minimum,
domain,
m_name,
r_name,
serial,
refresh,
retry,
expire,
minimum,
ttl: TransientTtl(ttl),
})
}
@@ -300,7 +300,7 @@ impl DnsRecord {
buffer.step(data_len as usize)?;
Ok(DnsRecord::TXT {
domain: domain,
domain,
data: txt,
ttl: TransientTtl(ttl),
})
@@ -317,16 +317,16 @@ impl DnsRecord {
Ok(DnsRecord::OPT {
packet_len: class,
flags: ttl,
data: data,
data,
})
}
QueryType::UNKNOWN(_) => {
buffer.step(data_len as usize)?;
Ok(DnsRecord::UNKNOWN {
domain: domain,
domain,
qtype: qtype_num,
data_len: data_len,
data_len,
ttl: TransientTtl(ttl),
})
}
@@ -755,10 +755,7 @@ pub struct DnsQuestion {
impl DnsQuestion {
pub fn new(name: String, qtype: QueryType) -> DnsQuestion {
DnsQuestion {
name: name,
qtype: qtype,
}
DnsQuestion { name, qtype }
}
pub fn binary_len(&self) -> usize {
+14 -9
View File
@@ -51,6 +51,12 @@ pub trait DnsResolver {
}
}
for filter in self.get_context().filters.iter() {
if let Some(packet) = filter.lookup(qname, qtype) {
return Ok(packet);
}
}
self.perform(qname, qtype)
}
@@ -67,10 +73,7 @@ pub struct ForwardingDnsResolver {
impl ForwardingDnsResolver {
pub fn new(context: Arc<ServerContext>, server: (String, u16)) -> ForwardingDnsResolver {
ForwardingDnsResolver {
context: context,
server: server,
}
ForwardingDnsResolver { context, server }
}
}
@@ -81,10 +84,12 @@ impl DnsResolver for ForwardingDnsResolver {
fn perform(&mut self, qname: &str, qtype: QueryType) -> Result<DnsPacket> {
let &(ref host, port) = &self.server;
let result = self
.context
.client
.send_query(qname, qtype, (host.as_str(), port), true)?;
let result = match self.context.cache.lookup(qname, qtype) {
None => {
self.context.client.send_query(qname, qtype, (host.as_str(), port), true)?
}
Some(packet) => packet
};
self.context.cache.store(&result.answers)?;
@@ -101,7 +106,7 @@ pub struct RecursiveDnsResolver {
impl RecursiveDnsResolver {
pub fn new(context: Arc<ServerContext>) -> RecursiveDnsResolver {
RecursiveDnsResolver { context: context }
RecursiveDnsResolver { context }
}
}
+21 -67
View File
@@ -59,8 +59,7 @@ pub trait DnsServer {
}
/// Utility function for resolving domains referenced in for example CNAME or SRV
/// records. This usually spares the client from having to perform additional
/// lookups.
/// records. This usually spares the client from having to perform additional lookups.
fn resolve_cnames(
lookup_list: &[DnsRecord],
results: &mut Vec<DnsPacket>,
@@ -112,11 +111,7 @@ pub fn execute_query(context: Arc<ServerContext>, request: &DnsPacket) -> DnsPac
packet.questions.push(question.clone());
let mut resolver = context.create_resolver(context.clone());
let rescode = match resolver.resolve(
&question.name,
question.qtype,
request.header.recursion_desired,
) {
let rescode = match resolver.resolve(&question.name, question.qtype, request.header.recursion_desired) {
Ok(result) => {
let rescode = result.header.rescode;
@@ -128,10 +123,7 @@ pub fn execute_query(context: Arc<ServerContext>, request: &DnsPacket) -> DnsPac
rescode
}
Err(err) => {
println!(
"Failed to resolve {:?} {}: {:?}",
question.qtype, question.name, err
);
println!("Failed to resolve {:?} {}: {:?}", question.qtype, question.name, err);
ResultCode::SERVFAIL
}
};
@@ -169,10 +161,10 @@ pub struct DnsUdpServer {
impl DnsUdpServer {
pub fn new(context: Arc<ServerContext>, thread_count: usize) -> DnsUdpServer {
DnsUdpServer {
context: context,
context,
request_queue: Arc::new(Mutex::new(VecDeque::new())),
request_cond: Arc::new(Condvar::new()),
thread_count: thread_count,
thread_count,
}
}
}
@@ -180,11 +172,10 @@ impl DnsUdpServer {
impl DnsServer for DnsUdpServer {
/// Launch the server
///
/// This method takes ownership of the server, preventing the method from
/// being called multiple times.
/// This method takes ownership of the server, preventing the method from being called multiple times.
fn run_server(self) -> Result<()> {
// Bind the socket
let socket = UdpSocket::bind(("0.0.0.0", self.context.dns_port))?;
let socket = UdpSocket::bind(("[::]", self.context.dns_port))?;
// Spawn threads for handling requests
for thread_id in 0..self.thread_count {
@@ -227,8 +218,7 @@ impl DnsServer for DnsUdpServer {
}
}
// Create a response buffer, and ask the context for an appropriate
// resolver
// Create a response buffer, and ask the context for an appropriate resolver
let mut res_buffer = VectorPacketBuffer::new();
let mut packet = execute_query(context.clone(), &request);
@@ -236,14 +226,8 @@ impl DnsServer for DnsUdpServer {
// Fire off the response
let len = res_buffer.pos();
let data = return_or_report!(
res_buffer.get_range(0, len),
"Failed to get buffer data"
);
ignore_or_report!(
socket_clone.send_to(data, src),
"Failed to send response packet"
);
let data = return_or_report!(res_buffer.get_range(0, len), "Failed to get buffer data");
ignore_or_report!(socket_clone.send_to(data, src), "Failed to send response packet");
}
})?;
}
@@ -253,11 +237,7 @@ impl DnsServer for DnsUdpServer {
.name("DnsUdpServer-incoming".into())
.spawn(move || {
loop {
let _ = self
.context
.statistics
.udp_query_count
.fetch_add(1, Ordering::Release);
let _ = self.context.statistics.udp_query_count.fetch_add(1, Ordering::Release);
// Read a query packet
let mut req_buffer = BytePacketBuffer::new();
@@ -278,8 +258,7 @@ impl DnsServer for DnsUdpServer {
}
};
// Acquire lock, add request to queue, and notify waiting threads
// using the condition.
// Acquire lock, add request to queue, and notify waiting threads using the condition.
match self.request_queue.lock() {
Ok(mut queue) => {
queue.push_back((src, request));
@@ -305,17 +284,13 @@ pub struct DnsTcpServer {
impl DnsTcpServer {
pub fn new(context: Arc<ServerContext>, thread_count: usize) -> DnsTcpServer {
DnsTcpServer {
context: context,
senders: Vec::new(),
thread_count: thread_count,
}
DnsTcpServer { context, senders: Vec::new(), thread_count }
}
}
impl DnsServer for DnsTcpServer {
fn run_server(mut self) -> Result<()> {
let socket = TcpListener::bind(("0.0.0.0", self.context.dns_port))?;
let socket = TcpListener::bind(("[::]", self.context.dns_port))?;
// Spawn threads for handling requests, and create the channels
for thread_id in 0..self.thread_count {
@@ -332,48 +307,30 @@ impl DnsServer for DnsTcpServer {
Err(_) => continue,
};
let _ = context
.statistics
.tcp_query_count
.fetch_add(1, Ordering::Release);
let _ = context.statistics.tcp_query_count.fetch_add(1, Ordering::Release);
// When DNS packets are sent over TCP, they're prefixed with a two byte
// length. We don't really need to know the length in advance, so we
// just move past it and continue reading as usual
ignore_or_report!(
read_packet_length(&mut stream),
"Failed to read query packet length"
);
ignore_or_report!(read_packet_length(&mut stream), "Failed to read query packet length");
let request = {
let mut stream_buffer = StreamPacketBuffer::new(&mut stream);
return_or_report!(
DnsPacket::from_buffer(&mut stream_buffer),
"Failed to read query packet"
)
return_or_report!(DnsPacket::from_buffer(&mut stream_buffer), "Failed to read query packet")
};
let mut res_buffer = VectorPacketBuffer::new();
let mut packet = execute_query(context.clone(), &request);
ignore_or_report!(
packet.write(&mut res_buffer, 0xFFFF),
"Failed to write packet to buffer"
);
ignore_or_report!(packet.write(&mut res_buffer, 0xFFFF), "Failed to write packet to buffer");
// As is the case for incoming queries, we need to send a 2 byte length
// value before handing of the actual packet.
let len = res_buffer.pos();
ignore_or_report!(
write_packet_length(&mut stream, len),
"Failed to write packet size"
);
ignore_or_report!(write_packet_length(&mut stream, len), "Failed to write packet size");
// Now we can go ahead and write the actual packet
let data = return_or_report!(
res_buffer.get_range(0, len),
"Failed to get packet data"
);
let data = return_or_report!(res_buffer.get_range(0, len), "Failed to get packet data");
ignore_or_report!(stream.write(data), "Failed to write response packet");
@@ -399,10 +356,7 @@ impl DnsServer for DnsTcpServer {
match self.senders[thread_no].send(stream) {
Ok(_) => {}
Err(e) => {
println!(
"Failed to send TCP request for processing on thread {}: {}",
thread_no, e
);
println!("Failed to send TCP request for processing on thread {}: {}", thread_no, e);
}
}
}