|
|
|
|
@@ -71,6 +71,63 @@ _UNAUTHORIZED = (
|
|
|
|
|
b'Unauthorized'
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# ── passwd page ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
_PASSWD_HTML = """\
|
|
|
|
|
<!DOCTYPE html>
|
|
|
|
|
<html lang="en">
|
|
|
|
|
<head>
|
|
|
|
|
<meta charset="utf-8">
|
|
|
|
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
|
|
|
|
<title>portspoof — change password</title>
|
|
|
|
|
<style>{css}</style>
|
|
|
|
|
</head>
|
|
|
|
|
<body>
|
|
|
|
|
<h1>portspoof admin</h1>
|
|
|
|
|
<p class="sub"><a href="/">← dashboard</a></p>
|
|
|
|
|
|
|
|
|
|
{banner_html}
|
|
|
|
|
|
|
|
|
|
<div class="card">
|
|
|
|
|
<h2>Change password</h2>
|
|
|
|
|
<form method="post" action="/api/passwd">
|
|
|
|
|
<div class="field">
|
|
|
|
|
<label>Current password</label>
|
|
|
|
|
<input type="password" name="current_password" autofocus>
|
|
|
|
|
</div>
|
|
|
|
|
<div class="field">
|
|
|
|
|
<label>New password</label>
|
|
|
|
|
<input type="password" name="new_password">
|
|
|
|
|
</div>
|
|
|
|
|
<div class="field">
|
|
|
|
|
<label>Confirm new password</label>
|
|
|
|
|
<input type="password" name="confirm_password">
|
|
|
|
|
</div>
|
|
|
|
|
<div class="actions">
|
|
|
|
|
<button type="submit">Update password</button>
|
|
|
|
|
</div>
|
|
|
|
|
</form>
|
|
|
|
|
</div>
|
|
|
|
|
</body>
|
|
|
|
|
</html>
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
_PASSWD_ERRORS = {
|
|
|
|
|
'wrong_current': 'Current password is incorrect.',
|
|
|
|
|
'mismatch': 'New passwords do not match.',
|
|
|
|
|
'empty': 'New password must not be empty.',
|
|
|
|
|
'save': 'Password updated in memory but could not be written to disk.',
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _render_passwd(msg: str = '', msg_ok: bool = True) -> str:
|
|
|
|
|
banner_html = ''
|
|
|
|
|
if msg:
|
|
|
|
|
cls = 'ok' if msg_ok else 'err'
|
|
|
|
|
banner_html = f'<div class="banner {cls}">{html.escape(msg)}</div>'
|
|
|
|
|
return _PASSWD_HTML.format(css=_CONFIG_CSS, banner_html=banner_html)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── config page ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
_CONFIG_CSS = """
|
|
|
|
|
@@ -335,7 +392,8 @@ _HTML = """\
|
|
|
|
|
<button type="submit" class="link-btn">reset stats</button>
|
|
|
|
|
</form>
|
|
|
|
|
· <a href="/config">email alerts</a> ·
|
|
|
|
|
<a href="/api/stats" target="_blank">JSON stats</a>
|
|
|
|
|
<a href="/api/stats" target="_blank">JSON stats</a> ·
|
|
|
|
|
<a href="/passwd">change password</a>
|
|
|
|
|
</p>
|
|
|
|
|
|
|
|
|
|
<!-- stat cards -->
|
|
|
|
|
@@ -532,7 +590,8 @@ async def _handle(
|
|
|
|
|
writer: asyncio.StreamWriter,
|
|
|
|
|
stats: Stats,
|
|
|
|
|
cfg: Config,
|
|
|
|
|
creds: Tuple[str, str],
|
|
|
|
|
creds_mut: list,
|
|
|
|
|
passwd_file: str,
|
|
|
|
|
notifier: Optional['Notifier'] = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
try:
|
|
|
|
|
@@ -543,7 +602,7 @@ async def _handle(
|
|
|
|
|
if len(parts) < 2:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if not _check_auth(text, creds):
|
|
|
|
|
if not _check_auth(text, creds_mut):
|
|
|
|
|
writer.write(_UNAUTHORIZED)
|
|
|
|
|
await writer.drain()
|
|
|
|
|
return
|
|
|
|
|
@@ -645,6 +704,40 @@ async def _handle(
|
|
|
|
|
body = json.dumps(notifier.get_config_safe(), indent=2)
|
|
|
|
|
ct = 'application/json'
|
|
|
|
|
|
|
|
|
|
# ── password change ───────────────────────────────────────────────────
|
|
|
|
|
elif path == '/passwd':
|
|
|
|
|
msg = ''
|
|
|
|
|
msg_ok = True
|
|
|
|
|
if 'ok' in qs:
|
|
|
|
|
msg = 'Password updated successfully.'
|
|
|
|
|
elif 'err' in qs:
|
|
|
|
|
msg = _PASSWD_ERRORS.get(qs['err'][0], 'Unknown error.')
|
|
|
|
|
msg_ok = False
|
|
|
|
|
body = _render_passwd(msg, msg_ok)
|
|
|
|
|
ct = 'text/html; charset=utf-8'
|
|
|
|
|
|
|
|
|
|
elif path == '/api/passwd' and method == 'POST':
|
|
|
|
|
form = _parse_post_body(text)
|
|
|
|
|
current = form.get('current_password', '')
|
|
|
|
|
new_pw = form.get('new_password', '')
|
|
|
|
|
confirm = form.get('confirm_password', '')
|
|
|
|
|
_, expected_passwd = creds_mut
|
|
|
|
|
if not hmac.compare_digest(current, expected_passwd):
|
|
|
|
|
writer.write(_redirect('/passwd?err=wrong_current'))
|
|
|
|
|
elif not new_pw:
|
|
|
|
|
writer.write(_redirect('/passwd?err=empty'))
|
|
|
|
|
elif new_pw != confirm:
|
|
|
|
|
writer.write(_redirect('/passwd?err=mismatch'))
|
|
|
|
|
else:
|
|
|
|
|
creds_mut[1] = new_pw
|
|
|
|
|
try:
|
|
|
|
|
Path(passwd_file).write_text(f'{creds_mut[0]}:{new_pw}\n')
|
|
|
|
|
writer.write(_redirect('/passwd?ok=1'))
|
|
|
|
|
except Exception:
|
|
|
|
|
writer.write(_redirect('/passwd?err=save'))
|
|
|
|
|
await writer.drain()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# ── standard routes ───────────────────────────────────────────────────
|
|
|
|
|
elif path == '/api/stats/reset' and method == 'POST':
|
|
|
|
|
stats.reset()
|
|
|
|
|
@@ -721,11 +814,13 @@ async def run_admin(
|
|
|
|
|
stop_event: asyncio.Event,
|
|
|
|
|
notifier: Optional['Notifier'] = None,
|
|
|
|
|
ssl_context: Optional[ssl.SSLContext] = None,
|
|
|
|
|
passwd_file: str = 'admin.passwd',
|
|
|
|
|
) -> None:
|
|
|
|
|
"""Start the admin HTTP(S) server and run until stop_event is set."""
|
|
|
|
|
creds_mut = list(creds) # mutable so password changes propagate across requests
|
|
|
|
|
|
|
|
|
|
def handler(r, w):
|
|
|
|
|
asyncio.create_task(_handle(r, w, stats, cfg, creds, notifier))
|
|
|
|
|
asyncio.create_task(_handle(r, w, stats, cfg, creds_mut, passwd_file, notifier))
|
|
|
|
|
|
|
|
|
|
server = await asyncio.start_server(
|
|
|
|
|
handler,
|
|
|
|
|
|