Files
portspoof_concentrator/includes/functions.php
2026-03-11 11:18:47 -04:00

363 lines
12 KiB
PHP

<?php
require_once __DIR__ . '/db.php';
require_once __DIR__ . '/../version.php';
// ── Node helpers ──────────────────────────────────────────────────────────────
function get_all_nodes(): array {
return db()->query('SELECT * FROM nodes ORDER BY name')->fetchAll();
}
function get_node(int $id): array|false {
$s = db()->prepare('SELECT * FROM nodes WHERE id = ?');
$s->execute([$id]);
return $s->fetch();
}
function upsert_node(array $data, ?int $id = null): int {
$pdo = db();
if ($id) {
$s = $pdo->prepare(
'UPDATE nodes SET name=?, api_url=?, username=?, password=?,
verify_ssl=?, enabled=? WHERE id=?'
);
$s->execute([
$data['name'], $data['api_url'], $data['username'], $data['password'],
(int)($data['verify_ssl'] ?? 0), (int)($data['enabled'] ?? 1), $id,
]);
return $id;
}
$s = $pdo->prepare(
'INSERT INTO nodes (name, api_url, username, password, verify_ssl, enabled)
VALUES (?, ?, ?, ?, ?, ?)'
);
$s->execute([
$data['name'], $data['api_url'], $data['username'], $data['password'],
(int)($data['verify_ssl'] ?? 0), (int)($data['enabled'] ?? 1),
]);
return (int)$pdo->lastInsertId();
}
function delete_node(int $id): void {
$s = db()->prepare('DELETE FROM nodes WHERE id = ?');
$s->execute([$id]);
}
// ── Fetch helpers ─────────────────────────────────────────────────────────────
/**
* Call the portspoof_py JSON API on a node.
* Returns decoded JSON on success, or false on failure.
*/
function fetch_node_api(array $node, string $path): mixed {
$url = rtrim($node['api_url'], '/') . $path;
$ch = curl_init($url);
curl_setopt_array($ch, [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => FETCH_TIMEOUT,
CURLOPT_USERPWD => $node['username'] . ':' . $node['password'],
CURLOPT_HTTPAUTH => CURLAUTH_BASIC,
CURLOPT_SSL_VERIFYPEER => (bool)$node['verify_ssl'],
CURLOPT_SSL_VERIFYHOST => $node['verify_ssl'] ? 2 : 0,
CURLOPT_HTTPHEADER => ['Accept: application/json'],
]);
$body = curl_exec($ch);
$code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($body === false || $code !== 200) {
return false;
}
return json_decode($body, true);
}
/**
* Ingest connection events from portspoof_py that are strictly newer than
* $last_event_at (ISO 8601 string, or null to ingest everything).
*
* Returns ['inserted' => int, 'new_max_ts' => string|null]
* where new_max_ts is the DATETIME(6) of the newest row inserted, or null if
* nothing new was inserted.
*/
function ingest_connections(int $node_id, array $events, ?string $last_event_at): array {
$inserted = 0;
$new_max_ts = null;
$s = db()->prepare(
'INSERT INTO connections
(node_id, occurred_at, src_ip, src_port, dst_port, banner_hex, banner_len)
VALUES (?, ?, ?, ?, ?, ?, ?)'
);
foreach ($events as $ev) {
$raw_ts = $ev['timestamp'] ?? null;
if ($raw_ts === null) {
continue;
}
// Normalise to MySQL DATETIME(6)
$ts = date('Y-m-d H:i:s.u', strtotime($raw_ts));
// Skip events already ingested
if ($last_event_at !== null && $ts <= $last_event_at) {
continue;
}
$s->execute([
$node_id,
$ts,
$ev['src_ip'] ?? '',
(int)($ev['src_port'] ?? 0),
(int)($ev['dst_port'] ?? 0),
$ev['banner_hex'] ?? null,
(int)($ev['banner_len'] ?? 0),
]);
$inserted++;
if ($new_max_ts === null || $ts > $new_max_ts) {
$new_max_ts = $ts;
}
}
return ['inserted' => $inserted, 'new_max_ts' => $new_max_ts];
}
/**
* Poll every enabled node, ingest new events, and advance each node's cursor.
*
* Returns an array of per-node result maps:
* ['node_id', 'name', 'fetched', 'inserted', 'last_event_at', 'error']
*/
function run_fetch(): array {
$nodes = get_all_nodes();
$enabled = array_filter($nodes, fn($n) => (bool)$n['enabled']);
$results = [];
foreach ($enabled as $node) {
$entry = [
'node_id' => (int)$node['id'],
'name' => $node['name'],
'fetched' => 0,
'inserted' => 0,
'last_event_at' => $node['last_event_at'],
'error' => null,
];
$events = fetch_node_api($node, '/api/connections?limit=' . FETCH_LIMIT);
if ($events === false) {
$entry['error'] = 'could not reach API';
$results[] = $entry;
continue;
}
if (!is_array($events)) {
$entry['error'] = 'unexpected API response';
$results[] = $entry;
continue;
}
$entry['fetched'] = count($events);
$result = ingest_connections((int)$node['id'], $events, $node['last_event_at']);
$entry['inserted'] = $result['inserted'];
$s = db()->prepare(
'UPDATE nodes SET last_fetched_at = NOW()'
. ($result['new_max_ts'] !== null ? ', last_event_at = ?' : '')
. ' WHERE id = ?'
);
$params = $result['new_max_ts'] !== null
? [$result['new_max_ts'], $node['id']]
: [$node['id']];
$s->execute($params);
if ($result['new_max_ts'] !== null) {
$entry['last_event_at'] = $result['new_max_ts'];
}
$results[] = $entry;
}
return $results;
}
// ── Upstream version check ────────────────────────────────────────────────────
define('UPSTREAM_VERSION_URL', 'https://git.ny.daprogs.com/api/v1/repos/DAProgs/portspoof_concentrator/raw/version.php?ref=main');
define('UPSTREAM_VERSION_CACHE', __DIR__ . '/../upstream_version.cache');
define('UPSTREAM_VERSION_TTL', 3600); // seconds
/**
* Fetch the upstream version string from the git repo, with a 1-hour file cache.
* Returns null silently on any failure so the page always loads.
*/
function fetch_upstream_version(): ?string {
// Return cached value if still fresh
if (file_exists(UPSTREAM_VERSION_CACHE)) {
$cached = json_decode(file_get_contents(UPSTREAM_VERSION_CACHE), true);
if (isset($cached['version'], $cached['checked_at'])
&& (time() - $cached['checked_at']) < UPSTREAM_VERSION_TTL) {
return $cached['version'];
}
}
$ch = curl_init(UPSTREAM_VERSION_URL);
curl_setopt_array($ch, [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 5,
CURLOPT_FOLLOWLOCATION => true,
]);
$body = curl_exec($ch);
$code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($body === false || $code !== 200) {
return null;
}
if (!preg_match("/define\s*\(\s*'APP_VERSION'\s*,\s*'([^']+)'\s*\)/", $body, $m)) {
return null;
}
$version = $m[1];
file_put_contents(
UPSTREAM_VERSION_CACHE,
json_encode(['version' => $version, 'checked_at' => time()]),
LOCK_EX
);
return $version;
}
/**
* Returns true if $upstream is a newer version than $local.
* Format: YYMM.N e.g. 2603.4
*/
function is_newer_version(string $upstream, string $local): bool {
$parse = fn($v) => array_map('intval', explode('.', $v, 2));
[$um, $un] = $parse($upstream);
[$lm, $ln] = $parse($local);
return $um > $lm || ($um === $lm && $un > $ln);
}
// ── API queries ───────────────────────────────────────────────────────────────
/**
* Return connections from the last $minutes minutes, newest first.
* Optionally filtered to a single node.
*/
function connections_since(int $minutes = 10, ?int $node_id = null): array {
$since = date('Y-m-d H:i:s', time() - $minutes * 60);
if ($node_id !== null) {
$s = db()->prepare(
'SELECT c.id, c.occurred_at, c.src_ip, c.src_port, c.dst_port,
c.banner_hex, c.banner_len, n.name AS node_name, n.id AS node_id
FROM connections c JOIN nodes n ON n.id = c.node_id
WHERE c.node_id = ? AND c.occurred_at >= ?
ORDER BY c.occurred_at DESC'
);
$s->execute([$node_id, $since]);
} else {
$s = db()->prepare(
'SELECT c.id, c.occurred_at, c.src_ip, c.src_port, c.dst_port,
c.banner_hex, c.banner_len, n.name AS node_name, n.id AS node_id
FROM connections c JOIN nodes n ON n.id = c.node_id
WHERE c.occurred_at >= ?
ORDER BY c.occurred_at DESC'
);
$s->execute([$since]);
}
return $s->fetchAll();
}
// ── Dashboard stats ───────────────────────────────────────────────────────────
function global_stats(): array {
$pdo = db();
$total = (int)$pdo->query('SELECT COUNT(*) FROM connections')->fetchColumn();
$since = date('Y-m-d H:i:s', time() - RATE_WINDOW_SECONDS);
$recent = (int)$pdo->prepare('SELECT COUNT(*) FROM connections WHERE occurred_at >= ?')
->execute([$since]) ? : 0;
$s = $pdo->prepare('SELECT COUNT(*) FROM connections WHERE occurred_at >= ?');
$s->execute([$since]);
$recent = (int)$s->fetchColumn();
$s = $pdo->query('SELECT MAX(occurred_at) FROM connections');
$last = $s->fetchColumn() ?: null;
return compact('total', 'recent', 'last');
}
function top_ips(int $n = TOP_N): array {
$s = db()->prepare(
'SELECT src_ip, COUNT(*) AS cnt
FROM connections
GROUP BY src_ip
ORDER BY cnt DESC
LIMIT ?'
);
$s->execute([$n]);
return $s->fetchAll();
}
function top_ports(int $n = TOP_N): array {
$s = db()->prepare(
'SELECT dst_port, COUNT(*) AS cnt
FROM connections
GROUP BY dst_port
ORDER BY cnt DESC
LIMIT ?'
);
$s->execute([$n]);
return $s->fetchAll();
}
function top_ips_by_node(int $node_id, int $n = TOP_N): array {
$s = db()->prepare(
'SELECT src_ip, COUNT(*) AS cnt
FROM connections
WHERE node_id = ?
GROUP BY src_ip
ORDER BY cnt DESC
LIMIT ?'
);
$s->execute([$node_id, $n]);
return $s->fetchAll();
}
function recent_connections(int $limit = DASH_RECENT_LIMIT, ?int $node_id = null): array {
if ($node_id !== null) {
$s = db()->prepare(
'SELECT c.*, n.name AS node_name
FROM connections c JOIN nodes n ON n.id = c.node_id
WHERE c.node_id = ?
ORDER BY c.occurred_at DESC
LIMIT ?'
);
$s->execute([$node_id, $limit]);
} else {
$s = db()->prepare(
'SELECT c.*, n.name AS node_name
FROM connections c JOIN nodes n ON n.id = c.node_id
ORDER BY c.occurred_at DESC
LIMIT ?'
);
$s->execute([$limit]);
}
return $s->fetchAll();
}
// ── Misc ──────────────────────────────────────────────────────────────────────
function h(string $s): string {
return htmlspecialchars($s, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8');
}
function banner_text(string $hex): string {
$raw = hex2bin($hex);
if ($raw === false) return '';
return mb_convert_encoding($raw, 'UTF-8', 'latin-1');
}