Files
cm-dashboard/agent/src/communication/mod.rs
Christoffer Martinsson adf3b0f51c
All checks were successful
Build and Release / build-and-release (push) Successful in 2m10s
Implement complete structured data architecture
Replace fragile string-based metrics with type-safe JSON data structures.
Agent converts all metrics to structured data, dashboard processes typed fields.

Changes:
- Add AgentData struct with CPU, memory, storage, services, backup fields
- Replace string parsing with direct field access throughout system
- Maintain UI compatibility via temporary metric bridge conversion
- Fix NVMe temperature display and eliminate string parsing bugs
- Update protocol to support structured data transmission over ZMQ
- Comprehensive metric type coverage: CPU, memory, storage, services, backup

Version bump to 0.1.131
2025-11-23 21:32:00 +01:00

101 lines
3.4 KiB
Rust

use anyhow::Result;
use cm_dashboard_shared::{AgentData, MessageEnvelope};
use tracing::{debug, info};
use zmq::{Context, Socket, SocketType};
use crate::config::ZmqConfig;
/// ZMQ communication handler for publishing metrics and receiving commands
pub struct ZmqHandler {
publisher: Socket,
command_receiver: Socket,
}
impl ZmqHandler {
pub async fn new(config: &ZmqConfig) -> Result<Self> {
let context = Context::new();
// Create publisher socket for metrics
let publisher = context.socket(SocketType::PUB)?;
let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port);
publisher.bind(&pub_bind_address)?;
info!("ZMQ publisher bound to {}", pub_bind_address);
// Set socket options for efficiency
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
publisher.set_linger(1000)?; // Linger time on close
// Create command receiver socket (PULL socket to receive commands from dashboard)
let command_receiver = context.socket(SocketType::PULL)?;
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
command_receiver.bind(&cmd_bind_address)?;
info!("ZMQ command receiver bound to {}", cmd_bind_address);
// Set non-blocking mode for command receiver
command_receiver.set_rcvtimeo(0)?; // Non-blocking receive
command_receiver.set_linger(1000)?;
Ok(Self {
publisher,
command_receiver,
})
}
/// Publish agent data via ZMQ
pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
debug!(
"Publishing agent data for host {}",
data.hostname
);
// Create message envelope for agent data
let envelope = MessageEnvelope::agent_data(data.clone())
.map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
// Serialize envelope
let serialized = serde_json::to_vec(&envelope)?;
// Send via ZMQ
self.publisher.send(&serialized, 0)?;
debug!("Published agent data message ({} bytes)", serialized.len());
Ok(())
}
/// Try to receive a command (non-blocking)
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
Ok(bytes) => {
debug!("Received command message ({} bytes)", bytes.len());
let command: AgentCommand = serde_json::from_slice(&bytes)
.map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?;
debug!("Parsed command: {:?}", command);
Ok(Some(command))
}
Err(zmq::Error::EAGAIN) => {
// No message available (non-blocking)
Ok(None)
}
Err(e) => Err(anyhow::anyhow!("ZMQ receive error: {}", e)),
}
}
}
/// Commands that can be sent to the agent
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum AgentCommand {
/// Request immediate metric collection
CollectNow,
/// Change collection interval
SetInterval { seconds: u64 },
/// Enable/disable a collector
ToggleCollector { name: String, enabled: bool },
/// Request status/health check
Ping,
}