use anyhow::Result;
use gethostname::gethostname;
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info};

use crate::communication::{AgentCommand, ServiceAction, ZmqHandler};
use crate::config::AgentConfig;
use crate::metrics::MetricCollectionManager;
use crate::notifications::NotificationManager;
use crate::status::HostStatusManager;
use cm_dashboard_shared::{Metric, MetricMessage};

pub struct Agent {
    hostname: String,
    config: AgentConfig,
    zmq_handler: ZmqHandler,
    metric_manager: MetricCollectionManager,
    notification_manager: NotificationManager,
    host_status_manager: HostStatusManager,
}

impl Agent {
    pub async fn new(config_path: Option<PathBuf>) -> Result<Self> {
        let hostname = gethostname().to_string_lossy().to_string();
        info!("Initializing agent for host: {}", hostname);

        // Load configuration (now required)
        let config_path = config_path
            .ok_or_else(|| anyhow::anyhow!("Configuration file path is required"))?;
        let config = AgentConfig::from_file(&config_path)?;
        info!("Agent configuration loaded");

        // Initialize ZMQ communication
        let zmq_handler = ZmqHandler::new(&config.zmq).await?;
        info!(
            "ZMQ communication initialized on port {}",
            config.zmq.publisher_port
        );

        // Initialize metric collection manager with cache config
        let metric_manager = MetricCollectionManager::new(&config.collectors, &config).await?;
        info!("Metric collection manager initialized");

        // Initialize notification manager
        let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
        info!("Notification manager initialized");

        // Initialize host status manager
        let host_status_manager = HostStatusManager::new(config.status_aggregation.clone());
        info!("Host status manager initialized");

        Ok(Self {
            hostname,
            config,
            zmq_handler,
            metric_manager,
            notification_manager,
            host_status_manager,
        })
    }

    pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
        info!("Starting agent main loop with separated collection and transmission");

        // CRITICAL: Collect ALL data immediately at startup before entering the loop
        info!("Performing initial FORCE collection of all metrics at startup");
        if let Err(e) = self.collect_all_metrics_force().await {
            error!("Failed to collect initial metrics: {}", e);
        } else {
            info!("Initial metric collection completed - all data cached and ready");
        }

        // Separate intervals for collection and transmission
        let mut collection_interval =
            interval(Duration::from_secs(self.config.collection_interval_seconds));
        let mut transmission_interval = interval(Duration::from_secs(1)); // ZMQ broadcast every 1 second
        let mut notification_interval = interval(Duration::from_secs(
            self.config.status_aggregation.notification_interval_seconds,
        ));

        loop {
            tokio::select! {
                _ = collection_interval.tick() => {
                    // Only collect and cache metrics, no ZMQ transmission
                    if let Err(e) = self.collect_metrics_only().await {
                        error!("Failed to collect metrics: {}", e);
                    }
                }
                _ = transmission_interval.tick() => {
                    // Send all metrics via ZMQ every 1 second
                    if let Err(e) = self.broadcast_all_metrics().await {
                        error!("Failed to broadcast metrics: {}", e);
                    }
                }
                _ = notification_interval.tick() => {
                    // Process batched notifications
                    if let Err(e) = self
                        .host_status_manager
                        .process_pending_notifications(&mut self.notification_manager)
                        .await
                    {
                        error!("Failed to process pending notifications: {}", e);
                    }
                }
                // Handle incoming commands (check periodically)
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    if let Err(e) = self.handle_commands().await {
                        error!("Error handling commands: {}", e);
                    }
                }
                _ = &mut shutdown_rx => {
                    info!("Shutdown signal received, stopping agent loop");
                    break;
                }
            }
        }

        info!("Agent main loop stopped");
        Ok(())
    }

    async fn collect_all_metrics_force(&mut self) -> Result<()> {
        info!("Starting FORCE metric collection for startup");

        // Force collect all metrics from all collectors immediately
        let metrics = self.metric_manager.collect_all_metrics_force().await?;

        if metrics.is_empty() {
            error!("No metrics collected during force collection!");
            return Ok(());
        }

        info!("Force collected and cached {} metrics", metrics.len());

        // Process metrics through status manager
        self.process_metrics(&metrics).await;

        Ok(())
    }

    async fn collect_metrics_only(&mut self) -> Result<()> {
        debug!("Starting metric collection cycle (cache only)");

        // Collect all metrics from all collectors and cache them
        let metrics = self.metric_manager.collect_all_metrics().await?;

        if metrics.is_empty() {
            debug!("No metrics collected this cycle");
            return Ok(());
        }

        debug!("Collected and cached {} metrics", metrics.len());

        // Process metrics through status manager
        self.process_metrics(&metrics).await;

        Ok(())
    }

    async fn broadcast_all_metrics(&mut self) -> Result<()> {
        debug!("Broadcasting all metrics via ZMQ");

        // Get all current metrics from collectors
        let mut metrics = self.metric_manager.collect_all_metrics().await?;

        // Add the host status summary metric from status manager
        let host_status_metric = self.host_status_manager.get_host_status_metric();
        metrics.push(host_status_metric);

        if metrics.is_empty() {
            debug!("No metrics to broadcast");
            return Ok(());
        }

        debug!(
            "Broadcasting {} metrics (including host status summary)",
            metrics.len()
        );

        // Create and send message with all current data
        let message = MetricMessage::new(self.hostname.clone(), metrics);
        self.zmq_handler.publish_metrics(&message).await?;

        debug!("Metrics broadcasted successfully");
        Ok(())
    }

    async fn process_metrics(&mut self, metrics: &[Metric]) {
        for metric in metrics {
            self.host_status_manager
                .process_metric(metric, &mut self.notification_manager)
                .await;
        }
    }

    async fn handle_commands(&mut self) -> Result<()> {
        // Try to receive commands (non-blocking)
        match self.zmq_handler.try_receive_command() {
            Ok(Some(command)) => {
                info!("Received command: {:?}", command);
                self.process_command(command).await?;
            }
            Ok(None) => {
                // No command available - this is normal
            }
            Err(e) => {
                error!("Error receiving command: {}", e);
            }
        }
        Ok(())
    }

    async fn process_command(&mut self, command: AgentCommand) -> Result<()> {
        match command {
            AgentCommand::CollectNow => {
                info!("Processing CollectNow command");
                if let Err(e) = self.collect_metrics_only().await {
                    error!("Failed to collect metrics on command: {}", e);
                }
            }
            AgentCommand::SetInterval { seconds } => {
info!("Processing SetInterval command: {} seconds", seconds); // Note: This would require modifying the interval, which is complex // For now, just log the request info!("Interval change requested but not implemented yet"); } AgentCommand::ToggleCollector { name, enabled } => { info!( "Processing ToggleCollector command: {} -> {}", name, enabled ); // Note: This would require dynamic collector management info!("Collector toggle requested but not implemented yet"); } AgentCommand::Ping => { info!("Processing Ping command - agent is alive"); // Could send a response back via ZMQ if needed } AgentCommand::ServiceControl { service_name, action } => { info!("Processing ServiceControl command: {} {:?}", service_name, action); if let Err(e) = self.handle_service_control(&service_name, &action).await { error!("Failed to execute service control: {}", e); } } AgentCommand::SystemRebuild { git_url, git_branch, working_dir, api_key_file } => { info!("Processing SystemRebuild command: {} @ {} -> {}", git_url, git_branch, working_dir); if let Err(e) = self.handle_system_rebuild(&git_url, &git_branch, &working_dir, api_key_file.as_deref()).await { error!("Failed to execute system rebuild: {}", e); } } } Ok(()) } /// Handle systemd service control commands async fn handle_service_control(&self, service_name: &str, action: &ServiceAction) -> Result<()> { let action_str = match action { ServiceAction::Start => "start", ServiceAction::Stop => "stop", ServiceAction::Restart => "restart", ServiceAction::Status => "status", }; info!("Executing systemctl {} {}", action_str, service_name); let output = tokio::process::Command::new("sudo") .arg("systemctl") .arg(action_str) .arg(service_name) .output() .await?; if output.status.success() { info!("Service {} {} completed successfully", service_name, action_str); if !output.stdout.is_empty() { debug!("stdout: {}", String::from_utf8_lossy(&output.stdout)); } } else { let stderr = String::from_utf8_lossy(&output.stderr); error!("Service {} {} failed: {}", service_name, action_str, stderr); return Err(anyhow::anyhow!("systemctl {} {} failed: {}", action_str, service_name, stderr)); } // Force refresh metrics after service control to update service status if matches!(action, ServiceAction::Start | ServiceAction::Stop | ServiceAction::Restart) { info!("Triggering metric refresh after service control"); // Note: We can't call self.collect_metrics_only() here due to borrowing issues // The next metric collection cycle will pick up the changes } Ok(()) } /// Handle NixOS system rebuild commands with git clone approach async fn handle_system_rebuild(&self, git_url: &str, git_branch: &str, working_dir: &str, api_key_file: Option<&str>) -> Result<()> { info!("Starting NixOS system rebuild: {} @ {} -> {}", git_url, git_branch, working_dir); // Enable maintenance mode before rebuild let maintenance_file = "/tmp/cm-maintenance"; if let Err(e) = tokio::fs::File::create(maintenance_file).await { error!("Failed to create maintenance mode file: {}", e); } else { info!("Maintenance mode enabled"); } // Clone or update repository let git_result = self.ensure_git_repository(git_url, git_branch, working_dir, api_key_file).await; // Execute nixos-rebuild if git operation succeeded - run detached but log output let rebuild_result = if git_result.is_ok() { info!("Git repository ready, executing nixos-rebuild in detached mode"); let log_file = std::fs::OpenOptions::new() .create(true) .append(true) .open("/var/log/cm-dashboard/nixos-rebuild.log") .map_err(|e| anyhow::anyhow!("Failed to open 
rebuild log: {}", e))?; tokio::process::Command::new("nohup") .arg("sudo") .arg("/run/current-system/sw/bin/nixos-rebuild") .arg("switch") .arg("--option") .arg("sandbox") .arg("false") .arg("--flake") .arg(".") .current_dir(working_dir) .stdin(std::process::Stdio::null()) .stdout(std::process::Stdio::from(log_file.try_clone().unwrap())) .stderr(std::process::Stdio::from(log_file)) .spawn() } else { return git_result.and_then(|_| unreachable!()); }; // Always try to remove maintenance mode file if let Err(e) = tokio::fs::remove_file(maintenance_file).await { if e.kind() != std::io::ErrorKind::NotFound { error!("Failed to remove maintenance mode file: {}", e); } } else { info!("Maintenance mode disabled"); } // Check rebuild start result match rebuild_result { Ok(_child) => { info!("NixOS rebuild started successfully in background"); // Don't wait for completion to avoid agent being killed during rebuild } Err(e) => { error!("Failed to start nixos-rebuild: {}", e); return Err(anyhow::anyhow!("Failed to start nixos-rebuild: {}", e)); } } info!("System rebuild completed, triggering metric refresh"); Ok(()) } /// Ensure git repository is cloned and up to date with force clone approach async fn ensure_git_repository(&self, git_url: &str, git_branch: &str, working_dir: &str, api_key_file: Option<&str>) -> Result<()> { use std::path::Path; // Read API key if provided let auth_url = if let Some(key_file) = api_key_file { match tokio::fs::read_to_string(key_file).await { Ok(api_key) => { let api_key = api_key.trim(); if !api_key.is_empty() { // Convert https://gitea.cmtec.se/cm/nixosbox.git to https://token@gitea.cmtec.se/cm/nixosbox.git if git_url.starts_with("https://") { let url_without_protocol = &git_url[8..]; // Remove "https://" format!("https://{}@{}", api_key, url_without_protocol) } else { info!("API key provided but URL is not HTTPS, using original URL"); git_url.to_string() } } else { info!("API key file is empty, using original URL"); git_url.to_string() } } Err(e) => { info!("Could not read API key file {}: {}, using original URL", key_file, e); git_url.to_string() } } } else { git_url.to_string() }; // Always remove existing directory and do fresh clone for consistent state let working_path = Path::new(working_dir); if working_path.exists() { info!("Removing existing repository directory: {}", working_dir); if let Err(e) = tokio::fs::remove_dir_all(working_path).await { error!("Failed to remove existing directory: {}", e); return Err(anyhow::anyhow!("Failed to remove existing directory: {}", e)); } } info!("Force cloning git repository from {} (branch: {})", git_url, git_branch); // Force clone with depth 1 for efficiency (no history needed for deployment) let output = tokio::process::Command::new("git") .arg("clone") .arg("--depth") .arg("1") .arg("--branch") .arg(git_branch) .arg(&auth_url) .arg(working_dir) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); error!("Git clone failed: {}", stderr); return Err(anyhow::anyhow!("Git clone failed: {}", stderr)); } info!("Git repository cloned successfully with latest state"); Ok(()) } }