All checks were successful
Build and Release / build-and-release (push) Successful in 2m9s
Bump version across all workspace crates for next release including agent, dashboard, and shared components.
174 lines
5.8 KiB
Rust
174 lines
5.8 KiB
Rust
use cm_dashboard_shared::AgentData;
|
|
use std::collections::HashMap;
|
|
use std::time::{Duration, Instant};
|
|
use tracing::{debug, info, warn};
|
|
|
|
use super::MetricDataPoint;
|
|
|
|
/// Central metric storage for the dashboard
|
|
pub struct MetricStore {
|
|
/// Current structured data: hostname -> AgentData
|
|
current_agent_data: HashMap<String, AgentData>,
|
|
/// Historical metrics for trending
|
|
historical_metrics: HashMap<String, Vec<MetricDataPoint>>,
|
|
/// Last heartbeat timestamp per host
|
|
last_heartbeat: HashMap<String, Instant>,
|
|
/// Configuration
|
|
max_metrics_per_host: usize,
|
|
history_retention: Duration,
|
|
}
|
|
|
|
impl MetricStore {
|
|
pub fn new(max_metrics_per_host: usize, history_retention_hours: u64) -> Self {
|
|
Self {
|
|
current_agent_data: HashMap::new(),
|
|
historical_metrics: HashMap::new(),
|
|
last_heartbeat: HashMap::new(),
|
|
max_metrics_per_host,
|
|
history_retention: Duration::from_secs(history_retention_hours * 3600),
|
|
}
|
|
}
|
|
|
|
|
|
/// Store structured agent data directly
|
|
pub fn store_agent_data(&mut self, agent_data: AgentData) {
|
|
let now = Instant::now();
|
|
let hostname = agent_data.hostname.clone();
|
|
|
|
debug!("Storing structured data for host {}", hostname);
|
|
|
|
// Store the structured data directly
|
|
self.current_agent_data.insert(hostname.clone(), agent_data);
|
|
|
|
// Update heartbeat timestamp
|
|
self.last_heartbeat.insert(hostname.clone(), now);
|
|
debug!("Updated heartbeat for host {}", hostname);
|
|
|
|
// Add to history
|
|
let host_history = self
|
|
.historical_metrics
|
|
.entry(hostname.clone())
|
|
.or_insert_with(Vec::new);
|
|
host_history.push(MetricDataPoint { received_at: now });
|
|
|
|
// Cleanup old data
|
|
self.cleanup_host_data(&hostname);
|
|
|
|
info!("Stored structured data for {}", hostname);
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get current structured data for a host
|
|
pub fn get_agent_data(&self, hostname: &str) -> Option<&AgentData> {
|
|
self.current_agent_data.get(hostname)
|
|
}
|
|
|
|
|
|
/// Get connected hosts (hosts with recent heartbeats)
|
|
pub fn get_connected_hosts(&self, timeout: Duration) -> Vec<String> {
|
|
let now = Instant::now();
|
|
|
|
self.last_heartbeat
|
|
.iter()
|
|
.filter_map(|(hostname, &last_heartbeat)| {
|
|
if now.duration_since(last_heartbeat) <= timeout {
|
|
Some(hostname.clone())
|
|
} else {
|
|
debug!("Host {} considered offline - last heartbeat was {:?} ago",
|
|
hostname, now.duration_since(last_heartbeat));
|
|
None
|
|
}
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Clean up data for offline hosts
|
|
pub fn cleanup_offline_hosts(&mut self, timeout: Duration) {
|
|
let now = Instant::now();
|
|
let mut hosts_to_cleanup = Vec::new();
|
|
|
|
// Find hosts that are offline (no recent heartbeat)
|
|
for (hostname, &last_heartbeat) in &self.last_heartbeat {
|
|
if now.duration_since(last_heartbeat) > timeout {
|
|
hosts_to_cleanup.push(hostname.clone());
|
|
}
|
|
}
|
|
|
|
// Clear data for offline hosts
|
|
for hostname in hosts_to_cleanup {
|
|
if let Some(_agent_data) = self.current_agent_data.remove(&hostname) {
|
|
info!("Cleared structured data for offline host: {}", hostname);
|
|
}
|
|
// Keep heartbeat timestamp for reconnection detection
|
|
// Don't remove from last_heartbeat to track when host was last seen
|
|
}
|
|
}
|
|
|
|
/// Cleanup old data and enforce limits
|
|
fn cleanup_host_data(&mut self, hostname: &str) {
|
|
let now = Instant::now();
|
|
|
|
// Cleanup historical data
|
|
if let Some(history) = self.historical_metrics.get_mut(hostname) {
|
|
// Remove old entries
|
|
history.retain(|dp| now.duration_since(dp.received_at) <= self.history_retention);
|
|
|
|
// Enforce size limit
|
|
if history.len() > self.max_metrics_per_host {
|
|
let excess = history.len() - self.max_metrics_per_host;
|
|
history.drain(0..excess);
|
|
warn!(
|
|
"Trimmed {} old metrics for host {} (size limit: {})",
|
|
excess, hostname, self.max_metrics_per_host
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Get agent versions from all hosts for cross-host comparison
|
|
pub fn get_agent_versions(&self) -> HashMap<String, String> {
|
|
let mut versions = HashMap::new();
|
|
|
|
for (hostname, agent_data) in &self.current_agent_data {
|
|
versions.insert(hostname.clone(), agent_data.agent_version.clone());
|
|
}
|
|
|
|
versions
|
|
}
|
|
|
|
/// Check for agent version mismatches across hosts
|
|
pub fn get_version_mismatches(&self) -> Option<(String, Vec<String>)> {
|
|
let versions = self.get_agent_versions();
|
|
|
|
if versions.len() < 2 {
|
|
return None; // Need at least 2 hosts to compare
|
|
}
|
|
|
|
// Find the most common version (assume it's the "current" version)
|
|
let mut version_counts = HashMap::new();
|
|
for version in versions.values() {
|
|
*version_counts.entry(version.clone()).or_insert(0) += 1;
|
|
}
|
|
|
|
let most_common_version = version_counts
|
|
.iter()
|
|
.max_by_key(|(_, count)| *count)
|
|
.map(|(version, _)| version.clone())?;
|
|
|
|
// Find hosts with different versions
|
|
let outdated_hosts: Vec<String> = versions
|
|
.iter()
|
|
.filter(|(_, version)| *version != &most_common_version)
|
|
.map(|(hostname, _)| hostname.clone())
|
|
.collect();
|
|
|
|
if outdated_hosts.is_empty() {
|
|
None
|
|
} else {
|
|
Some((most_common_version, outdated_hosts))
|
|
}
|
|
}
|
|
}
|