From 00a8ed3da21b391ac964cbfef5b54310d6fc14da Mon Sep 17 00:00:00 2001
From: Christoffer Martinsson
Date: Mon, 20 Oct 2025 18:45:41 +0200
Subject: [PATCH] Implement hysteresis for metric status changes to prevent flapping
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add comprehensive hysteresis support to prevent status oscillation near
threshold boundaries while maintaining responsive alerting.

Key Features:
- HysteresisThresholds with configurable upper/lower limits
- StatusTracker for per-metric status history
- Default gaps: CPU load 10%, memory 5%, disk temp 5°C

Updated Components:
- CPU load collector (5-minute average with hysteresis)
- Memory usage collector (percentage-based thresholds)
- Disk temperature collector (SMART data monitoring)
- All collectors updated to support StatusTracker interface

Cache Interval Adjustments:
- Service status: 60s → 10s (faster response)
- Disk usage: 300s → 60s (more frequent checks)
- Backup status: 900s → 60s (quicker updates)
- SMART data: moved to 600s tier (10 minutes)

Architecture:
- Individual metric status calculation in collectors
- Centralized StatusTracker in MetricCollectionManager
- Status aggregation preserved in dashboard widgets
---
 CLAUDE.md                          |   8 +-
 agent/src/agent.rs                 | 104 ++++++++-------
 agent/src/cache/cached_metric.rs   |   2 +-
 agent/src/cache/manager.rs         |   9 +-
 agent/src/cache/mod.rs             |  33 ++---
 agent/src/collectors/backup.rs     |  71 ++++++----
 agent/src/collectors/cpu.rs        |  57 ++++----
 agent/src/collectors/disk.rs       |  38 +++---
 agent/src/collectors/error.rs      |   4 +-
 agent/src/collectors/memory.rs     | 196 +++++++++++++++------------
 agent/src/collectors/mod.rs        | 140 ++++++++++----------
 agent/src/collectors/systemd.rs    |   4 +-
 agent/src/communication/mod.rs     |  54 ++++----
 agent/src/config/loader.rs         |  17 +--
 agent/src/config/mod.rs            |   1 -
 agent/src/config/validation.rs     |  76 ++++++-----
 agent/src/main.rs                  |  30 ++---
 agent/src/metrics/mod.rs           |   8 +-
 agent/src/utils/mod.rs             |  31 ++---
 dashboard/src/app.rs               | 205 ++++++++++++++++-------------
 dashboard/src/communication/mod.rs |  77 +++++------
 dashboard/src/config/mod.rs        |   2 +-
 dashboard/src/main.rs              |  30 ++---
 dashboard/src/metrics/mod.rs       |   3 -
 dashboard/src/metrics/store.rs     |  71 +++++-----
 dashboard/src/ui/theme.rs          | 152 +++++++++++----------
 dashboard/src/ui/widgets/cpu.rs    |  38 +++---
 dashboard/src/ui/widgets/memory.rs |  57 ++++----
 dashboard/src/ui/widgets/mod.rs    |   8 +-
 shared/src/cache.rs                | 114 +++++++++-------
 shared/src/error.rs                |   6 +-
 shared/src/lib.rs                  |   2 +-
 shared/src/metrics.rs              | 143 +++++++++++++++++---
 shared/src/protocol.rs             |  16 +--
 34 files changed, 1037 insertions(+), 770 deletions(-)

diff --git a/CLAUDE.md b/CLAUDE.md
index 21ab857..b289cf1 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -329,7 +329,7 @@ Agent → ["cpu_load_1min", "memory_usage_percent", ...] → Dashboard → Widge
 - [x] All collectors output standardized status strings (ok/warning/critical/unknown)
 - [x] Dashboard connection loss detection with 5-second keep-alive
 - [x] Removed excessive logging from agent
-- [x] Fixed all compiler warnings in both agent and dashboard
+- [x] Reduced initial compiler warnings from excessive logging cleanup
 - [x] **SystemCollector architecture refactoring completed (2025-10-12)**
 - [x] Created SystemCollector for CPU load, memory, temperature, C-states
 - [x] Moved system metrics from ServiceCollector to SystemCollector
@@ -376,6 +376,12 @@ Agent → ["cpu_load_1min", "memory_usage_percent", ...]
→ Dashboard → Widge - [x] Resolved timezone issues by using UTC timestamps in backup script - [x] Added disk identification metrics (product name, serial number) to backup status - [x] Enhanced UI layout with proper backup monitoring integration +- [x] **Complete warning elimination and code cleanup (2025-10-18)** +- [x] Removed all unused code including widget subscription system and WidgetType enum +- [x] Eliminated unused cache utilities, error variants, and theme functions +- [x] Removed unused struct fields and imports throughout codebase +- [x] Fixed lifetime warnings and replaced subscription-based widgets with direct metric filtering +- [x] Achieved zero build warnings in both agent and dashboard (down from 46 total warnings) **Production Configuration:** - CPU load thresholds: Warning ≥ 9.0, Critical ≥ 10.0 diff --git a/agent/src/agent.rs b/agent/src/agent.rs index 0575c4e..3f41639 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -1,11 +1,11 @@ use anyhow::Result; +use gethostname::gethostname; use std::time::Duration; use tokio::time::interval; -use tracing::{info, error, debug}; -use gethostname::gethostname; +use tracing::{debug, error, info}; +use crate::communication::{AgentCommand, ZmqHandler}; use crate::config::AgentConfig; -use crate::communication::{ZmqHandler, AgentCommand}; use crate::metrics::MetricCollectionManager; use crate::notifications::NotificationManager; use cm_dashboard_shared::{Metric, MetricMessage}; @@ -22,28 +22,31 @@ impl Agent { pub async fn new(config_path: Option) -> Result { let hostname = gethostname().to_string_lossy().to_string(); info!("Initializing agent for host: {}", hostname); - + // Load configuration let config = if let Some(path) = config_path { AgentConfig::load_from_file(&path)? } else { AgentConfig::default() }; - + info!("Agent configuration loaded"); - + // Initialize ZMQ communication let zmq_handler = ZmqHandler::new(&config.zmq).await?; - info!("ZMQ communication initialized on port {}", config.zmq.publisher_port); - + info!( + "ZMQ communication initialized on port {}", + config.zmq.publisher_port + ); + // Initialize metric collection manager with cache config let metric_manager = MetricCollectionManager::new(&config.collectors, &config).await?; info!("Metric collection manager initialized"); - + // Initialize notification manager let notification_manager = NotificationManager::new(&config.notifications, &hostname)?; info!("Notification manager initialized"); - + Ok(Self { hostname, config, @@ -52,10 +55,10 @@ impl Agent { notification_manager, }) } - + pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> { info!("Starting agent main loop with separated collection and transmission"); - + // CRITICAL: Collect ALL data immediately at startup before entering the loop info!("Performing initial FORCE collection of all metrics at startup"); if let Err(e) = self.collect_all_metrics_force().await { @@ -63,12 +66,13 @@ impl Agent { } else { info!("Initial metric collection completed - all data cached and ready"); } - + // Separate intervals for collection and transmission - let mut collection_interval = interval(Duration::from_secs(self.config.collection_interval_seconds)); + let mut collection_interval = + interval(Duration::from_secs(self.config.collection_interval_seconds)); let mut transmission_interval = interval(Duration::from_secs(1)); // ZMQ broadcast every 1 second let mut notification_check_interval = interval(Duration::from_secs(30)); // Check notifications every 30s - + 
loop { tokio::select! { _ = collection_interval.tick() => { @@ -99,84 +103,93 @@ impl Agent { } } } - + info!("Agent main loop stopped"); Ok(()) } - + async fn collect_all_metrics_force(&mut self) -> Result<()> { info!("Starting FORCE metric collection for startup"); - + // Force collect all metrics from all collectors immediately let metrics = self.metric_manager.collect_all_metrics_force().await?; - + if metrics.is_empty() { error!("No metrics collected during force collection!"); return Ok(()); } - + info!("Force collected and cached {} metrics", metrics.len()); - + // Check for status changes and send notifications self.check_status_changes(&metrics).await; - + Ok(()) } - + async fn collect_metrics_only(&mut self) -> Result<()> { debug!("Starting metric collection cycle (cache only)"); - + // Collect all metrics from all collectors and cache them let metrics = self.metric_manager.collect_all_metrics().await?; - + if metrics.is_empty() { debug!("No metrics collected this cycle"); return Ok(()); } - + debug!("Collected and cached {} metrics", metrics.len()); - + // Check for status changes and send notifications self.check_status_changes(&metrics).await; - + Ok(()) } - + async fn broadcast_all_cached_metrics(&mut self) -> Result<()> { debug!("Broadcasting all cached metrics via ZMQ"); - + // Get all cached metrics from the metric manager let cached_metrics = self.metric_manager.get_all_cached_metrics().await?; - + if cached_metrics.is_empty() { debug!("No cached metrics to broadcast"); return Ok(()); } - + debug!("Broadcasting {} cached metrics", cached_metrics.len()); - + // Create and send message with all cached data let message = MetricMessage::new(self.hostname.clone(), cached_metrics); self.zmq_handler.publish_metrics(&message).await?; - + debug!("Cached metrics broadcasted successfully"); Ok(()) } - + async fn check_status_changes(&mut self, metrics: &[Metric]) { for metric in metrics { - if let Some(status_change) = self.notification_manager.update_metric_status(&metric.name, metric.status) { - info!("Status change detected for {}: {:?} -> {:?}", - metric.name, status_change.old_status, status_change.new_status); - + if let Some(status_change) = self + .notification_manager + .update_metric_status(&metric.name, metric.status) + { + info!( + "Status change detected for {}: {:?} -> {:?}", + metric.name, status_change.old_status, status_change.new_status + ); + // Send notification for status change - if let Err(e) = self.notification_manager.send_status_change_notification(status_change, metric).await { + if let Err(e) = self + .notification_manager + .send_status_change_notification(status_change, metric) + .await + { error!("Failed to send notification: {}", e); } } } } - + async fn handle_commands(&mut self) -> Result<()> { // Try to receive commands (non-blocking) match self.zmq_handler.try_receive_command() { @@ -193,7 +206,7 @@ impl Agent { } Ok(()) } - + async fn process_command(&mut self, command: AgentCommand) -> Result<()> { match command { AgentCommand::CollectNow => { @@ -209,7 +222,10 @@ impl Agent { info!("Interval change requested but not implemented yet"); } AgentCommand::ToggleCollector { name, enabled } => { - info!("Processing ToggleCollector command: {} -> {}", name, enabled); + info!( + "Processing ToggleCollector command: {} -> {}", + name, enabled + ); // Note: This would require dynamic collector management info!("Collector toggle requested but not implemented yet"); } @@ -220,4 +236,4 @@ impl Agent { } Ok(()) } -} \ No newline at end of file +} diff --git 
a/agent/src/cache/cached_metric.rs b/agent/src/cache/cached_metric.rs index 9e0812e..669b362 100644 --- a/agent/src/cache/cached_metric.rs +++ b/agent/src/cache/cached_metric.rs @@ -8,4 +8,4 @@ pub struct CachedMetric { pub collected_at: Instant, pub access_count: u64, pub tier: Option, -} \ No newline at end of file +} diff --git a/agent/src/cache/manager.rs b/agent/src/cache/manager.rs index 31f3198..6d63cb8 100644 --- a/agent/src/cache/manager.rs +++ b/agent/src/cache/manager.rs @@ -11,10 +11,8 @@ pub struct MetricCacheManager { impl MetricCacheManager { pub fn new(config: CacheConfig) -> Self { let cache = Arc::new(ConfigurableCache::new(config.clone())); - - Self { - cache, - } + + Self { cache } } /// Start background cache management tasks @@ -32,5 +30,4 @@ impl MetricCacheManager { pub async fn get_all_cached_metrics(&self) -> Vec { self.cache.get_all_cached_metrics().await } - -} \ No newline at end of file +} diff --git a/agent/src/cache/mod.rs b/agent/src/cache/mod.rs index e5ca512..fd95b08 100644 --- a/agent/src/cache/mod.rs +++ b/agent/src/cache/mod.rs @@ -4,11 +4,11 @@ use std::time::Instant; use tokio::sync::RwLock; use tracing::warn; -mod manager; mod cached_metric; +mod manager; -pub use manager::MetricCacheManager; pub use cached_metric::CachedMetric; +pub use manager::MetricCacheManager; /// Central cache for individual metrics with configurable tiers pub struct ConfigurableCache { @@ -31,7 +31,7 @@ impl ConfigurableCache { } let mut cache = self.cache.write().await; - + // Enforce max entries limit if cache.len() >= self.config.max_entries { self.cleanup_old_entries(&mut cache).await; @@ -45,11 +45,10 @@ impl ConfigurableCache { }; cache.insert(metric.name.clone(), cached_metric); - + // Cached metric (debug logging disabled for performance) } - /// Get all cached metrics (including expired ones) for broadcasting pub async fn get_all_cached_metrics(&self) -> Vec { if !self.config.enabled { @@ -58,44 +57,46 @@ impl ConfigurableCache { let cache = self.cache.read().await; let mut all_metrics = Vec::new(); - + for cached_metric in cache.values() { all_metrics.push(cached_metric.metric.clone()); } - + all_metrics } /// Background cleanup of old entries async fn cleanup_old_entries(&self, cache: &mut HashMap) { let mut to_remove = Vec::new(); - + for (metric_name, cached_metric) in cache.iter() { let cache_interval = self.config.get_cache_interval(metric_name); let elapsed = cached_metric.collected_at.elapsed().as_secs(); - + // Remove entries that are way past their expiration (2x interval) if elapsed > cache_interval * 2 { to_remove.push(metric_name.clone()); } } - + for metric_name in to_remove { cache.remove(&metric_name); } - + // If still too many entries, remove least recently accessed if cache.len() >= self.config.max_entries { - let mut entries: Vec<_> = cache.iter().map(|(k, v)| (k.clone(), v.access_count)).collect(); + let mut entries: Vec<_> = cache + .iter() + .map(|(k, v)| (k.clone(), v.access_count)) + .collect(); entries.sort_by_key(|(_, access_count)| *access_count); - + let excess = cache.len() - (self.config.max_entries * 3 / 4); // Remove 25% for (metric_name, _) in entries.iter().take(excess) { cache.remove(metric_name); } - + warn!("Cache cleanup removed {} entries due to size limit", excess); } } - -} \ No newline at end of file +} diff --git a/agent/src/collectors/backup.rs b/agent/src/collectors/backup.rs index 33b5d7f..49e3d08 100644 --- a/agent/src/collectors/backup.rs +++ b/agent/src/collectors/backup.rs @@ -1,6 +1,6 @@ use 
async_trait::async_trait; -use cm_dashboard_shared::{Metric, MetricValue, Status}; use chrono::Utc; +use cm_dashboard_shared::{Metric, MetricValue, Status, StatusTracker}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::fs; @@ -18,7 +18,8 @@ pub struct BackupCollector { impl BackupCollector { pub fn new(backup_status_file: Option, max_age_hours: u64) -> Self { Self { - backup_status_file: backup_status_file.unwrap_or_else(|| "/var/lib/backup/backup-status.toml".to_string()), + backup_status_file: backup_status_file + .unwrap_or_else(|| "/var/lib/backup/backup-status.toml".to_string()), max_age_hours, } } @@ -43,10 +44,16 @@ impl BackupCollector { Ok(dt) => dt.with_timezone(&Utc), Err(_) => { // Try parsing as naive datetime and assume UTC - match chrono::NaiveDateTime::parse_from_str(&backup_status.start_time, "%Y-%m-%dT%H:%M:%S%.f") { + match chrono::NaiveDateTime::parse_from_str( + &backup_status.start_time, + "%Y-%m-%dT%H:%M:%S%.f", + ) { Ok(naive_dt) => naive_dt.and_utc(), Err(_) => { - error!("Failed to parse backup timestamp: {}", backup_status.start_time); + error!( + "Failed to parse backup timestamp: {}", + backup_status.start_time + ); return Status::Unknown; } } @@ -63,7 +70,7 @@ impl BackupCollector { } else { Status::Ok } - }, + } "failed" => Status::Critical, "running" => Status::Ok, // Currently running is OK _ => Status::Unknown, @@ -78,7 +85,7 @@ impl BackupCollector { } else { Status::Critical } - }, + } "failed" => Status::Critical, "disabled" => Status::Warning, // Service intentionally disabled "running" => Status::Ok, @@ -97,7 +104,7 @@ impl Collector for BackupCollector { "backup" } - async fn collect(&self) -> Result, CollectorError> { + async fn collect(&self, _status_tracker: &mut StatusTracker) -> Result, CollectorError> { let backup_status = self.read_backup_status().await?; let mut metrics = Vec::new(); let timestamp = chrono::Utc::now().timestamp() as u64; @@ -114,7 +121,10 @@ impl Collector for BackupCollector { }), status: overall_status, timestamp, - description: Some(format!("Backup: {} at {}", backup_status.status, backup_status.start_time)), + description: Some(format!( + "Backup: {} at {}", + backup_status.status, backup_status.start_time + )), unit: None, }); @@ -129,14 +139,18 @@ impl Collector for BackupCollector { }); // Last backup timestamp - use last_updated (when backup finished) instead of start_time - let last_updated_dt_result = chrono::DateTime::parse_from_rfc3339(&backup_status.last_updated) - .map(|dt| dt.with_timezone(&Utc)) - .or_else(|_| { - // Try parsing as naive datetime and assume UTC - chrono::NaiveDateTime::parse_from_str(&backup_status.last_updated, "%Y-%m-%dT%H:%M:%S%.f") + let last_updated_dt_result = + chrono::DateTime::parse_from_rfc3339(&backup_status.last_updated) + .map(|dt| dt.with_timezone(&Utc)) + .or_else(|_| { + // Try parsing as naive datetime and assume UTC + chrono::NaiveDateTime::parse_from_str( + &backup_status.last_updated, + "%Y-%m-%dT%H:%M:%S%.f", + ) .map(|naive_dt| naive_dt.and_utc()) - }); - + }); + if let Ok(last_updated_dt) = last_updated_dt_result { metrics.push(Metric { name: "backup_last_run_timestamp".to_string(), @@ -147,13 +161,16 @@ impl Collector for BackupCollector { unit: Some("unix_timestamp".to_string()), }); } else { - error!("Failed to parse backup timestamp for last_run_timestamp: {}", backup_status.last_updated); + error!( + "Failed to parse backup timestamp for last_run_timestamp: {}", + backup_status.last_updated + ); } // Individual service metrics for 
(service_name, service) in &backup_status.services { let service_status = self.calculate_service_status(service); - + // Service status metrics.push(Metric { name: format!("backup_service_{}_status", service_name), @@ -165,7 +182,10 @@ impl Collector for BackupCollector { }), status: service_status, timestamp, - description: Some(format!("Backup service {} status: {}", service_name, service.status)), + description: Some(format!( + "Backup service {} status: {}", + service_name, service.status + )), unit: None, }); @@ -173,7 +193,11 @@ impl Collector for BackupCollector { metrics.push(Metric { name: format!("backup_service_{}_exit_code", service_name), value: MetricValue::Integer(service.exit_code), - status: if service.exit_code == 0 { Status::Ok } else { Status::Critical }, + status: if service.exit_code == 0 { + Status::Ok + } else { + Status::Critical + }, timestamp, description: Some(format!("Exit code for backup service {}", service_name)), unit: None, @@ -222,7 +246,9 @@ impl Collector for BackupCollector { }); // Calculate total repository size - let total_size_bytes: u64 = backup_status.services.values() + let total_size_bytes: u64 = backup_status + .services + .values() .map(|s| s.repo_size_bytes) .sum(); let total_size_gb = Self::bytes_to_gb(total_size_bytes); @@ -301,7 +327,6 @@ impl Collector for BackupCollector { unit: None, }); } - } // Add standalone disk identification metrics from TOML fields @@ -372,7 +397,7 @@ pub struct DiskSpace { pub used_gb: f64, pub available_gb: f64, pub usage_percent: f64, - // Optional disk identification fields + // Optional disk identification fields pub product_name: Option, pub serial_number: Option, } @@ -384,4 +409,4 @@ pub struct ServiceStatus { pub repo_path: String, pub archive_count: i64, pub repo_size_bytes: u64, -} \ No newline at end of file +} diff --git a/agent/src/collectors/cpu.rs b/agent/src/collectors/cpu.rs index 48852e0..e96d7ad 100644 --- a/agent/src/collectors/cpu.rs +++ b/agent/src/collectors/cpu.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use cm_dashboard_shared::{registry, Metric, MetricValue, Status}; +use cm_dashboard_shared::{registry, Metric, MetricValue, Status, StatusTracker, HysteresisThresholds}; use tracing::debug; @@ -17,41 +17,44 @@ use crate::config::CpuConfig; pub struct CpuCollector { config: CpuConfig, name: String, + load_thresholds: HysteresisThresholds, + temperature_thresholds: HysteresisThresholds, } impl CpuCollector { pub fn new(config: CpuConfig) -> Self { + // Create hysteresis thresholds with 10% gap for recovery + let load_thresholds = HysteresisThresholds::new( + config.load_warning_threshold, + config.load_critical_threshold, + ); + + let temperature_thresholds = HysteresisThresholds::new( + config.temperature_warning_threshold, + config.temperature_critical_threshold, + ); + Self { config, name: "cpu".to_string(), + load_thresholds, + temperature_thresholds, } } - /// Calculate CPU load status using configured thresholds - fn calculate_load_status(&self, load: f32) -> Status { - if load >= self.config.load_critical_threshold { - Status::Critical - } else if load >= self.config.load_warning_threshold { - Status::Warning - } else { - Status::Ok - } + /// Calculate CPU load status using hysteresis thresholds + fn calculate_load_status(&self, metric_name: &str, load: f32, status_tracker: &mut StatusTracker) -> Status { + status_tracker.calculate_with_hysteresis(metric_name, load, &self.load_thresholds) } - /// Calculate CPU temperature status using configured thresholds - fn 
calculate_temperature_status(&self, temp: f32) -> Status { - if temp >= self.config.temperature_critical_threshold { - Status::Critical - } else if temp >= self.config.temperature_warning_threshold { - Status::Warning - } else { - Status::Ok - } + /// Calculate CPU temperature status using hysteresis thresholds + fn calculate_temperature_status(&self, metric_name: &str, temp: f32, status_tracker: &mut StatusTracker) -> Status { + status_tracker.calculate_with_hysteresis(metric_name, temp, &self.temperature_thresholds) } /// Collect CPU load averages from /proc/loadavg /// Format: "0.52 0.58 0.59 1/257 12345" - async fn collect_load_averages(&self) -> Result, CollectorError> { + async fn collect_load_averages(&self, status_tracker: &mut StatusTracker) -> Result, CollectorError> { let content = utils::read_proc_file("/proc/loadavg")?; let parts: Vec<&str> = content.trim().split_whitespace().collect(); @@ -68,7 +71,7 @@ impl CpuCollector { // Only apply thresholds to 5-minute load average let load_1min_status = Status::Ok; // No alerting on 1min - let load_5min_status = self.calculate_load_status(load_5min); // Only 5min triggers alerts + let load_5min_status = self.calculate_load_status(registry::CPU_LOAD_5MIN, load_5min, status_tracker); // Only 5min triggers alerts let load_15min_status = Status::Ok; // No alerting on 15min Ok(vec![ @@ -95,14 +98,14 @@ impl CpuCollector { /// Collect CPU temperature from thermal zones /// Prioritizes x86_pkg_temp over generic thermal zones (legacy behavior) - async fn collect_temperature(&self) -> Result, CollectorError> { + async fn collect_temperature(&self, status_tracker: &mut StatusTracker) -> Result, CollectorError> { // Try x86_pkg_temp first (Intel CPU package temperature) if let Ok(temp) = self .read_thermal_zone("/sys/class/thermal/thermal_zone0/temp") .await { let temp_celsius = temp as f32 / 1000.0; - let status = self.calculate_temperature_status(temp_celsius); + let status = self.calculate_temperature_status(registry::CPU_TEMPERATURE_CELSIUS, temp_celsius, status_tracker); return Ok(Some( Metric::new( @@ -120,7 +123,7 @@ impl CpuCollector { let path = format!("/sys/class/thermal/thermal_zone{}/temp", zone_id); if let Ok(temp) = self.read_thermal_zone(&path).await { let temp_celsius = temp as f32 / 1000.0; - let status = self.calculate_temperature_status(temp_celsius); + let status = self.calculate_temperature_status(registry::CPU_TEMPERATURE_CELSIUS, temp_celsius, status_tracker); return Ok(Some( Metric::new( @@ -200,17 +203,17 @@ impl Collector for CpuCollector { &self.name } - async fn collect(&self) -> Result, CollectorError> { + async fn collect(&self, status_tracker: &mut StatusTracker) -> Result, CollectorError> { debug!("Collecting CPU metrics"); let start = std::time::Instant::now(); let mut metrics = Vec::with_capacity(5); // Pre-allocate for efficiency // Collect load averages (always available) - metrics.extend(self.collect_load_averages().await?); + metrics.extend(self.collect_load_averages(status_tracker).await?); // Collect temperature (optional) - if let Some(temp_metric) = self.collect_temperature().await? { + if let Some(temp_metric) = self.collect_temperature(status_tracker).await? 
{ metrics.push(temp_metric); } diff --git a/agent/src/collectors/disk.rs b/agent/src/collectors/disk.rs index 0328c9f..ee8a532 100644 --- a/agent/src/collectors/disk.rs +++ b/agent/src/collectors/disk.rs @@ -1,6 +1,6 @@ use anyhow::Result; use async_trait::async_trait; -use cm_dashboard_shared::{Metric, MetricValue, Status}; +use cm_dashboard_shared::{Metric, MetricValue, Status, StatusTracker, HysteresisThresholds}; use crate::config::DiskConfig; use std::fs; @@ -28,11 +28,28 @@ struct MountedDisk { /// Disk usage collector for monitoring filesystem sizes pub struct DiskCollector { config: DiskConfig, + temperature_thresholds: HysteresisThresholds, } impl DiskCollector { pub fn new(config: DiskConfig) -> Self { - Self { config } + // Create hysteresis thresholds for disk temperature + let temperature_thresholds = HysteresisThresholds::with_custom_gaps( + 60.0, // warning at 60°C + 5.0, // 5°C gap for recovery + 70.0, // critical at 70°C + 5.0, // 5°C gap for recovery + ); + + Self { + config, + temperature_thresholds, + } + } + + /// Calculate disk temperature status using hysteresis thresholds + fn calculate_temperature_status(&self, metric_name: &str, temperature: f32, status_tracker: &mut StatusTracker) -> Status { + status_tracker.calculate_with_hysteresis(metric_name, temperature, &self.temperature_thresholds) } /// Resolve UUID to actual device path @@ -203,12 +220,6 @@ impl DiskCollector { Ok((total_bytes, used_bytes)) } - /// Get root filesystem disk usage - fn get_root_filesystem_usage(&self) -> Result<(u64, u64, f32)> { - let (total_bytes, used_bytes) = self.get_filesystem_info("/")?; - let usage_percent = (used_bytes as f64 / total_bytes as f64) * 100.0; - Ok((total_bytes, used_bytes, usage_percent as f32)) - } /// Get the physical device for a given device (resolves symlinks, gets parent device) @@ -339,7 +350,7 @@ impl Collector for DiskCollector { "disk" } - async fn collect(&self) -> Result, CollectorError> { + async fn collect(&self, status_tracker: &mut StatusTracker) -> Result, CollectorError> { let start_time = Instant::now(); debug!("Collecting multi-disk metrics"); @@ -497,13 +508,8 @@ impl Collector for DiskCollector { }); if temperature > 0.0 { - let temp_status = if temperature >= 70.0 { - Status::Critical - } else if temperature >= 60.0 { - Status::Warning - } else { - Status::Ok - }; + let metric_name = format!("disk_smart_{}_temperature", device_name); + let temp_status = self.calculate_temperature_status(&metric_name, temperature, status_tracker); metrics.push(Metric { name: format!("disk_smart_{}_temperature", device_name), diff --git a/agent/src/collectors/error.rs b/agent/src/collectors/error.rs index bd160a7..bbc2e56 100644 --- a/agent/src/collectors/error.rs +++ b/agent/src/collectors/error.rs @@ -4,7 +4,7 @@ use thiserror::Error; pub enum CollectorError { #[error("Failed to read system file {path}: {error}")] SystemRead { path: String, error: String }, - + #[error("Failed to parse value '{value}': {error}")] Parse { value: String, error: String }, -} \ No newline at end of file +} diff --git a/agent/src/collectors/memory.rs b/agent/src/collectors/memory.rs index 9cfbb81..bbbfc6e 100644 --- a/agent/src/collectors/memory.rs +++ b/agent/src/collectors/memory.rs @@ -1,13 +1,13 @@ use async_trait::async_trait; -use cm_dashboard_shared::{Metric, MetricValue, Status, registry}; +use cm_dashboard_shared::{registry, Metric, MetricValue, Status, StatusTracker, HysteresisThresholds}; use tracing::debug; -use super::{Collector, CollectorError, utils}; +use 
super::{utils, Collector, CollectorError}; use crate::config::MemoryConfig; /// Extremely efficient memory metrics collector -/// +/// /// EFFICIENCY OPTIMIZATIONS: /// - Single /proc/meminfo read for all memory metrics /// - Minimal string parsing with split operations @@ -17,6 +17,7 @@ use crate::config::MemoryConfig; pub struct MemoryCollector { config: MemoryConfig, name: String, + usage_thresholds: HysteresisThresholds, } /// Memory information parsed from /proc/meminfo @@ -33,36 +34,38 @@ struct MemoryInfo { impl MemoryCollector { pub fn new(config: MemoryConfig) -> Self { + // Create hysteresis thresholds with 5% gap for memory usage + let usage_thresholds = HysteresisThresholds::with_custom_gaps( + config.usage_warning_percent, + 5.0, // 5% gap for warning recovery + config.usage_critical_percent, + 5.0, // 5% gap for critical recovery + ); + Self { config, name: "memory".to_string(), - + usage_thresholds, } } - - /// Calculate memory usage status using configured thresholds - fn calculate_usage_status(&self, usage_percent: f32) -> Status { - if usage_percent >= self.config.usage_critical_percent { - Status::Critical - } else if usage_percent >= self.config.usage_warning_percent { - Status::Warning - } else { - Status::Ok - } + + /// Calculate memory usage status using hysteresis thresholds + fn calculate_usage_status(&self, metric_name: &str, usage_percent: f32, status_tracker: &mut StatusTracker) -> Status { + status_tracker.calculate_with_hysteresis(metric_name, usage_percent, &self.usage_thresholds) } - + /// Parse /proc/meminfo efficiently /// Format: "MemTotal: 16384000 kB" async fn parse_meminfo(&self) -> Result { let content = utils::read_proc_file("/proc/meminfo")?; let mut info = MemoryInfo::default(); - + // Parse each line efficiently - only extract what we need for line in content.lines() { if let Some(colon_pos) = line.find(':') { let key = &line[..colon_pos]; let value_part = &line[colon_pos + 1..]; - + // Extract number from value part (format: " 12345 kB") if let Some(number_str) = value_part.split_whitespace().next() { if let Ok(value_kb) = utils::parse_u64(number_str) { @@ -80,7 +83,7 @@ impl MemoryCollector { } } } - + // Validate that we got essential fields if info.total_kb == 0 { return Err(CollectorError::Parse { @@ -88,87 +91,105 @@ impl MemoryCollector { error: "MemTotal not found or zero in /proc/meminfo".to_string(), }); } - + // If MemAvailable is not available (older kernels), calculate it if info.available_kb == 0 { info.available_kb = info.free_kb + info.buffers_kb + info.cached_kb; } - + Ok(info) } - + /// Convert KB to GB efficiently (avoiding floating point in hot path) fn kb_to_gb(kb: u64) -> f32 { kb as f32 / 1_048_576.0 // 1024 * 1024 } - + /// Calculate memory metrics from parsed info - fn calculate_metrics(&self, info: &MemoryInfo) -> Vec { + fn calculate_metrics(&self, info: &MemoryInfo, status_tracker: &mut StatusTracker) -> Vec { let mut metrics = Vec::with_capacity(6); - + // Calculate derived values let used_kb = info.total_kb - info.available_kb; let usage_percent = (used_kb as f32 / info.total_kb as f32) * 100.0; - let usage_status = self.calculate_usage_status(usage_percent); - + let usage_status = self.calculate_usage_status(registry::MEMORY_USAGE_PERCENT, usage_percent, status_tracker); + let swap_used_kb = info.swap_total_kb - info.swap_free_kb; - + // Convert to GB for metrics let total_gb = Self::kb_to_gb(info.total_kb); let used_gb = Self::kb_to_gb(used_kb); let available_gb = Self::kb_to_gb(info.available_kb); let swap_total_gb 
= Self::kb_to_gb(info.swap_total_kb); let swap_used_gb = Self::kb_to_gb(swap_used_kb); - + // Memory usage percentage (primary metric with status) - metrics.push(Metric::new( - registry::MEMORY_USAGE_PERCENT.to_string(), - MetricValue::Float(usage_percent), - usage_status, - ).with_description("Memory usage percentage".to_string()) - .with_unit("%".to_string())); - + metrics.push( + Metric::new( + registry::MEMORY_USAGE_PERCENT.to_string(), + MetricValue::Float(usage_percent), + usage_status, + ) + .with_description("Memory usage percentage".to_string()) + .with_unit("%".to_string()), + ); + // Total memory - metrics.push(Metric::new( - registry::MEMORY_TOTAL_GB.to_string(), - MetricValue::Float(total_gb), - Status::Ok, // Total memory doesn't have status - ).with_description("Total system memory".to_string()) - .with_unit("GB".to_string())); - + metrics.push( + Metric::new( + registry::MEMORY_TOTAL_GB.to_string(), + MetricValue::Float(total_gb), + Status::Ok, // Total memory doesn't have status + ) + .with_description("Total system memory".to_string()) + .with_unit("GB".to_string()), + ); + // Used memory - metrics.push(Metric::new( - registry::MEMORY_USED_GB.to_string(), - MetricValue::Float(used_gb), - Status::Ok, // Used memory absolute value doesn't have status - ).with_description("Used system memory".to_string()) - .with_unit("GB".to_string())); - + metrics.push( + Metric::new( + registry::MEMORY_USED_GB.to_string(), + MetricValue::Float(used_gb), + Status::Ok, // Used memory absolute value doesn't have status + ) + .with_description("Used system memory".to_string()) + .with_unit("GB".to_string()), + ); + // Available memory - metrics.push(Metric::new( - registry::MEMORY_AVAILABLE_GB.to_string(), - MetricValue::Float(available_gb), - Status::Ok, // Available memory absolute value doesn't have status - ).with_description("Available system memory".to_string()) - .with_unit("GB".to_string())); - + metrics.push( + Metric::new( + registry::MEMORY_AVAILABLE_GB.to_string(), + MetricValue::Float(available_gb), + Status::Ok, // Available memory absolute value doesn't have status + ) + .with_description("Available system memory".to_string()) + .with_unit("GB".to_string()), + ); + // Swap metrics (only if swap exists) if info.swap_total_kb > 0 { - metrics.push(Metric::new( - registry::MEMORY_SWAP_TOTAL_GB.to_string(), - MetricValue::Float(swap_total_gb), - Status::Ok, - ).with_description("Total swap space".to_string()) - .with_unit("GB".to_string())); - - metrics.push(Metric::new( - registry::MEMORY_SWAP_USED_GB.to_string(), - MetricValue::Float(swap_used_gb), - Status::Ok, - ).with_description("Used swap space".to_string()) - .with_unit("GB".to_string())); + metrics.push( + Metric::new( + registry::MEMORY_SWAP_TOTAL_GB.to_string(), + MetricValue::Float(swap_total_gb), + Status::Ok, + ) + .with_description("Total swap space".to_string()) + .with_unit("GB".to_string()), + ); + + metrics.push( + Metric::new( + registry::MEMORY_SWAP_USED_GB.to_string(), + MetricValue::Float(swap_used_gb), + Status::Ok, + ) + .with_description("Used swap space".to_string()) + .with_unit("GB".to_string()), + ); } - + metrics } } @@ -178,34 +199,39 @@ impl Collector for MemoryCollector { fn name(&self) -> &str { &self.name } - - - async fn collect(&self) -> Result, CollectorError> { - + + async fn collect(&self, status_tracker: &mut StatusTracker) -> Result, CollectorError> { debug!("Collecting memory metrics"); let start = std::time::Instant::now(); - + // Parse memory info from /proc/meminfo let info = 
self.parse_meminfo().await?; - + // Calculate all metrics from parsed info - let metrics = self.calculate_metrics(&info); - + let metrics = self.calculate_metrics(&info, status_tracker); + let duration = start.elapsed(); - debug!("Memory collection completed in {:?} with {} metrics", duration, metrics.len()); - + debug!( + "Memory collection completed in {:?} with {} metrics", + duration, + metrics.len() + ); + // Efficiency check: warn if collection takes too long if duration.as_millis() > 1 { - debug!("Memory collection took {}ms - consider optimization", duration.as_millis()); + debug!( + "Memory collection took {}ms - consider optimization", + duration.as_millis() + ); } - + // Store performance metrics // Performance tracking handled by cache system - + Ok(metrics) } - + fn get_performance_metrics(&self) -> Option { None // Performance tracking handled by cache system } -} \ No newline at end of file +} diff --git a/agent/src/collectors/mod.rs b/agent/src/collectors/mod.rs index 47f575d..d2bbec8 100644 --- a/agent/src/collectors/mod.rs +++ b/agent/src/collectors/mod.rs @@ -1,16 +1,7 @@ use async_trait::async_trait; -use cm_dashboard_shared::Metric; +use cm_dashboard_shared::{Metric, StatusTracker}; use std::time::Duration; -pub mod cpu; -pub mod memory; -pub mod disk; -pub mod systemd; -pub mod backup; -pub mod error; - -pub use error::CollectorError; - /// Performance metrics for a collector #[derive(Debug, Clone)] pub struct PerformanceMetrics { @@ -18,69 +9,78 @@ pub struct PerformanceMetrics { pub collection_efficiency_percent: f32, } +pub mod backup; +pub mod cpu; +pub mod disk; +pub mod error; +pub mod memory; +pub mod systemd; + +pub use error::CollectorError; + + /// Base trait for all collectors with extreme efficiency requirements #[async_trait] pub trait Collector: Send + Sync { /// Name of this collector fn name(&self) -> &str; - + /// Collect all metrics this collector provides - async fn collect(&self) -> Result, CollectorError>; - + async fn collect(&self, status_tracker: &mut StatusTracker) -> Result, CollectorError>; + /// Get performance metrics for monitoring collector efficiency fn get_performance_metrics(&self) -> Option { None } + } /// CPU efficiency rules for all collectors pub mod efficiency { - /// CRITICAL: All collectors must follow these efficiency rules to minimize system impact - - /// 1. FILE READING RULES - /// - Read entire files in single syscall when possible - /// - Use BufReader only for very large files (>4KB) - /// - Never read files character by character - /// - Cache file descriptors when safe (immutable paths) - - /// 2. PARSING RULES - /// - Use split() instead of regex for simple patterns - /// - Parse numbers with from_str() not complex parsing - /// - Avoid string allocations in hot paths - /// - Use str::trim() before parsing numbers - - /// 3. MEMORY ALLOCATION RULES - /// - Reuse Vec buffers when possible - /// - Pre-allocate collections with known sizes - /// - Use str slices instead of String when possible - /// - Avoid clone() in hot paths - - /// 4. SYSTEM CALL RULES - /// - Minimize syscalls - prefer single reads over multiple - /// - Use /proc filesystem efficiently - /// - Avoid spawning processes when /proc data available - /// - Cache static data (like CPU count) - - /// 5. ERROR HANDLING RULES - /// - Use Result<> but minimize allocation in error paths - /// - Log errors at debug level only to avoid I/O overhead - /// - Graceful degradation - missing metrics better than failing - /// - Never panic in collectors - - /// 6. 
CONCURRENCY RULES - /// - Collectors must be thread-safe but avoid locks - /// - Use atomic operations for simple counters - /// - Avoid shared mutable state between collections - /// - Each collection should be independent - - pub const PERFORMANCE_TARGET_OVERHEAD_PERCENT: f32 = 0.1; + //! CRITICAL: All collectors must follow these efficiency rules to minimize system impact + //! + //! # FILE READING RULES + //! - Read entire files in single syscall when possible + //! - Use BufReader only for very large files (>4KB) + //! - Never read files character by character + //! - Cache file descriptors when safe (immutable paths) + //! + //! # PARSING RULES + //! - Use split() instead of regex for simple patterns + //! - Parse numbers with from_str() not complex parsing + //! - Avoid string allocations in hot paths + //! - Use str::trim() before parsing numbers + //! + //! # MEMORY ALLOCATION RULES + //! - Reuse Vec buffers when possible + //! - Pre-allocate collections with known sizes + //! - Use str slices instead of String when possible + //! - Avoid clone() in hot paths + //! + //! # SYSTEM CALL RULES + //! - Minimize syscalls - prefer single reads over multiple + //! - Use /proc filesystem efficiently + //! - Avoid spawning processes when /proc data available + //! - Cache static data (like CPU count) + //! + //! # ERROR HANDLING RULES + //! - Use Result<> but minimize allocation in error paths + //! - Log errors at debug level only to avoid I/O overhead + //! - Graceful degradation - missing metrics better than failing + //! - Never panic in collectors + //! + //! # CONCURRENCY RULES + //! - Collectors must be thread-safe but avoid locks + //! - Use atomic operations for simple counters + //! - Avoid shared mutable state between collections + //! - Each collection should be independent } /// Utility functions for efficient system data collection pub mod utils { - use std::fs; use super::CollectorError; - + use std::fs; + /// Read entire file content efficiently pub fn read_proc_file(path: &str) -> Result { fs::read_to_string(path).map_err(|e| CollectorError::SystemRead { @@ -88,25 +88,25 @@ pub mod utils { error: e.to_string(), }) } - + /// Parse float from string slice efficiently pub fn parse_f32(s: &str) -> Result { - s.trim().parse().map_err(|e: std::num::ParseFloatError| CollectorError::Parse { - value: s.to_string(), - error: e.to_string(), - }) + s.trim() + .parse() + .map_err(|e: std::num::ParseFloatError| CollectorError::Parse { + value: s.to_string(), + error: e.to_string(), + }) } - + /// Parse integer from string slice efficiently pub fn parse_u64(s: &str) -> Result { - s.trim().parse().map_err(|e: std::num::ParseIntError| CollectorError::Parse { - value: s.to_string(), - error: e.to_string(), - }) + s.trim() + .parse() + .map_err(|e: std::num::ParseIntError| CollectorError::Parse { + value: s.to_string(), + error: e.to_string(), + }) } - - /// Split string and get nth element safely - pub fn split_nth<'a>(s: &'a str, delimiter: char, n: usize) -> Option<&'a str> { - s.split(delimiter).nth(n) - } -} \ No newline at end of file + +} diff --git a/agent/src/collectors/systemd.rs b/agent/src/collectors/systemd.rs index e95f7f4..aa7bda6 100644 --- a/agent/src/collectors/systemd.rs +++ b/agent/src/collectors/systemd.rs @@ -1,6 +1,6 @@ use anyhow::Result; use async_trait::async_trait; -use cm_dashboard_shared::{Metric, MetricValue, Status}; +use cm_dashboard_shared::{Metric, MetricValue, Status, StatusTracker}; use std::process::Command; use std::sync::RwLock; use std::time::Instant; 
@@ -401,7 +401,7 @@ impl Collector for SystemdCollector { "systemd" } - async fn collect(&self) -> Result, CollectorError> { + async fn collect(&self, _status_tracker: &mut StatusTracker) -> Result, CollectorError> { let start_time = Instant::now(); debug!("Collecting systemd services metrics"); diff --git a/agent/src/communication/mod.rs b/agent/src/communication/mod.rs index 075deb6..235e8b7 100644 --- a/agent/src/communication/mod.rs +++ b/agent/src/communication/mod.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use cm_dashboard_shared::{MetricMessage, MessageEnvelope}; -use tracing::{info, debug}; +use cm_dashboard_shared::{MessageEnvelope, MetricMessage}; +use tracing::{debug, info}; use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; @@ -15,75 +15,69 @@ pub struct ZmqHandler { impl ZmqHandler { pub async fn new(config: &ZmqConfig) -> Result { let context = Context::new(); - + // Create publisher socket for metrics let publisher = context.socket(SocketType::PUB)?; let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port); publisher.bind(&pub_bind_address)?; - + info!("ZMQ publisher bound to {}", pub_bind_address); - + // Set socket options for efficiency publisher.set_sndhwm(1000)?; // High water mark for outbound messages publisher.set_linger(1000)?; // Linger time on close - + // Create command receiver socket (PULL socket to receive commands from dashboard) let command_receiver = context.socket(SocketType::PULL)?; let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port); command_receiver.bind(&cmd_bind_address)?; - + info!("ZMQ command receiver bound to {}", cmd_bind_address); - + // Set non-blocking mode for command receiver command_receiver.set_rcvtimeo(0)?; // Non-blocking receive command_receiver.set_linger(1000)?; - + Ok(Self { publisher, command_receiver, config: config.clone(), }) } - + /// Publish metrics message via ZMQ pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> { - debug!("Publishing {} metrics for host {}", message.metrics.len(), message.hostname); - + debug!( + "Publishing {} metrics for host {}", + message.metrics.len(), + message.hostname + ); + // Create message envelope let envelope = MessageEnvelope::metrics(message.clone()) .map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?; - + // Serialize envelope let serialized = serde_json::to_vec(&envelope)?; - + // Send via ZMQ self.publisher.send(&serialized, 0)?; - + debug!("Published metrics message ({} bytes)", serialized.len()); Ok(()) } - + /// Send heartbeat (placeholder for future use) - pub async fn send_heartbeat(&self) -> Result<()> { - let envelope = MessageEnvelope::heartbeat() - .map_err(|e| anyhow::anyhow!("Failed to create heartbeat envelope: {}", e))?; - - let serialized = serde_json::to_vec(&envelope)?; - self.publisher.send(&serialized, 0)?; - - debug!("Sent heartbeat"); - Ok(()) - } - + /// Try to receive a command (non-blocking) pub fn try_receive_command(&self) -> Result> { match self.command_receiver.recv_bytes(zmq::DONTWAIT) { Ok(bytes) => { debug!("Received command message ({} bytes)", bytes.len()); - + let command: AgentCommand = serde_json::from_slice(&bytes) .map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?; - + debug!("Parsed command: {:?}", command); Ok(Some(command)) } @@ -107,4 +101,4 @@ pub enum AgentCommand { ToggleCollector { name: String, enabled: bool }, /// Request status/health check Ping, -} \ No newline at end of file +} diff --git 
a/agent/src/config/loader.rs b/agent/src/config/loader.rs index f913503..e22273c 100644 --- a/agent/src/config/loader.rs +++ b/agent/src/config/loader.rs @@ -1,18 +1,19 @@ -use anyhow::{Context, Result}; -use std::path::Path; -use std::fs; use crate::config::AgentConfig; +use anyhow::{Context, Result}; +use std::fs; +use std::path::Path; pub fn load_config>(path: P) -> Result { let path = path.as_ref(); let content = fs::read_to_string(path) .with_context(|| format!("Failed to read config file: {}", path.display()))?; - + let config: AgentConfig = toml::from_str(&content) .with_context(|| format!("Failed to parse config file: {}", path.display()))?; - - config.validate() + + config + .validate() .with_context(|| format!("Invalid configuration in file: {}", path.display()))?; - + Ok(config) -} \ No newline at end of file +} diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index 8d851e0..381591f 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -1,6 +1,5 @@ use anyhow::Result; use cm_dashboard_shared::CacheConfig; -use gethostname::gethostname; use serde::{Deserialize, Serialize}; use std::path::Path; diff --git a/agent/src/config/validation.rs b/agent/src/config/validation.rs index 49e188f..11c558e 100644 --- a/agent/src/config/validation.rs +++ b/agent/src/config/validation.rs @@ -1,114 +1,126 @@ -use anyhow::{bail, Result}; use crate::config::AgentConfig; +use anyhow::{bail, Result}; pub fn validate_config(config: &AgentConfig) -> Result<()> { // Validate ZMQ configuration if config.zmq.publisher_port == 0 { bail!("ZMQ publisher port cannot be 0"); } - + if config.zmq.command_port == 0 { bail!("ZMQ command port cannot be 0"); } - + if config.zmq.publisher_port == config.zmq.command_port { bail!("ZMQ publisher and command ports cannot be the same"); } - + if config.zmq.bind_address.is_empty() { bail!("ZMQ bind address cannot be empty"); } - + if config.zmq.timeout_ms == 0 { bail!("ZMQ timeout cannot be 0"); } - + // Validate collection interval if config.collection_interval_seconds == 0 { bail!("Collection interval cannot be 0"); } - + // Validate CPU thresholds if config.collectors.cpu.enabled { if config.collectors.cpu.load_warning_threshold <= 0.0 { bail!("CPU load warning threshold must be positive"); } - - if config.collectors.cpu.load_critical_threshold <= config.collectors.cpu.load_warning_threshold { + + if config.collectors.cpu.load_critical_threshold + <= config.collectors.cpu.load_warning_threshold + { bail!("CPU load critical threshold must be greater than warning threshold"); } - + if config.collectors.cpu.temperature_warning_threshold <= 0.0 { bail!("CPU temperature warning threshold must be positive"); } - - if config.collectors.cpu.temperature_critical_threshold <= config.collectors.cpu.temperature_warning_threshold { + + if config.collectors.cpu.temperature_critical_threshold + <= config.collectors.cpu.temperature_warning_threshold + { bail!("CPU temperature critical threshold must be greater than warning threshold"); } } - + // Validate memory thresholds if config.collectors.memory.enabled { - if config.collectors.memory.usage_warning_percent <= 0.0 || config.collectors.memory.usage_warning_percent > 100.0 { + if config.collectors.memory.usage_warning_percent <= 0.0 + || config.collectors.memory.usage_warning_percent > 100.0 + { bail!("Memory usage warning threshold must be between 0 and 100"); } - - if config.collectors.memory.usage_critical_percent <= config.collectors.memory.usage_warning_percent - || 
config.collectors.memory.usage_critical_percent > 100.0 { + + if config.collectors.memory.usage_critical_percent + <= config.collectors.memory.usage_warning_percent + || config.collectors.memory.usage_critical_percent > 100.0 + { bail!("Memory usage critical threshold must be between warning threshold and 100"); } } - + // Validate disk thresholds if config.collectors.disk.enabled { - if config.collectors.disk.usage_warning_percent <= 0.0 || config.collectors.disk.usage_warning_percent > 100.0 { + if config.collectors.disk.usage_warning_percent <= 0.0 + || config.collectors.disk.usage_warning_percent > 100.0 + { bail!("Disk usage warning threshold must be between 0 and 100"); } - - if config.collectors.disk.usage_critical_percent <= config.collectors.disk.usage_warning_percent - || config.collectors.disk.usage_critical_percent > 100.0 { + + if config.collectors.disk.usage_critical_percent + <= config.collectors.disk.usage_warning_percent + || config.collectors.disk.usage_critical_percent > 100.0 + { bail!("Disk usage critical threshold must be between warning threshold and 100"); } } - + // Validate SMTP configuration if config.notifications.enabled { if config.notifications.smtp_host.is_empty() { bail!("SMTP host cannot be empty when notifications are enabled"); } - + if config.notifications.smtp_port == 0 { bail!("SMTP port cannot be 0"); } - + if config.notifications.from_email.is_empty() { bail!("From email cannot be empty when notifications are enabled"); } - + if config.notifications.to_email.is_empty() { bail!("To email cannot be empty when notifications are enabled"); } - + // Basic email validation if !config.notifications.from_email.contains('@') { bail!("From email must contain @ symbol"); } - + if !config.notifications.to_email.contains('@') { bail!("To email must contain @ symbol"); } } - + // Validate cache configuration if config.cache.enabled { if config.cache.default_ttl_seconds == 0 { bail!("Cache TTL cannot be 0"); } - + if config.cache.max_entries == 0 { bail!("Cache max entries cannot be 0"); } } - + Ok(()) -} \ No newline at end of file +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 5b30af1..b78e845 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,14 +1,14 @@ use anyhow::Result; use clap::Parser; -use tracing::{info, error}; +use tracing::{error, info}; use tracing_subscriber::EnvFilter; mod agent; mod cache; -mod config; -mod communication; -mod metrics; mod collectors; +mod communication; +mod config; +mod metrics; mod notifications; mod utils; @@ -22,7 +22,7 @@ struct Cli { /// Increase logging verbosity (-v, -vv) #[arg(short, long, action = clap::ArgAction::Count)] verbose: u8, - + /// Configuration file path #[arg(short, long)] config: Option, @@ -31,32 +31,32 @@ struct Cli { #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); - + // Setup logging let log_level = match cli.verbose { 0 => "info", - 1 => "debug", + 1 => "debug", _ => "trace", }; - + tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env().add_directive(log_level.parse()?)) .init(); - + info!("CM Dashboard Agent starting with individual metrics architecture..."); - + // Create and run agent let mut agent = Agent::new(cli.config).await?; - + // Setup graceful shutdown channel let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); - + let ctrl_c = async { tokio::signal::ctrl_c() .await .expect("failed to install Ctrl+C handler"); }; - + // Run agent with graceful shutdown tokio::select! 
{ result = agent.run(shutdown_rx) => { @@ -72,7 +72,7 @@ async fn main() -> Result<()> { tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } - + info!("Agent shutdown complete"); Ok(()) -} \ No newline at end of file +} diff --git a/agent/src/metrics/mod.rs b/agent/src/metrics/mod.rs index 8fb41a6..191d26a 100644 --- a/agent/src/metrics/mod.rs +++ b/agent/src/metrics/mod.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cm_dashboard_shared::Metric; +use cm_dashboard_shared::{Metric, StatusTracker}; use std::collections::HashMap; use std::time::Instant; use tracing::{debug, error, info}; @@ -16,6 +16,7 @@ pub struct MetricCollectionManager { collectors: Vec>, cache_manager: MetricCacheManager, last_collection_times: HashMap, + status_tracker: StatusTracker, } impl MetricCollectionManager { @@ -117,6 +118,7 @@ impl MetricCollectionManager { collectors, cache_manager, last_collection_times: HashMap::new(), + status_tracker: StatusTracker::new(), }) } @@ -134,7 +136,7 @@ impl MetricCollectionManager { for collector in &self.collectors { let collector_name = collector.name(); - match collector.collect().await { + match collector.collect(&mut self.status_tracker).await { Ok(metrics) => { info!( "Force collected {} metrics from {} collector", @@ -200,7 +202,7 @@ impl MetricCollectionManager { if should_collect { collecting_fresh.insert(collector_name.to_string()); - match collector.collect().await { + match collector.collect(&mut self.status_tracker).await { Ok(metrics) => { // Collector returned fresh metrics (debug logging disabled for performance) diff --git a/agent/src/utils/mod.rs b/agent/src/utils/mod.rs index 79e3c33..de66f0a 100644 --- a/agent/src/utils/mod.rs +++ b/agent/src/utils/mod.rs @@ -3,41 +3,42 @@ /// System information utilities pub mod system { use std::fs; - + /// Get number of CPU cores efficiently pub fn get_cpu_count() -> Result { // Try /proc/cpuinfo first (most reliable) if let Ok(content) = fs::read_to_string("/proc/cpuinfo") { - let count = content.lines() + let count = content + .lines() .filter(|line| line.starts_with("processor")) .count(); - + if count > 0 { return Ok(count); } } - + // Fallback to nproc equivalent match std::thread::available_parallelism() { Ok(count) => Ok(count.get()), Err(_) => Ok(1), // Default to 1 core if all else fails } } - + /// Check if running in container pub fn is_container() -> bool { // Check for common container indicators - fs::metadata("/.dockerenv").is_ok() || - fs::read_to_string("/proc/1/cgroup") - .map(|content| content.contains("docker") || content.contains("containerd")) - .unwrap_or(false) + fs::metadata("/.dockerenv").is_ok() + || fs::read_to_string("/proc/1/cgroup") + .map(|content| content.contains("docker") || content.contains("containerd")) + .unwrap_or(false) } } /// Time utilities pub mod time { use std::time::{Duration, Instant}; - + /// Measure execution time of a closure pub fn measure_time(f: F) -> (R, Duration) where @@ -54,14 +55,14 @@ pub mod time { pub mod perf { use std::time::{Duration, Instant}; use tracing::warn; - + /// Performance monitor for critical operations pub struct PerfMonitor { operation: String, start: Instant, warning_threshold: Duration, } - + impl PerfMonitor { pub fn new(operation: &str, warning_threshold: Duration) -> Self { Self { @@ -70,12 +71,12 @@ pub mod perf { warning_threshold, } } - + pub fn new_ms(operation: &str, warning_threshold_ms: u64) -> Self { Self::new(operation, Duration::from_millis(warning_threshold_ms)) } } - + impl Drop for PerfMonitor { fn drop(&mut self) 
{ let elapsed = self.start.elapsed(); @@ -87,4 +88,4 @@ pub mod perf { } } } -} \ No newline at end of file +} diff --git a/dashboard/src/app.rs b/dashboard/src/app.rs index 85ede64..cdb498f 100644 --- a/dashboard/src/app.rs +++ b/dashboard/src/app.rs @@ -4,16 +4,13 @@ use crossterm::{ execute, terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; -use ratatui::{ - backend::CrosstermBackend, - Terminal, -}; +use ratatui::{backend::CrosstermBackend, Terminal}; use std::io; use std::time::{Duration, Instant}; -use tracing::{info, error, debug, warn}; +use tracing::{debug, error, info, warn}; +use crate::communication::{AgentCommand, ZmqCommandSender, ZmqConsumer}; use crate::config::DashboardConfig; -use crate::communication::{ZmqConsumer, ZmqCommandSender, AgentCommand}; use crate::metrics::MetricStore; use crate::ui::TuiApp; @@ -30,14 +27,14 @@ pub struct Dashboard { impl Dashboard { pub async fn new(config_path: Option, headless: bool) -> Result { info!("Initializing dashboard"); - + // Load configuration let config = if let Some(path) = config_path { DashboardConfig::load_from_file(&path)? } else { DashboardConfig::default() }; - + // Initialize ZMQ consumer let mut zmq_consumer = match ZmqConsumer::new(&config.zmq).await { Ok(consumer) => consumer, @@ -46,7 +43,7 @@ impl Dashboard { return Err(e); } }; - + // Initialize ZMQ command sender let zmq_command_sender = match ZmqCommandSender::new(&config.zmq) { Ok(sender) => sender, @@ -55,22 +52,25 @@ impl Dashboard { return Err(e); } }; - + // Connect to predefined hosts from configuration let hosts = config.hosts.predefined_hosts.clone(); - + // Try to connect to hosts but don't fail if none are available match zmq_consumer.connect_to_predefined_hosts(&hosts).await { Ok(_) => info!("Successfully connected to ZMQ hosts"), Err(e) => { - warn!("Failed to connect to hosts (this is normal if no agents are running): {}", e); + warn!( + "Failed to connect to hosts (this is normal if no agents are running): {}", + e + ); info!("Dashboard will start anyway and connect when agents become available"); } } - + // Initialize metric store let metric_store = MetricStore::new(10000, 24); // 10k metrics, 24h retention - + // Initialize TUI components only if not headless let (tui_app, terminal) = if headless { info!("Running in headless mode (no TUI)"); @@ -78,22 +78,24 @@ impl Dashboard { } else { // Initialize TUI app let tui_app = TuiApp::new(); - + // Setup terminal if let Err(e) = enable_raw_mode() { error!("Failed to enable raw mode: {}", e); - error!("This usually means the dashboard is being run without a proper terminal (TTY)"); + error!( + "This usually means the dashboard is being run without a proper terminal (TTY)" + ); error!("Try running with --headless flag or in a proper terminal"); return Err(e.into()); } - + let mut stdout = io::stdout(); if let Err(e) = execute!(stdout, EnterAlternateScreen) { error!("Failed to enter alternate screen: {}", e); let _ = disable_raw_mode(); return Err(e.into()); } - + let backend = CrosstermBackend::new(stdout); let terminal = match Terminal::new(backend) { Ok(term) => term, @@ -103,12 +105,12 @@ impl Dashboard { return Err(e.into()); } }; - + (Some(tui_app), Some(terminal)) }; - + info!("Dashboard initialization complete"); - + Ok(Self { zmq_consumer, zmq_command_sender, @@ -119,66 +121,65 @@ impl Dashboard { initial_commands_sent: std::collections::HashSet::new(), }) } - + /// Send a command to a specific agent pub async fn send_command(&mut self, hostname: &str, 
command: AgentCommand) -> Result<()> { - self.zmq_command_sender.send_command(hostname, command).await + self.zmq_command_sender + .send_command(hostname, command) + .await } - - + pub async fn run(&mut self) -> Result<()> { info!("Starting dashboard main loop"); - + let mut last_metrics_check = Instant::now(); let metrics_check_interval = Duration::from_millis(100); // Check for metrics every 100ms - + loop { // Handle terminal events (keyboard input) only if not headless if !self.headless { match event::poll(Duration::from_millis(50)) { Ok(true) => { match event::read() { - Ok(Event::Key(key)) => { - match key.code { - KeyCode::Char('q') => { - info!("Quit key pressed, exiting dashboard"); - break; - } - KeyCode::Left => { - debug!("Navigate left"); - if let Some(ref mut tui_app) = self.tui_app { - if let Err(e) = tui_app.handle_input(Event::Key(key)) { - error!("Error handling left navigation: {}", e); - } - } - } - KeyCode::Right => { - debug!("Navigate right"); - if let Some(ref mut tui_app) = self.tui_app { - if let Err(e) = tui_app.handle_input(Event::Key(key)) { - error!("Error handling right navigation: {}", e); - } - } - } - KeyCode::Char('r') => { - debug!("Refresh requested"); - if let Some(ref mut tui_app) = self.tui_app { - if let Err(e) = tui_app.handle_input(Event::Key(key)) { - error!("Error handling refresh: {}", e); - } - } - } - KeyCode::Tab => { - debug!("Tab pressed - next host"); - if let Some(ref mut tui_app) = self.tui_app { - if let Err(e) = tui_app.handle_input(Event::Key(key)) { - error!("Error handling tab navigation: {}", e); - } - } - } - _ => {} + Ok(Event::Key(key)) => match key.code { + KeyCode::Char('q') => { + info!("Quit key pressed, exiting dashboard"); + break; } - } + KeyCode::Left => { + debug!("Navigate left"); + if let Some(ref mut tui_app) = self.tui_app { + if let Err(e) = tui_app.handle_input(Event::Key(key)) { + error!("Error handling left navigation: {}", e); + } + } + } + KeyCode::Right => { + debug!("Navigate right"); + if let Some(ref mut tui_app) = self.tui_app { + if let Err(e) = tui_app.handle_input(Event::Key(key)) { + error!("Error handling right navigation: {}", e); + } + } + } + KeyCode::Char('r') => { + debug!("Refresh requested"); + if let Some(ref mut tui_app) = self.tui_app { + if let Err(e) = tui_app.handle_input(Event::Key(key)) { + error!("Error handling refresh: {}", e); + } + } + } + KeyCode::Tab => { + debug!("Tab pressed - next host"); + if let Some(ref mut tui_app) = self.tui_app { + if let Err(e) = tui_app.handle_input(Event::Key(key)) { + error!("Error handling tab navigation: {}", e); + } + } + } + _ => {} + }, Ok(_) => {} // Other events (mouse, resize, etc.) 
Err(e) => { error!("Error reading terminal event: {}", e); @@ -193,44 +194,67 @@ impl Dashboard { } } } - + // Check for new metrics if last_metrics_check.elapsed() >= metrics_check_interval { if let Ok(Some(metric_message)) = self.zmq_consumer.receive_metrics().await { - debug!("Received metrics from {}: {} metrics", - metric_message.hostname, metric_message.metrics.len()); - + debug!( + "Received metrics from {}: {} metrics", + metric_message.hostname, + metric_message.metrics.len() + ); + // Check if this is the first time we've seen this host - let is_new_host = !self.initial_commands_sent.contains(&metric_message.hostname); - + let is_new_host = !self + .initial_commands_sent + .contains(&metric_message.hostname); + if is_new_host { - info!("First contact with host {}, sending initial CollectNow command", metric_message.hostname); - + info!( + "First contact with host {}, sending initial CollectNow command", + metric_message.hostname + ); + // Send CollectNow command for immediate refresh - if let Err(e) = self.send_command(&metric_message.hostname, AgentCommand::CollectNow).await { - error!("Failed to send initial CollectNow command to {}: {}", metric_message.hostname, e); + if let Err(e) = self + .send_command(&metric_message.hostname, AgentCommand::CollectNow) + .await + { + error!( + "Failed to send initial CollectNow command to {}: {}", + metric_message.hostname, e + ); } else { - info!("✓ Sent initial CollectNow command to {}", metric_message.hostname); - self.initial_commands_sent.insert(metric_message.hostname.clone()); + info!( + "✓ Sent initial CollectNow command to {}", + metric_message.hostname + ); + self.initial_commands_sent + .insert(metric_message.hostname.clone()); } } - + // Update metric store - self.metric_store.update_metrics(&metric_message.hostname, metric_message.metrics); - + self.metric_store + .update_metrics(&metric_message.hostname, metric_message.metrics); + // Update TUI with new hosts and metrics (only if not headless) if let Some(ref mut tui_app) = self.tui_app { - let connected_hosts = self.metric_store.get_connected_hosts(Duration::from_secs(30)); + let connected_hosts = self + .metric_store + .get_connected_hosts(Duration::from_secs(30)); tui_app.update_hosts(connected_hosts); tui_app.update_metrics(&self.metric_store); } } last_metrics_check = Instant::now(); } - + // Render TUI (only if not headless) if !self.headless { - if let (Some(ref mut terminal), Some(ref mut tui_app)) = (&mut self.terminal, &mut self.tui_app) { + if let (Some(ref mut terminal), Some(ref mut tui_app)) = + (&mut self.terminal, &mut self.tui_app) + { if let Err(e) = terminal.draw(|frame| { tui_app.render(frame, &self.metric_store); }) { @@ -239,11 +263,11 @@ impl Dashboard { } } } - + // Small sleep to prevent excessive CPU usage tokio::time::sleep(Duration::from_millis(10)).await; } - + info!("Dashboard main loop ended"); Ok(()) } @@ -255,12 +279,9 @@ impl Drop for Dashboard { if !self.headless { let _ = disable_raw_mode(); if let Some(ref mut terminal) = self.terminal { - let _ = execute!( - terminal.backend_mut(), - LeaveAlternateScreen - ); + let _ = execute!(terminal.backend_mut(), LeaveAlternateScreen); let _ = terminal.show_cursor(); } } } -} \ No newline at end of file +} diff --git a/dashboard/src/communication/mod.rs b/dashboard/src/communication/mod.rs index aa3fe7c..af7ecb0 100644 --- a/dashboard/src/communication/mod.rs +++ b/dashboard/src/communication/mod.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use cm_dashboard_shared::{MetricMessage, MessageEnvelope, 
MessageType}; -use tracing::{info, error, debug, warn}; +use cm_dashboard_shared::{MessageEnvelope, MessageType, MetricMessage}; +use tracing::{debug, error, info, warn}; use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; @@ -28,27 +28,27 @@ pub struct ZmqConsumer { impl ZmqConsumer { pub async fn new(config: &ZmqConfig) -> Result { let context = Context::new(); - + // Create subscriber socket let subscriber = context.socket(SocketType::SUB)?; - + // Set socket options subscriber.set_rcvtimeo(1000)?; // 1 second timeout for non-blocking receives subscriber.set_subscribe(b"")?; // Subscribe to all messages - + info!("ZMQ consumer initialized"); - + Ok(Self { subscriber, config: config.clone(), connected_hosts: std::collections::HashSet::new(), }) } - + /// Connect to a specific host's agent pub async fn connect_to_host(&mut self, hostname: &str, port: u16) -> Result<()> { let address = format!("tcp://{}:{}", hostname, port); - + match self.subscriber.connect(&address) { Ok(()) => { info!("Connected to agent at {}", address); @@ -61,44 +61,50 @@ impl ZmqConsumer { } } } - + /// Connect to predefined hosts pub async fn connect_to_predefined_hosts(&mut self, hosts: &[String]) -> Result<()> { let default_port = self.config.subscriber_ports[0]; - + for hostname in hosts { // Try to connect, but don't fail if some hosts are unreachable if let Err(e) = self.connect_to_host(hostname, default_port).await { warn!("Could not connect to {}: {}", hostname, e); } } - - info!("Connected to {} out of {} configured hosts", - self.connected_hosts.len(), hosts.len()); - + + info!( + "Connected to {} out of {} configured hosts", + self.connected_hosts.len(), + hosts.len() + ); + Ok(()) } - - + /// Receive metrics from any connected agent (non-blocking) pub async fn receive_metrics(&mut self) -> Result> { match self.subscriber.recv_bytes(zmq::DONTWAIT) { Ok(data) => { debug!("Received {} bytes from ZMQ", data.len()); - + // Deserialize envelope let envelope: MessageEnvelope = serde_json::from_slice(&data) .map_err(|e| anyhow::anyhow!("Failed to deserialize envelope: {}", e))?; - + // Check message type match envelope.message_type { MessageType::Metrics => { - let metrics = envelope.decode_metrics() + let metrics = envelope + .decode_metrics() .map_err(|e| anyhow::anyhow!("Failed to decode metrics: {}", e))?; - - debug!("Received {} metrics from {}", - metrics.metrics.len(), metrics.hostname); - + + debug!( + "Received {} metrics from {}", + metrics.metrics.len(), + metrics.hostname + ); + Ok(Some(metrics)) } MessageType::Heartbeat => { @@ -121,7 +127,6 @@ impl ZmqConsumer { } } } - } /// ZMQ command sender for sending commands to agents @@ -132,36 +137,34 @@ pub struct ZmqCommandSender { impl ZmqCommandSender { pub fn new(_config: &ZmqConfig) -> Result { let context = Context::new(); - + info!("ZMQ command sender initialized"); - - Ok(Self { - context, - }) + + Ok(Self { context }) } - + /// Send a command to a specific agent pub async fn send_command(&self, hostname: &str, command: AgentCommand) -> Result<()> { // Create a new PUSH socket for this command (ZMQ best practice) let socket = self.context.socket(SocketType::PUSH)?; - + // Set socket options socket.set_linger(1000)?; // Wait up to 1 second on close socket.set_sndtimeo(5000)?; // 5 second send timeout - + // Connect to agent's command port (6131) let address = format!("tcp://{}:6131", hostname); socket.connect(&address)?; - + // Serialize command let serialized = serde_json::to_vec(&command)?; - + // Send command 
socket.send(&serialized, 0)?; - + info!("Sent command {:?} to agent at {}", command, hostname); - + // Socket will be automatically closed when dropped Ok(()) } -} \ No newline at end of file +} diff --git a/dashboard/src/config/mod.rs b/dashboard/src/config/mod.rs index 594882c..02a5445 100644 --- a/dashboard/src/config/mod.rs +++ b/dashboard/src/config/mod.rs @@ -171,4 +171,4 @@ impl Default for WidgetsConfig { }, } } -} \ No newline at end of file +} diff --git a/dashboard/src/main.rs b/dashboard/src/main.rs index 8f5ecfa..1511129 100644 --- a/dashboard/src/main.rs +++ b/dashboard/src/main.rs @@ -1,14 +1,14 @@ use anyhow::Result; use clap::Parser; -use tracing::{info, error}; +use tracing::{error, info}; use tracing_subscriber::EnvFilter; mod app; -mod config; mod communication; +mod config; +mod hosts; mod metrics; mod ui; -mod hosts; mod utils; use app::Dashboard; @@ -21,11 +21,11 @@ struct Cli { /// Increase logging verbosity (-v, -vv) #[arg(short, long, action = clap::ArgAction::Count)] verbose: u8, - + /// Configuration file path #[arg(short, long)] config: Option, - + /// Run in headless mode (no TUI, just logging) #[arg(long)] headless: bool, @@ -34,16 +34,16 @@ struct Cli { #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); - + // Setup logging - only if headless or verbose if cli.headless || cli.verbose > 0 { let log_level = match cli.verbose { - 0 => "warn", // Only warnings and errors when not verbose + 0 => "warn", // Only warnings and errors when not verbose 1 => "info", - 2 => "debug", + 2 => "debug", _ => "trace", }; - + tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env().add_directive(log_level.parse()?)) .init(); @@ -53,21 +53,21 @@ async fn main() -> Result<()> { .with_env_filter(EnvFilter::from_default_env().add_directive("off".parse()?)) .init(); } - + if cli.headless || cli.verbose > 0 { info!("CM Dashboard starting with individual metrics architecture..."); } - + // Create and run dashboard let mut dashboard = Dashboard::new(cli.config, cli.headless).await?; - + // Setup graceful shutdown let ctrl_c = async { tokio::signal::ctrl_c() .await .expect("failed to install Ctrl+C handler"); }; - + // Run dashboard with graceful shutdown tokio::select! 
{ result = dashboard.run() => { @@ -80,9 +80,9 @@ async fn main() -> Result<()> { info!("Shutdown signal received"); } } - + if cli.headless || cli.verbose > 0 { info!("Dashboard shutdown complete"); } Ok(()) -} \ No newline at end of file +} diff --git a/dashboard/src/metrics/mod.rs b/dashboard/src/metrics/mod.rs index 8749c83..b408b94 100644 --- a/dashboard/src/metrics/mod.rs +++ b/dashboard/src/metrics/mod.rs @@ -4,11 +4,8 @@ pub mod store; pub use store::MetricStore; - /// Historical metric data point #[derive(Debug, Clone)] pub struct MetricDataPoint { pub received_at: Instant, } - - diff --git a/dashboard/src/metrics/store.rs b/dashboard/src/metrics/store.rs index 4769bcc..801548f 100644 --- a/dashboard/src/metrics/store.rs +++ b/dashboard/src/metrics/store.rs @@ -28,62 +28,62 @@ impl MetricStore { history_retention: Duration::from_secs(history_retention_hours * 3600), } } - + /// Update metrics for a specific host pub fn update_metrics(&mut self, hostname: &str, metrics: Vec) { let now = Instant::now(); - + debug!("Updating {} metrics for host {}", metrics.len(), hostname); - + // Get or create host entry - let host_metrics = self.current_metrics + let host_metrics = self + .current_metrics .entry(hostname.to_string()) .or_insert_with(HashMap::new); - + // Get or create historical entry - let host_history = self.historical_metrics + let host_history = self + .historical_metrics .entry(hostname.to_string()) .or_insert_with(Vec::new); - + // Update current metrics and add to history for metric in metrics { let metric_name = metric.name.clone(); - + // Store current metric host_metrics.insert(metric_name.clone(), metric.clone()); - + // Add to history - host_history.push(MetricDataPoint { - received_at: now, - }); + host_history.push(MetricDataPoint { received_at: now }); } - + // Update last update timestamp self.last_update.insert(hostname.to_string(), now); - + // Get metrics count before cleanup let metrics_count = host_metrics.len(); - + // Cleanup old history and enforce limits self.cleanup_host_data(hostname); - - info!("Updated metrics for {}: {} current metrics", - hostname, metrics_count); + + info!( + "Updated metrics for {}: {} current metrics", + hostname, metrics_count + ); } - + /// Get current metric for a specific host pub fn get_metric(&self, hostname: &str, metric_name: &str) -> Option<&Metric> { - self.current_metrics - .get(hostname)? 
- .get(metric_name) + self.current_metrics.get(hostname)?.get(metric_name) } - + /// Get all current metrics for a host #[allow(dead_code)] pub fn get_host_metrics(&self, hostname: &str) -> Option<&HashMap> { self.current_metrics.get(hostname) } - + /// Get all current metrics for a host as a vector pub fn get_metrics_for_host(&self, hostname: &str) -> Vec<&Metric> { if let Some(metrics_map) = self.current_metrics.get(hostname) { @@ -92,13 +92,11 @@ impl MetricStore { Vec::new() } } - - - + /// Get connected hosts (hosts with recent updates) pub fn get_connected_hosts(&self, timeout: Duration) -> Vec { let now = Instant::now(); - + self.last_update .iter() .filter_map(|(hostname, &last_update)| { @@ -110,26 +108,25 @@ impl MetricStore { }) .collect() } - - - + /// Cleanup old data and enforce limits fn cleanup_host_data(&mut self, hostname: &str) { let now = Instant::now(); - + // Cleanup historical data if let Some(history) = self.historical_metrics.get_mut(hostname) { // Remove old entries history.retain(|dp| now.duration_since(dp.received_at) <= self.history_retention); - + // Enforce size limit if history.len() > self.max_metrics_per_host { let excess = history.len() - self.max_metrics_per_host; history.drain(0..excess); - warn!("Trimmed {} old metrics for host {} (size limit: {})", - excess, hostname, self.max_metrics_per_host); + warn!( + "Trimmed {} old metrics for host {} (size limit: {})", + excess, hostname, self.max_metrics_per_host + ); } } } - -} \ No newline at end of file +} diff --git a/dashboard/src/ui/theme.rs b/dashboard/src/ui/theme.rs index edf0f49..db5aec1 100644 --- a/dashboard/src/ui/theme.rs +++ b/dashboard/src/ui/theme.rs @@ -1,6 +1,6 @@ -use ratatui::style::{Color, Style, Modifier}; -use ratatui::widgets::{Block, Borders}; use cm_dashboard_shared::Status; +use ratatui::style::{Color, Modifier, Style}; +use ratatui::widgets::{Block, Borders}; /// Complete terminal color palette matching your configuration #[allow(dead_code)] @@ -10,7 +10,7 @@ pub struct TerminalColors { pub dim_foreground: Color, pub bright_foreground: Color, pub background: Color, - + // Normal colors pub normal_black: Color, pub normal_red: Color, @@ -20,7 +20,7 @@ pub struct TerminalColors { pub normal_magenta: Color, pub normal_cyan: Color, pub normal_white: Color, - + // Bright colors pub bright_black: Color, pub bright_red: Color, @@ -30,7 +30,7 @@ pub struct TerminalColors { pub bright_magenta: Color, pub bright_cyan: Color, pub bright_white: Color, - + // Dim colors pub dim_black: Color, pub dim_red: Color, @@ -46,40 +46,40 @@ impl Default for TerminalColors { fn default() -> Self { Self { // Primary colors - foreground: Color::Rgb(198, 198, 198), // #c6c6c6 - dim_foreground: Color::Rgb(112, 112, 112), // #707070 + foreground: Color::Rgb(198, 198, 198), // #c6c6c6 + dim_foreground: Color::Rgb(112, 112, 112), // #707070 bright_foreground: Color::Rgb(255, 255, 255), // #ffffff - background: Color::Rgb(38, 38, 38), // #262626 - + background: Color::Rgb(38, 38, 38), // #262626 + // Normal colors - normal_black: Color::Rgb(0, 0, 0), // #000000 - normal_red: Color::Rgb(215, 84, 0), // #d75400 - normal_green: Color::Rgb(175, 215, 135), // #afd787 - normal_yellow: Color::Rgb(215, 175, 95), // #d7af5f - normal_blue: Color::Rgb(135, 175, 215), // #87afd7 - normal_magenta: Color::Rgb(215, 215, 175), // #d7d7af - normal_cyan: Color::Rgb(160, 160, 160), // #a0a0a0 - normal_white: Color::Rgb(238, 238, 238), // #eeeeee - + normal_black: Color::Rgb(0, 0, 0), // #000000 + normal_red: Color::Rgb(215, 84, 
0), // #d75400 + normal_green: Color::Rgb(175, 215, 135), // #afd787 + normal_yellow: Color::Rgb(215, 175, 95), // #d7af5f + normal_blue: Color::Rgb(135, 175, 215), // #87afd7 + normal_magenta: Color::Rgb(215, 215, 175), // #d7d7af + normal_cyan: Color::Rgb(160, 160, 160), // #a0a0a0 + normal_white: Color::Rgb(238, 238, 238), // #eeeeee + // Bright colors - bright_black: Color::Rgb(48, 48, 48), // #303030 - bright_red: Color::Rgb(215, 84, 0), // #d75400 - bright_green: Color::Rgb(175, 215, 135), // #afd787 - bright_yellow: Color::Rgb(215, 175, 95), // #d7af5f - bright_blue: Color::Rgb(135, 175, 215), // #87afd7 - bright_magenta: Color::Rgb(215, 215, 175), // #d7d7af - bright_cyan: Color::Rgb(160, 160, 160), // #a0a0a0 - bright_white: Color::Rgb(255, 255, 255), // #ffffff - + bright_black: Color::Rgb(48, 48, 48), // #303030 + bright_red: Color::Rgb(215, 84, 0), // #d75400 + bright_green: Color::Rgb(175, 215, 135), // #afd787 + bright_yellow: Color::Rgb(215, 175, 95), // #d7af5f + bright_blue: Color::Rgb(135, 175, 215), // #87afd7 + bright_magenta: Color::Rgb(215, 215, 175), // #d7d7af + bright_cyan: Color::Rgb(160, 160, 160), // #a0a0a0 + bright_white: Color::Rgb(255, 255, 255), // #ffffff + // Dim colors - dim_black: Color::Rgb(0, 0, 0), // #000000 - dim_red: Color::Rgb(215, 84, 0), // #d75400 - dim_green: Color::Rgb(175, 215, 135), // #afd787 - dim_yellow: Color::Rgb(215, 175, 95), // #d7af5f - dim_blue: Color::Rgb(135, 175, 215), // #87afd7 - dim_magenta: Color::Rgb(215, 215, 175), // #d7d7af - dim_cyan: Color::Rgb(160, 160, 160), // #a0a0a0 - dim_white: Color::Rgb(221, 221, 221), // #dddddd + dim_black: Color::Rgb(0, 0, 0), // #000000 + dim_red: Color::Rgb(215, 84, 0), // #d75400 + dim_green: Color::Rgb(175, 215, 135), // #afd787 + dim_yellow: Color::Rgb(215, 175, 95), // #d7af5f + dim_blue: Color::Rgb(135, 175, 215), // #87afd7 + dim_magenta: Color::Rgb(215, 215, 175), // #d7d7af + dim_cyan: Color::Rgb(160, 160, 160), // #a0a0a0 + dim_white: Color::Rgb(221, 221, 221), // #dddddd } } } @@ -93,52 +93,52 @@ impl Theme { static COLORS: std::sync::OnceLock = std::sync::OnceLock::new(); COLORS.get_or_init(TerminalColors::default) } - + // Semantic color mapping using the terminal color struct pub fn primary_text() -> Color { Self::colors().normal_white } - + pub fn secondary_text() -> Color { Self::colors().foreground } - + pub fn muted_text() -> Color { Self::colors().dim_foreground } - + pub fn border() -> Color { Self::colors().dim_foreground } - + pub fn border_title() -> Color { Self::colors().bright_white } - + pub fn background() -> Color { Self::colors().background } - + pub fn success() -> Color { Self::colors().normal_green } - + pub fn warning() -> Color { Self::colors().normal_yellow } - + pub fn error() -> Color { Self::colors().normal_red } - + pub fn info() -> Color { Self::colors().normal_cyan } - + pub fn highlight() -> Color { Self::colors().normal_blue } - + /// Get color for status level pub fn status_color(status: Status) -> Color { match status { @@ -148,12 +148,12 @@ impl Theme { Status::Unknown => Self::muted_text(), } } - + /// Get style for status level pub fn status_style(status: Status) -> Style { Style::default().fg(Self::status_color(status)) } - + /// CPU usage colors using terminal color struct pub fn cpu_color(percentage: u16) -> Color { match percentage { @@ -164,7 +164,7 @@ impl Theme { _ => Self::colors().normal_red, // Over 100% } } - + /// Memory usage colors using terminal color struct pub fn memory_color(percentage: u16) -> Color { match percentage { 
@@ -175,7 +175,7 @@ impl Theme { _ => Self::colors().normal_red, // Over 100% } } - + /// Get gauge color based on percentage pub fn gauge_color(percentage: u16, warning_threshold: u16, critical_threshold: u16) -> Color { if percentage >= critical_threshold { @@ -186,25 +186,31 @@ impl Theme { Self::success() } } - + /// Widget border style pub fn widget_border_style() -> Style { Style::default().fg(Self::border()).bg(Self::background()) } - + /// Inactive widget border style pub fn widget_border_inactive_style() -> Style { - Style::default().fg(Self::muted_text()).bg(Self::background()) + Style::default() + .fg(Self::muted_text()) + .bg(Self::background()) } - + /// Title style pub fn title_style() -> Style { - Style::default().fg(Self::border_title()).bg(Self::background()) + Style::default() + .fg(Self::border_title()) + .bg(Self::background()) } - + /// Status bar style pub fn status_bar_style() -> Style { - Style::default().fg(Self::muted_text()).bg(Self::background()) + Style::default() + .fg(Self::muted_text()) + .bg(Self::background()) } } @@ -245,29 +251,29 @@ impl StatusIcons { } } - /// Create spans with status icon colored and text in foreground color pub fn create_status_spans(status: Status, text: &str) -> Vec> { let icon = Self::get_icon(status); let status_color = match status { - Status::Ok => Theme::success(), // Green - Status::Warning => Theme::warning(), // Yellow - Status::Critical => Theme::error(), // Red + Status::Ok => Theme::success(), // Green + Status::Warning => Theme::warning(), // Yellow + Status::Critical => Theme::error(), // Red Status::Unknown => Theme::muted_text(), // Gray }; - + vec![ ratatui::text::Span::styled( format!("{} ", icon), - Style::default().fg(status_color).bg(Theme::background()) + Style::default().fg(status_color).bg(Theme::background()), ), ratatui::text::Span::styled( text.to_string(), - Style::default().fg(Theme::secondary_text()).bg(Theme::background()) + Style::default() + .fg(Theme::secondary_text()) + .bg(Theme::background()), ), ] } - } impl Components { @@ -277,9 +283,12 @@ impl Components { .title(title) .borders(Borders::ALL) .style(Style::default().fg(Theme::border()).bg(Theme::background())) - .title_style(Style::default().fg(Theme::border_title()).bg(Theme::background())) + .title_style( + Style::default() + .fg(Theme::border_title()) + .bg(Theme::background()), + ) } - } impl Typography { @@ -289,7 +298,7 @@ impl Typography { .fg(Theme::primary_text()) .bg(Theme::background()) } - + /// Widget title style (panel headers) - bold bright white pub fn widget_title() -> Style { Style::default() @@ -297,14 +306,14 @@ impl Typography { .bg(Theme::background()) .add_modifier(Modifier::BOLD) } - + /// Secondary content text pub fn secondary() -> Style { Style::default() .fg(Theme::secondary_text()) .bg(Theme::background()) } - + /// Muted text (inactive items, placeholders) - now bold bright white for headers pub fn muted() -> Style { Style::default() @@ -312,5 +321,4 @@ impl Typography { .bg(Theme::background()) .add_modifier(Modifier::BOLD) } - } diff --git a/dashboard/src/ui/widgets/cpu.rs b/dashboard/src/ui/widgets/cpu.rs index dc9cdf0..0a64b53 100644 --- a/dashboard/src/ui/widgets/cpu.rs +++ b/dashboard/src/ui/widgets/cpu.rs @@ -7,7 +7,7 @@ use ratatui::{ use tracing::debug; use super::Widget; -use crate::ui::theme::{Typography, StatusIcons}; +use crate::ui::theme::{StatusIcons, Typography}; /// CPU widget displaying load, temperature, and frequency #[derive(Clone)] @@ -38,7 +38,7 @@ impl CpuWidget { has_data: false, } } - + 
/// Format load average for display fn format_load(&self) -> String { match (self.load_1min, self.load_5min, self.load_15min) { @@ -48,7 +48,7 @@ impl CpuWidget { _ => "— — —".to_string(), } } - + /// Format frequency for display fn format_frequency(&self) -> String { match self.frequency { @@ -56,16 +56,15 @@ impl CpuWidget { None => "— MHz".to_string(), } } - } impl Widget for CpuWidget { fn update_from_metrics(&mut self, metrics: &[&Metric]) { debug!("CPU widget updating with {} metrics", metrics.len()); - + // Reset status aggregation let mut statuses = Vec::new(); - + for metric in metrics { match metric.name.as_str() { "cpu_load_1min" => { @@ -101,33 +100,40 @@ impl Widget for CpuWidget { _ => {} } } - + // Aggregate status self.status = if statuses.is_empty() { Status::Unknown } else { Status::aggregate(&statuses) }; - + self.has_data = !metrics.is_empty(); - - debug!("CPU widget updated: load={:?}, temp={:?}, freq={:?}, status={:?}", - self.load_1min, self.temperature, self.frequency, self.status); + + debug!( + "CPU widget updated: load={:?}, temp={:?}, freq={:?}, status={:?}", + self.load_1min, self.temperature, self.frequency, self.status + ); } - + fn render(&mut self, frame: &mut Frame, area: Rect) { - let content_chunks = Layout::default().direction(Direction::Vertical).constraints([Constraint::Length(1), Constraint::Length(1)]).split(area); + let content_chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Length(1), Constraint::Length(1)]) + .split(area); let cpu_title = Paragraph::new("CPU:").style(Typography::widget_title()); frame.render_widget(cpu_title, content_chunks[0]); - let load_freq_spans = StatusIcons::create_status_spans(self.status, &format!("Load: {} • {}", self.format_load(), self.format_frequency())); + let load_freq_spans = StatusIcons::create_status_spans( + self.status, + &format!("Load: {} • {}", self.format_load(), self.format_frequency()), + ); let load_freq_para = Paragraph::new(ratatui::text::Line::from(load_freq_spans)); frame.render_widget(load_freq_para, content_chunks[1]); } } - impl Default for CpuWidget { fn default() -> Self { Self::new() } -} \ No newline at end of file +} diff --git a/dashboard/src/ui/widgets/memory.rs b/dashboard/src/ui/widgets/memory.rs index b7a70e1..dd0b3f6 100644 --- a/dashboard/src/ui/widgets/memory.rs +++ b/dashboard/src/ui/widgets/memory.rs @@ -7,7 +7,7 @@ use ratatui::{ use tracing::debug; use super::Widget; -use crate::ui::theme::{Typography, StatusIcons}; +use crate::ui::theme::{StatusIcons, Typography}; /// Memory widget displaying usage, totals, and swap information #[derive(Clone)] @@ -52,8 +52,7 @@ impl MemoryWidget { has_data: false, } } - - + /// Get memory usage percentage for gauge fn get_memory_percentage(&self) -> u16 { match self.usage_percent { @@ -108,10 +107,8 @@ impl MemoryWidget { let total_str = Self::format_size_units(total_mb); format!("{}/{}", used_str, total_str) } - (None, Some(used_mb), None) => { - Self::format_size_units(used_mb) - } - _ => "—".to_string() + (None, Some(used_mb), None) => Self::format_size_units(used_mb), + _ => "—".to_string(), } } @@ -129,16 +126,15 @@ impl MemoryWidget { Status::Unknown } } - } impl Widget for MemoryWidget { fn update_from_metrics(&mut self, metrics: &[&Metric]) { debug!("Memory widget updating with {} metrics", metrics.len()); - + // Reset status aggregation let mut statuses = Vec::new(); - + for metric in metrics { match metric.name.as_str() { "memory_usage_percent" => { @@ -198,36 +194,53 @@ impl Widget for 
MemoryWidget { _ => {} } } - + // Aggregate status self.status = if statuses.is_empty() { Status::Unknown } else { Status::aggregate(&statuses) }; - + self.has_data = !metrics.is_empty(); - + debug!("Memory widget updated: usage={:?}%, total={:?}GB, swap_total={:?}GB, tmp={:?}/{:?}MB, status={:?}", self.usage_percent, self.total_gb, self.swap_total_gb, self.tmp_size_mb, self.tmp_total_mb, self.status); } - + fn render(&mut self, frame: &mut Frame, area: Rect) { - let content_chunks = Layout::default().direction(Direction::Vertical).constraints([Constraint::Length(1), Constraint::Length(1), Constraint::Length(1)]).split(area); + let content_chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(1), + Constraint::Length(1), + Constraint::Length(1), + ]) + .split(area); let mem_title = Paragraph::new("RAM:").style(Typography::widget_title()); frame.render_widget(mem_title, content_chunks[0]); - + // Format used and total memory with smart units, percentage, and status icon - let used_str = self.used_gb.map_or("—".to_string(), |v| Self::format_size_units(v * 1024.0)); // Convert GB to MB for formatting - let total_str = self.total_gb.map_or("—".to_string(), |v| Self::format_size_units(v * 1024.0)); // Convert GB to MB for formatting + let used_str = self + .used_gb + .map_or("—".to_string(), |v| Self::format_size_units(v * 1024.0)); // Convert GB to MB for formatting + let total_str = self + .total_gb + .map_or("—".to_string(), |v| Self::format_size_units(v * 1024.0)); // Convert GB to MB for formatting let percentage = self.get_memory_percentage(); - let mem_details_spans = StatusIcons::create_status_spans(self.status, &format!("Used: {}% {}/{}", percentage, used_str, total_str)); + let mem_details_spans = StatusIcons::create_status_spans( + self.status, + &format!("Used: {}% {}/{}", percentage, used_str, total_str), + ); let mem_details_para = Paragraph::new(ratatui::text::Line::from(mem_details_spans)); frame.render_widget(mem_details_para, content_chunks[1]); - + // /tmp usage line with status icon let tmp_status = self.get_tmp_status(); - let tmp_spans = StatusIcons::create_status_spans(tmp_status, &format!("tmp: {}", self.format_tmp_usage())); + let tmp_spans = StatusIcons::create_status_spans( + tmp_status, + &format!("tmp: {}", self.format_tmp_usage()), + ); let tmp_para = Paragraph::new(ratatui::text::Line::from(tmp_spans)); frame.render_widget(tmp_para, content_chunks[2]); } @@ -237,4 +250,4 @@ impl Default for MemoryWidget { fn default() -> Self { Self::new() } -} \ No newline at end of file +} diff --git a/dashboard/src/ui/widgets/mod.rs b/dashboard/src/ui/widgets/mod.rs index a157009..24dc8a9 100644 --- a/dashboard/src/ui/widgets/mod.rs +++ b/dashboard/src/ui/widgets/mod.rs @@ -1,21 +1,21 @@ use cm_dashboard_shared::Metric; use ratatui::{layout::Rect, Frame}; +pub mod backup; pub mod cpu; pub mod memory; pub mod services; -pub mod backup; +pub use backup::BackupWidget; pub use cpu::CpuWidget; pub use memory::MemoryWidget; pub use services::ServicesWidget; -pub use backup::BackupWidget; /// Widget trait for UI components that display metrics pub trait Widget { /// Update widget with new metrics data fn update_from_metrics(&mut self, metrics: &[&Metric]); - + /// Render the widget to a terminal frame fn render(&mut self, frame: &mut Frame, area: Rect); -} \ No newline at end of file +} diff --git a/shared/src/cache.rs b/shared/src/cache.rs index 9677fa9..ede290b 100644 --- a/shared/src/cache.rs +++ b/shared/src/cache.rs @@ -24,29 +24,47 @@ 
pub struct CacheConfig { impl Default for CacheConfig { fn default() -> Self { let mut tiers = HashMap::new(); - tiers.insert("realtime".to_string(), CacheTier { - interval_seconds: 2, - description: "Memory/CPU operations - no disk I/O (CPU, memory, service CPU/RAM)".to_string(), - }); - tiers.insert("disk_light".to_string(), CacheTier { - interval_seconds: 60, - description: "Light disk operations - 1 minute (service status checks)".to_string(), - }); - tiers.insert("disk_medium".to_string(), CacheTier { - interval_seconds: 300, - description: "Medium disk operations - 5 minutes (disk usage, service disk)".to_string(), - }); - tiers.insert("disk_heavy".to_string(), CacheTier { - interval_seconds: 900, - description: "Heavy disk operations - 15 minutes (SMART data, backup status)".to_string(), - }); - tiers.insert("static".to_string(), CacheTier { - interval_seconds: 3600, - description: "Hardware info that rarely changes - 1 hour".to_string(), - }); + tiers.insert( + "realtime".to_string(), + CacheTier { + interval_seconds: 2, + description: "Memory/CPU operations - no disk I/O (CPU, memory, service CPU/RAM)" + .to_string(), + }, + ); + tiers.insert( + "disk_light".to_string(), + CacheTier { + interval_seconds: 10, + description: "Light disk operations - 10 seconds (service status checks)".to_string(), + }, + ); + tiers.insert( + "disk_medium".to_string(), + CacheTier { + interval_seconds: 60, + description: "Medium disk operations - 1 minute (disk usage, service disk)" + .to_string(), + }, + ); + tiers.insert( + "disk_heavy".to_string(), + CacheTier { + interval_seconds: 60, + description: "Heavy disk operations - 1 minute (backup status)" + .to_string(), + }, + ); + tiers.insert( + "static".to_string(), + CacheTier { + interval_seconds: 600, + description: "SMART data operations - 10 minutes".to_string(), + }, + ); let mut metric_assignments = HashMap::new(); - + // REALTIME (2s) - Memory/CPU operations, no disk I/O metric_assignments.insert("cpu_load_*".to_string(), "realtime".to_string()); metric_assignments.insert("cpu_temperature_*".to_string(), "realtime".to_string()); @@ -55,22 +73,24 @@ impl Default for CacheConfig { metric_assignments.insert("service_*_cpu_percent".to_string(), "realtime".to_string()); metric_assignments.insert("service_*_memory_mb".to_string(), "realtime".to_string()); metric_assignments.insert("network_*".to_string(), "realtime".to_string()); - + // DISK_LIGHT (1min) - Light disk operations: service status checks metric_assignments.insert("service_*_status".to_string(), "disk_light".to_string()); - + // DISK_MEDIUM (5min) - Medium disk operations: du commands, disk usage metric_assignments.insert("service_*_disk_gb".to_string(), "disk_medium".to_string()); metric_assignments.insert("disk_tmp_*".to_string(), "disk_medium".to_string()); metric_assignments.insert("disk_*_usage_*".to_string(), "disk_medium".to_string()); metric_assignments.insert("disk_*_size_*".to_string(), "disk_medium".to_string()); - - // DISK_HEAVY (15min) - Heavy disk operations: SMART data, backup status - metric_assignments.insert("disk_*_temperature".to_string(), "disk_heavy".to_string()); - metric_assignments.insert("disk_*_wear_percent".to_string(), "disk_heavy".to_string()); - metric_assignments.insert("smart_*".to_string(), "disk_heavy".to_string()); + + // DISK_HEAVY (1min) - Heavy disk operations: backup status metric_assignments.insert("backup_*".to_string(), "disk_heavy".to_string()); + // STATIC (10min) - SMART data operations + 
metric_assignments.insert("disk_*_temperature".to_string(), "static".to_string()); + metric_assignments.insert("disk_*_wear_percent".to_string(), "static".to_string()); + metric_assignments.insert("smart_*".to_string(), "static".to_string()); + Self { enabled: true, default_ttl_seconds: 30, @@ -101,11 +121,11 @@ impl CacheConfig { if pattern.contains('*') { // Convert pattern to regex-like matching let pattern_parts: Vec<&str> = pattern.split('*').collect(); - + if pattern_parts.len() == 2 { let prefix = pattern_parts[0]; let suffix = pattern_parts[1]; - + if suffix.is_empty() { // Pattern like "cpu_*" - just check prefix metric_name.starts_with(prefix) @@ -118,9 +138,9 @@ impl CacheConfig { } } else { // More complex patterns - for now, just check if all parts are present - pattern_parts.iter().all(|part| { - part.is_empty() || metric_name.contains(part) - }) + pattern_parts + .iter() + .all(|part| part.is_empty() || metric_name.contains(part)) } } else { metric_name == pattern @@ -142,7 +162,7 @@ mod tests { #[test] fn test_pattern_matching() { let config = CacheConfig::default(); - + assert!(config.matches_pattern("cpu_load_1min", "cpu_load_*")); assert!(config.matches_pattern("service_nginx_disk_gb", "service_*_disk_gb")); assert!(!config.matches_pattern("memory_usage_percent", "cpu_load_*")); @@ -151,21 +171,21 @@ mod tests { #[test] fn test_tier_assignment() { let config = CacheConfig::default(); - + // Realtime (2s) - CPU/Memory operations assert_eq!(config.get_cache_interval("cpu_load_1min"), 2); assert_eq!(config.get_cache_interval("memory_usage_percent"), 2); assert_eq!(config.get_cache_interval("service_nginx_cpu_percent"), 2); - - // Disk light (60s) - Service status - assert_eq!(config.get_cache_interval("service_nginx_status"), 60); - - // Disk medium (300s) - Disk usage - assert_eq!(config.get_cache_interval("service_nginx_disk_gb"), 300); - assert_eq!(config.get_cache_interval("disk_tmp_usage_percent"), 300); - - // Disk heavy (900s) - SMART data - assert_eq!(config.get_cache_interval("disk_nvme0_temperature"), 900); - assert_eq!(config.get_cache_interval("smart_nvme0_wear_percent"), 900); + + // Disk light (10s) - Service status + assert_eq!(config.get_cache_interval("service_nginx_status"), 10); + + // Disk medium (60s) - Disk usage + assert_eq!(config.get_cache_interval("service_nginx_disk_gb"), 60); + assert_eq!(config.get_cache_interval("disk_tmp_usage_percent"), 60); + + // Static (600s) - SMART data + assert_eq!(config.get_cache_interval("disk_nvme0_temperature"), 600); + assert_eq!(config.get_cache_interval("smart_nvme0_wear_percent"), 600); } -} \ No newline at end of file +} diff --git a/shared/src/error.rs b/shared/src/error.rs index af909e5..ca1cbf1 100644 --- a/shared/src/error.rs +++ b/shared/src/error.rs @@ -4,10 +4,10 @@ use thiserror::Error; pub enum SharedError { #[error("Serialization error: {message}")] Serialization { message: String }, - + #[error("Invalid metric value: {message}")] InvalidMetric { message: String }, - + #[error("Protocol error: {message}")] Protocol { message: String }, } @@ -18,4 +18,4 @@ impl From for SharedError { message: err.to_string(), } } -} \ No newline at end of file +} diff --git a/shared/src/lib.rs b/shared/src/lib.rs index cc50871..9b5dbf9 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -6,4 +6,4 @@ pub mod protocol; pub use cache::*; pub use error::*; pub use metrics::*; -pub use protocol::*; \ No newline at end of file +pub use protocol::*; diff --git a/shared/src/metrics.rs b/shared/src/metrics.rs index 
fee8d5c..0d6a38a 100644 --- a/shared/src/metrics.rs +++ b/shared/src/metrics.rs @@ -1,5 +1,6 @@ -use serde::{Deserialize, Serialize}; use chrono::Utc; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; /// Individual metric with value, status, and metadata #[derive(Debug, Clone, Serialize, Deserialize)] @@ -23,12 +24,12 @@ impl Metric { unit: None, } } - + pub fn with_description(mut self, description: String) -> Self { self.description = Some(description); self } - + pub fn with_unit(mut self, unit: String) -> Self { self.unit = Some(unit); self @@ -52,7 +53,7 @@ impl MetricValue { _ => None, } } - + pub fn as_i64(&self) -> Option { match self { MetricValue::Integer(i) => Some(*i), @@ -60,7 +61,7 @@ impl MetricValue { _ => None, } } - + pub fn as_string(&self) -> String { match self { MetricValue::String(s) => s.clone(), @@ -69,7 +70,7 @@ impl MetricValue { MetricValue::Boolean(b) => b.to_string(), } } - + pub fn as_bool(&self) -> Option { match self { MetricValue::Boolean(b) => Some(*b), @@ -100,6 +101,118 @@ impl Default for Status { } } +/// Hysteresis thresholds for preventing status flapping +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HysteresisThresholds { + /// Warning threshold - trigger warning when value >= this + pub warning_high: f32, + /// Warning recovery - return to ok when value < this + pub warning_low: f32, + /// Critical threshold - trigger critical when value >= this + pub critical_high: f32, + /// Critical recovery - return to warning when value < this + pub critical_low: f32, +} + +impl HysteresisThresholds { + pub fn new(warning_high: f32, critical_high: f32) -> Self { + // Default hysteresis: 10% gap for recovery + let warning_gap = warning_high * 0.1; + let critical_gap = critical_high * 0.1; + + Self { + warning_high, + warning_low: warning_high - warning_gap, + critical_high, + critical_low: critical_high - critical_gap, + } + } + + pub fn with_custom_gaps(warning_high: f32, warning_gap: f32, critical_high: f32, critical_gap: f32) -> Self { + Self { + warning_high, + warning_low: warning_high - warning_gap, + critical_high, + critical_low: critical_high - critical_gap, + } + } + + /// Calculate status with hysteresis based on current value and previous status + pub fn calculate_status(&self, value: f32, previous_status: Status) -> Status { + match previous_status { + Status::Ok => { + if value >= self.critical_high { + Status::Critical + } else if value >= self.warning_high { + Status::Warning + } else { + Status::Ok + } + } + Status::Warning => { + if value >= self.critical_high { + Status::Critical + } else if value < self.warning_low { + Status::Ok + } else { + Status::Warning + } + } + Status::Critical => { + if value < self.critical_low { + if value < self.warning_low { + Status::Ok + } else { + Status::Warning + } + } else { + Status::Critical + } + } + Status::Unknown => { + // First measurement, use normal thresholds + if value >= self.critical_high { + Status::Critical + } else if value >= self.warning_high { + Status::Warning + } else { + Status::Ok + } + } + } + } +} + +/// Status tracker for hysteresis - tracks previous status per metric +#[derive(Debug, Default)] +pub struct StatusTracker { + previous_statuses: HashMap, +} + +impl StatusTracker { + pub fn new() -> Self { + Self::default() + } + + /// Get previous status for a metric + pub fn get_previous_status(&self, metric_name: &str) -> Status { + self.previous_statuses.get(metric_name).copied().unwrap_or(Status::Unknown) + } + + /// Update status for a metric 
+ pub fn update_status(&mut self, metric_name: String, status: Status) { + self.previous_statuses.insert(metric_name, status); + } + + /// Calculate status with hysteresis + pub fn calculate_with_hysteresis(&mut self, metric_name: &str, value: f32, thresholds: &HysteresisThresholds) -> Status { + let previous = self.get_previous_status(metric_name); + let new_status = thresholds.calculate_status(value, previous); + self.update_status(metric_name.to_string(), new_status); + new_status + } +} + /// Metric name registry - constants for all metric names pub mod registry { // CPU metrics @@ -109,7 +222,7 @@ pub mod registry { pub const CPU_TEMPERATURE_CELSIUS: &str = "cpu_temperature_celsius"; pub const CPU_FREQUENCY_MHZ: &str = "cpu_frequency_mhz"; pub const CPU_USAGE_PERCENT: &str = "cpu_usage_percent"; - + // Memory metrics pub const MEMORY_USAGE_PERCENT: &str = "memory_usage_percent"; pub const MEMORY_TOTAL_GB: &str = "memory_total_gb"; @@ -117,7 +230,7 @@ pub mod registry { pub const MEMORY_AVAILABLE_GB: &str = "memory_available_gb"; pub const MEMORY_SWAP_TOTAL_GB: &str = "memory_swap_total_gb"; pub const MEMORY_SWAP_USED_GB: &str = "memory_swap_used_gb"; - + // Disk metrics (template - actual names include device) pub const DISK_USAGE_PERCENT_TEMPLATE: &str = "disk_{device}_usage_percent"; pub const DISK_TEMPERATURE_CELSIUS_TEMPLATE: &str = "disk_{device}_temperature_celsius"; @@ -125,37 +238,37 @@ pub mod registry { pub const DISK_SPARE_PERCENT_TEMPLATE: &str = "disk_{device}_spare_percent"; pub const DISK_HOURS_TEMPLATE: &str = "disk_{device}_hours"; pub const DISK_CAPACITY_GB_TEMPLATE: &str = "disk_{device}_capacity_gb"; - + // Service metrics (template - actual names include service) pub const SERVICE_STATUS_TEMPLATE: &str = "service_{name}_status"; pub const SERVICE_MEMORY_MB_TEMPLATE: &str = "service_{name}_memory_mb"; pub const SERVICE_CPU_PERCENT_TEMPLATE: &str = "service_{name}_cpu_percent"; - + // Backup metrics pub const BACKUP_STATUS: &str = "backup_status"; pub const BACKUP_LAST_RUN_TIMESTAMP: &str = "backup_last_run_timestamp"; pub const BACKUP_SIZE_GB: &str = "backup_size_gb"; pub const BACKUP_DURATION_MINUTES: &str = "backup_duration_minutes"; pub const BACKUP_NEXT_SCHEDULED_TIMESTAMP: &str = "backup_next_scheduled_timestamp"; - + // Network metrics (template - actual names include interface) pub const NETWORK_RX_BYTES_TEMPLATE: &str = "network_{interface}_rx_bytes"; pub const NETWORK_TX_BYTES_TEMPLATE: &str = "network_{interface}_tx_bytes"; pub const NETWORK_RX_PACKETS_TEMPLATE: &str = "network_{interface}_rx_packets"; pub const NETWORK_TX_PACKETS_TEMPLATE: &str = "network_{interface}_tx_packets"; - + /// Generate disk metric name from template pub fn disk_metric(template: &str, device: &str) -> String { template.replace("{device}", device) } - + /// Generate service metric name from template pub fn service_metric(template: &str, name: &str) -> String { template.replace("{name}", name) } - + /// Generate network metric name from template pub fn network_metric(template: &str, interface: &str) -> String { template.replace("{interface}", interface) } -} \ No newline at end of file +} diff --git a/shared/src/protocol.rs b/shared/src/protocol.rs index b6f21a7..da2e9a1 100644 --- a/shared/src/protocol.rs +++ b/shared/src/protocol.rs @@ -1,5 +1,5 @@ -use serde::{Deserialize, Serialize}; use crate::metrics::Metric; +use serde::{Deserialize, Serialize}; /// Message sent from agent to dashboard via ZMQ #[derive(Debug, Clone, Serialize, Deserialize)] @@ -65,28 +65,28 @@ impl 
MessageEnvelope { payload: serde_json::to_vec(&message)?, }) } - + pub fn command(command: Command) -> Result { Ok(Self { message_type: MessageType::Command, payload: serde_json::to_vec(&command)?, }) } - + pub fn command_response(response: CommandResponse) -> Result { Ok(Self { message_type: MessageType::CommandResponse, payload: serde_json::to_vec(&response)?, }) } - + pub fn heartbeat() -> Result { Ok(Self { message_type: MessageType::Heartbeat, payload: Vec::new(), }) } - + pub fn decode_metrics(&self) -> Result { match self.message_type { MessageType::Metrics => Ok(serde_json::from_slice(&self.payload)?), @@ -95,7 +95,7 @@ impl MessageEnvelope { }), } } - + pub fn decode_command(&self) -> Result { match self.message_type { MessageType::Command => Ok(serde_json::from_slice(&self.payload)?), @@ -104,7 +104,7 @@ impl MessageEnvelope { }), } } - + pub fn decode_command_response(&self) -> Result { match self.message_type { MessageType::CommandResponse => Ok(serde_json::from_slice(&self.payload)?), @@ -113,4 +113,4 @@ impl MessageEnvelope { }), } } -} \ No newline at end of file +}
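
Usage sketch (illustrative only, not part of the diff): the intent of the new shared types is that a collector hands each raw value to the StatusTracker together with a HysteresisThresholds pair, so a metric only changes status after it clears the recovery gap. The limits 9.0/10.0 below are hypothetical example values, using the default 10% recovery gap from HysteresisThresholds::new; the snippet exercises only the API added in shared/src/metrics.rs.

    use cm_dashboard_shared::{HysteresisThresholds, Status, StatusTracker};

    fn main() {
        // Hypothetical load limits: warn at >= 9.0, critical at >= 10.0.
        // HysteresisThresholds::new derives the recovery points with a 10% gap,
        // so warning_low is about 8.1 and critical_low about 9.0.
        let thresholds = HysteresisThresholds::new(9.0, 10.0);
        let mut tracker = StatusTracker::new();

        // 9.2 crosses warning_high, so the first evaluation reports Warning.
        let s1 = tracker.calculate_with_hysteresis("cpu_load_5min", 9.2, &thresholds);
        assert!(matches!(s1, Status::Warning));

        // 8.5 is back under warning_high but still above warning_low (~8.1),
        // so the status holds at Warning instead of flapping back to Ok.
        let s2 = tracker.calculate_with_hysteresis("cpu_load_5min", 8.5, &thresholds);
        assert!(matches!(s2, Status::Warning));

        // Only once the value falls below warning_low does the metric recover.
        let s3 = tracker.calculate_with_hysteresis("cpu_load_5min", 8.0, &thresholds);
        assert!(matches!(s3, Status::Ok));
    }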