use cm_dashboard_shared::AgentData; use std::collections::HashMap; use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; use super::MetricDataPoint; /// ZMQ communication statistics per host #[derive(Debug, Clone)] pub struct ZmqStats { pub packets_received: u64, pub last_packet_time: Instant, pub last_packet_age_secs: f64, } /// Central metric storage for the dashboard pub struct MetricStore { /// Current structured data: hostname -> AgentData current_agent_data: HashMap, /// Historical metrics for trending historical_metrics: HashMap>, /// Last heartbeat timestamp per host last_heartbeat: HashMap, /// ZMQ communication statistics per host zmq_stats: HashMap, /// Configuration max_metrics_per_host: usize, history_retention: Duration, } impl MetricStore { pub fn new(max_metrics_per_host: usize, history_retention_hours: u64) -> Self { Self { current_agent_data: HashMap::new(), historical_metrics: HashMap::new(), last_heartbeat: HashMap::new(), zmq_stats: HashMap::new(), max_metrics_per_host, history_retention: Duration::from_secs(history_retention_hours * 3600), } } /// Store structured agent data directly pub fn store_agent_data(&mut self, agent_data: AgentData) { let now = Instant::now(); let hostname = agent_data.hostname.clone(); debug!("Storing structured data for host {}", hostname); // Store the structured data directly self.current_agent_data.insert(hostname.clone(), agent_data); // Update heartbeat timestamp self.last_heartbeat.insert(hostname.clone(), now); debug!("Updated heartbeat for host {}", hostname); // Update ZMQ stats let stats = self.zmq_stats.entry(hostname.clone()).or_insert(ZmqStats { packets_received: 0, last_packet_time: now, last_packet_age_secs: 0.0, }); stats.packets_received += 1; stats.last_packet_time = now; stats.last_packet_age_secs = 0.0; // Just received // Add to history let host_history = self .historical_metrics .entry(hostname.clone()) .or_insert_with(Vec::new); host_history.push(MetricDataPoint { received_at: now }); // Cleanup old data self.cleanup_host_data(&hostname); info!("Stored structured data for {}", hostname); } /// Get current structured data for a host pub fn get_agent_data(&self, hostname: &str) -> Option<&AgentData> { self.current_agent_data.get(hostname) } /// Get connected hosts (hosts with recent heartbeats) pub fn get_connected_hosts(&self, timeout: Duration) -> Vec { let now = Instant::now(); self.last_heartbeat .iter() .filter_map(|(hostname, &last_heartbeat)| { if now.duration_since(last_heartbeat) <= timeout { Some(hostname.clone()) } else { debug!("Host {} considered offline - last heartbeat was {:?} ago", hostname, now.duration_since(last_heartbeat)); None } }) .collect() } /// Clean up data for offline hosts pub fn cleanup_offline_hosts(&mut self, timeout: Duration) { let now = Instant::now(); let mut hosts_to_cleanup = Vec::new(); // Find hosts that are offline (no recent heartbeat) for (hostname, &last_heartbeat) in &self.last_heartbeat { if now.duration_since(last_heartbeat) > timeout { hosts_to_cleanup.push(hostname.clone()); } } // Clear data for offline hosts for hostname in hosts_to_cleanup { if let Some(_agent_data) = self.current_agent_data.remove(&hostname) { info!("Cleared structured data for offline host: {}", hostname); } // Keep heartbeat timestamp for reconnection detection // Don't remove from last_heartbeat to track when host was last seen } } /// Cleanup old data and enforce limits fn cleanup_host_data(&mut self, hostname: &str) { let now = Instant::now(); // Cleanup historical data if let Some(history) = self.historical_metrics.get_mut(hostname) { // Remove old entries history.retain(|dp| now.duration_since(dp.received_at) <= self.history_retention); // Enforce size limit if history.len() > self.max_metrics_per_host { let excess = history.len() - self.max_metrics_per_host; history.drain(0..excess); warn!( "Trimmed {} old metrics for host {} (size limit: {})", excess, hostname, self.max_metrics_per_host ); } } } /// Get agent versions from all hosts for cross-host comparison pub fn get_agent_versions(&self) -> HashMap { let mut versions = HashMap::new(); for (hostname, agent_data) in &self.current_agent_data { versions.insert(hostname.clone(), agent_data.agent_version.clone()); } versions } /// Check for agent version mismatches across hosts pub fn get_version_mismatches(&self) -> Option<(String, Vec)> { let versions = self.get_agent_versions(); if versions.len() < 2 { return None; // Need at least 2 hosts to compare } // Find the most common version (assume it's the "current" version) let mut version_counts = HashMap::new(); for version in versions.values() { *version_counts.entry(version.clone()).or_insert(0) += 1; } let most_common_version = version_counts .iter() .max_by_key(|(_, count)| *count) .map(|(version, _)| version.clone())?; // Find hosts with different versions let outdated_hosts: Vec = versions .iter() .filter(|(_, version)| *version != &most_common_version) .map(|(hostname, _)| hostname.clone()) .collect(); if outdated_hosts.is_empty() { None } else { Some((most_common_version, outdated_hosts)) } } }