Update to v0.1.19 with event-driven status aggregation
All checks were successful
Build and Release / build-and-release (push) Successful in 2m4s

Major architectural improvements:

CORE CHANGES:
- Remove notification_interval_seconds - status aggregation now immediate
- Status calculation moved to collection phase instead of transmission
- Event-driven transmission triggers immediately on status changes
- Dual transmission strategy: immediate on change + periodic backup
- Real-time notifications without batching delays

TECHNICAL IMPROVEMENTS:
- process_metric() now returns bool indicating status change
- Immediate ZMQ broadcast when status changes detected
- Status aggregation happens during metric collection, not later
- Legacy get_nixos_build_info() method removed (unused)
- All compilation warnings fixed

BEHAVIOR CHANGES:
- Critical alerts sent instantly instead of waiting for intervals
- Dashboard receives real-time status updates
- Notifications triggered immediately on status transitions
- Backup periodic transmission every 1s ensures heartbeat

This provides much more responsive monitoring with instant alerting
while maintaining the reliability of periodic transmission as backup.
This commit is contained in:
2025-10-28 10:36:34 +01:00
parent 627c533724
commit 91f037aa3e
8 changed files with 83 additions and 57 deletions

View File

@@ -75,7 +75,6 @@ impl Agent {
let mut collection_interval =
interval(Duration::from_secs(self.config.collection_interval_seconds));
let mut transmission_interval = interval(Duration::from_secs(self.config.zmq.transmission_interval_seconds));
let mut notification_interval = interval(Duration::from_secs(self.config.status_aggregation.notification_interval_seconds));
loop {
tokio::select! {
@@ -86,13 +85,12 @@ impl Agent {
}
}
_ = transmission_interval.tick() => {
// Send all metrics via ZMQ every 1 second
// Send all metrics via ZMQ and process notifications immediately
if let Err(e) = self.broadcast_all_metrics().await {
error!("Failed to broadcast metrics: {}", e);
}
}
_ = notification_interval.tick() => {
// Process batched notifications
// Process notifications immediately with each transmission
if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await {
error!("Failed to process pending notifications: {}", e);
}
@@ -127,8 +125,8 @@ impl Agent {
info!("Force collected and cached {} metrics", metrics.len());
// Process metrics through status manager
self.process_metrics(&metrics).await;
// Process metrics through status manager (don't trigger transmission on startup)
let _status_changed = self.process_metrics(&metrics).await;
Ok(())
}
@@ -146,8 +144,15 @@ impl Agent {
debug!("Collected and cached {} metrics", metrics.len());
// Process metrics through status manager
self.process_metrics(&metrics).await;
// Process metrics through status manager and trigger immediate transmission if status changed
let status_changed = self.process_metrics(&metrics).await;
if status_changed {
info!("Status change detected - triggering immediate metric transmission");
if let Err(e) = self.broadcast_all_metrics().await {
error!("Failed to broadcast metrics after status change: {}", e);
}
}
Ok(())
}
@@ -181,10 +186,14 @@ impl Agent {
Ok(())
}
async fn process_metrics(&mut self, metrics: &[Metric]) {
async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
let mut status_changed = false;
for metric in metrics {
self.host_status_manager.process_metric(metric, &mut self.notification_manager).await;
if self.host_status_manager.process_metric(metric, &mut self.notification_manager).await {
status_changed = true;
}
}
status_changed
}
/// Create agent version metric for cross-host version comparison