use anyhow::Result;
use async_trait::async_trait;
use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetric, Status};
use std::process::Command;
use std::sync::RwLock;
use std::time::Instant;
use tracing::debug;

use super::{Collector, CollectorError};
use crate::config::SystemdConfig;

/// Systemd collector for monitoring systemd services with structured data output
pub struct SystemdCollector {
    /// Cached state with thread-safe interior mutability
    state: RwLock<ServiceCacheState>,
    /// Configuration for service monitoring
    config: SystemdConfig,
}

/// Internal state for service caching
#[derive(Debug, Clone)]
struct ServiceCacheState {
    /// Last collection time for performance tracking
    last_collection: Option<Instant>,
    /// Cached complete service data with sub-services
    cached_service_data: Vec<ServiceData>,
    /// Interesting services to monitor (cached after discovery)
    monitored_services: Vec<String>,
    /// Cached service status information from discovery
    service_status_cache: std::collections::HashMap<String, ServiceStatusInfo>,
    /// Last time services were discovered
    last_discovery_time: Option<Instant>,
    /// How often to rediscover services (from config)
    discovery_interval_seconds: u64,
    /// Cached nginx site latency metrics
    nginx_site_metrics: Vec<(String, f32)>,
    /// Last time nginx sites were checked
    last_nginx_check_time: Option<Instant>,
    /// How often to check nginx site latency (configurable)
    nginx_check_interval_seconds: u64,
}

/// Cached service status information from systemctl list-units
#[derive(Debug, Clone)]
struct ServiceStatusInfo {
    load_state: String,
    active_state: String,
    sub_state: String,
}

impl SystemdCollector {
    pub fn new(config: SystemdConfig) -> Self {
        let state = ServiceCacheState {
            last_collection: None,
            cached_service_data: Vec::new(),
            monitored_services: Vec::new(),
            service_status_cache: std::collections::HashMap::new(),
            last_discovery_time: None,
            discovery_interval_seconds: config.interval_seconds,
            nginx_site_metrics: Vec::new(),
            last_nginx_check_time: None,
            nginx_check_interval_seconds: config.nginx_check_interval_seconds,
        };
        Self {
            state: RwLock::new(state),
            config,
        }
    }

    /// Collect service data and populate AgentData
    async fn collect_service_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        let start_time = Instant::now();
        debug!("Collecting systemd services metrics");

        // Get cached services (discovery only happens when needed)
        let monitored_services = match self.get_monitored_services() {
            Ok(services) => services,
            Err(e) => {
                debug!("Failed to get monitored services: {}", e);
                return Ok(());
            }
        };

        // Collect service data for each monitored service
        let mut complete_service_data = Vec::new();
        for service_name in &monitored_services {
            match self.get_service_status(service_name) {
                Ok((active_status, _detailed_info)) => {
                    let memory_mb = self.get_service_memory_usage(service_name).await.unwrap_or(0.0);
                    let disk_gb = self.get_service_disk_usage(service_name).await.unwrap_or(0.0);
                    let mut sub_services = Vec::new();

                    // Sub-service metrics for specific services (always include cached results)
                    if service_name.contains("nginx") && active_status == "active" {
                        let nginx_sites = self.get_nginx_site_metrics();
                        for (site_name, latency_ms) in nginx_sites {
                            let site_status = if latency_ms >= 0.0
                                && latency_ms < self.config.nginx_latency_critical_ms
                            {
                                "active"
                            } else {
                                "failed"
                            };
                            let mut metrics = Vec::new();
                            metrics.push(SubServiceMetric {
                                label: "latency_ms".to_string(),
                                value: latency_ms,
                                unit: Some("ms".to_string()),
                            });
                            sub_services.push(SubServiceData {
                                name: site_name.clone(),
                                service_status: self
                                    .calculate_service_status(&site_name, &site_status),
                                metrics,
                            });
                        }
                    }

                    if service_name.contains("docker") && active_status == "active" {
                        debug!("Collecting Docker sub-services for service: {}", service_name);
                        let docker_containers = self.get_docker_containers();
                        debug!("Found {} Docker containers", docker_containers.len());
                        for (container_name, container_status) in docker_containers {
                            // For now, docker containers have no additional metrics.
                            // Future: could add memory_mb, cpu_percent, restart_count, etc.
                            let metrics = Vec::new();
                            sub_services.push(SubServiceData {
                                name: container_name.clone(),
                                service_status: self
                                    .calculate_service_status(&container_name, &container_status),
                                metrics,
                            });
                        }

                        // Add Docker images
                        let docker_images = self.get_docker_images();
                        debug!("Found {} Docker images", docker_images.len());
                        for (image_name, image_status, image_size) in docker_images {
                            let mut metrics = Vec::new();
                            metrics.push(SubServiceMetric {
                                label: "size".to_string(),
                                value: 0.0, // Size is reported as a string in the name instead
                                unit: None,
                            });
                            sub_services.push(SubServiceData {
                                name: format!("{} ({})", image_name, image_size),
                                service_status: self
                                    .calculate_service_status(&image_name, &image_status),
                                metrics,
                            });
                        }
                        debug!("Total Docker sub-services added: {}", sub_services.len());
                    }

                    // Create complete service data
                    let service_data = ServiceData {
                        name: service_name.clone(),
                        memory_mb,
                        disk_gb,
                        user_stopped: false, // TODO: Integrate with service tracker
                        service_status: self.calculate_service_status(service_name, &active_status),
                        sub_services,
                    };

                    // Add to AgentData and cache
                    agent_data.services.push(service_data.clone());
                    complete_service_data.push(service_data);
                }
                Err(e) => {
                    debug!("Failed to get status for service {}: {}", service_name, e);
                }
            }
        }

        // Update cached state
        {
            let mut state = self.state.write().unwrap();
            state.last_collection = Some(start_time);
            state.cached_service_data = complete_service_data;
        }

        let elapsed = start_time.elapsed();
        debug!(
            "Systemd collection completed in {:?} with {} services",
            elapsed,
            agent_data.services.len()
        );
        Ok(())
    }

    /// Get monitored services, discovering them if needed or if the cache has expired
    fn get_monitored_services(&self) -> Result<Vec<String>> {
        // Check if we need discovery without holding the lock
        let needs_discovery = {
            let state = self.state.read().unwrap();
            match state.last_discovery_time {
                None => true, // First time
                Some(last_time) => {
                    let elapsed = last_time.elapsed().as_secs();
                    elapsed >= state.discovery_interval_seconds
                }
            }
        };

        if needs_discovery {
            debug!("Discovering systemd services (cache expired or first run)");
            match self.discover_services_internal() {
                Ok((services, status_cache)) => {
                    if let Ok(mut state) = self.state.write() {
                        state.monitored_services = services.clone();
                        state.service_status_cache = status_cache;
                        state.last_discovery_time = Some(Instant::now());
                        debug!(
                            "Auto-discovered {} services to monitor: {:?}",
                            state.monitored_services.len(),
                            state.monitored_services
                        );
                        return Ok(services);
                    }
                }
                Err(e) => {
                    debug!("Failed to discover services, using cached list: {}", e);
                }
            }
        }

        // Return cached services
        let state = self.state.read().unwrap();
        Ok(state.monitored_services.clone())
    }

    /// Get nginx site metrics, checking them if the cache has expired (like the old working version)
    fn get_nginx_site_metrics(&self) -> Vec<(String, f32)> {
        let mut state = self.state.write().unwrap();

        // Check if we need to refresh nginx site metrics
        let needs_refresh = match state.last_nginx_check_time {
            None => true, // First time
            Some(last_time) => {
                let elapsed = last_time.elapsed().as_secs();
                elapsed >= state.nginx_check_interval_seconds
            }
        };

        if needs_refresh {
            // Only check nginx sites if an nginx service is being monitored
            if state.monitored_services.iter().any(|s| s.contains("nginx")) {
                let fresh_metrics = self.get_nginx_sites_internal();
                state.nginx_site_metrics = fresh_metrics;
                state.last_nginx_check_time = Some(Instant::now());
            }
        }

        state.nginx_site_metrics.clone()
    }

    /// Auto-discover interesting services to monitor
    fn discover_services_internal(
        &self,
    ) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
        // First: get all service unit files
        let unit_files_output = Command::new("systemctl")
            .args(&["list-unit-files", "--type=service", "--no-pager", "--plain"])
            .output()?;
        if !unit_files_output.status.success() {
            return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
        }

        // Second: get runtime status of all units
        let units_status_output = Command::new("systemctl")
            .args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"])
            .output()?;
        if !units_status_output.status.success() {
            return Err(anyhow::anyhow!("systemctl list-units command failed"));
        }

        let unit_files_str = String::from_utf8(unit_files_output.stdout)?;
        let units_status_str = String::from_utf8(units_status_output.stdout)?;
        let mut services = Vec::new();
        let excluded_services = &self.config.excluded_services;
        let service_name_filters = &self.config.service_name_filters;

        // Parse all service unit files
        let mut all_service_names = std::collections::HashSet::new();
        for line in unit_files_str.lines() {
            let fields: Vec<&str> = line.split_whitespace().collect();
            if fields.len() >= 2 && fields[0].ends_with(".service") {
                let service_name = fields[0].trim_end_matches(".service");
                all_service_names.insert(service_name.to_string());
            }
        }

        // Parse runtime status for all units
        let mut status_cache = std::collections::HashMap::new();
        for line in units_status_str.lines() {
            let fields: Vec<&str> = line.split_whitespace().collect();
            if fields.len() >= 4 && fields[0].ends_with(".service") {
                let service_name = fields[0].trim_end_matches(".service");
                let load_state = fields.get(1).unwrap_or(&"unknown").to_string();
                let active_state = fields.get(2).unwrap_or(&"unknown").to_string();
                let sub_state = fields.get(3).unwrap_or(&"unknown").to_string();
                status_cache.insert(
                    service_name.to_string(),
                    ServiceStatusInfo {
                        load_state,
                        active_state,
                        sub_state,
                    },
                );
            }
        }

        // For services found in unit files but not in runtime status, set a default inactive status
        for service_name in &all_service_names {
            if !status_cache.contains_key(service_name) {
                status_cache.insert(
                    service_name.to_string(),
                    ServiceStatusInfo {
                        load_state: "not-loaded".to_string(),
                        active_state: "inactive".to_string(),
                        sub_state: "dead".to_string(),
                    },
                );
            }
        }

        // Process all discovered services and apply filters
        for service_name in &all_service_names {
            // Skip excluded services first
            let mut is_excluded = false;
            for excluded in excluded_services {
                if service_name.contains(excluded) {
                    is_excluded = true;
                    break;
                }
            }
            if is_excluded {
                continue;
            }

            // Check if this service matches our filter patterns (supports wildcards)
            for pattern in service_name_filters {
                if self.matches_pattern(service_name, pattern) {
                    services.push(service_name.to_string());
                    break;
                }
            }
        }

        Ok((services, status_cache))
    }

    /// Get service status from the cache (if available) or fall back to systemctl
    fn get_service_status(&self, service: &str) -> Result<(String, String)> {
        // Try to get status from the cache first
        if let Ok(state) = self.state.read() {
            if let Some(cached_info) = state.service_status_cache.get(service) {
                let active_status = cached_info.active_state.clone();
                let detailed_info = format!(
                    "LoadState={}\nActiveState={}\nSubState={}",
                    cached_info.load_state, cached_info.active_state, cached_info.sub_state
                );
                return Ok((active_status, detailed_info));
            }
        }

        // Fall back to systemctl if not in the cache
        let output = Command::new("systemctl")
            .args(&["is-active", &format!("{}.service", service)])
            .output()?;
        let active_status = String::from_utf8(output.stdout)?.trim().to_string();

        // Get more detailed info
        let output = Command::new("systemctl")
            .args(&[
                "show",
                &format!("{}.service", service),
                "--property=LoadState,ActiveState,SubState",
            ])
            .output()?;
        let detailed_info = String::from_utf8(output.stdout)?;

        Ok((active_status, detailed_info))
    }

    /// Check if a service name matches a pattern (supports wildcards like nginx*)
    fn matches_pattern(&self, service_name: &str, pattern: &str) -> bool {
        if pattern.contains('*') {
            if pattern.ends_with('*') {
                // Pattern like "nginx*" - match if the service starts with "nginx"
                let prefix = &pattern[..pattern.len() - 1];
                service_name.starts_with(prefix)
            } else if pattern.starts_with('*') {
                // Pattern like "*backup" - match if the service ends with "backup"
                let suffix = &pattern[1..];
                service_name.ends_with(suffix)
            } else {
                // Pattern like "nginx*backup" - simple glob matching
                self.simple_glob_match(service_name, pattern)
            }
        } else {
            // Exact match
            service_name == pattern
        }
    }

    /// Simple glob matching for patterns with * in the middle
    fn simple_glob_match(&self, text: &str, pattern: &str) -> bool {
        let parts: Vec<&str> = pattern.split('*').collect();
        let mut pos = 0;
        for part in parts {
            if part.is_empty() {
                continue;
            }
            if let Some(found_pos) = text[pos..].find(part) {
                pos += found_pos + part.len();
            } else {
                return false;
            }
        }
        true
    }

    /// Get disk usage for a specific service
    async fn get_service_disk_usage(&self, service_name: &str) -> Result<f32, CollectorError> {
        // Check if this service has configured directory paths
        if let Some(dirs) = self.config.service_directories.get(service_name) {
            // Service has configured paths - use the first accessible one
            for dir in dirs {
                if let Some(size) = self.get_directory_size(dir) {
                    return Ok(size);
                }
            }
            // If the configured paths failed, return 0
            return Ok(0.0);
        }

        // No configured path - try to get WorkingDirectory from systemctl
        let output = Command::new("systemctl")
            .args(&[
                "show",
                &format!("{}.service", service_name),
                "--property=WorkingDirectory",
            ])
            .output()
            .map_err(|e| CollectorError::SystemRead {
                path: format!("WorkingDirectory for {}", service_name),
                error: e.to_string(),
            })?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        for line in output_str.lines() {
            if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
                let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
                if !dir.is_empty() && dir != "/" {
                    return Ok(self.get_directory_size(dir).unwrap_or(0.0));
                }
            }
        }

        Ok(0.0)
    }

    /// Get the size of a directory in GB
    fn get_directory_size(&self, path: &str) -> Option<f32> {
        let output = Command::new("sudo")
            .args(&["du", "-sb", path])
            .output()
            .ok()?;

        if !output.status.success() {
            // Log permission errors for debugging but don't spam the logs
            let stderr = String::from_utf8_lossy(&output.stderr);
            if stderr.contains("Permission denied") {
                debug!("Permission denied accessing directory: {}", path);
            } else {
                debug!("Failed to get size for directory {}: {}", path, stderr);
            }
            return None;
        }

        let output_str = String::from_utf8(output.stdout).ok()?;
        let size_str = output_str.split_whitespace().next()?;
        if let Ok(size_bytes) = size_str.parse::<u64>() {
            let size_gb = size_bytes as f32 / (1024.0 * 1024.0 * 1024.0);
            // Return the size even if it is very small (minimum 0.001 GB = 1 MB for visibility)
            if size_gb > 0.0 {
                Some(size_gb.max(0.001))
            } else {
                None
            }
        } else {
            None
        }
    }

    /// Calculate service status, taking user-stopped services into account
    fn calculate_service_status(&self, service_name: &str, active_status: &str) -> Status {
        match active_status.to_lowercase().as_str() {
            "active" => Status::Ok,
            "inactive" | "dead" => {
                debug!("Service '{}' is inactive - treating as Inactive status", service_name);
                Status::Inactive
            }
            "failed" | "error" => Status::Critical,
            "activating" | "deactivating" | "reloading" | "starting" | "stopping" => {
                debug!("Service '{}' is transitioning - treating as Pending", service_name);
                Status::Pending
            }
            _ => Status::Unknown,
        }
    }

    /// Get memory usage for a specific service
    async fn get_service_memory_usage(&self, service_name: &str) -> Result<f32, CollectorError> {
        let output = Command::new("systemctl")
            .args(&[
                "show",
                &format!("{}.service", service_name),
                "--property=MemoryCurrent",
            ])
            .output()
            .map_err(|e| CollectorError::SystemRead {
                path: format!("memory usage for {}", service_name),
                error: e.to_string(),
            })?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        for line in output_str.lines() {
            if line.starts_with("MemoryCurrent=") {
                if let Some(mem_str) = line.strip_prefix("MemoryCurrent=") {
                    if mem_str != "[not set]" {
                        if let Ok(memory_bytes) = mem_str.parse::<u64>() {
                            return Ok(memory_bytes as f32 / (1024.0 * 1024.0)); // Convert to MB
                        }
                    }
                }
            }
        }

        Ok(0.0)
    }

    /// Check if the service collection cache should be updated
    fn should_update_cache(&self) -> bool {
        let state = self.state.read().unwrap();
        match state.last_collection {
            None => true,
            Some(last) => {
                let cache_duration = std::time::Duration::from_secs(30);
                last.elapsed() > cache_duration
            }
        }
    }

    /// Get cached complete service data with sub-services if available and fresh
    fn get_cached_complete_services(&self) -> Option<Vec<ServiceData>> {
        if !self.should_update_cache() {
            let state = self.state.read().unwrap();
            Some(state.cached_service_data.clone())
        } else {
            None
        }
    }

    /// Get nginx sites with latency checks (internal - no caching)
    fn get_nginx_sites_internal(&self) -> Vec<(String, f32)> {
        let mut sites = Vec::new();

        // Discover nginx sites from configuration
        let discovered_sites = self.discover_nginx_sites();

        // Always add all discovered sites, even if checks fail (like the old version)
        for (site_name, url) in &discovered_sites {
            match self.check_site_latency(url) {
                Ok(latency_ms) => {
                    sites.push((site_name.clone(), latency_ms));
                }
                Err(_) => {
                    // Site is unreachable - use -1.0 to indicate an error (like the old version)
                    sites.push((site_name.clone(), -1.0));
                }
            }
        }

        sites
    }

    /// Discover nginx sites from configuration
    fn discover_nginx_sites(&self) -> Vec<(String, String)> {
        // Use the same approach as the old working agent: get the nginx config from systemd
        let config_content = match self.get_nginx_config_from_systemd() {
            Some(content) => content,
            None => {
                debug!("Could not get nginx config from systemd, trying nginx -T fallback");
                match self.get_nginx_config_via_command() {
                    Some(content) => content,
                    None => {
                        debug!("Could not get nginx config via any method");
                        return Vec::new();
                    }
                }
            }
        };

        // Parse the config content to extract sites
        self.parse_nginx_config_for_sites(&config_content)
    }

    /// Fallback: get the nginx config via the nginx -T command
    fn get_nginx_config_via_command(&self) -> Option<String> {
        let output = Command::new("nginx").args(&["-T"]).output().ok()?;

        if !output.status.success() {
            debug!("nginx -T failed");
            return None;
        }

        Some(String::from_utf8_lossy(&output.stdout).to_string())
    }

    /// Get the nginx config from the systemd service definition (NixOS compatible)
    fn get_nginx_config_from_systemd(&self) -> Option<String> {
        let output = Command::new("systemctl")
            .args(&["show", "nginx", "--property=ExecStart", "--no-pager"])
            .output()
            .ok()?;

        if !output.status.success() {
            debug!("Failed to get nginx ExecStart from systemd");
            return None;
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        debug!("systemctl show nginx output: {}", stdout);

        // Parse ExecStart to extract the -c config path
        for line in stdout.lines() {
            if line.starts_with("ExecStart=") {
                debug!("Found ExecStart line: {}", line);
                if let Some(config_path) = self.extract_config_path_from_exec_start(line) {
                    debug!("Extracted config path: {}", config_path);
                    return std::fs::read_to_string(&config_path).ok();
                }
            }
        }

        None
    }

    /// Extract the config path from an ExecStart line
    fn extract_config_path_from_exec_start(&self, exec_start: &str) -> Option<String> {
        // Remove the ExecStart= prefix
        let exec_part = exec_start.strip_prefix("ExecStart=")?;
        debug!("Parsing exec part: {}", exec_part);

        // Handle NixOS format: ExecStart={ path=...; argv[]=...nginx -c /config; ... }
        if exec_part.contains("argv[]=") {
            // Extract the part after argv[]=
            let argv_start = exec_part.find("argv[]=")?;
            let argv_part = &exec_part[argv_start + 7..]; // Skip "argv[]="
            debug!("Found NixOS argv part: {}", argv_part);

            // Look for the -c flag followed by the config path
            if let Some(c_pos) = argv_part.find(" -c ") {
                let after_c = &argv_part[c_pos + 4..];
                // Find the config path (until the next space or semicolon)
                let config_path = after_c.split([' ', ';']).next()?;
                return Some(config_path.to_string());
            }
        } else {
            // Handle traditional format: ExecStart=/path/nginx -c /config
            debug!("Parsing traditional format");
            if let Some(c_pos) = exec_part.find(" -c ") {
                let after_c = &exec_part[c_pos + 4..];
                let config_path = after_c.split_whitespace().next()?;
                return Some(config_path.to_string());
            }
        }

        None
    }

    /// Parse nginx config content to extract server names and build the site list
    fn parse_nginx_config_for_sites(&self, config_content: &str) -> Vec<(String, String)> {
        let mut sites = Vec::new();
        let lines: Vec<&str> = config_content.lines().collect();
        let mut i = 0;
        debug!("Parsing nginx config with {} lines", lines.len());

        while i < lines.len() {
            let line = lines[i].trim();
            if line.starts_with("server") && line.contains("{") {
                if let Some(server_name) = self.parse_server_block(&lines, &mut i) {
                    let url = format!("https://{}", server_name);
                    sites.push((server_name.clone(), url));
                }
            }
            i += 1;
        }

        debug!("Discovered {} nginx sites total", sites.len());
        sites
    }

    /// Parse a server block to extract the primary server_name
    fn parse_server_block(&self, lines: &[&str], start_index: &mut usize) -> Option<String> {
        let mut server_names = Vec::new();
        let mut has_redirect = false;
        let mut i = *start_index + 1;
        let mut brace_count = 1;

        // Parse until we close the server block
        while i < lines.len() && brace_count > 0 {
            let trimmed = lines[i].trim();

            // Track braces
            brace_count += trimmed.matches('{').count();
            brace_count -= trimmed.matches('}').count();

            // Extract server_name
            if trimmed.starts_with("server_name") {
                if let Some(names_part) = trimmed.strip_prefix("server_name") {
                    let names_clean = names_part.trim().trim_end_matches(';');
                    for name in names_clean.split_whitespace() {
                        if name != "_" && !name.is_empty() && name.contains('.') && !name.starts_with('$') {
                            server_names.push(name.to_string());
                            debug!("Found server_name in block: {}", name);
                        }
                    }
                }
            }

            // Check for redirects (skip redirect-only servers)
            if trimmed.contains("return") && (trimmed.contains("301") || trimmed.contains("302")) {
                has_redirect = true;
            }

            i += 1;
        }

        *start_index = i - 1;

        if !server_names.is_empty() && !has_redirect {
            return Some(server_names[0].clone());
        }

        None
    }

    /// Check site latency using an HTTP GET request
    fn check_site_latency(&self, url: &str) -> Result<f32, Box<dyn std::error::Error>> {
        use std::time::Duration;
        use std::time::Instant;

        let start = Instant::now();

        // Create an HTTP client with timeouts from the configuration
        let client = reqwest::blocking::Client::builder()
            .timeout(Duration::from_secs(self.config.http_timeout_seconds))
            .connect_timeout(Duration::from_secs(self.config.http_connect_timeout_seconds))
            .redirect(reqwest::redirect::Policy::limited(10))
            .build()?;

        // Make the GET request and measure latency
        let response = client.get(url).send()?;
        let latency = start.elapsed().as_millis() as f32;

        // Check if the response is successful (2xx or 3xx status codes)
        if response.status().is_success() || response.status().is_redirection() {
            Ok(latency)
        } else {
            Err(format!(
                "HTTP request failed for {} with status: {}",
                url,
                response.status()
            )
            .into())
        }
    }

    /// Get docker containers as sub-services
    fn get_docker_containers(&self) -> Vec<(String, String)> {
        let mut containers = Vec::new();

        // Check if docker is available (use sudo for permissions).
        // Use -a to show ALL containers (running and stopped).
        let output = Command::new("sudo")
            .args(&["docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
            .output();

        let output = match output {
            Ok(out) if out.status.success() => out,
            _ => return containers, // Docker not available or failed
        };

        let output_str = match String::from_utf8(output.stdout) {
            Ok(s) => s,
            Err(_) => return containers,
        };

        for line in output_str.lines() {
            if line.trim().is_empty() {
                continue;
            }
            let parts: Vec<&str> = line.split(',').collect();
            if parts.len() >= 2 {
                let container_name = parts[0].trim();
                let status_str = parts[1].trim();
                let container_status = if status_str.contains("Up") {
                    "active"
                } else if status_str.contains("Exited") || status_str.contains("Created") {
                    "inactive" // Stopped/created containers are inactive
                } else {
                    "failed" // Other states (restarting, paused, dead) → failed
                };
                containers.push((format!("docker_{}", container_name), container_status.to_string()));
            }
        }

        containers
    }

    /// Get docker images as sub-services
    fn get_docker_images(&self) -> Vec<(String, String, String)> {
        let mut images = Vec::new();
        debug!("Collecting Docker images");

        // Check if docker is available (use sudo for permissions)
        let output = Command::new("sudo")
            .args(&["docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
            .output();

        let output = match output {
            Ok(out) if out.status.success() => out,
            Ok(out) => {
                debug!("Docker images command failed with status: {}", out.status);
                return images;
            }
            Err(e) => {
                debug!("Docker images command error: {}", e);
                return images;
            }
        };

        let output_str = match String::from_utf8(output.stdout) {
            Ok(s) => s,
            Err(_) => return images,
        };

        for line in output_str.lines() {
            if line.trim().is_empty() {
                continue;
            }
            let parts: Vec<&str> = line.split(',').collect();
            if parts.len() >= 2 {
                let image_name = parts[0].trim();
                let size = parts[1].trim();

                // Skip <none>:<none> images (dangling images)
                if image_name.contains("<none>") {
                    continue;
                }

                images.push((
                    format!("image_{}", image_name),
                    "active".to_string(), // Images are always "active" (present)
                    size.to_string(),
                ));
            }
        }

        images
    }
}

#[async_trait]
impl Collector for SystemdCollector {
    async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        // Use cached complete data if it is available and fresh
        if let Some(cached_complete_services) = self.get_cached_complete_services() {
            for service_data in cached_complete_services {
                agent_data.services.push(service_data);
            }
            Ok(())
        } else {
            // Collect fresh data
            self.collect_service_data(agent_data).await
        }
    }
}
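
// --- Illustrative sketch (added for documentation, not part of the original module) ---
// A minimal example of the wildcard rules implemented by `matches_pattern` and
// `simple_glob_match`. It assumes `SystemdConfig` provides a `Default` implementation
// (or an equivalently cheap constructor); if it does not, build the config explicitly.
// Only the pure pattern-matching helpers are exercised, so no systemctl calls are made.
#[cfg(test)]
mod pattern_matching_sketch {
    use super::*;

    #[test]
    fn wildcard_patterns_match_as_documented() {
        // Hypothetical collector built from a default config (assumption noted above).
        let collector = SystemdCollector::new(SystemdConfig::default());

        // "nginx*" matches services that start with "nginx".
        assert!(collector.matches_pattern("nginx-proxy", "nginx*"));
        // "*backup" matches services that end with "backup".
        assert!(collector.matches_pattern("pg-backup", "*backup"));
        // "nginx*backup" requires both parts, in order.
        assert!(collector.matches_pattern("nginx-db-backup", "nginx*backup"));
        assert!(!collector.matches_pattern("backup-for-nginx", "nginx*backup"));
        // Without "*", only exact names match.
        assert!(collector.matches_pattern("docker", "docker"));
        assert!(!collector.matches_pattern("docker-compose", "docker"));
    }
}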