Python for Security Automation.
Python idioms specific to security work: subprocess discipline, robust HTTP, async scanners, and the regex patterns recurring in log triage.
Subprocess discipline
- Never use `shell=True` with user input. Shell injection is trivial. Use the list form: `subprocess.run(["nmap", "-sS", target])`.
- Capture both streams. `capture_output=True` + `text=True`. Tools write progress to stderr; ignoring it loses half the signal.
- Encoding. `encoding="utf-8", errors="replace"`. Real tool output contains binary garbage; the default strict mode raises `UnicodeDecodeError` on the first undecodable byte.
- Timeout. `timeout=300` always. A hung subprocess in a 10-thread scanner = a wedged scanner. Catch `subprocess.TimeoutExpired` and move on.
- Long output. Stream via `Popen` with `stdout=subprocess.PIPE` and read line-by-line. `capture_output` buffers everything in memory and OOMs on big nmap runs.
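The checklist above can be sketched as two small helpers. This is a minimal sketch under stated assumptions, not a definitive wrapper: `run_tool` and `stream_tool` are hypothetical names, the 300-second default only mirrors the bullet above, and merging stderr into stdout while streaming is one possible choice.

```python
import subprocess
import sys


def run_tool(argv, timeout=300):
    """Run a command given as a list (no shell).

    Returns (returncode, stdout, stderr), or None if the timeout expires.
    """
    try:
        proc = subprocess.run(
            argv,                 # list form: arguments are never parsed by a shell
            capture_output=True,  # keep both stdout and stderr
            text=True,
            encoding="utf-8",
            errors="replace",     # undecodable bytes become U+FFFD instead of raising
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # run() kills the child before re-raising, so the caller can move on.
        return None
    return proc.returncode, proc.stdout, proc.stderr


def stream_tool(argv):
    """Yield output lines as they arrive instead of buffering the whole run."""
    with subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # assumption: merge stderr so its pipe cannot fill up unread
        text=True,
        encoding="utf-8",
        errors="replace",
    ) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")


if __name__ == "__main__":
    # The current Python interpreter stands in for a real tool such as nmap.
    print(run_tool([sys.executable, "-c", "print('ok')"]))
    for line in stream_tool([sys.executable, "-c", "print('a'); print('b')"]):
        print(line)
```

Note that the list form stops shell injection but not argument injection: a target that starts with `-` is still read by the tool as an option, so validate targets before passing them on.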
Robust HTTP
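    import asyncio

    import httpx
    from tenacity import retry, stop_after_attempt, wait_exponential


    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=1, max=30),
    )
    async def fetch(client, url):
        r = await client.get(url, timeout=15.0, follow_redirects=False)
        if r.status_code == 429:
            # Retry-After may also be an HTTP-date; fall back to 5 s if it is not an integer.
            value = r.headers.get("retry-after", "5")
            retry_after = int(value) if value.isdigit() else 5
            await asyncio.sleep(retry_after)
        if 300 <= r.status_code < 400:
            # Hand redirects back for inspection; raise_for_status() would raise on them too.
            return r
        # Raises httpx.HTTPStatusError on 4xx/5xx, which triggers the tenacity retry.
        r.raise_for_status()
        return r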
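- httpx over requests. Native async, optional HTTP/2 (install `httpx[http2]` and pass `http2=True`), and a default 5-second timeout where requests has none.
- `follow_redirects=False` by default. SSRF probes and login flows both need to inspect redirects, not chase them. httpx already defaults to `False`; passing it explicitly documents the intent.
- Honor `Retry-After`. Ignoring it tends to prolong the rate limit; honoring it gets through.
- Per-request timeout. It covers connect + read + write + pool. Fine-grained form: `timeout=httpx.Timeout(5.0, read=10.0)` (5 s for every phase, 10 s for reads).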
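`Retry-After` can carry either a number of seconds or an HTTP-date, so a parser that accepts both is useful when honoring it. The following is a stdlib-only sketch: `retry_after_seconds` is a hypothetical helper, and the `default` and `cap` values are illustrative assumptions rather than anything mandated by a specification.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(value, default=5.0, cap=300.0, now=None):
    """Convert a Retry-After header value into a delay in seconds.

    Accepts delay-seconds ("120") or an HTTP-date. Unparseable or missing
    values fall back to `default`; the result is clamped to [0, cap].
    """
    if value is None:
        return default
    value = value.strip()
    try:
        delay = float(int(value))
    except ValueError:
        try:
            when = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            # Older Pythons raise TypeError, newer ones ValueError, on bad dates.
            return default
        if when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        now = now or datetime.now(timezone.utc)
        delay = (when - now).total_seconds()
    # Never sleep a negative amount, and never let a server park the scanner for hours.
    return min(max(delay, 0.0), cap)
```

A caller would pass `r.headers.get("retry-after")` and sleep for the returned number of seconds before retrying.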
Async scanner pattern
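    import asyncio

    import httpx


    async def scan(targets, concurrency=50):
        # fetch() is the retrying helper defined in the Robust HTTP section.
        sem = asyncio.Semaphore(concurrency)
        async with httpx.AsyncClient() as client:

            async def one(t):
                async with sem:  # at most `concurrency` requests in flight
                    try:
                        return await fetch(client, t)
                    except Exception as e:
                        # Record the failure instead of aborting the whole scan.
                        return ("error", t, str(e))

            return await asyncio.gather(*[one(t) for t in targets])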
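- Semaphore for concurrency cap. Without it, `gather` starts all 10000 requests at once; they contend for sockets, file descriptors, and the connection pool, and the scan stalls or fails in bulk.
- Catch exceptions per-task. Otherwise a single connection error (`httpx.ConnectError`) propagates out of `gather` and the results of every other task are lost. `return_exceptions=True` is an alternative.
- Single `AsyncClient`. Reuse the connection pool; per-request client creation negates the async benefit.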
Regex — patterns and ReDoS
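- Avoid nested quantifiers. `(a+)+`, `(a|a)*`, `(a*)*`: all catastrophic. Input length grows linearly; match time grows exponentially on a near-miss input.
- Avoid alternation with overlapping prefixes. `(abc|ab|a)*`: the engine retries every branch at each position, and backtracking explodes as soon as the alternatives can split the same text in more than one way, as in `(a|ab|b)*`. Anchor or factor out the prefix.
- Use possessive quantifiers or atomic groups if your engine supports them: `(?>...)` in PCRE prevents backtracking into the group. Python's `re` supports both from 3.11.
- Set a regex timeout. Python's `re` has no built-in timeout. Use `re2` for untrusted input (linear time guaranteed); the third-party `regex` module also accepts a `timeout` argument.
- Common security regexes.
  - Email loose: `r"[\w.+-]+@[\w.-]+\.[a-z]{2,}"` (case-insensitive).
  - IPv4: `r"\b(?:\d{1,3}\.){3}\d{1,3}\b"`. This also matches out-of-range octets such as `999.1.1.1`, so validate the hits.
  - Bearer token: `r"[Bb]earer\s+([A-Za-z0-9_\-\.=]+)"`.
  - UUIDv4: `r"[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}"`.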
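As a rough illustration of how the patterns above might be applied to a single log line, the sketch below compiles them once and filters IPv4 candidates through the standard `ipaddress` module. `extract_indicators`, `valid_ipv4`, and the sample line are hypothetical and exist only for demonstration.

```python
import ipaddress
import re

EMAIL = re.compile(r"[\w.+-]+@[\w.-]+\.[a-z]{2,}", re.IGNORECASE)
IPV4 = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
BEARER = re.compile(r"[Bb]earer\s+([A-Za-z0-9_\-\.=]+)")
UUID4 = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}"
)


def valid_ipv4(candidates):
    """Keep only candidates that parse as real IPv4 addresses."""
    good = []
    for text in candidates:
        try:
            ipaddress.IPv4Address(text)
        except ValueError:
            continue  # e.g. an octet above 255
        good.append(text)
    return good


def extract_indicators(line):
    """Pull emails, IPv4 addresses, bearer tokens, and UUIDv4s out of one line."""
    return {
        "emails": EMAIL.findall(line),
        "ipv4": valid_ipv4(IPV4.findall(line)),
        "bearer": BEARER.findall(line),  # findall returns the captured token only
        "uuid4": UUID4.findall(line),
    }


# Made-up log line for demonstration.
SAMPLE = (
    "login_failed user=alice@example.com src=203.0.113.7 via=999.1.1.1 "
    "Authorization: Bearer abc.def-123= "
    "req=123e4567-e89b-42d3-a456-426614174000"
)

if __name__ == "__main__":
    print(extract_indicators(SAMPLE))
```

The loose IPv4 pattern picks up `999.1.1.1`, and the `ipaddress` check discards it. The regex finds candidates cheaply and a real parser makes the final call.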
Log triage — pandas idioms
    import pandas as pd

    # One JSON object per line (JSON Lines).
    df = pd.read_json("auth.log.json", lines=True)

    suspicious = (
        df.assign(ts=pd.to_datetime(df.timestamp))  # parse timestamps
        .query("event_type == 'login_failed'")      # failed logins only
        .groupby("source_ip")
        .agg(count=("ts", "size"), first=("ts", "min"), last=("ts", "max"))
        .query("count > 50")                        # more than 50 failures per source IP
        .sort_values("count", ascending=False)
    )
Rule of thumb
A security script that doesn't have timeouts on every subprocess and HTTP call will eventually hang in production. The 10 minutes spent adding timeouts pays for itself the first time the script doesn't deadlock at 3am.