diff --git a/agent/src/collectors/systemd.rs b/agent/src/collectors/systemd.rs index 818bdbe..736fa9e 100644 --- a/agent/src/collectors/systemd.rs +++ b/agent/src/collectors/systemd.rs @@ -24,6 +24,22 @@ struct ServiceCacheState { last_collection: Option, /// Cached service data services: Vec, + /// Interesting services to monitor (cached after discovery) + monitored_services: Vec, + /// Cached service status information from discovery + service_status_cache: std::collections::HashMap, + /// Last time services were discovered + last_discovery_time: Option, + /// How often to rediscover services (from config) + discovery_interval_seconds: u64, +} + +/// Cached service status information from systemctl list-units +#[derive(Debug, Clone)] +struct ServiceStatusInfo { + load_state: String, + active_state: String, + sub_state: String, } /// Internal service information @@ -32,7 +48,7 @@ struct ServiceInfo { name: String, status: String, // "active", "inactive", "failed", etc. memory_mb: f32, // Memory usage in MB - disk_gb: f32, // Disk usage in GB (usually 0 for services) + disk_gb: f32, // Disk usage in GB } impl SystemdCollector { @@ -40,6 +56,10 @@ impl SystemdCollector { let state = ServiceCacheState { last_collection: None, services: Vec::new(), + monitored_services: Vec::new(), + service_status_cache: std::collections::HashMap::new(), + last_discovery_time: None, + discovery_interval_seconds: config.interval_seconds, }; Self { @@ -53,8 +73,36 @@ impl SystemdCollector { let start_time = Instant::now(); debug!("Collecting systemd services metrics"); - // Get systemd services status - let services = self.get_systemd_services().await?; + // Get cached services (discovery only happens when needed) + let monitored_services = match self.get_monitored_services() { + Ok(services) => services, + Err(e) => { + debug!("Failed to get monitored services: {}", e); + return Ok(()); + } + }; + + // Collect service data for each monitored service + let mut services = Vec::new(); + for service_name in &monitored_services { + match self.get_service_status(service_name) { + Ok((active_status, _detailed_info)) => { + let memory_mb = self.get_service_memory_usage(service_name).await.unwrap_or(0.0); + let disk_gb = self.get_service_disk_usage(service_name).await.unwrap_or(0.0); + + let service_info = ServiceInfo { + name: service_name.clone(), + status: active_status, + memory_mb, + disk_gb, + }; + services.push(service_info); + } + Err(e) => { + debug!("Failed to get status for service {}: {}", service_name, e); + } + } + } // Update cached state { @@ -81,90 +129,208 @@ impl SystemdCollector { Ok(()) } - /// Get systemd services information - async fn get_systemd_services(&self) -> Result, CollectorError> { + /// Get monitored services, discovering them if needed or cache is expired + fn get_monitored_services(&self) -> Result> { + // Check if we need discovery without holding the lock + let needs_discovery = { + let state = self.state.read().unwrap(); + match state.last_discovery_time { + None => true, // First time + Some(last_time) => { + let elapsed = last_time.elapsed().as_secs(); + elapsed >= state.discovery_interval_seconds + } + } + }; + + if needs_discovery { + debug!("Discovering systemd services (cache expired or first run)"); + match self.discover_services_internal() { + Ok((services, status_cache)) => { + if let Ok(mut state) = self.state.write() { + state.monitored_services = services.clone(); + state.service_status_cache = status_cache; + state.last_discovery_time = Some(Instant::now()); + debug!("Auto-discovered {} services to monitor: {:?}", + state.monitored_services.len(), state.monitored_services); + return Ok(services); + } + } + Err(e) => { + debug!("Failed to discover services, using cached list: {}", e); + } + } + } + + // Return cached services + let state = self.state.read().unwrap(); + Ok(state.monitored_services.clone()) + } + + /// Auto-discover interesting services to monitor + fn discover_services_internal(&self) -> Result<(Vec, std::collections::HashMap)> { + // First: Get all service unit files + let unit_files_output = Command::new("systemctl") + .args(&["list-unit-files", "--type=service", "--no-pager", "--plain"]) + .output()?; + + if !unit_files_output.status.success() { + return Err(anyhow::anyhow!("systemctl list-unit-files command failed")); + } + + // Second: Get runtime status of all units + let units_status_output = Command::new("systemctl") + .args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"]) + .output()?; + + if !units_status_output.status.success() { + return Err(anyhow::anyhow!("systemctl list-units command failed")); + } + + let unit_files_str = String::from_utf8(unit_files_output.stdout)?; + let units_status_str = String::from_utf8(units_status_output.stdout)?; let mut services = Vec::new(); - // Get basic service status from systemctl - let status_output = Command::new("systemctl") - .args(&["list-units", "--type=service", "--no-pager", "--plain"]) - .output() - .map_err(|e| CollectorError::SystemRead { - path: "systemctl list-units".to_string(), - error: e.to_string(), - })?; + let excluded_services = &self.config.excluded_services; + let service_name_filters = &self.config.service_name_filters; - let status_str = String::from_utf8_lossy(&status_output.stdout); - - // Parse service status - for line in status_str.lines() { - if line.trim().is_empty() || line.contains("UNIT") { + // Parse all service unit files + let mut all_service_names = std::collections::HashSet::new(); + for line in unit_files_str.lines() { + let fields: Vec<&str> = line.split_whitespace().collect(); + if fields.len() >= 2 && fields[0].ends_with(".service") { + let service_name = fields[0].trim_end_matches(".service"); + all_service_names.insert(service_name.to_string()); + } + } + + // Parse runtime status for all units + let mut status_cache = std::collections::HashMap::new(); + for line in units_status_str.lines() { + let fields: Vec<&str> = line.split_whitespace().collect(); + if fields.len() >= 4 && fields[0].ends_with(".service") { + let service_name = fields[0].trim_end_matches(".service"); + let load_state = fields.get(1).unwrap_or(&"unknown").to_string(); + let active_state = fields.get(2).unwrap_or(&"unknown").to_string(); + let sub_state = fields.get(3).unwrap_or(&"unknown").to_string(); + + status_cache.insert(service_name.to_string(), ServiceStatusInfo { + load_state, + active_state, + sub_state, + }); + } + } + + // For services found in unit files but not in runtime status, set default inactive status + for service_name in &all_service_names { + if !status_cache.contains_key(service_name) { + status_cache.insert(service_name.to_string(), ServiceStatusInfo { + load_state: "not-loaded".to_string(), + active_state: "inactive".to_string(), + sub_state: "dead".to_string(), + }); + } + } + + // Process all discovered services and apply filters + for service_name in &all_service_names { + // Skip excluded services first + let mut is_excluded = false; + for excluded in excluded_services { + if service_name.contains(excluded) { + is_excluded = true; + break; + } + } + + if is_excluded { continue; } - let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 4 { - let service_name = parts[0].trim_end_matches(".service"); - let load_state = parts[1]; - let active_state = parts[2]; - let sub_state = parts[3]; - - // Skip if not loaded - if load_state != "loaded" { - continue; - } - - // Filter services based on configuration with wildcard support - if self.should_monitor_service(service_name) { - // Get memory usage for this service - let memory_mb = self.get_service_memory_usage(service_name).await.unwrap_or(0.0); - - // Get disk usage for this service - let disk_gb = self.get_service_disk_usage(service_name).await.unwrap_or(0.0); - - let normalized_status = self.normalize_service_status(active_state, sub_state); - let service_info = ServiceInfo { - name: service_name.to_string(), - status: normalized_status, - memory_mb, - disk_gb, - }; - - services.push(service_info); + // Check if this service matches our filter patterns (supports wildcards) + for pattern in service_name_filters { + if self.matches_pattern(service_name, pattern) { + services.push(service_name.to_string()); + break; } } } - Ok(services) + Ok((services, status_cache)) } - /// Check if a service should be monitored based on configuration filters with wildcard support - fn should_monitor_service(&self, service_name: &str) -> bool { - // If no filters configured, monitor nothing (to prevent noise) - if self.config.service_name_filters.is_empty() { - return false; - } - - // Check if service matches any of the configured patterns - for pattern in &self.config.service_name_filters { - if self.matches_pattern(service_name, pattern) { - return true; + /// Get service status from cache (if available) or fallback to systemctl + fn get_service_status(&self, service: &str) -> Result<(String, String)> { + // Try to get status from cache first + if let Ok(state) = self.state.read() { + if let Some(cached_info) = state.service_status_cache.get(service) { + let active_status = cached_info.active_state.clone(); + let detailed_info = format!( + "LoadState={}\nActiveState={}\nSubState={}", + cached_info.load_state, + cached_info.active_state, + cached_info.sub_state + ); + return Ok((active_status, detailed_info)); } } - - false + + // Fallback to systemctl if not in cache + let output = Command::new("systemctl") + .args(&["is-active", &format!("{}.service", service)]) + .output()?; + + let active_status = String::from_utf8(output.stdout)?.trim().to_string(); + + // Get more detailed info + let output = Command::new("systemctl") + .args(&["show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"]) + .output()?; + + let detailed_info = String::from_utf8(output.stdout)?; + Ok((active_status, detailed_info)) } - + /// Check if service name matches pattern (supports wildcards like nginx*) fn matches_pattern(&self, service_name: &str, pattern: &str) -> bool { - if pattern.ends_with('*') { - let prefix = &pattern[..pattern.len() - 1]; - service_name.starts_with(prefix) + if pattern.contains('*') { + if pattern.ends_with('*') { + // Pattern like "nginx*" - match if service starts with "nginx" + let prefix = &pattern[..pattern.len() - 1]; + service_name.starts_with(prefix) + } else if pattern.starts_with('*') { + // Pattern like "*backup" - match if service ends with "backup" + let suffix = &pattern[1..]; + service_name.ends_with(suffix) + } else { + // Pattern like "nginx*backup" - simple glob matching + self.simple_glob_match(service_name, pattern) + } } else { + // Exact match service_name == pattern } } - + + /// Simple glob matching for patterns with * in the middle + fn simple_glob_match(&self, text: &str, pattern: &str) -> bool { + let parts: Vec<&str> = pattern.split('*').collect(); + let mut pos = 0; + + for part in parts { + if part.is_empty() { + continue; + } + if let Some(found_pos) = text[pos..].find(part) { + pos += found_pos + part.len(); + } else { + return false; + } + } + true + } + /// Get disk usage for a specific service async fn get_service_disk_usage(&self, service_name: &str) -> Result { // Check if this service has configured directory paths @@ -267,20 +433,6 @@ impl SystemdCollector { Ok(0.0) } - /// Normalize service status to standard values - fn normalize_service_status(&self, active_state: &str, sub_state: &str) -> String { - match (active_state, sub_state) { - ("active", "running") => "active".to_string(), - ("active", _) => "active".to_string(), - ("inactive", "dead") => "inactive".to_string(), - ("inactive", _) => "inactive".to_string(), - ("failed", _) => "failed".to_string(), - ("activating", _) => "starting".to_string(), - ("deactivating", _) => "stopping".to_string(), - _ => format!("{}:{}", active_state, sub_state), - } - } - /// Check if service collection cache should be updated fn should_update_cache(&self) -> bool { let state = self.state.read().unwrap();