Implement hysteresis for metric status changes to prevent flapping
Add comprehensive hysteresis support to prevent status oscillation near threshold boundaries while maintaining responsive alerting.

Key Features:
- HysteresisThresholds with configurable upper/lower limits
- StatusTracker for per-metric status history
- Default gaps: CPU load 10%, memory 5%, disk temp 5°C

Updated Components:
- CPU load collector (5-minute average with hysteresis)
- Memory usage collector (percentage-based thresholds)
- Disk temperature collector (SMART data monitoring)
- All collectors updated to support StatusTracker interface

Cache Interval Adjustments:
- Service status: 60s → 10s (faster response)
- Disk usage: 300s → 60s (more frequent checks)
- Backup status: 900s → 60s (quicker updates)
- SMART data: moved to 600s tier (10 minutes)

Architecture:
- Individual metric status calculation in collectors
- Centralized StatusTracker in MetricCollectionManager
- Status aggregation preserved in dashboard widgets
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use cm_dashboard_shared::{MetricMessage, MessageEnvelope};
|
||||
use tracing::{info, debug};
|
||||
use cm_dashboard_shared::{MessageEnvelope, MetricMessage};
|
||||
use tracing::{debug, info};
|
||||
use zmq::{Context, Socket, SocketType};
|
||||
|
||||
use crate::config::ZmqConfig;
|
||||
@@ -15,75 +15,69 @@ pub struct ZmqHandler {
|
||||
impl ZmqHandler {
|
||||
pub async fn new(config: &ZmqConfig) -> Result<Self> {
|
||||
let context = Context::new();
|
||||
|
||||
|
||||
// Create publisher socket for metrics
|
||||
let publisher = context.socket(SocketType::PUB)?;
|
||||
let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port);
|
||||
publisher.bind(&pub_bind_address)?;
|
||||
|
||||
|
||||
info!("ZMQ publisher bound to {}", pub_bind_address);
|
||||
|
||||
|
||||
// Set socket options for efficiency
|
||||
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
|
||||
publisher.set_linger(1000)?; // Linger time on close
|
||||
|
||||
|
||||
// Create command receiver socket (PULL socket to receive commands from dashboard)
|
||||
let command_receiver = context.socket(SocketType::PULL)?;
|
||||
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
|
||||
command_receiver.bind(&cmd_bind_address)?;
|
||||
|
||||
|
||||
info!("ZMQ command receiver bound to {}", cmd_bind_address);
|
||||
|
||||
|
||||
// Set non-blocking mode for command receiver
|
||||
command_receiver.set_rcvtimeo(0)?; // Non-blocking receive
|
||||
command_receiver.set_linger(1000)?;
|
||||
|
||||
|
||||
Ok(Self {
|
||||
publisher,
|
||||
command_receiver,
|
||||
config: config.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// Publish metrics message via ZMQ
|
||||
pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> {
|
||||
debug!("Publishing {} metrics for host {}", message.metrics.len(), message.hostname);
|
||||
|
||||
debug!(
|
||||
"Publishing {} metrics for host {}",
|
||||
message.metrics.len(),
|
||||
message.hostname
|
||||
);
|
||||
|
||||
// Create message envelope
|
||||
let envelope = MessageEnvelope::metrics(message.clone())
|
||||
.map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?;
|
||||
|
||||
|
||||
// Serialize envelope
|
||||
let serialized = serde_json::to_vec(&envelope)?;
|
||||
|
||||
|
||||
// Send via ZMQ
|
||||
self.publisher.send(&serialized, 0)?;
|
||||
|
||||
|
||||
debug!("Published metrics message ({} bytes)", serialized.len());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
/// Send heartbeat (placeholder for future use)
|
||||
pub async fn send_heartbeat(&self) -> Result<()> {
|
||||
let envelope = MessageEnvelope::heartbeat()
|
||||
.map_err(|e| anyhow::anyhow!("Failed to create heartbeat envelope: {}", e))?;
|
||||
|
||||
let serialized = serde_json::to_vec(&envelope)?;
|
||||
self.publisher.send(&serialized, 0)?;
|
||||
|
||||
debug!("Sent heartbeat");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
/// Try to receive a command (non-blocking)
|
||||
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
||||
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
||||
Ok(bytes) => {
|
||||
debug!("Received command message ({} bytes)", bytes.len());
|
||||
|
||||
|
||||
let command: AgentCommand = serde_json::from_slice(&bytes)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?;
|
||||
|
||||
|
||||
debug!("Parsed command: {:?}", command);
|
||||
Ok(Some(command))
|
||||
}
|
||||
@@ -107,4 +101,4 @@ pub enum AgentCommand {
|
||||
ToggleCollector { name: String, enabled: bool },
|
||||
/// Request status/health check
|
||||
Ping,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user