From f09ccabc7fef01c71bf734c898ad6ec6538352c8 Mon Sep 17 00:00:00 2001
From: Christoffer Martinsson
Date: Thu, 27 Nov 2025 23:09:40 +0100
Subject: [PATCH] Revert "Fix data duplication in cached collector architecture"

This reverts commit 14618c59c61b2f8d731697f01b8388ace825a809.
---
 CLAUDE.md                       | 21 +++------
 Cargo.lock                      |  6 +--
 agent/Cargo.toml                |  2 +-
 agent/src/agent.rs              | 79 +++++++++------------------
 agent/src/collectors/disk.rs    |  6 ---
 agent/src/collectors/memory.rs  |  5 +--
 agent/src/collectors/systemd.rs |  3 --
 agent/src/communication/mod.rs  | 39 +++++++++++++++-
 dashboard/Cargo.toml            |  2 +-
 shared/Cargo.toml               |  2 +-
 10 files changed, 71 insertions(+), 94 deletions(-)

diff --git a/CLAUDE.md b/CLAUDE.md
index b50be71..32fe7aa 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -207,16 +207,10 @@ Every 1 second:
 - Recommended: Slow (60-300s): Disk, Systemd
 - **Independent tasks**: Each collector spawned as separate tokio task in `Agent::new()`
 - **Cache updates**: Collectors acquire write lock → update → release immediately
-- **ZMQ sender**: Dedicated OS thread with own publisher socket, uses `try_read()` to avoid blocking
-- **Non-blocking reads**: `try_read()` never blocks - sends previous data if cache is locked
-- **Notification check**: Runs every `notifications.check_interval_seconds` in main loop
-- **Lock strategy**: Short-lived write locks, non-blocking reads for transmission
-- **Stale data**: Acceptable for slow-changing metrics AND when collector holds write lock
-
-**Threading Model:**
-- Main thread: tokio runtime for command handling and notifications
-- Collector threads: 7 independent tokio tasks updating shared cache
-- ZMQ sender thread: Dedicated OS thread (ZMQ sockets not thread-safe) with lock-free reads
+- **ZMQ sender**: Main loop reads cache every `collection_interval_seconds` and broadcasts
+- **Notification check**: Runs every `notifications.check_interval_seconds`
+- **Lock strategy**: Short-lived write locks prevent blocking, read locks for transmission
+- **Stale data**: Acceptable for slow-changing metrics (SMART data, disk usage)
 
 **Configuration (NixOS):**
 All intervals and timeouts configurable in `services/cm-dashboard.nix`:
@@ -238,10 +232,9 @@ Command Timeouts (prevent resource leaks from hung commands):
 - `collectors.network.command_timeout_seconds` (default: 10s) - ip route, ip addr
 
 **Code Locations:**
-- agent/src/agent.rs:69-144 - Collector task spawning with configured intervals
-- agent/src/agent.rs:162-190 - Independent collector task runner
-- agent/src/agent.rs:202-249 - ZMQ sender in dedicated OS thread with try_read()
-- agent/src/agent.rs:251-264 - Main loop (commands and notifications only)
+- agent/src/agent.rs:59-133 - Collector task spawning
+- agent/src/agent.rs:151-179 - Independent collector task runner
+- agent/src/agent.rs:199-207 - ZMQ sender in main loop
 
 ### Maintenance Mode
 
diff --git a/Cargo.lock b/Cargo.lock
index 0c683ca..2a78eb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
 
 [[package]]
 name = "cm-dashboard"
-version = "0.1.194"
+version = "0.1.192"
 dependencies = [
  "anyhow",
  "chrono",
@@ -301,7 +301,7 @@ dependencies = [
 
 [[package]]
 name = "cm-dashboard-agent"
-version = "0.1.194"
+version = "0.1.192"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -324,7 +324,7 @@ dependencies = [
 
 [[package]]
 name = "cm-dashboard-shared"
-version = "0.1.194"
+version = "0.1.192"
 dependencies = [
  "chrono",
  "serde",
diff --git a/agent/Cargo.toml b/agent/Cargo.toml
index 86a4a38..dcdf453 100644
--- a/agent/Cargo.toml
+++ b/agent/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-agent"
-version = "0.1.195"
+version = "0.1.193"
 edition = "2021"
 
 [dependencies]
diff --git a/agent/src/agent.rs b/agent/src/agent.rs
index 3e1d5dd..900c7a2 100644
--- a/agent/src/agent.rs
+++ b/agent/src/agent.rs
@@ -193,73 +193,31 @@ impl Agent {
     pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
         info!("Starting agent main loop with cached collector architecture");
 
-        // Spawn independent ZMQ sender task
-        // Create dedicated ZMQ publisher for the sender task
-        let cache_clone = self.cache.clone();
-        let publisher_config = self.config.zmq.clone();
-        let transmission_interval_secs = self.config.collection_interval_seconds;
-
-        std::thread::spawn(move || {
-            // Create ZMQ publisher in this thread (ZMQ sockets are not thread-safe)
-            let context = zmq::Context::new();
-            let publisher = context.socket(zmq::SocketType::PUB).unwrap();
-            let bind_address = format!("tcp://{}:{}", publisher_config.bind_address, publisher_config.publisher_port);
-            publisher.bind(&bind_address).unwrap();
-            publisher.set_sndhwm(1000).unwrap();
-            publisher.set_linger(1000).unwrap();
-            info!("ZMQ sender task started on {} (interval: {}s)", bind_address, transmission_interval_secs);
-
-            let mut last_sent_data: Option = None;
-            let interval_duration = std::time::Duration::from_secs(transmission_interval_secs);
-            let mut next_send = std::time::Instant::now() + interval_duration;
-
-            loop {
-                // Sleep until next send time
-                std::thread::sleep(next_send.saturating_duration_since(std::time::Instant::now()));
-                next_send = std::time::Instant::now() + interval_duration;
-
-                // Try to read cache without blocking - if locked, send last known data
-                let data_to_send = match cache_clone.try_read() {
-                    Ok(agent_data) => {
-                        let data_clone = agent_data.clone();
-                        drop(agent_data); // Release lock immediately
-                        last_sent_data = Some(data_clone.clone());
-                        Some(data_clone)
-                    }
-                    Err(_) => {
-                        // Lock is held by collector - use last sent data
-                        debug!("Cache locked by collector, sending previous data");
-                        last_sent_data.clone()
-                    }
-                };
-
-                if let Some(data) = data_to_send {
-                    // Publish via ZMQ
-                    if let Ok(envelope) = cm_dashboard_shared::MessageEnvelope::agent_data(data) {
-                        if let Ok(serialized) = serde_json::to_vec(&envelope) {
-                            if let Err(e) = publisher.send(&serialized, 0) {
-                                error!("Failed to send ZMQ message: {}", e);
-                            } else {
-                                debug!("Successfully broadcast agent data");
-                            }
-                        }
-                    }
-                }
-            }
-        });
-
-        // Set up intervals for notifications and commands
+        // Set up intervals from config
+        let mut transmission_interval = interval(Duration::from_secs(
+            self.config.collection_interval_seconds,
+        ));
         let mut notification_interval = interval(Duration::from_secs(
             self.config.notifications.check_interval_seconds,
         ));
         let mut command_interval = interval(Duration::from_millis(100));
 
         // Skip initial ticks
+        transmission_interval.tick().await;
         notification_interval.tick().await;
         command_interval.tick().await;
 
         loop {
             tokio::select! {
+                _ = transmission_interval.tick() => {
+                    // Read current cache state and broadcast via ZMQ
+                    let agent_data = self.cache.read().await.clone();
+                    if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
+                        error!("Failed to broadcast agent data: {}", e);
+                    } else {
+                        debug!("Successfully broadcast agent data");
+                    }
+                }
                 _ = notification_interval.tick() => {
                     // Read cache and check for status changes
                     let agent_data = self.cache.read().await.clone();
@@ -371,9 +329,12 @@ impl Agent {
                 match command {
                     AgentCommand::CollectNow => {
                         info!("Received immediate transmission request");
-                        // With cached architecture and dedicated ZMQ sender thread,
-                        // data is already being sent every interval
-                        // This command is acknowledged but not actionable in new architecture
+                        // With cached architecture, collectors run independently
+                        // Just send current cache state immediately
+                        let agent_data = self.cache.read().await.clone();
+                        if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
+                            error!("Failed to broadcast on demand: {}", e);
+                        }
                     }
                     AgentCommand::SetInterval { seconds } => {
                         info!("Received interval change request: {}s", seconds);
diff --git a/agent/src/collectors/disk.rs b/agent/src/collectors/disk.rs
index dfbd5fa..588bec8 100644
--- a/agent/src/collectors/disk.rs
+++ b/agent/src/collectors/disk.rs
@@ -530,9 +530,6 @@ impl DiskCollector {
 
     /// Populate drives data into AgentData
     fn populate_drives_data(&self, physical_drives: &[PhysicalDrive], smart_data: &HashMap, agent_data: &mut AgentData) -> Result<(), CollectorError> {
-        // Clear existing drives data to prevent duplicates in cached architecture
-        agent_data.system.storage.drives.clear();
-
         for drive in physical_drives {
             let smart = smart_data.get(&drive.name);
@@ -570,9 +567,6 @@ impl DiskCollector {
 
     /// Populate pools data into AgentData
     fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], smart_data: &HashMap, agent_data: &mut AgentData) -> Result<(), CollectorError> {
-        // Clear existing pools data to prevent duplicates in cached architecture
-        agent_data.system.storage.pools.clear();
-
         for pool in mergerfs_pools {
             // Calculate pool health and statuses based on member drive health
             let (pool_health, health_status, usage_status, data_drive_data, parity_drive_data) = self.calculate_pool_health(pool, smart_data);
diff --git a/agent/src/collectors/memory.rs b/agent/src/collectors/memory.rs
index 9151ee2..e186704 100644
--- a/agent/src/collectors/memory.rs
+++ b/agent/src/collectors/memory.rs
@@ -97,12 +97,9 @@ impl MemoryCollector {
 
     /// Populate tmpfs data into AgentData
     async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
-        // Clear existing tmpfs data to prevent duplicates in cached architecture
-        agent_data.system.memory.tmpfs.clear();
-
         // Discover all tmpfs mount points
         let tmpfs_mounts = self.discover_tmpfs_mounts()?;
-
+        
         if tmpfs_mounts.is_empty() {
             debug!("No tmpfs mounts found to monitor");
             return Ok(());
diff --git a/agent/src/collectors/systemd.rs b/agent/src/collectors/systemd.rs
index 4e17cf4..9ba8663 100644
--- a/agent/src/collectors/systemd.rs
+++ b/agent/src/collectors/systemd.rs
@@ -915,9 +915,6 @@ impl SystemdCollector {
 #[async_trait]
 impl Collector for SystemdCollector {
     async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
-        // Clear existing services data to prevent duplicates in cached architecture
-        agent_data.services.clear();
-
         // Use cached complete data if available and fresh
         if let Some(cached_complete_services) = self.get_cached_complete_services() {
             for service_data in cached_complete_services {
diff --git a/agent/src/communication/mod.rs b/agent/src/communication/mod.rs
index 591f09f..c364f7c 100644
--- a/agent/src/communication/mod.rs
+++ b/agent/src/communication/mod.rs
@@ -1,12 +1,13 @@
 use anyhow::Result;
+use cm_dashboard_shared::{AgentData, MessageEnvelope};
 use tracing::{debug, info};
 use zmq::{Context, Socket, SocketType};
 
 use crate::config::ZmqConfig;
 
-/// ZMQ communication handler for receiving commands
-/// NOTE: Publishing is handled by dedicated thread in Agent::run()
+/// ZMQ communication handler for publishing metrics and receiving commands
 pub struct ZmqHandler {
+    publisher: Socket,
     command_receiver: Socket,
 }
@@ -14,6 +15,17 @@ impl ZmqHandler {
     pub async fn new(config: &ZmqConfig) -> Result {
         let context = Context::new();
 
+        // Create publisher socket for metrics
+        let publisher = context.socket(SocketType::PUB)?;
+        let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port);
+        publisher.bind(&pub_bind_address)?;
+
+        info!("ZMQ publisher bound to {}", pub_bind_address);
+
+        // Set socket options for efficiency
+        publisher.set_sndhwm(1000)?; // High water mark for outbound messages
+        publisher.set_linger(1000)?; // Linger time on close
+
         // Create command receiver socket (PULL socket to receive commands from dashboard)
         let command_receiver = context.socket(SocketType::PULL)?;
         let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
@@ -26,10 +38,33 @@ impl ZmqHandler {
         command_receiver.set_linger(1000)?;
 
         Ok(Self {
+            publisher,
             command_receiver,
         })
     }
+
+    /// Publish agent data via ZMQ
+    pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
+        debug!(
+            "Publishing agent data for host {}",
+            data.hostname
+        );
+
+        // Create message envelope for agent data
+        let envelope = MessageEnvelope::agent_data(data.clone())
+            .map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
+
+        // Serialize envelope
+        let serialized = serde_json::to_vec(&envelope)?;
+
+        // Send via ZMQ
+        self.publisher.send(&serialized, 0)?;
+
+        debug!("Published agent data message ({} bytes)", serialized.len());
+        Ok(())
+    }
+
     /// Try to receive a command (non-blocking)
     pub fn try_receive_command(&self) -> Result> {
         match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
diff --git a/dashboard/Cargo.toml b/dashboard/Cargo.toml
index a7da6d6..ce8d404 100644
--- a/dashboard/Cargo.toml
+++ b/dashboard/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard"
-version = "0.1.195"
+version = "0.1.193"
 edition = "2021"
 
 [dependencies]
diff --git a/shared/Cargo.toml b/shared/Cargo.toml
index 9dd1a6d..3949e8c 100644
--- a/shared/Cargo.toml
+++ b/shared/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-shared"
-version = "0.1.195"
+version = "0.1.193"
 edition = "2021"
 
 [dependencies]
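
For orientation, here is a minimal, self-contained sketch of the shape this revert returns to: collector tasks update a shared cache under short-lived write locks, while the main loop ticks a transmission interval, clones the cache through a read lock, and broadcasts the snapshot. The `AgentData` struct, the `publish` helper, and the intervals below are simplified stand-ins rather than the real cm-dashboard types; the actual agent wraps the snapshot in a `MessageEnvelope`, sends it on a ZMQ PUB socket via `ZmqHandler::publish_agent_data`, and multiplexes transmission, notification, and command intervals with `tokio::select!`.

```rust
// Minimal sketch of the cached-collector + main-loop-broadcast pattern.
// AgentData and publish() are illustrative stand-ins, not project code.
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::interval;

#[derive(Clone, Debug, Default)]
struct AgentData {
    hostname: String,
    samples_collected: u64,
}

// Stand-in for ZmqHandler::publish_agent_data(); the real agent serializes a
// MessageEnvelope and sends it over a ZMQ PUB socket.
fn publish(data: &AgentData) {
    println!("broadcast: {:?}", data);
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(RwLock::new(AgentData {
        hostname: "example-host".into(),
        ..Default::default()
    }));

    // Independent collector task: acquire a write lock, update, release immediately.
    let collector_cache = cache.clone();
    tokio::spawn(async move {
        let mut tick = interval(Duration::from_secs(1));
        loop {
            tick.tick().await;
            let mut data = collector_cache.write().await;
            data.samples_collected += 1;
            // write guard dropped at the end of this iteration
        }
    });

    // Main loop: the transmission interval reads the cache and broadcasts a clone.
    let mut transmission_interval = interval(Duration::from_secs(2));
    transmission_interval.tick().await; // skip the immediate first tick
    for _ in 0..3 {
        transmission_interval.tick().await;
        let snapshot = cache.read().await.clone();
        publish(&snapshot);
    }
}
```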
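The subject of the reverted commit, data duplication, arises when a collector repopulates a long-lived cached collection without clearing it first: entries accumulate across collection cycles. That is why the reverted commit had added the `.clear()` calls that this patch removes from disk.rs, memory.rs, and systemd.rs. A tiny, hypothetical illustration of the failure mode follows; the field and type names are made up and do not reflect the real `AgentData` layout.

```rust
// Illustrative only: appending into a cached Vec on every cycle without
// clearing it first lets duplicates pile up across cycles.
#[derive(Default, Debug)]
struct Storage {
    drives: Vec<String>,
}

fn populate_drives(cache: &mut Storage, discovered: &[&str], clear_first: bool) {
    if clear_first {
        cache.drives.clear();
    }
    for d in discovered {
        cache.drives.push((*d).to_string());
    }
}

fn main() {
    let mut without_clear = Storage::default();
    let mut with_clear = Storage::default();
    for _cycle in 0..3 {
        populate_drives(&mut without_clear, &["sda", "sdb"], false);
        populate_drives(&mut with_clear, &["sda", "sdb"], true);
    }
    assert_eq!(without_clear.drives.len(), 6); // duplicates accumulate
    assert_eq!(with_clear.drives.len(), 2);    // stays in sync with discovery
    println!("{:?} vs {:?}", without_clear.drives, with_clear.drives);
}
```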