use anyhow::Result; use cm_dashboard_shared::{AgentData, CommandOutputMessage, MessageEnvelope, MessageType}; use tracing::{debug, error, info, warn}; use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; /// ZMQ consumer for receiving metrics from agents pub struct ZmqConsumer { subscriber: Socket, config: ZmqConfig, connected_hosts: std::collections::HashSet, } impl ZmqConsumer { pub async fn new(config: &ZmqConfig) -> Result { let context = Context::new(); // Create subscriber socket let subscriber = context.socket(SocketType::SUB)?; // Set socket options subscriber.set_rcvtimeo(1000)?; // 1 second timeout for non-blocking receives subscriber.set_subscribe(b"")?; // Subscribe to all messages info!("ZMQ consumer initialized"); Ok(Self { subscriber, config: config.clone(), connected_hosts: std::collections::HashSet::new(), }) } /// Connect to a specific host's agent pub async fn connect_to_host(&mut self, hostname: &str, port: u16) -> Result<()> { let address = format!("tcp://{}:{}", hostname, port); match self.subscriber.connect(&address) { Ok(()) => { info!("Connected to agent at {}", address); self.connected_hosts.insert(hostname.to_string()); Ok(()) } Err(e) => { error!("Failed to connect to agent at {}: {}", address, e); Err(anyhow::anyhow!("Failed to connect to {}: {}", address, e)) } } } /// Connect to predefined hosts using their configuration pub async fn connect_to_predefined_hosts(&mut self, hosts: &std::collections::HashMap) -> Result<()> { let default_port = self.config.subscriber_ports[0]; for (hostname, host_details) in hosts { // Try to connect using configured IP, but don't fail if some hosts are unreachable if let Err(e) = self.connect_to_host_with_details(hostname, host_details, default_port).await { warn!("Could not connect to {}: {}", hostname, e); } } info!( "Connected to {} out of {} configured hosts", self.connected_hosts.len(), hosts.len() ); Ok(()) } /// Connect to a host using its configuration details pub async fn connect_to_host_with_details(&mut self, hostname: &str, host_details: &crate::config::HostDetails, port: u16) -> Result<()> { // Get primary connection IP only - no fallbacks let primary_ip = host_details.get_connection_ip(hostname); // Connect directly without fallback attempts self.connect_to_host(&primary_ip, port).await } /// Receive command output from any connected agent (non-blocking) pub async fn receive_command_output(&mut self) -> Result> { match self.subscriber.recv_bytes(zmq::DONTWAIT) { Ok(data) => { // Deserialize envelope let envelope: MessageEnvelope = serde_json::from_slice(&data) .map_err(|e| anyhow::anyhow!("Failed to deserialize envelope: {}", e))?; // Check message type match envelope.message_type { MessageType::CommandOutput => { let cmd_output = envelope .decode_command_output() .map_err(|e| anyhow::anyhow!("Failed to decode command output: {}", e))?; debug!( "Received command output from {}: {}", cmd_output.hostname, cmd_output.output_line ); Ok(Some(cmd_output)) } _ => Ok(None), // Not a command output message } } Err(zmq::Error::EAGAIN) => { // No message available (non-blocking mode) Ok(None) } Err(e) => { error!("ZMQ receive error: {}", e); Err(anyhow::anyhow!("ZMQ receive error: {}", e)) } } } /// Receive agent data (non-blocking) pub async fn receive_agent_data(&mut self) -> Result> { match self.subscriber.recv_bytes(zmq::DONTWAIT) { Ok(data) => { debug!("Received {} bytes from ZMQ", data.len()); // Deserialize envelope let envelope: MessageEnvelope = serde_json::from_slice(&data) .map_err(|e| anyhow::anyhow!("Failed to deserialize envelope: {}", e))?; // Check message type match envelope.message_type { MessageType::AgentData => { let agent_data = envelope .decode_agent_data() .map_err(|e| anyhow::anyhow!("Failed to decode agent data: {}", e))?; debug!( "Received agent data from host {}", agent_data.hostname ); Ok(Some(agent_data)) } MessageType::Heartbeat => { debug!("Received heartbeat"); Ok(None) // Don't return heartbeats } MessageType::CommandOutput => { debug!("Received command output (will be handled by receive_command_output)"); Ok(None) // Command output handled by separate method } _ => { debug!("Received unsupported message: {:?}", envelope.message_type); Ok(None) } } } Err(zmq::Error::EAGAIN) => { // No message available (non-blocking mode) Ok(None) } Err(e) => { error!("ZMQ receive error: {}", e); Err(anyhow::anyhow!("ZMQ receive error: {}", e)) } } } }