use async_trait::async_trait;
use chrono::Utc;
use serde::Serialize;
use serde_json::json;
use std::collections::HashMap;
use std::process::Stdio;
use std::time::Duration;
use tokio::fs;
use tokio::process::Command;
use tokio::time::timeout;

use super::{AgentType, Collector, CollectorError, CollectorOutput};

#[derive(Debug, Clone)]
pub struct ServiceCollector {
    pub enabled: bool,
    pub interval: Duration,
    pub services: Vec<String>,
    pub timeout_ms: u64,
}

impl ServiceCollector {
    pub fn new(enabled: bool, interval_ms: u64, services: Vec<String>) -> Self {
        Self {
            enabled,
            interval: Duration::from_millis(interval_ms),
            services,
            timeout_ms: 10_000, // 10 second timeout for service checks
        }
    }

    async fn get_service_status(&self, service: &str) -> Result<ServiceData, CollectorError> {
        let timeout_duration = Duration::from_millis(self.timeout_ms);

        // Single systemctl query for just the essential properties
        let status_output = timeout(
            timeout_duration,
            Command::new("systemctl")
                .args(["show", service, "--property=ActiveState,SubState,MainPID", "--no-pager"])
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output(),
        )
        .await
        .map_err(|_| CollectorError::Timeout {
            duration_ms: self.timeout_ms,
        })?
        .map_err(|e| CollectorError::CommandFailed {
            command: format!("systemctl show {}", service),
            message: e.to_string(),
        })?;

        if !status_output.status.success() {
            return Err(CollectorError::ServiceNotFound {
                service: service.to_string(),
            });
        }

        let status_stdout = String::from_utf8_lossy(&status_output.stdout);
        let mut active_state = None;
        let mut sub_state = None;
        let mut main_pid = None;

        for line in status_stdout.lines() {
            if let Some(value) = line.strip_prefix("ActiveState=") {
                active_state = Some(value.to_string());
            } else if let Some(value) = line.strip_prefix("SubState=") {
                sub_state = Some(value.to_string());
            } else if let Some(value) = line.strip_prefix("MainPID=") {
                main_pid = value.parse::<u32>().ok();
            }
        }

        let status = self.determine_service_status(&active_state, &sub_state);

        // Get resource usage if the service has a main process
        let (memory_used_mb, cpu_percent) = if let Some(pid) = main_pid {
            self.get_process_resources(pid).await.unwrap_or((0.0, 0.0))
        } else {
            (0.0, 0.0)
        };

        // Get memory quota from systemd if available
        let memory_quota_mb = self.get_service_memory_limit(service).await.unwrap_or(0.0);

        // Get disk usage for this service (only for running services)
        let disk_used_gb = if matches!(status, ServiceStatus::Running) {
            self.get_service_disk_usage(service).await.unwrap_or(0.0)
        } else {
            0.0
        };

        // Get service-specific description (only for running services, and throttled)
        let description = if matches!(status, ServiceStatus::Running) {
            self.get_service_description_throttled(service).await
        } else {
            None
        };

        Ok(ServiceData {
            name: service.to_string(),
            status,
            memory_used_mb,
            memory_quota_mb,
            cpu_percent,
            sandbox_limit: None, // TODO: Implement sandbox limit detection
            disk_used_gb,
            description,
        })
    }

    fn determine_service_status(
        &self,
        active_state: &Option<String>,
        sub_state: &Option<String>,
    ) -> ServiceStatus {
        match (active_state.as_deref(), sub_state.as_deref()) {
            (Some("active"), Some("running")) => ServiceStatus::Running,
            (Some("active"), Some("exited")) => ServiceStatus::Running, // One-shot services
            (Some("reloading"), _) | (Some("activating"), _) => ServiceStatus::Restarting,
            (Some("failed"), _) | (Some("inactive"), Some("failed")) => ServiceStatus::Stopped,
            (Some("inactive"), _) => ServiceStatus::Stopped,
            _ => ServiceStatus::Degraded,
        }
    }

    async fn get_process_resources(&self, pid: u32) -> Result<(f32, f32), CollectorError> {
        // Read /proc/{pid}/stat for CPU and memory info
        let stat_path = format!("/proc/{}/stat", pid);
        let stat_content = fs::read_to_string(&stat_path)
            .await
            .map_err(|e| CollectorError::IoError {
                message: e.to_string(),
            })?;

        let stat_fields: Vec<&str> = stat_content.split_whitespace().collect();
        if stat_fields.len() < 24 {
            return Err(CollectorError::ParseError {
                message: format!("Invalid /proc/{}/stat format", pid),
            });
        }

        // Field at index 23 is RSS (Resident Set Size) in pages
        let rss_pages: u64 = stat_fields[23]
            .parse()
            .map_err(|e| CollectorError::ParseError {
                message: format!("Failed to parse RSS from /proc/{}/stat: {}", pid, e),
            })?;

        // Convert pages to MB (assuming 4KB pages)
        let memory_mb = (rss_pages * 4) as f32 / 1024.0;

        // For CPU, we'd need to track usage over time - simplified to 0 for now
        // TODO: Implement proper CPU percentage calculation
        let cpu_percent = 0.0;

        Ok((memory_mb, cpu_percent))
    }

    async fn get_service_disk_usage(&self, service: &str) -> Result<f32, CollectorError> {
        // Only check the most likely path to avoid multiple du calls
        let primary_path = format!("/var/lib/{}", service);

        // Quick existence check first - if the directory doesn't exist, don't run du
        if tokio::fs::metadata(&primary_path).await.is_err() {
            return Ok(0.0);
        }

        self.get_directory_size(&primary_path).await
    }

    async fn get_directory_size(&self, path: &str) -> Result<f32, CollectorError> {
        let output = Command::new("du")
            .args(["-s", "-k", path]) // Use kilobytes instead of forcing GB
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .map_err(|e| CollectorError::CommandFailed {
                command: format!("du -s -k {}", path),
                message: e.to_string(),
            })?;

        if !output.status.success() {
            // Directory doesn't exist or permission denied - return 0
            return Ok(0.0);
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        if let Some(line) = stdout.lines().next() {
            if let Some(size_str) = line.split_whitespace().next() {
                let size_kb = size_str.parse::<f32>().unwrap_or(0.0);
                let size_gb = size_kb / (1024.0 * 1024.0); // Convert KB to GB
                return Ok(size_gb);
            }
        }

        Ok(0.0)
    }

    async fn get_service_memory_limit(&self, service: &str) -> Result<f32, CollectorError> {
        let output = Command::new("systemctl")
            .args(["show", service, "--property=MemoryMax", "--no-pager"])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .map_err(|e| CollectorError::CommandFailed {
                command: format!("systemctl show {} --property=MemoryMax", service),
                message: e.to_string(),
            })?;

        let stdout = String::from_utf8_lossy(&output.stdout);
        for line in stdout.lines() {
            if let Some(value) = line.strip_prefix("MemoryMax=") {
                if value == "infinity" {
                    return Ok(0.0); // No limit
                }
                if let Ok(bytes) = value.parse::<u64>() {
                    return Ok(bytes as f32 / (1024.0 * 1024.0)); // Convert bytes to MB
                }
            }
        }

        Ok(0.0) // No limit or couldn't parse
    }

    async fn get_system_memory_info(&self) -> Result<SystemMemoryInfo, CollectorError> {
        let meminfo = fs::read_to_string("/proc/meminfo")
            .await
            .map_err(|e| CollectorError::IoError {
                message: e.to_string(),
            })?;

        let mut memory_info = HashMap::new();
        for line in meminfo.lines() {
            if let Some((key, value)) = line.split_once(':') {
                let value = value.trim().trim_end_matches(" kB");
                if let Ok(kb) = value.parse::<u64>() {
                    memory_info.insert(key.to_string(), kb);
                }
            }
        }

        let total_kb = memory_info.get("MemTotal").copied().unwrap_or(0);
        let available_kb = memory_info.get("MemAvailable").copied().unwrap_or(0);
        let used_kb = total_kb.saturating_sub(available_kb);

        Ok(SystemMemoryInfo {
            total_mb: total_kb as f32 / 1024.0,
            used_mb: used_kb as f32 / 1024.0,
        })
    }

    async fn get_disk_usage(&self) -> Result<DiskUsage, CollectorError> {
        let output = Command::new("df")
            .args(["-BG", "--output=size,used,avail", "/"])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .map_err(|e| CollectorError::CommandFailed {
                command: "df -BG --output=size,used,avail /".to_string(),
                message: e.to_string(),
            })?;

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(CollectorError::CommandFailed {
                command: "df -BG --output=size,used,avail /".to_string(),
                message: stderr.to_string(),
            });
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let lines: Vec<&str> = stdout.lines().collect();
        if lines.len() < 2 {
            return Err(CollectorError::ParseError {
                message: "Unexpected df output format".to_string(),
            });
        }

        let data_line = lines[1].trim();
        let parts: Vec<&str> = data_line.split_whitespace().collect();
        if parts.len() < 3 {
            return Err(CollectorError::ParseError {
                message: format!("Unexpected df data format: {}", data_line),
            });
        }

        let parse_size = |s: &str| -> Result<f32, CollectorError> {
            s.trim_end_matches('G')
                .parse::<f32>()
                .map_err(|e| CollectorError::ParseError {
                    message: format!("Failed to parse disk size '{}': {}", s, e),
                })
        };

        Ok(DiskUsage {
            total_gb: parse_size(parts[0])?,
            used_gb: parse_size(parts[1])?,
        })
    }

    async fn get_cpu_load(&self) -> Result<(f32, f32, f32), CollectorError> {
        let loadavg = fs::read_to_string("/proc/loadavg")
            .await
            .map_err(|e| CollectorError::IoError {
                message: e.to_string(),
            })?;

        let parts: Vec<&str> = loadavg.split_whitespace().collect();
        if parts.len() < 3 {
            return Err(CollectorError::ParseError {
                message: "Unexpected /proc/loadavg format".to_string(),
            });
        }

        let parse = |s: &str| -> Result<f32, CollectorError> {
            s.parse::<f32>().map_err(|e| CollectorError::ParseError {
                message: format!("Failed to parse load average '{}': {}", s, e),
            })
        };

        Ok((parse(parts[0])?, parse(parts[1])?, parse(parts[2])?))
    }

    async fn get_cpu_frequency_mhz(&self) -> Option<f32> {
        let candidates = [
            "/sys/devices/system/cpu/cpufreq/policy0/scaling_cur_freq",
            "/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq",
        ];

        for path in candidates {
            if let Ok(content) = fs::read_to_string(path).await {
                if let Ok(khz) = content.trim().parse::<f32>() {
                    if khz > 0.0 {
                        return Some(khz / 1000.0);
                    }
                }
            }
        }

        if let Ok(content) = fs::read_to_string("/proc/cpuinfo").await {
            for line in content.lines() {
                if let Some(rest) = line.strip_prefix("cpu MHz") {
                    if let Some(value) = rest.split(':').nth(1) {
                        if let Ok(mhz) = value.trim().parse::<f32>() {
                            if mhz > 0.0 {
                                return Some(mhz);
                            }
                        }
                    }
                }
            }
        }

        None
    }

    async fn get_cpu_temperature_c(&self) -> Option<f32> {
        let mut entries = fs::read_dir("/sys/class/thermal").await.ok()?;
        let mut fallback: Option<f32> = None;

        while let Ok(Some(entry)) = entries.next_entry().await {
            let path = entry.path();
            let type_path = path.join("type");
            let temp_path = path.join("temp");

            let label = fs::read_to_string(&type_path).await.ok()?.to_lowercase();
            let raw = match fs::read_to_string(&temp_path).await {
                Ok(value) => value,
                Err(_) => continue,
            };
            let milli: f32 = match raw.trim().parse() {
                Ok(value) => value,
                Err(_) => continue,
            };
            let temp_c = milli / 1000.0;

            if label.contains("cpu") || label.contains("pkg") {
                if temp_c > 0.0 {
                    return Some(temp_c);
                }
            }
            if fallback.is_none() && temp_c > 0.0 {
                fallback = Some(temp_c);
            }
        }

        fallback
    }

    async fn get_gpu_metrics(&self) -> (Option<f32>, Option<f32>) {
        let output = Command::new("nvidia-smi")
            .args([
                "--query-gpu=utilization.gpu,temperature.gpu",
                "--format=csv,noheader,nounits",
            ])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await;

        match output {
            Ok(result) if result.status.success() => {
                let stdout = String::from_utf8_lossy(&result.stdout);
                if let Some(line) = stdout.lines().next() {
                    let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
                    if parts.len() >= 2 {
                        let load = parts[0].parse::<f32>().ok();
                        let temp = parts[1].parse::<f32>().ok();
                        return (load, temp);
                    }
                }
                (None, None)
            }
            Ok(_) | Err(_) => {
                // No usable nvidia-smi output - fall back to the Raspberry Pi firmware tool
                let util_output = Command::new("/opt/vc/bin/vcgencmd")
                    .arg("measure_temp")
                    .stdout(Stdio::piped())
                    .stderr(Stdio::piped())
                    .output()
                    .await;

                if let Ok(result) = util_output {
                    if result.status.success() {
                        let stdout = String::from_utf8_lossy(&result.stdout);
                        if let Some(value) = stdout
                            .trim()
                            .strip_prefix("temp=")
                            .and_then(|s| s.strip_suffix("'C"))
                        {
                            if let Ok(temp_c) = value.parse::<f32>() {
                                return (None, Some(temp_c));
                            }
                        }
                    }
                }
                (None, None)
            }
        }
    }

    async fn get_service_description_throttled(&self, service: &str) -> Option<Vec<String>> {
        // Simple time-based throttling - only run the expensive description commands
        // roughly every 30 seconds, staggered per service (via a hash of the service
        // name) so that different services are not all described in the same cycle.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        let service_hash = service.as_bytes().iter().fold(0u64, |acc, &b| {
            acc.wrapping_mul(31).wrapping_add(b as u64)
        });

        // Each service gets its description updated every 30 seconds, but staggered
        let update_interval = 30; // seconds
        let service_offset = service_hash % update_interval;

        if (now + service_offset) % update_interval == 0 {
            self.get_service_description(service).await
        } else {
            None // No new description this cycle
        }
    }

    async fn get_service_description(&self, service: &str) -> Option<Vec<String>> {
        match service {
            "sshd" | "ssh" => self.get_ssh_active_users().await.map(|s| vec![s]),
            "nginx" => self.get_nginx_sites().await,
            "apache2" | "httpd" => self.get_web_server_connections().await.map(|s| vec![s]),
            "docker" => self.get_docker_containers().await.map(|s| vec![s]),
            "postgresql" | "postgres" => self.get_postgres_connections().await.map(|s| vec![s]),
            "mysql" | "mariadb" => self.get_mysql_connections().await.map(|s| vec![s]),
            _ => None,
        }
    }

    async fn get_ssh_active_users(&self) -> Option<String> {
        let output = Command::new("who")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .ok()?;

        if !output.status.success() {
            return None;
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let mut ssh_users = Vec::new();
        for line in stdout.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 {
                let user = parts[0];
                let terminal = parts[1];
                // SSH sessions typically show pts/X terminals
                if terminal.starts_with("pts/") {
                    ssh_users.push(user);
                }
            }
        }

        if ssh_users.is_empty() {
            None
        } else {
            let unique_users: std::collections::HashSet<&str> = ssh_users.into_iter().collect();
            let count = unique_users.len();
            let users: Vec<&str> = unique_users.into_iter().collect();
            if count == 1 {
                Some(format!("1 active user: {}", users[0]))
            } else {
                Some(format!("{} active users: {}", count, users.join(", ")))
            }
        }
    }

    async fn get_web_server_connections(&self) -> Option<String> {
        // Use a simple ss query with minimal output
        let output = Command::new("ss")
            .args(["-tn", "state", "established", "sport", ":80", "or", "sport", ":443"])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .ok()?;

        if !output.status.success() {
            return None;
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let connection_count = stdout.lines().count().saturating_sub(1); // Subtract header line
        if connection_count > 0 {
            Some(format!("{} active connections", connection_count))
        } else {
            None
        }
    }

    async fn get_docker_containers(&self) -> Option<String> {
        let output = Command::new("docker")
.args(["ps", "--format", "table {{.Names}}"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); let container_count = stdout.lines().count().saturating_sub(1); // Subtract header line if container_count > 0 { Some(format!("{} running containers", container_count)) } else { Some("no containers running".to_string()) } } async fn get_postgres_connections(&self) -> Option { let output = Command::new("sudo") .args(["-u", "postgres", "psql", "-t", "-c", "SELECT count(*) FROM pg_stat_activity WHERE state = 'active';"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); if let Some(line) = stdout.lines().next() { if let Ok(count) = line.trim().parse::() { if count > 0 { return Some(format!("{} active connections", count)); } } } None } async fn get_mysql_connections(&self) -> Option { let output = Command::new("mysql") .args(["-e", "SHOW PROCESSLIST;"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let stdout = String::from_utf8_lossy(&output.stdout); let connection_count = stdout.lines().count().saturating_sub(1); // Subtract header line if connection_count > 0 { Some(format!("{} active connections", connection_count)) } else { None } } async fn get_nginx_sites(&self) -> Option> { // For NixOS and other systems, get the actual running nginx config let output = Command::new("nginx") .args(["-T"]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() .await .ok()?; if !output.status.success() { return None; } let config = String::from_utf8_lossy(&output.stdout); let mut sites = Vec::new(); for line in config.lines() { let trimmed = line.trim(); if trimmed.starts_with("server_name") { // Extract server names from "server_name example.com www.example.com;" if let Some(names_part) = trimmed.strip_prefix("server_name") { let names_clean = names_part.trim().trim_end_matches(';'); for name in names_clean.split_whitespace() { // Skip default catch-all server names if name != "_" && !name.is_empty() && name.contains('.') && !name.starts_with('$') { sites.push(name.to_string()); } } } } } // Remove duplicates and limit to reasonable number sites.sort(); sites.dedup(); sites.truncate(15); // Show max 15 sites to avoid overwhelming the UI if sites.is_empty() { None } else { Some(sites) } } } #[async_trait] impl Collector for ServiceCollector { fn name(&self) -> &str { "service" } fn agent_type(&self) -> AgentType { AgentType::Service } fn collect_interval(&self) -> Duration { self.interval } fn is_enabled(&self) -> bool { self.enabled } fn requires_root(&self) -> bool { false // Most systemctl commands work without root } async fn collect(&self) -> Result { let mut services = Vec::new(); let mut healthy = 0; let mut degraded = 0; let mut failed = 0; let mut total_memory_used = 0.0; let mut total_memory_quota = 0.0; let mut total_disk_used = 0.0; // Collect data from all configured services for service in &self.services { match self.get_service_status(service).await { Ok(service_data) => { match service_data.status { ServiceStatus::Running => healthy += 1, ServiceStatus::Degraded | ServiceStatus::Restarting => degraded += 1, ServiceStatus::Stopped => failed += 1, } total_memory_used += service_data.memory_used_mb; if service_data.memory_quota_mb > 0.0 { total_memory_quota += 
                    total_disk_used += service_data.disk_used_gb;
                    services.push(service_data);
                }
                Err(e) => {
                    failed += 1;
                    // Add a placeholder service entry for the failed collection
                    services.push(ServiceData {
                        name: service.clone(),
                        status: ServiceStatus::Stopped,
                        memory_used_mb: 0.0,
                        memory_quota_mb: 0.0,
                        cpu_percent: 0.0,
                        sandbox_limit: None,
                        disk_used_gb: 0.0,
                        description: None,
                    });
                    tracing::warn!("Failed to collect metrics for service {}: {}", service, e);
                }
            }
        }

        // Get system memory info for quota calculation
        let system_memory = self
            .get_system_memory_info()
            .await
            .unwrap_or(SystemMemoryInfo {
                total_mb: 0.0,
                used_mb: 0.0,
            });

        let _disk_usage = self.get_disk_usage().await.unwrap_or(DiskUsage {
            total_gb: 0.0,
            used_gb: 0.0,
        });

        let (cpu_load_1, cpu_load_5, cpu_load_15) =
            self.get_cpu_load().await.unwrap_or((0.0, 0.0, 0.0));
        let cpu_freq_mhz = self.get_cpu_frequency_mhz().await;
        let cpu_temp_c = self.get_cpu_temperature_c().await;
        let (gpu_load_percent, gpu_temp_c) = self.get_gpu_metrics().await;

        // If no specific quotas are set, use total system memory as the reference
        if total_memory_quota == 0.0 {
            total_memory_quota = system_memory.total_mb;
        }

        let service_metrics = json!({
            "summary": {
                "healthy": healthy,
                "degraded": degraded,
                "failed": failed,
                "memory_used_mb": total_memory_used,
                "memory_quota_mb": total_memory_quota,
                "system_memory_used_mb": system_memory.used_mb,
                "system_memory_total_mb": system_memory.total_mb,
                "disk_used_gb": total_disk_used,
                "disk_total_gb": total_disk_used, // For services, total = used (no quota concept)
                "cpu_load_1": cpu_load_1,
                "cpu_load_5": cpu_load_5,
                "cpu_load_15": cpu_load_15,
                "cpu_freq_mhz": cpu_freq_mhz,
                "cpu_temp_c": cpu_temp_c,
                "gpu_load_percent": gpu_load_percent,
                "gpu_temp_c": gpu_temp_c,
            },
            "services": services,
            "timestamp": Utc::now()
        });

        Ok(CollectorOutput {
            agent_type: AgentType::Service,
            data: service_metrics,
            timestamp: Utc::now(),
        })
    }
}

#[derive(Debug, Clone, Serialize)]
struct ServiceData {
    name: String,
    status: ServiceStatus,
    memory_used_mb: f32,
    memory_quota_mb: f32,
    cpu_percent: f32,
    sandbox_limit: Option<String>,
    disk_used_gb: f32,
    #[serde(skip_serializing_if = "Option::is_none")]
    description: Option<Vec<String>>,
}

#[derive(Debug, Clone, Serialize)]
enum ServiceStatus {
    Running,
    Degraded,
    Restarting,
    Stopped,
}

struct SystemMemoryInfo {
    total_mb: f32,
    used_mb: f32,
}

#[allow(dead_code)]
struct DiskUsage {
    total_gb: f32,
    used_gb: f32,
}
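
// A minimal test sketch for the systemd state mapping above. It exercises only the
// pure `determine_service_status` helper, so it needs neither systemctl nor a tokio
// runtime; the constructor arguments (enabled, 1s interval, empty service list) are
// arbitrary placeholder values chosen just for the test.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn maps_systemd_states_to_service_status() {
        let collector = ServiceCollector::new(true, 1_000, vec![]);

        // active/running counts as Running (active/exited one-shots are treated the same)
        let status = collector.determine_service_status(
            &Some("active".to_string()),
            &Some("running".to_string()),
        );
        assert!(matches!(status, ServiceStatus::Running));

        // failed units are reported as Stopped
        let status = collector.determine_service_status(&Some("failed".to_string()), &None);
        assert!(matches!(status, ServiceStatus::Stopped));

        // anything unrecognized falls back to Degraded
        let status = collector.determine_service_status(&None, &None);
        assert!(matches!(status, ServiceStatus::Degraded));
    }
}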