use anyhow::Result;
use gethostname::gethostname;
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info};

use crate::communication::{AgentCommand, ServiceAction, ZmqHandler};
use crate::config::AgentConfig;
use crate::metrics::MetricCollectionManager;
use crate::notifications::NotificationManager;
use crate::service_tracker::UserStoppedServiceTracker;
use crate::status::HostStatusManager;
use cm_dashboard_shared::{Metric, MetricMessage, MetricValue, Status};

pub struct Agent {
    hostname: String,
    config: AgentConfig,
    zmq_handler: ZmqHandler,
    metric_manager: MetricCollectionManager,
    notification_manager: NotificationManager,
    host_status_manager: HostStatusManager,
    service_tracker: UserStoppedServiceTracker,
}

impl Agent {
    pub async fn new(config_path: Option<PathBuf>) -> Result<Self> {
        let hostname = gethostname().to_string_lossy().to_string();
        info!("Initializing agent for host: {}", hostname);

        // Load configuration (now required)
        let config_path = config_path
            .ok_or_else(|| anyhow::anyhow!("Configuration file path is required"))?;
        let config = AgentConfig::from_file(&config_path)?;
        info!("Agent configuration loaded");

        // Initialize ZMQ communication
        let zmq_handler = ZmqHandler::new(&config.zmq).await?;
        info!(
            "ZMQ communication initialized on port {}",
            config.zmq.publisher_port
        );

        // Initialize metric collection manager with cache config
        let metric_manager = MetricCollectionManager::new(&config.collectors, &config).await?;
        info!("Metric collection manager initialized");

        // Initialize notification manager
        let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
        info!("Notification manager initialized");

        // Initialize host status manager
        let host_status_manager = HostStatusManager::new(config.status_aggregation.clone());
        info!("Host status manager initialized");

        // Initialize user-stopped service tracker
        let service_tracker = UserStoppedServiceTracker::init_global()?;
        info!("User-stopped service tracker initialized");

        Ok(Self {
            hostname,
            config,
            zmq_handler,
            metric_manager,
            notification_manager,
            host_status_manager,
            service_tracker,
        })
    }
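    /// Run the agent loop until `shutdown_rx` fires.
    ///
    /// A minimal sketch of the expected wiring (hypothetical: the real entry
    /// point lives in `main.rs`; the crate path `cm_dashboard_agent::agent`,
    /// the config file name, and tokio's `signal` feature are assumptions):
    ///
    /// ```no_run
    /// use cm_dashboard_agent::agent::Agent;
    ///
    /// #[tokio::main]
    /// async fn main() -> anyhow::Result<()> {
    ///     // Translate Ctrl-C into the oneshot shutdown signal `run` expects.
    ///     let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    ///     tokio::spawn(async move {
    ///         let _ = tokio::signal::ctrl_c().await;
    ///         let _ = shutdown_tx.send(());
    ///     });
    ///
    ///     let mut agent = Agent::new(Some("agent.toml".into())).await?;
    ///     agent.run(shutdown_rx).await
    /// }
    /// ```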
    pub async fn run(
        &mut self,
        mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
    ) -> Result<()> {
        info!("Starting agent main loop with separated collection and transmission");

        // CRITICAL: collect ALL data immediately at startup before entering the loop
        info!("Performing initial FORCE collection of all metrics at startup");
        if let Err(e) = self.collect_all_metrics_force().await {
            error!("Failed to collect initial metrics: {}", e);
        } else {
            info!("Initial metric collection completed - all data cached and ready");
        }

        // Separate intervals for collection, transmission, heartbeat, and email notifications
        let mut collection_interval =
            interval(Duration::from_secs(self.config.collection_interval_seconds));
        let mut transmission_interval = interval(Duration::from_secs(
            self.config.zmq.transmission_interval_seconds,
        ));
        let mut heartbeat_interval = interval(Duration::from_secs(
            self.config.zmq.heartbeat_interval_seconds,
        ));
        let mut notification_interval = interval(Duration::from_secs(
            self.config.notifications.aggregation_interval_seconds,
        ));

        loop {
            tokio::select! {
                _ = collection_interval.tick() => {
                    // Only collect and cache metrics, no ZMQ transmission
                    if let Err(e) = self.collect_metrics_only().await {
                        error!("Failed to collect metrics: {}", e);
                    }
                }
                _ = transmission_interval.tick() => {
                    // Send all metrics via ZMQ (dashboard updates only)
                    if let Err(e) = self.broadcast_all_metrics().await {
                        error!("Failed to broadcast metrics: {}", e);
                    }
                }
                _ = heartbeat_interval.tick() => {
                    // Send standalone heartbeat for host connectivity detection
                    if let Err(e) = self.send_heartbeat().await {
                        error!("Failed to send heartbeat: {}", e);
                    }
                }
                _ = notification_interval.tick() => {
                    // Process batched email notifications (separate from dashboard updates)
                    if let Err(e) = self
                        .host_status_manager
                        .process_pending_notifications(&mut self.notification_manager)
                        .await
                    {
                        error!("Failed to process pending notifications: {}", e);
                    }
                }
                // Handle incoming commands (checked every 100 ms)
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    if let Err(e) = self.handle_commands().await {
                        error!("Error handling commands: {}", e);
                    }
                }
                _ = &mut shutdown_rx => {
                    info!("Shutdown signal received, stopping agent loop");
                    break;
                }
            }
        }

        info!("Agent main loop stopped");
        Ok(())
    }

    async fn collect_all_metrics_force(&mut self) -> Result<()> {
        info!("Starting FORCE metric collection for startup");

        // Force collect all metrics from all collectors immediately
        let metrics = self.metric_manager.collect_all_metrics_force().await?;
        if metrics.is_empty() {
            error!("No metrics collected during force collection!");
            return Ok(());
        }
        info!("Force collected and cached {} metrics", metrics.len());

        // Process metrics through the status manager (collect status data at startup)
        let _status_changed = self.process_metrics(&metrics).await;
        Ok(())
    }

    async fn collect_metrics_only(&mut self) -> Result<()> {
        debug!("Starting metric collection cycle (cache only)");

        // Collect all metrics from all collectors and cache them
        let metrics = self.metric_manager.collect_all_metrics().await?;
        if metrics.is_empty() {
            debug!("No metrics collected this cycle");
            return Ok(());
        }
        debug!("Collected and cached {} metrics", metrics.len());

        // Process metrics through the status manager; transmit immediately if the status changed
        let status_changed = self.process_metrics(&metrics).await;
        if status_changed {
            info!("Status change detected - triggering immediate metric transmission");
            if let Err(e) = self.broadcast_all_metrics().await {
                error!("Failed to broadcast metrics after status change: {}", e);
            }
        }
        Ok(())
    }

    async fn broadcast_all_metrics(&mut self) -> Result<()> {
        debug!("Broadcasting cached metrics via ZMQ");

        // Get cached metrics (no fresh collection)
        let mut metrics = self.metric_manager.get_cached_metrics();

        // Add the host status summary metric from the status manager
        let host_status_metric = self.host_status_manager.get_host_status_metric();
        metrics.push(host_status_metric);

        // Add agent version metric for cross-host version comparison
        let version_metric = self.get_agent_version_metric();
        metrics.push(version_metric);

        // Add heartbeat metric for host connectivity detection
        let heartbeat_metric = self.get_heartbeat_metric();
        metrics.push(heartbeat_metric);

        // Check for user-stopped services that are now active and clear their flags
        self.clear_user_stopped_flags_for_active_services(&metrics);

        if metrics.is_empty() {
            debug!("No metrics to broadcast");
            return Ok(());
        }
        debug!(
            "Broadcasting {} cached metrics (including host status summary)",
            metrics.len()
        );

        // Create and send a message with all current data
        let message = MetricMessage::new(self.hostname.clone(), metrics);
        self.zmq_handler.publish_metrics(&message).await?;
        debug!("Metrics broadcasted successfully");
        Ok(())
    }
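    /// Feed metrics through the status manager, skipping any metric named in
    /// `notifications.exclude_email_metrics`; returns `true` if any metric
    /// changed the aggregated host status.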
    async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
        let mut status_changed = false;
        for metric in metrics {
            // Filter excluded metrics from email notification processing only
            if self
                .config
                .notifications
                .exclude_email_metrics
                .contains(&metric.name)
            {
                debug!(
                    "Excluding metric '{}' from email notification processing",
                    metric.name
                );
                continue;
            }
            if self
                .host_status_manager
                .process_metric(metric, &mut self.notification_manager)
                .await
            {
                status_changed = true;
            }
        }
        status_changed
    }

    /// Create agent version metric for cross-host version comparison
    fn get_agent_version_metric(&self) -> Metric {
        // Same version string as get_version in main.rs
        let version = self.get_agent_version();
        Metric::new(
            "agent_version".to_string(),
            MetricValue::String(version),
            Status::Ok,
        )
    }

    /// Get agent version from the Cargo package version
    fn get_agent_version(&self) -> String {
        // Use the version from Cargo.toml (e.g., "0.1.11")
        format!("v{}", env!("CARGO_PKG_VERSION"))
    }

    /// Create heartbeat metric for host connectivity detection
    fn get_heartbeat_metric(&self) -> Metric {
        use std::time::{SystemTime, UNIX_EPOCH};

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        Metric::new(
            "agent_heartbeat".to_string(),
            MetricValue::Integer(timestamp as i64),
            Status::Ok,
        )
    }

    /// Send standalone heartbeat for connectivity detection
    async fn send_heartbeat(&mut self) -> Result<()> {
        let heartbeat_metric = self.get_heartbeat_metric();
        let message = MetricMessage::new(self.hostname.clone(), vec![heartbeat_metric]);
        self.zmq_handler.publish_metrics(&message).await?;
        debug!("Sent standalone heartbeat for connectivity detection");
        Ok(())
    }

    async fn handle_commands(&mut self) -> Result<()> {
        // Try to receive commands (non-blocking)
        match self.zmq_handler.try_receive_command() {
            Ok(Some(command)) => {
                info!("Received command: {:?}", command);
                self.process_command(command).await?;
            }
            Ok(None) => {
                // No command available - this is normal
            }
            Err(e) => {
                error!("Error receiving command: {}", e);
            }
        }
        Ok(())
    }

    async fn process_command(&mut self, command: AgentCommand) -> Result<()> {
        match command {
            AgentCommand::CollectNow => {
                info!("Processing CollectNow command");
                if let Err(e) = self.collect_metrics_only().await {
                    error!("Failed to collect metrics on command: {}", e);
                }
            }
            AgentCommand::SetInterval { seconds } => {
                info!("Processing SetInterval command: {} seconds", seconds);
                // Note: this would require modifying the running intervals, which is complex.
                // For now, just log the request.
                info!("Interval change requested but not implemented yet");
            }
            AgentCommand::ToggleCollector { name, enabled } => {
                info!(
                    "Processing ToggleCollector command: {} -> {}",
                    name, enabled
                );
                // Note: this would require dynamic collector management
                info!("Collector toggle requested but not implemented yet");
            }
            AgentCommand::Ping => {
                info!("Processing Ping command - agent is alive");
                // Could send a response back via ZMQ if needed
            }
            AgentCommand::ServiceControl { service_name, action } => {
                info!(
                    "Processing ServiceControl command: {} {:?}",
                    service_name, action
                );
                if let Err(e) = self.handle_service_control(&service_name, &action).await {
                    error!("Failed to execute service control: {}", e);
                }
            }
        }
        Ok(())
    }

    /// Handle systemd service control commands
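    ///
    /// Deployment assumption (not stated elsewhere in this module): the spawned
    /// command shells out to `sudo systemctl`, so the agent's user needs
    /// passwordless sudo for `systemctl`; otherwise the spawned task fails and
    /// the failure is only visible in the error log.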
    async fn handle_service_control(
        &mut self,
        service_name: &str,
        action: &ServiceAction,
    ) -> Result<()> {
        let (action_str, is_user_action) = match action {
            ServiceAction::Start => ("start", false),
            ServiceAction::Stop => ("stop", false),
            ServiceAction::Status => ("status", false),
            ServiceAction::UserStart => ("start", true),
            ServiceAction::UserStop => ("stop", true),
        };
        info!(
            "Executing systemctl {} {} (user action: {})",
            action_str, service_name, is_user_action
        );

        // Handle user-stopped service tracking before systemctl execution (stop only)
        if let ServiceAction::UserStop = action {
            info!("Marking service '{}' as user-stopped", service_name);
            if let Err(e) = self.service_tracker.mark_user_stopped(service_name) {
                error!("Failed to mark service as user-stopped: {}", e);
            } else {
                // Sync to the global tracker
                UserStoppedServiceTracker::update_global(&self.service_tracker);
            }
        }

        // Spawn the systemctl command asynchronously to avoid blocking the agent
        let service_name_clone = service_name.to_string();
        let action_str_clone = action_str.to_string();
        tokio::spawn(async move {
            let result = tokio::process::Command::new("sudo")
                .arg("systemctl")
                .arg(&action_str_clone)
                .arg(format!("{}.service", service_name_clone))
                .output()
                .await;
            match result {
                Ok(output) => {
                    if output.status.success() {
                        info!(
                            "Service {} {} completed successfully",
                            service_name_clone, action_str_clone
                        );
                        if !output.stdout.is_empty() {
                            debug!("stdout: {}", String::from_utf8_lossy(&output.stdout));
                        }
                    } else {
                        let stderr = String::from_utf8_lossy(&output.stderr);
                        error!(
                            "Service {} {} failed: {}",
                            service_name_clone, action_str_clone, stderr
                        );
                    }
                }
                Err(e) => {
                    error!(
                        "Failed to execute systemctl {} {}: {}",
                        action_str_clone, service_name_clone, e
                    );
                }
            }
        });

        info!(
            "Service {} {} command initiated (non-blocking)",
            service_name, action_str
        );
        // Note: service status will be updated by the normal metric collection cycle
        // once the systemctl operation completes
        Ok(())
    }

    /// Check metrics for user-stopped services that are now active and clear their flags
    fn clear_user_stopped_flags_for_active_services(&mut self, metrics: &[Metric]) {
        for metric in metrics {
            // Look for service status metrics that report "active"
            if metric.name.starts_with("service_") && metric.name.ends_with("_status") {
                if let MetricValue::String(status) = &metric.value {
                    if status == "active" {
                        // Extract service name from metric name (service_nginx_status -> nginx)
                        let service_name = metric
                            .name
                            .strip_prefix("service_")
                            .and_then(|s| s.strip_suffix("_status"))
                            .unwrap_or("");
                        if !service_name.is_empty()
                            && UserStoppedServiceTracker::is_service_user_stopped(service_name)
                        {
                            info!(
                                "Service '{}' is now active - clearing user-stopped flag",
                                service_name
                            );
                            if let Err(e) = self.service_tracker.clear_user_stopped(service_name) {
                                error!(
                                    "Failed to clear user-stopped flag for '{}': {}",
                                    service_name, e
                                );
                            } else {
                                // Sync to the global tracker
                                UserStoppedServiceTracker::update_global(&self.service_tracker);
                                debug!("Cleared user-stopped flag for service '{}'", service_name);
                            }
                        }
                    }
                }
            }
        }
    }
}
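
#[cfg(test)]
mod tests {
    // A minimal sketch of how the metric-name parsing in
    // `clear_user_stopped_flags_for_active_services` could be unit tested.
    // `extract_service_name` is a hypothetical helper mirroring the inline
    // `strip_prefix`/`strip_suffix` logic above; the real method also consults
    // the tracker and `MetricValue`, which are not exercised here.
    fn extract_service_name(metric_name: &str) -> Option<&str> {
        metric_name
            .strip_prefix("service_")
            .and_then(|s| s.strip_suffix("_status"))
            .filter(|s| !s.is_empty())
    }

    #[test]
    fn extracts_service_name_from_status_metrics() {
        assert_eq!(extract_service_name("service_nginx_status"), Some("nginx"));
        assert_eq!(
            extract_service_name("service_postgresql_status"),
            Some("postgresql")
        );
        // Non-service metrics and degenerate names yield no service
        assert_eq!(extract_service_name("cpu_usage"), None);
        assert_eq!(extract_service_name("service__status"), None);
    }
}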