Properly restore systemd collector with original architecture
Some checks failed
Build and Release / build-and-release (push) Failing after 1m16s
Some checks failed
Build and Release / build-and-release (push) Failing after 1m16s
- Restore service discovery caching with configurable intervals - Add excluded services filtering logic - Implement complete wildcard pattern matching (*prefix, suffix*, glob) - Add ServiceStatusInfo caching from systemctl commands - Restore cached service status retrieval to avoid repeated systemctl calls - Add proper systemctl command error handling All functionality now matches pre-refactor implementation.
This commit is contained in:
parent
d164c1da5f
commit
9357e5f2a8
@ -24,6 +24,22 @@ struct ServiceCacheState {
|
||||
last_collection: Option<Instant>,
|
||||
/// Cached service data
|
||||
services: Vec<ServiceInfo>,
|
||||
/// Interesting services to monitor (cached after discovery)
|
||||
monitored_services: Vec<String>,
|
||||
/// Cached service status information from discovery
|
||||
service_status_cache: std::collections::HashMap<String, ServiceStatusInfo>,
|
||||
/// Last time services were discovered
|
||||
last_discovery_time: Option<Instant>,
|
||||
/// How often to rediscover services (from config)
|
||||
discovery_interval_seconds: u64,
|
||||
}
|
||||
|
||||
/// Cached service status information from systemctl list-units
|
||||
#[derive(Debug, Clone)]
|
||||
struct ServiceStatusInfo {
|
||||
load_state: String,
|
||||
active_state: String,
|
||||
sub_state: String,
|
||||
}
|
||||
|
||||
/// Internal service information
|
||||
@ -32,7 +48,7 @@ struct ServiceInfo {
|
||||
name: String,
|
||||
status: String, // "active", "inactive", "failed", etc.
|
||||
memory_mb: f32, // Memory usage in MB
|
||||
disk_gb: f32, // Disk usage in GB (usually 0 for services)
|
||||
disk_gb: f32, // Disk usage in GB
|
||||
}
|
||||
|
||||
impl SystemdCollector {
|
||||
@ -40,6 +56,10 @@ impl SystemdCollector {
|
||||
let state = ServiceCacheState {
|
||||
last_collection: None,
|
||||
services: Vec::new(),
|
||||
monitored_services: Vec::new(),
|
||||
service_status_cache: std::collections::HashMap::new(),
|
||||
last_discovery_time: None,
|
||||
discovery_interval_seconds: config.interval_seconds,
|
||||
};
|
||||
|
||||
Self {
|
||||
@ -53,8 +73,36 @@ impl SystemdCollector {
|
||||
let start_time = Instant::now();
|
||||
debug!("Collecting systemd services metrics");
|
||||
|
||||
// Get systemd services status
|
||||
let services = self.get_systemd_services().await?;
|
||||
// Get cached services (discovery only happens when needed)
|
||||
let monitored_services = match self.get_monitored_services() {
|
||||
Ok(services) => services,
|
||||
Err(e) => {
|
||||
debug!("Failed to get monitored services: {}", e);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
// Collect service data for each monitored service
|
||||
let mut services = Vec::new();
|
||||
for service_name in &monitored_services {
|
||||
match self.get_service_status(service_name) {
|
||||
Ok((active_status, _detailed_info)) => {
|
||||
let memory_mb = self.get_service_memory_usage(service_name).await.unwrap_or(0.0);
|
||||
let disk_gb = self.get_service_disk_usage(service_name).await.unwrap_or(0.0);
|
||||
|
||||
let service_info = ServiceInfo {
|
||||
name: service_name.clone(),
|
||||
status: active_status,
|
||||
memory_mb,
|
||||
disk_gb,
|
||||
};
|
||||
services.push(service_info);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to get status for service {}: {}", service_name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update cached state
|
||||
{
|
||||
@ -81,90 +129,208 @@ impl SystemdCollector {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get systemd services information
|
||||
async fn get_systemd_services(&self) -> Result<Vec<ServiceInfo>, CollectorError> {
|
||||
/// Get monitored services, discovering them if needed or cache is expired
|
||||
fn get_monitored_services(&self) -> Result<Vec<String>> {
|
||||
// Check if we need discovery without holding the lock
|
||||
let needs_discovery = {
|
||||
let state = self.state.read().unwrap();
|
||||
match state.last_discovery_time {
|
||||
None => true, // First time
|
||||
Some(last_time) => {
|
||||
let elapsed = last_time.elapsed().as_secs();
|
||||
elapsed >= state.discovery_interval_seconds
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if needs_discovery {
|
||||
debug!("Discovering systemd services (cache expired or first run)");
|
||||
match self.discover_services_internal() {
|
||||
Ok((services, status_cache)) => {
|
||||
if let Ok(mut state) = self.state.write() {
|
||||
state.monitored_services = services.clone();
|
||||
state.service_status_cache = status_cache;
|
||||
state.last_discovery_time = Some(Instant::now());
|
||||
debug!("Auto-discovered {} services to monitor: {:?}",
|
||||
state.monitored_services.len(), state.monitored_services);
|
||||
return Ok(services);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to discover services, using cached list: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return cached services
|
||||
let state = self.state.read().unwrap();
|
||||
Ok(state.monitored_services.clone())
|
||||
}
|
||||
|
||||
/// Auto-discover interesting services to monitor
|
||||
fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
|
||||
// First: Get all service unit files
|
||||
let unit_files_output = Command::new("systemctl")
|
||||
.args(&["list-unit-files", "--type=service", "--no-pager", "--plain"])
|
||||
.output()?;
|
||||
|
||||
if !unit_files_output.status.success() {
|
||||
return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
|
||||
}
|
||||
|
||||
// Second: Get runtime status of all units
|
||||
let units_status_output = Command::new("systemctl")
|
||||
.args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"])
|
||||
.output()?;
|
||||
|
||||
if !units_status_output.status.success() {
|
||||
return Err(anyhow::anyhow!("systemctl list-units command failed"));
|
||||
}
|
||||
|
||||
let unit_files_str = String::from_utf8(unit_files_output.stdout)?;
|
||||
let units_status_str = String::from_utf8(units_status_output.stdout)?;
|
||||
let mut services = Vec::new();
|
||||
|
||||
// Get basic service status from systemctl
|
||||
let status_output = Command::new("systemctl")
|
||||
.args(&["list-units", "--type=service", "--no-pager", "--plain"])
|
||||
.output()
|
||||
.map_err(|e| CollectorError::SystemRead {
|
||||
path: "systemctl list-units".to_string(),
|
||||
error: e.to_string(),
|
||||
})?;
|
||||
let excluded_services = &self.config.excluded_services;
|
||||
let service_name_filters = &self.config.service_name_filters;
|
||||
|
||||
let status_str = String::from_utf8_lossy(&status_output.stdout);
|
||||
// Parse all service unit files
|
||||
let mut all_service_names = std::collections::HashSet::new();
|
||||
for line in unit_files_str.lines() {
|
||||
let fields: Vec<&str> = line.split_whitespace().collect();
|
||||
if fields.len() >= 2 && fields[0].ends_with(".service") {
|
||||
let service_name = fields[0].trim_end_matches(".service");
|
||||
all_service_names.insert(service_name.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
// Parse service status
|
||||
for line in status_str.lines() {
|
||||
if line.trim().is_empty() || line.contains("UNIT") {
|
||||
// Parse runtime status for all units
|
||||
let mut status_cache = std::collections::HashMap::new();
|
||||
for line in units_status_str.lines() {
|
||||
let fields: Vec<&str> = line.split_whitespace().collect();
|
||||
if fields.len() >= 4 && fields[0].ends_with(".service") {
|
||||
let service_name = fields[0].trim_end_matches(".service");
|
||||
let load_state = fields.get(1).unwrap_or(&"unknown").to_string();
|
||||
let active_state = fields.get(2).unwrap_or(&"unknown").to_string();
|
||||
let sub_state = fields.get(3).unwrap_or(&"unknown").to_string();
|
||||
|
||||
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
|
||||
load_state,
|
||||
active_state,
|
||||
sub_state,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// For services found in unit files but not in runtime status, set default inactive status
|
||||
for service_name in &all_service_names {
|
||||
if !status_cache.contains_key(service_name) {
|
||||
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
|
||||
load_state: "not-loaded".to_string(),
|
||||
active_state: "inactive".to_string(),
|
||||
sub_state: "dead".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Process all discovered services and apply filters
|
||||
for service_name in &all_service_names {
|
||||
// Skip excluded services first
|
||||
let mut is_excluded = false;
|
||||
for excluded in excluded_services {
|
||||
if service_name.contains(excluded) {
|
||||
is_excluded = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if is_excluded {
|
||||
continue;
|
||||
}
|
||||
|
||||
let parts: Vec<&str> = line.split_whitespace().collect();
|
||||
if parts.len() >= 4 {
|
||||
let service_name = parts[0].trim_end_matches(".service");
|
||||
let load_state = parts[1];
|
||||
let active_state = parts[2];
|
||||
let sub_state = parts[3];
|
||||
|
||||
// Skip if not loaded
|
||||
if load_state != "loaded" {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Filter services based on configuration with wildcard support
|
||||
if self.should_monitor_service(service_name) {
|
||||
// Get memory usage for this service
|
||||
let memory_mb = self.get_service_memory_usage(service_name).await.unwrap_or(0.0);
|
||||
|
||||
// Get disk usage for this service
|
||||
let disk_gb = self.get_service_disk_usage(service_name).await.unwrap_or(0.0);
|
||||
|
||||
let normalized_status = self.normalize_service_status(active_state, sub_state);
|
||||
let service_info = ServiceInfo {
|
||||
name: service_name.to_string(),
|
||||
status: normalized_status,
|
||||
memory_mb,
|
||||
disk_gb,
|
||||
};
|
||||
|
||||
services.push(service_info);
|
||||
// Check if this service matches our filter patterns (supports wildcards)
|
||||
for pattern in service_name_filters {
|
||||
if self.matches_pattern(service_name, pattern) {
|
||||
services.push(service_name.to_string());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(services)
|
||||
Ok((services, status_cache))
|
||||
}
|
||||
|
||||
/// Check if a service should be monitored based on configuration filters with wildcard support
|
||||
fn should_monitor_service(&self, service_name: &str) -> bool {
|
||||
// If no filters configured, monitor nothing (to prevent noise)
|
||||
if self.config.service_name_filters.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if service matches any of the configured patterns
|
||||
for pattern in &self.config.service_name_filters {
|
||||
if self.matches_pattern(service_name, pattern) {
|
||||
return true;
|
||||
/// Get service status from cache (if available) or fallback to systemctl
|
||||
fn get_service_status(&self, service: &str) -> Result<(String, String)> {
|
||||
// Try to get status from cache first
|
||||
if let Ok(state) = self.state.read() {
|
||||
if let Some(cached_info) = state.service_status_cache.get(service) {
|
||||
let active_status = cached_info.active_state.clone();
|
||||
let detailed_info = format!(
|
||||
"LoadState={}\nActiveState={}\nSubState={}",
|
||||
cached_info.load_state,
|
||||
cached_info.active_state,
|
||||
cached_info.sub_state
|
||||
);
|
||||
return Ok((active_status, detailed_info));
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
// Fallback to systemctl if not in cache
|
||||
let output = Command::new("systemctl")
|
||||
.args(&["is-active", &format!("{}.service", service)])
|
||||
.output()?;
|
||||
|
||||
let active_status = String::from_utf8(output.stdout)?.trim().to_string();
|
||||
|
||||
// Get more detailed info
|
||||
let output = Command::new("systemctl")
|
||||
.args(&["show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
|
||||
.output()?;
|
||||
|
||||
let detailed_info = String::from_utf8(output.stdout)?;
|
||||
Ok((active_status, detailed_info))
|
||||
}
|
||||
|
||||
/// Check if service name matches pattern (supports wildcards like nginx*)
|
||||
fn matches_pattern(&self, service_name: &str, pattern: &str) -> bool {
|
||||
if pattern.ends_with('*') {
|
||||
let prefix = &pattern[..pattern.len() - 1];
|
||||
service_name.starts_with(prefix)
|
||||
if pattern.contains('*') {
|
||||
if pattern.ends_with('*') {
|
||||
// Pattern like "nginx*" - match if service starts with "nginx"
|
||||
let prefix = &pattern[..pattern.len() - 1];
|
||||
service_name.starts_with(prefix)
|
||||
} else if pattern.starts_with('*') {
|
||||
// Pattern like "*backup" - match if service ends with "backup"
|
||||
let suffix = &pattern[1..];
|
||||
service_name.ends_with(suffix)
|
||||
} else {
|
||||
// Pattern like "nginx*backup" - simple glob matching
|
||||
self.simple_glob_match(service_name, pattern)
|
||||
}
|
||||
} else {
|
||||
// Exact match
|
||||
service_name == pattern
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple glob matching for patterns with * in the middle
|
||||
fn simple_glob_match(&self, text: &str, pattern: &str) -> bool {
|
||||
let parts: Vec<&str> = pattern.split('*').collect();
|
||||
let mut pos = 0;
|
||||
|
||||
for part in parts {
|
||||
if part.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Some(found_pos) = text[pos..].find(part) {
|
||||
pos += found_pos + part.len();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
/// Get disk usage for a specific service
|
||||
async fn get_service_disk_usage(&self, service_name: &str) -> Result<f32, CollectorError> {
|
||||
// Check if this service has configured directory paths
|
||||
@ -267,20 +433,6 @@ impl SystemdCollector {
|
||||
Ok(0.0)
|
||||
}
|
||||
|
||||
/// Normalize service status to standard values
|
||||
fn normalize_service_status(&self, active_state: &str, sub_state: &str) -> String {
|
||||
match (active_state, sub_state) {
|
||||
("active", "running") => "active".to_string(),
|
||||
("active", _) => "active".to_string(),
|
||||
("inactive", "dead") => "inactive".to_string(),
|
||||
("inactive", _) => "inactive".to_string(),
|
||||
("failed", _) => "failed".to_string(),
|
||||
("activating", _) => "starting".to_string(),
|
||||
("deactivating", _) => "stopping".to_string(),
|
||||
_ => format!("{}:{}", active_state, sub_state),
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if service collection cache should be updated
|
||||
fn should_update_cache(&self) -> bool {
|
||||
let state = self.state.read().unwrap();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user