use anyhow::Result; use cm_dashboard_shared::{MessageEnvelope, MetricMessage}; use tracing::{debug, info}; use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; /// ZMQ communication handler for publishing metrics and receiving commands pub struct ZmqHandler { publisher: Socket, command_receiver: Socket, } impl ZmqHandler { pub async fn new(config: &ZmqConfig) -> Result { let context = Context::new(); // Create publisher socket for metrics let publisher = context.socket(SocketType::PUB)?; let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port); publisher.bind(&pub_bind_address)?; info!("ZMQ publisher bound to {}", pub_bind_address); // Set socket options for efficiency publisher.set_sndhwm(1000)?; // High water mark for outbound messages publisher.set_linger(1000)?; // Linger time on close // Create command receiver socket (PULL socket to receive commands from dashboard) let command_receiver = context.socket(SocketType::PULL)?; let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port); command_receiver.bind(&cmd_bind_address)?; info!("ZMQ command receiver bound to {}", cmd_bind_address); // Set non-blocking mode for command receiver command_receiver.set_rcvtimeo(0)?; // Non-blocking receive command_receiver.set_linger(1000)?; Ok(Self { publisher, command_receiver, }) } /// Publish metrics message via ZMQ pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> { debug!( "Publishing {} metrics for host {}", message.metrics.len(), message.hostname ); // Create message envelope let envelope = MessageEnvelope::metrics(message.clone()) .map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?; // Serialize envelope let serialized = serde_json::to_vec(&envelope)?; // Send via ZMQ self.publisher.send(&serialized, 0)?; debug!("Published metrics message ({} bytes)", serialized.len()); Ok(()) } /// Send heartbeat (placeholder for future use) /// Try to receive a command (non-blocking) pub fn try_receive_command(&self) -> Result> { match self.command_receiver.recv_bytes(zmq::DONTWAIT) { Ok(bytes) => { debug!("Received command message ({} bytes)", bytes.len()); let command: AgentCommand = serde_json::from_slice(&bytes) .map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?; debug!("Parsed command: {:?}", command); Ok(Some(command)) } Err(zmq::Error::EAGAIN) => { // No message available (non-blocking) Ok(None) } Err(e) => Err(anyhow::anyhow!("ZMQ receive error: {}", e)), } } } /// Commands that can be sent to the agent #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum AgentCommand { /// Request immediate metric collection CollectNow, /// Change collection interval SetInterval { seconds: u64 }, /// Enable/disable a collector ToggleCollector { name: String, enabled: bool }, /// Request status/health check Ping, /// Control systemd service ServiceControl { service_name: String, action: ServiceAction, }, } /// Service control actions #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ServiceAction { Start, Stop, Status, UserStart, // User-initiated start (clears user-stopped flag) UserStop, // User-initiated stop (marks as user-stopped) }