From 01e1f33b66f3f33a448b95be73aabcfdd3a8c6d0 Mon Sep 17 00:00:00 2001 From: Christoffer Martinsson Date: Thu, 27 Nov 2025 22:56:58 +0100 Subject: [PATCH] Fix ZMQ sender blocking - move to independent thread with try_read CRITICAL FIX: The previous cached collector architecture still had ZMQ sending in the main event loop, where it could block waiting for RwLock when collectors were writing. This caused the 3-8 second delays you observed. Changes: - Move ZMQ publisher to dedicated std::thread (ZMQ sockets aren't thread-safe) - Use try_read() instead of read() to avoid blocking on write locks - Send previous data if cache is locked by collector - ZMQ now sends every 2s regardless of collector timing - Remove publisher from ZmqHandler (now only handles commands) Architecture: - Collectors: Independent tokio tasks updating shared cache - ZMQ Sender: Dedicated OS thread with its own publisher socket - Main Loop: Only handles commands and notifications This ensures ZMQ transmission is NEVER blocked by slow collectors. Bump version to v0.1.195 --- Cargo.lock | 6 +-- agent/Cargo.toml | 2 +- agent/src/agent.rs | 79 +++++++++++++++++++++++++--------- agent/src/communication/mod.rs | 39 +---------------- dashboard/Cargo.toml | 2 +- shared/Cargo.toml | 2 +- 6 files changed, 67 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5bcd08f..0c683ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cm-dashboard" -version = "0.1.193" +version = "0.1.194" dependencies = [ "anyhow", "chrono", @@ -301,7 +301,7 @@ dependencies = [ [[package]] name = "cm-dashboard-agent" -version = "0.1.193" +version = "0.1.194" dependencies = [ "anyhow", "async-trait", @@ -324,7 +324,7 @@ dependencies = [ [[package]] name = "cm-dashboard-shared" -version = "0.1.193" +version = "0.1.194" dependencies = [ "chrono", "serde", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 555cb45..86a4a38 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-agent" -version = "0.1.194" +version = "0.1.195" edition = "2021" [dependencies] diff --git a/agent/src/agent.rs b/agent/src/agent.rs index 900c7a2..3e1d5dd 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -193,31 +193,73 @@ impl Agent { pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> { info!("Starting agent main loop with cached collector architecture"); - // Set up intervals from config - let mut transmission_interval = interval(Duration::from_secs( - self.config.collection_interval_seconds, - )); + // Spawn independent ZMQ sender task + // Create dedicated ZMQ publisher for the sender task + let cache_clone = self.cache.clone(); + let publisher_config = self.config.zmq.clone(); + let transmission_interval_secs = self.config.collection_interval_seconds; + + std::thread::spawn(move || { + // Create ZMQ publisher in this thread (ZMQ sockets are not thread-safe) + let context = zmq::Context::new(); + let publisher = context.socket(zmq::SocketType::PUB).unwrap(); + let bind_address = format!("tcp://{}:{}", publisher_config.bind_address, publisher_config.publisher_port); + publisher.bind(&bind_address).unwrap(); + publisher.set_sndhwm(1000).unwrap(); + publisher.set_linger(1000).unwrap(); + info!("ZMQ sender task started on {} (interval: {}s)", bind_address, transmission_interval_secs); + + let mut last_sent_data: Option = None; + let interval_duration = std::time::Duration::from_secs(transmission_interval_secs); + let mut next_send = std::time::Instant::now() + interval_duration; + + loop { + // Sleep until next send time + std::thread::sleep(next_send.saturating_duration_since(std::time::Instant::now())); + next_send = std::time::Instant::now() + interval_duration; + + // Try to read cache without blocking - if locked, send last known data + let data_to_send = match cache_clone.try_read() { + Ok(agent_data) => { + let data_clone = agent_data.clone(); + drop(agent_data); // Release lock immediately + last_sent_data = Some(data_clone.clone()); + Some(data_clone) + } + Err(_) => { + // Lock is held by collector - use last sent data + debug!("Cache locked by collector, sending previous data"); + last_sent_data.clone() + } + }; + + if let Some(data) = data_to_send { + // Publish via ZMQ + if let Ok(envelope) = cm_dashboard_shared::MessageEnvelope::agent_data(data) { + if let Ok(serialized) = serde_json::to_vec(&envelope) { + if let Err(e) = publisher.send(&serialized, 0) { + error!("Failed to send ZMQ message: {}", e); + } else { + debug!("Successfully broadcast agent data"); + } + } + } + } + } + }); + + // Set up intervals for notifications and commands let mut notification_interval = interval(Duration::from_secs( self.config.notifications.check_interval_seconds, )); let mut command_interval = interval(Duration::from_millis(100)); // Skip initial ticks - transmission_interval.tick().await; notification_interval.tick().await; command_interval.tick().await; loop { tokio::select! { - _ = transmission_interval.tick() => { - // Read current cache state and broadcast via ZMQ - let agent_data = self.cache.read().await.clone(); - if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await { - error!("Failed to broadcast agent data: {}", e); - } else { - debug!("Successfully broadcast agent data"); - } - } _ = notification_interval.tick() => { // Read cache and check for status changes let agent_data = self.cache.read().await.clone(); @@ -329,12 +371,9 @@ impl Agent { match command { AgentCommand::CollectNow => { info!("Received immediate transmission request"); - // With cached architecture, collectors run independently - // Just send current cache state immediately - let agent_data = self.cache.read().await.clone(); - if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await { - error!("Failed to broadcast on demand: {}", e); - } + // With cached architecture and dedicated ZMQ sender thread, + // data is already being sent every interval + // This command is acknowledged but not actionable in new architecture } AgentCommand::SetInterval { seconds } => { info!("Received interval change request: {}s", seconds); diff --git a/agent/src/communication/mod.rs b/agent/src/communication/mod.rs index c364f7c..591f09f 100644 --- a/agent/src/communication/mod.rs +++ b/agent/src/communication/mod.rs @@ -1,13 +1,12 @@ use anyhow::Result; -use cm_dashboard_shared::{AgentData, MessageEnvelope}; use tracing::{debug, info}; use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; -/// ZMQ communication handler for publishing metrics and receiving commands +/// ZMQ communication handler for receiving commands +/// NOTE: Publishing is handled by dedicated thread in Agent::run() pub struct ZmqHandler { - publisher: Socket, command_receiver: Socket, } @@ -15,17 +14,6 @@ impl ZmqHandler { pub async fn new(config: &ZmqConfig) -> Result { let context = Context::new(); - // Create publisher socket for metrics - let publisher = context.socket(SocketType::PUB)?; - let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port); - publisher.bind(&pub_bind_address)?; - - info!("ZMQ publisher bound to {}", pub_bind_address); - - // Set socket options for efficiency - publisher.set_sndhwm(1000)?; // High water mark for outbound messages - publisher.set_linger(1000)?; // Linger time on close - // Create command receiver socket (PULL socket to receive commands from dashboard) let command_receiver = context.socket(SocketType::PULL)?; let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port); @@ -38,33 +26,10 @@ impl ZmqHandler { command_receiver.set_linger(1000)?; Ok(Self { - publisher, command_receiver, }) } - - /// Publish agent data via ZMQ - pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> { - debug!( - "Publishing agent data for host {}", - data.hostname - ); - - // Create message envelope for agent data - let envelope = MessageEnvelope::agent_data(data.clone()) - .map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?; - - // Serialize envelope - let serialized = serde_json::to_vec(&envelope)?; - - // Send via ZMQ - self.publisher.send(&serialized, 0)?; - - debug!("Published agent data message ({} bytes)", serialized.len()); - Ok(()) - } - /// Try to receive a command (non-blocking) pub fn try_receive_command(&self) -> Result> { match self.command_receiver.recv_bytes(zmq::DONTWAIT) { diff --git a/dashboard/Cargo.toml b/dashboard/Cargo.toml index 7dfa429..a7da6d6 100644 --- a/dashboard/Cargo.toml +++ b/dashboard/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard" -version = "0.1.194" +version = "0.1.195" edition = "2021" [dependencies] diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 26314cd..9dd1a6d 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-shared" -version = "0.1.194" +version = "0.1.195" edition = "2021" [dependencies]