use anyhow::Result;
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info};
use gethostname::gethostname;

use crate::config::AgentConfig;
use crate::communication::{AgentCommand, ZmqHandler};
use crate::metrics::MetricCollectionManager;
use crate::notifications::NotificationManager;
use cm_dashboard_shared::{Metric, MetricMessage};

pub struct Agent {
    hostname: String,
    config: AgentConfig,
    zmq_handler: ZmqHandler,
    metric_manager: MetricCollectionManager,
    notification_manager: NotificationManager,
}

impl Agent {
    pub async fn new(config_path: Option<String>) -> Result<Self> {
        let hostname = gethostname().to_string_lossy().to_string();
        info!("Initializing agent for host: {}", hostname);

        // Load configuration
        let config = if let Some(path) = config_path {
            AgentConfig::load_from_file(&path)?
        } else {
            AgentConfig::default()
        };
        info!("Agent configuration loaded");

        // Initialize ZMQ communication
        let zmq_handler = ZmqHandler::new(&config.zmq).await?;
        info!("ZMQ communication initialized on port {}", config.zmq.publisher_port);

        // Initialize metric collection manager with cache config
        let metric_manager = MetricCollectionManager::new(&config.collectors, &config).await?;
        info!("Metric collection manager initialized");

        // Initialize notification manager
        let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
        info!("Notification manager initialized");

        Ok(Self {
            hostname,
            config,
            zmq_handler,
            metric_manager,
            notification_manager,
        })
    }

    pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
        info!("Starting agent main loop with separated collection and transmission");

        // CRITICAL: Collect ALL data immediately at startup before entering the loop
        info!("Performing initial FORCE collection of all metrics at startup");
        if let Err(e) = self.collect_all_metrics_force().await {
            error!("Failed to collect initial metrics: {}", e);
        } else {
            info!("Initial metric collection completed - all data cached and ready");
        }

        // Separate intervals for collection and transmission
        let mut collection_interval =
            interval(Duration::from_secs(self.config.collection_interval_seconds));
        let mut transmission_interval = interval(Duration::from_secs(1)); // ZMQ broadcast every 1 second
        let mut notification_check_interval = interval(Duration::from_secs(30)); // Check notifications every 30s

        loop {
            tokio::select! {
                _ = collection_interval.tick() => {
                    // Only collect and cache metrics, no ZMQ transmission
                    if let Err(e) = self.collect_metrics_only().await {
                        error!("Failed to collect metrics: {}", e);
                    }
                }
                _ = transmission_interval.tick() => {
                    // Send all cached metrics via ZMQ every 1 second
                    if let Err(e) = self.broadcast_all_cached_metrics().await {
                        error!("Failed to broadcast cached metrics: {}", e);
                    }
                }
                _ = notification_check_interval.tick() => {
                    // Handle any pending notifications
                    self.notification_manager.process_pending().await;
                }
                // Handle incoming commands (check periodically)
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    if let Err(e) = self.handle_commands().await {
                        error!("Error handling commands: {}", e);
                    }
                }
                _ = &mut shutdown_rx => {
                    info!("Shutdown signal received, stopping agent loop");
                    break;
                }
            }
        }

        info!("Agent main loop stopped");
        Ok(())
    }

    async fn collect_all_metrics_force(&mut self) -> Result<()> {
        info!("Starting FORCE metric collection for startup");

        // Force collect all metrics from all collectors immediately
        let metrics = self.metric_manager.collect_all_metrics_force().await?;

        if metrics.is_empty() {
            error!("No metrics collected during force collection!");
            return Ok(());
        }

        info!("Force collected and cached {} metrics", metrics.len());

        // Check for status changes and send notifications
        self.check_status_changes(&metrics).await;

        Ok(())
    }

    async fn collect_metrics_only(&mut self) -> Result<()> {
        debug!("Starting metric collection cycle (cache only)");

        // Collect all metrics from all collectors and cache them
        let metrics = self.metric_manager.collect_all_metrics().await?;

        if metrics.is_empty() {
            debug!("No metrics collected this cycle");
            return Ok(());
        }

        debug!("Collected and cached {} metrics", metrics.len());

        // Check for status changes and send notifications
        self.check_status_changes(&metrics).await;

        Ok(())
    }

    async fn broadcast_all_cached_metrics(&mut self) -> Result<()> {
        debug!("Broadcasting all cached metrics via ZMQ");

        // Get all cached metrics from the metric manager
        let cached_metrics = self.metric_manager.get_all_cached_metrics().await?;

        if cached_metrics.is_empty() {
            debug!("No cached metrics to broadcast");
            return Ok(());
        }

        debug!("Broadcasting {} cached metrics", cached_metrics.len());

        // Create and send message with all cached data
        let message = MetricMessage::new(self.hostname.clone(), cached_metrics);
        self.zmq_handler.publish_metrics(&message).await?;

        debug!("Cached metrics broadcasted successfully");
        Ok(())
    }

    async fn check_status_changes(&mut self, metrics: &[Metric]) {
        for metric in metrics {
            if let Some(status_change) =
                self.notification_manager.update_metric_status(&metric.name, metric.status)
            {
                info!(
                    "Status change detected for {}: {:?} -> {:?}",
                    metric.name, status_change.old_status, status_change.new_status
                );

                // Send notification for status change
                if let Err(e) = self
                    .notification_manager
                    .send_status_change_notification(status_change, metric)
                    .await
                {
                    error!("Failed to send notification: {}", e);
                }
            }
        }
    }

    async fn handle_commands(&mut self) -> Result<()> {
        // Try to receive commands (non-blocking)
        match self.zmq_handler.try_receive_command() {
            Ok(Some(command)) => {
                info!("Received command: {:?}", command);
                self.process_command(command).await?;
            }
            Ok(None) => {
                // No command available - this is normal
            }
            Err(e) => {
                error!("Error receiving command: {}", e);
            }
        }
        Ok(())
    }

    async fn process_command(&mut self, command: AgentCommand) -> Result<()> {
        match command {
            AgentCommand::CollectNow => {
                info!("Processing CollectNow command");
                if let Err(e) = self.collect_metrics_only().await {
                    error!("Failed to collect metrics on command: {}", e);
                }
            }
            AgentCommand::SetInterval { seconds } => {
                info!("Processing SetInterval command: {} seconds", seconds);
                // Note: This would require modifying the interval, which is complex
                // For now, just log the request
                info!("Interval change requested but not implemented yet");
            }
            AgentCommand::ToggleCollector { name, enabled } => {
                info!("Processing ToggleCollector command: {} -> {}", name, enabled);
                // Note: This would require dynamic collector management
                info!("Collector toggle requested but not implemented yet");
            }
            AgentCommand::Ping => {
                info!("Processing Ping command - agent is alive");
                // Could send a response back via ZMQ if needed
            }
        }
        Ok(())
    }
}
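
// A minimal sketch (an assumption, not part of this module) of how `Agent` could be
// driven from the binary entrypoint: construct it, create the oneshot shutdown channel
// that `run` expects, and fire the sender on Ctrl-C. The use of `tracing_subscriber`,
// `tokio::signal::ctrl_c`, and a config path taken from the first CLI argument are
// illustrative; the real entrypoint may wire these up differently.
//
//     #[tokio::main]
//     async fn main() -> anyhow::Result<()> {
//         tracing_subscriber::fmt::init();
//
//         // Optional config path from the first CLI argument (hypothetical convention)
//         let mut agent = Agent::new(std::env::args().nth(1)).await?;
//
//         // Shutdown is signalled through the oneshot channel consumed by `Agent::run`
//         let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
//         tokio::spawn(async move {
//             let _ = tokio::signal::ctrl_c().await;
//             let _ = shutdown_tx.send(());
//         });
//
//         agent.run(shutdown_rx).await
//     }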