use anyhow::Result; use cm_dashboard_shared::{AgentData, MessageEnvelope}; use tracing::{debug, info}; use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; /// ZMQ communication handler for publishing metrics pub struct ZmqHandler { publisher: Socket, } impl ZmqHandler { pub async fn new(config: &ZmqConfig) -> Result { let context = Context::new(); // Create publisher socket for metrics let publisher = context.socket(SocketType::PUB)?; let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port); publisher.bind(&pub_bind_address)?; info!("ZMQ publisher bound to {}", pub_bind_address); // Set socket options for efficiency publisher.set_sndhwm(1000)?; // High water mark for outbound messages publisher.set_linger(1000)?; // Linger time on close Ok(Self { publisher, }) } /// Publish agent data via ZMQ pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> { debug!( "Publishing agent data for host {}", data.hostname ); // Create message envelope for agent data let envelope = MessageEnvelope::agent_data(data.clone()) .map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?; // Serialize envelope let serialized = serde_json::to_vec(&envelope)?; // Send via ZMQ self.publisher.send(&serialized, 0)?; debug!("Published agent data message ({} bytes)", serialized.len()); Ok(()) } }