Revert "Fix data duplication in cached collector architecture"

This reverts commit 14618c59c61b2f8d731697f01b8388ace825a809.
Christoffer Martinsson 2025-11-27 23:09:40 +01:00
parent 43dd5a901a
commit f09ccabc7f
10 changed files with 71 additions and 94 deletions


@@ -207,16 +207,10 @@ Every 1 second:
- Recommended: Slow (60-300s): Disk, Systemd
- **Independent tasks**: Each collector spawned as separate tokio task in `Agent::new()`
- **Cache updates**: Collectors acquire write lock → update → release immediately
- **ZMQ sender**: Dedicated OS thread with own publisher socket, uses `try_read()` to avoid blocking
- **Non-blocking reads**: `try_read()` never blocks - sends previous data if cache is locked
- **Notification check**: Runs every `notifications.check_interval_seconds` in main loop
- **Lock strategy**: Short-lived write locks, non-blocking reads for transmission
- **Stale data**: Acceptable for slow-changing metrics AND when collector holds write lock
**Threading Model:**
- Main thread: tokio runtime for command handling and notifications
- Collector threads: 7 independent tokio tasks updating shared cache
- ZMQ sender thread: Dedicated OS thread (ZMQ sockets not thread-safe) with lock-free reads
- **ZMQ sender**: Main loop reads cache every `collection_interval_seconds` and broadcasts
- **Notification check**: Runs every `notifications.check_interval_seconds`
- **Lock strategy**: Short-lived write locks prevent blocking, read locks for transmission
- **Stale data**: Acceptable for slow-changing metrics (SMART data, disk usage)
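Not part of the diff: a minimal sketch of the non-blocking read-with-fallback pattern that the reverted bullets describe, assuming the shared cache is an `Arc<tokio::sync::RwLock<AgentData>>` (the helper name is illustrative):

```rust
use std::sync::Arc;

use cm_dashboard_shared::AgentData;
use tokio::sync::RwLock;

/// Snapshot the cache without blocking: if a collector currently holds the
/// write lock, fall back to the last snapshot that was already sent.
fn snapshot_for_send(
    cache: &Arc<RwLock<AgentData>>,
    last_sent: &mut Option<AgentData>,
) -> Option<AgentData> {
    match cache.try_read() {
        Ok(guard) => {
            let data = guard.clone();
            drop(guard); // release the read lock immediately
            *last_sent = Some(data.clone());
            Some(data)
        }
        // A collector holds the write lock; stale data is acceptable here.
        Err(_) => last_sent.clone(),
    }
}
```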
**Configuration (NixOS):**
All intervals and timeouts configurable in `services/cm-dashboard.nix`:
@@ -238,10 +232,9 @@ Command Timeouts (prevent resource leaks from hung commands):
- `collectors.network.command_timeout_seconds` (default: 10s) - ip route, ip addr
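The timeout mechanism itself is outside this diff; as a rough sketch, a per-command timeout such as `collectors.network.command_timeout_seconds` can be enforced with `tokio::time::timeout` around the spawned process, assuming the collectors shell out via `tokio::process` (the helper below is hypothetical):

```rust
use std::time::Duration;

use tokio::process::Command;
use tokio::time::timeout;

/// Run an external command (e.g. `ip route`), giving up if it exceeds the
/// configured timeout so hung commands cannot leak resources.
async fn run_with_timeout(program: &str, args: &[&str], timeout_secs: u64) -> anyhow::Result<String> {
    let mut cmd = Command::new(program);
    cmd.args(args).kill_on_drop(true); // reap the child if we give up on it
    let output = timeout(Duration::from_secs(timeout_secs), cmd.output())
        .await
        .map_err(|_| anyhow::anyhow!("{program} timed out after {timeout_secs}s"))??;
    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}
```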
**Code Locations:**
- agent/src/agent.rs:69-144 - Collector task spawning with configured intervals
- agent/src/agent.rs:162-190 - Independent collector task runner
- agent/src/agent.rs:202-249 - ZMQ sender in dedicated OS thread with try_read()
- agent/src/agent.rs:251-264 - Main loop (commands and notifications only)
- agent/src/agent.rs:59-133 - Collector task spawning
- agent/src/agent.rs:151-179 - Independent collector task runner
- agent/src/agent.rs:199-207 - ZMQ sender in main loop
### Maintenance Mode

Cargo.lock generated

@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "cm-dashboard"
version = "0.1.194"
version = "0.1.192"
dependencies = [
"anyhow",
"chrono",
@@ -301,7 +301,7 @@ dependencies = [
[[package]]
name = "cm-dashboard-agent"
version = "0.1.194"
version = "0.1.192"
dependencies = [
"anyhow",
"async-trait",
@@ -324,7 +324,7 @@ dependencies = [
[[package]]
name = "cm-dashboard-shared"
version = "0.1.194"
version = "0.1.192"
dependencies = [
"chrono",
"serde",


@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard-agent"
version = "0.1.195"
version = "0.1.193"
edition = "2021"
[dependencies]


@@ -193,73 +193,31 @@ impl Agent {
pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
info!("Starting agent main loop with cached collector architecture");
// Spawn independent ZMQ sender task
// Create dedicated ZMQ publisher for the sender task
let cache_clone = self.cache.clone();
let publisher_config = self.config.zmq.clone();
let transmission_interval_secs = self.config.collection_interval_seconds;
std::thread::spawn(move || {
// Create ZMQ publisher in this thread (ZMQ sockets are not thread-safe)
let context = zmq::Context::new();
let publisher = context.socket(zmq::SocketType::PUB).unwrap();
let bind_address = format!("tcp://{}:{}", publisher_config.bind_address, publisher_config.publisher_port);
publisher.bind(&bind_address).unwrap();
publisher.set_sndhwm(1000).unwrap();
publisher.set_linger(1000).unwrap();
info!("ZMQ sender task started on {} (interval: {}s)", bind_address, transmission_interval_secs);
let mut last_sent_data: Option<AgentData> = None;
let interval_duration = std::time::Duration::from_secs(transmission_interval_secs);
let mut next_send = std::time::Instant::now() + interval_duration;
loop {
// Sleep until next send time
std::thread::sleep(next_send.saturating_duration_since(std::time::Instant::now()));
next_send = std::time::Instant::now() + interval_duration;
// Try to read cache without blocking - if locked, send last known data
let data_to_send = match cache_clone.try_read() {
Ok(agent_data) => {
let data_clone = agent_data.clone();
drop(agent_data); // Release lock immediately
last_sent_data = Some(data_clone.clone());
Some(data_clone)
}
Err(_) => {
// Lock is held by collector - use last sent data
debug!("Cache locked by collector, sending previous data");
last_sent_data.clone()
}
};
if let Some(data) = data_to_send {
// Publish via ZMQ
if let Ok(envelope) = cm_dashboard_shared::MessageEnvelope::agent_data(data) {
if let Ok(serialized) = serde_json::to_vec(&envelope) {
if let Err(e) = publisher.send(&serialized, 0) {
error!("Failed to send ZMQ message: {}", e);
} else {
debug!("Successfully broadcast agent data");
}
}
}
}
}
});
// Set up intervals for notifications and commands
// Set up intervals from config
let mut transmission_interval = interval(Duration::from_secs(
self.config.collection_interval_seconds,
));
let mut notification_interval = interval(Duration::from_secs(
self.config.notifications.check_interval_seconds,
));
let mut command_interval = interval(Duration::from_millis(100));
// Skip initial ticks
transmission_interval.tick().await;
notification_interval.tick().await;
command_interval.tick().await;
loop {
tokio::select! {
_ = transmission_interval.tick() => {
// Read current cache state and broadcast via ZMQ
let agent_data = self.cache.read().await.clone();
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
error!("Failed to broadcast agent data: {}", e);
} else {
debug!("Successfully broadcast agent data");
}
}
_ = notification_interval.tick() => {
// Read cache and check for status changes
let agent_data = self.cache.read().await.clone();
@@ -371,9 +329,12 @@ impl Agent {
match command {
AgentCommand::CollectNow => {
info!("Received immediate transmission request");
// With cached architecture and dedicated ZMQ sender thread,
// data is already being sent every interval
// This command is acknowledged but not actionable in new architecture
// With cached architecture, collectors run independently
// Just send current cache state immediately
let agent_data = self.cache.read().await.clone();
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
error!("Failed to broadcast on demand: {}", e);
}
}
AgentCommand::SetInterval { seconds } => {
info!("Received interval change request: {}s", seconds);


@@ -530,9 +530,6 @@ impl DiskCollector {
/// Populate drives data into AgentData
fn populate_drives_data(&self, physical_drives: &[PhysicalDrive], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing drives data to prevent duplicates in cached architecture
agent_data.system.storage.drives.clear();
for drive in physical_drives {
let smart = smart_data.get(&drive.name);
@@ -570,9 +567,6 @@
/// Populate pools data into AgentData
fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing pools data to prevent duplicates in cached architecture
agent_data.system.storage.pools.clear();
for pool in mergerfs_pools {
// Calculate pool health and statuses based on member drive health
let (pool_health, health_status, usage_status, data_drive_data, parity_drive_data) = self.calculate_pool_health(pool, smart_data);
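For context on what the removed lines guarded against: in the cached architecture the populate functions write into the same `AgentData` instance on every cycle, so pushing entries without a preceding `clear()` accumulates duplicates. A minimal sketch of that pattern, with a simplified struct standing in for the real storage data:

```rust
/// Simplified stand-in for the cached storage section of AgentData;
/// the same instance is reused across collection cycles.
#[derive(Default)]
struct Storage {
    drives: Vec<String>,
}

fn populate_drives(storage: &mut Storage, discovered: &[&str]) {
    // Without this clear(), each cycle appends the same drives again and
    // the cached list grows by one full copy per collection interval.
    storage.drives.clear();
    storage.drives.extend(discovered.iter().map(|d| d.to_string()));
}

fn main() {
    let mut storage = Storage::default();
    populate_drives(&mut storage, &["sda", "sdb"]);
    populate_drives(&mut storage, &["sda", "sdb"]);
    assert_eq!(storage.drives.len(), 2); // would be 4 without the clear()
}
```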


@@ -97,12 +97,9 @@ impl MemoryCollector {
/// Populate tmpfs data into AgentData
async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing tmpfs data to prevent duplicates in cached architecture
agent_data.system.memory.tmpfs.clear();
// Discover all tmpfs mount points
let tmpfs_mounts = self.discover_tmpfs_mounts()?;
if tmpfs_mounts.is_empty() {
debug!("No tmpfs mounts found to monitor");
return Ok(());


@@ -915,9 +915,6 @@ impl SystemdCollector {
#[async_trait]
impl Collector for SystemdCollector {
async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing services data to prevent duplicates in cached architecture
agent_data.services.clear();
// Use cached complete data if available and fresh
if let Some(cached_complete_services) = self.get_cached_complete_services() {
for service_data in cached_complete_services {


@@ -1,12 +1,13 @@
use anyhow::Result;
use cm_dashboard_shared::{AgentData, MessageEnvelope};
use tracing::{debug, info};
use zmq::{Context, Socket, SocketType};
use crate::config::ZmqConfig;
/// ZMQ communication handler for receiving commands
/// NOTE: Publishing is handled by dedicated thread in Agent::run()
/// ZMQ communication handler for publishing metrics and receiving commands
pub struct ZmqHandler {
publisher: Socket,
command_receiver: Socket,
}
@@ -14,6 +15,17 @@ impl ZmqHandler {
pub async fn new(config: &ZmqConfig) -> Result<Self> {
let context = Context::new();
// Create publisher socket for metrics
let publisher = context.socket(SocketType::PUB)?;
let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port);
publisher.bind(&pub_bind_address)?;
info!("ZMQ publisher bound to {}", pub_bind_address);
// Set socket options for efficiency
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
publisher.set_linger(1000)?; // Linger time on close
// Create command receiver socket (PULL socket to receive commands from dashboard)
let command_receiver = context.socket(SocketType::PULL)?;
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
@@ -26,10 +38,33 @@
command_receiver.set_linger(1000)?;
Ok(Self {
publisher,
command_receiver,
})
}
/// Publish agent data via ZMQ
pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
debug!(
"Publishing agent data for host {}",
data.hostname
);
// Create message envelope for agent data
let envelope = MessageEnvelope::agent_data(data.clone())
.map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
// Serialize envelope
let serialized = serde_json::to_vec(&envelope)?;
// Send via ZMQ
self.publisher.send(&serialized, 0)?;
debug!("Published agent data message ({} bytes)", serialized.len());
Ok(())
}
/// Try to receive a command (non-blocking)
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
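Not part of this change, but for reference, the dashboard side would consume this publisher with a SUB socket roughly like the following sketch (the function is illustrative and assumes `MessageEnvelope` implements `Deserialize`):

```rust
use cm_dashboard_shared::MessageEnvelope;

fn receive_agent_data(host: &str, publisher_port: u16) -> anyhow::Result<()> {
    let context = zmq::Context::new();
    let subscriber = context.socket(zmq::SocketType::SUB)?;
    subscriber.connect(&format!("tcp://{host}:{publisher_port}"))?;
    subscriber.set_subscribe(b"")?; // no topic filtering: take every envelope

    loop {
        let bytes = subscriber.recv_bytes(0)?; // blocks until the next publish
        let envelope: MessageEnvelope = serde_json::from_slice(&bytes)?;
        // Hand the envelope to the dashboard's update logic here.
        let _ = envelope;
    }
}
```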


@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard"
version = "0.1.195"
version = "0.1.193"
edition = "2021"
[dependencies]


@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard-shared"
version = "0.1.195"
version = "0.1.193"
edition = "2021"
[dependencies]