first commit

This commit is contained in:
2026-03-08 13:28:31 -04:00
commit 8a101892f2
28 changed files with 2519 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
#Password files
admin.passwd
email.json

690
README.md Normal file
View File

@@ -0,0 +1,690 @@
# portspoof-py
A Python asyncio rewrite of [portspoof](https://github.com/drk1wi/portspoof) — a TCP honeypot that makes every port on a machine appear open and running a real service.
When a port scanner connects to any TCP port, it receives a plausible fake service banner (SSH, HTTP, FTP, etc.). The banners are randomised from a signature library at startup, so the scanner cannot fingerprint the deception from banner patterns alone.
---
## Table of contents
1. [How it works](#how-it-works)
2. [Requirements](#requirements)
3. [Installation](#installation)
4. [Quick start](#quick-start)
5. [CLI reference](#cli-reference)
6. [Signature file](#signature-file)
7. [Config file](#config-file)
8. [iptables rules](#iptables-rules)
9. [JSON log](#json-log)
10. [Web admin interface](#web-admin-interface)
11. [Email alerts](#email-alerts)
12. [systemd service](#systemd-service)
13. [Recipes](#recipes)
14. [Troubleshooting](#troubleshooting)
---
## How it works
```
attacker kernel portspoof-py
│ │ │
│─── TCP SYN → port 443 ──►│ │
│ │ iptables REDIRECT │
│ │──► port 4444 ────────────►│
│◄── TCP SYN-ACK ──────────│◄──────────────────────────│
│─── data ────────────────►│──────────────────────────►│
│ │ SO_ORIGINAL_DST │
│ │ recovers port 443 │
│◄── fake HTTPS banner ────│◄──────────────────────────│
│─── FIN ─────────────────►│──────────────────────────►│
```
1. **iptables PREROUTING REDIRECT** — on startup, three rules are added to the `nat` table. All incoming TCP traffic (except port 22 and the listener port itself) is transparently redirected to the single listener port.
2. **Single asyncio listener** — one coroutine handles every connection. There is no thread pool and no polling loop.
3. **SO_ORIGINAL_DST** — the Linux kernel preserves the originally-intended destination port in the socket. The handler reads it with `getsockopt(SOL_IP, SO_ORIGINAL_DST)`.
4. **Pre-computed banner map** — at startup, each of the 65,536 ports is assigned a banner generated by processing a randomly-chosen regex pattern from the signature file. Banner generation is done once; serving is instant.
5. **Graceful shutdown** — SIGTERM/SIGINT triggers the stop event, the server drains, the log queue is flushed, and the iptables rules are removed.
---
## Requirements
| Requirement | Notes |
|---|---|
| Python 3.11+ | Uses `asyncio`, `struct`, `socket`, `pathlib`, stdlib only |
| Linux | `SO_ORIGINAL_DST` and `iptables` are Linux-specific |
| Root | Required for iptables management; `--no-iptables` bypasses this |
| iptables | Must be installed (`apt install iptables` or equivalent) |
No PyPI packages are needed at runtime.
---
## Installation
**Run directly from the source tree (recommended for a lab machine):**
```bash
git clone <repo>
cd portspoof_py
sudo python3 -m portspoof_py -s tools/portspoof_signatures
```
**Install as a package:**
```bash
pip install . # installs the portspoof-py entry point
portspoof-py --help
```
**Install build tool only if needed:**
```bash
pip install flit
flit build # produces a wheel in dist/
```
---
## Quick start
### Test mode (no iptables, no root)
Useful for verifying the installation or developing.
```bash
python3 -m portspoof_py \
--no-iptables \
-p 4444 \
-s tools/portspoof_signatures \
-v
# In another terminal — should receive a random service banner
nc localhost 4444
```
### Full honeypot mode (as root)
```bash
sudo python3 -m portspoof_py \
-p 4444 \
-s /etc/portspoof/portspoof_signatures \
-c /etc/portspoof/portspoof.conf \
-l /var/log/portspoof/portspoof.jsonl
```
After startup you can verify with:
```bash
sudo iptables -t nat -L PREROUTING --line-numbers
# Should show three portspoof rules
nmap -p 1-1000 <your-ip>
# All ports should appear open
```
Stop with Ctrl-C or `kill <pid>`. Rules are removed automatically.
### With the web admin interface
```bash
# Create a credentials file first
echo "admin:changeme" > admin.passwd
sudo python3 -m portspoof_py \
-p 4444 \
-s /etc/portspoof/portspoof_signatures \
-l /var/log/portspoof/portspoof.jsonl \
--admin-port 8080
# Open http://127.0.0.1:8080 in a browser (HTTP Basic Auth prompt will appear)
```
---
## CLI reference
```
python3 -m portspoof_py [OPTIONS]
portspoof-py [OPTIONS]
```
### Listener
| Flag | Default | Description |
|---|---|---|
| `-p PORT` / `--port PORT` | `4444` | TCP port the honeypot listens on. This port is excluded from the iptables REDIRECT rule so it does not redirect to itself. |
| `-i IP` / `--bind-ip IP` | all interfaces | Bind the listener to a specific IP address. |
### Signatures and configuration
| Flag | Default | Description |
|---|---|---|
| `-s FILE` / `--signatures FILE` | — | Path to the signature file. Each line is a regex-like pattern used to generate a banner. At startup, one pattern is chosen randomly for each of the 65,536 ports. Without this flag every port returns an empty banner (the port appears open but silent). |
| `-c FILE` / `--config FILE` | — | Path to the config file. Overrides specific ports or port ranges with a fixed payload after the signature map is built. |
### Logging
| Flag | Default | Description |
|---|---|---|
| `-l FILE` / `--log-file FILE` | — | Append JSON connection events to this file. One line per connection. |
| `-v` / `--verbose` | off | Print each connection event as JSON to stdout. |
### iptables
| Flag | Default | Description |
|---|---|---|
| `--iface IFACE` | all interfaces | Restrict the iptables REDIRECT rule to a specific network interface (e.g. `eth0`). Useful when one interface is internal and should not be intercepted. |
| `--no-iptables` | off | Skip iptables setup and teardown entirely. The listener still runs on `-p PORT`. Use for local testing or when you manage iptables externally. |
### Web admin
| Flag | Default | Description |
|---|---|---|
| `--admin-port PORT` | disabled | Start the web admin interface on this port. |
| `--admin-host HOST` | `127.0.0.1` | Address the admin interface binds to. Set to `0.0.0.0` to expose it on all interfaces (protect with a firewall). |
| `--admin-passwd FILE` | `admin.passwd` | File containing `username:password` on a single line. Required when `--admin-port` is used. |
| `--email-config FILE` | `email.json` | JSON file where email alert settings are stored. Created automatically when you first save settings from the admin UI. |
---
## Signature file
The signature file contains one pattern per line. Blank lines are ignored. The bundled file `tools/portspoof_signatures` contains 8,962 patterns derived from the nmap service detection database.
Each pattern is passed through a three-pass regex materialiser at startup to produce a concrete banner string:
**Pass 1** — group and bracket expansion
- `(...)` groups are stripped, their content kept.
- `[abc]`, `[a-z]`, `[0-9]`, `[^\0]` etc. are expanded: a random character (or random-length sequence for `*`/`+` quantifiers) is chosen from the defined set.
**Pass 2** — special sequence substitution
- `\w` → a random lowercase letter `a``z`
- `\d` → a random digit `0``9`
- `.` → a random lowercase letter
- `\n`, `\r`, `\t` → literal newline, carriage return, tab
**Pass 3** — hex escape decoding
- `\xNN` → the byte with hex value `NN`
**Example patterns and what they can produce:**
```
# Input pattern → Example output
SSH-2.0-OpenSSH_([\w._-]+)\r\n → SSH-2g0-OpenSSH_QpKz\r\n
220 ([-.\w]+) ESMTP\r\n → 220 mxprod ESMTP\r\n
HTTP/1.\d \d\d\d [\w ]+\r\n → HTTP/1.4 200 OK\r\n
\x80\x01\x00\x80 → (four literal bytes)
AMServer → AMServer
```
The file is read with latin-1 encoding so binary-like content in patterns does not cause decode errors.
---
## Config file
The config file lets you pin specific ports or port ranges to a fixed payload, overriding the random assignment from the signature file. It is applied after the signature map is built.
### Format
Lines beginning with `#` are comments and are ignored. Each active line has the form:
```
PORT "payload"
START-END "payload"
```
The payload is everything between the **first** and **last** double-quote on the line. It is processed through the same three-pass materialiser as signature patterns, so hex escapes and regex metacharacters work.
### Example
```
# Single port — port 22 returns a specific SSH banner
22 "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.6\r\n"
# Port range — ports 80 through 443 return an HTTP 200
80-443 "HTTP/1.1 200 OK\r\nServer: nginx/1.24.0\r\nContent-Length: 0\r\n\r\n"
# Hex payload — raw bytes
9200 "\x7b\x22\x6e\x61\x6d\x65\x22\x3a\x22\x65\x6c\x61\x73\x74\x69\x63\x22\x7d"
```
The bundled `tools/portspoof.conf` shows the full comment syntax and more examples.
---
## iptables rules
When running without `--no-iptables`, three rules are appended to the `nat` PREROUTING chain at startup and removed at shutdown.
### Rules added (example with `-p 4444 --admin-port 8080`)
```
iptables -t nat -A PREROUTING -p tcp --dport 22 -j RETURN
iptables -t nat -A PREROUTING -p tcp --dport 8080 -j RETURN
iptables -t nat -A PREROUTING -p tcp --dport 4444 -j RETURN
iptables -t nat -A PREROUTING -p tcp -j REDIRECT --to-port 4444
```
The exempt ports are always added **before** the REDIRECT rule in this order:
1. Port 22 (SSH) — always first, so you cannot lock yourself out.
2. `--admin-port` — exempted automatically if specified, so the admin interface remains reachable.
3. The listener port (`-p`) — prevents a redirect loop.
Without `--admin-port`, the middle rule is omitted.
With `--iface eth0`, each rule gains `--in-interface eth0`.
### Verifying the rules
```bash
sudo iptables -t nat -L PREROUTING -v --line-numbers
```
### Manual cleanup
If the process is killed with SIGKILL (skipping the shutdown handler), the systemd `ExecStopPost` directives remove the rules. You can also remove them manually:
```bash
sudo iptables -t nat -D PREROUTING -p tcp --dport 22 -j RETURN
sudo iptables -t nat -D PREROUTING -p tcp --dport 4444 -j RETURN
sudo iptables -t nat -D PREROUTING -p tcp -j REDIRECT --to-port 4444
```
### Persistence across reboots
iptables rules are not persistent by default. The systemd service re-adds them on start. If you want them to survive without the service running, use `iptables-save` / `iptables-restore` or `netfilter-persistent`.
---
## JSON log
With `-l FILE`, each accepted connection appends one JSON object followed by a newline to the log file.
### Fields
| Field | Type | Description |
|---|---|---|
| `timestamp` | string | ISO 8601 UTC timestamp with microseconds |
| `src_ip` | string | Source IP address of the scanner |
| `src_port` | integer | Source (ephemeral) port of the scanner |
| `dst_port` | integer | Port the scanner originally connected to (recovered via `SO_ORIGINAL_DST`) |
| `banner_hex` | string | Hex-encoded bytes of the banner that was sent |
| `banner_len` | integer | Length of the banner in bytes |
### Example
```json
{"timestamp": "2025-06-01T14:32:07.841203+00:00", "src_ip": "198.51.100.42", "src_port": 54312, "dst_port": 443, "banner_hex": "485454502f312e31203230300d0a", "banner_len": 14}
```
### Querying the log
```bash
# Show all source IPs sorted by frequency
jq -r .src_ip portspoof.jsonl | sort | uniq -c | sort -rn | head -20
# Show all targeted ports sorted by frequency
jq -r .dst_port portspoof.jsonl | sort | uniq -c | sort -rn | head -20
# Show connections from a specific IP
jq 'select(.src_ip == "198.51.100.42")' portspoof.jsonl
# Decode a banner from hex
jq -r .banner_hex portspoof.jsonl | head -1 | xxd -r -p
```
---
## Web admin interface
Enable with `--admin-port PORT`. The interface binds to `127.0.0.1` by default; use `--admin-host 0.0.0.0` to expose it externally (ensure it is firewalled).
### Authentication
Every request requires HTTP Basic Auth. Create a credentials file before starting:
```bash
echo "admin:changeme" > admin.passwd
```
The file must contain a single line in `username:password` format. Pass a custom path with `--admin-passwd FILE`.
### Dashboard — `GET /`
A dark-themed HTML page that auto-refreshes every 5 seconds. Sections:
- **Stat cards** — total connections, connections in the last 60 seconds, uptime, ports mapped, and last connection time.
- **Top source IPs** — the 10 most active scanner addresses since startup.
- **Top targeted ports** — the 10 most-probed ports since startup.
- **Banner lookup** — enter any port number (065535) and see the hex and text preview of the banner that port will send. The result persists across auto-refreshes when using the `?port=N` query parameter.
- **Recent connections** — the 50 most-recent connections, newest first, with timestamp, source, destination port, banner hex excerpt, and banner length.
### Banner lookup shortcut
Append `?port=N` to the dashboard URL to pre-populate the lookup:
```
http://127.0.0.1:8080/?port=443
```
### JSON API
All endpoints require Basic Auth and return `Content-Type: application/json`.
#### `GET /api/stats`
Current statistics snapshot.
```json
{
"uptime": "2h 14m 37s",
"total_connections": 18432,
"last_connection": "2025-06-01T14:32:07.841203+00:00",
"connections_per_min": 312,
"ports_mapped": 65536,
"top_ips": [
{"ip": "198.51.100.42", "count": 9821},
{"ip": "203.0.113.7", "count": 4102}
],
"top_ports": [
{"port": 80, "count": 3201},
{"port": 443, "count": 2987},
{"port": 22, "count": 2104}
]
}
```
#### `GET /api/connections?limit=N`
The N most-recent connection events, newest first. `limit` defaults to 50, maximum 500.
```json
[
{
"timestamp": "2025-06-01T14:32:07.841203+00:00",
"src_ip": "198.51.100.42",
"src_port": 54312,
"dst_port": 443,
"banner_hex": "485454502f312e31203230300d0a",
"banner_len": 14
}
]
```
#### `GET /api/banner?port=N`
The pre-computed banner for port N.
```json
{
"port": 443,
"banner_hex": "485454502f312e31203230300d0a",
"banner_len": 14,
"banner_text": "HTTP/1.1 200\r\n"
}
```
`banner_text` is the banner decoded as latin-1. Non-printable bytes will appear as replacement characters in some terminals.
#### `GET /api/email/config`
Current email alert configuration (password is masked).
---
## Email alerts
portspoof can send email digest alerts when connections are detected. Configuration is done entirely through the admin UI — no restart required.
### Setup
1. Start with `--admin-port` (the email config page is always available).
2. Open `http://127.0.0.1:8080/config` (or click **email alerts** in the dashboard header).
3. Fill in your SMTP details and click **Save**.
4. Click **Send test email** to verify delivery.
### How batching works
Rather than sending one email per connection, portspoof collects events and sends a single digest:
1. When a matching connection arrives, it is added to a pending list and a flush timer starts.
2. Any further matching connections within the **batch delay** window are added to the same list.
3. After the batch delay elapses, one email is sent summarising all collected events.
4. The **cooldown** prevents a new batch from being sent until that many seconds have passed since the last one, avoiding email floods during sustained scans.
### Configuration options
| Option | Default | Description |
|---|---|---|
| Enabled | off | Master switch — no emails are sent when disabled. |
| SMTP host | — | Hostname of your SMTP server (e.g. `smtp.gmail.com`). |
| SMTP port | `587` | Port to connect to. |
| Use STARTTLS | on | Upgrade the connection with STARTTLS. Disable for plain SMTP or SSL-only servers. |
| SMTP username | — | Login username for SMTP authentication. |
| SMTP password | — | Login password. Stored in the config file; masked in the UI. |
| From address | — | Sender address. Defaults to the SMTP username if blank. |
| To address | — | Recipient address for alert emails. |
| Trigger ports | (all) | Comma-separated list of ports that trigger alerts. Leave blank to alert on every port. |
| Batch delay | `60` s | How long to wait and collect events before sending one digest email. |
| Cooldown | `300` s | Minimum time between batch emails. |
### Config file
Settings are saved to `email.json` (or the path given by `--email-config`) as JSON. The file is created on first save. You can edit it directly; changes take effect at the next batch send.
```json
{
"enabled": true,
"smtp_host": "smtp.gmail.com",
"smtp_port": 587,
"smtp_starttls": true,
"smtp_user": "you@gmail.com",
"smtp_password": "app-password-here",
"from_addr": "",
"to_addr": "alerts@example.com",
"trigger_ports": [22, 3389],
"batch_delay_seconds": 60,
"cooldown_seconds": 300
}
```
### Example digest email
```
Subject: [portspoof] 3 connections detected
portspoof alert — 3 connections
2025-06-01 14:32:07 198.51.100.42:54312 → port 22 (14 B 485353482d322e302d4f70656e5353…)
2025-06-01 14:32:09 198.51.100.42:54398 → port 23 (6 B deadbeef0102)
2025-06-01 14:32:11 203.0.113.7:61204 → port 3389 (0 B )
```
---
## systemd service
The included `portspoof.service` file runs portspoof as a system service that starts after the network is up, restarts on failure, and removes iptables rules even if it is killed with SIGKILL.
### Installation
```bash
# Copy files
sudo cp portspoof.service /etc/systemd/system/
sudo mkdir -p /etc/portspoof /var/log/portspoof
sudo cp tools/portspoof_signatures /etc/portspoof/
sudo cp tools/portspoof.conf /etc/portspoof/
# Enable and start
sudo systemctl daemon-reload
sudo systemctl enable portspoof
sudo systemctl start portspoof
# Check status
sudo systemctl status portspoof
sudo journalctl -u portspoof -f
```
### Adding the admin interface to the service
```bash
# Create credentials file
echo "admin:changeme" | sudo tee /etc/portspoof/admin.passwd
sudo chmod 600 /etc/portspoof/admin.passwd
```
Edit `/etc/systemd/system/portspoof.service` and append the flags to `ExecStart`:
```ini
ExecStart=/usr/bin/python3 -m portspoof_py \
-s /etc/portspoof/portspoof_signatures \
-c /etc/portspoof/portspoof.conf \
-l /var/log/portspoof/portspoof.jsonl \
--admin-port 8080 \
--admin-passwd /etc/portspoof/admin.passwd \
--email-config /etc/portspoof/email.json
```
Then reload:
```bash
sudo systemctl daemon-reload
sudo systemctl restart portspoof
```
### Stopping
```bash
sudo systemctl stop portspoof
# iptables rules are removed during graceful shutdown
# ExecStopPost directives handle the SIGKILL case
```
---
## Recipes
### Honeypot on a dedicated interface only
```bash
sudo python3 -m portspoof_py \
-p 4444 \
--iface eth1 \
-s /etc/portspoof/portspoof_signatures \
-l /var/log/portspoof/portspoof.jsonl
```
Only traffic arriving on `eth1` is redirected. Traffic on other interfaces (e.g. a management interface) is unaffected.
### Monitor in real time with the admin API
```bash
# Poll stats every second
watch -n1 'curl -su admin:changeme http://127.0.0.1:8080/api/stats | python3 -m json.tool'
# Tail recent connections
watch -n2 'curl -su admin:changeme "http://127.0.0.1:8080/api/connections?limit=10" | python3 -m json.tool'
```
### Inspect what a specific port would send
```bash
curl -su admin:changeme "http://127.0.0.1:8080/api/banner?port=22" | python3 -m json.tool
# Decode banner bytes directly
curl -su admin:changeme "http://127.0.0.1:8080/api/banner?port=22" \
| python3 -c "import sys,json; print(json.load(sys.stdin)['banner_text'])"
```
### Custom banners for a port range
Create `/etc/portspoof/portspoof.conf`:
```
# Make ports 8080-8090 look like nginx
8080-8090 "HTTP/1.1 200 OK\r\nServer: nginx/1.24.0\r\nContent-Length: 612\r\n\r\n"
# Make port 3306 look like MySQL
3306 "\x4a\x00\x00\x00\x0a\x38\x2e\x30\x2e\x33\x36\x00"
```
### Analyse a scan after the fact
```bash
# Top 10 IPs
jq -r .src_ip /var/log/portspoof/portspoof.jsonl \
| sort | uniq -c | sort -rn | head -10
# All ports targeted in the last hour
jq -r 'select(.timestamp > "2025-06-01T13:00:00") | .dst_port' \
/var/log/portspoof/portspoof.jsonl | sort -n | uniq -c | sort -rn | head -20
# Check if a specific IP hit you
grep '"src_ip": "198.51.100.42"' /var/log/portspoof/portspoof.jsonl | wc -l
```
---
## Troubleshooting
### `ERROR: root required for iptables management`
Run with `sudo`, or pass `--no-iptables` if you are testing locally.
### `ERROR: iptables setup failed`
Check that iptables is installed and the `nat` table is available:
```bash
sudo iptables -t nat -L
```
On some minimal systems you may need:
```bash
sudo apt install iptables # Debian/Ubuntu
sudo modprobe ip_tables
sudo modprobe iptable_nat
```
### Banners contain `000` instead of null bytes
This is expected. The signature file uses `\0` to represent null bytes in the nmap probe format, but the banner materialiser does not interpret `\0` as a null byte — it outputs the ASCII character `0` (0x30). Use `\x00` in signature patterns or config payloads when you need an actual null byte.
### Port 22 is redirected and SSH breaks
The iptables setup adds a `RETURN` rule for port 22 before the `REDIRECT` rule, so SSH should always be exempt. If you are still locked out, verify the rule order:
```bash
sudo iptables -t nat -L PREROUTING --line-numbers
```
The `RETURN` rules for port 22 and the listener port must appear **before** the `REDIRECT` rule. If they are in the wrong order, remove all portspoof rules and restart the service.
### `SO_ORIGINAL_DST` returns the listener port instead of the original port
This happens when there is no `REDIRECT` rule in effect (e.g. `--no-iptables` mode, or the rule is missing). The server falls back to reporting the listener port as the destination. Banners are still sent; the logged `dst_port` will be the listener port rather than the port the scanner connected to.
### Admin interface is not accessible from a remote machine
By default `--admin-host` is `127.0.0.1` (localhost only). To expose it remotely:
```bash
--admin-host 0.0.0.0 --admin-port 8080
```
Then allow the port in your firewall and restrict access to trusted IPs:
```bash
sudo iptables -A INPUT -p tcp --dport 8080 -s <your-ip> -j ACCEPT
sudo iptables -A INPUT -p tcp --dport 8080 -j DROP
```
### Startup is slow (~45 seconds)
The banner map is built synchronously at startup: 65,536 ports × one signature materialisation each. This is expected and is a one-time cost. After startup, every connection is served from the pre-computed map with no per-connection processing.

31
portspoof.service Normal file
View File

@@ -0,0 +1,31 @@
[Unit]
Description=portspoof asyncio honeypot
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 -m portspoof_py \
-s /etc/portspoof/portspoof_signatures \
-c /etc/portspoof/portspoof.conf \
-l /var/log/portspoof/portspoof.jsonl \
--admin-port 8080
User=root
TimeoutStopSec=30
KillMode=mixed
# Emergency cleanup if SIGKILL'd before graceful shutdown.
# Must mirror the exact rules added by add_rules() in iptables.py,
# including any --admin-port exempt rule (adjust ports if changed above).
ExecStopPost=/usr/sbin/iptables -t nat -D PREROUTING -p tcp --dport 22 -j RETURN
ExecStopPost=/usr/sbin/iptables -t nat -D PREROUTING -p tcp --dport 8080 -j RETURN
ExecStopPost=/usr/sbin/iptables -t nat -D PREROUTING -p tcp --dport 4444 -j RETURN
ExecStopPost=/usr/sbin/iptables -t nat -D PREROUTING -p tcp -j REDIRECT --to-port 4444
ExecStopPost=/usr/sbin/iptables -t nat -D OUTPUT -p tcp -d 127.0.0.0/8 --dport 22 -j RETURN
ExecStopPost=/usr/sbin/iptables -t nat -D OUTPUT -p tcp -d 127.0.0.0/8 --dport 8080 -j RETURN
ExecStopPost=/usr/sbin/iptables -t nat -D OUTPUT -p tcp -d 127.0.0.0/8 --dport 4444 -j RETURN
ExecStopPost=/usr/sbin/iptables -t nat -D OUTPUT -p tcp -d 127.0.0.0/8 -j REDIRECT --to-port 4444
Restart=on-failure
NoNewPrivileges=yes
ReadWritePaths=/var/log/portspoof
[Install]
WantedBy=multi-user.target

2
portspoof_py/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""portspoof_py — asyncio Python rewrite of the portspoof TCP honeypot."""
__version__ = '1.0.0'

5
portspoof_py/__main__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Entry point: python -m portspoof_py"""
import sys
from .cli import main
sys.exit(main())

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

702
portspoof_py/admin.py Normal file
View File

@@ -0,0 +1,702 @@
"""
Web admin interface — zero external deps, pure asyncio HTTP/1.1.
Endpoints:
GET / HTML dashboard (auto-refreshes every 5 s)
GET /?port=N Dashboard + banner lookup result for port N
GET /api/stats JSON stats snapshot
GET /api/connections JSON recent connections (?limit=N, default 50)
GET /api/banner?port=N JSON banner for port N
"""
import asyncio
import base64
import hmac
import html
import json
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import parse_qs, urlparse
from .config import Config
from .notifier import Notifier
from .stats import Stats
# ── credentials ───────────────────────────────────────────────────────────────
def load_credentials(passwd_file: str) -> Tuple[str, str]:
"""Read 'username:password' from the first non-blank line of passwd_file."""
text = Path(passwd_file).read_text().strip()
if ':' not in text.splitlines()[0]:
raise ValueError(
f"{passwd_file}: expected 'username:password' on the first line"
)
username, _, password = text.splitlines()[0].partition(':')
if not username or not password:
raise ValueError(f"{passwd_file}: username and password must both be non-empty")
return username, password
def _check_auth(raw_request: str, creds: Tuple[str, str]) -> bool:
"""Return True if the request carries valid Basic Auth credentials."""
for line in raw_request.split('\r\n')[1:]:
if line.lower().startswith('authorization:'):
value = line.split(':', 1)[1].strip()
if not value.startswith('Basic '):
return False
try:
decoded = base64.b64decode(value[6:]).decode('utf-8')
except Exception:
return False
user, sep, passwd = decoded.partition(':')
if not sep:
return False
expected_user, expected_passwd = creds
# constant-time comparison prevents timing attacks
ok_user = hmac.compare_digest(user, expected_user)
ok_passwd = hmac.compare_digest(passwd, expected_passwd)
return ok_user and ok_passwd
return False
_UNAUTHORIZED = (
b'HTTP/1.1 401 Unauthorized\r\n'
b'WWW-Authenticate: Basic realm="portspoof admin"\r\n'
b'Content-Type: text/plain; charset=utf-8\r\n'
b'Content-Length: 12\r\n'
b'Connection: close\r\n'
b'\r\n'
b'Unauthorized'
)
# ── config page ───────────────────────────────────────────────────────────────
_CONFIG_CSS = """
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: 'Courier New', monospace; background: #0d1117; color: #c9d1d9;
padding: 24px; font-size: 13px; line-height: 1.6; }
a { color: #58a6ff; text-decoration: none; }
h1 { color: #58a6ff; font-size: 20px; margin-bottom: 4px; }
h2 { color: #e6edf3; font-size: 14px; margin: 24px 0 12px; }
.sub { color: #8b949e; font-size: 12px; margin-bottom: 24px; }
.card { background: #161b22; border: 1px solid #30363d; border-radius: 6px;
padding: 20px; max-width: 600px; }
.field { margin-bottom: 14px; }
label { display: block; color: #8b949e; font-size: 11px; text-transform: uppercase;
letter-spacing: 1px; margin-bottom: 4px; }
input[type=text], input[type=number], input[type=password], input[type=email] {
width: 100%; background: #0d1117; border: 1px solid #30363d; color: #c9d1d9;
padding: 7px 10px; border-radius: 4px; font-family: inherit; font-size: 13px; }
input[type=text]:focus, input[type=number]:focus,
input[type=password]:focus, input[type=email]:focus {
outline: none; border-color: #58a6ff; }
.hint { color: #8b949e; font-size: 11px; margin-top: 3px; }
.row2 { display: flex; gap: 12px; }
.row2 .field { flex: 1; }
.check-row { display: flex; align-items: center; gap: 8px; margin-bottom: 14px; }
input[type=checkbox] { width: 15px; height: 15px; accent-color: #238636; }
.actions { display: flex; gap: 10px; margin-top: 20px; flex-wrap: wrap; }
button { background: #238636; border: 1px solid #2ea043; color: #fff;
padding: 7px 16px; border-radius: 4px; cursor: pointer;
font-family: inherit; font-size: 13px; }
button:hover { background: #2ea043; }
button.secondary { background: #21262d; border-color: #30363d; color: #c9d1d9; }
button.secondary:hover { background: #30363d; }
.banner { padding: 10px 14px; border-radius: 4px; margin-bottom: 16px; font-size: 12px; }
.banner.ok { background: #0f2a1a; border: 1px solid #238636; color: #3fb950; }
.banner.err { background: #2d1212; border: 1px solid #f85149; color: #f85149; }
"""
_CONFIG_HTML = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>portspoof — email alerts</title>
<style>{css}</style>
</head>
<body>
<h1>portspoof admin</h1>
<p class="sub"><a href="/">&larr; dashboard</a></p>
{banner_html}
<div class="card">
<h2>Email alerts</h2>
<form method="post" action="/api/email/config">
<div class="check-row">
<input type="checkbox" name="enabled" id="enabled" {enabled_chk}>
<label for="enabled" style="text-transform:none;letter-spacing:0;font-size:13px;">
Enable email alerts
</label>
</div>
<div class="row2">
<div class="field">
<label>SMTP host</label>
<input type="text" name="smtp_host" value="{smtp_host}" placeholder="smtp.gmail.com">
</div>
<div class="field" style="max-width:90px">
<label>Port</label>
<input type="number" name="smtp_port" value="{smtp_port}" min="1" max="65535">
</div>
</div>
<div class="check-row">
<input type="checkbox" name="smtp_starttls" id="starttls" {starttls_chk}>
<label for="starttls" style="text-transform:none;letter-spacing:0;font-size:13px;">
Use STARTTLS
</label>
</div>
<div class="field">
<label>SMTP username</label>
<input type="text" name="smtp_user" value="{smtp_user}" placeholder="user@example.com">
</div>
<div class="field">
<label>SMTP password</label>
<input type="password" name="smtp_password" placeholder="{pw_placeholder}">
<p class="hint">Leave blank to keep the existing password.</p>
</div>
<div class="row2">
<div class="field">
<label>From address</label>
<input type="text" name="from_addr" value="{from_addr}" placeholder="portspoof@example.com">
<p class="hint">Defaults to SMTP username if blank.</p>
</div>
<div class="field">
<label>To address</label>
<input type="text" name="to_addr" value="{to_addr}" placeholder="admin@example.com">
</div>
</div>
<div class="field">
<label>Trigger ports</label>
<input type="text" name="trigger_ports" value="{trigger_ports}"
placeholder="e.g. 22, 80, 443 (blank = all ports)">
<p class="hint">Comma-separated list. Leave blank to alert on every port.</p>
</div>
<div class="row2">
<div class="field">
<label>Batch delay (seconds)</label>
<input type="number" name="batch_delay_seconds" value="{batch_delay}" min="1">
<p class="hint">Wait this long to group events before sending.</p>
</div>
<div class="field">
<label>Cooldown (seconds)</label>
<input type="number" name="cooldown_seconds" value="{cooldown}" min="0">
<p class="hint">Min time between batch emails.</p>
</div>
</div>
<div class="actions">
<button type="submit">Save</button>
<button type="submit" formaction="/api/email/test" class="secondary">
Send test email
</button>
</div>
</form>
</div>
</body>
</html>
"""
def _render_config(notifier: Notifier, msg: str = '', msg_ok: bool = True) -> str:
cfg = notifier.get_config_safe()
banner_html = ''
if msg:
cls = 'ok' if msg_ok else 'err'
banner_html = f'<div class="banner {cls}">{html.escape(msg)}</div>'
ports = cfg.get('trigger_ports') or []
ports_str = ', '.join(str(p) for p in ports)
pw = cfg.get('smtp_password', '')
pw_placeholder = '(password set — leave blank to keep)' if pw else '(not set)'
return _CONFIG_HTML.format(
css=_CONFIG_CSS,
banner_html=banner_html,
enabled_chk='checked' if cfg.get('enabled') else '',
smtp_host=html.escape(cfg.get('smtp_host', '')),
smtp_port=cfg.get('smtp_port', 587),
starttls_chk='checked' if cfg.get('smtp_starttls', True) else '',
smtp_user=html.escape(cfg.get('smtp_user', '')),
pw_placeholder=html.escape(pw_placeholder),
from_addr=html.escape(cfg.get('from_addr', '')),
to_addr=html.escape(cfg.get('to_addr', '')),
trigger_ports=html.escape(ports_str),
batch_delay=cfg.get('batch_delay_seconds', 60),
cooldown=cfg.get('cooldown_seconds', 300),
)
def _parse_post_body(raw: str) -> dict:
"""Parse application/x-www-form-urlencoded body from a raw HTTP request."""
if '\r\n\r\n' not in raw:
return {}
body = raw.split('\r\n\r\n', 1)[1]
from urllib.parse import unquote_plus
result = {}
for pair in body.split('&'):
if '=' in pair:
k, _, v = pair.partition('=')
result[unquote_plus(k)] = unquote_plus(v)
return result
def _redirect(location: str) -> bytes:
loc = location.encode()
return (
b'HTTP/1.1 303 See Other\r\n'
b'Location: ' + loc + b'\r\n'
b'Content-Length: 0\r\n'
b'Connection: close\r\n'
b'\r\n'
)
# ── HTML template ─────────────────────────────────────────────────────────────
_CSS = """
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: 'Courier New', monospace;
background: #0d1117; color: #c9d1d9;
padding: 24px; font-size: 13px; line-height: 1.5;
}
a { color: #58a6ff; text-decoration: none; }
h1 { color: #58a6ff; font-size: 20px; margin-bottom: 4px; }
.sub { color: #8b949e; font-size: 12px; margin-bottom: 24px; }
.row { display: flex; gap: 16px; flex-wrap: wrap; margin-bottom: 20px; }
.card {
background: #161b22; border: 1px solid #30363d;
border-radius: 6px; padding: 16px;
}
.card.stat { min-width: 155px; flex: 1; }
.card.half { flex: 1; min-width: 260px; }
.card.full { width: 100%; }
.card h3 {
color: #8b949e; font-size: 11px;
text-transform: uppercase; letter-spacing: 1px; margin-bottom: 8px;
}
.card .val { font-size: 28px; color: #58a6ff; font-weight: bold; }
.card .val.last { font-size: 13px; margin-top: 4px; }
h2 { color: #e6edf3; font-size: 14px; margin-bottom: 12px; }
table { width: 100%; border-collapse: collapse; }
th {
color: #8b949e; font-size: 11px; text-transform: uppercase;
letter-spacing: 0.5px; padding: 6px 10px; text-align: left;
border-bottom: 1px solid #21262d;
}
td { padding: 5px 10px; border-bottom: 1px solid #21262d; }
tr:last-child td { border-bottom: none; }
tr:hover td { background: #1c2128; }
.port { color: #f78166; }
.ip { color: #79c0ff; }
.ts { color: #8b949e; font-size: 11px; }
.hex { color: #a5d6ff; font-size: 11px;
max-width: 240px; overflow: hidden;
text-overflow: ellipsis; white-space: nowrap; }
.badge {
display: inline-block; background: #21262d;
border-radius: 3px; padding: 1px 7px; font-size: 11px;
margin-left: 6px; vertical-align: middle;
}
/* lookup form */
.form-row { display: flex; gap: 8px; align-items: center; margin-bottom: 10px; }
input[type=number] {
background: #0d1117; border: 1px solid #30363d;
color: #c9d1d9; padding: 6px 10px; border-radius: 4px;
width: 110px; font-family: inherit; font-size: 13px;
}
button {
background: #238636; border: 1px solid #2ea043;
color: #fff; padding: 6px 14px; border-radius: 4px;
cursor: pointer; font-family: inherit; font-size: 13px;
}
button:hover { background: #2ea043; }
.lookup-result {
background: #0d1117; border: 1px solid #30363d;
border-radius: 4px; padding: 10px; margin-top: 8px;
}
.lookup-result .lbl { color: #8b949e; font-size: 11px; margin-bottom: 2px; }
.lookup-result .hexval { color: #a5d6ff; word-break: break-all; }
.lookup-result .txtval {
color: #c9d1d9; white-space: pre; overflow-x: auto;
max-height: 120px; font-size: 12px; margin-top: 6px;
border-top: 1px solid #21262d; padding-top: 6px;
}
.empty { color: #8b949e; font-style: italic; padding: 8px 0; }
"""
_HTML = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="refresh" content="5">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>portspoof admin</title>
<style>{css}</style>
</head>
<body>
<h1>portspoof admin</h1>
<p class="sub">
Auto-refreshes every 5 s &middot;
<a href="/">reset lookup</a> &middot;
<a href="/config">email alerts</a> &middot;
<a href="/api/stats" target="_blank">JSON stats</a>
</p>
<!-- stat cards -->
<div class="row">
<div class="card stat"><h3>Total connections</h3><div class="val">{total}</div></div>
<div class="card stat"><h3>Connections / min</h3><div class="val">{cpm}</div></div>
<div class="card stat"><h3>Uptime</h3><div class="val">{uptime}</div></div>
<div class="card stat"><h3>Ports mapped</h3><div class="val">{ports}</div></div>
<div class="card stat"><h3>Last connection</h3><div class="val last">{last_seen}</div></div>
</div>
<!-- top ips / top ports / banner lookup -->
<div class="row">
<div class="card half">
<h2>Top source IPs</h2>
<table>
<tr><th>IP</th><th>Hits</th></tr>
{top_ips_rows}
</table>
</div>
<div class="card half">
<h2>Top targeted ports</h2>
<table>
<tr><th>Port</th><th>Hits</th></tr>
{top_ports_rows}
</table>
</div>
<div class="card half">
<h2>Banner lookup</h2>
<form method="get" action="/">
<div class="form-row">
<input type="number" name="port" min="0" max="65535"
placeholder="0&ndash;65535" value="{port_q}" required>
<button type="submit">Look up</button>
</div>
</form>
{lookup_html}
</div>
</div>
<!-- recent connections -->
<div class="card full">
<h2>Recent connections <span class="badge">{recent_count} shown</span></h2>
<table>
<tr><th>Timestamp (UTC)</th><th>Source IP</th><th>Src&nbsp;port</th>
<th>Dst&nbsp;port</th><th>Banner (hex)</th><th>Bytes</th></tr>
{conn_rows}
</table>
</div>
</body>
</html>
"""
def _empty_row(cols: int, msg: str) -> str:
return f'<tr><td colspan="{cols}" class="empty">{msg}</td></tr>'
def _render_dashboard(
stats: Stats,
cfg: Config,
port_q: Optional[str],
) -> str:
# ── top IPs ──
top_ips = stats.top_ips()
if top_ips:
ip_rows = ''.join(
f'<tr><td class="ip">{html.escape(ip)}</td><td>{c}</td></tr>'
for ip, c in top_ips
)
else:
ip_rows = _empty_row(2, 'no data yet')
# ── top ports ──
top_ports = stats.top_ports()
if top_ports:
port_rows = ''.join(
f'<tr><td class="port">{p}</td><td>{c}</td></tr>'
for p, c in top_ports
)
else:
port_rows = _empty_row(2, 'no data yet')
# ── banner lookup ──
lookup_html = ''
if port_q is not None:
try:
port_num = int(port_q)
if not 0 <= port_num <= 65535:
raise ValueError
banner = cfg.get_banner(port_num)
txt_preview = banner.decode('latin-1', errors='replace')
txt_safe = html.escape(txt_preview)
hex_safe = html.escape(banner.hex())
lookup_html = f"""
<div class="lookup-result">
<div class="lbl">Port {port_num} &mdash; {len(banner)} bytes</div>
<div class="hexval">{hex_safe}</div>
<div class="txtval">{txt_safe}</div>
</div>"""
except (ValueError, TypeError):
lookup_html = '<div class="lookup-result"><div class="lbl">Invalid port number.</div></div>'
# ── recent connections ──
recent = stats.recent_connections(50)
if recent:
conn_rows = ''.join(
'<tr>'
f'<td class="ts">{html.escape(e["timestamp"][:19].replace("T", " "))}</td>'
f'<td class="ip">{html.escape(e["src_ip"])}</td>'
f'<td>{e["src_port"]}</td>'
f'<td class="port">{e["dst_port"]}</td>'
f'<td class="hex" title="{html.escape(e["banner_hex"])}">{html.escape(e["banner_hex"][:48])}{"" if len(e["banner_hex"]) > 48 else ""}</td>'
f'<td>{e["banner_len"]}</td>'
'</tr>'
for e in recent
)
else:
conn_rows = _empty_row(6, 'no connections yet')
last = stats.last_connection
last_seen = last[:19].replace('T', ' ') + ' UTC' if last else ''
return _HTML.format(
css=_CSS,
total=stats.total,
cpm=stats.connections_per_minute(),
uptime=stats.uptime_str(),
ports=len(cfg.port_map),
last_seen=html.escape(last_seen),
top_ips_rows=ip_rows,
top_ports_rows=port_rows,
port_q=html.escape(str(port_q)) if port_q is not None else '',
lookup_html=lookup_html,
recent_count=len(recent),
conn_rows=conn_rows,
)
# ── HTTP server ───────────────────────────────────────────────────────────────
async def _handle(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
stats: Stats,
cfg: Config,
creds: Tuple[str, str],
notifier: Optional['Notifier'] = None,
) -> None:
try:
raw = await asyncio.wait_for(reader.read(16384), timeout=10.0)
text = raw.decode('utf-8', errors='replace')
request_line = text.split('\r\n')[0]
parts = request_line.split(' ')
if len(parts) < 2:
return
if not _check_auth(text, creds):
writer.write(_UNAUTHORIZED)
await writer.drain()
return
method = parts[0].upper()
parsed = urlparse(parts[1])
path = parsed.path.rstrip('/') or '/'
qs = parse_qs(parsed.query)
status = '200 OK'
ct = 'text/plain; charset=utf-8'
# ── email config routes (require notifier) ────────────────────────────
if path == '/config':
if notifier is None:
body = 'Email alerts not configured (start with --email-config FILE).'
status = '404 Not Found'
else:
msg = ''
msg_ok = True
if 'saved' in qs:
msg = 'Settings saved.'
elif 'test_ok' in qs:
msg = 'Test email sent successfully.'
elif 'test_err' in qs:
msg = f'Test failed: {qs["test_err"][0]}'
msg_ok = False
body = _render_config(notifier, msg, msg_ok)
ct = 'text/html; charset=utf-8'
elif path == '/api/email/config' and method == 'POST':
if notifier is None:
body = json.dumps({'error': 'notifier not enabled'})
ct = 'application/json'
status = '404 Not Found'
else:
form = _parse_post_body(text)
new_cfg = {
'enabled': 'enabled' in form,
'smtp_host': form.get('smtp_host', ''),
'smtp_port': int(form.get('smtp_port', 587) or 587),
'smtp_starttls': 'smtp_starttls' in form,
'smtp_user': form.get('smtp_user', ''),
'smtp_password': form.get('smtp_password', ''),
'from_addr': form.get('from_addr', ''),
'to_addr': form.get('to_addr', ''),
'trigger_ports': [
int(p.strip()) for p in form.get('trigger_ports', '').split(',')
if p.strip().isdigit()
],
'batch_delay_seconds': int(form.get('batch_delay_seconds', 60) or 60),
'cooldown_seconds': int(form.get('cooldown_seconds', 300) or 300),
}
notifier.update_config(new_cfg)
writer.write(_redirect('/config?saved=1'))
await writer.drain()
return
elif path == '/api/email/test' and method == 'POST':
if notifier is None:
body = json.dumps({'error': 'notifier not enabled'})
ct = 'application/json'
status = '404 Not Found'
else:
# Save form data first (same as /api/email/config POST)
form = _parse_post_body(text)
new_cfg = {
'enabled': 'enabled' in form,
'smtp_host': form.get('smtp_host', ''),
'smtp_port': int(form.get('smtp_port', 587) or 587),
'smtp_starttls': 'smtp_starttls' in form,
'smtp_user': form.get('smtp_user', ''),
'smtp_password': form.get('smtp_password', ''),
'from_addr': form.get('from_addr', ''),
'to_addr': form.get('to_addr', ''),
'trigger_ports': [
int(p.strip()) for p in form.get('trigger_ports', '').split(',')
if p.strip().isdigit()
],
'batch_delay_seconds': int(form.get('batch_delay_seconds', 60) or 60),
'cooldown_seconds': int(form.get('cooldown_seconds', 300) or 300),
}
notifier.update_config(new_cfg)
err = await asyncio.to_thread(notifier.send_test)
if err:
from urllib.parse import quote
writer.write(_redirect(f'/config?test_err={quote(err)}'))
else:
writer.write(_redirect('/config?test_ok=1'))
await writer.drain()
return
elif path == '/api/email/config' and method == 'GET':
if notifier is None:
body = json.dumps({'error': 'notifier not enabled'})
ct = 'application/json'
status = '404 Not Found'
else:
body = json.dumps(notifier.get_config_safe(), indent=2)
ct = 'application/json'
# ── standard routes ───────────────────────────────────────────────────
elif path == '/':
port_q = qs.get('port', [None])[0]
body = _render_dashboard(stats, cfg, port_q)
ct = 'text/html; charset=utf-8'
elif path == '/api/stats':
body = json.dumps(stats.as_dict(len(cfg.port_map)), indent=2)
ct = 'application/json'
elif path == '/api/connections':
limit = min(int(qs.get('limit', ['50'])[0]), 500)
body = json.dumps(stats.recent_connections(limit))
ct = 'application/json'
elif path == '/api/banner':
try:
port_num = int(qs.get('port', ['0'])[0])
if not 0 <= port_num <= 65535:
raise ValueError
banner = cfg.get_banner(port_num)
body = json.dumps({
'port': port_num,
'banner_hex': banner.hex(),
'banner_len': len(banner),
'banner_text': banner.decode('latin-1', errors='replace'),
})
ct = 'application/json'
except (ValueError, TypeError):
body = json.dumps({'error': 'invalid port'})
ct = 'application/json'
status = '400 Bad Request'
else:
body = 'Not Found'
status = '404 Not Found'
body_bytes = body.encode('utf-8')
header = (
f'HTTP/1.1 {status}\r\n'
f'Content-Type: {ct}\r\n'
f'Content-Length: {len(body_bytes)}\r\n'
f'Connection: close\r\n'
f'\r\n'
).encode()
writer.write(header + body_bytes)
await writer.drain()
except (asyncio.TimeoutError, ConnectionResetError, BrokenPipeError):
pass
except Exception:
pass
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
async def run_admin(
stats: Stats,
cfg: Config,
host: Optional[str],
port: int,
creds: Tuple[str, str],
stop_event: asyncio.Event,
notifier: Optional['Notifier'] = None,
) -> None:
"""Start the admin HTTP server and run until stop_event is set."""
def handler(r, w):
asyncio.create_task(_handle(r, w, stats, cfg, creds, notifier))
server = await asyncio.start_server(
handler,
host=host or '127.0.0.1',
port=port,
reuse_address=True,
)
addrs = ', '.join(str(s.getsockname()) for s in server.sockets)
print(f'[admin] web interface → http://{addrs}')
async with server:
await stop_event.wait()
print('[admin] stopped')

146
portspoof_py/cli.py Normal file
View File

@@ -0,0 +1,146 @@
"""
CLI entry point — argument parsing, signal handling, startup/shutdown.
"""
import argparse
import asyncio
import signal
import sys
from typing import Optional
from .admin import load_credentials, run_admin
from .config import build_port_map
from .iptables import add_rules, check_root, remove_rules
from .logger import Logger
from .notifier import Notifier
from .server import run_server
from .stats import Stats
def _parse_args(argv=None):
p = argparse.ArgumentParser(
prog='portspoof-py',
description='Python asyncio portspoof honeypot — emulates services on all TCP ports.',
)
p.add_argument('-p', '--port', type=int, default=4444,
help='Port to listen on (default: 4444)')
p.add_argument('-i', '--bind-ip', default='',
help='IP address to bind to (default: all interfaces)')
p.add_argument('-s', '--signatures', metavar='FILE',
help='Portspoof signature file (regex patterns)')
p.add_argument('-c', '--config', metavar='FILE',
help='Portspoof config file (port→payload overrides)')
p.add_argument('-l', '--log-file', metavar='FILE',
help='JSON log output file')
p.add_argument('--iface', metavar='IFACE',
help='Network interface for iptables rules (e.g. eth0)')
p.add_argument('--no-iptables', action='store_true',
help='Skip iptables rule setup/teardown')
p.add_argument('-v', '--verbose', action='store_true',
help='Print each connection to stdout')
p.add_argument('--admin-port', type=int, default=None, metavar='PORT',
help='Enable web admin interface on this port (default: disabled)')
p.add_argument('--admin-host', default='127.0.0.1', metavar='HOST',
help='Admin interface bind address (default: 127.0.0.1)')
p.add_argument('--admin-passwd', default='admin.passwd', metavar='FILE',
help='File containing "username:password" for the admin interface '
'(default: admin.passwd)')
p.add_argument('--email-config', default='email.json', metavar='FILE',
help='JSON file for email alert config (default: email.json)')
return p.parse_args(argv)
def main(argv=None) -> int:
args = _parse_args(argv)
if not args.no_iptables and not check_root():
print("ERROR: root required for iptables management. Use --no-iptables or run as root.",
file=sys.stderr)
return 1
# Build port map synchronously (CPU-bound, ~65 k iterations)
print("[portspoof] building port→banner map …", flush=True)
try:
cfg = build_port_map(args.signatures, args.config)
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
return 1
print(f"[portspoof] port map ready ({len(cfg.port_map)} entries)", flush=True)
# iptables setup
exempt = [args.admin_port] if args.admin_port else []
if not args.no_iptables:
exempt_desc = ', '.join(str(p) for p in [22] + exempt + [args.port])
iface_desc = args.iface or 'all'
print(f"[portspoof] adding iptables rules (listener={args.port}, exempt={exempt_desc}, iface={iface_desc})")
try:
add_rules(args.port, args.iface, exempt)
except Exception as exc:
print(f"ERROR: iptables setup failed: {exc}", file=sys.stderr)
return 1
# Load admin credentials before starting (fail fast if file is missing/bad)
creds = None
if args.admin_port:
try:
creds = load_credentials(args.admin_passwd)
print(f"[admin] credentials loaded from {args.admin_passwd}")
except (FileNotFoundError, ValueError) as exc:
print(f"ERROR: {exc}", file=sys.stderr)
print(
f"Create {args.admin_passwd!r} with a single line: username:password",
file=sys.stderr,
)
return 1
notifier: Notifier = Notifier(args.email_config)
print(f'[portspoof] email config: {args.email_config}')
stats = Stats()
log = Logger(args.log_file, verbose=args.verbose, stats=stats, notifier=notifier)
async def _run() -> None:
stop_event = asyncio.Event()
loop = asyncio.get_running_loop()
def _signal_handler():
if not stop_event.is_set():
print("\n[portspoof] shutdown signal received")
stop_event.set()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _signal_handler)
await log.start()
tasks = []
try:
bind_ip: Optional[str] = args.bind_ip or None
tasks.append(asyncio.create_task(
run_server(cfg, log, bind_ip, args.port, args.verbose, stop_event)
))
if args.admin_port:
tasks.append(asyncio.create_task(
run_admin(stats, cfg, args.admin_host, args.admin_port, creds, stop_event,
notifier=notifier)
))
await asyncio.gather(*tasks)
finally:
await log.stop()
rc = 0
try:
asyncio.run(_run())
except KeyboardInterrupt:
pass
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
rc = 1
finally:
if not args.no_iptables:
print("[portspoof] removing iptables rules …")
try:
remove_rules(args.port, args.iface, exempt)
except Exception as exc:
print(f"WARNING: iptables cleanup error: {exc}", file=sys.stderr)
return rc

74
portspoof_py/config.py Normal file
View File

@@ -0,0 +1,74 @@
"""
Signature / config file loading and port-banner mapping.
build_port_map() is intentionally synchronous (CPU-bound, ~65 k iterations)
and must be called before asyncio.run().
"""
import random
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional
from .revregex import process_signature
_RANGE_RE = re.compile(r'^(\d+)(?:-(\d+))?\s+"(.*)"')
def _extract_payload(line: str) -> str:
"""Extract payload between first and last '"' — mirrors C++ get_substring_value."""
first = line.index('"') + 1
last = line.rindex('"')
return line[first:last]
@dataclass
class Config:
port_map: Dict[int, bytes] = field(default_factory=dict)
def get_banner(self, port: int) -> bytes:
return self.port_map.get(port % 65536, b'')
def build_port_map(
sig_file: Optional[str],
conf_file: Optional[str],
) -> Config:
"""Build the port→banner map synchronously.
1. If sig_file given, load signatures and assign one randomly to each port.
2. If conf_file given, override specific ports/ranges.
"""
cfg = Config()
# ── Step 1: signature file ───────────────────────────────────────────────
if sig_file:
sigs = Path(sig_file).read_text('latin-1').splitlines()
sigs = [s for s in sigs if s] # drop blank lines
if not sigs:
raise ValueError(f"Signature file {sig_file!r} is empty")
for p in range(65536):
cfg.port_map[p] = process_signature(random.choice(sigs))
else:
# No sig file: every port gets an empty banner (open-port mode)
for p in range(65536):
cfg.port_map[p] = b''
# ── Step 2: config file overrides ───────────────────────────────────────
if conf_file:
text = Path(conf_file).read_text('latin-1')
for line in text.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
m = _RANGE_RE.match(line)
if not m:
continue
lo = int(m.group(1))
hi = int(m.group(2)) if m.group(2) else lo
raw = _extract_payload(line)
banner = process_signature(raw)
for p in range(lo, hi + 1):
cfg.port_map[p] = banner
return cfg

102
portspoof_py/iptables.py Normal file
View File

@@ -0,0 +1,102 @@
"""
iptables PREROUTING + OUTPUT rule management.
Two chains are managed in parallel:
PREROUTING — redirects traffic arriving from other machines (external scanners).
OUTPUT — redirects traffic from the local machine to 127.0.0.0/8, so that
local tools (nmap localhost, nc localhost PORT) also hit the honeypot.
add_rules() — idempotent setup (append)
remove_rules() — teardown (delete, check=False so missing rules don't raise)
"""
import os
import shutil
import subprocess
from typing import Optional
def _ipt() -> str:
return shutil.which('iptables') or '/usr/sbin/iptables'
def _run(args: list, check: bool = True) -> None:
subprocess.run(args, check=check, capture_output=True)
def check_root() -> bool:
return os.geteuid() == 0
def _exempt_list(listen_port: int, exempt_ports: Optional[list]) -> list:
"""Return deduped ordered list: [22, *extras, listen_port]."""
seen: set = set()
result = []
for p in [22] + (exempt_ports or []) + [listen_port]:
if p not in seen:
seen.add(p)
result.append(p)
return result
def add_rules(
listen_port: int,
iface: Optional[str] = None,
exempt_ports: Optional[list] = None,
) -> None:
"""Insert PREROUTING and OUTPUT NAT rules for portspoof.
PREROUTING catches external traffic (remote scanners).
OUTPUT catches loopback traffic (local testing with nmap/nc localhost).
"""
ipt = _ipt()
iface_args = ['--in-interface', iface] if iface else []
ports = _exempt_list(listen_port, exempt_ports)
# ── PREROUTING: external traffic ─────────────────────────────────────────
for port in ports:
_run([ipt, '-t', 'nat', '-A', 'PREROUTING', '-p', 'tcp']
+ iface_args + ['--dport', str(port), '-j', 'RETURN'])
_run([ipt, '-t', 'nat', '-A', 'PREROUTING', '-p', 'tcp']
+ iface_args + ['-j', 'REDIRECT', '--to-port', str(listen_port)])
# ── OUTPUT: loopback traffic (127.0.0.0/8) ───────────────────────────────
# No --in-interface here; OUTPUT applies to locally-generated packets.
for port in ports:
_run([ipt, '-t', 'nat', '-A', 'OUTPUT', '-p', 'tcp',
'-d', '127.0.0.0/8', '--dport', str(port), '-j', 'RETURN'])
_run([ipt, '-t', 'nat', '-A', 'OUTPUT', '-p', 'tcp',
'-d', '127.0.0.0/8', '-j', 'REDIRECT', '--to-port', str(listen_port)])
def remove_rules(
listen_port: int,
iface: Optional[str] = None,
exempt_ports: Optional[list] = None,
) -> None:
"""Remove both PREROUTING and OUTPUT rules (silent if already gone)."""
ipt = _ipt()
iface_args = ['--in-interface', iface] if iface else []
ports = _exempt_list(listen_port, exempt_ports)
# ── PREROUTING ────────────────────────────────────────────────────────────
for port in ports:
_run([ipt, '-t', 'nat', '-D', 'PREROUTING', '-p', 'tcp']
+ iface_args + ['--dport', str(port), '-j', 'RETURN'],
check=False)
_run([ipt, '-t', 'nat', '-D', 'PREROUTING', '-p', 'tcp']
+ iface_args + ['-j', 'REDIRECT', '--to-port', str(listen_port)],
check=False)
# ── OUTPUT ────────────────────────────────────────────────────────────────
for port in ports:
_run([ipt, '-t', 'nat', '-D', 'OUTPUT', '-p', 'tcp',
'-d', '127.0.0.0/8', '--dport', str(port), '-j', 'RETURN'],
check=False)
_run([ipt, '-t', 'nat', '-D', 'OUTPUT', '-p', 'tcp',
'-d', '127.0.0.0/8', '-j', 'REDIRECT', '--to-port', str(listen_port)],
check=False)

90
portspoof_py/logger.py Normal file
View File

@@ -0,0 +1,90 @@
"""
Async JSON log writer.
log_connection() is non-blocking (puts event on a queue).
The _writer() coroutine drains the queue and writes to disk.
"""
import asyncio
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from .notifier import Notifier
from .stats import Stats
class Logger:
def __init__(
self,
log_file: Optional[str],
verbose: bool = False,
stats: Optional['Stats'] = None,
notifier: Optional['Notifier'] = None,
):
self._log_file = log_file
self._verbose = verbose
self._stats = stats
self._notifier = notifier
self._queue: asyncio.Queue = asyncio.Queue()
self._task: Optional[asyncio.Task] = None
self._fh = None
async def start(self) -> None:
if self._log_file:
Path(self._log_file).parent.mkdir(parents=True, exist_ok=True)
self._fh = open(self._log_file, 'a', buffering=1) # line-buffered
self._task = asyncio.create_task(self._writer())
async def _writer(self) -> None:
while True:
event = await self._queue.get()
line = json.dumps(event)
try:
if self._stats:
self._stats.record(event)
if self._notifier:
self._notifier.notify(event)
if self._fh:
self._fh.write(line + '\n')
if self._verbose:
print(line, flush=True)
except Exception as exc:
print(f"[logger] write error: {exc}", file=sys.stderr)
finally:
self._queue.task_done()
def log_connection(
self,
src_ip: str,
src_port: int,
dst_port: int,
banner: bytes,
) -> None:
"""Non-blocking: enqueue a JSON log event."""
event = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'src_ip': src_ip,
'src_port': src_port,
'dst_port': dst_port,
'banner_hex': banner.hex(),
'banner_len': len(banner),
}
self._queue.put_nowait(event)
async def stop(self) -> None:
"""Drain the queue then cancel the writer task."""
try:
await asyncio.wait_for(self._queue.join(), timeout=5.0)
except asyncio.TimeoutError:
pass
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
if self._fh:
self._fh.close()

163
portspoof_py/notifier.py Normal file
View File

@@ -0,0 +1,163 @@
"""
Email alert notifier — batched delivery.
notify() is called from the logger writer coroutine and adds matching events
to a pending list. A single flush task waits for batch_delay_seconds then
sends one digest email summarising all accumulated events. A cooldown
prevents a new batch from being scheduled sooner than cooldown_seconds after
the previous one was sent.
Config file is JSON, loaded at startup, saved via update_config().
"""
import asyncio
import json
import smtplib
import sys
import time
from email.message import EmailMessage
from pathlib import Path
from typing import List, Optional
_DEFAULTS: dict = {
'enabled': False,
'smtp_host': '',
'smtp_port': 587,
'smtp_starttls': True,
'smtp_user': '',
'smtp_password': '',
'from_addr': '',
'to_addr': '',
'trigger_ports': [], # empty = alert on every port
'batch_delay_seconds': 60, # wait this long to collect events before sending
'cooldown_seconds': 300, # min time between batch emails
}
class Notifier:
def __init__(self, config_file: str):
self._config_file = config_file
self._config: dict = {}
self._pending: List[dict] = [] # events waiting to be sent
self._flush_task: Optional[asyncio.Task] = None
self._last_sent: float = 0.0 # monotonic time of last send
self._load()
# ── config I/O ────────────────────────────────────────────────────────────
def _load(self) -> None:
try:
self._config = {**_DEFAULTS, **json.loads(Path(self._config_file).read_text())}
except FileNotFoundError:
self._config = dict(_DEFAULTS)
except Exception as exc:
print(f"[notifier] config load error: {exc}", file=sys.stderr)
self._config = dict(_DEFAULTS)
def get_config_safe(self) -> dict:
"""Config dict with the SMTP password replaced by a placeholder."""
cfg = dict(self._config)
if cfg.get('smtp_password'):
cfg['smtp_password'] = '••••••••'
return cfg
def update_config(self, new_config: dict) -> None:
"""Merge new_config over current config, preserve password if blank, save."""
new_pw = new_config.get('smtp_password', '')
if not new_pw or new_pw == '••••••••':
new_config['smtp_password'] = self._config.get('smtp_password', '')
self._config = {**_DEFAULTS, **new_config}
Path(self._config_file).write_text(json.dumps(self._config, indent=2))
# ── notification ──────────────────────────────────────────────────────────
def notify(self, event: dict) -> None:
"""Enqueue an event for batched delivery.
Called from the logger writer coroutine (asyncio context).
"""
cfg = self._config
if not cfg.get('enabled'):
return
if not cfg.get('smtp_host') or not cfg.get('to_addr'):
return
trigger_ports = cfg.get('trigger_ports') or []
if trigger_ports and event['dst_port'] not in trigger_ports:
return
self._pending.append(event)
# Schedule a flush if one isn't already waiting
if self._flush_task is None or self._flush_task.done():
delay = int(cfg.get('batch_delay_seconds', 60))
self._flush_task = asyncio.create_task(self._flush_after(delay))
async def _flush_after(self, delay: int) -> None:
"""Wait for the batch window, then send the digest."""
await asyncio.sleep(delay)
if not self._pending:
return
cooldown = int(self._config.get('cooldown_seconds', 300))
if time.monotonic() - self._last_sent < cooldown:
# Still in cooldown — reschedule for the remaining time
wait = cooldown - (time.monotonic() - self._last_sent)
await asyncio.sleep(wait)
events = list(self._pending)
self._pending.clear()
self._last_sent = time.monotonic()
await asyncio.to_thread(self._send_batch, events, dict(self._config))
def _send_batch(self, events: List[dict], cfg: dict) -> None:
"""Build and deliver one digest email (blocking — runs in thread pool)."""
n = len(events)
try:
msg = EmailMessage()
msg['Subject'] = f"[portspoof] {n} connection{'s' if n != 1 else ''} detected"
msg['From'] = cfg.get('from_addr') or cfg.get('smtp_user') or 'portspoof'
msg['To'] = cfg['to_addr']
lines = [f"portspoof alert — {n} connection{'s' if n != 1 else ''}\n"]
for e in events:
ts = e['timestamp'][:19].replace('T', ' ')
hex_preview = e['banner_hex'][:40] + ('' if len(e['banner_hex']) > 40 else '')
lines.append(
f" {ts} {e['src_ip']}:{e['src_port']}"
f" → port {e['dst_port']}"
f" ({e['banner_len']} B {hex_preview})"
)
msg.set_content('\n'.join(lines) + '\n')
host = cfg['smtp_host']
pport = int(cfg.get('smtp_port', 587))
with smtplib.SMTP(host, pport, timeout=15) as smtp:
if cfg.get('smtp_starttls', True):
smtp.starttls()
user, pw = cfg.get('smtp_user', ''), cfg.get('smtp_password', '')
if user and pw:
smtp.login(user, pw)
smtp.send_message(msg)
print(f"[notifier] batch alert sent → {cfg['to_addr']} ({n} events)", flush=True)
except Exception as exc:
print(f"[notifier] send failed: {exc}", file=sys.stderr)
def send_test(self) -> Optional[str]:
"""Send a test email synchronously. Returns error string or None on success."""
import datetime
fake = {
'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat(),
'src_ip': '127.0.0.1',
'src_port': 12345,
'dst_port': 80,
'banner_hex': 'deadbeef',
'banner_len': 4,
}
try:
self._send_batch([fake], self._config)
return None
except Exception as exc:
return str(exc)

301
portspoof_py/revregex.py Normal file
View File

@@ -0,0 +1,301 @@
"""
Faithful Python port of Revregex.cpp — three-pass regex materializer.
Pipeline: _revregexn → _fill_specialchars → _escape_hex
All passes operate on list[int] (byte values 0-255).
"""
import random
from typing import List
_BS = ord('\\')
_LB = ord('[')
_RB = ord(']')
_LP = ord('(')
_RP = ord(')')
def _process_bracket(chars: List[int], start: int, end: int) -> List[int]:
"""Expand bracket class chars[start..end] (inclusive).
chars[start] == '[', chars[end] == ']'.
Reads chars[end+1] for '*' or '+' quantifier.
Replicates C++ revregex_process_bracket() exactly, including quirks:
- Range `a-z` consumes an extra character (i+=3 inside for-loop that also i++)
- Only the first character_class bit wins (if/else if chain)
- Without * or +, finsize=0 → empty output
"""
nnot = False
character_set = bytearray(256)
character_class = 0
i = start + 1 # skip '['
if i < end and chars[i] == ord('^'):
nnot = True
i += 1
while i < end:
c = chars[i]
# Escape sequence: \X — C++ does i++ inside + for-loop i++ = advance 2 total
if c == _BS and i + 1 != end:
nc = chars[i + 1]
if nc == ord('c'): character_class |= 1 << 1
elif nc == ord('s'): character_class |= 2 << 1
elif nc == ord('S'): character_class |= 1 << 3
elif nc == ord('d'): character_class |= 1 << 4
elif nc == ord('D'): character_class |= 1 << 5
elif nc == ord('w'): character_class |= 1 << 6
elif nc == ord('W'): character_class |= 1 << 7
elif nc == ord('n'): character_set[0x0a] = 1
elif nc == ord('r'): character_set[0x0d] = 1
elif nc == ord('t'): character_set[0x09] = 1
elif nc == ord('v'): character_set[0x0b] = 1
elif nc == ord('f'): character_set[0x0c] = 1
elif nc == ord('0'): character_set[0x00] = 1
else: character_set[nc] = 1
i += 2 # i++ in body + i++ in loop
# Alpha range e.g. a-z: C++ does i+=3 in body + i++ in loop = advance 4 total
elif (chr(c).isalpha()
and i + 1 < end and chars[i + 1] == ord('-')
and i + 2 < end and chr(chars[i + 2]).isalpha()):
for j in range(c, chars[i + 2] + 1):
character_set[j] = 1
i += 4 # i+=3 in body + i++ in loop (C++ quirk: skips one extra char)
# Digit range e.g. 0-9
elif (chr(c).isdigit()
and i + 1 < end and chars[i + 1] == ord('-')
and i + 2 < end and chr(chars[i + 2]).isdigit()):
for j in range(c, chars[i + 2] + 1):
character_set[j] = 1
i += 4 # same C++ quirk
elif c == ord('.'):
character_class |= 1 << 8
i += 1
elif c == ord('|'):
character_class |= 1 << 9
i += 1
else:
character_set[c] = 1
i += 1
# Read quantifier from char immediately after ']'
endmeta = chars[end + 1] if end + 1 < len(chars) else 0
# Fill character pool from character_class — C++ uses if/else if chain,
# so only the FIRST matching class contributes.
if character_class & (1 << 1): pass # \c — nothing
elif character_class & (1 << 2): pass
elif character_class & (1 << 3): pass # \S — nothing
elif character_class & (1 << 4): # \d
for j in range(ord('0'), ord('9') + 1):
character_set[j] = 1
elif character_class & (1 << 5): pass # \D — nothing
elif character_class & (1 << 6): # \w
for j in range(ord('a'), ord('z') + 1):
character_set[j] = 1
for j in range(ord('A'), ord('Z') + 1):
character_set[j] = 1
elif character_class & (1 << 7): pass # \W — nothing
elif character_class & (1 << 8): pass # . — nothing (commented out in C++)
elif character_class & (1 << 9): pass # | — nothing
# Repetition count
if endmeta == ord('*'): finsize = random.randint(0, 9)
elif endmeta == ord('+'): finsize = random.randint(1, 9)
else: finsize = 0
# Build pool
pool: List[int] = []
for idx in range(256):
if idx in (ord('['), ord(']')):
continue
if nnot:
if character_set[idx] == 0:
pool.append(idx)
else:
if character_set[idx] != 0:
pool.append(idx)
if pool and finsize > 0:
return [random.choice(pool) for _ in range(finsize)]
return []
def _revregexn(chars: List[int]) -> List[int]:
"""Pass 1 — remove unescaped (...) groups; expand [...] bracket classes.
Replicates C++ revregexn() with its goto-based repeat-until-clean loops.
"""
result = list(chars)
# ── Phase 1a: strip parentheses ─────────────────────────────────────────
while True:
action = False
in_bracket = False
for i, c in enumerate(result):
# Track bracket context (simple: checks only immediate prev char)
if c == _LB and (i == 0 or result[i - 1] != _BS):
in_bracket = True
if c == _RB and (i == 0 or result[i - 1] != _BS):
in_bracket = False
if c == _LP and not in_bracket and (i == 0 or result[i - 1] != _BS):
# Find matching ')'
in_bracket_j = False
for j in range(i, len(result)):
cj = result[j]
if cj == _LB and j > 0 and result[j - 1] != _BS:
in_bracket_j = True
if cj == _RB and j > 0 and result[j - 1] != _BS:
in_bracket_j = False
if (cj == _RP and not in_bracket and not in_bracket_j
and (j == 0 or result[j - 1] != _BS)):
result = result[:i] + result[i + 1:j] + result[j + 1:]
action = True
break
if action:
break
if not action:
break
# ── Phase 1b: expand bracket classes ────────────────────────────────────
while True:
action = False
for i, c in enumerate(result):
if c == _LB and (i == 0 or result[i - 1] != _BS):
# Find matching ']'
for j in range(i, len(result)):
cj = result[j]
if cj == _RB and (j == 0 or result[j - 1] != _BS):
expanded = _process_bracket(result, i, j)
# Skip j+1 (quantifier char) — same as C++ cutvector(j+2,...)
result = result[:i] + expanded + result[j + 2:]
action = True
break
if action:
break
if not action:
break
return result
def _fill_specialchars(chars: List[int]) -> List[int]:
"""Pass 2 — expand \\w, \\d, \\n, \\r, \\t, and bare '.' metacharacters.
Replicates C++ fill_specialchars(). Key quirks:
- \\w+ and \\d+ skip one extra char after the quantifier (C++ i+=2 + loop i++)
- .+ skips the quantifier only (C++ i++ + loop i++)
- \\0 has NO special handling → '\' falls to else, '0' falls to else
"""
result: List[int] = []
i = 0
n = len(chars)
while i < n:
c = chars[i]
prev_is_bs = i > 0 and chars[i - 1] == _BS
if (c == _BS and i + 1 < n and chars[i + 1] == ord('w') and not prev_is_bs):
result.append(ord('a') + random.randint(0, 24)) # 97 + rand()%25
i += 1 # i++ in body (now at 'w')
if i + 1 < n and chars[i + 1] in (ord('+'), ord('*')):
i += 2 # C++ quirk: i+=2 skips quantifier + one more
i += 1 # loop i++
elif (c == _BS and i + 1 < n and chars[i + 1] == ord('d') and not prev_is_bs):
result.append(ord('0') + random.randint(0, 9)) # 48 + rand()%10
i += 1
if i + 1 < n and chars[i + 1] in (ord('+'), ord('*')):
i += 2
i += 1
elif (c == _BS and i + 1 < n and chars[i + 1] == ord('n') and not prev_is_bs):
result.append(0x0a) # '\n'
i += 2 # i++ in body + loop i++
elif (c == _BS and i + 1 < n and chars[i + 1] == ord('r') and not prev_is_bs):
result.append(0x0d) # '\r'
i += 2
elif (c == _BS and i + 1 < n and chars[i + 1] == ord('t') and not prev_is_bs):
result.append(0x09) # '\t'
i += 2
elif (c == ord('.') and i + 1 < n and not prev_is_bs):
result.append(ord('a') + random.randint(0, 24))
# C++: if str[i+1]=='+'/='*' then i++ (inner), then loop i++
# Net: .+ → advance 3 (consume '.', '+', next char)
# . → advance 2 (consume '.', next char) — BUT wait...
# Actually C++ check is: if(i<=end_offset && (str[i+1]=='+' || ...)) i++
# 'i' is at '.', so str[i+1] is char after '.'. If quantifier: i++ (to quantifier), loop i++ (past).
# No quantifier: loop i++ only (past '.').
if chars[i + 1] in (ord('+'), ord('*')):
i += 1 # inner i++ moves to quantifier
i += 1 # loop i++
else:
result.append(c)
i += 1
return result
def _is_hex(c: int) -> bool:
return (ord('0') <= c <= ord('9')
or ord('A') <= c <= ord('F')
or ord('a') <= c <= ord('f'))
def _char2hex(hb: int, lb: int) -> int:
"""Convert two hex digit ints to a byte value (replicates C++ char2hex)."""
value = 0
for ch in (hb, lb):
if ord('0') <= ch <= ord('9'):
value = (value << 4) + (ch - ord('0'))
elif ord('A') <= ch <= ord('F'):
value = (value << 4) + (ch - ord('A') + 10)
elif ord('a') <= ch <= ord('f'):
value = (value << 4) + (ch - ord('a') + 10)
else:
return value # early return on invalid hex char (C++ behaviour)
return value
def _escape_hex(chars: List[int]) -> List[int]:
r"""Pass 3 — convert \xNN hex escapes to byte values.
Replicates C++ escape_hex(). Key quirk:
When '\' is seen but NOT followed by xNN, the '\' is silently DROPPED
(consumed by the outer if with no push, then loop advances).
"""
result: List[int] = []
i = 0
n = len(chars)
while i < n:
c = chars[i]
if c == _BS and (i == 0 or chars[i - 1] != _BS):
if (i + 1 < n and chars[i + 1] == ord('x')
and i + 2 < n and _is_hex(chars[i + 2])
and i + 3 < n and _is_hex(chars[i + 3])):
result.append(_char2hex(chars[i + 2], chars[i + 3]))
i += 4 # i+=3 in body + loop i++
continue
# else: '\' is consumed/dropped; loop advances past it
else:
result.append(c)
i += 1
return result
def process_signature(sig: str) -> bytes:
"""Run the three-pass pipeline on one signature string → banner bytes."""
try:
data = list(sig.encode('latin-1'))
data = _revregexn(data)
data = _fill_specialchars(data)
data = _escape_hex(data)
return bytes(data)
except Exception:
return b''

100
portspoof_py/server.py Normal file
View File

@@ -0,0 +1,100 @@
"""
asyncio TCP server — connection handler with SO_ORIGINAL_DST support.
"""
import asyncio
import socket
import struct
from typing import Optional
from .config import Config
from .logger import Logger
# Linux kernel constants
SOL_IP = 0
SO_ORIGINAL_DST = 80
def get_original_dst(sock: socket.socket) -> Optional[int]:
"""Return the original destination port via SO_ORIGINAL_DST getsockopt.
Struct layout (sockaddr_in, 16 bytes):
!HH4s8x → family(2) + port(2) + addr(4) + padding(8)
Returns port number, or None on failure (e.g. no iptables REDIRECT).
"""
try:
raw = sock.getsockopt(SOL_IP, SO_ORIGINAL_DST, 16)
_family, port, _addr = struct.unpack('!HH4s8x', raw)
return port
except OSError:
return None
async def _handle(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
cfg: Config,
log: Logger,
verbose: bool,
) -> None:
sock = writer.get_extra_info('socket')
peername = writer.get_extra_info('peername') or ('?.?.?.?', 0)
src_ip, src_port = peername[0], peername[1]
original_port = get_original_dst(sock)
if original_port is None:
# No REDIRECT in place — use the local port as a fallback
sockname = writer.get_extra_info('sockname') or ('', 0)
original_port = sockname[1]
banner = cfg.get_banner(original_port)
if verbose:
print(
f"[conn] {src_ip}:{src_port} → port {original_port}"
f" banner={banner[:32].hex()}{'' if len(banner) > 32 else ''}"
)
try:
if banner:
writer.write(banner)
await writer.drain()
except (ConnectionResetError, BrokenPipeError):
pass
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
log.log_connection(src_ip, src_port, original_port, banner)
async def run_server(
cfg: Config,
log: Logger,
host: Optional[str],
port: int,
verbose: bool,
stop_event: asyncio.Event,
) -> None:
"""Start the asyncio TCP server and run until stop_event is set."""
def handler(r, w):
asyncio.create_task(_handle(r, w, cfg, log, verbose))
server = await asyncio.start_server(
handler,
host=host or None,
port=port,
reuse_address=True,
backlog=256,
)
addrs = ', '.join(str(s.getsockname()) for s in server.sockets)
print(f"[portspoof] listening on {addrs}")
async with server:
await stop_event.wait()
print("[portspoof] server stopped")

89
portspoof_py/stats.py Normal file
View File

@@ -0,0 +1,89 @@
"""
In-memory connection statistics.
All methods are called from the single asyncio event loop — no locking needed.
"""
import time
from collections import Counter, deque
from typing import List, Tuple
def _fmt_uptime(seconds: float) -> str:
s = int(seconds)
d, s = divmod(s, 86400)
h, s = divmod(s, 3600)
m, s = divmod(s, 60)
if d:
return f'{d}d {h}h {m}m'
if h:
return f'{h}h {m}m {s}s'
if m:
return f'{m}m {s}s'
return f'{s}s'
class Stats:
def __init__(self, max_recent: int = 500):
self._start = time.monotonic()
self._total = 0
self._last_ts: str | None = None # ISO timestamp of last connection
self._recent: deque = deque(maxlen=max_recent)
self._timestamps: deque = deque() # monotonic timestamps for rolling rate
self._top_ips = Counter()
self._top_ports = Counter()
# ── write side (called from logger writer coroutine) ────────────────────
def record(self, event: dict) -> None:
self._total += 1
self._recent.append(event)
now = time.monotonic()
self._timestamps.append(now)
# Trim entries older than 60 s while we have them
cutoff = now - 60
while self._timestamps and self._timestamps[0] < cutoff:
self._timestamps.popleft()
self._last_ts = event['timestamp']
self._top_ips[event['src_ip']] += 1
self._top_ports[event['dst_port']] += 1
# ── read side (called from admin HTTP handler) ───────────────────────────
@property
def total(self) -> int:
return self._total
def connections_per_minute(self) -> int:
cutoff = time.monotonic() - 60
while self._timestamps and self._timestamps[0] < cutoff:
self._timestamps.popleft()
return len(self._timestamps)
def uptime_str(self) -> str:
return _fmt_uptime(time.monotonic() - self._start)
def recent_connections(self, limit: int = 50) -> List[dict]:
"""Return up to `limit` most-recent connections, newest first."""
items = list(self._recent)
return list(reversed(items[-limit:]))
def top_ips(self, n: int = 10) -> List[Tuple[str, int]]:
return self._top_ips.most_common(n)
def top_ports(self, n: int = 10) -> List[Tuple[int, int]]:
return self._top_ports.most_common(n)
@property
def last_connection(self) -> str | None:
return self._last_ts
def as_dict(self, ports_mapped: int = 0) -> dict:
return {
'uptime': self.uptime_str(),
'total_connections': self.total,
'last_connection': self._last_ts,
'connections_per_min': self.connections_per_minute(),
'ports_mapped': ports_mapped,
'top_ips': [{'ip': ip, 'count': c} for ip, c in self.top_ips()],
'top_ports': [{'port': p, 'count': c} for p, c in self.top_ports()],
}

18
pyproject.toml Normal file
View File

@@ -0,0 +1,18 @@
[build-system]
requires = ["flit_core>=3.2"]
build-backend = "flit_core.buildapi"
[project]
name = "portspoof-py"
version = "1.0.0"
description = "Python asyncio rewrite of the portspoof TCP honeypot"
readme = "README.md"
requires-python = ">=3.11"
license = {text = "GPL-2.0-or-later"}
dependencies = [] # zero runtime deps — stdlib only
[project.scripts]
portspoof-py = "portspoof_py.cli:main"
[project.urls]
Source = "https://github.com/drk1wi/portspoof"

1
tools/portspoof.conf Symbolic link
View File

@@ -0,0 +1 @@
/home/daprogs/claude/portspoof/tools/portspoof.conf

1
tools/portspoof_signatures Symbolic link
View File

@@ -0,0 +1 @@
/home/daprogs/claude/portspoof/tools/portspoof_signatures