use anyhow::Result; use async_trait::async_trait; use cm_dashboard_shared::{Metric, MetricValue, Status, StatusTracker}; use std::process::Command; use std::sync::RwLock; use std::time::Instant; use tracing::debug; use super::{Collector, CollectorError}; use crate::config::SystemdConfig; /// Systemd collector for monitoring systemd services pub struct SystemdCollector { /// Cached state with thread-safe interior mutability state: RwLock, /// Configuration for service monitoring config: SystemdConfig, } /// Internal state for service caching #[derive(Debug)] struct ServiceCacheState { /// Interesting services to monitor (cached after discovery) monitored_services: Vec, /// Cached service status information from discovery service_status_cache: std::collections::HashMap, /// Last time services were discovered last_discovery_time: Option, /// How often to rediscover services (5 minutes) discovery_interval_seconds: u64, /// Cached nginx site latency metrics nginx_site_metrics: Vec, /// Last time nginx sites were checked last_nginx_check_time: Option, /// How often to check nginx site latency (configurable) nginx_check_interval_seconds: u64, } /// Cached service status information from systemctl list-units #[derive(Debug, Clone)] struct ServiceStatusInfo { load_state: String, active_state: String, sub_state: String, } impl SystemdCollector { pub fn new(config: SystemdConfig) -> Self { Self { state: RwLock::new(ServiceCacheState { monitored_services: Vec::new(), service_status_cache: std::collections::HashMap::new(), last_discovery_time: None, discovery_interval_seconds: config.interval_seconds, nginx_site_metrics: Vec::new(), last_nginx_check_time: None, nginx_check_interval_seconds: config.nginx_check_interval_seconds, }), config, } } /// Get monitored services, discovering them if needed or cache is expired fn get_monitored_services(&self) -> Result> { // Check if we need discovery without holding the lock let needs_discovery = { let state = self.state.read().unwrap(); match state.last_discovery_time { None => true, // First time Some(last_time) => { let elapsed = last_time.elapsed().as_secs(); elapsed >= state.discovery_interval_seconds } } }; if needs_discovery { debug!("Discovering systemd services (cache expired or first run)"); // Call discover_services_internal which doesn't update state match self.discover_services_internal() { Ok((services, status_cache)) => { // Update state with discovered services in a separate scope if let Ok(mut state) = self.state.write() { state.monitored_services = services.clone(); state.service_status_cache = status_cache; state.last_discovery_time = Some(Instant::now()); debug!( "Auto-discovered {} services to monitor: {:?}", state.monitored_services.len(), state.monitored_services ); return Ok(services); } } Err(e) => { debug!("Failed to discover services, using cached list: {}", e); // Continue with existing cached services if discovery fails } } } // Return cached services let state = self.state.read().unwrap(); Ok(state.monitored_services.clone()) } /// Get nginx site metrics, checking them if cache is expired fn get_nginx_site_metrics(&self) -> Vec { let mut state = self.state.write().unwrap(); // Check if we need to refresh nginx site metrics let needs_refresh = match state.last_nginx_check_time { None => true, // First time Some(last_time) => { let elapsed = last_time.elapsed().as_secs(); elapsed >= state.nginx_check_interval_seconds } }; if needs_refresh { // Only check nginx sites if nginx service is active if state.monitored_services.iter().any(|s| s.contains("nginx")) { debug!( "Refreshing nginx site latency metrics (interval: {}s)", state.nginx_check_interval_seconds ); let fresh_metrics = self.get_nginx_sites(); state.nginx_site_metrics = fresh_metrics; state.last_nginx_check_time = Some(Instant::now()); } } state.nginx_site_metrics.clone() } /// Auto-discover interesting services to monitor (internal version that doesn't update state) fn discover_services_internal(&self) -> Result<(Vec, std::collections::HashMap)> { debug!("Starting systemd service discovery with status caching"); // First: Get all service unit files (includes services that have never been started) let unit_files_output = Command::new("systemctl") .arg("list-unit-files") .arg("--type=service") .arg("--no-pager") .arg("--plain") .output()?; if !unit_files_output.status.success() { return Err(anyhow::anyhow!("systemctl list-unit-files command failed")); } // Second: Get runtime status of all units let units_status_output = Command::new("systemctl") .arg("list-units") .arg("--type=service") .arg("--all") .arg("--no-pager") .arg("--plain") .output()?; if !units_status_output.status.success() { return Err(anyhow::anyhow!("systemctl list-units command failed")); } let unit_files_str = String::from_utf8(unit_files_output.stdout)?; let units_status_str = String::from_utf8(units_status_output.stdout)?; let mut services = Vec::new(); // Use configuration instead of hardcoded values let excluded_services = &self.config.excluded_services; let service_name_filters = &self.config.service_name_filters; // Parse all service unit files to get complete service list let mut all_service_names = std::collections::HashSet::new(); for line in unit_files_str.lines() { let fields: Vec<&str> = line.split_whitespace().collect(); if fields.len() >= 2 && fields[0].ends_with(".service") { let service_name = fields[0].trim_end_matches(".service"); all_service_names.insert(service_name.to_string()); debug!("Found service unit file: {}", service_name); } } // Parse runtime status for all units let mut status_cache = std::collections::HashMap::new(); for line in units_status_str.lines() { let fields: Vec<&str> = line.split_whitespace().collect(); if fields.len() >= 4 && fields[0].ends_with(".service") { let service_name = fields[0].trim_end_matches(".service"); // Extract status information from systemctl list-units output let load_state = fields.get(1).unwrap_or(&"unknown").to_string(); let active_state = fields.get(2).unwrap_or(&"unknown").to_string(); let sub_state = fields.get(3).unwrap_or(&"unknown").to_string(); // Cache the status information status_cache.insert(service_name.to_string(), ServiceStatusInfo { load_state: load_state.clone(), active_state: active_state.clone(), sub_state: sub_state.clone(), }); debug!("Got runtime status for service: {} (load:{}, active:{}, sub:{})", service_name, load_state, active_state, sub_state); } } // For services found in unit files but not in runtime status, set default inactive status for service_name in &all_service_names { if !status_cache.contains_key(service_name) { status_cache.insert(service_name.to_string(), ServiceStatusInfo { load_state: "not-loaded".to_string(), active_state: "inactive".to_string(), sub_state: "dead".to_string(), }); debug!("Service {} found in unit files but not runtime - marked as inactive", service_name); } } // Now process all discovered services for service_name in &all_service_names { debug!("Processing service: '{}'", service_name); // Skip excluded services first let mut is_excluded = false; for excluded in excluded_services { if service_name.contains(excluded) { debug!( "EXCLUDING service '{}' because it matches pattern '{}'", service_name, excluded ); is_excluded = true; break; } } if is_excluded { debug!("Skipping excluded service: '{}'", service_name); continue; } // Check if this service matches our filter patterns (supports wildcards) for pattern in service_name_filters { if self.matches_pattern(service_name, pattern) { debug!( "INCLUDING service '{}' because it matches pattern '{}'", service_name, pattern ); services.push(service_name.to_string()); break; } } } debug!("Service discovery completed: found {} matching services: {:?}", services.len(), services); if services.is_empty() { debug!("No services found matching the configured filters - this may indicate a parsing issue"); } Ok((services, status_cache)) } /// Check if service name matches pattern (supports wildcards like nginx*) fn matches_pattern(&self, service_name: &str, pattern: &str) -> bool { if pattern.contains('*') { // Wildcard pattern matching if pattern.ends_with('*') { // Pattern like "nginx*" - match if service starts with "nginx" let prefix = &pattern[..pattern.len() - 1]; service_name.starts_with(prefix) } else if pattern.starts_with('*') { // Pattern like "*backup" - match if service ends with "backup" let suffix = &pattern[1..]; service_name.ends_with(suffix) } else { // Pattern like "nginx*backup" - simple glob matching self.simple_glob_match(service_name, pattern) } } else { // Exact match (existing behavior) service_name == pattern } } /// Simple glob pattern matching for patterns with * in middle fn simple_glob_match(&self, text: &str, pattern: &str) -> bool { let parts: Vec<&str> = pattern.split('*').collect(); if parts.is_empty() { return false; } let mut pos = 0; for (i, part) in parts.iter().enumerate() { if part.is_empty() { continue; } if i == 0 { // First part must match at start if !text[pos..].starts_with(part) { return false; } pos += part.len(); } else if i == parts.len() - 1 { // Last part must match at end return text[pos..].ends_with(part); } else { // Middle part must be found somewhere if let Some(found_pos) = text[pos..].find(part) { pos += found_pos + part.len(); } else { return false; } } } true } /// Get service status from cache (if available) or fallback to systemctl fn get_service_status(&self, service: &str) -> Result<(String, String)> { // Try to get status from cache first if let Ok(state) = self.state.read() { if let Some(cached_info) = state.service_status_cache.get(service) { let active_status = cached_info.active_state.clone(); let detailed_info = format!( "LoadState={}\nActiveState={}\nSubState={}", cached_info.load_state, cached_info.active_state, cached_info.sub_state ); return Ok((active_status, detailed_info)); } } // Fallback to systemctl if not in cache (shouldn't happen during normal operation) debug!("Service '{}' not found in cache, falling back to systemctl", service); let output = Command::new("systemctl") .arg("is-active") .arg(format!("{}.service", service)) .output()?; let active_status = String::from_utf8(output.stdout)?.trim().to_string(); // Get more detailed info let output = Command::new("systemctl") .arg("show") .arg(format!("{}.service", service)) .arg("--property=LoadState,ActiveState,SubState") .output()?; let detailed_info = String::from_utf8(output.stdout)?; Ok((active_status, detailed_info)) } /// Calculate service status, taking user-stopped services into account fn calculate_service_status(&self, service_name: &str, active_status: &str) -> Status { match active_status.to_lowercase().as_str() { "active" => Status::Ok, "inactive" | "dead" => { debug!("Service '{}' is inactive - treating as Inactive status", service_name); Status::Inactive }, "failed" | "error" => Status::Critical, "activating" | "deactivating" | "reloading" | "start" | "stop" | "restart" => { debug!("Service '{}' is transitioning - treating as Pending", service_name); Status::Pending }, _ => Status::Unknown, } } /// Get service memory usage (if available) fn get_service_memory(&self, service: &str) -> Option { let output = Command::new("systemctl") .arg("show") .arg(format!("{}.service", service)) .arg("--property=MemoryCurrent") .output() .ok()?; let output_str = String::from_utf8(output.stdout).ok()?; for line in output_str.lines() { if line.starts_with("MemoryCurrent=") { let memory_str = line.trim_start_matches("MemoryCurrent="); if let Ok(memory_bytes) = memory_str.parse::() { return Some(memory_bytes as f32 / (1024.0 * 1024.0)); // Convert to MB } } } None } /// Get directory size in GB with permission-aware logging fn get_directory_size(&self, dir: &str) -> Option { let output = Command::new("sudo").arg("du").arg("-sb").arg(dir).output().ok()?; if !output.status.success() { // Log permission errors for debugging but don't spam logs let stderr = String::from_utf8_lossy(&output.stderr); if stderr.contains("Permission denied") { debug!("Permission denied accessing directory: {}", dir); } else { debug!("Failed to get size for directory {}: {}", dir, stderr); } return None; } let output_str = String::from_utf8(output.stdout).ok()?; let size_str = output_str.split_whitespace().next()?; if let Ok(size_bytes) = size_str.parse::() { let size_gb = size_bytes as f32 / (1024.0 * 1024.0 * 1024.0); // Return size even if very small (minimum 0.001 GB = 1MB for visibility) if size_gb > 0.0 { Some(size_gb.max(0.001)) } else { None } } else { None } } /// Get service disk usage - simplified and configuration-driven fn get_service_disk_usage(&self, service: &str) -> Option { // 1. Check if service has configured directories (exact match only) if let Some(dirs) = self.config.service_directories.get(service) { // Service has configured paths - use the first accessible one for dir in dirs { if let Some(size) = self.get_directory_size(dir) { return Some(size); } } // If configured paths failed, return None (shows as 0) return Some(0.0); } // 2. No configured path - use systemctl WorkingDirectory let output = Command::new("systemctl") .arg("show") .arg(format!("{}.service", service)) .arg("--property=WorkingDirectory") .output() .ok()?; let output_str = String::from_utf8(output.stdout).ok()?; for line in output_str.lines() { if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") { let dir = line.trim_start_matches("WorkingDirectory="); if !dir.is_empty() && dir != "/" { return self.get_directory_size(dir); } } } None } } #[async_trait] impl Collector for SystemdCollector { async fn collect(&self, _status_tracker: &mut StatusTracker) -> Result, CollectorError> { let start_time = Instant::now(); debug!("Collecting systemd services metrics"); let mut metrics = Vec::new(); // Get cached services (discovery only happens when needed) let monitored_services = match self.get_monitored_services() { Ok(services) => services, Err(e) => { debug!("Failed to get monitored services: {}", e); return Ok(metrics); } }; // Collect individual metrics for each monitored service (status, memory, disk only) for service in &monitored_services { match self.get_service_status(service) { Ok((active_status, _detailed_info)) => { let status = self.calculate_service_status(service, &active_status); // Individual service status metric metrics.push(Metric { name: format!("service_{}_status", service), value: MetricValue::String(active_status.clone()), unit: None, description: Some(format!("Service {} status", service)), status, timestamp: chrono::Utc::now().timestamp() as u64, }); // Service memory usage (if available) if let Some(memory_mb) = self.get_service_memory(service) { metrics.push(Metric { name: format!("service_{}_memory_mb", service), value: MetricValue::Float(memory_mb), unit: Some("MB".to_string()), description: Some(format!("Service {} memory usage", service)), status: Status::Ok, timestamp: chrono::Utc::now().timestamp() as u64, }); } // Service disk usage (comprehensive detection) if let Some(disk_gb) = self.get_service_disk_usage(service) { metrics.push(Metric { name: format!("service_{}_disk_gb", service), value: MetricValue::Float(disk_gb), unit: Some("GB".to_string()), description: Some(format!("Service {} disk usage", service)), status: Status::Ok, timestamp: chrono::Utc::now().timestamp() as u64, }); } // Sub-service metrics for specific services if service.contains("nginx") && active_status == "active" { metrics.extend(self.get_nginx_site_metrics()); } if service.contains("docker") && active_status == "active" { metrics.extend(self.get_docker_containers()); } } Err(e) => { debug!("Failed to get status for service {}: {}", service, e); } } } let collection_time = start_time.elapsed(); debug!( "Systemd collection completed in {:?} with {} individual service metrics", collection_time, metrics.len() ); Ok(metrics) } } impl SystemdCollector { /// Get nginx sites with latency checks fn get_nginx_sites(&self) -> Vec { let mut metrics = Vec::new(); let timestamp = chrono::Utc::now().timestamp() as u64; // Discover nginx sites from configuration let sites = self.discover_nginx_sites(); for (site_name, url) in &sites { match self.check_site_latency(url) { Ok(latency_ms) => { let status = if latency_ms < self.config.nginx_latency_critical_ms { Status::Ok } else { Status::Critical }; metrics.push(Metric { name: format!("service_nginx_{}_latency_ms", site_name), value: MetricValue::Float(latency_ms), unit: Some("ms".to_string()), description: Some(format!("Response time for {}", url)), status, timestamp, }); } Err(_) => { // Site is unreachable metrics.push(Metric { name: format!("service_nginx_{}_latency_ms", site_name), value: MetricValue::Float(-1.0), // Use -1 to indicate error unit: Some("ms".to_string()), description: Some(format!("Response time for {} (unreachable)", url)), status: Status::Critical, timestamp, }); } } } metrics } /// Get docker containers as sub-services fn get_docker_containers(&self) -> Vec { let mut metrics = Vec::new(); let timestamp = chrono::Utc::now().timestamp() as u64; // Check if docker is available let output = Command::new("docker") .arg("ps") .arg("--format") .arg("{{.Names}},{{.Status}}") .output(); let output = match output { Ok(out) if out.status.success() => out, _ => return metrics, // Docker not available or failed }; let output_str = match String::from_utf8(output.stdout) { Ok(s) => s, Err(_) => return metrics, }; for line in output_str.lines() { if line.trim().is_empty() { continue; } let parts: Vec<&str> = line.split(',').collect(); if parts.len() >= 2 { let container_name = parts[0].trim(); let status_str = parts[1].trim(); let status = if status_str.contains("Up") { Status::Ok } else if status_str.contains("Exited") { Status::Warning } else { Status::Critical }; metrics.push(Metric { name: format!("service_docker_{}_status", container_name), value: MetricValue::String(status_str.to_string()), unit: None, description: Some(format!("Docker container {} status", container_name)), status, timestamp, }); } } metrics } /// Check site latency using HTTP GET requests fn check_site_latency(&self, url: &str) -> Result> { use std::time::Duration; use std::time::Instant; let start = Instant::now(); // Create HTTP client with timeouts from configuration let client = reqwest::blocking::Client::builder() .timeout(Duration::from_secs(self.config.http_timeout_seconds)) .connect_timeout(Duration::from_secs(self.config.http_connect_timeout_seconds)) .redirect(reqwest::redirect::Policy::limited(10)) .build()?; // Make GET request and measure latency let response = client.get(url).send()?; let latency = start.elapsed().as_millis() as f32; // Check if response is successful (2xx or 3xx status codes) if response.status().is_success() || response.status().is_redirection() { Ok(latency) } else { Err(format!( "HTTP request failed for {} with status: {}", url, response.status() ) .into()) } } /// Discover nginx sites from configuration files (like the old working implementation) fn discover_nginx_sites(&self) -> Vec<(String, String)> { use tracing::debug; // Use the same approach as the old working agent: get nginx config from systemd let config_content = match self.get_nginx_config_from_systemd() { Some(content) => content, None => { debug!("Could not get nginx config from systemd, trying nginx -T fallback"); match self.get_nginx_config_via_command() { Some(content) => content, None => { debug!("Could not get nginx config via any method"); return Vec::new(); } } } }; // Parse the config content to extract sites self.parse_nginx_config_for_sites(&config_content) } /// Get nginx config from systemd service definition (NixOS compatible) fn get_nginx_config_from_systemd(&self) -> Option { use tracing::debug; let output = std::process::Command::new("systemctl") .args(["show", "nginx", "--property=ExecStart", "--no-pager"]) .output() .ok()?; if !output.status.success() { debug!("Failed to get nginx ExecStart from systemd"); return None; } let stdout = String::from_utf8_lossy(&output.stdout); debug!("systemctl show nginx output: {}", stdout); // Parse ExecStart to extract -c config path for line in stdout.lines() { if line.starts_with("ExecStart=") { debug!("Found ExecStart line: {}", line); // Handle both traditional and NixOS systemd formats if let Some(config_path) = self.extract_config_path_from_exec_start(line) { debug!("Extracted config path: {}", config_path); // Read the config file return std::fs::read_to_string(&config_path) .map_err(|e| debug!("Failed to read config file {}: {}", config_path, e)) .ok(); } } } None } /// Extract config path from ExecStart line fn extract_config_path_from_exec_start(&self, exec_start: &str) -> Option { use tracing::debug; // Remove ExecStart= prefix let exec_part = exec_start.strip_prefix("ExecStart=")?; debug!("Parsing exec part: {}", exec_part); // Handle NixOS format: ExecStart={ path=...; argv[]=...nginx -c /config; ... } if exec_part.contains("argv[]=") { // Extract the part after argv[]= let argv_start = exec_part.find("argv[]=")?; let argv_part = &exec_part[argv_start + 7..]; // Skip "argv[]=" debug!("Found NixOS argv part: {}", argv_part); // Look for -c flag followed by config path if let Some(c_pos) = argv_part.find(" -c ") { let after_c = &argv_part[c_pos + 4..]; // Find the config path (until next space or semicolon) let config_path = after_c.split([' ', ';']).next()?; return Some(config_path.to_string()); } } else { // Handle traditional format: ExecStart=/path/nginx -c /config debug!("Parsing traditional format"); if let Some(c_pos) = exec_part.find(" -c ") { let after_c = &exec_part[c_pos + 4..]; let config_path = after_c.split_whitespace().next()?; return Some(config_path.to_string()); } } None } /// Fallback: get nginx config via nginx -T command fn get_nginx_config_via_command(&self) -> Option { use tracing::debug; let output = std::process::Command::new("nginx") .args(["-T"]) .output() .ok()?; if !output.status.success() { debug!("nginx -T failed"); return None; } Some(String::from_utf8_lossy(&output.stdout).to_string()) } /// Parse nginx config content to extract server names and build site list fn parse_nginx_config_for_sites(&self, config_content: &str) -> Vec<(String, String)> { use tracing::debug; let mut sites = Vec::new(); let lines: Vec<&str> = config_content.lines().collect(); let mut i = 0; debug!("Parsing nginx config with {} lines", lines.len()); while i < lines.len() { let line = lines[i].trim(); if line.starts_with("server") && line.contains("{") { if let Some(server_name) = self.parse_server_block(&lines, &mut i) { let url = format!("https://{}", server_name); sites.push((server_name.clone(), url)); } } i += 1; } debug!("Discovered {} nginx sites total", sites.len()); sites } /// Parse a server block to extract the primary server_name fn parse_server_block(&self, lines: &[&str], start_index: &mut usize) -> Option { use tracing::debug; let mut server_names = Vec::new(); let mut has_redirect = false; let mut i = *start_index + 1; let mut brace_count = 1; // Parse until we close the server block while i < lines.len() && brace_count > 0 { let trimmed = lines[i].trim(); // Track braces brace_count += trimmed.matches('{').count(); brace_count -= trimmed.matches('}').count(); // Extract server_name if trimmed.starts_with("server_name") { if let Some(names_part) = trimmed.strip_prefix("server_name") { let names_clean = names_part.trim().trim_end_matches(';'); for name in names_clean.split_whitespace() { if name != "_" && !name.is_empty() && name.contains('.') && !name.starts_with('$') { server_names.push(name.to_string()); debug!("Found server_name in block: {}", name); } } } } // Check for redirects (skip redirect-only servers) if trimmed.contains("return") && (trimmed.contains("301") || trimmed.contains("302")) { has_redirect = true; } i += 1; } *start_index = i - 1; if !server_names.is_empty() && !has_redirect { return Some(server_names[0].clone()); } None } }