Fix ZMQ sender blocking - move to independent thread with try_read

CRITICAL FIX: The previous cached collector architecture still performed
ZMQ sending in the main event loop, where it could block waiting for the
cache RwLock while collectors were writing. This caused the observed
3-8 second transmission delays.
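
For reference, the old transmission arm ran inside the shared select!
loop (condensed from the diff below; the comments are annotation, not
part of the original code):

    loop {
        tokio::select! {
            _ = transmission_interval.tick() => {
                // read().await parks this task until every writer releases
                // the lock, so a collector holding it for seconds stalls
                // the ZMQ send for the same duration.
                let agent_data = self.cache.read().await.clone();
                if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
                    error!("Failed to broadcast agent data: {}", e);
                }
            }
            // ...notification and command arms...
        }
    }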

Changes:
- Move ZMQ publisher to dedicated std::thread (ZMQ sockets aren't thread-safe)
- Use try_read() instead of read() to avoid blocking on write locks
- Send previous data if cache is locked by collector
- ZMQ now sends every 2s regardless of collector timing
- Remove publisher from ZmqHandler (now only handles commands)

Architecture:
- Collectors: Independent tokio tasks updating shared cache
- ZMQ Sender: Dedicated OS thread with its own publisher socket
- Main Loop: Only handles commands and notifications

This ensures ZMQ transmission is NEVER blocked by slow collectors; the
pattern is sketched below.
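
Condensed, the sender thread follows this pattern (a sketch, assuming
the cache is an Arc<tokio::sync::RwLock<...>> as the diff suggests;
Snapshot, spawn_sender, and the endpoint are placeholder names):

    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use tokio::sync::RwLock;

    #[derive(Clone)]
    struct Snapshot {
        payload: Vec<u8>, // pre-serialized here; the real code serializes per send
    }

    fn spawn_sender(cache: Arc<RwLock<Snapshot>>, interval: Duration) {
        std::thread::spawn(move || {
            // The socket lives entirely inside this thread: ZMQ sockets are
            // not thread-safe; only the context may be shared.
            let ctx = zmq::Context::new();
            let publisher = ctx.socket(zmq::SocketType::PUB).expect("PUB socket");
            publisher.bind("tcp://0.0.0.0:5556").expect("bind"); // placeholder endpoint

            let mut last_sent: Option<Snapshot> = None;
            let mut next_send = Instant::now() + interval;
            loop {
                std::thread::sleep(next_send.saturating_duration_since(Instant::now()));
                next_send = Instant::now() + interval;

                // try_read() never parks: if a collector holds the write
                // lock, fall back to the previously sent snapshot.
                let to_send = match cache.try_read() {
                    Ok(guard) => {
                        let snap = guard.clone();
                        drop(guard); // release before the potentially slow send
                        last_sent = Some(snap.clone());
                        Some(snap)
                    }
                    Err(_) => last_sent.clone(),
                };
                if let Some(snap) = to_send {
                    let _ = publisher.send(snap.payload.as_slice(), 0);
                }
            }
        });
    }

The only shared state is the lock itself, and the sender touches it
exclusively through try_read, so the worst case under contention is a
slightly stale snapshot, never a stalled send.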

Bump version to v0.1.195
Christoffer Martinsson 2025-11-27 22:56:58 +01:00
parent ed6399b914
commit 01e1f33b66
6 changed files with 67 additions and 63 deletions

Cargo.lock

@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"

 [[package]]
 name = "cm-dashboard"
-version = "0.1.193"
+version = "0.1.194"
 dependencies = [
  "anyhow",
  "chrono",
@@ -301,7 +301,7 @@ dependencies = [

 [[package]]
 name = "cm-dashboard-agent"
-version = "0.1.193"
+version = "0.1.194"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -324,7 +324,7 @@ dependencies = [

 [[package]]
 name = "cm-dashboard-shared"
-version = "0.1.193"
+version = "0.1.194"
 dependencies = [
  "chrono",
  "serde",


@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-agent"
-version = "0.1.194"
+version = "0.1.195"
 edition = "2021"

 [dependencies]


@@ -193,31 +193,73 @@ impl Agent {
     pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
         info!("Starting agent main loop with cached collector architecture");

-        // Set up intervals from config
-        let mut transmission_interval = interval(Duration::from_secs(
-            self.config.collection_interval_seconds,
-        ));
+        // Spawn independent ZMQ sender task
+        // Create dedicated ZMQ publisher for the sender task
+        let cache_clone = self.cache.clone();
+        let publisher_config = self.config.zmq.clone();
+        let transmission_interval_secs = self.config.collection_interval_seconds;
+
+        std::thread::spawn(move || {
+            // Create ZMQ publisher in this thread (ZMQ sockets are not thread-safe)
+            let context = zmq::Context::new();
+            let publisher = context.socket(zmq::SocketType::PUB).unwrap();
+            let bind_address = format!("tcp://{}:{}", publisher_config.bind_address, publisher_config.publisher_port);
+            publisher.bind(&bind_address).unwrap();
+            publisher.set_sndhwm(1000).unwrap();
+            publisher.set_linger(1000).unwrap();
+
+            info!("ZMQ sender task started on {} (interval: {}s)", bind_address, transmission_interval_secs);
+
+            let mut last_sent_data: Option<AgentData> = None;
+            let interval_duration = std::time::Duration::from_secs(transmission_interval_secs);
+            let mut next_send = std::time::Instant::now() + interval_duration;
+
+            loop {
+                // Sleep until next send time
+                std::thread::sleep(next_send.saturating_duration_since(std::time::Instant::now()));
+                next_send = std::time::Instant::now() + interval_duration;
+
+                // Try to read cache without blocking - if locked, send last known data
+                let data_to_send = match cache_clone.try_read() {
+                    Ok(agent_data) => {
+                        let data_clone = agent_data.clone();
+                        drop(agent_data); // Release lock immediately
+                        last_sent_data = Some(data_clone.clone());
+                        Some(data_clone)
+                    }
+                    Err(_) => {
+                        // Lock is held by collector - use last sent data
+                        debug!("Cache locked by collector, sending previous data");
+                        last_sent_data.clone()
+                    }
+                };
+
+                if let Some(data) = data_to_send {
+                    // Publish via ZMQ
+                    if let Ok(envelope) = cm_dashboard_shared::MessageEnvelope::agent_data(data) {
+                        if let Ok(serialized) = serde_json::to_vec(&envelope) {
+                            if let Err(e) = publisher.send(&serialized, 0) {
+                                error!("Failed to send ZMQ message: {}", e);
+                            } else {
+                                debug!("Successfully broadcast agent data");
+                            }
+                        }
+                    }
+                }
+            }
+        });
+
+        // Set up intervals for notifications and commands
         let mut notification_interval = interval(Duration::from_secs(
             self.config.notifications.check_interval_seconds,
         ));
         let mut command_interval = interval(Duration::from_millis(100));

         // Skip initial ticks
-        transmission_interval.tick().await;
         notification_interval.tick().await;
         command_interval.tick().await;

         loop {
             tokio::select! {
-                _ = transmission_interval.tick() => {
-                    // Read current cache state and broadcast via ZMQ
-                    let agent_data = self.cache.read().await.clone();
-                    if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
-                        error!("Failed to broadcast agent data: {}", e);
-                    } else {
-                        debug!("Successfully broadcast agent data");
-                    }
-                }
                 _ = notification_interval.tick() => {
                     // Read cache and check for status changes
                     let agent_data = self.cache.read().await.clone();
@@ -329,12 +371,9 @@ impl Agent {
                     match command {
                         AgentCommand::CollectNow => {
                             info!("Received immediate transmission request");
-                            // With cached architecture, collectors run independently
-                            // Just send current cache state immediately
-                            let agent_data = self.cache.read().await.clone();
-                            if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
-                                error!("Failed to broadcast on demand: {}", e);
-                            }
+                            // With cached architecture and dedicated ZMQ sender thread,
+                            // data is already being sent every interval
+                            // This command is acknowledged but not actionable in new architecture
                         }
                         AgentCommand::SetInterval { seconds } => {
                             info!("Received interval change request: {}s", seconds);


@@ -1,13 +1,12 @@
 use anyhow::Result;
 use cm_dashboard_shared::{AgentData, MessageEnvelope};
 use tracing::{debug, info};
 use zmq::{Context, Socket, SocketType};

 use crate::config::ZmqConfig;

-/// ZMQ communication handler for publishing metrics and receiving commands
+/// ZMQ communication handler for receiving commands
+/// NOTE: Publishing is handled by dedicated thread in Agent::run()
 pub struct ZmqHandler {
-    publisher: Socket,
     command_receiver: Socket,
 }
@@ -15,17 +14,6 @@ impl ZmqHandler {
     pub async fn new(config: &ZmqConfig) -> Result<Self> {
         let context = Context::new();

-        // Create publisher socket for metrics
-        let publisher = context.socket(SocketType::PUB)?;
-        let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port);
-        publisher.bind(&pub_bind_address)?;
-        info!("ZMQ publisher bound to {}", pub_bind_address);
-
-        // Set socket options for efficiency
-        publisher.set_sndhwm(1000)?; // High water mark for outbound messages
-        publisher.set_linger(1000)?; // Linger time on close
-
         // Create command receiver socket (PULL socket to receive commands from dashboard)
         let command_receiver = context.socket(SocketType::PULL)?;
         let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
@@ -38,33 +26,10 @@ impl ZmqHandler {
         command_receiver.set_linger(1000)?;

         Ok(Self {
-            publisher,
             command_receiver,
         })
     }

-    /// Publish agent data via ZMQ
-    pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
-        debug!(
-            "Publishing agent data for host {}",
-            data.hostname
-        );
-
-        // Create message envelope for agent data
-        let envelope = MessageEnvelope::agent_data(data.clone())
-            .map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
-
-        // Serialize envelope
-        let serialized = serde_json::to_vec(&envelope)?;
-
-        // Send via ZMQ
-        self.publisher.send(&serialized, 0)?;
-
-        debug!("Published agent data message ({} bytes)", serialized.len());
-        Ok(())
-    }
-
     /// Try to receive a command (non-blocking)
     pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
         match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
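
try_receive_command builds on rust-zmq's DONTWAIT flag; the standard
non-blocking receive pattern looks like this (a sketch; the actual
method presumably also decodes the payload into an AgentCommand):

    // EAGAIN from a DONTWAIT receive means "no message queued", the
    // normal idle case, so it maps to Ok(None) rather than an error.
    fn try_recv_nonblocking(socket: &zmq::Socket) -> anyhow::Result<Option<Vec<u8>>> {
        match socket.recv_bytes(zmq::DONTWAIT) {
            Ok(bytes) => Ok(Some(bytes)),
            Err(zmq::Error::EAGAIN) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }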


@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard"
-version = "0.1.194"
+version = "0.1.195"
 edition = "2021"

 [dependencies]


@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-shared"
-version = "0.1.194"
+version = "0.1.195"
 edition = "2021"

 [dependencies]