How I Built a Real-Time DDoS Detection Engine with Python, Docker, and iptables
Have you ever wondered how websites protect themselves from attackers
who send millions of requests trying to crash the server? That is called
a DDoS attack (Distributed Denial of Service), and in this post I will
show you how I built a system that detects and blocks these attacks
automatically — in real time.
This project was built as part of the HNG DevOps Stage 3 task. We were
asked to protect a Nextcloud cloud storage platform running on Docker
from suspicious traffic — without using any existing security tools like
Fail2Ban. Everything had to be built from scratch.
What the Project Does and Why It Matters
Imagine you run a shop. Normally 10 customers walk in per minute.
Suddenly 5,000 people rush in at the same time — not to buy anything,
but just to block the door so real customers cannot enter. That is
exactly what a DDoS attack does to a web server.
Our detection engine:
- Watches every HTTP request coming into the server in real time
- Learns what "normal" traffic looks like
- Detects when traffic suddenly spikes far above normal
- Automatically blocks the attacking IP address
- Sends an alert to Slack so you know what happened
- Automatically unblocks the IP after a set time
The system runs as a background daemon, meaning it runs continuously,
24/7, alongside your application, always watching.
The Architecture
Here is how all the pieces fit together:
```
Internet Traffic
        ↓
Nginx (reverse proxy)
        ↓  writes JSON logs
Shared Docker Volume (HNG-nginx-logs)
        ↓  reads logs
Detection Daemon (Python)
    ↓          ↓           ↓
 iptables    Slack     Dashboard
(block IP)  (alert)  (metrics UI)
```
We use Docker Compose to run everything together:
- MariaDB — database for Nextcloud
- Nextcloud — the cloud storage application
- Nginx — reverse proxy that logs all traffic in JSON format
- Detector — our Python daemon that watches the logs
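For reference, the JSON logging side of Nginx might look like the snippet below. This is a sketch, not the project's actual config; the field names and log path are assumptions.

```nginx
# Hypothetical nginx config: emit one JSON object per request.
# Field names and the log path are illustrative, not taken from the project.
log_format json_combined escape=json
  '{'
    '"time":"$time_iso8601",'
    '"remote_addr":"$remote_addr",'
    '"request":"$request",'
    '"status":$status,'
    '"request_time":$request_time'
  '}';

access_log /var/log/nginx/access.json json_combined;
```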
How the Sliding Window Works
This is the heart of the detection system. We need to know how many
requests came from each IP address in the last 60 seconds at any
given moment.
The naive approach would be to count requests per minute. But that has
a problem — if someone sends 1000 requests at 11:59 and 1000 more at
12:00, a per-minute counter would show two separate spikes of 1000
instead of the real burst of 2000.
We solve this with a sliding window using Python's deque
(double-ended queue):
```python
from collections import deque

# One deque per IP address
ip_window = deque()
WINDOW_SECONDS = 60

def record_request(ip_window, timestamp):
    now = timestamp
    cutoff = now - WINDOW_SECONDS

    # Step 1: Add the new request timestamp
    ip_window.append(now)

    # Step 2: Remove requests older than 60 seconds
    # popleft() removes from the left — O(1) operation
    while ip_window and ip_window[0] < cutoff:
        ip_window.popleft()

    # Step 3: Count = requests in last 60 seconds
    current_rate = len(ip_window)
    return current_rate
```
A deque is like a list but optimized for adding to one end and
removing from the other. This makes the eviction of old entries very
fast — O(1) — no matter how many requests are in the window.
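To see the eviction in action, here is the same logic as a self-contained run, with plain integer timestamps for clarity:

```python
from collections import deque

WINDOW_SECONDS = 60

def record_request(ip_window, timestamp):
    # Add the new timestamp, evict anything older than the window,
    # and return the count of requests still inside it.
    cutoff = timestamp - WINDOW_SECONDS
    ip_window.append(timestamp)
    while ip_window and ip_window[0] < cutoff:
        ip_window.popleft()
    return len(ip_window)

window = deque()
# Three requests at t=0, 30, 50 — all inside one 60-second window
for t in (0, 30, 50):
    rate = record_request(window, t)
print(rate)  # 3
# A request at t=95 evicts t=0 and t=30 (both older than 95 - 60 = 35)
print(record_request(window, 95))  # 2
```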
We maintain two windows:
- One per IP address — to detect a single aggressive attacker
- One global — to detect a coordinated attack from many IPs
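The post does not show the two-window bookkeeping itself; one way to sketch it is a `defaultdict` of per-IP deques next to a single global deque (`observe` is a hypothetical helper name, not the project's actual code):

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 60

# One deque per source IP, plus one shared global deque
per_ip_windows = defaultdict(deque)
global_window = deque()

def observe(ip, timestamp):
    """Record one request; return (per-IP rate, global rate)."""
    cutoff = timestamp - WINDOW_SECONDS
    for window in (per_ip_windows[ip], global_window):
        window.append(timestamp)
        while window and window[0] < cutoff:
            window.popleft()
    return len(per_ip_windows[ip]), len(global_window)

# Two IPs, four requests inside one window
observe("10.0.0.1", 0)
observe("10.0.0.2", 1)
observe("10.0.0.1", 2)
ip_rate, global_rate = observe("10.0.0.1", 3)
print(ip_rate, global_rate)  # 3 4
```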
How the Baseline Learns from Traffic
We cannot hardcode a threshold like "100 requests per second is too
many." Maybe your server normally gets 500 per second at peak hours,
or just 5 per second at 3 AM. A hardcoded value would cause false
alarms constantly.
Instead we use a rolling baseline — the system learns what normal
looks like from recent traffic history.
Every second we record how many requests arrived:
```python
from collections import deque
import math

# Keep 30 minutes of per-second counts
# 30 minutes × 60 seconds = 1800 slots
per_second_counts = deque(maxlen=1800)

# Every second:
per_second_counts.append(requests_this_second)

# Every 60 seconds, recalculate mean and stddev:
def recalculate_baseline(counts):
    n = len(counts)
    if n < 2:
        return 1.0, 0.5  # floor values

    mean = sum(counts) / n
    variance = sum((c - mean) ** 2 for c in counts) / (n - 1)
    stddev = math.sqrt(variance)

    # Apply floors to prevent divide-by-zero
    mean = max(mean, 1.0)
    stddev = max(stddev, 0.5)
    return mean, stddev
```
We also maintain per-hour slots. If the current hour has at least
10 minutes of data, we prefer its statistics over the global 30-minute
window. This means:
- At 9 AM (busy hour): baseline reflects busy traffic → higher threshold
- At 3 AM (quiet hour): baseline reflects quiet traffic → lower threshold
The system is always comparing current traffic to recent traffic,
not some fixed value set months ago.
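The exact data layout for the hourly slots is not shown in this post; a minimal sketch of the "prefer the current hour once it has at least 10 minutes of data" rule could look like this (all names here are hypothetical):

```python
from collections import deque

SECONDS_PER_MINUTE = 60

# Hypothetical layout: one deque of per-second counts for each hour of
# the day, alongside the global 30-minute rolling window.
hourly_counts = {hour: deque(maxlen=3600) for hour in range(24)}
global_counts = deque(maxlen=1800)

def pick_baseline_source(current_hour):
    """Prefer the current hour's slot once it holds >= 10 min of data."""
    hour_slot = hourly_counts[current_hour]
    if len(hour_slot) >= 10 * SECONDS_PER_MINUTE:
        return hour_slot
    return global_counts
```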
How the Detection Logic Makes a Decision
Once we have the current rate and the baseline, we check two conditions.
Whichever fires first triggers an alert:
```python
def check_anomaly(rate, mean, stddev):
    # Condition 1: Rate multiplier
    # Is traffic more than 5x the normal average?
    if rate > 5.0 * mean:
        return f"rate>5x_mean({rate}>{5.0 * mean:.1f})"

    # Condition 2: Z-score
    # How many standard deviations above normal is this?
    z_score = (rate - mean) / stddev
    if z_score > 3.0:
        return f"zscore>3.0(z={z_score:.2f})"

    return None  # Normal traffic
```
Z-score measures how unusual something is statistically. A z-score
of 3.0 means the current rate is 3 standard deviations above the mean
— this happens by random chance less than 0.3% of the time. In other
words, it is almost certainly an attack.
We also have error surge detection. If an IP generates lots of
4xx/5xx errors (like failed login attempts), we tighten the thresholds
automatically — making the system more sensitive to that specific IP.
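The precise tightening rule is not spelled out above; assuming, purely for illustration, that crossing a 50% error ratio halves both thresholds, the adjustment could be sketched as:

```python
# Hypothetical error-surge adjustment: the 50% trigger and the halving
# factor are assumptions for illustration, not the project's real rule.
BASE_RATE_MULTIPLIER = 5.0
BASE_ZSCORE_LIMIT = 3.0

def thresholds_for_ip(error_count, total_count):
    """Return (rate multiplier, z-score limit) for one IP's window."""
    error_ratio = error_count / total_count if total_count else 0.0
    if error_ratio > 0.5:
        # Mostly 4xx/5xx responses: tighten both thresholds
        return BASE_RATE_MULTIPLIER / 2, BASE_ZSCORE_LIMIT / 2
    return BASE_RATE_MULTIPLIER, BASE_ZSCORE_LIMIT

print(thresholds_for_ip(5, 100))   # (5.0, 3.0) — mostly clean traffic
print(thresholds_for_ip(80, 100))  # (2.5, 1.5) — error surge, tighter limits
```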
How iptables Blocks an IP
iptables is a firewall built into Linux. When we detect an attack from
a specific IP, we add a DROP rule that tells the Linux kernel to silently
discard all packets from that IP before they even reach Nginx:
```python
import subprocess

def ban_ip(ip_address):
    # Insert DROP rule at the TOP of the INPUT chain
    # -I means insert (top priority)
    # -s means source IP
    # -j DROP means silently discard the packet
    cmd = ["iptables", "-I", "INPUT", "-s", ip_address, "-j", "DROP"]
    subprocess.run(cmd, check=True)
    print(f"Banned: {ip_address}")

def unban_ip(ip_address):
    # -D means delete the rule
    cmd = ["iptables", "-D", "INPUT", "-s", ip_address, "-j", "DROP"]
    subprocess.run(cmd, check=True)
    print(f"Unbanned: {ip_address}")
```
The ban happens within 10 seconds of detection. The attacker's
connection is simply dropped at the operating system level — they get
no response at all, which is more efficient than sending error messages.
Auto-unban backoff schedule:
- 1st offence → banned for 10 minutes
- 2nd offence → banned for 30 minutes
- 3rd offence → banned for 2 hours
- 4th offence → permanently banned
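The schedule above maps naturally onto a small lookup table; here is a sketch (the function name and the `None`-as-permanent sentinel are assumptions):

```python
# Ban durations in seconds for the escalating backoff schedule
BACKOFF = [600, 1800, 7200]  # 10 minutes, 30 minutes, 2 hours
PERMANENT = None             # sentinel: no auto-unban

def ban_duration(offence_count):
    """Return the ban length for the nth offence (1-indexed)."""
    if offence_count <= len(BACKOFF):
        return BACKOFF[offence_count - 1]
    return PERMANENT

print(ban_duration(1))  # 600
print(ban_duration(4))  # None (permanent)
```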
Slack Alerts
Every significant event sends a notification to a Slack channel:
```python
import requests

def send_slack_alert(webhook_url, ip, rate, mean, stddev, condition):
    message = {
        "attachments": [{
            "color": "#FF3B30",
            "text": (
                f":rotating_light: *IP BAN TRIGGERED*\n"
                f">*IP:* `{ip}`\n"
                f">*Condition:* `{condition}`\n"
                f">*Current rate:* {rate} req/60s\n"
                f">*Baseline:* mean={mean:.2f} stddev={stddev:.2f}\n"
            )
        }]
    }
    requests.post(webhook_url, json=message, timeout=5)
```
We get alerts for:
- 🚨 IP banned (with condition, rate, baseline, duration)
- 🔓 IP unbanned (with ban history)
- ⚠️ Global traffic anomaly (when the whole site is under attack)
The Live Dashboard
We built a web dashboard using Flask that refreshes every 3 seconds
showing:
- Currently banned IPs and time remaining
- Global requests per second
- Top 10 source IPs
- CPU and memory usage
- Current baseline mean and standard deviation
- System uptime
- Baseline history graph
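The dashboard internals are not shown in this post; a minimal sketch of a Flask endpoint serving those metrics might look like this, with `state` standing in for the daemon's real shared bookkeeping:

```python
# Minimal sketch of a metrics endpoint the dashboard could poll every
# 3 seconds. `state` is a stand-in, not the project's actual structure.
import time

from flask import Flask, jsonify

app = Flask(__name__)
START_TIME = time.time()
state = {
    "banned": {},          # ip -> seconds remaining
    "global_rps": 0.0,
    "baseline": {"mean": 1.0, "stddev": 0.5},
}

@app.route("/api/metrics")
def metrics():
    return jsonify({
        "banned_ips": state["banned"],
        "global_rps": state["global_rps"],
        "baseline": state["baseline"],
        "uptime_seconds": round(time.time() - START_TIME, 1),
    })

if __name__ == "__main__":
    app.run(port=8080)
```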
The Audit Log
Every ban, unban, and baseline recalculation is written to a structured
log file:

```
[2026-04-28T20:45:26Z] BAN ip=172.19.0.1 | condition=zscore>3.0(z=4.00) | rate=3.0 | baseline=1.00±0.50 | duration=600s
[2026-04-28T20:55:26Z] UNBAN ip=172.19.0.1 | condition=auto_expire | duration_was=600s
[2026-04-28T21:00:00Z] BASELINE_RECALC ip=global | baseline=17.14±21.50 | source=global_30min(n=64)
```
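A small formatter can produce lines in that shape; this is an illustrative sketch, not the project's actual logger:

```python
from datetime import datetime, timezone

def audit_log_line(event, ip, **fields):
    """Format one structured audit line (hypothetical helper)."""
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    detail = " | ".join(f"{key}={value}" for key, value in fields.items())
    return f"[{timestamp}] {event} ip={ip} | {detail}"

line = audit_log_line("BAN", "172.19.0.1",
                      condition="zscore>3.0(z=4.00)",
                      rate=3.0, duration="600s")
print(line)
```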
What I Learned
Building this from scratch taught me:
- Statistics matter in security — z-scores and standard deviation are
  not just academic concepts. They are practical tools for anomaly
  detection.
- Adaptive baselines beat fixed thresholds — a system that learns is
  far more accurate than one with hardcoded values.
- Docker networking is complex — getting containers to communicate
  correctly took more troubleshooting than the detection logic itself.
- iptables is powerful — blocking at the kernel level is the most
  efficient way to stop an attack. The attacker gets no response,
  wasting their resources.
- Always whitelist your own IP — learned this the hard way when
  I locked myself out of my own server!
The Stack
- Python 3.12 — detection daemon
- Flask — dashboard web server
- Docker + Docker Compose — container orchestration
- Nginx — reverse proxy with JSON logging
- MariaDB — database for Nextcloud
- Nextcloud — the application being protected
- iptables — IP blocking at kernel level
- Slack — alert notifications
Repository
The full source code is available at:
https://github.com/Techgirli/ddos-detector
Conclusion
Building a security tool from scratch is one of the best ways to
understand how attacks work and how defenses are designed. Every piece
of this system — the sliding window, the adaptive baseline, the
statistical detection, the automatic blocking — solves a real problem
that security engineers face every day.
If you have questions or want to build something similar, drop a comment
below!