All checks were successful
Build and Release / build-and-release (push) Successful in 2m34s
Replace ZMQ-based service start/stop commands with SSH execution in tmux popups. This provides better user feedback with real-time systemctl output while eliminating blocking operations from the main message processing loop. Changes: - Service start/stop now use SSH with progress display - Added backup functionality with 'B' key - Preserved transitional icons (↑/↓) for immediate visual feedback - Removed all ZMQ service control commands and handlers - Updated configuration to include backup_alias setting - All operations (rebuild, backup, services) now use consistent SSH interface This ensures stable heartbeat processing while providing superior user experience with live command output and service status feedback.
102 lines
3.4 KiB
Rust
102 lines
3.4 KiB
Rust
use anyhow::Result;
|
|
use cm_dashboard_shared::{MessageEnvelope, MetricMessage};
|
|
use tracing::{debug, info};
|
|
use zmq::{Context, Socket, SocketType};
|
|
|
|
use crate::config::ZmqConfig;
|
|
|
|
/// ZMQ communication handler for publishing metrics and receiving commands
|
|
pub struct ZmqHandler {
|
|
publisher: Socket,
|
|
command_receiver: Socket,
|
|
}
|
|
|
|
impl ZmqHandler {
|
|
pub async fn new(config: &ZmqConfig) -> Result<Self> {
|
|
let context = Context::new();
|
|
|
|
// Create publisher socket for metrics
|
|
let publisher = context.socket(SocketType::PUB)?;
|
|
let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port);
|
|
publisher.bind(&pub_bind_address)?;
|
|
|
|
info!("ZMQ publisher bound to {}", pub_bind_address);
|
|
|
|
// Set socket options for efficiency
|
|
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
|
|
publisher.set_linger(1000)?; // Linger time on close
|
|
|
|
// Create command receiver socket (PULL socket to receive commands from dashboard)
|
|
let command_receiver = context.socket(SocketType::PULL)?;
|
|
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
|
|
command_receiver.bind(&cmd_bind_address)?;
|
|
|
|
info!("ZMQ command receiver bound to {}", cmd_bind_address);
|
|
|
|
// Set non-blocking mode for command receiver
|
|
command_receiver.set_rcvtimeo(0)?; // Non-blocking receive
|
|
command_receiver.set_linger(1000)?;
|
|
|
|
Ok(Self {
|
|
publisher,
|
|
command_receiver,
|
|
})
|
|
}
|
|
|
|
/// Publish metrics message via ZMQ
|
|
pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> {
|
|
debug!(
|
|
"Publishing {} metrics for host {}",
|
|
message.metrics.len(),
|
|
message.hostname
|
|
);
|
|
|
|
// Create message envelope
|
|
let envelope = MessageEnvelope::metrics(message.clone())
|
|
.map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?;
|
|
|
|
// Serialize envelope
|
|
let serialized = serde_json::to_vec(&envelope)?;
|
|
|
|
// Send via ZMQ
|
|
self.publisher.send(&serialized, 0)?;
|
|
|
|
debug!("Published metrics message ({} bytes)", serialized.len());
|
|
Ok(())
|
|
}
|
|
|
|
|
|
/// Try to receive a command (non-blocking)
|
|
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
|
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
|
Ok(bytes) => {
|
|
debug!("Received command message ({} bytes)", bytes.len());
|
|
|
|
let command: AgentCommand = serde_json::from_slice(&bytes)
|
|
.map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?;
|
|
|
|
debug!("Parsed command: {:?}", command);
|
|
Ok(Some(command))
|
|
}
|
|
Err(zmq::Error::EAGAIN) => {
|
|
// No message available (non-blocking)
|
|
Ok(None)
|
|
}
|
|
Err(e) => Err(anyhow::anyhow!("ZMQ receive error: {}", e)),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Commands that can be sent to the agent
|
|
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
|
pub enum AgentCommand {
|
|
/// Request immediate metric collection
|
|
CollectNow,
|
|
/// Change collection interval
|
|
SetInterval { seconds: u64 },
|
|
/// Enable/disable a collector
|
|
ToggleCollector { name: String, enabled: bool },
|
|
/// Request status/health check
|
|
Ping,
|
|
}
|