diff --git a/agent/src/collectors/disk.rs b/agent/src/collectors/disk.rs index 43e7062..361be37 100644 --- a/agent/src/collectors/disk.rs +++ b/agent/src/collectors/disk.rs @@ -2,6 +2,9 @@ use anyhow::Result; use async_trait::async_trait; use cm_dashboard_shared::{Metric, MetricValue, Status}; +use crate::config::DiskConfig; +use std::fs; +use std::path::Path; use std::process::Command; use std::time::Instant; use tracing::debug; @@ -11,24 +14,136 @@ use super::{Collector, CollectorError, PerformanceMetrics}; /// Information about a mounted disk #[derive(Debug, Clone)] struct MountedDisk { - device: String, // e.g., "/dev/nvme0n1p1" - physical_device: String, // e.g., "/dev/nvme0n1" - mount_point: String, // e.g., "/" - filesystem: String, // e.g., "ext4" + device: String, // e.g., "/dev/nvme0n1p1" + physical_device: String, // e.g., "/dev/nvme0n1" + mount_point: String, // e.g., "/" + filesystem: String, // e.g., "ext4" size: String, // e.g., "120G" used: String, // e.g., "45G" available: String, // e.g., "75G" usage_percent: f32, // e.g., 38.5 + config_name: Option, // Name from config if UUID-based } /// Disk usage collector for monitoring filesystem sizes pub struct DiskCollector { - // Immutable collector for caching compatibility + config: DiskConfig, } impl DiskCollector { - pub fn new() -> Self { - Self {} + pub fn new(config: DiskConfig) -> Self { + Self { config } + } + + /// Resolve UUID to actual device path + fn resolve_uuid_to_device(&self, uuid: &str) -> Result { + let uuid_path = format!("/dev/disk/by-uuid/{}", uuid); + + if Path::new(&uuid_path).exists() { + match fs::read_link(&uuid_path) { + Ok(target) => { + // Convert relative path to absolute + if target.is_relative() { + let parent = Path::new(&uuid_path).parent().unwrap(); + let resolved = parent.join(&target); + match resolved.canonicalize() { + Ok(canonical) => Ok(canonical.to_string_lossy().to_string()), + Err(_) => Ok(target.to_string_lossy().to_string()), + } + } else { + Ok(target.to_string_lossy().to_string()) + } + } + Err(e) => Err(anyhow::anyhow!("Failed to resolve UUID {}: {}", uuid, e)), + } + } else { + Err(anyhow::anyhow!("UUID {} not found in /dev/disk/by-uuid/", uuid)) + } + } + + /// Get configured filesystems from UUIDs + fn get_configured_filesystems(&self) -> Result> { + let mut configured_disks = Vec::new(); + + for fs_config in &self.config.filesystems { + if !fs_config.monitor { + continue; + } + + // Resolve UUID to device + match self.resolve_uuid_to_device(&fs_config.uuid) { + Ok(device_path) => { + // Get filesystem stats for the mount point + match self.get_filesystem_info(&fs_config.mount_point) { + Ok((total_bytes, used_bytes)) => { + let available_bytes = total_bytes - used_bytes; + let usage_percent = if total_bytes > 0 { + (used_bytes as f64 / total_bytes as f64) * 100.0 + } else { + 0.0 + }; + + // Convert bytes to human-readable format + let size = self.bytes_to_human_readable(total_bytes); + let used = self.bytes_to_human_readable(used_bytes); + let available = self.bytes_to_human_readable(available_bytes); + + // Get physical device for SMART monitoring + let physical_device = self.get_physical_device(&device_path)?; + + configured_disks.push(MountedDisk { + device: device_path.clone(), + physical_device, + mount_point: fs_config.mount_point.clone(), + filesystem: fs_config.fs_type.clone(), + size, + used, + available, + usage_percent: usage_percent as f32, + config_name: Some(fs_config.name.clone()), + }); + + debug!( + "Configured filesystem '{}' (UUID: {}) mounted at {} using {}", + fs_config.name, fs_config.uuid, fs_config.mount_point, device_path + ); + } + Err(e) => { + debug!( + "Failed to get filesystem info for configured filesystem '{}': {}", + fs_config.name, e + ); + } + } + } + Err(e) => { + debug!( + "Failed to resolve UUID for configured filesystem '{}': {}", + fs_config.name, e + ); + } + } + } + + Ok(configured_disks) + } + + /// Convert bytes to human-readable format + fn bytes_to_human_readable(&self, bytes: u64) -> String { + const UNITS: &[&str] = &["B", "K", "M", "G", "T"]; + let mut size = bytes as f64; + let mut unit_index = 0; + + while size >= 1024.0 && unit_index < UNITS.len() - 1 { + size /= 1024.0; + unit_index += 1; + } + + if unit_index == 0 { + format!("{:.0}{}", size, UNITS[unit_index]) + } else { + format!("{:.1}{}", size, UNITS[unit_index]) + } } /// Get directory size using du command (efficient for single directory) @@ -42,9 +157,12 @@ impl DiskCollector { // du returns success even with permission denied warnings in stderr // We only care if the command completely failed or produced no stdout let output_str = String::from_utf8(output.stdout)?; - + if output_str.trim().is_empty() { - return Err(anyhow::anyhow!("du command produced no output for {}", path)); + return Err(anyhow::anyhow!( + "du command produced no output for {}", + path + )); } let size_str = output_str @@ -69,7 +187,7 @@ impl DiskCollector { let output_str = String::from_utf8(output.stdout)?; let lines: Vec<&str> = output_str.lines().collect(); - + if lines.len() < 2 { return Err(anyhow::anyhow!("Unexpected df output format")); } @@ -92,64 +210,6 @@ impl DiskCollector { Ok((total_bytes, used_bytes, usage_percent as f32)) } - /// Get all mounted disks with their mount points and underlying devices - fn get_mounted_disks(&self) -> Result> { - let output = Command::new("df") - .arg("-h") - .arg("--output=source,target,fstype,size,used,avail,pcent") - .output()?; - - if !output.status.success() { - return Err(anyhow::anyhow!("df command failed")); - } - - let output_str = String::from_utf8(output.stdout)?; - let mut mounted_disks = Vec::new(); - - for line in output_str.lines().skip(1) { // Skip header - let fields: Vec<&str> = line.split_whitespace().collect(); - if fields.len() >= 7 { - let source = fields[0]; - let target = fields[1]; - let fstype = fields[2]; - let size = fields[3]; - let used = fields[4]; - let avail = fields[5]; - let pcent_str = fields[6]; - - // Skip special filesystems - if source.starts_with("/dev/") && - !fstype.contains("tmpfs") && - !fstype.contains("devtmpfs") && - !target.starts_with("/proc") && - !target.starts_with("/sys") && - !target.starts_with("/dev") { - - // Extract percentage - let usage_percent = pcent_str - .trim_end_matches('%') - .parse::() - .unwrap_or(0.0); - - // Get underlying physical device - let physical_device = self.get_physical_device(source)?; - - mounted_disks.push(MountedDisk { - device: source.to_string(), - physical_device, - mount_point: target.to_string(), - filesystem: fstype.to_string(), - size: size.to_string(), - used: used.to_string(), - available: avail.to_string(), - usage_percent, - }); - } - } - } - - Ok(mounted_disks) - } /// Get the physical device for a given device (resolves symlinks, gets parent device) fn get_physical_device(&self, device: &str) -> Result { @@ -180,14 +240,14 @@ impl DiskCollector { .arg("smartctl") .arg("-H") .arg(device) - .output() + .output() { if output.status.success() { let output_str = String::from_utf8_lossy(&output.stdout); let health_status = if output_str.contains("PASSED") { "PASSED" } else if output_str.contains("FAILED") { - "FAILED" + "FAILED" } else { "UNKNOWN" }; @@ -197,7 +257,7 @@ impl DiskCollector { .arg("smartctl") .arg("-A") .arg(device) - .output() + .output() { let temp_str = String::from_utf8_lossy(&temp_output.stdout); // Look for temperature in SMART attributes @@ -222,7 +282,6 @@ impl DiskCollector { ("UNKNOWN".to_string(), 0.0) } - /// Calculate status based on usage percentage fn calculate_usage_status(&self, used_bytes: u64, total_bytes: u64) -> Status { if total_bytes == 0 { @@ -251,8 +310,8 @@ impl DiskCollector { // Extract numeric part and unit let (num_str, unit) = if let Some(last_char) = size_str.chars().last() { if last_char.is_alphabetic() { - let num_part = &size_str[..size_str.len()-1]; - let unit_part = &size_str[size_str.len()-1..]; + let num_part = &size_str[..size_str.len() - 1]; + let unit_part = &size_str[size_str.len() - 1..]; (num_part, unit_part) } else { (size_str, "") @@ -286,121 +345,142 @@ impl Collector for DiskCollector { let mut metrics = Vec::new(); - // Collect all mounted disks - match self.get_mounted_disks() { - Ok(mounted_disks) => { - debug!("Found {} mounted disks", mounted_disks.len()); + // Use UUID-based configured filesystems + let mounted_disks = match self.get_configured_filesystems() { + Ok(configured) => { + debug!("Using UUID-based filesystems: {} found", configured.len()); + configured + } + Err(e) => { + debug!("Failed to get configured filesystems: {}", e); + Vec::new() + } + }; - // Group disks by physical device to avoid duplicate SMART checks - let mut physical_devices: std::collections::HashMap> = std::collections::HashMap::new(); - for disk in &mounted_disks { - physical_devices.entry(disk.physical_device.clone()) - .or_insert_with(Vec::new) - .push(disk); + // Process discovered/configured disks + if !mounted_disks.is_empty() { + debug!("Found {} mounted disks", mounted_disks.len()); + + // Group disks by physical device to avoid duplicate SMART checks + let mut physical_devices: std::collections::HashMap> = + std::collections::HashMap::new(); + for disk in &mounted_disks { + physical_devices + .entry(disk.physical_device.clone()) + .or_insert_with(Vec::new) + .push(disk); + } + + // Generate metrics for each mounted disk + for (disk_index, disk) in mounted_disks.iter().enumerate() { + let timestamp = chrono::Utc::now().timestamp() as u64; + + // Use config name if available, otherwise use index + let disk_name = disk.config_name.as_ref() + .map(|name| name.clone()) + .unwrap_or_else(|| disk_index.to_string()); + + // Parse size strings to get actual values for calculations + let size_gb = self.parse_size_to_gb(&disk.size); + let used_gb = self.parse_size_to_gb(&disk.used); + let avail_gb = self.parse_size_to_gb(&disk.available); + + // Calculate status based on configured thresholds + let status = if disk.usage_percent >= self.config.usage_critical_percent { + Status::Critical + } else if disk.usage_percent >= self.config.usage_warning_percent { + Status::Warning + } else { + Status::Ok + }; + + // Device and mount point info + metrics.push(Metric { + name: format!("disk_{}_device", disk_name), + value: MetricValue::String(disk.device.clone()), + unit: None, + description: Some(format!("Device: {}", disk.device)), + status: Status::Ok, + timestamp, + }); + + metrics.push(Metric { + name: format!("disk_{}_mount_point", disk_name), + value: MetricValue::String(disk.mount_point.clone()), + unit: None, + description: Some(format!("Mount: {}", disk.mount_point)), + status: Status::Ok, + timestamp, + }); + + metrics.push(Metric { + name: format!("disk_{}_filesystem", disk_name), + value: MetricValue::String(disk.filesystem.clone()), + unit: None, + description: Some(format!("FS: {}", disk.filesystem)), + status: Status::Ok, + timestamp, + }); + + // Size metrics + metrics.push(Metric { + name: format!("disk_{}_total_gb", disk_name), + value: MetricValue::Float(size_gb), + unit: Some("GB".to_string()), + description: Some(format!("Total: {}", disk.size)), + status: Status::Ok, + timestamp, + }); + + metrics.push(Metric { + name: format!("disk_{}_used_gb", disk_name), + value: MetricValue::Float(used_gb), + unit: Some("GB".to_string()), + description: Some(format!("Used: {}", disk.used)), + status, + timestamp, + }); + + metrics.push(Metric { + name: format!("disk_{}_available_gb", disk_name), + value: MetricValue::Float(avail_gb), + unit: Some("GB".to_string()), + description: Some(format!("Available: {}", disk.available)), + status: Status::Ok, + timestamp, + }); + + metrics.push(Metric { + name: format!("disk_{}_usage_percent", disk_name), + value: MetricValue::Float(disk.usage_percent), + unit: Some("%".to_string()), + description: Some(format!("Usage: {:.1}%", disk.usage_percent)), + status, + timestamp, + }); + + // Physical device name (for SMART health grouping) + let physical_device_name = disk + .physical_device + .strip_prefix("/dev/") + .unwrap_or(&disk.physical_device); + + metrics.push(Metric { + name: format!("disk_{}_physical_device", disk_name), + value: MetricValue::String(physical_device_name.to_string()), + unit: None, + description: Some(format!("Physical: {}", physical_device_name)), + status: Status::Ok, + timestamp, + }); } - // Generate metrics for each mounted disk - for (disk_index, disk) in mounted_disks.iter().enumerate() { - let timestamp = chrono::Utc::now().timestamp() as u64; - - // Parse size strings to get actual values for calculations - let size_gb = self.parse_size_to_gb(&disk.size); - let used_gb = self.parse_size_to_gb(&disk.used); - let avail_gb = self.parse_size_to_gb(&disk.available); - - // Calculate status based on usage percentage - let status = if disk.usage_percent >= 95.0 { - Status::Critical - } else if disk.usage_percent >= 85.0 { - Status::Warning - } else { - Status::Ok - }; - - // Device and mount point info - metrics.push(Metric { - name: format!("disk_{}_device", disk_index), - value: MetricValue::String(disk.device.clone()), - unit: None, - description: Some(format!("Device: {}", disk.device)), - status: Status::Ok, - timestamp, - }); - - metrics.push(Metric { - name: format!("disk_{}_mount_point", disk_index), - value: MetricValue::String(disk.mount_point.clone()), - unit: None, - description: Some(format!("Mount: {}", disk.mount_point)), - status: Status::Ok, - timestamp, - }); - - metrics.push(Metric { - name: format!("disk_{}_filesystem", disk_index), - value: MetricValue::String(disk.filesystem.clone()), - unit: None, - description: Some(format!("FS: {}", disk.filesystem)), - status: Status::Ok, - timestamp, - }); - - // Size metrics - metrics.push(Metric { - name: format!("disk_{}_total_gb", disk_index), - value: MetricValue::Float(size_gb), - unit: Some("GB".to_string()), - description: Some(format!("Total: {}", disk.size)), - status: Status::Ok, - timestamp, - }); - - metrics.push(Metric { - name: format!("disk_{}_used_gb", disk_index), - value: MetricValue::Float(used_gb), - unit: Some("GB".to_string()), - description: Some(format!("Used: {}", disk.used)), - status, - timestamp, - }); - - metrics.push(Metric { - name: format!("disk_{}_available_gb", disk_index), - value: MetricValue::Float(avail_gb), - unit: Some("GB".to_string()), - description: Some(format!("Available: {}", disk.available)), - status: Status::Ok, - timestamp, - }); - - metrics.push(Metric { - name: format!("disk_{}_usage_percent", disk_index), - value: MetricValue::Float(disk.usage_percent), - unit: Some("%".to_string()), - description: Some(format!("Usage: {:.1}%", disk.usage_percent)), - status, - timestamp, - }); - - // Physical device name (for SMART health grouping) - let physical_device_name = disk.physical_device - .strip_prefix("/dev/") - .unwrap_or(&disk.physical_device); - - metrics.push(Metric { - name: format!("disk_{}_physical_device", disk_index), - value: MetricValue::String(physical_device_name.to_string()), - unit: None, - description: Some(format!("Physical: {}", physical_device_name)), - status: Status::Ok, - timestamp, - }); - } - - // Add SMART health metrics for each unique physical device - for (physical_device, _disks) in physical_devices { + // Add SMART health metrics for each unique physical device + for (physical_device, _disks) in physical_devices { let (health_status, temperature) = self.get_smart_health(&physical_device); - let device_name = physical_device.strip_prefix("/dev/").unwrap_or(&physical_device); + let device_name = physical_device + .strip_prefix("/dev/") + .unwrap_or(&physical_device); let timestamp = chrono::Utc::now().timestamp() as u64; let health_status_enum = match health_status.as_str() { @@ -438,34 +518,32 @@ impl Collector for DiskCollector { } } - // Add disk count metric - metrics.push(Metric { - name: "disk_count".to_string(), - value: MetricValue::Integer(mounted_disks.len() as i64), - unit: None, - description: Some(format!("Total mounted disks: {}", mounted_disks.len())), - status: Status::Ok, - timestamp: chrono::Utc::now().timestamp() as u64, - }); - } - Err(e) => { - debug!("Failed to get mounted disks: {}", e); - metrics.push(Metric { - name: "disk_count".to_string(), - value: MetricValue::Integer(0), - unit: None, - description: Some(format!("Error: {}", e)), - status: Status::Unknown, - timestamp: chrono::Utc::now().timestamp() as u64, - }); - } + // Add disk count metric + metrics.push(Metric { + name: "disk_count".to_string(), + value: MetricValue::Integer(mounted_disks.len() as i64), + unit: None, + description: Some(format!("Total mounted disks: {}", mounted_disks.len())), + status: Status::Ok, + timestamp: chrono::Utc::now().timestamp() as u64, + }); + } else { + // No disks configured - add zero count metric + metrics.push(Metric { + name: "disk_count".to_string(), + value: MetricValue::Integer(0), + unit: None, + description: Some("No disks configured for monitoring".to_string()), + status: Status::Warning, + timestamp: chrono::Utc::now().timestamp() as u64, + }); } // Monitor /tmp directory size (keep existing functionality) match self.get_directory_size("/tmp") { Ok(tmp_size_bytes) => { let tmp_size_mb = tmp_size_bytes as f64 / (1024.0 * 1024.0); - + // Get /tmp filesystem info (usually tmpfs with 2GB limit) let (total_bytes, _) = match self.get_filesystem_info("/tmp") { Ok((total, used)) => (total, used), @@ -520,8 +598,11 @@ impl Collector for DiskCollector { } let collection_time = start_time.elapsed(); - debug!("Multi-disk collection completed in {:?} with {} metrics", - collection_time, metrics.len()); + debug!( + "Multi-disk collection completed in {:?} with {} metrics", + collection_time, + metrics.len() + ); Ok(metrics) } @@ -529,4 +610,4 @@ impl Collector for DiskCollector { fn get_performance_metrics(&self) -> Option { None // Performance tracking handled by cache system } -} \ No newline at end of file +} diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index 0beee7f..8d851e0 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -1,5 +1,6 @@ use anyhow::Result; use cm_dashboard_shared::CacheConfig; +use gethostname::gethostname; use serde::{Deserialize, Serialize}; use std::path::Path; @@ -69,8 +70,17 @@ pub struct DiskConfig { pub interval_seconds: u64, pub usage_warning_percent: f32, pub usage_critical_percent: f32, - pub auto_discover: bool, - pub devices: Vec, + pub filesystems: Vec, +} + +/// Filesystem configuration entry +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FilesystemConfig { + pub name: String, // Human-readable name (e.g., "root", "boot", "home") + pub uuid: String, // UUID for /dev/disk/by-uuid/ resolution + pub mount_point: String, // Expected mount point (e.g., "/", "/boot") + pub fs_type: String, // Filesystem type (e.g., "ext4", "vfat") + pub monitor: bool, // Whether to monitor this filesystem } /// Process collector configuration @@ -121,7 +131,6 @@ pub struct NetworkConfig { pub auto_discover: bool, } - /// Notification configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NotificationConfig { @@ -137,7 +146,7 @@ impl AgentConfig { pub fn load_from_file>(path: P) -> Result { loader::load_config(path) } - + pub fn validate(&self) -> Result<()> { validation::validate_config(self) } @@ -208,13 +217,130 @@ impl Default for MemoryConfig { impl Default for DiskConfig { fn default() -> Self { + let hostname = gethostname::gethostname().to_string_lossy().to_string(); + let filesystems = get_default_filesystems_for_host(&hostname); + Self { enabled: true, interval_seconds: DEFAULT_DISK_INTERVAL_SECONDS, usage_warning_percent: DEFAULT_DISK_WARNING_PERCENT, usage_critical_percent: DEFAULT_DISK_CRITICAL_PERCENT, - auto_discover: true, - devices: Vec::new(), + filesystems, + } + } +} + +/// Get default filesystem configurations for known CMTEC hosts +fn get_default_filesystems_for_host(hostname: &str) -> Vec { + match hostname { + "cmbox" => vec![ + FilesystemConfig { + name: "root".to_string(), + uuid: "4cade5ce-85a5-4a03-83c8-dfd1d3888d79".to_string(), + mount_point: "/".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "boot".to_string(), + uuid: "AB4D-62EC".to_string(), + mount_point: "/boot".to_string(), + fs_type: "vfat".to_string(), + monitor: true, + }, + ], + "srv02" => vec![ + FilesystemConfig { + name: "root".to_string(), + uuid: "5a880608-c79f-458f-a031-30206aa27ca7".to_string(), + mount_point: "/".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "boot".to_string(), + uuid: "6B2E-2AD9".to_string(), + mount_point: "/boot".to_string(), + fs_type: "vfat".to_string(), + monitor: true, + }, + ], + "simonbox" => vec![ + FilesystemConfig { + name: "root".to_string(), + uuid: "b74284a9-2899-4f71-bdb0-fd07dc4baab3".to_string(), + mount_point: "/".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "boot".to_string(), + uuid: "F6A3-AD2B".to_string(), + mount_point: "/boot".to_string(), + fs_type: "vfat".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "steampool_1".to_string(), + uuid: "09300cb7-0938-4dba-8a42-7a7aaf60db51".to_string(), + mount_point: "/steampool_1".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "steampool_2".to_string(), + uuid: "a2d61a41-3f2a-4760-b62e-5eb8caf50d1a".to_string(), + mount_point: "/steampool_2".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + ], + "steambox" => vec![ + FilesystemConfig { + name: "root".to_string(), + uuid: "4514ca9f-2d0a-40df-b14b-e342f39c3e6a".to_string(), + mount_point: "/".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "boot".to_string(), + uuid: "8FD2-1B13".to_string(), + mount_point: "/boot".to_string(), + fs_type: "vfat".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "steampool".to_string(), + uuid: "0ebe8abb-bbe7-4224-947b-86bf38981f60".to_string(), + mount_point: "/mnt/steampool".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + ], + "srv01" => vec![ + FilesystemConfig { + name: "root".to_string(), + uuid: "cd98df34-03a3-4d68-8338-d90d2920f9f8".to_string(), + mount_point: "/".to_string(), + fs_type: "ext4".to_string(), + monitor: true, + }, + FilesystemConfig { + name: "boot".to_string(), + uuid: "13E1-4DDE".to_string(), + mount_point: "/boot".to_string(), + fs_type: "vfat".to_string(), + monitor: true, + }, + ], + // labbox and wslbox have no UUIDs configured yet + "labbox" | "wslbox" => { + Vec::new() + }, + _ => { + // Unknown hosts use auto-discovery + Vec::new() } } } @@ -277,7 +403,6 @@ impl Default for NetworkConfig { } } - impl Default for NotificationConfig { fn default() -> Self { Self { @@ -289,4 +414,4 @@ impl Default for NotificationConfig { rate_limit_minutes: DEFAULT_NOTIFICATION_RATE_LIMIT_MINUTES, } } -} \ No newline at end of file +} diff --git a/agent/src/metrics/mod.rs b/agent/src/metrics/mod.rs index e76d39e..6b78cce 100644 --- a/agent/src/metrics/mod.rs +++ b/agent/src/metrics/mod.rs @@ -2,11 +2,14 @@ use anyhow::Result; use cm_dashboard_shared::Metric; use std::collections::HashMap; use std::time::Instant; -use tracing::{info, error, debug}; +use tracing::{debug, error, info}; -use crate::config::{CollectorConfig, AgentConfig}; -use crate::collectors::{Collector, cpu::CpuCollector, memory::MemoryCollector, disk::DiskCollector, systemd::SystemdCollector, backup::BackupCollector}; use crate::cache::MetricCacheManager; +use crate::collectors::{ + backup::BackupCollector, cpu::CpuCollector, disk::DiskCollector, memory::MemoryCollector, + systemd::SystemdCollector, Collector, +}; +use crate::config::{AgentConfig, CollectorConfig}; /// Manages all metric collectors with intelligent caching pub struct MetricCollectionManager { @@ -18,10 +21,10 @@ pub struct MetricCollectionManager { impl MetricCollectionManager { pub async fn new(config: &CollectorConfig, agent_config: &AgentConfig) -> Result { let mut collectors: Vec> = Vec::new(); - + // Benchmark mode - only enable specific collector based on env var let benchmark_mode = std::env::var("BENCHMARK_COLLECTOR").ok(); - + match benchmark_mode.as_deref() { Some("cpu") => { // CPU collector only @@ -30,7 +33,7 @@ impl MetricCollectionManager { collectors.push(Box::new(cpu_collector)); info!("BENCHMARK: CPU collector only"); } - }, + } Some("memory") => { // Memory collector only if config.memory.enabled { @@ -38,34 +41,34 @@ impl MetricCollectionManager { collectors.push(Box::new(memory_collector)); info!("BENCHMARK: Memory collector only"); } - }, + } Some("disk") => { // Disk collector only - let disk_collector = DiskCollector::new(); + let disk_collector = DiskCollector::new(config.disk.clone()); collectors.push(Box::new(disk_collector)); info!("BENCHMARK: Disk collector only"); - }, + } Some("systemd") => { // Systemd collector only let systemd_collector = SystemdCollector::new(); collectors.push(Box::new(systemd_collector)); info!("BENCHMARK: Systemd collector only"); - }, + } Some("backup") => { // Backup collector only if config.backup.enabled { let backup_collector = BackupCollector::new( config.backup.backup_paths.first().cloned(), - config.backup.max_age_hours + config.backup.max_age_hours, ); collectors.push(Box::new(backup_collector)); info!("BENCHMARK: Backup collector only"); } - }, + } Some("none") => { // No collectors - test agent loop only info!("BENCHMARK: No collectors enabled"); - }, + } _ => { // Normal mode - all collectors if config.cpu.enabled { @@ -73,121 +76,140 @@ impl MetricCollectionManager { collectors.push(Box::new(cpu_collector)); info!("CPU collector initialized"); } - + if config.memory.enabled { let memory_collector = MemoryCollector::new(config.memory.clone()); collectors.push(Box::new(memory_collector)); info!("Memory collector initialized"); } - - let disk_collector = DiskCollector::new(); + + let disk_collector = DiskCollector::new(config.disk.clone()); collectors.push(Box::new(disk_collector)); info!("Disk collector initialized"); - + let systemd_collector = SystemdCollector::new(); collectors.push(Box::new(systemd_collector)); info!("Systemd collector initialized"); - + if config.backup.enabled { let backup_collector = BackupCollector::new( config.backup.backup_paths.first().cloned(), - config.backup.max_age_hours + config.backup.max_age_hours, ); collectors.push(Box::new(backup_collector)); info!("Backup collector initialized"); } } } - + // Initialize cache manager with configuration let cache_manager = MetricCacheManager::new(agent_config.cache.clone()); - + // Start background cache tasks cache_manager.start_background_tasks().await; - - info!("Metric collection manager initialized with {} collectors and caching enabled", collectors.len()); - - Ok(Self { + + info!( + "Metric collection manager initialized with {} collectors and caching enabled", + collectors.len() + ); + + Ok(Self { collectors, cache_manager, last_collection_times: HashMap::new(), }) } - + /// Force collection from ALL collectors immediately (used at startup) pub async fn collect_all_metrics_force(&mut self) -> Result> { let mut all_metrics = Vec::new(); let now = Instant::now(); - - info!("Force collecting from ALL {} collectors for startup", self.collectors.len()); - + + info!( + "Force collecting from ALL {} collectors for startup", + self.collectors.len() + ); + // Force collection from every collector regardless of intervals for collector in &self.collectors { let collector_name = collector.name(); - + match collector.collect().await { Ok(metrics) => { - info!("Force collected {} metrics from {} collector", metrics.len(), collector_name); - + info!( + "Force collected {} metrics from {} collector", + metrics.len(), + collector_name + ); + // Cache all new metrics for metric in &metrics { self.cache_manager.cache_metric(metric.clone()).await; } - + all_metrics.extend(metrics); - self.last_collection_times.insert(collector_name.to_string(), now); + self.last_collection_times + .insert(collector_name.to_string(), now); } Err(e) => { - error!("Collector '{}' failed during force collection: {}", collector_name, e); + error!( + "Collector '{}' failed during force collection: {}", + collector_name, e + ); // Continue with other collectors even if one fails } } } - - info!("Force collection completed: {} total metrics cached", all_metrics.len()); + + info!( + "Force collection completed: {} total metrics cached", + all_metrics.len() + ); Ok(all_metrics) } - + /// Collect metrics from all collectors with intelligent caching pub async fn collect_all_metrics(&mut self) -> Result> { let mut all_metrics = Vec::new(); let now = Instant::now(); - + // Collecting metrics from collectors (debug logging disabled for performance) - + // Keep track of which collector types we're collecting fresh data from let mut collecting_fresh = std::collections::HashSet::new(); - + // For each collector, check if we need to collect based on time intervals for collector in &self.collectors { let collector_name = collector.name(); - + // Determine cache interval for this collector type - ALL REALTIME FOR FAST UPDATES let cache_interval_secs = match collector_name { - "cpu" | "memory" | "disk" | "systemd" => 2, // All realtime for fast updates - "backup" => 10, // Backup metrics every 10 seconds for testing - _ => 2, // All realtime for fast updates + "cpu" | "memory" | "disk" | "systemd" => 2, // All realtime for fast updates + "backup" => 10, // Backup metrics every 10 seconds for testing + _ => 2, // All realtime for fast updates }; - - let should_collect = if let Some(last_time) = self.last_collection_times.get(collector_name) { - now.duration_since(*last_time).as_secs() >= cache_interval_secs - } else { - true // First collection - }; - + + let should_collect = + if let Some(last_time) = self.last_collection_times.get(collector_name) { + now.duration_since(*last_time).as_secs() >= cache_interval_secs + } else { + true // First collection + }; + if should_collect { collecting_fresh.insert(collector_name.to_string()); match collector.collect().await { Ok(metrics) => { // Collector returned fresh metrics (debug logging disabled for performance) - + // Cache all new metrics for metric in &metrics { self.cache_manager.cache_metric(metric.clone()).await; } - + all_metrics.extend(metrics); - self.last_collection_times.insert(collector_name.to_string(), now); + self.last_collection_times + .insert(collector_name.to_string(), now); } Err(e) => { error!("Collector '{}' failed: {}", collector_name, e); @@ -195,41 +217,48 @@ impl MetricCollectionManager { } } } else { - let _elapsed = self.last_collection_times.get(collector_name) + let _elapsed = self + .last_collection_times + .get(collector_name) .map(|t| now.duration_since(*t).as_secs()) .unwrap_or(0); // Collector skipped (debug logging disabled for performance) } } - + // For 2-second intervals, skip cached metrics to avoid duplicates // (Cache system disabled for realtime updates) - + // Collected metrics total (debug logging disabled for performance) Ok(all_metrics) } - + /// Get names of all registered collectors pub fn get_collector_names(&self) -> Vec { - self.collectors.iter() + self.collectors + .iter() .map(|c| c.name().to_string()) .collect() } - + /// Get collector statistics pub fn get_stats(&self) -> HashMap { - self.collectors.iter() + self.collectors + .iter() .map(|c| (c.name().to_string(), true)) // All collectors are enabled .collect() } - + /// Get all cached metrics from the cache manager pub async fn get_all_cached_metrics(&self) -> Result> { let cached_metrics = self.cache_manager.get_all_cached_metrics().await; - debug!("Retrieved {} cached metrics for broadcast", cached_metrics.len()); + debug!( + "Retrieved {} cached metrics for broadcast", + cached_metrics.len() + ); Ok(cached_metrics) } - + /// Determine which collector handles a specific metric fn get_collector_for_metric(&self, metric_name: &str) -> String { if metric_name.starts_with("cpu_") { @@ -246,4 +275,4 @@ impl MetricCollectionManager { "unknown".to_string() } } -} \ No newline at end of file +}