use cm_dashboard_shared::{AgentData, Metric}; use std::collections::HashMap; use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; use super::MetricDataPoint; /// Central metric storage for the dashboard pub struct MetricStore { /// Current metrics: hostname -> metric_name -> metric current_metrics: HashMap>, /// Historical metrics for trending historical_metrics: HashMap>, /// Last heartbeat timestamp per host last_heartbeat: HashMap, /// Configuration max_metrics_per_host: usize, history_retention: Duration, } impl MetricStore { pub fn new(max_metrics_per_host: usize, history_retention_hours: u64) -> Self { Self { current_metrics: HashMap::new(), historical_metrics: HashMap::new(), last_heartbeat: HashMap::new(), max_metrics_per_host, history_retention: Duration::from_secs(history_retention_hours * 3600), } } /// Update metrics for a specific host pub fn update_metrics(&mut self, hostname: &str, metrics: Vec) { let now = Instant::now(); debug!("Updating {} metrics for host {}", metrics.len(), hostname); // Get or create host entry let host_metrics = self .current_metrics .entry(hostname.to_string()) .or_insert_with(HashMap::new); // Get or create historical entry let host_history = self .historical_metrics .entry(hostname.to_string()) .or_insert_with(Vec::new); // Update current metrics and add to history for metric in metrics { let metric_name = metric.name.clone(); // Store current metric host_metrics.insert(metric_name.clone(), metric.clone()); // Add to history host_history.push(MetricDataPoint { received_at: now }); // Track heartbeat metrics for connectivity detection if metric_name == "agent_heartbeat" { self.last_heartbeat.insert(hostname.to_string(), now); debug!("Updated heartbeat for host {}", hostname); } } // Get metrics count before cleanup let metrics_count = host_metrics.len(); // Cleanup old history and enforce limits self.cleanup_host_data(hostname); info!( "Updated metrics for {}: {} current metrics", hostname, metrics_count ); } /// Process structured agent data (temporary bridge - converts back to metrics) /// TODO: Replace entire metric system with direct structured data processing pub fn process_agent_data(&mut self, agent_data: AgentData) { let metrics = self.convert_agent_data_to_metrics(&agent_data); self.update_metrics(&agent_data.hostname, metrics); } /// Convert structured agent data to legacy metrics (temporary bridge) fn convert_agent_data_to_metrics(&self, agent_data: &AgentData) -> Vec { use cm_dashboard_shared::{Metric, MetricValue, Status}; let mut metrics = Vec::new(); // Convert CPU data metrics.push(Metric::new( "cpu_load_1min".to_string(), MetricValue::Float(agent_data.system.cpu.load_1min), Status::Ok, )); metrics.push(Metric::new( "cpu_load_5min".to_string(), MetricValue::Float(agent_data.system.cpu.load_5min), Status::Ok, )); metrics.push(Metric::new( "cpu_load_15min".to_string(), MetricValue::Float(agent_data.system.cpu.load_15min), Status::Ok, )); metrics.push(Metric::new( "cpu_frequency_mhz".to_string(), MetricValue::Float(agent_data.system.cpu.frequency_mhz), Status::Ok, )); if let Some(temp) = agent_data.system.cpu.temperature_celsius { metrics.push(Metric::new( "cpu_temperature_celsius".to_string(), MetricValue::Float(temp), Status::Ok, )); } // Convert Memory data metrics.push(Metric::new( "memory_usage_percent".to_string(), MetricValue::Float(agent_data.system.memory.usage_percent), Status::Ok, )); metrics.push(Metric::new( "memory_total_gb".to_string(), MetricValue::Float(agent_data.system.memory.total_gb), Status::Ok, )); metrics.push(Metric::new( "memory_used_gb".to_string(), MetricValue::Float(agent_data.system.memory.used_gb), Status::Ok, )); metrics.push(Metric::new( "memory_available_gb".to_string(), MetricValue::Float(agent_data.system.memory.available_gb), Status::Ok, )); metrics.push(Metric::new( "memory_swap_total_gb".to_string(), MetricValue::Float(agent_data.system.memory.swap_total_gb), Status::Ok, )); metrics.push(Metric::new( "memory_swap_used_gb".to_string(), MetricValue::Float(agent_data.system.memory.swap_used_gb), Status::Ok, )); // Convert tmpfs data for tmpfs in &agent_data.system.memory.tmpfs { if tmpfs.mount == "/tmp" { metrics.push(Metric::new( "memory_tmp_usage_percent".to_string(), MetricValue::Float(tmpfs.usage_percent), Status::Ok, )); metrics.push(Metric::new( "memory_tmp_used_gb".to_string(), MetricValue::Float(tmpfs.used_gb), Status::Ok, )); metrics.push(Metric::new( "memory_tmp_total_gb".to_string(), MetricValue::Float(tmpfs.total_gb), Status::Ok, )); } } // Add agent metadata metrics.push(Metric::new( "agent_version".to_string(), MetricValue::String(agent_data.agent_version.clone()), Status::Ok, )); metrics.push(Metric::new( "agent_heartbeat".to_string(), MetricValue::Integer(agent_data.timestamp as i64), Status::Ok, )); // Convert storage data for drive in &agent_data.system.storage.drives { // Drive-level metrics if let Some(temp) = drive.temperature_celsius { metrics.push(Metric::new( format!("disk_{}_temperature", drive.name), MetricValue::Float(temp), Status::Ok, )); } if let Some(wear) = drive.wear_percent { metrics.push(Metric::new( format!("disk_{}_wear_percent", drive.name), MetricValue::Float(wear), Status::Ok, )); } metrics.push(Metric::new( format!("disk_{}_health", drive.name), MetricValue::String(drive.health.clone()), Status::Ok, )); // Calculate drive totals from all filesystems let total_used: f32 = drive.filesystems.iter().map(|fs| fs.used_gb).sum(); let total_size: f32 = drive.filesystems.iter().map(|fs| fs.total_gb).sum(); let average_usage = if total_size > 0.0 { (total_used / total_size) * 100.0 } else { 0.0 }; // Drive total metrics (aggregated from filesystems) metrics.push(Metric::new( format!("disk_{}_usage_percent", drive.name), MetricValue::Float(average_usage), Status::Ok, )); metrics.push(Metric::new( format!("disk_{}_used_gb", drive.name), MetricValue::Float(total_used), Status::Ok, )); metrics.push(Metric::new( format!("disk_{}_total_gb", drive.name), MetricValue::Float(total_size), Status::Ok, )); metrics.push(Metric::new( format!("disk_{}_pool_type", drive.name), MetricValue::String("drive".to_string()), Status::Ok, )); // Filesystem metrics for fs in &drive.filesystems { let fs_base = format!("disk_{}_fs_{}", drive.name, fs.mount.replace('/', "root")); metrics.push(Metric::new( format!("{}_usage_percent", fs_base), MetricValue::Float(fs.usage_percent), Status::Ok, )); metrics.push(Metric::new( format!("{}_used_gb", fs_base), MetricValue::Float(fs.used_gb), Status::Ok, )); metrics.push(Metric::new( format!("{}_total_gb", fs_base), MetricValue::Float(fs.total_gb), Status::Ok, )); } } // Convert storage pools for pool in &agent_data.system.storage.pools { let pool_base = format!("disk_{}", pool.name); metrics.push(Metric::new( format!("{}_usage_percent", pool_base), MetricValue::Float(pool.usage_percent), Status::Ok, )); metrics.push(Metric::new( format!("{}_used_gb", pool_base), MetricValue::Float(pool.used_gb), Status::Ok, )); metrics.push(Metric::new( format!("{}_total_gb", pool_base), MetricValue::Float(pool.total_gb), Status::Ok, )); metrics.push(Metric::new( format!("{}_pool_type", pool_base), MetricValue::String(pool.pool_type.clone()), Status::Ok, )); metrics.push(Metric::new( format!("{}_mount_point", pool_base), MetricValue::String(pool.mount.clone()), Status::Ok, )); // Pool drive data for drive in &pool.data_drives { if let Some(temp) = drive.temperature_celsius { metrics.push(Metric::new( format!("disk_{}_{}_temperature", pool.name, drive.name), MetricValue::Float(temp), Status::Ok, )); } if let Some(wear) = drive.wear_percent { metrics.push(Metric::new( format!("disk_{}_{}_wear_percent", pool.name, drive.name), MetricValue::Float(wear), Status::Ok, )); } } for drive in &pool.parity_drives { if let Some(temp) = drive.temperature_celsius { metrics.push(Metric::new( format!("disk_{}_{}_temperature", pool.name, drive.name), MetricValue::Float(temp), Status::Ok, )); } if let Some(wear) = drive.wear_percent { metrics.push(Metric::new( format!("disk_{}_{}_wear_percent", pool.name, drive.name), MetricValue::Float(wear), Status::Ok, )); } } } // Convert service data for service in &agent_data.services { let service_base = format!("service_{}", service.name); metrics.push(Metric::new( format!("{}_status", service_base), MetricValue::String(service.status.clone()), Status::Ok, )); metrics.push(Metric::new( format!("{}_memory_mb", service_base), MetricValue::Float(service.memory_mb), Status::Ok, )); metrics.push(Metric::new( format!("{}_disk_gb", service_base), MetricValue::Float(service.disk_gb), Status::Ok, )); if service.user_stopped { metrics.push(Metric::new( format!("{}_user_stopped", service_base), MetricValue::Boolean(true), Status::Ok, )); } } // Convert backup data metrics.push(Metric::new( "backup_status".to_string(), MetricValue::String(agent_data.backup.status.clone()), Status::Ok, )); if let Some(last_run) = agent_data.backup.last_run { metrics.push(Metric::new( "backup_last_run_timestamp".to_string(), MetricValue::Integer(last_run as i64), Status::Ok, )); } if let Some(next_scheduled) = agent_data.backup.next_scheduled { metrics.push(Metric::new( "backup_next_scheduled_timestamp".to_string(), MetricValue::Integer(next_scheduled as i64), Status::Ok, )); } if let Some(size) = agent_data.backup.total_size_gb { metrics.push(Metric::new( "backup_size_gb".to_string(), MetricValue::Float(size), Status::Ok, )); } if let Some(health) = &agent_data.backup.repository_health { metrics.push(Metric::new( "backup_repository_health".to_string(), MetricValue::String(health.clone()), Status::Ok, )); } metrics } /// Get current metric for a specific host pub fn get_metric(&self, hostname: &str, metric_name: &str) -> Option<&Metric> { self.current_metrics.get(hostname)?.get(metric_name) } /// Get all current metrics for a host as a vector pub fn get_metrics_for_host(&self, hostname: &str) -> Vec<&Metric> { if let Some(metrics_map) = self.current_metrics.get(hostname) { metrics_map.values().collect() } else { Vec::new() } } /// Get connected hosts (hosts with recent heartbeats) pub fn get_connected_hosts(&self, timeout: Duration) -> Vec { let now = Instant::now(); self.last_heartbeat .iter() .filter_map(|(hostname, &last_heartbeat)| { if now.duration_since(last_heartbeat) <= timeout { Some(hostname.clone()) } else { debug!("Host {} considered offline - last heartbeat was {:?} ago", hostname, now.duration_since(last_heartbeat)); None } }) .collect() } /// Clean up data for offline hosts pub fn cleanup_offline_hosts(&mut self, timeout: Duration) { let now = Instant::now(); let mut hosts_to_cleanup = Vec::new(); // Find hosts that are offline (no recent heartbeat) for (hostname, &last_heartbeat) in &self.last_heartbeat { if now.duration_since(last_heartbeat) > timeout { hosts_to_cleanup.push(hostname.clone()); } } // Clear metrics for offline hosts for hostname in hosts_to_cleanup { if let Some(metrics) = self.current_metrics.remove(&hostname) { info!("Cleared {} metrics for offline host: {}", metrics.len(), hostname); } // Keep heartbeat timestamp for reconnection detection // Don't remove from last_heartbeat to track when host was last seen } } /// Cleanup old data and enforce limits fn cleanup_host_data(&mut self, hostname: &str) { let now = Instant::now(); // Cleanup historical data if let Some(history) = self.historical_metrics.get_mut(hostname) { // Remove old entries history.retain(|dp| now.duration_since(dp.received_at) <= self.history_retention); // Enforce size limit if history.len() > self.max_metrics_per_host { let excess = history.len() - self.max_metrics_per_host; history.drain(0..excess); warn!( "Trimmed {} old metrics for host {} (size limit: {})", excess, hostname, self.max_metrics_per_host ); } } } /// Get agent versions from all hosts for cross-host comparison pub fn get_agent_versions(&self) -> HashMap { let mut versions = HashMap::new(); for (hostname, metrics) in &self.current_metrics { if let Some(version_metric) = metrics.get("agent_version") { if let cm_dashboard_shared::MetricValue::String(version) = &version_metric.value { versions.insert(hostname.clone(), version.clone()); } } } versions } /// Check for agent version mismatches across hosts pub fn get_version_mismatches(&self) -> Option<(String, Vec)> { let versions = self.get_agent_versions(); if versions.len() < 2 { return None; // Need at least 2 hosts to compare } // Find the most common version (assume it's the "current" version) let mut version_counts = HashMap::new(); for version in versions.values() { *version_counts.entry(version.clone()).or_insert(0) += 1; } let most_common_version = version_counts .iter() .max_by_key(|(_, count)| *count) .map(|(version, _)| version.clone())?; // Find hosts with different versions let outdated_hosts: Vec = versions .iter() .filter(|(_, version)| *version != &most_common_version) .map(|(hostname, _)| hostname.clone()) .collect(); if outdated_hosts.is_empty() { None } else { Some((most_common_version, outdated_hosts)) } } }