use anyhow::Result; use cm_dashboard_shared::{CommandOutputMessage, MessageEnvelope, MessageType, MetricMessage}; use tracing::{debug, error, info, warn}; use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; /// Commands that can be sent to agents #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum AgentCommand { /// Request immediate metric collection CollectNow, /// Change collection interval SetInterval { seconds: u64 }, /// Enable/disable a collector ToggleCollector { name: String, enabled: bool }, /// Request status/health check Ping, /// Control systemd service ServiceControl { service_name: String, action: ServiceAction, }, /// Rebuild NixOS system SystemRebuild { git_url: String, git_branch: String, working_dir: String, api_key_file: Option, }, } /// Service control actions #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ServiceAction { Start, Stop, Status, UserStart, // User-initiated start (clears user-stopped flag) UserStop, // User-initiated stop (marks as user-stopped) } /// ZMQ consumer for receiving metrics from agents pub struct ZmqConsumer { subscriber: Socket, config: ZmqConfig, connected_hosts: std::collections::HashSet, } impl ZmqConsumer { pub async fn new(config: &ZmqConfig) -> Result { let context = Context::new(); // Create subscriber socket let subscriber = context.socket(SocketType::SUB)?; // Set socket options subscriber.set_rcvtimeo(1000)?; // 1 second timeout for non-blocking receives subscriber.set_subscribe(b"")?; // Subscribe to all messages info!("ZMQ consumer initialized"); Ok(Self { subscriber, config: config.clone(), connected_hosts: std::collections::HashSet::new(), }) } /// Connect to a specific host's agent pub async fn connect_to_host(&mut self, hostname: &str, port: u16) -> Result<()> { let address = format!("tcp://{}:{}", hostname, port); match self.subscriber.connect(&address) { Ok(()) => { info!("Connected to agent at {}", address); self.connected_hosts.insert(hostname.to_string()); Ok(()) } Err(e) => { error!("Failed to connect to agent at {}: {}", address, e); Err(anyhow::anyhow!("Failed to connect to {}: {}", address, e)) } } } /// Connect to predefined hosts using their configuration pub async fn connect_to_predefined_hosts(&mut self, hosts: &std::collections::HashMap) -> Result<()> { let default_port = self.config.subscriber_ports[0]; for (hostname, host_details) in hosts { // Try to connect using configured IP, but don't fail if some hosts are unreachable if let Err(e) = self.connect_to_host_with_details(hostname, host_details, default_port).await { warn!("Could not connect to {}: {}", hostname, e); } } info!( "Connected to {} out of {} configured hosts", self.connected_hosts.len(), hosts.len() ); Ok(()) } /// Connect to a host using its configuration details pub async fn connect_to_host_with_details(&mut self, hostname: &str, host_details: &crate::config::HostDetails, port: u16) -> Result<()> { // Get primary connection IP only - no fallbacks let primary_ip = host_details.get_connection_ip(hostname); // Connect directly without fallback attempts self.connect_to_host(&primary_ip, port).await } /// Receive command output from any connected agent (non-blocking) pub async fn receive_command_output(&mut self) -> Result> { match self.subscriber.recv_bytes(zmq::DONTWAIT) { Ok(data) => { // Deserialize envelope let envelope: MessageEnvelope = serde_json::from_slice(&data) .map_err(|e| anyhow::anyhow!("Failed to deserialize envelope: {}", e))?; // Check message type match envelope.message_type { MessageType::CommandOutput => { let cmd_output = envelope .decode_command_output() .map_err(|e| anyhow::anyhow!("Failed to decode command output: {}", e))?; debug!( "Received command output from {}: {}", cmd_output.hostname, cmd_output.output_line ); Ok(Some(cmd_output)) } _ => Ok(None), // Not a command output message } } Err(zmq::Error::EAGAIN) => { // No message available (non-blocking mode) Ok(None) } Err(e) => { error!("ZMQ receive error: {}", e); Err(anyhow::anyhow!("ZMQ receive error: {}", e)) } } } /// Receive metrics from any connected agent (non-blocking) pub async fn receive_metrics(&mut self) -> Result> { match self.subscriber.recv_bytes(zmq::DONTWAIT) { Ok(data) => { debug!("Received {} bytes from ZMQ", data.len()); // Deserialize envelope let envelope: MessageEnvelope = serde_json::from_slice(&data) .map_err(|e| anyhow::anyhow!("Failed to deserialize envelope: {}", e))?; // Check message type match envelope.message_type { MessageType::Metrics => { let metrics = envelope .decode_metrics() .map_err(|e| anyhow::anyhow!("Failed to decode metrics: {}", e))?; debug!( "Received {} metrics from {}", metrics.metrics.len(), metrics.hostname ); Ok(Some(metrics)) } MessageType::Heartbeat => { debug!("Received heartbeat"); Ok(None) // Don't return heartbeats as metrics } MessageType::CommandOutput => { debug!("Received command output (will be handled by receive_command_output)"); Ok(None) // Command output handled by separate method } _ => { debug!("Received non-metrics message: {:?}", envelope.message_type); Ok(None) } } } Err(zmq::Error::EAGAIN) => { // No message available (non-blocking mode) Ok(None) } Err(e) => { error!("ZMQ receive error: {}", e); Err(anyhow::anyhow!("ZMQ receive error: {}", e)) } } } } /// ZMQ command sender for sending commands to agents pub struct ZmqCommandSender { context: Context, } impl ZmqCommandSender { pub fn new(_config: &ZmqConfig) -> Result { let context = Context::new(); info!("ZMQ command sender initialized"); Ok(Self { context }) } /// Send a command to a specific agent pub async fn send_command(&self, hostname: &str, command: AgentCommand) -> Result<()> { // Create a new PUSH socket for this command (ZMQ best practice) let socket = self.context.socket(SocketType::PUSH)?; // Set socket options socket.set_linger(1000)?; // Wait up to 1 second on close socket.set_sndtimeo(5000)?; // 5 second send timeout // Connect to agent's command port (6131) let address = format!("tcp://{}:6131", hostname); socket.connect(&address)?; // Serialize command let serialized = serde_json::to_vec(&command)?; // Send command socket.send(&serialized, 0)?; info!("Sent command {:?} to agent at {}", command, hostname); // Socket will be automatically closed when dropped Ok(()) } }