adding country codes

This commit is contained in:
2026-03-10 18:37:47 -04:00
parent 54d39b9d61
commit 02800ef496
2 changed files with 52 additions and 6 deletions

View File

@@ -19,6 +19,7 @@ from typing import Optional, Tuple
from urllib.parse import parse_qs, urlparse
from .config import Config
from .geo import country as _geo_country
from .notifier import Notifier
from .stats import Stats
@@ -303,6 +304,7 @@ tr:hover td { background: #1c2128; }
.hex { color: #a5d6ff; font-size: 11px;
max-width: 240px; overflow: hidden;
text-overflow: ellipsis; white-space: nowrap; }
.cc { color: #8b949e; font-size: 11px; }
.badge {
display: inline-block; background: #21262d;
border-radius: 3px; padding: 1px 7px; font-size: 11px;
@@ -452,12 +454,29 @@ def _empty_row(cols: int, msg: str) -> str:
return f'<tr><td colspan="{cols}" class="empty">{msg}</td></tr>'
def _render_dashboard(stats: Stats, cfg: Config) -> str:
async def _render_dashboard(stats: Stats, cfg: Config) -> str:
# ── top IPs ──
top_ips = stats.top_ips()
# ── recent connections ──
recent = stats.recent_connections(50)
# Prefetch geo for all unique IPs in one batch
unique_ips = {ip for ip, _ in top_ips} | {e['src_ip'] for e in recent}
if unique_ips:
await asyncio.gather(*(_geo_country(ip) for ip in unique_ips))
def _ip_cell(ip: str) -> str:
from .geo import _cache as _geo_cache
code = _geo_cache.get(ip, '')
esc = html.escape(ip)
if code:
return f'<span class="ip">{esc}</span> <span class="cc">({html.escape(code)})</span>'
return f'<span class="ip">{esc}</span>'
ip_rows = (
''.join(
f'<tr><td class="ip">{html.escape(ip)}</td><td>{c}</td></tr>'
f'<tr><td>{_ip_cell(ip)}</td><td>{c}</td></tr>'
for ip, c in top_ips
) if top_ips else _empty_row(2, 'no data yet')
)
@@ -474,13 +493,11 @@ def _render_dashboard(stats: Stats, cfg: Config) -> str:
# ── CPM chart ──
cpm_chart = _render_cpm_chart(stats.cpm_history(60))
# ── recent connections ──
recent = stats.recent_connections(50)
conn_rows = (
''.join(
'<tr>'
f'<td class="ts">{html.escape(e["timestamp"][:19].replace("T", " "))}</td>'
f'<td class="ip">{html.escape(e["src_ip"])}</td>'
f'<td>{_ip_cell(e["src_ip"])}</td>'
f'<td>{e["src_port"]}</td>'
f'<td class="port">{e["dst_port"]}</td>'
f'<td class="hex" title="{html.escape(e["banner_hex"])}">'
@@ -637,7 +654,7 @@ async def _handle(
return
elif path == '/':
body = _render_dashboard(stats, cfg)
body = await _render_dashboard(stats, cfg)
ct = 'text/html; charset=utf-8'
elif path == '/api/stats':

29
portspoof_py/geo.py Normal file
View File

@@ -0,0 +1,29 @@
"""IP geolocation — async lookup with in-memory cache. Zero external deps."""
import asyncio
import urllib.request
_cache: dict[str, str] = {}
_sem = asyncio.Semaphore(5)
async def country(ip: str) -> str:
"""Return two-letter country code for *ip*, or '' on error/unknown."""
if ip in _cache:
return _cache[ip]
async with _sem:
if ip in _cache:
return _cache[ip]
def _fetch() -> str:
url = f'https://www.daprogs.com/ip/?raw=1&ip={ip}'
with urllib.request.urlopen(url, timeout=3) as r:
return r.read().decode('ascii', errors='replace')
try:
text = await asyncio.to_thread(_fetch)
parts = text.split('|')
code = parts[3].strip() if len(parts) > 3 else ''
except Exception:
code = ''
_cache[ip] = code
return code