query('SELECT * FROM nodes ORDER BY name')->fetchAll();
}

/**
 * Load a single node row by primary key.
 *
 * @return array|false The row from PDOStatement::fetch(), or false when no row matches.
 */
function get_node(int $id): array|false
{
    $s = db()->prepare('SELECT * FROM nodes WHERE id = ?');
    $s->execute([$id]);

    return $s->fetch();
}

/**
 * Insert a new node, or update the existing one when $id is given.
 *
 * $data must contain 'name', 'api_url', 'username' and 'password';
 * 'verify_ssl' defaults to 0 and 'enabled' defaults to 1 when absent.
 *
 * @param array    $data Column values keyed by column name.
 * @param int|null $id   Row to update; null (or 0) inserts a new row.
 * @return int The id of the updated or newly inserted row.
 */
function upsert_node(array $data, ?int $id = null): int
{
    $pdo = db();

    // Column values in the order both statements list them.
    $values = [
        $data['name'],
        $data['api_url'],
        $data['username'],
        $data['password'],
        (int)($data['verify_ssl'] ?? 0),
        (int)($data['enabled'] ?? 1),
    ];

    if ($id) {
        $s = $pdo->prepare(
            'UPDATE nodes SET name=?, api_url=?, username=?, password=?, verify_ssl=?, enabled=? WHERE id=?'
        );
        $s->execute([...$values, $id]);

        return $id;
    }

    $s = $pdo->prepare(
        'INSERT INTO nodes (name, api_url, username, password, verify_ssl, enabled) VALUES (?, ?, ?, ?, ?, ?)'
    );
    $s->execute($values);

    return (int)$pdo->lastInsertId();
}

/**
 * Delete the node with the given id. Deleting a missing id is a no-op.
 */
function delete_node(int $id): void
{
    $s = db()->prepare('DELETE FROM nodes WHERE id = ?');
    $s->execute([$id]);
}

// ── Fetch helpers ─────────────────────────────────────────────────────────────

/**
 * Call the portspoof_py JSON API on a node.
 *
 * Sends a GET request with HTTP Basic auth built from the node's
 * 'username' and 'password'; TLS peer/host verification follows the node's
 * 'verify_ssl' flag.
 *
 * @param array  $node Node row; reads 'api_url', 'username', 'password', 'verify_ssl'.
 * @param string $path Path (and query string) appended to the node's api_url.
 * @return mixed Decoded JSON (arrays for objects/lists) on success; false when
 *               the request fails or the status is not 200. A 200 response
 *               whose body is not valid JSON decodes to null.
 */
function fetch_node_api(array $node, string $path): mixed
{
    $url = rtrim($node['api_url'], '/') . $path;

    $ch = curl_init($url);
    if ($ch === false) {
        // No handle could be created; report it like any other transport failure
        // instead of passing false on to curl_setopt_array().
        return false;
    }

    curl_setopt_array($ch, [
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_TIMEOUT        => FETCH_TIMEOUT,
        CURLOPT_USERPWD        => $node['username'] . ':' . $node['password'],
        CURLOPT_HTTPAUTH       => CURLAUTH_BASIC,
        CURLOPT_SSL_VERIFYPEER => (bool)$node['verify_ssl'],
        CURLOPT_SSL_VERIFYHOST => $node['verify_ssl'] ? 2 : 0,
        CURLOPT_HTTPHEADER     => ['Accept: application/json'],
    ]);

    $body = curl_exec($ch);
    $code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    if ($body === false || $code !== 200) {
        return false;
    }

    return json_decode($body, true);
}

/**
 * Ingest connection events from portspoof_py that are strictly newer than
 * $last_event_at (ISO 8601 string, or null to ingest everything).
*
 * Returns ['inserted' => int, 'new_max_ts' => string|null]
 * where new_max_ts is the DATETIME(6) of the newest row inserted, or null if
 * nothing new was inserted.
 *
 * Events that are not arrays, or whose 'timestamp' is missing or cannot be
 * parsed, are skipped.
 */
function ingest_connections(int $node_id, array $events, ?string $last_event_at): array
{
    $tz = new DateTimeZone(date_default_timezone_get());

    // Render a parseable timestamp as a fixed-width DATETIME(6) string in the
    // default timezone. date() cannot be used here: it takes an integer
    // timestamp, so its 'u' field is always 000000 and sub-second ordering
    // (and therefore the "strictly newer" test) would be lost.
    $normalise = static function (mixed $raw) use ($tz): ?string {
        if (!is_string($raw) || trim($raw) === '') {
            return null;
        }
        try {
            return (new DateTimeImmutable($raw, $tz))
                ->setTimezone($tz)
                ->format('Y-m-d H:i:s.u');
        } catch (Exception) {
            return null;
        }
    };

    // Bring the cursor into the same fixed-width form so a plain string
    // comparison is chronological; keep it verbatim if it cannot be parsed.
    // NOTE(review): cursors written before this change were truncated to whole
    // seconds, so events from that final second may be ingested once more.
    $cursor = $last_event_at === null
        ? null
        : ($normalise($last_event_at) ?? $last_event_at);

    $inserted   = 0;
    $new_max_ts = null;

    $s = db()->prepare(
        'INSERT INTO connections
            (node_id, occurred_at, src_ip, src_port, dst_port, banner_hex, banner_len)
         VALUES (?, ?, ?, ?, ?, ?, ?)'
    );

    foreach ($events as $ev) {
        if (!is_array($ev)) {
            continue;
        }

        $ts = $normalise($ev['timestamp'] ?? null);
        if ($ts === null) {
            continue;
        }

        // Skip events already ingested
        if ($cursor !== null && strcmp($ts, $cursor) <= 0) {
            continue;
        }

        $s->execute([
            $node_id,
            $ts,
            $ev['src_ip'] ?? '',
            (int)($ev['src_port'] ?? 0),
            (int)($ev['dst_port'] ?? 0),
            $ev['banner_hex'] ?? null,
            (int)($ev['banner_len'] ?? 0),
        ]);
        $inserted++;

        if ($new_max_ts === null || strcmp($ts, $new_max_ts) > 0) {
            $new_max_ts = $ts;
        }
    }

    return ['inserted' => $inserted, 'new_max_ts' => $new_max_ts];
}

/**
 * Poll every enabled node, ingest new events, and advance each node's cursor.
 *
 * Returns an array of per-node result maps:
 *   ['node_id', 'name', 'fetched', 'inserted', 'last_event_at', 'error']
 *
 * 'error' is null on success. When the API call fails or returns something
 * other than an array, the node's row is left untouched.
 */
function run_fetch(): array
{
    $nodes   = get_all_nodes();
    $enabled = array_filter($nodes, fn($n) => (bool)$n['enabled']);
    $results = [];

    foreach ($enabled as $node) {
        $entry = [
            'node_id'       => (int)$node['id'],
            'name'          => $node['name'],
            'fetched'       => 0,
            'inserted'      => 0,
            'last_event_at' => $node['last_event_at'],
            'error'         => null,
        ];

        $events = fetch_node_api($node, '/api/connections?limit=' . FETCH_LIMIT);

        if ($events === false) {
            $entry['error'] = 'could not reach API';
            $results[] = $entry;
            continue;
        }

        if (!is_array($events)) {
            $entry['error'] = 'unexpected API response';
            $results[] = $entry;
            continue;
        }

        $entry['fetched'] = count($events);

        $result = ingest_connections((int)$node['id'], $events, $node['last_event_at']);
        $entry['inserted'] = $result['inserted'];

        // Always stamp last_fetched_at; only move the cursor when something
        // new was actually inserted.
        $new_max = $result['new_max_ts'];
        if ($new_max !== null) {
            $s = db()->prepare('UPDATE nodes SET last_fetched_at = NOW(), last_event_at = ? WHERE id = ?');
            $s->execute([$new_max, $node['id']]);
            $entry['last_event_at'] = $new_max;
        } else {
            $s = db()->prepare('UPDATE nodes SET last_fetched_at = NOW() WHERE id = ?');
            $s->execute([$node['id']]);
        }

        $results[] = $entry;
    }

    return $results;
}

// ── Settings ──────────────────────────────────────────────────────────────────

/**
 * Read a setting by key.
 *
 * @return string The stored value, or $default when the key does not exist
 *                or its value is NULL.
 */
function get_setting(string $key, string $default = ''): string
{
    $s = db()->prepare('SELECT value FROM settings WHERE key_name = ?');
    $s->execute([$key]);
    $value = $s->fetchColumn();

    // fetchColumn() yields false for "no row" and null for a NULL column;
    // neither satisfies the string return type.
    if ($value === false || $value === null) {
        return $default;
    }

    return (string)$value;
}

/**
 * Create or overwrite a setting.
 */
function set_setting(string $key, string $value): void
{
    $s = db()->prepare(
        'INSERT INTO settings (key_name, value) VALUES (?, ?)
         ON DUPLICATE KEY UPDATE value = VALUES(value)'
    );
    $s->execute([$key, $value]);
}

// ── Purge ─────────────────────────────────────────────────────────────────────

/**
 * Delete connections older than the retention_days setting.
 * The setting defaults to 7 and is clamped to at least 1 day.
 * Returns the number of rows deleted.
 */
function purge_old_connections(): int
{
    $days = max(1, (int)get_setting('retention_days', '7'));
    $s = db()->prepare(
        'DELETE FROM connections WHERE occurred_at < NOW() - INTERVAL ? DAY'
    );
    $s->execute([$days]);

    return $s->rowCount();
}

// ── Upstream version check ────────────────────────────────────────────────────

define('UPSTREAM_VERSION_URL', 'https://git.ny.daprogs.com/api/v1/repos/DAProgs/portspoof_concentrator/raw/version.php?ref=main');
define('UPSTREAM_VERSION_CACHE', __DIR__ . '/../upstream_version.cache');
define('UPSTREAM_VERSION_TTL', 3600); // seconds

/**
 * Fetch the upstream version string from the git repo, with a 1-hour file cache.
 * Returns null on any failure so the page always loads.
 *
 * The version is extracted from a define('APP_VERSION', '...') statement in
 * the downloaded file. Only successful lookups are cached.
 */
function fetch_upstream_version(): ?string
{
    // Return cached value if still fresh
    if (is_readable(UPSTREAM_VERSION_CACHE)) {
        $raw    = file_get_contents(UPSTREAM_VERSION_CACHE);
        $cached = $raw === false ? null : json_decode($raw, true);
        if (
            is_array($cached)
            && isset($cached['version'], $cached['checked_at'])
            && is_string($cached['version'])
            && (time() - (int)$cached['checked_at']) < UPSTREAM_VERSION_TTL
        ) {
            return $cached['version'];
        }
    }

    $ch = curl_init(UPSTREAM_VERSION_URL);
    if ($ch === false) {
        return null;
    }

    curl_setopt_array($ch, [
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_TIMEOUT        => 5,
        CURLOPT_FOLLOWLOCATION => true,
    ]);
    $body = curl_exec($ch);
    $code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    if ($body === false || $code !== 200) {
        return null;
    }

    if (!preg_match("/define\s*\(\s*'APP_VERSION'\s*,\s*'([^']+)'\s*\)/", $body, $m)) {
        return null;
    }

    $version = $m[1];
    file_put_contents(
        UPSTREAM_VERSION_CACHE,
        json_encode(['version' => $version, 'checked_at' => time()]),
        LOCK_EX
    );

    return $version;
}

/**
 * Returns true if $upstream is a newer version than $local.
 * Format: YYMM.N  e.g. 2603.4
 *
 * Each part is compared as an integer; a missing ".N" part counts as 0.
 */
function is_newer_version(string $upstream, string $local): bool
{
    // array_pad() guarantees two elements, so a version without a dot no
    // longer reads an undefined index during destructuring.
    $parse = static fn (string $v): array => array_pad(
        array_map('intval', explode('.', $v, 2)),
        2,
        0
    );

    [$um, $un] = $parse($upstream);
    [$lm, $ln] = $parse($local);

    return $um > $lm || ($um === $lm && $un > $ln);
}

// ── API queries ───────────────────────────────────────────────────────────────

/**
 * Return connections from the last $minutes minutes, newest first.
 * Optionally filtered to a single node.
*/
function connections_since(int $minutes = 10, ?int $node_id = null): array
{
    $since = date('Y-m-d H:i:s', time() - $minutes * 60);

    // One statement for both cases; the node filter is prepended when asked for.
    $where  = 'c.occurred_at >= ?';
    $params = [$since];
    if ($node_id !== null) {
        $where = 'c.node_id = ? AND ' . $where;
        array_unshift($params, $node_id);
    }

    $s = db()->prepare(
        'SELECT c.id, c.occurred_at, c.src_ip, c.src_port, c.dst_port,
                c.banner_hex, c.banner_len, n.name AS node_name, n.id AS node_id
         FROM connections c
         JOIN nodes n ON n.id = c.node_id
         WHERE ' . $where . '
         ORDER BY c.occurred_at DESC'
    );
    $s->execute($params);

    return $s->fetchAll();
}

// ── Dashboard stats ───────────────────────────────────────────────────────────

/**
 * Headline counters for the dashboard.
 *
 * @return array ['total'  => number of rows in connections,
 *                'recent' => rows with occurred_at within the last RATE_WINDOW_SECONDS,
 *                'last'   => MAX(occurred_at), or null when that is empty/NULL]
 */
function global_stats(): array
{
    $pdo = db();

    $total = (int)$pdo->query('SELECT COUNT(*) FROM connections')->fetchColumn();

    // The recent count is queried exactly once; an earlier duplicate query
    // whose boolean execute() result was immediately overwritten was removed.
    $since = date('Y-m-d H:i:s', time() - RATE_WINDOW_SECONDS);
    $s = $pdo->prepare('SELECT COUNT(*) FROM connections WHERE occurred_at >= ?');
    $s->execute([$since]);
    $recent = (int)$s->fetchColumn();

    $s = $pdo->query('SELECT MAX(occurred_at) FROM connections');
    $last = $s->fetchColumn() ?: null;

    return compact('total', 'recent', 'last');
}

/**
 * The $n source IPs with the most connections in the last 24 hours.
 *
 * @return array Rows with 'src_ip' and 'cnt', highest count first.
 */
function top_ips(int $n = TOP_N): array
{
    $s = db()->prepare(
        'SELECT src_ip, COUNT(*) AS cnt
         FROM connections
         WHERE occurred_at >= NOW() - INTERVAL 24 HOUR
         GROUP BY src_ip
         ORDER BY cnt DESC
         LIMIT ?'
    );
    // Bind LIMIT as an integer: values passed through execute() are sent as
    // strings, and MySQL rejects a quoted LIMIT under emulated prepares
    // (whether db() enables emulation is not visible from here).
    $s->bindValue(1, $n, PDO::PARAM_INT);
    $s->execute();

    return $s->fetchAll();
}

/**
 * The $n destination ports with the most connections in the last 24 hours.
 *
 * @return array Rows with 'dst_port' and 'cnt', highest count first.
 */
function top_ports(int $n = TOP_N): array
{
    $s = db()->prepare(
        'SELECT dst_port, COUNT(*) AS cnt
         FROM connections
         WHERE occurred_at >= NOW() - INTERVAL 24 HOUR
         GROUP BY dst_port
         ORDER BY cnt DESC
         LIMIT ?'
    );
    // Integer binding keeps LIMIT valid under emulated prepares.
    $s->bindValue(1, $n, PDO::PARAM_INT);
    $s->execute();

    return $s->fetchAll();
}

/**
 * The $n source IPs with the most connections to one node in the last 24 hours.
 *
 * @return array Rows with 'src_ip' and 'cnt', highest count first.
 */
function top_ips_by_node(int $node_id, int $n = TOP_N): array
{
    $s = db()->prepare(
        'SELECT src_ip, COUNT(*) AS cnt
         FROM connections
         WHERE node_id = ? AND occurred_at >= NOW() - INTERVAL 24 HOUR
         GROUP BY src_ip
         ORDER BY cnt DESC
         LIMIT ?'
    );
    $s->bindValue(1, $node_id, PDO::PARAM_INT);
    // Integer binding keeps LIMIT valid under emulated prepares.
    $s->bindValue(2, $n, PDO::PARAM_INT);
    $s->execute();

    return $s->fetchAll();
}

/**
 * The most recent $limit connections, newest first, each with its node's
 * name as 'node_name'. Optionally restricted to a single node.
 */
function recent_connections(int $limit = DASH_RECENT_LIMIT, ?int $node_id = null): array
{
    $filter = $node_id !== null ? 'WHERE c.node_id = ? ' : '';

    $s = db()->prepare(
        'SELECT c.*, n.name AS node_name
         FROM connections c
         JOIN nodes n ON n.id = c.node_id
         ' . $filter . 'ORDER BY c.occurred_at DESC
         LIMIT ?'
    );

    // Positional placeholders are numbered in the order they appear.
    $pos = 1;
    if ($node_id !== null) {
        $s->bindValue($pos++, $node_id, PDO::PARAM_INT);
    }
    // Integer binding keeps LIMIT valid under emulated prepares.
    $s->bindValue($pos, $limit, PDO::PARAM_INT);
    $s->execute();

    return $s->fetchAll();
}

// ── Misc ──────────────────────────────────────────────────────────────────────

/**
 * Escape a string for output in an HTML context (both quote styles encoded,
 * invalid UTF-8 sequences substituted).
 */
function h(string $s): string
{
    return htmlspecialchars($s, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8');
}

/**
 * Decode a hex-encoded banner into displayable UTF-8 text.
 *
 * Every byte is treated as an ISO-8859-1 character, so any byte sequence
 * yields valid UTF-8.
 *
 * @param string $hex Hexadecimal string (two digits per byte).
 * @return string The decoded text, or '' when $hex is empty or not valid hex.
 */
function banner_text(string $hex): string
{
    // hex2bin() raises a warning on odd-length or non-hex input; reject such
    // input up front and return the same '' the failure path produced.
    if ($hex === '' || strlen($hex) % 2 !== 0 || !ctype_xdigit($hex)) {
        return '';
    }

    $raw = hex2bin($hex);
    if ($raw === false) {
        return '';
    }

    // Use the canonical encoding name; the hyphenated 'latin-1' spelling is
    // not a registered mbstring name or alias for this charset.
    return mb_convert_encoding($raw, 'UTF-8', 'ISO-8859-1');
}