use async_trait::async_trait; use chrono::Utc; use serde::Serialize; use serde_json::json; use std::process::Stdio; use std::time::Duration; use tokio::fs; use tokio::process::Command; use tokio::time::timeout; use super::{AgentType, Collector, CollectorError, CollectorOutput}; #[derive(Debug, Clone)] pub struct ServiceCollector { pub interval: Duration, pub services: Vec, pub timeout_ms: u64, pub cpu_tracking: std::sync::Arc>>, pub description_cache: std::sync::Arc>>>, } #[derive(Debug, Clone)] pub(crate) struct CpuSample { utime: u64, stime: u64, timestamp: std::time::Instant, } impl ServiceCollector { pub fn new(_enabled: bool, interval_ms: u64, services: Vec) -> Self { Self { interval: Duration::from_millis(interval_ms), services, timeout_ms: 10000, // 10 second timeout for service checks cpu_tracking: std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), description_cache: std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), } } async fn get_service_status(&self, service: &str) -> Result { let timeout_duration = Duration::from_millis(self.timeout_ms); // Use more efficient systemctl command - just get the essential info let status_output = timeout( timeout_duration, Command::new("/run/current-system/sw/bin/systemctl") .args(["show", service, "--property=ActiveState,SubState,MainPID", "--no-pager"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output(), ) .await .map_err(|_| CollectorError::Timeout { duration_ms: self.timeout_ms, })? .map_err(|e| CollectorError::CommandFailed { command: format!("systemctl show {}", service), message: e.to_string(), })?; if !status_output.status.success() { return Err(CollectorError::ServiceNotFound { service: service.to_string(), }); } let status_stdout = String::from_utf8_lossy(&status_output.stdout); let mut active_state = None; let mut sub_state = None; let mut main_pid = None; for line in status_stdout.lines() { if let Some(value) = line.strip_prefix("ActiveState=") { active_state = Some(value.to_string()); } else if let Some(value) = line.strip_prefix("SubState=") { sub_state = Some(value.to_string()); } else if let Some(value) = line.strip_prefix("MainPID=") { main_pid = value.parse::().ok(); } } let status = self.determine_service_status(&active_state, &sub_state); // Get resource usage if service is running let (memory_used_mb, cpu_percent) = if let Some(pid) = main_pid { self.get_process_resources(pid).await.unwrap_or((0.0, 0.0)) } else { (0.0, 0.0) }; // Get memory quota from systemd if available let memory_quota_mb = self.get_service_memory_limit(service).await.unwrap_or(0.0); // Get disk usage for this service (only for running services) let disk_used_gb = if matches!(status, ServiceStatus::Running) { self.get_service_disk_usage(service).await.unwrap_or(0.0) } else { 0.0 }; // Get service-specific description (only for running services) let description = if matches!(status, ServiceStatus::Running) { self.get_service_description_with_cache(service).await } else { None }; Ok(ServiceData { name: service.to_string(), status, memory_used_mb, memory_quota_mb, cpu_percent, sandbox_limit: None, // TODO: Implement sandbox limit detection disk_used_gb, disk_quota_gb: 0.0, // Will be set to system total in collect() description, sub_service: None, }) } fn determine_service_status( &self, active_state: &Option, sub_state: &Option, ) -> ServiceStatus { match (active_state.as_deref(), sub_state.as_deref()) { (Some("active"), Some("running")) => ServiceStatus::Running, (Some("active"), Some("exited")) => ServiceStatus::Running, // One-shot services (Some("reloading"), _) | (Some("activating"), _) => ServiceStatus::Restarting, (Some("failed"), _) | (Some("inactive"), Some("failed")) => ServiceStatus::Stopped, (Some("inactive"), _) => ServiceStatus::Stopped, _ => ServiceStatus::Degraded, } } async fn get_process_resources(&self, pid: u32) -> Result<(f32, f32), CollectorError> { // Read /proc/{pid}/stat for CPU and memory info let stat_path = format!("/proc/{}/stat", pid); let stat_content = fs::read_to_string(&stat_path) .await .map_err(|e| CollectorError::IoError { message: e.to_string(), })?; let stat_fields: Vec<&str> = stat_content.split_whitespace().collect(); if stat_fields.len() < 24 { return Err(CollectorError::ParseError { message: format!("Invalid /proc/{}/stat format", pid), }); } // Field 23 is RSS (Resident Set Size) in pages let rss_pages: u64 = stat_fields[23] .parse() .map_err(|e| CollectorError::ParseError { message: format!("Failed to parse RSS from /proc/{}/stat: {}", pid, e), })?; // Convert pages to MB (assuming 4KB pages) let memory_mb = (rss_pages * 4) as f32 / 1024.0; // Calculate CPU percentage let cpu_percent = self.calculate_cpu_usage(pid, &stat_fields).await.unwrap_or(0.0); Ok((memory_mb, cpu_percent)) } async fn calculate_cpu_usage(&self, pid: u32, stat_fields: &[&str]) -> Result { // Parse CPU time fields from /proc/pid/stat let utime: u64 = stat_fields[13].parse().map_err(|e| CollectorError::ParseError { message: format!("Failed to parse utime: {}", e), })?; let stime: u64 = stat_fields[14].parse().map_err(|e| CollectorError::ParseError { message: format!("Failed to parse stime: {}", e), })?; let now = std::time::Instant::now(); let current_sample = CpuSample { utime, stime, timestamp: now, }; let mut cpu_tracking = self.cpu_tracking.lock().await; let cpu_percent = if let Some(previous_sample) = cpu_tracking.get(&pid) { let time_delta = now.duration_since(previous_sample.timestamp).as_secs_f32(); if time_delta > 0.1 { // At least 100ms between samples let utime_delta = current_sample.utime.saturating_sub(previous_sample.utime); let stime_delta = current_sample.stime.saturating_sub(previous_sample.stime); let total_delta = utime_delta + stime_delta; // Convert from jiffies to CPU percentage // sysconf(_SC_CLK_TCK) is typically 100 on Linux let hz = 100.0; // Clock ticks per second let cpu_time_used = total_delta as f32 / hz; let cpu_percent = (cpu_time_used / time_delta) * 100.0; // Cap at reasonable values cpu_percent.min(999.9) } else { 0.0 // Too soon for accurate measurement } } else { 0.0 // First measurement, no baseline }; // Store current sample for next calculation cpu_tracking.insert(pid, current_sample); // Clean up old entries (processes that no longer exist) let cutoff = now - Duration::from_secs(300); // 5 minutes cpu_tracking.retain(|_, sample| sample.timestamp > cutoff); Ok(cpu_percent) } async fn get_service_disk_usage(&self, service: &str) -> Result { // Only check the most likely path to avoid multiple du calls let primary_path = format!("/var/lib/{}", service); // Use a quick check first - if directory doesn't exist, don't run du if tokio::fs::metadata(&primary_path).await.is_err() { return Ok(0.0); } self.get_directory_size(&primary_path).await } async fn get_directory_size(&self, path: &str) -> Result { let output = Command::new("sudo") .args(["/run/current-system/sw/bin/du", "-s", "-k", path]) // Use kilobytes instead of forcing GB .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .map_err(|e| CollectorError::CommandFailed { command: format!("du -s -k {}", path), message: e.to_string(), })?; if !output.status.success() { // Directory doesn't exist or permission denied - return 0 return Ok(0.0); } let stdout = String::from_utf8_lossy(&output.stdout); if let Some(line) = stdout.lines().next() { if let Some(size_str) = line.split_whitespace().next() { let size_kb = size_str.parse::().unwrap_or(0.0); let size_gb = size_kb / (1024.0 * 1024.0); // Convert KB to GB return Ok(size_gb); } } Ok(0.0) } async fn get_service_memory_limit(&self, service: &str) -> Result { let output = Command::new("/run/current-system/sw/bin/systemctl") .args(["show", service, "--property=MemoryMax", "--no-pager"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .map_err(|e| CollectorError::CommandFailed { command: format!("systemctl show {} --property=MemoryMax", service), message: e.to_string(), })?; let stdout = String::from_utf8_lossy(&output.stdout); for line in stdout.lines() { if let Some(value) = line.strip_prefix("MemoryMax=") { if value == "infinity" { return Ok(0.0); // No limit } if let Ok(bytes) = value.parse::() { return Ok(bytes as f32 / (1024.0 * 1024.0)); // Convert to MB } } } Ok(0.0) // No limit or couldn't parse } async fn get_disk_usage(&self) -> Result { let output = Command::new("/run/current-system/sw/bin/df") .args(["-BG", "--output=size,used,avail", "/"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .map_err(|e| CollectorError::CommandFailed { command: "df -BG --output=size,used,avail /".to_string(), message: e.to_string(), })?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(CollectorError::CommandFailed { command: "df -BG --output=size,used,avail /".to_string(), message: stderr.to_string(), }); } let stdout = String::from_utf8_lossy(&output.stdout); let lines: Vec<&str> = stdout.lines().collect(); if lines.len() < 2 { return Err(CollectorError::ParseError { message: "Unexpected df output format".to_string(), }); } let data_line = lines[1].trim(); let parts: Vec<&str> = data_line.split_whitespace().collect(); if parts.len() < 3 { return Err(CollectorError::ParseError { message: format!("Unexpected df data format: {}", data_line), }); } let parse_size = |s: &str| -> Result { s.trim_end_matches('G') .parse::() .map_err(|e| CollectorError::ParseError { message: format!("Failed to parse disk size '{}': {}", s, e), }) }; Ok(DiskUsage { total_capacity_gb: parse_size(parts[0])?, used_gb: parse_size(parts[1])?, }) } fn determine_services_status(&self, healthy: usize, degraded: usize, failed: usize) -> String { if failed > 0 { "critical".to_string() } else if degraded > 0 { "warning".to_string() } else if healthy > 0 { "ok".to_string() } else { "unknown".to_string() } } async fn get_gpu_metrics(&self) -> (Option, Option) { let output = Command::new("nvidia-smi") .args([ "--query-gpu=utilization.gpu,temperature.gpu", "--format=csv,noheader,nounits", ]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await; match output { Ok(result) if result.status.success() => { let stdout = String::from_utf8_lossy(&result.stdout); if let Some(line) = stdout.lines().next() { let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect(); if parts.len() >= 2 { let load = parts[0].parse::().ok(); let temp = parts[1].parse::().ok(); return (load, temp); } } (None, None) } Ok(_) | Err(_) => { let util_output = Command::new("/opt/vc/bin/vcgencmd") .arg("measure_temp") .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await; if let Ok(result) = util_output { if result.status.success() { let stdout = String::from_utf8_lossy(&result.stdout); if let Some(value) = stdout .trim() .strip_prefix("temp=") .and_then(|s| s.strip_suffix("'C")) { if let Ok(temp_c) = value.parse::() { return (None, Some(temp_c)); } } } } (None, None) } } } async fn get_service_description_with_cache(&self, service: &str) -> Option> { // Check if we should update the cache (throttled) let should_update = self.should_update_description(service).await; if should_update { if let Some(new_description) = self.get_service_description(service).await { // Update cache let mut cache = self.description_cache.lock().await; cache.insert(service.to_string(), new_description.clone()); return Some(new_description); } } // Always return cached description if available let cache = self.description_cache.lock().await; cache.get(service).cloned() } async fn should_update_description(&self, _service: &str) -> bool { // For now, always update descriptions since we have caching // The cache will prevent redundant work true } async fn get_service_description(&self, service: &str) -> Option> { let result = match service { // KEEP: nginx sites and docker containers (needed for sub-services) "nginx" => self.get_nginx_description().await.map(|s| vec![s]), "docker" => self.get_docker_containers().await, // DISABLED: All connection monitoring for CPU/C-state testing /* "sshd" | "ssh" => self.get_ssh_active_users().await.map(|s| vec![s]), "apache2" | "httpd" => self.get_web_server_connections().await.map(|s| vec![s]), "docker-registry" => self.get_docker_registry_info().await.map(|s| vec![s]), "postgresql" | "postgres" => self.get_postgres_connections().await.map(|s| vec![s]), "mysql" | "mariadb" => self.get_mysql_connections().await.map(|s| vec![s]), "redis" | "redis-immich" => self.get_redis_info().await.map(|s| vec![s]), "gitea" => self.get_gitea_info().await.map(|s| vec![s]), "immich-server" => self.get_immich_info().await.map(|s| vec![s]), "vaultwarden" => self.get_vaultwarden_info().await.map(|s| vec![s]), "unifi" => self.get_unifi_info().await.map(|s| vec![s]), "mosquitto" => self.get_mosquitto_info().await.map(|s| vec![s]), "haasp-webgrid" => self.get_haasp_webgrid_info().await.map(|s| vec![s]), */ _ => None, }; result } async fn get_ssh_active_users(&self) -> Option { // Use ss to find established SSH connections on port 22 let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "sport", "= :22"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); let mut connections = 0; // Count lines excluding header for line in stdout.lines().skip(1) { if !line.trim().is_empty() { connections += 1; } } if connections > 0 { Some(format!("{} connections", connections)) } else { None } } async fn get_web_server_connections(&self) -> Option { // Use simpler ss command with minimal output let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "sport", ":80", "or", "sport", ":443"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); // Subtract header line if connection_count > 0 { Some(format!("{} connections", connection_count)) } else { None } } async fn get_docker_containers(&self) -> Option> { let output = Command::new("/run/current-system/sw/bin/docker") .args(["ps", "--format", "{{.Names}}"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); let containers: Vec = stdout .lines() .filter(|line| !line.trim().is_empty()) .map(|line| line.trim().to_string()) .collect(); if containers.is_empty() { None } else { Some(containers) } } async fn get_postgres_connections(&self) -> Option { let output = Command::new("sudo") .args(["-u", "postgres", "/run/current-system/sw/bin/psql", "-t", "-c", "SELECT count(*) FROM pg_stat_activity WHERE state = 'active';"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); if let Some(line) = stdout.lines().next() { if let Ok(count) = line.trim().parse::() { if count > 0 { return Some(format!("{} connections", count)); } } } None } async fn get_mysql_connections(&self) -> Option { // Try mysql command first let output = Command::new("/run/current-system/sw/bin/mysql") .args(["-e", "SHOW PROCESSLIST;"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); // Subtract header line if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } // Fallback: check MySQL unix socket connections (more common than TCP) let output = Command::new("/run/current-system/sw/bin/ss") .args(["-x", "state", "connected", "src", "*mysql*"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } // Also try TCP port 3306 as final fallback let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :3306"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } fn is_running_as_root(&self) -> bool { std::env::var("USER").unwrap_or_default() == "root" || std::env::var("UID").unwrap_or_default() == "0" } async fn get_nginx_sites(&self) -> Option> { // Get the actual nginx config file path from systemd (NixOS uses custom config) let config_path = match self.get_nginx_config_from_systemd().await { Some(path) => path, None => { // Fallback to default nginx -T let mut cmd = if self.is_running_as_root() { Command::new("/run/current-system/sw/bin/nginx") } else { let mut cmd = Command::new("sudo"); cmd.arg("/run/current-system/sw/bin/nginx"); cmd }; match cmd .args(["-T"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await { Ok(output) => { if !output.status.success() { return None; } let config = String::from_utf8_lossy(&output.stdout); return self.parse_nginx_config(&config).await; } Err(_) => { return None; } } } }; // Use the specific config file let mut cmd = if self.is_running_as_root() { Command::new("/run/current-system/sw/bin/nginx") } else { let mut cmd = Command::new("sudo"); cmd.arg("/run/current-system/sw/bin/nginx"); cmd }; let output = match cmd .args(["-T", "-c", &config_path]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await { Ok(output) => output, Err(_) => { return None; } }; if !output.status.success() { return None; } let config = String::from_utf8_lossy(&output.stdout); self.parse_nginx_config(&config).await } async fn get_nginx_config_from_systemd(&self) -> Option { let output = Command::new("/run/current-system/sw/bin/systemctl") .args(["show", "nginx", "--property=ExecStart", "--no-pager"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); // Parse ExecStart to extract -c config path for line in stdout.lines() { if line.starts_with("ExecStart=") { // Handle both traditional and NixOS systemd formats // Traditional: ExecStart=/path/nginx -c /config // NixOS: ExecStart={ path=...; argv[]=...nginx -c /config; ... } if let Some(c_index) = line.find(" -c ") { let after_c = &line[c_index + 4..]; // Find the end of the config path let end_pos = after_c.find(' ') .or_else(|| after_c.find(" ;")) // NixOS format ends with " ;" .unwrap_or(after_c.len()); let config_path = after_c[..end_pos].trim(); return Some(config_path.to_string()); } } } None } async fn parse_nginx_config(&self, config: &str) -> Option> { let mut sites = Vec::new(); let lines: Vec<&str> = config.lines().collect(); let mut i = 0; while i < lines.len() { let trimmed = lines[i].trim(); // Look for server blocks if trimmed == "server {" { if let Some(hostname) = self.parse_server_block(&lines, &mut i) { sites.push(hostname); } } i += 1; } // Check which sites are actually accessible let mut accessible_sites = Vec::new(); for site in sites { if self.check_site_accessibility(&site).await { accessible_sites.push(site); // Remove checkmark - status will be shown via sub_service row status } } // Limit to reasonable number accessible_sites.truncate(15); if accessible_sites.is_empty() { None } else { Some(accessible_sites) } } fn parse_server_block(&self, lines: &[&str], start_index: &mut usize) -> Option { let mut server_names = Vec::new(); let mut has_redirect = false; let mut i = *start_index + 1; let mut brace_count = 1; // Parse until we close the server block while i < lines.len() && brace_count > 0 { let trimmed = lines[i].trim(); // Track braces brace_count += trimmed.matches('{').count(); brace_count -= trimmed.matches('}').count(); // Extract server_name if trimmed.starts_with("server_name") { if let Some(names_part) = trimmed.strip_prefix("server_name") { let names_clean = names_part.trim().trim_end_matches(';'); for name in names_clean.split_whitespace() { if name != "_" && !name.is_empty() && name.contains('.') && !name.starts_with('$') { server_names.push(name.to_string()); } } } } // Check if this server block is just a redirect if trimmed.starts_with("return") && trimmed.contains("301") { has_redirect = true; } i += 1; } *start_index = i - 1; // Only return hostnames that are not redirects and have actual content if !server_names.is_empty() && !has_redirect { Some(server_names[0].clone()) } else { None } } async fn check_site_accessibility(&self, hostname: &str) -> bool { // Try HTTPS first, then HTTP for scheme in ["https", "http"] { let url = format!("{}://{}", scheme, hostname); match tokio::time::timeout( Duration::from_secs(5), Command::new("/run/current-system/sw/bin/curl") .args(["-s", "-I", "--max-time", "3", &url]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() ).await { Ok(Ok(output)) if output.status.success() => { let response = String::from_utf8_lossy(&output.stdout); // Check for successful HTTP status codes if response.contains("HTTP/") && ( response.contains(" 200 ") || response.contains(" 301 ") || response.contains(" 302 ") || response.contains(" 403 ") // Some sites return 403 but are still "accessible" ) { return true; } } _ => continue, } } false } async fn get_nginx_description(&self) -> Option { // Get site count and active connections let sites = self.get_nginx_sites().await?; let site_count = sites.len(); // Get active connections let connections = self.get_web_server_connections().await; if let Some(conn_info) = connections { Some(format!("{} sites, {}", site_count, conn_info)) } else { Some(format!("{} sites", site_count)) } } async fn get_redis_info(&self) -> Option { // Try redis-cli first let output = Command::new("/run/current-system/sw/bin/redis-cli") .args(["info", "clients"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); for line in stdout.lines() { if line.starts_with("connected_clients:") { if let Some(count) = line.split(':').nth(1) { if let Ok(client_count) = count.trim().parse::() { return Some(format!("{} connections", client_count)); } } } } } // Fallback: check for redis connections on port 6379 let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :6379"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } async fn get_gitea_info(&self) -> Option { // Try to get gitea stats from API (if accessible) let output = Command::new("/run/current-system/sw/bin/curl") .args(["-s", "-f", "-m", "2", "http://localhost:3000/api/v1/repos/search?limit=1"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); if let Ok(json) = serde_json::from_str::(&stdout) { if let Some(total_count) = json["total_count"].as_u64() { return Some(format!("{} repositories", total_count)); } } } // Fallback: check HTTP connections on port 3000 let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :3000"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } async fn get_immich_info(&self) -> Option { // Check HTTP connections - Immich runs on port 8084 (from nginx proxy config) let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :8084"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } async fn get_vaultwarden_info(&self) -> Option { // Check vaultwarden connections on port 8222 (from nginx proxy config) let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :8222"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } async fn get_unifi_info(&self) -> Option { // Check UniFi connections on port 8080 (TCP) let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :8080"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } async fn get_mosquitto_info(&self) -> Option { // Check for active connections using netstat on MQTT ports let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "sport", "= :1883", "or", "sport", "= :8883"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } async fn get_docker_registry_info(&self) -> Option { // Check Docker registry connections on port 5000 (from nginx proxy config) let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :5000"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } async fn get_haasp_webgrid_info(&self) -> Option { // Check HAASP webgrid connections on port 8081 let output = Command::new("/run/current-system/sw/bin/ss") .args(["-tn", "state", "established", "dport", "= :8081"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); if connection_count > 0 { return Some(format!("{} connections", connection_count)); } } None } } #[async_trait] impl Collector for ServiceCollector { fn name(&self) -> &str { "service" } fn agent_type(&self) -> AgentType { AgentType::Service } fn collect_interval(&self) -> Duration { self.interval } async fn collect(&self) -> Result { let mut services = Vec::new(); let mut healthy = 0; let mut degraded = 0; let mut failed = 0; let mut total_memory_used = 0.0; let mut total_memory_quota = 0.0; let mut total_disk_used = 0.0; // Collect data from all configured services for service in &self.services { match self.get_service_status(service).await { Ok(service_data) => { match service_data.status { ServiceStatus::Running => healthy += 1, ServiceStatus::Degraded | ServiceStatus::Restarting => degraded += 1, ServiceStatus::Stopped => failed += 1, } total_memory_used += service_data.memory_used_mb; if service_data.memory_quota_mb > 0.0 { total_memory_quota += service_data.memory_quota_mb; } total_disk_used += service_data.disk_used_gb; // Handle nginx specially - create sub-services for sites if service == "nginx" && matches!(service_data.status, ServiceStatus::Running) { // Clear nginx description - sites will become individual sub-services let mut nginx_service = service_data; nginx_service.description = None; services.push(nginx_service); // Add nginx sites as individual sub-services if let Some(sites) = self.get_nginx_sites().await { for site in sites.iter() { services.push(ServiceData { name: site.clone(), status: ServiceStatus::Running, // Assume sites are running if nginx is running memory_used_mb: 0.0, memory_quota_mb: 0.0, cpu_percent: 0.0, sandbox_limit: None, disk_used_gb: 0.0, disk_quota_gb: 0.0, description: None, sub_service: Some("nginx".to_string()), }); healthy += 1; } } } // Handle docker specially - create sub-services for containers else if service == "docker" && matches!(service_data.status, ServiceStatus::Running) { // Clear docker description - containers will become individual sub-services let mut docker_service = service_data; docker_service.description = None; services.push(docker_service); // Add docker containers as individual sub-services if let Some(containers) = self.get_docker_containers().await { for container in containers.iter() { services.push(ServiceData { name: container.clone(), status: ServiceStatus::Running, // Assume containers are running if docker is running memory_used_mb: 0.0, memory_quota_mb: 0.0, cpu_percent: 0.0, sandbox_limit: None, disk_used_gb: 0.0, disk_quota_gb: 0.0, description: None, sub_service: Some("docker".to_string()), }); healthy += 1; } } } else { services.push(service_data); } } Err(e) => { failed += 1; // Add a placeholder service entry for failed collection services.push(ServiceData { name: service.clone(), status: ServiceStatus::Stopped, memory_used_mb: 0.0, memory_quota_mb: 0.0, cpu_percent: 0.0, sandbox_limit: None, disk_used_gb: 0.0, disk_quota_gb: 0.0, description: None, sub_service: None, }); tracing::warn!("Failed to collect metrics for service {}: {}", service, e); } } } let disk_usage = self.get_disk_usage().await.unwrap_or(DiskUsage { total_capacity_gb: 0.0, used_gb: 0.0, }); // Set disk quota to system total capacity for services that don't have specific quotas let system_disk_capacity_gb = disk_usage.total_capacity_gb; for service in &mut services { if service.disk_quota_gb == 0.0 { service.disk_quota_gb = system_disk_capacity_gb; } } // Calculate overall services status let services_status = self.determine_services_status(healthy, degraded, failed); let (gpu_load_percent, gpu_temp_c) = self.get_gpu_metrics().await; // If no specific quotas are set, use a default value if total_memory_quota == 0.0 { total_memory_quota = 8192.0; // Default 8GB for quota calculation } let service_metrics = json!({ "summary": { "healthy": healthy, "degraded": degraded, "failed": failed, "services_status": services_status, "memory_used_mb": total_memory_used, "memory_quota_mb": total_memory_quota, "disk_used_gb": total_disk_used, "disk_total_gb": total_disk_used, // For services, total = used (no quota concept) "gpu_load_percent": gpu_load_percent, "gpu_temp_c": gpu_temp_c, }, "services": services, "timestamp": Utc::now() }); Ok(CollectorOutput { agent_type: AgentType::Service, data: service_metrics, }) } } #[derive(Debug, Clone, Serialize)] struct ServiceData { name: String, status: ServiceStatus, memory_used_mb: f32, memory_quota_mb: f32, cpu_percent: f32, sandbox_limit: Option, disk_used_gb: f32, disk_quota_gb: f32, #[serde(skip_serializing_if = "Option::is_none")] description: Option>, #[serde(default)] sub_service: Option, } #[derive(Debug, Clone, Serialize)] enum ServiceStatus { Running, Degraded, Restarting, Stopped, } #[allow(dead_code)] struct DiskUsage { total_capacity_gb: f32, used_gb: f32, }