use anyhow::Result; use cm_dashboard_shared::Metric; use std::collections::HashMap; use std::time::Instant; use tracing::{debug, error, info}; use crate::cache::MetricCacheManager; use crate::collectors::{ backup::BackupCollector, cpu::CpuCollector, disk::DiskCollector, memory::MemoryCollector, systemd::SystemdCollector, Collector, }; use crate::config::{AgentConfig, CollectorConfig}; /// Manages all metric collectors with intelligent caching pub struct MetricCollectionManager { collectors: Vec>, cache_manager: MetricCacheManager, last_collection_times: HashMap, } impl MetricCollectionManager { pub async fn new(config: &CollectorConfig, agent_config: &AgentConfig) -> Result { let mut collectors: Vec> = Vec::new(); // Benchmark mode - only enable specific collector based on env var let benchmark_mode = std::env::var("BENCHMARK_COLLECTOR").ok(); match benchmark_mode.as_deref() { Some("cpu") => { // CPU collector only if config.cpu.enabled { let cpu_collector = CpuCollector::new(config.cpu.clone()); collectors.push(Box::new(cpu_collector)); info!("BENCHMARK: CPU collector only"); } } Some("memory") => { // Memory collector only if config.memory.enabled { let memory_collector = MemoryCollector::new(config.memory.clone()); collectors.push(Box::new(memory_collector)); info!("BENCHMARK: Memory collector only"); } } Some("disk") => { // Disk collector only let disk_collector = DiskCollector::new(config.disk.clone()); collectors.push(Box::new(disk_collector)); info!("BENCHMARK: Disk collector only"); } Some("systemd") => { // Systemd collector only let systemd_collector = SystemdCollector::new(); collectors.push(Box::new(systemd_collector)); info!("BENCHMARK: Systemd collector only"); } Some("backup") => { // Backup collector only if config.backup.enabled { let backup_collector = BackupCollector::new( config.backup.backup_paths.first().cloned(), config.backup.max_age_hours, ); collectors.push(Box::new(backup_collector)); info!("BENCHMARK: Backup collector only"); } } Some("none") => { // No collectors - test agent loop only info!("BENCHMARK: No collectors enabled"); } _ => { // Normal mode - all collectors if config.cpu.enabled { let cpu_collector = CpuCollector::new(config.cpu.clone()); collectors.push(Box::new(cpu_collector)); info!("CPU collector initialized"); } if config.memory.enabled { let memory_collector = MemoryCollector::new(config.memory.clone()); collectors.push(Box::new(memory_collector)); info!("Memory collector initialized"); } let disk_collector = DiskCollector::new(config.disk.clone()); collectors.push(Box::new(disk_collector)); info!("Disk collector initialized"); let systemd_collector = SystemdCollector::new(); collectors.push(Box::new(systemd_collector)); info!("Systemd collector initialized"); if config.backup.enabled { let backup_collector = BackupCollector::new( config.backup.backup_paths.first().cloned(), config.backup.max_age_hours, ); collectors.push(Box::new(backup_collector)); info!("Backup collector initialized"); } } } // Initialize cache manager with configuration let cache_manager = MetricCacheManager::new(agent_config.cache.clone()); // Start background cache tasks cache_manager.start_background_tasks().await; info!( "Metric collection manager initialized with {} collectors and caching enabled", collectors.len() ); Ok(Self { collectors, cache_manager, last_collection_times: HashMap::new(), }) } /// Force collection from ALL collectors immediately (used at startup) pub async fn collect_all_metrics_force(&mut self) -> Result> { let mut all_metrics = Vec::new(); let now = Instant::now(); info!( "Force collecting from ALL {} collectors for startup", self.collectors.len() ); // Force collection from every collector regardless of intervals for collector in &self.collectors { let collector_name = collector.name(); match collector.collect().await { Ok(metrics) => { info!( "Force collected {} metrics from {} collector", metrics.len(), collector_name ); // Cache all new metrics for metric in &metrics { self.cache_manager.cache_metric(metric.clone()).await; } all_metrics.extend(metrics); self.last_collection_times .insert(collector_name.to_string(), now); } Err(e) => { error!( "Collector '{}' failed during force collection: {}", collector_name, e ); // Continue with other collectors even if one fails } } } info!( "Force collection completed: {} total metrics cached", all_metrics.len() ); Ok(all_metrics) } /// Collect metrics from all collectors with intelligent caching pub async fn collect_all_metrics(&mut self) -> Result> { let mut all_metrics = Vec::new(); let now = Instant::now(); // Collecting metrics from collectors (debug logging disabled for performance) // Keep track of which collector types we're collecting fresh data from let mut collecting_fresh = std::collections::HashSet::new(); // For each collector, check if we need to collect based on time intervals for collector in &self.collectors { let collector_name = collector.name(); // Determine cache interval for this collector type based on data volatility let cache_interval_secs = match collector_name { "cpu" | "memory" => 5, // Fast updates for volatile metrics "systemd" => 30, // Service status changes less frequently "disk" => 300, // SMART data changes very slowly (5 minutes) "backup" => 600, // Backup status changes rarely (10 minutes) _ => 30, // Default: moderate frequency }; let should_collect = if let Some(last_time) = self.last_collection_times.get(collector_name) { now.duration_since(*last_time).as_secs() >= cache_interval_secs } else { true // First collection }; if should_collect { collecting_fresh.insert(collector_name.to_string()); match collector.collect().await { Ok(metrics) => { // Collector returned fresh metrics (debug logging disabled for performance) // Cache all new metrics for metric in &metrics { self.cache_manager.cache_metric(metric.clone()).await; } all_metrics.extend(metrics); self.last_collection_times .insert(collector_name.to_string(), now); } Err(e) => { error!("Collector '{}' failed: {}", collector_name, e); // Continue with other collectors even if one fails } } } else { let _elapsed = self .last_collection_times .get(collector_name) .map(|t| now.duration_since(*t).as_secs()) .unwrap_or(0); // Collector skipped (debug logging disabled for performance) } } // For 2-second intervals, skip cached metrics to avoid duplicates // (Cache system disabled for realtime updates) // Collected metrics total (debug logging disabled for performance) Ok(all_metrics) } /// Get names of all registered collectors pub fn get_collector_names(&self) -> Vec { self.collectors .iter() .map(|c| c.name().to_string()) .collect() } /// Get collector statistics pub fn get_stats(&self) -> HashMap { self.collectors .iter() .map(|c| (c.name().to_string(), true)) // All collectors are enabled .collect() } /// Get all cached metrics from the cache manager pub async fn get_all_cached_metrics(&self) -> Result> { let cached_metrics = self.cache_manager.get_all_cached_metrics().await; debug!( "Retrieved {} cached metrics for broadcast", cached_metrics.len() ); Ok(cached_metrics) } /// Determine which collector handles a specific metric fn get_collector_for_metric(&self, metric_name: &str) -> String { if metric_name.starts_with("cpu_") { "cpu".to_string() } else if metric_name.starts_with("memory_") { "memory".to_string() } else if metric_name.starts_with("disk_") { "disk".to_string() } else if metric_name.starts_with("service_") { "systemd".to_string() } else if metric_name.starts_with("backup_") { "backup".to_string() } else { "unknown".to_string() } } }