Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a2519b2814 | |||
| 91f037aa3e |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -270,7 +270,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard"
|
||||
version = "0.1.18"
|
||||
version = "0.1.19"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
@@ -291,7 +291,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard-agent"
|
||||
version = "0.1.18"
|
||||
version = "0.1.19"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -314,7 +314,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard-shared"
|
||||
version = "0.1.18"
|
||||
version = "0.1.19"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"serde",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard-agent"
|
||||
version = "0.1.18"
|
||||
version = "0.1.20"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -75,7 +75,6 @@ impl Agent {
|
||||
let mut collection_interval =
|
||||
interval(Duration::from_secs(self.config.collection_interval_seconds));
|
||||
let mut transmission_interval = interval(Duration::from_secs(self.config.zmq.transmission_interval_seconds));
|
||||
let mut notification_interval = interval(Duration::from_secs(self.config.status_aggregation.notification_interval_seconds));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -86,13 +85,12 @@ impl Agent {
|
||||
}
|
||||
}
|
||||
_ = transmission_interval.tick() => {
|
||||
// Send all metrics via ZMQ every 1 second
|
||||
// Send all metrics via ZMQ and process notifications immediately
|
||||
if let Err(e) = self.broadcast_all_metrics().await {
|
||||
error!("Failed to broadcast metrics: {}", e);
|
||||
}
|
||||
}
|
||||
_ = notification_interval.tick() => {
|
||||
// Process batched notifications
|
||||
|
||||
// Process notifications immediately with each transmission
|
||||
if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await {
|
||||
error!("Failed to process pending notifications: {}", e);
|
||||
}
|
||||
@@ -127,8 +125,8 @@ impl Agent {
|
||||
|
||||
info!("Force collected and cached {} metrics", metrics.len());
|
||||
|
||||
// Process metrics through status manager
|
||||
self.process_metrics(&metrics).await;
|
||||
// Process metrics through status manager (collect status data at startup)
|
||||
let _status_changed = self.process_metrics(&metrics).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -146,8 +144,15 @@ impl Agent {
|
||||
|
||||
debug!("Collected and cached {} metrics", metrics.len());
|
||||
|
||||
// Process metrics through status manager
|
||||
self.process_metrics(&metrics).await;
|
||||
// Process metrics through status manager and trigger immediate transmission if status changed
|
||||
let status_changed = self.process_metrics(&metrics).await;
|
||||
|
||||
if status_changed {
|
||||
info!("Status change detected - triggering immediate metric transmission");
|
||||
if let Err(e) = self.broadcast_all_metrics().await {
|
||||
error!("Failed to broadcast metrics after status change: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -181,10 +186,14 @@ impl Agent {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_metrics(&mut self, metrics: &[Metric]) {
|
||||
async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
|
||||
let mut status_changed = false;
|
||||
for metric in metrics {
|
||||
self.host_status_manager.process_metric(metric, &mut self.notification_manager).await;
|
||||
if self.host_status_manager.process_metric(metric, &mut self.notification_manager).await {
|
||||
status_changed = true;
|
||||
}
|
||||
}
|
||||
status_changed
|
||||
}
|
||||
|
||||
/// Create agent version metric for cross-host version comparison
|
||||
|
||||
@@ -19,31 +19,6 @@ impl NixOSCollector {
|
||||
Self {}
|
||||
}
|
||||
|
||||
/// Get NixOS build information
|
||||
fn get_nixos_build_info(&self) -> Result<String, Box<dyn std::error::Error>> {
|
||||
// Get nixos-version output directly
|
||||
let output = Command::new("nixos-version").output()?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err("nixos-version command failed".into());
|
||||
}
|
||||
|
||||
let version_line = String::from_utf8_lossy(&output.stdout);
|
||||
let version = version_line.trim();
|
||||
|
||||
if version.is_empty() {
|
||||
return Err("Empty nixos-version output".into());
|
||||
}
|
||||
|
||||
// Remove codename part (e.g., "(Warbler)")
|
||||
let clean_version = if let Some(pos) = version.find(" (") {
|
||||
version[..pos].to_string()
|
||||
} else {
|
||||
version.to_string()
|
||||
};
|
||||
|
||||
Ok(clean_version)
|
||||
}
|
||||
|
||||
/// Get agent hash from binary path
|
||||
fn get_agent_hash(&self) -> Result<String, Box<dyn std::error::Error>> {
|
||||
|
||||
@@ -9,7 +9,6 @@ use chrono::Utc;
|
||||
pub struct HostStatusConfig {
|
||||
pub enabled: bool,
|
||||
pub aggregation_method: String, // "worst_case"
|
||||
pub notification_interval_seconds: u64,
|
||||
}
|
||||
|
||||
impl Default for HostStatusConfig {
|
||||
@@ -17,7 +16,6 @@ impl Default for HostStatusConfig {
|
||||
Self {
|
||||
enabled: true,
|
||||
aggregation_method: "worst_case".to_string(),
|
||||
notification_interval_seconds: 30,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -160,25 +158,52 @@ impl HostStatusManager {
|
||||
|
||||
|
||||
|
||||
/// Process a metric - updates status (notifications handled separately via batching)
|
||||
pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) {
|
||||
// Just update status - notifications are handled by process_pending_notifications
|
||||
self.update_service_status(metric.name.clone(), metric.status);
|
||||
/// Process a metric - updates status and queues for aggregated notifications if status changed
|
||||
pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) -> bool {
|
||||
let old_status = self.service_statuses.get(&metric.name).copied();
|
||||
let new_status = metric.status;
|
||||
|
||||
// Update status
|
||||
self.update_service_status(metric.name.clone(), new_status);
|
||||
|
||||
// Check if status actually changed (ignore first-time status setting)
|
||||
if let Some(old_status) = old_status {
|
||||
if old_status != new_status {
|
||||
debug!("Status change detected for {}: {:?} -> {:?}", metric.name, old_status, new_status);
|
||||
|
||||
// Queue change for aggregated notification (not immediate)
|
||||
self.queue_status_change(&metric.name, old_status, new_status);
|
||||
|
||||
return true; // Status changed - caller should trigger immediate transmission
|
||||
}
|
||||
} else {
|
||||
debug!("Initial status set for {}: {:?}", metric.name, new_status);
|
||||
}
|
||||
|
||||
false // No status change (or first-time status)
|
||||
}
|
||||
|
||||
/// Process pending notifications - call this at notification intervals
|
||||
/// Queue status change for aggregated notification
|
||||
fn queue_status_change(&mut self, metric_name: &str, old_status: Status, new_status: Status) {
|
||||
// Add to pending changes for aggregated notification
|
||||
let entry = self.pending_changes.entry(metric_name.to_string()).or_insert((old_status, old_status, 0));
|
||||
entry.1 = new_status; // Update final status
|
||||
entry.2 += 1; // Increment change count
|
||||
|
||||
// Set batch start time if this is the first change
|
||||
if self.batch_start_time.is_none() {
|
||||
self.batch_start_time = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Process pending notifications - legacy method, now rarely used
|
||||
pub async fn process_pending_notifications(&mut self, notification_manager: &mut crate::notifications::NotificationManager) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
if !self.config.enabled || self.pending_changes.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let batch_start = self.batch_start_time.unwrap_or_else(Instant::now);
|
||||
let batch_duration = batch_start.elapsed();
|
||||
|
||||
// Only process if enough time has passed
|
||||
if batch_duration.as_secs() < self.config.notification_interval_seconds {
|
||||
return Ok(());
|
||||
}
|
||||
// Process notifications immediately without interval batching
|
||||
|
||||
// Create aggregated status changes
|
||||
let aggregated = self.create_aggregated_changes();
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard"
|
||||
version = "0.1.18"
|
||||
version = "0.1.20"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -14,7 +14,7 @@ use app::Dashboard;
|
||||
|
||||
/// Get hardcoded version
|
||||
fn get_version() -> &'static str {
|
||||
"v0.1.18"
|
||||
"v0.1.20"
|
||||
}
|
||||
|
||||
/// Check if running inside tmux session
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard-shared"
|
||||
version = "0.1.18"
|
||||
version = "0.1.20"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
Reference in New Issue
Block a user