use anyhow::Result; use gethostname::gethostname; use std::time::{Duration, Instant}; use tokio::time::interval; use tracing::{debug, error, info}; use crate::communication::ZmqHandler; use crate::config::AgentConfig; use crate::collectors::{ Collector, backup::BackupCollector, cpu::CpuCollector, disk::DiskCollector, memory::MemoryCollector, network::NetworkCollector, nixos::NixOSCollector, systemd::SystemdCollector, }; use crate::notifications::NotificationManager; use cm_dashboard_shared::AgentData; /// Wrapper for collectors with timing information struct TimedCollector { collector: Box, interval: Duration, last_collection: Option, name: String, } pub struct Agent { hostname: String, config: AgentConfig, zmq_handler: ZmqHandler, collectors: Vec, notification_manager: NotificationManager, previous_status: Option, } /// Track system component status for change detection #[derive(Debug, Clone)] struct SystemStatus { cpu_load_status: cm_dashboard_shared::Status, cpu_temperature_status: cm_dashboard_shared::Status, memory_usage_status: cm_dashboard_shared::Status, // Add more as needed } impl Agent { pub async fn new(config_path: Option) -> Result { let hostname = gethostname().to_string_lossy().to_string(); info!("Initializing agent for host: {}", hostname); // Load configuration (now required) let config_path = config_path.ok_or_else(|| anyhow::anyhow!("Configuration file path is required"))?; let config = AgentConfig::from_file(&config_path)?; info!("Agent configuration loaded"); // Initialize ZMQ communication let zmq_handler = ZmqHandler::new(&config.zmq).await?; info!( "ZMQ communication initialized on port {}", config.zmq.publisher_port ); // Initialize collectors with timing information let mut collectors: Vec = Vec::new(); // Add enabled collectors if config.collectors.cpu.enabled { collectors.push(TimedCollector { collector: Box::new(CpuCollector::new(config.collectors.cpu.clone())), interval: Duration::from_secs(config.collectors.cpu.interval_seconds), last_collection: None, name: "CPU".to_string(), }); info!("CPU collector initialized with {}s interval", config.collectors.cpu.interval_seconds); } if config.collectors.memory.enabled { collectors.push(TimedCollector { collector: Box::new(MemoryCollector::new(config.collectors.memory.clone())), interval: Duration::from_secs(config.collectors.memory.interval_seconds), last_collection: None, name: "Memory".to_string(), }); info!("Memory collector initialized with {}s interval", config.collectors.memory.interval_seconds); } if config.collectors.disk.enabled { collectors.push(TimedCollector { collector: Box::new(DiskCollector::new(config.collectors.disk.clone())), interval: Duration::from_secs(config.collectors.disk.interval_seconds), last_collection: None, name: "Disk".to_string(), }); info!("Disk collector initialized with {}s interval", config.collectors.disk.interval_seconds); } if config.collectors.systemd.enabled { collectors.push(TimedCollector { collector: Box::new(SystemdCollector::new(config.collectors.systemd.clone())), interval: Duration::from_secs(config.collectors.systemd.interval_seconds), last_collection: None, name: "Systemd".to_string(), }); info!("Systemd collector initialized with {}s interval", config.collectors.systemd.interval_seconds); } if config.collectors.backup.enabled { collectors.push(TimedCollector { collector: Box::new(BackupCollector::new()), interval: Duration::from_secs(config.collectors.backup.interval_seconds), last_collection: None, name: "Backup".to_string(), }); info!("Backup collector initialized with {}s interval", config.collectors.backup.interval_seconds); } if config.collectors.network.enabled { collectors.push(TimedCollector { collector: Box::new(NetworkCollector::new(config.collectors.network.clone())), interval: Duration::from_secs(config.collectors.network.interval_seconds), last_collection: None, name: "Network".to_string(), }); info!("Network collector initialized with {}s interval", config.collectors.network.interval_seconds); } if config.collectors.nixos.enabled { collectors.push(TimedCollector { collector: Box::new(NixOSCollector::new(config.collectors.nixos.clone())), interval: Duration::from_secs(config.collectors.nixos.interval_seconds), last_collection: None, name: "NixOS".to_string(), }); info!("NixOS collector initialized with {}s interval", config.collectors.nixos.interval_seconds); } info!("Initialized {} collectors", collectors.len()); // Initialize notification manager let notification_manager = NotificationManager::new(&config.notifications, &hostname)?; info!("Notification manager initialized"); Ok(Self { hostname, config, zmq_handler, collectors, notification_manager, previous_status: None, }) } /// Main agent loop with structured data collection pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> { info!("Starting agent main loop"); // Initial collection if let Err(e) = self.collect_and_broadcast().await { error!("Initial metric collection failed: {}", e); } // Set up intervals let mut transmission_interval = interval(Duration::from_secs( self.config.zmq.transmission_interval_seconds, )); let mut notification_interval = interval(Duration::from_secs(30)); // Check notifications every 30s // Skip initial ticks to avoid immediate execution transmission_interval.tick().await; notification_interval.tick().await; loop { tokio::select! { _ = transmission_interval.tick() => { if let Err(e) = self.collect_and_broadcast().await { error!("Failed to collect and broadcast metrics: {}", e); } } _ = notification_interval.tick() => { // Process any pending notifications // NOTE: With structured data, we might need to implement status tracking differently // For now, we skip this until status evaluation is migrated } _ = &mut shutdown_rx => { info!("Shutdown signal received, stopping agent loop"); break; } } } info!("Agent main loop stopped"); Ok(()) } /// Collect structured data from all collectors and broadcast via ZMQ async fn collect_and_broadcast(&mut self) -> Result<()> { debug!("Starting structured data collection"); // Initialize empty AgentData let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string()); // Collect data from collectors whose intervals have elapsed let now = Instant::now(); for timed_collector in &mut self.collectors { let should_collect = match timed_collector.last_collection { None => true, // First collection Some(last_time) => now.duration_since(last_time) >= timed_collector.interval, }; if should_collect { if let Err(e) = timed_collector.collector.collect_structured(&mut agent_data).await { error!("Collector {} failed: {}", timed_collector.name, e); // Update last_collection time even on failure to prevent immediate retries timed_collector.last_collection = Some(now); } else { timed_collector.last_collection = Some(now); debug!( "Collected from {} ({}s interval)", timed_collector.name, timed_collector.interval.as_secs() ); } } } // Check for status changes and send notifications if let Err(e) = self.check_status_changes_and_notify(&agent_data).await { error!("Failed to check status changes: {}", e); } // Broadcast the structured data via ZMQ if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await { error!("Failed to broadcast agent data: {}", e); } else { debug!("Successfully broadcast structured agent data"); } Ok(()) } /// Check for status changes and send notifications async fn check_status_changes_and_notify(&mut self, agent_data: &AgentData) -> Result<()> { // Extract current status let current_status = SystemStatus { cpu_load_status: agent_data.system.cpu.load_status.clone(), cpu_temperature_status: agent_data.system.cpu.temperature_status.clone(), memory_usage_status: agent_data.system.memory.usage_status.clone(), }; // Check for status changes if let Some(previous) = self.previous_status.clone() { self.check_and_notify_status_change( "CPU Load", &previous.cpu_load_status, ¤t_status.cpu_load_status, format!("CPU load: {:.1}", agent_data.system.cpu.load_1min) ).await?; self.check_and_notify_status_change( "CPU Temperature", &previous.cpu_temperature_status, ¤t_status.cpu_temperature_status, format!("CPU temperature: {}°C", agent_data.system.cpu.temperature_celsius.unwrap_or(0.0) as i32) ).await?; self.check_and_notify_status_change( "Memory Usage", &previous.memory_usage_status, ¤t_status.memory_usage_status, format!("Memory usage: {:.1}%", agent_data.system.memory.usage_percent) ).await?; } // Store current status for next comparison self.previous_status = Some(current_status); Ok(()) } /// Check individual status change and send notification if degraded async fn check_and_notify_status_change( &mut self, component: &str, previous: &cm_dashboard_shared::Status, current: &cm_dashboard_shared::Status, details: String ) -> Result<()> { use cm_dashboard_shared::Status; // Only notify on status degradation (OK → Warning/Critical, Warning → Critical) let should_notify = match (previous, current) { (Status::Ok, Status::Warning) => true, (Status::Ok, Status::Critical) => true, (Status::Warning, Status::Critical) => true, _ => false, }; if should_notify { let subject = format!("{} {} Alert", self.hostname, component); let body = format!( "Alert: {} status changed from {:?} to {:?}\n\nDetails: {}\n\nTime: {}", component, previous, current, details, chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC") ); info!("Sending notification: {} - {:?} → {:?}", component, previous, current); if let Err(e) = self.notification_manager.send_direct_email(&subject, &body).await { error!("Failed to send notification for {}: {}", component, e); } } Ok(()) } }