All checks were successful
Build and Release / build-and-release (push) Successful in 1m13s
- Separate dashboard updates from email notifications for immediate status aggregation - Add metric caching to MetricCollectionManager for instant dashboard updates - Dashboard now receives cached data every 1 second instead of waiting for collection intervals - Fix transmission to use cached metrics rather than triggering fresh collection - Email notifications maintain separate 60-second batching interval - Update configurable email notification aggregation interval
265 lines
11 KiB
Rust
265 lines
11 KiB
Rust
use anyhow::Result;
|
|
use cm_dashboard_shared::{Metric, StatusTracker};
|
|
use std::time::{Duration, Instant};
|
|
use tracing::{debug, error, info};
|
|
|
|
use crate::collectors::{
|
|
backup::BackupCollector, cpu::CpuCollector, disk::DiskCollector, memory::MemoryCollector,
|
|
nixos::NixOSCollector, systemd::SystemdCollector, Collector,
|
|
};
|
|
use crate::config::{AgentConfig, CollectorConfig};
|
|
|
|
/// Collector with timing information
|
|
struct TimedCollector {
|
|
collector: Box<dyn Collector>,
|
|
interval: Duration,
|
|
last_collection: Option<Instant>,
|
|
name: String,
|
|
}
|
|
|
|
/// Manages all metric collectors with individual intervals
|
|
pub struct MetricCollectionManager {
|
|
collectors: Vec<TimedCollector>,
|
|
status_tracker: StatusTracker,
|
|
cached_metrics: Vec<Metric>,
|
|
}
|
|
|
|
impl MetricCollectionManager {
|
|
pub async fn new(config: &CollectorConfig, _agent_config: &AgentConfig) -> Result<Self> {
|
|
let mut collectors: Vec<TimedCollector> = Vec::new();
|
|
|
|
// Benchmark mode - only enable specific collector based on env var
|
|
let benchmark_mode = std::env::var("BENCHMARK_COLLECTOR").ok();
|
|
|
|
match benchmark_mode.as_deref() {
|
|
Some("cpu") => {
|
|
// CPU collector only
|
|
if config.cpu.enabled {
|
|
let cpu_collector = CpuCollector::new(config.cpu.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(cpu_collector),
|
|
interval: Duration::from_secs(config.cpu.interval_seconds),
|
|
last_collection: None,
|
|
name: "CPU".to_string(),
|
|
});
|
|
info!("BENCHMARK: CPU collector only");
|
|
}
|
|
}
|
|
Some("memory") => {
|
|
// Memory collector only
|
|
if config.memory.enabled {
|
|
let memory_collector = MemoryCollector::new(config.memory.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(memory_collector),
|
|
interval: Duration::from_secs(config.memory.interval_seconds),
|
|
last_collection: None,
|
|
name: "Memory".to_string(),
|
|
});
|
|
info!("BENCHMARK: Memory collector only");
|
|
}
|
|
}
|
|
Some("disk") => {
|
|
// Disk collector only
|
|
let disk_collector = DiskCollector::new(config.disk.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(disk_collector),
|
|
interval: Duration::from_secs(config.disk.interval_seconds),
|
|
last_collection: None,
|
|
name: "Disk".to_string(),
|
|
});
|
|
info!("BENCHMARK: Disk collector only");
|
|
}
|
|
Some("systemd") => {
|
|
// Systemd collector only
|
|
let systemd_collector = SystemdCollector::new(config.systemd.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(systemd_collector),
|
|
interval: Duration::from_secs(config.systemd.interval_seconds),
|
|
last_collection: None,
|
|
name: "Systemd".to_string(),
|
|
});
|
|
info!("BENCHMARK: Systemd collector only");
|
|
}
|
|
Some("backup") => {
|
|
// Backup collector only
|
|
if config.backup.enabled {
|
|
let backup_collector = BackupCollector::new(
|
|
config.backup.backup_paths.first().cloned(),
|
|
config.backup.max_age_hours,
|
|
);
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(backup_collector),
|
|
interval: Duration::from_secs(config.backup.interval_seconds),
|
|
last_collection: None,
|
|
name: "Backup".to_string(),
|
|
});
|
|
info!("BENCHMARK: Backup collector only");
|
|
}
|
|
}
|
|
Some("none") => {
|
|
// No collectors - test agent loop only
|
|
info!("BENCHMARK: No collectors enabled");
|
|
}
|
|
_ => {
|
|
// Normal mode - all collectors
|
|
if config.cpu.enabled {
|
|
let cpu_collector = CpuCollector::new(config.cpu.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(cpu_collector),
|
|
interval: Duration::from_secs(config.cpu.interval_seconds),
|
|
last_collection: None,
|
|
name: "CPU".to_string(),
|
|
});
|
|
info!("CPU collector initialized with {}s interval", config.cpu.interval_seconds);
|
|
}
|
|
|
|
if config.memory.enabled {
|
|
let memory_collector = MemoryCollector::new(config.memory.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(memory_collector),
|
|
interval: Duration::from_secs(config.memory.interval_seconds),
|
|
last_collection: None,
|
|
name: "Memory".to_string(),
|
|
});
|
|
info!("Memory collector initialized with {}s interval", config.memory.interval_seconds);
|
|
}
|
|
|
|
let disk_collector = DiskCollector::new(config.disk.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(disk_collector),
|
|
interval: Duration::from_secs(config.disk.interval_seconds),
|
|
last_collection: None,
|
|
name: "Disk".to_string(),
|
|
});
|
|
info!("Disk collector initialized with {}s interval", config.disk.interval_seconds);
|
|
|
|
let systemd_collector = SystemdCollector::new(config.systemd.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(systemd_collector),
|
|
interval: Duration::from_secs(config.systemd.interval_seconds),
|
|
last_collection: None,
|
|
name: "Systemd".to_string(),
|
|
});
|
|
info!("Systemd collector initialized with {}s interval", config.systemd.interval_seconds);
|
|
|
|
if config.backup.enabled {
|
|
let backup_collector = BackupCollector::new(
|
|
config.backup.backup_paths.first().cloned(),
|
|
config.backup.max_age_hours,
|
|
);
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(backup_collector),
|
|
interval: Duration::from_secs(config.backup.interval_seconds),
|
|
last_collection: None,
|
|
name: "Backup".to_string(),
|
|
});
|
|
info!("Backup collector initialized with {}s interval", config.backup.interval_seconds);
|
|
}
|
|
|
|
if config.nixos.enabled {
|
|
let nixos_collector = NixOSCollector::new(config.nixos.clone());
|
|
collectors.push(TimedCollector {
|
|
collector: Box::new(nixos_collector),
|
|
interval: Duration::from_secs(config.nixos.interval_seconds),
|
|
last_collection: None,
|
|
name: "NixOS".to_string(),
|
|
});
|
|
info!("NixOS collector initialized with {}s interval", config.nixos.interval_seconds);
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
info!(
|
|
"Metric collection manager initialized with {} collectors",
|
|
collectors.len()
|
|
);
|
|
|
|
Ok(Self {
|
|
collectors,
|
|
status_tracker: StatusTracker::new(),
|
|
cached_metrics: Vec::new(),
|
|
})
|
|
}
|
|
|
|
/// Force collection from ALL collectors immediately (used at startup)
|
|
pub async fn collect_all_metrics_force(&mut self) -> Result<Vec<Metric>> {
|
|
let mut all_metrics = Vec::new();
|
|
let now = Instant::now();
|
|
|
|
for timed_collector in &mut self.collectors {
|
|
match timed_collector.collector.collect(&mut self.status_tracker).await {
|
|
Ok(metrics) => {
|
|
let metric_count = metrics.len();
|
|
all_metrics.extend(metrics);
|
|
timed_collector.last_collection = Some(now);
|
|
debug!("Force collected {} metrics from {}", metric_count, timed_collector.name);
|
|
}
|
|
Err(e) => {
|
|
error!("Collector {} failed: {}", timed_collector.name, e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Cache the collected metrics
|
|
self.cached_metrics = all_metrics.clone();
|
|
Ok(all_metrics)
|
|
}
|
|
|
|
/// Collect metrics from collectors whose intervals have elapsed
|
|
pub async fn collect_metrics_timed(&mut self) -> Result<Vec<Metric>> {
|
|
let mut all_metrics = Vec::new();
|
|
let now = Instant::now();
|
|
|
|
for timed_collector in &mut self.collectors {
|
|
let should_collect = match timed_collector.last_collection {
|
|
None => true, // First collection
|
|
Some(last_time) => now.duration_since(last_time) >= timed_collector.interval,
|
|
};
|
|
|
|
if should_collect {
|
|
match timed_collector.collector.collect(&mut self.status_tracker).await {
|
|
Ok(metrics) => {
|
|
let metric_count = metrics.len();
|
|
all_metrics.extend(metrics);
|
|
timed_collector.last_collection = Some(now);
|
|
debug!(
|
|
"Collected {} metrics from {} ({}s interval)",
|
|
metric_count,
|
|
timed_collector.name,
|
|
timed_collector.interval.as_secs()
|
|
);
|
|
}
|
|
Err(e) => {
|
|
error!("Collector {} failed: {}", timed_collector.name, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update cache with newly collected metrics
|
|
if !all_metrics.is_empty() {
|
|
// Merge new metrics with cached metrics (replace by name)
|
|
for new_metric in &all_metrics {
|
|
// Remove any existing metric with the same name
|
|
self.cached_metrics.retain(|cached| cached.name != new_metric.name);
|
|
// Add the new metric
|
|
self.cached_metrics.push(new_metric.clone());
|
|
}
|
|
}
|
|
|
|
Ok(all_metrics)
|
|
}
|
|
|
|
/// Collect metrics from all collectors (legacy method for compatibility)
|
|
pub async fn collect_all_metrics(&mut self) -> Result<Vec<Metric>> {
|
|
self.collect_metrics_timed().await
|
|
}
|
|
|
|
/// Get cached metrics without triggering fresh collection
|
|
pub fn get_cached_metrics(&self) -> Vec<Metric> {
|
|
self.cached_metrics.clone()
|
|
}
|
|
|
|
}
|