query('SELECT * FROM nodes ORDER BY name')->fetchAll(); } function get_node(int $id): array|false { $s = db()->prepare('SELECT * FROM nodes WHERE id = ?'); $s->execute([$id]); return $s->fetch(); } function upsert_node(array $data, ?int $id = null): int { $pdo = db(); if ($id) { $s = $pdo->prepare( 'UPDATE nodes SET name=?, api_url=?, username=?, password=?, verify_ssl=?, enabled=? WHERE id=?' ); $s->execute([ $data['name'], $data['api_url'], $data['username'], $data['password'], (int)($data['verify_ssl'] ?? 0), (int)($data['enabled'] ?? 1), $id, ]); return $id; } $s = $pdo->prepare( 'INSERT INTO nodes (name, api_url, username, password, verify_ssl, enabled) VALUES (?, ?, ?, ?, ?, ?)' ); $s->execute([ $data['name'], $data['api_url'], $data['username'], $data['password'], (int)($data['verify_ssl'] ?? 0), (int)($data['enabled'] ?? 1), ]); return (int)$pdo->lastInsertId(); } function delete_node(int $id): void { $s = db()->prepare('DELETE FROM nodes WHERE id = ?'); $s->execute([$id]); } // ── Fetch helpers ───────────────────────────────────────────────────────────── /** * Call the portspoof_py JSON API on a node. * Returns decoded JSON on success, or false on failure. */ function fetch_node_api(array $node, string $path): mixed { $url = rtrim($node['api_url'], '/') . $path; $ch = curl_init($url); curl_setopt_array($ch, [ CURLOPT_RETURNTRANSFER => true, CURLOPT_TIMEOUT => FETCH_TIMEOUT, CURLOPT_USERPWD => $node['username'] . ':' . $node['password'], CURLOPT_HTTPAUTH => CURLAUTH_BASIC, CURLOPT_SSL_VERIFYPEER => (bool)$node['verify_ssl'], CURLOPT_SSL_VERIFYHOST => $node['verify_ssl'] ? 2 : 0, CURLOPT_HTTPHEADER => ['Accept: application/json'], ]); $body = curl_exec($ch); $code = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch); if ($body === false || $code !== 200) { return false; } return json_decode($body, true); } /** * Ingest connection events from portspoof_py that are strictly newer than * $last_event_at (ISO 8601 string, or null to ingest everything). * * Returns ['inserted' => int, 'new_max_ts' => string|null] * where new_max_ts is the DATETIME(6) of the newest row inserted, or null if * nothing new was inserted. */ function ingest_connections(int $node_id, array $events, ?string $last_event_at): array { $inserted = 0; $new_max_ts = null; $s = db()->prepare( 'INSERT INTO connections (node_id, occurred_at, src_ip, src_port, dst_port, banner_hex, banner_len) VALUES (?, ?, ?, ?, ?, ?, ?)' ); foreach ($events as $ev) { $raw_ts = $ev['timestamp'] ?? null; if ($raw_ts === null) { continue; } // Normalise to MySQL DATETIME(6) $ts = date('Y-m-d H:i:s.u', strtotime($raw_ts)); // Skip events already ingested if ($last_event_at !== null && $ts <= $last_event_at) { continue; } $s->execute([ $node_id, $ts, $ev['src_ip'] ?? '', (int)($ev['src_port'] ?? 0), (int)($ev['dst_port'] ?? 0), $ev['banner_hex'] ?? null, (int)($ev['banner_len'] ?? 0), ]); $inserted++; if ($new_max_ts === null || $ts > $new_max_ts) { $new_max_ts = $ts; } } return ['inserted' => $inserted, 'new_max_ts' => $new_max_ts]; } /** * Poll every enabled node, ingest new events, and advance each node's cursor. * * Returns an array of per-node result maps: * ['node_id', 'name', 'fetched', 'inserted', 'last_event_at', 'error'] */ function run_fetch(): array { $nodes = get_all_nodes(); $enabled = array_filter($nodes, fn($n) => (bool)$n['enabled']); $results = []; foreach ($enabled as $node) { $entry = [ 'node_id' => (int)$node['id'], 'name' => $node['name'], 'fetched' => 0, 'inserted' => 0, 'last_event_at' => $node['last_event_at'], 'error' => null, ]; $events = fetch_node_api($node, '/api/connections?limit=' . FETCH_LIMIT); if ($events === false) { $entry['error'] = 'could not reach API'; $results[] = $entry; continue; } if (!is_array($events)) { $entry['error'] = 'unexpected API response'; $results[] = $entry; continue; } $entry['fetched'] = count($events); $result = ingest_connections((int)$node['id'], $events, $node['last_event_at']); $entry['inserted'] = $result['inserted']; $s = db()->prepare( 'UPDATE nodes SET last_fetched_at = NOW()' . ($result['new_max_ts'] !== null ? ', last_event_at = ?' : '') . ' WHERE id = ?' ); $params = $result['new_max_ts'] !== null ? [$result['new_max_ts'], $node['id']] : [$node['id']]; $s->execute($params); if ($result['new_max_ts'] !== null) { $entry['last_event_at'] = $result['new_max_ts']; } $results[] = $entry; } return $results; } // ── Dashboard stats ─────────────────────────────────────────────────────────── function global_stats(): array { $pdo = db(); $total = (int)$pdo->query('SELECT COUNT(*) FROM connections')->fetchColumn(); $since = date('Y-m-d H:i:s', time() - RATE_WINDOW_SECONDS); $recent = (int)$pdo->prepare('SELECT COUNT(*) FROM connections WHERE occurred_at >= ?') ->execute([$since]) ? : 0; $s = $pdo->prepare('SELECT COUNT(*) FROM connections WHERE occurred_at >= ?'); $s->execute([$since]); $recent = (int)$s->fetchColumn(); $s = $pdo->query('SELECT MAX(occurred_at) FROM connections'); $last = $s->fetchColumn() ?: null; return compact('total', 'recent', 'last'); } function top_ips(int $n = TOP_N): array { $s = db()->prepare( 'SELECT src_ip, COUNT(*) AS cnt FROM connections GROUP BY src_ip ORDER BY cnt DESC LIMIT ?' ); $s->execute([$n]); return $s->fetchAll(); } function top_ports(int $n = TOP_N): array { $s = db()->prepare( 'SELECT dst_port, COUNT(*) AS cnt FROM connections GROUP BY dst_port ORDER BY cnt DESC LIMIT ?' ); $s->execute([$n]); return $s->fetchAll(); } function top_ips_by_node(int $node_id, int $n = TOP_N): array { $s = db()->prepare( 'SELECT src_ip, COUNT(*) AS cnt FROM connections WHERE node_id = ? GROUP BY src_ip ORDER BY cnt DESC LIMIT ?' ); $s->execute([$node_id, $n]); return $s->fetchAll(); } function recent_connections(int $limit = DASH_RECENT_LIMIT, ?int $node_id = null): array { if ($node_id !== null) { $s = db()->prepare( 'SELECT c.*, n.name AS node_name FROM connections c JOIN nodes n ON n.id = c.node_id WHERE c.node_id = ? ORDER BY c.occurred_at DESC LIMIT ?' ); $s->execute([$node_id, $limit]); } else { $s = db()->prepare( 'SELECT c.*, n.name AS node_name FROM connections c JOIN nodes n ON n.id = c.node_id ORDER BY c.occurred_at DESC LIMIT ?' ); $s->execute([$limit]); } return $s->fetchAll(); } // ── Misc ────────────────────────────────────────────────────────────────────── function h(string $s): string { return htmlspecialchars($s, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8'); } function banner_text(string $hex): string { $raw = hex2bin($hex); if ($raw === false) return ''; return mb_convert_encoding($raw, 'UTF-8', 'latin-1'); }