use anyhow::Result; use cm_dashboard_shared::{Metric, StatusTracker}; use std::time::{Duration, Instant}; use tracing::{debug, error, info}; use crate::collectors::{ backup::BackupCollector, cpu::CpuCollector, disk::DiskCollector, memory::MemoryCollector, nixos::NixOSCollector, systemd::SystemdCollector, Collector, }; use crate::config::{AgentConfig, CollectorConfig}; /// Collector with timing information struct TimedCollector { collector: Box, interval: Duration, last_collection: Option, name: String, } /// Manages all metric collectors with individual intervals pub struct MetricCollectionManager { collectors: Vec, status_tracker: StatusTracker, cached_metrics: Vec, } impl MetricCollectionManager { pub async fn new(config: &CollectorConfig, _agent_config: &AgentConfig) -> Result { let mut collectors: Vec = Vec::new(); // Benchmark mode - only enable specific collector based on env var let benchmark_mode = std::env::var("BENCHMARK_COLLECTOR").ok(); match benchmark_mode.as_deref() { Some("cpu") => { // CPU collector only if config.cpu.enabled { let cpu_collector = CpuCollector::new(config.cpu.clone()); collectors.push(TimedCollector { collector: Box::new(cpu_collector), interval: Duration::from_secs(config.cpu.interval_seconds), last_collection: None, name: "CPU".to_string(), }); info!("BENCHMARK: CPU collector only"); } } Some("memory") => { // Memory collector only if config.memory.enabled { let memory_collector = MemoryCollector::new(config.memory.clone()); collectors.push(TimedCollector { collector: Box::new(memory_collector), interval: Duration::from_secs(config.memory.interval_seconds), last_collection: None, name: "Memory".to_string(), }); info!("BENCHMARK: Memory collector only"); } } Some("disk") => { // Disk collector only let disk_collector = DiskCollector::new(config.disk.clone()); collectors.push(TimedCollector { collector: Box::new(disk_collector), interval: Duration::from_secs(config.disk.interval_seconds), last_collection: None, name: "Disk".to_string(), }); info!("BENCHMARK: Disk collector only"); } Some("systemd") => { // Systemd collector only let systemd_collector = SystemdCollector::new(config.systemd.clone()); collectors.push(TimedCollector { collector: Box::new(systemd_collector), interval: Duration::from_secs(config.systemd.interval_seconds), last_collection: None, name: "Systemd".to_string(), }); info!("BENCHMARK: Systemd collector only"); } Some("backup") => { // Backup collector only if config.backup.enabled { let backup_collector = BackupCollector::new( config.backup.backup_paths.first().cloned(), config.backup.max_age_hours, ); collectors.push(TimedCollector { collector: Box::new(backup_collector), interval: Duration::from_secs(config.backup.interval_seconds), last_collection: None, name: "Backup".to_string(), }); info!("BENCHMARK: Backup collector only"); } } Some("none") => { // No collectors - test agent loop only info!("BENCHMARK: No collectors enabled"); } _ => { // Normal mode - all collectors if config.cpu.enabled { let cpu_collector = CpuCollector::new(config.cpu.clone()); collectors.push(TimedCollector { collector: Box::new(cpu_collector), interval: Duration::from_secs(config.cpu.interval_seconds), last_collection: None, name: "CPU".to_string(), }); info!("CPU collector initialized with {}s interval", config.cpu.interval_seconds); } if config.memory.enabled { let memory_collector = MemoryCollector::new(config.memory.clone()); collectors.push(TimedCollector { collector: Box::new(memory_collector), interval: Duration::from_secs(config.memory.interval_seconds), last_collection: None, name: "Memory".to_string(), }); info!("Memory collector initialized with {}s interval", config.memory.interval_seconds); } let disk_collector = DiskCollector::new(config.disk.clone()); collectors.push(TimedCollector { collector: Box::new(disk_collector), interval: Duration::from_secs(config.disk.interval_seconds), last_collection: None, name: "Disk".to_string(), }); info!("Disk collector initialized with {}s interval", config.disk.interval_seconds); let systemd_collector = SystemdCollector::new(config.systemd.clone()); collectors.push(TimedCollector { collector: Box::new(systemd_collector), interval: Duration::from_secs(config.systemd.interval_seconds), last_collection: None, name: "Systemd".to_string(), }); info!("Systemd collector initialized with {}s interval", config.systemd.interval_seconds); if config.backup.enabled { let backup_collector = BackupCollector::new( config.backup.backup_paths.first().cloned(), config.backup.max_age_hours, ); collectors.push(TimedCollector { collector: Box::new(backup_collector), interval: Duration::from_secs(config.backup.interval_seconds), last_collection: None, name: "Backup".to_string(), }); info!("Backup collector initialized with {}s interval", config.backup.interval_seconds); } if config.nixos.enabled { let nixos_collector = NixOSCollector::new(config.nixos.clone()); collectors.push(TimedCollector { collector: Box::new(nixos_collector), interval: Duration::from_secs(config.nixos.interval_seconds), last_collection: None, name: "NixOS".to_string(), }); info!("NixOS collector initialized with {}s interval", config.nixos.interval_seconds); } } } info!( "Metric collection manager initialized with {} collectors", collectors.len() ); Ok(Self { collectors, status_tracker: StatusTracker::new(), cached_metrics: Vec::new(), }) } /// Force collection from ALL collectors immediately (used at startup) pub async fn collect_all_metrics_force(&mut self) -> Result> { let mut all_metrics = Vec::new(); let now = Instant::now(); for timed_collector in &mut self.collectors { match timed_collector.collector.collect(&mut self.status_tracker).await { Ok(metrics) => { let metric_count = metrics.len(); all_metrics.extend(metrics); timed_collector.last_collection = Some(now); debug!("Force collected {} metrics from {}", metric_count, timed_collector.name); } Err(e) => { error!("Collector {} failed: {}", timed_collector.name, e); } } } // Cache the collected metrics self.cached_metrics = all_metrics.clone(); Ok(all_metrics) } /// Collect metrics from collectors whose intervals have elapsed pub async fn collect_metrics_timed(&mut self) -> Result> { let mut all_metrics = Vec::new(); let now = Instant::now(); for timed_collector in &mut self.collectors { let should_collect = match timed_collector.last_collection { None => true, // First collection Some(last_time) => now.duration_since(last_time) >= timed_collector.interval, }; if should_collect { match timed_collector.collector.collect(&mut self.status_tracker).await { Ok(metrics) => { let metric_count = metrics.len(); all_metrics.extend(metrics); timed_collector.last_collection = Some(now); debug!( "Collected {} metrics from {} ({}s interval)", metric_count, timed_collector.name, timed_collector.interval.as_secs() ); } Err(e) => { error!("Collector {} failed: {}", timed_collector.name, e); } } } } // Update cache with newly collected metrics if !all_metrics.is_empty() { // Merge new metrics with cached metrics (replace by name) for new_metric in &all_metrics { // Remove any existing metric with the same name self.cached_metrics.retain(|cached| cached.name != new_metric.name); // Add the new metric self.cached_metrics.push(new_metric.clone()); } } Ok(all_metrics) } /// Collect metrics from all collectors (legacy method for compatibility) pub async fn collect_all_metrics(&mut self) -> Result> { self.collect_metrics_timed().await } /// Get cached metrics without triggering fresh collection pub fn get_cached_metrics(&self) -> Vec { self.cached_metrics.clone() } }