Fix critical ZMQ broadcast issue in smart agent
Root cause: Smart agent only sent data when tier intervals triggered: - System (5s): sent data frequently ✓ - Services (5min): sent data only every 5 minutes ✗ - SMART (15min): sent data only every 15 minutes ✗ Dashboard needs continuous data flow every ~5 seconds. Solution: Add broadcast_all_data() method that sends all available cached data every 5 seconds, separate from collection intervals. This ensures dashboard receives all collector data continuously while maintaining smart caching benefits (reduced CPU from tier-based collection). Expected result: All widgets (System/Services/SMART/Backup) should populate immediately after agent restart and stay updated.
This commit is contained in:
parent
996b89aa47
commit
244cade7d8
@ -151,6 +151,9 @@ impl SmartAgent {
|
||||
let mut slow_interval = interval(CacheTier::Slow.interval());
|
||||
let mut static_interval = interval(CacheTier::Static.interval());
|
||||
|
||||
// Regular broadcast interval - send all available data every 5 seconds
|
||||
let mut broadcast_interval = interval(Duration::from_secs(5));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = realtime_interval.tick() => {
|
||||
@ -168,6 +171,9 @@ impl SmartAgent {
|
||||
_ = static_interval.tick() => {
|
||||
self.collect_tier(CacheTier::Static).await;
|
||||
}
|
||||
_ = broadcast_interval.tick() => {
|
||||
self.broadcast_all_data().await;
|
||||
}
|
||||
_ = cache_cleanup_interval.tick() => {
|
||||
self.cache.cleanup().await;
|
||||
}
|
||||
@ -266,6 +272,31 @@ impl SmartAgent {
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast all available data (fresh or cached) every 5 seconds for dashboard responsiveness
|
||||
async fn broadcast_all_data(&self) {
|
||||
let start = std::time::Instant::now();
|
||||
let mut sent = 0;
|
||||
|
||||
// Send latest data for all collectors (from cache or fresh collection)
|
||||
for collector in &self.cached_collectors {
|
||||
// Try to get cached data first
|
||||
if let Some(cached_output) = self.cache.get(collector.cache_key()).await {
|
||||
if let Err(e) = self.send_metrics(&cached_output.agent_type, &cached_output.data).await {
|
||||
error!("Failed to broadcast cached metrics for {}: {}", collector.name(), e);
|
||||
} else {
|
||||
sent += 1;
|
||||
}
|
||||
} else {
|
||||
// No cached data available - this shouldn't happen after cache warming
|
||||
debug!("No cached data available for {}", collector.name());
|
||||
}
|
||||
}
|
||||
|
||||
if sent > 0 {
|
||||
debug!("Broadcast: sent {} collector updates in {}ms", sent, start.elapsed().as_millis());
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_metrics(&self, agent_type: &AgentType, data: &serde_json::Value) -> anyhow::Result<()> {
|
||||
let message = serde_json::json!({
|
||||
"hostname": self.hostname,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user