use anyhow::Result; use gethostname::gethostname; use std::time::Duration; use tokio::time::interval; use tracing::{debug, error, info}; use crate::communication::{AgentCommand, ZmqHandler}; use crate::config::AgentConfig; use crate::collectors::{ Collector, backup::BackupCollector, cpu::CpuCollector, disk::DiskCollector, memory::MemoryCollector, nixos::NixOSCollector, systemd::SystemdCollector, }; use crate::notifications::NotificationManager; use crate::service_tracker::UserStoppedServiceTracker; use cm_dashboard_shared::AgentData; pub struct Agent { hostname: String, config: AgentConfig, zmq_handler: ZmqHandler, collectors: Vec>, notification_manager: NotificationManager, service_tracker: UserStoppedServiceTracker, previous_status: Option, } /// Track system component status for change detection #[derive(Debug, Clone)] struct SystemStatus { cpu_load_status: cm_dashboard_shared::Status, cpu_temperature_status: cm_dashboard_shared::Status, memory_usage_status: cm_dashboard_shared::Status, // Add more as needed } impl Agent { pub async fn new(config_path: Option) -> Result { let hostname = gethostname().to_string_lossy().to_string(); info!("Initializing agent for host: {}", hostname); // Load configuration (now required) let config_path = config_path.ok_or_else(|| anyhow::anyhow!("Configuration file path is required"))?; let config = AgentConfig::from_file(&config_path)?; info!("Agent configuration loaded"); // Initialize ZMQ communication let zmq_handler = ZmqHandler::new(&config.zmq).await?; info!( "ZMQ communication initialized on port {}", config.zmq.publisher_port ); // Initialize collectors let mut collectors: Vec> = Vec::new(); // Add enabled collectors if config.collectors.cpu.enabled { collectors.push(Box::new(CpuCollector::new(config.collectors.cpu.clone()))); } if config.collectors.memory.enabled { collectors.push(Box::new(MemoryCollector::new(config.collectors.memory.clone()))); } if config.collectors.disk.enabled { collectors.push(Box::new(DiskCollector::new(config.collectors.disk.clone()))); } if config.collectors.systemd.enabled { collectors.push(Box::new(SystemdCollector::new(config.collectors.systemd.clone()))); } if config.collectors.backup.enabled { collectors.push(Box::new(BackupCollector::new())); } if config.collectors.nixos.enabled { collectors.push(Box::new(NixOSCollector::new(config.collectors.nixos.clone()))); } info!("Initialized {} collectors", collectors.len()); // Initialize notification manager let notification_manager = NotificationManager::new(&config.notifications, &hostname)?; info!("Notification manager initialized"); // Initialize service tracker let service_tracker = UserStoppedServiceTracker::new(); info!("Service tracker initialized"); Ok(Self { hostname, config, zmq_handler, collectors, notification_manager, service_tracker, previous_status: None, }) } /// Main agent loop with structured data collection pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> { info!("Starting agent main loop"); // Initial collection if let Err(e) = self.collect_and_broadcast().await { error!("Initial metric collection failed: {}", e); } // Set up intervals let mut transmission_interval = interval(Duration::from_secs( self.config.collection_interval_seconds, )); let mut notification_interval = interval(Duration::from_secs(30)); // Check notifications every 30s // Skip initial ticks to avoid immediate execution transmission_interval.tick().await; notification_interval.tick().await; loop { tokio::select! { _ = transmission_interval.tick() => { if let Err(e) = self.collect_and_broadcast().await { error!("Failed to collect and broadcast metrics: {}", e); } } _ = notification_interval.tick() => { // Process any pending notifications // NOTE: With structured data, we might need to implement status tracking differently // For now, we skip this until status evaluation is migrated } // Handle incoming commands (check periodically) _ = tokio::time::sleep(Duration::from_millis(100)) => { if let Err(e) = self.handle_commands().await { error!("Error handling commands: {}", e); } } _ = &mut shutdown_rx => { info!("Shutdown signal received, stopping agent loop"); break; } } } info!("Agent main loop stopped"); Ok(()) } /// Collect structured data from all collectors and broadcast via ZMQ async fn collect_and_broadcast(&mut self) -> Result<()> { debug!("Starting structured data collection"); // Initialize empty AgentData let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string()); // Collect data from all collectors for collector in &self.collectors { if let Err(e) = collector.collect_structured(&mut agent_data).await { error!("Collector failed: {}", e); // Continue with other collectors even if one fails } } // Check for status changes and send notifications if let Err(e) = self.check_status_changes_and_notify(&agent_data).await { error!("Failed to check status changes: {}", e); } // Broadcast the structured data via ZMQ if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await { error!("Failed to broadcast agent data: {}", e); } else { debug!("Successfully broadcast structured agent data"); } Ok(()) } /// Check for status changes and send notifications async fn check_status_changes_and_notify(&mut self, agent_data: &AgentData) -> Result<()> { // Extract current status let current_status = SystemStatus { cpu_load_status: agent_data.system.cpu.load_status.clone(), cpu_temperature_status: agent_data.system.cpu.temperature_status.clone(), memory_usage_status: agent_data.system.memory.usage_status.clone(), }; // Check for status changes if let Some(previous) = self.previous_status.clone() { self.check_and_notify_status_change( "CPU Load", &previous.cpu_load_status, ¤t_status.cpu_load_status, format!("CPU load: {:.1}", agent_data.system.cpu.load_1min) ).await?; self.check_and_notify_status_change( "CPU Temperature", &previous.cpu_temperature_status, ¤t_status.cpu_temperature_status, format!("CPU temperature: {}°C", agent_data.system.cpu.temperature_celsius.unwrap_or(0.0) as i32) ).await?; self.check_and_notify_status_change( "Memory Usage", &previous.memory_usage_status, ¤t_status.memory_usage_status, format!("Memory usage: {:.1}%", agent_data.system.memory.usage_percent) ).await?; } // Store current status for next comparison self.previous_status = Some(current_status); Ok(()) } /// Check individual status change and send notification if degraded async fn check_and_notify_status_change( &mut self, component: &str, previous: &cm_dashboard_shared::Status, current: &cm_dashboard_shared::Status, details: String ) -> Result<()> { use cm_dashboard_shared::Status; // Only notify on status degradation (OK → Warning/Critical, Warning → Critical) let should_notify = match (previous, current) { (Status::Ok, Status::Warning) => true, (Status::Ok, Status::Critical) => true, (Status::Warning, Status::Critical) => true, _ => false, }; if should_notify { let subject = format!("{} {} Alert", self.hostname, component); let body = format!( "Alert: {} status changed from {:?} to {:?}\n\nDetails: {}\n\nTime: {}", component, previous, current, details, chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC") ); info!("Sending notification: {} - {:?} → {:?}", component, previous, current); if let Err(e) = self.notification_manager.send_direct_email(&subject, &body).await { error!("Failed to send notification for {}: {}", component, e); } } Ok(()) } /// Handle incoming commands from dashboard async fn handle_commands(&mut self) -> Result<()> { // Try to receive a command (non-blocking) if let Ok(Some(command)) = self.zmq_handler.try_receive_command() { info!("Received command: {:?}", command); match command { AgentCommand::CollectNow => { info!("Received immediate collection request"); if let Err(e) = self.collect_and_broadcast().await { error!("Failed to collect on demand: {}", e); } } AgentCommand::SetInterval { seconds } => { info!("Received interval change request: {}s", seconds); // Note: This would require more complex handling to update the interval // For now, just acknowledge } AgentCommand::ToggleCollector { name, enabled } => { info!("Received collector toggle request: {} -> {}", name, enabled); // Note: This would require more complex handling to enable/disable collectors // For now, just acknowledge } AgentCommand::Ping => { info!("Received ping command"); // Maybe send back a pong or status } } } Ok(()) } }