All checks were successful
Build and Release / build-and-release (push) Successful in 2m34s
Replace ZMQ-based service start/stop commands with SSH execution in tmux popups. This provides better user feedback with real-time systemctl output while eliminating blocking operations from the main message processing loop. Changes: - Service start/stop now use SSH with progress display - Added backup functionality with 'B' key - Preserved transitional icons (↑/↓) for immediate visual feedback - Removed all ZMQ service control commands and handlers - Updated configuration to include backup_alias setting - All operations (rebuild, backup, services) now use consistent SSH interface This ensures stable heartbeat processing while providing superior user experience with live command output and service status feedback.
171 lines
6.3 KiB
Rust
171 lines
6.3 KiB
Rust
use anyhow::Result;
|
|
use cm_dashboard_shared::{CommandOutputMessage, MessageEnvelope, MessageType, MetricMessage};
|
|
use tracing::{debug, error, info, warn};
|
|
use zmq::{Context, Socket, SocketType};
|
|
|
|
use crate::config::ZmqConfig;
|
|
|
|
|
|
/// ZMQ consumer for receiving metrics from agents
|
|
pub struct ZmqConsumer {
|
|
subscriber: Socket,
|
|
config: ZmqConfig,
|
|
connected_hosts: std::collections::HashSet<String>,
|
|
}
|
|
|
|
impl ZmqConsumer {
|
|
pub async fn new(config: &ZmqConfig) -> Result<Self> {
|
|
let context = Context::new();
|
|
|
|
// Create subscriber socket
|
|
let subscriber = context.socket(SocketType::SUB)?;
|
|
|
|
// Set socket options
|
|
subscriber.set_rcvtimeo(1000)?; // 1 second timeout for non-blocking receives
|
|
subscriber.set_subscribe(b"")?; // Subscribe to all messages
|
|
|
|
info!("ZMQ consumer initialized");
|
|
|
|
Ok(Self {
|
|
subscriber,
|
|
config: config.clone(),
|
|
connected_hosts: std::collections::HashSet::new(),
|
|
})
|
|
}
|
|
|
|
/// Connect to a specific host's agent
|
|
pub async fn connect_to_host(&mut self, hostname: &str, port: u16) -> Result<()> {
|
|
let address = format!("tcp://{}:{}", hostname, port);
|
|
|
|
match self.subscriber.connect(&address) {
|
|
Ok(()) => {
|
|
info!("Connected to agent at {}", address);
|
|
self.connected_hosts.insert(hostname.to_string());
|
|
Ok(())
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to connect to agent at {}: {}", address, e);
|
|
Err(anyhow::anyhow!("Failed to connect to {}: {}", address, e))
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/// Connect to predefined hosts using their configuration
|
|
pub async fn connect_to_predefined_hosts(&mut self, hosts: &std::collections::HashMap<String, crate::config::HostDetails>) -> Result<()> {
|
|
let default_port = self.config.subscriber_ports[0];
|
|
|
|
for (hostname, host_details) in hosts {
|
|
// Try to connect using configured IP, but don't fail if some hosts are unreachable
|
|
if let Err(e) = self.connect_to_host_with_details(hostname, host_details, default_port).await {
|
|
warn!("Could not connect to {}: {}", hostname, e);
|
|
}
|
|
}
|
|
|
|
info!(
|
|
"Connected to {} out of {} configured hosts",
|
|
self.connected_hosts.len(),
|
|
hosts.len()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Connect to a host using its configuration details
|
|
pub async fn connect_to_host_with_details(&mut self, hostname: &str, host_details: &crate::config::HostDetails, port: u16) -> Result<()> {
|
|
// Get primary connection IP only - no fallbacks
|
|
let primary_ip = host_details.get_connection_ip(hostname);
|
|
|
|
// Connect directly without fallback attempts
|
|
self.connect_to_host(&primary_ip, port).await
|
|
}
|
|
|
|
/// Receive command output from any connected agent (non-blocking)
|
|
pub async fn receive_command_output(&mut self) -> Result<Option<CommandOutputMessage>> {
|
|
match self.subscriber.recv_bytes(zmq::DONTWAIT) {
|
|
Ok(data) => {
|
|
// Deserialize envelope
|
|
let envelope: MessageEnvelope = serde_json::from_slice(&data)
|
|
.map_err(|e| anyhow::anyhow!("Failed to deserialize envelope: {}", e))?;
|
|
|
|
// Check message type
|
|
match envelope.message_type {
|
|
MessageType::CommandOutput => {
|
|
let cmd_output = envelope
|
|
.decode_command_output()
|
|
.map_err(|e| anyhow::anyhow!("Failed to decode command output: {}", e))?;
|
|
|
|
debug!(
|
|
"Received command output from {}: {}",
|
|
cmd_output.hostname,
|
|
cmd_output.output_line
|
|
);
|
|
|
|
Ok(Some(cmd_output))
|
|
}
|
|
_ => Ok(None), // Not a command output message
|
|
}
|
|
}
|
|
Err(zmq::Error::EAGAIN) => {
|
|
// No message available (non-blocking mode)
|
|
Ok(None)
|
|
}
|
|
Err(e) => {
|
|
error!("ZMQ receive error: {}", e);
|
|
Err(anyhow::anyhow!("ZMQ receive error: {}", e))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Receive metrics from any connected agent (non-blocking)
|
|
pub async fn receive_metrics(&mut self) -> Result<Option<MetricMessage>> {
|
|
match self.subscriber.recv_bytes(zmq::DONTWAIT) {
|
|
Ok(data) => {
|
|
debug!("Received {} bytes from ZMQ", data.len());
|
|
|
|
// Deserialize envelope
|
|
let envelope: MessageEnvelope = serde_json::from_slice(&data)
|
|
.map_err(|e| anyhow::anyhow!("Failed to deserialize envelope: {}", e))?;
|
|
|
|
// Check message type
|
|
match envelope.message_type {
|
|
MessageType::Metrics => {
|
|
let metrics = envelope
|
|
.decode_metrics()
|
|
.map_err(|e| anyhow::anyhow!("Failed to decode metrics: {}", e))?;
|
|
|
|
debug!(
|
|
"Received {} metrics from {}",
|
|
metrics.metrics.len(),
|
|
metrics.hostname
|
|
);
|
|
|
|
Ok(Some(metrics))
|
|
}
|
|
MessageType::Heartbeat => {
|
|
debug!("Received heartbeat");
|
|
Ok(None) // Don't return heartbeats as metrics
|
|
}
|
|
MessageType::CommandOutput => {
|
|
debug!("Received command output (will be handled by receive_command_output)");
|
|
Ok(None) // Command output handled by separate method
|
|
}
|
|
_ => {
|
|
debug!("Received non-metrics message: {:?}", envelope.message_type);
|
|
Ok(None)
|
|
}
|
|
}
|
|
}
|
|
Err(zmq::Error::EAGAIN) => {
|
|
// No message available (non-blocking mode)
|
|
Ok(None)
|
|
}
|
|
Err(e) => {
|
|
error!("ZMQ receive error: {}", e);
|
|
Err(anyhow::anyhow!("ZMQ receive error: {}", e))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|