diff --git a/agent/src/agent.rs b/agent/src/agent.rs index 59d9069..9604c42 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -179,7 +179,7 @@ impl Agent { async fn process_metrics(&mut self, metrics: &[Metric]) { for metric in metrics { - self.host_status_manager.process_metric(metric, &mut self.notification_manager).await; + self.host_status_manager.process_metric(metric, &mut self.notification_manager, self.metric_manager.get_cache_manager()).await; } } diff --git a/agent/src/cache/mod.rs b/agent/src/cache/mod.rs index ac9d616..affcd18 100644 --- a/agent/src/cache/mod.rs +++ b/agent/src/cache/mod.rs @@ -1,101 +1,119 @@ use cm_dashboard_shared::{CacheConfig, Metric}; use std::collections::HashMap; -use std::time::Instant; +use std::fs; +use std::path::Path; +use std::sync::Arc; use tokio::sync::RwLock; -use tracing::warn; +use tracing::{info, warn, error}; -mod cached_metric; -mod manager; - -pub use cached_metric::CachedMetric; -pub use manager::MetricCacheManager; - -/// Central cache for individual metrics with configurable tiers -pub struct ConfigurableCache { - cache: RwLock>, - config: CacheConfig, +/// Simple persistent cache for metrics +pub struct SimpleCache { + metrics: RwLock>, + persist_path: String, } -impl ConfigurableCache { +impl SimpleCache { pub fn new(config: CacheConfig) -> Self { - Self { - cache: RwLock::new(HashMap::new()), - config, - } + let cache = Self { + metrics: RwLock::new(HashMap::new()), + persist_path: config.persist_path, + }; + + // Load from disk on startup + cache.load_from_disk(); + cache } /// Store metric in cache pub async fn store_metric(&self, metric: Metric) { - if !self.config.enabled { - return; - } - - let mut cache = self.cache.write().await; - - // Enforce max entries limit - if cache.len() >= self.config.max_entries { - self.cleanup_old_entries(&mut cache).await; - } - - let cached_metric = CachedMetric { - metric: metric.clone(), - collected_at: Instant::now(), - access_count: 1, - }; - - cache.insert(metric.name.clone(), cached_metric); - - // Cached metric (debug logging disabled for performance) + let mut metrics = self.metrics.write().await; + metrics.insert(metric.name.clone(), metric); } - /// Get all cached metrics (including expired ones) for broadcasting + /// Get all cached metrics pub async fn get_all_cached_metrics(&self) -> Vec { - if !self.config.enabled { - return vec![]; - } - - let cache = self.cache.read().await; - let mut all_metrics = Vec::new(); - - for cached_metric in cache.values() { - all_metrics.push(cached_metric.metric.clone()); - } - - all_metrics + let metrics = self.metrics.read().await; + metrics.values().cloned().collect() } - /// Background cleanup of old entries - async fn cleanup_old_entries(&self, cache: &mut HashMap) { - let mut to_remove = Vec::new(); - - for (metric_name, cached_metric) in cache.iter() { - let cache_interval = self.config.default_ttl_seconds; - let elapsed = cached_metric.collected_at.elapsed().as_secs(); - - // Remove entries that are way past their expiration (2x interval) - if elapsed > cache_interval * 2 { - to_remove.push(metric_name.clone()); + /// Save cache to disk + pub async fn save_to_disk(&self) { + let metrics = self.metrics.read().await; + + // Create directory if needed + if let Some(parent) = Path::new(&self.persist_path).parent() { + if let Err(e) = fs::create_dir_all(parent) { + warn!("Failed to create cache directory {}: {}", parent.display(), e); + return; } } - for metric_name in to_remove { - cache.remove(&metric_name); - } - - // If still too many entries, remove least recently accessed - if cache.len() >= self.config.max_entries { - let mut entries: Vec<_> = cache - .iter() - .map(|(k, v)| (k.clone(), v.access_count)) - .collect(); - entries.sort_by_key(|(_, access_count)| *access_count); - - let excess = cache.len() - (self.config.max_entries * 3 / 4); // Remove 25% - for (metric_name, _) in entries.iter().take(excess) { - cache.remove(metric_name); + // Serialize and save + match serde_json::to_string_pretty(&*metrics) { + Ok(json) => { + if let Err(e) = fs::write(&self.persist_path, json) { + error!("Failed to save cache to {}: {}", self.persist_path, e); + } } + Err(e) => { + error!("Failed to serialize cache: {}", e); + } + } + } - warn!("Cache cleanup removed {} entries due to size limit", excess); + /// Load cache from disk + fn load_from_disk(&self) { + match fs::read_to_string(&self.persist_path) { + Ok(content) => { + match serde_json::from_str::>(&content) { + Ok(loaded_metrics) => { + if let Ok(mut metrics) = self.metrics.try_write() { + *metrics = loaded_metrics; + info!("Loaded {} metrics from cache", metrics.len()); + } + } + Err(e) => { + warn!("Failed to parse cache file {}: {}", self.persist_path, e); + } + } + } + Err(_) => { + info!("No cache file found at {}, starting fresh", self.persist_path); + } } } } + + +#[derive(Clone)] +pub struct MetricCacheManager { + cache: Arc, +} + +impl MetricCacheManager { + pub fn new(config: CacheConfig) -> Self { + Self { + cache: Arc::new(SimpleCache::new(config)), + } + } + + pub async fn store_metric(&self, metric: Metric) { + self.cache.store_metric(metric).await; + } + + pub async fn cache_metric(&self, metric: Metric) { + self.store_metric(metric).await; + } + + pub async fn start_background_tasks(&self) { + // No background tasks needed for simple cache + } + + pub async fn get_all_cached_metrics(&self) -> Result, anyhow::Error> { + Ok(self.cache.get_all_cached_metrics().await) + } + + pub async fn save_to_disk(&self) { + self.cache.save_to_disk().await; + } +} \ No newline at end of file diff --git a/agent/src/config/validation.rs b/agent/src/config/validation.rs index 11c558e..c420bb3 100644 --- a/agent/src/config/validation.rs +++ b/agent/src/config/validation.rs @@ -112,14 +112,8 @@ pub fn validate_config(config: &AgentConfig) -> Result<()> { } // Validate cache configuration - if config.cache.enabled { - if config.cache.default_ttl_seconds == 0 { - bail!("Cache TTL cannot be 0"); - } - - if config.cache.max_entries == 0 { - bail!("Cache max entries cannot be 0"); - } + if config.cache.persist_path.is_empty() { + bail!("Cache persist path cannot be empty"); } Ok(()) diff --git a/agent/src/metrics/mod.rs b/agent/src/metrics/mod.rs index ca898f1..2fe3b33 100644 --- a/agent/src/metrics/mod.rs +++ b/agent/src/metrics/mod.rs @@ -240,7 +240,7 @@ impl MetricCollectionManager { /// Get all cached metrics from the cache manager pub async fn get_all_cached_metrics(&self) -> Result> { - let cached_metrics = self.cache_manager.get_all_cached_metrics().await; + let cached_metrics = self.cache_manager.get_all_cached_metrics().await?; debug!( "Retrieved {} cached metrics for broadcast", cached_metrics.len() @@ -248,4 +248,8 @@ impl MetricCollectionManager { Ok(cached_metrics) } + pub fn get_cache_manager(&self) -> &MetricCacheManager { + &self.cache_manager + } + } diff --git a/agent/src/status/mod.rs b/agent/src/status/mod.rs index 18a6d17..516cf22 100644 --- a/agent/src/status/mod.rs +++ b/agent/src/status/mod.rs @@ -9,7 +9,6 @@ use chrono::Utc; pub struct HostStatusConfig { pub enabled: bool, pub aggregation_method: String, // "worst_case" - pub update_interval_seconds: u64, pub notification_interval_seconds: u64, } @@ -18,7 +17,6 @@ impl Default for HostStatusConfig { Self { enabled: true, aggregation_method: "worst_case".to_string(), - update_interval_seconds: 5, notification_interval_seconds: 30, } } @@ -72,7 +70,7 @@ impl HostStatusManager { /// Update the status of a specific service and recalculate host status /// Updates real-time status and buffers changes for email notifications - pub fn update_service_status(&mut self, service: String, status: Status) { + pub fn update_service_status(&mut self, service: String, status: Status, cache_manager: Option<&crate::cache::MetricCacheManager>) { if !self.config.enabled { return; } @@ -84,6 +82,14 @@ impl HostStatusManager { return; } + // Save cache when status changes (clone cache manager reference for async) + if let Some(cache) = cache_manager { + let cache = cache.clone(); + tokio::spawn(async move { + cache.save_to_disk().await; + }); + } + // Initialize batch if this is the first change if self.batch_start_time.is_none() { self.batch_start_time = Some(Instant::now()); @@ -163,9 +169,9 @@ impl HostStatusManager { /// Process a metric - updates status (notifications handled separately via batching) - pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) { + pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager, cache_manager: &crate::cache::MetricCacheManager) { // Just update status - notifications are handled by process_pending_notifications - self.update_service_status(metric.name.clone(), metric.status); + self.update_service_status(metric.name.clone(), metric.status, Some(cache_manager)); } /// Process pending notifications - call this at notification intervals diff --git a/shared/src/cache.rs b/shared/src/cache.rs index 7d87fa8..3e5ffda 100644 --- a/shared/src/cache.rs +++ b/shared/src/cache.rs @@ -3,23 +3,13 @@ use serde::{Deserialize, Serialize}; /// Cache configuration #[derive(Debug, Clone, Deserialize, Serialize)] pub struct CacheConfig { - pub enabled: bool, - pub default_ttl_seconds: u64, - pub max_entries: usize, - pub warming_timeout_seconds: u64, - pub background_refresh_enabled: bool, - pub cleanup_interval_seconds: u64, + pub persist_path: String, } impl Default for CacheConfig { fn default() -> Self { Self { - enabled: true, - default_ttl_seconds: 30, - max_entries: 10000, - warming_timeout_seconds: 3, - background_refresh_enabled: true, - cleanup_interval_seconds: 1800, + persist_path: "/var/lib/cm-dashboard/cache.json".to_string(), } } }