diff --git a/CLAUDE.md b/CLAUDE.md index 9894d51..77f739e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -59,11 +59,85 @@ hostname2 = [ ## Core Architecture Principles -### Individual Metrics Philosophy -- Agent collects individual metrics, dashboard composes widgets -- Each metric collected, transmitted, and stored individually -- Agent calculates status for each metric using thresholds -- Dashboard aggregates individual metric statuses for widget status +### Structured Data Architecture (Planned Migration) +Current system uses string-based metrics with complex parsing. Planning migration to structured JSON data to eliminate fragile string manipulation. + +**Current (String Metrics):** +- Agent sends individual metrics with string names like `disk_nvme0n1_temperature` +- Dashboard parses metric names with underscore counting and string splitting +- Complex and error-prone metric filtering and extraction logic + +**Target (Structured Data):** +```json +{ + "hostname": "cmbox", + "agent_version": "v0.1.130", + "timestamp": 1763926877, + "system": { + "cpu": { + "load_1min": 3.50, + "load_5min": 3.57, + "load_15min": 3.58, + "frequency_mhz": 1500, + "temperature_celsius": 45.2 + }, + "memory": { + "usage_percent": 25.0, + "total_gb": 23.3, + "used_gb": 5.9, + "swap_total_gb": 10.7, + "swap_used_gb": 0.99, + "tmpfs": [ + {"mount": "/tmp", "usage_percent": 15.0, "used_gb": 0.3, "total_gb": 2.0} + ] + }, + "storage": { + "drives": [ + { + "name": "nvme0n1", + "health": "PASSED", + "temperature_celsius": 29.0, + "wear_percent": 1.0, + "filesystems": [ + {"mount": "/", "usage_percent": 24.0, "used_gb": 224.9, "total_gb": 928.2} + ] + } + ], + "pools": [ + { + "name": "srv_media", + "mount": "/srv/media", + "type": "mergerfs", + "health": "healthy", + "usage_percent": 63.0, + "used_gb": 2355.2, + "total_gb": 3686.4, + "data_drives": [ + {"name": "sdb", "temperature_celsius": 24.0} + ], + "parity_drives": [ + {"name": "sdc", "temperature_celsius": 24.0} + ] + } + ] + } + }, + 
"services": [ + {"name": "sshd", "status": "active", "memory_mb": 4.5, "disk_gb": 0.0} + ], + "backup": { + "status": "completed", + "last_run": 1763920000, + "next_scheduled": 1764006400, + "total_size_gb": 150.5, + "repository_health": "ok" + } +} +``` +- Agent sends structured JSON over ZMQ +- Dashboard accesses data directly: `data.system.storage.drives[0].temperature_celsius` +- Type safety eliminates metric-name string-parsing bugs + ### Maintenance Mode - Agent checks for `/tmp/cm-maintenance` file before sending notifications @@ -293,12 +367,33 @@ Keep responses concise and focused. Avoid extensive implementation summaries unl - ✅ "Restructure storage widget with improved layout" - ✅ "Update CPU thresholds to production values" +## Planned Architecture Migration + +### Phase 1: Structured Data Types (Shared Crate) +- Create Rust structs matching target JSON structure +- Replace `Metric` enum with typed data structures +- Add serde serialization/deserialization + +### Phase 2: Agent Refactor +- Update collectors to return typed structs instead of `Vec<Metric>` +- Remove string metric name generation +- Send structured JSON over ZMQ + +### Phase 3: Dashboard Refactor +- Replace metric parsing logic with direct field access +- Remove `extract_pool_name()`, `extract_drive_name()`, underscore counting +- Widgets access `data.system.storage.drives[0].temperature_celsius` + +### Phase 4: Migration & Cleanup +- Support both formats during transition +- Gradual rollout with backward compatibility +- Remove legacy string metric system + ## Implementation Rules -1. **Individual Metrics**: Each metric is collected, transmitted, and stored individually -2. **Agent Status Authority**: Agent calculates status for each metric using thresholds -3. **Dashboard Composition**: Dashboard widgets subscribe to specific metrics by name -4. **Status Aggregation**: Dashboard aggregates individual metric statuses for widget status +1. 
**Agent Status Authority**: Agent calculates status for each metric using thresholds +2. **Dashboard Composition**: Dashboard widgets subscribe to specific metrics by name +3. **Status Aggregation**: Dashboard aggregates individual metric statuses for widget status **NEVER:** - Copy/paste ANY code from legacy implementations diff --git a/Cargo.lock b/Cargo.lock index b7ab04e..84f8faf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cm-dashboard" -version = "0.1.129" +version = "0.1.130" dependencies = [ "anyhow", "chrono", @@ -301,7 +301,7 @@ dependencies = [ [[package]] name = "cm-dashboard-agent" -version = "0.1.129" +version = "0.1.130" dependencies = [ "anyhow", "async-trait", @@ -324,7 +324,7 @@ dependencies = [ [[package]] name = "cm-dashboard-shared" -version = "0.1.129" +version = "0.1.130" dependencies = [ "chrono", "serde", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index c7df308..7038d96 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-agent" -version = "0.1.130" +version = "0.1.131" edition = "2021" [dependencies] diff --git a/agent/src/agent.rs b/agent/src/agent.rs index 366a465..862e1e0 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -10,7 +10,7 @@ use crate::metrics::MetricCollectionManager; use crate::notifications::NotificationManager; use crate::service_tracker::UserStoppedServiceTracker; use crate::status::HostStatusManager; -use cm_dashboard_shared::{Metric, MetricMessage, MetricValue, Status}; +use cm_dashboard_shared::{AgentData, Metric, MetricValue, Status, TmpfsData, DriveData, FilesystemData, ServiceData}; pub struct Agent { hostname: String, @@ -199,16 +199,310 @@ impl Agent { return Ok(()); } - debug!("Broadcasting {} cached metrics (including host status summary)", metrics.len()); + debug!("Broadcasting {} cached metrics as structured data", metrics.len()); - // 
Create and send message with all current data - let message = MetricMessage::new(self.hostname.clone(), metrics); - self.zmq_handler.publish_metrics(&message).await?; - - debug!("Metrics broadcasted successfully"); + // Convert metrics to structured data and send + let agent_data = self.metrics_to_structured_data(&metrics)?; + self.zmq_handler.publish_agent_data(&agent_data).await?; + + debug!("Structured data broadcasted successfully"); Ok(()) } + /// Convert legacy metrics to structured data format + fn metrics_to_structured_data(&self, metrics: &[Metric]) -> Result { + let mut agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version()); + + // Parse metrics into structured data + for metric in metrics { + self.parse_metric_into_agent_data(&mut agent_data, metric)?; + } + + Ok(agent_data) + } + + /// Parse a single metric into the appropriate structured data field + fn parse_metric_into_agent_data(&self, agent_data: &mut AgentData, metric: &Metric) -> Result<()> { + // CPU metrics + if metric.name == "cpu_load_1min" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.cpu.load_1min = value; + } + } else if metric.name == "cpu_load_5min" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.cpu.load_5min = value; + } + } else if metric.name == "cpu_load_15min" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.cpu.load_15min = value; + } + } else if metric.name == "cpu_frequency_mhz" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.cpu.frequency_mhz = value; + } + } else if metric.name == "cpu_temperature_celsius" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.cpu.temperature_celsius = Some(value); + } + } + // Memory metrics + else if metric.name == "memory_usage_percent" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.memory.usage_percent = value; + } + } else if metric.name == "memory_total_gb" { + if let Some(value) = 
metric.value.as_f32() { + agent_data.system.memory.total_gb = value; + } + } else if metric.name == "memory_used_gb" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.memory.used_gb = value; + } + } else if metric.name == "memory_available_gb" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.memory.available_gb = value; + } + } else if metric.name == "memory_swap_total_gb" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.memory.swap_total_gb = value; + } + } else if metric.name == "memory_swap_used_gb" { + if let Some(value) = metric.value.as_f32() { + agent_data.system.memory.swap_used_gb = value; + } + } + // Tmpfs metrics + else if metric.name.starts_with("memory_tmp_") { + // For now, create a single /tmp tmpfs entry + if metric.name == "memory_tmp_usage_percent" { + if let Some(value) = metric.value.as_f32() { + if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) { + tmpfs.usage_percent = value; + } else { + agent_data.system.memory.tmpfs.push(TmpfsData { + mount: "/tmp".to_string(), + usage_percent: value, + used_gb: 0.0, + total_gb: 0.0, + }); + } + } + } else if metric.name == "memory_tmp_used_gb" { + if let Some(value) = metric.value.as_f32() { + if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) { + tmpfs.used_gb = value; + } else { + agent_data.system.memory.tmpfs.push(TmpfsData { + mount: "/tmp".to_string(), + usage_percent: 0.0, + used_gb: value, + total_gb: 0.0, + }); + } + } + } else if metric.name == "memory_tmp_total_gb" { + if let Some(value) = metric.value.as_f32() { + if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) { + tmpfs.total_gb = value; + } else { + agent_data.system.memory.tmpfs.push(TmpfsData { + mount: "/tmp".to_string(), + usage_percent: 0.0, + used_gb: 0.0, + total_gb: value, + }); + } + } + } + } + // Storage metrics + else if metric.name.starts_with("disk_") { + if metric.name.contains("_temperature") { + if let Some(drive_name) = 
self.extract_drive_name(&metric.name) { + if let Some(temp) = metric.value.as_f32() { + self.ensure_drive_exists(agent_data, &drive_name); + if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) { + drive.temperature_celsius = Some(temp); + } + } + } + } else if metric.name.contains("_wear_percent") { + if let Some(drive_name) = self.extract_drive_name(&metric.name) { + if let Some(wear) = metric.value.as_f32() { + self.ensure_drive_exists(agent_data, &drive_name); + if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) { + drive.wear_percent = Some(wear); + } + } + } + } else if metric.name.contains("_health") { + if let Some(drive_name) = self.extract_drive_name(&metric.name) { + let health = metric.value.as_string(); + self.ensure_drive_exists(agent_data, &drive_name); + if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) { + drive.health = health; + } + } + } else if metric.name.contains("_fs_") { + // Filesystem metrics: disk_{pool}_fs_{filesystem}_{metric} + if let Some((pool_name, fs_name)) = self.extract_pool_and_filesystem(&metric.name) { + if metric.name.contains("_usage_percent") { + if let Some(usage) = metric.value.as_f32() { + self.ensure_filesystem_exists(agent_data, &pool_name, &fs_name, usage, 0.0, 0.0); + } + } else if metric.name.contains("_used_gb") { + if let Some(used) = metric.value.as_f32() { + self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.used_gb = used); + } + } else if metric.name.contains("_total_gb") { + if let Some(total) = metric.value.as_f32() { + self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.total_gb = total); + } + } + } + } + } + // Service metrics + else if metric.name.starts_with("service_") { + if let Some(service_name) = self.extract_service_name(&metric.name) { + if metric.name.contains("_status") { + let status = metric.value.as_string(); + 
self.ensure_service_exists(agent_data, &service_name, &status); + } else if metric.name.contains("_memory_mb") { + if let Some(memory) = metric.value.as_f32() { + self.update_service_field(agent_data, &service_name, |svc| svc.memory_mb = memory); + } + } else if metric.name.contains("_disk_gb") { + if let Some(disk) = metric.value.as_f32() { + self.update_service_field(agent_data, &service_name, |svc| svc.disk_gb = disk); + } + } + } + } + // Backup metrics + else if metric.name.starts_with("backup_") { + if metric.name == "backup_status" { + agent_data.backup.status = metric.value.as_string(); + } else if metric.name == "backup_last_run_timestamp" { + if let Some(timestamp) = metric.value.as_i64() { + agent_data.backup.last_run = Some(timestamp as u64); + } + } else if metric.name == "backup_next_scheduled_timestamp" { + if let Some(timestamp) = metric.value.as_i64() { + agent_data.backup.next_scheduled = Some(timestamp as u64); + } + } else if metric.name == "backup_size_gb" { + if let Some(size) = metric.value.as_f32() { + agent_data.backup.total_size_gb = Some(size); + } + } else if metric.name == "backup_repository_health" { + agent_data.backup.repository_health = Some(metric.value.as_string()); + } + } + + Ok(()) + } + + /// Extract drive name from metric like "disk_nvme0n1_temperature" + fn extract_drive_name(&self, metric_name: &str) -> Option { + if metric_name.starts_with("disk_") { + let suffixes = ["_temperature", "_wear_percent", "_health"]; + for suffix in suffixes { + if let Some(suffix_pos) = metric_name.rfind(suffix) { + return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_" + } + } + } + None + } + + /// Extract pool and filesystem from "disk_{pool}_fs_{filesystem}_{metric}" + fn extract_pool_and_filesystem(&self, metric_name: &str) -> Option<(String, String)> { + if let Some(fs_pos) = metric_name.find("_fs_") { + let pool_name = metric_name[5..fs_pos].to_string(); // Skip "disk_" + let after_fs = &metric_name[fs_pos + 4..]; // Skip 
"_fs_" + if let Some(metric_pos) = after_fs.find('_') { + let fs_name = after_fs[..metric_pos].to_string(); + return Some((pool_name, fs_name)); + } + } + None + } + + /// Extract service name from "service_{name}_{metric}" + fn extract_service_name(&self, metric_name: &str) -> Option { + if metric_name.starts_with("service_") { + let suffixes = ["_status", "_memory_mb", "_disk_gb"]; + for suffix in suffixes { + if let Some(suffix_pos) = metric_name.rfind(suffix) { + return Some(metric_name[8..suffix_pos].to_string()); // Skip "service_" + } + } + } + None + } + + /// Ensure drive exists in agent_data + fn ensure_drive_exists(&self, agent_data: &mut AgentData, drive_name: &str) { + if !agent_data.system.storage.drives.iter().any(|d| d.name == drive_name) { + agent_data.system.storage.drives.push(DriveData { + name: drive_name.to_string(), + health: "UNKNOWN".to_string(), + temperature_celsius: None, + wear_percent: None, + filesystems: Vec::new(), + }); + } + } + + /// Ensure filesystem exists in the correct drive + fn ensure_filesystem_exists(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, usage_percent: f32, used_gb: f32, total_gb: f32) { + self.ensure_drive_exists(agent_data, pool_name); + if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) { + if !drive.filesystems.iter().any(|fs| fs.mount == fs_name) { + drive.filesystems.push(FilesystemData { + mount: fs_name.to_string(), + usage_percent, + used_gb, + total_gb, + }); + } + } + } + + /// Update filesystem field + fn update_filesystem_field(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, update_fn: F) + where F: FnOnce(&mut FilesystemData) { + if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) { + if let Some(fs) = drive.filesystems.iter_mut().find(|fs| fs.mount == fs_name) { + update_fn(fs); + } + } + } + + /// Ensure service exists + fn ensure_service_exists(&self, agent_data: &mut 
AgentData, service_name: &str, status: &str) { + if !agent_data.services.iter().any(|s| s.name == service_name) { + agent_data.services.push(ServiceData { + name: service_name.to_string(), + status: status.to_string(), + memory_mb: 0.0, + disk_gb: 0.0, + user_stopped: false, // TODO: Get from service tracker + }); + } else if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) { + service.status = status.to_string(); + } + } + + /// Update service field + fn update_service_field(&self, agent_data: &mut AgentData, service_name: &str, update_fn: F) + where F: FnOnce(&mut ServiceData) { + if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) { + update_fn(service); + } + } + async fn process_metrics(&mut self, metrics: &[Metric]) -> bool { let mut status_changed = false; for metric in metrics { @@ -261,13 +555,11 @@ impl Agent { /// Send standalone heartbeat for connectivity detection async fn send_heartbeat(&mut self) -> Result<()> { - let heartbeat_metric = self.get_heartbeat_metric(); - let message = MetricMessage::new( - self.hostname.clone(), - vec![heartbeat_metric], - ); - - self.zmq_handler.publish_metrics(&message).await?; + // Create minimal agent data with just heartbeat + let agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version()); + // Heartbeat timestamp is already set in AgentData::new() + + self.zmq_handler.publish_agent_data(&agent_data).await?; debug!("Sent standalone heartbeat for connectivity detection"); Ok(()) } diff --git a/agent/src/communication/mod.rs b/agent/src/communication/mod.rs index e94dcab..c364f7c 100644 --- a/agent/src/communication/mod.rs +++ b/agent/src/communication/mod.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cm_dashboard_shared::{MessageEnvelope, MetricMessage}; +use cm_dashboard_shared::{AgentData, MessageEnvelope}; use tracing::{debug, info}; use zmq::{Context, Socket, SocketType}; @@ -43,17 +43,17 @@ impl ZmqHandler { }) } - /// Publish 
metrics message via ZMQ - pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> { + + /// Publish agent data via ZMQ + pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> { debug!( - "Publishing {} metrics for host {}", - message.metrics.len(), - message.hostname + "Publishing agent data for host {}", + data.hostname ); - // Create message envelope - let envelope = MessageEnvelope::metrics(message.clone()) - .map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?; + // Create message envelope for agent data + let envelope = MessageEnvelope::agent_data(data.clone()) + .map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?; // Serialize envelope let serialized = serde_json::to_vec(&envelope)?; @@ -61,11 +61,10 @@ impl ZmqHandler { // Send via ZMQ self.publisher.send(&serialized, 0)?; - debug!("Published metrics message ({} bytes)", serialized.len()); + debug!("Published agent data message ({} bytes)", serialized.len()); Ok(()) } - /// Try to receive a command (non-blocking) pub fn try_receive_command(&self) -> Result> { match self.command_receiver.recv_bytes(zmq::DONTWAIT) { diff --git a/dashboard/Cargo.toml b/dashboard/Cargo.toml index 6c34e56..641fcdd 100644 --- a/dashboard/Cargo.toml +++ b/dashboard/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard" -version = "0.1.130" +version = "0.1.131" edition = "2021" [dependencies] diff --git a/dashboard/src/app.rs b/dashboard/src/app.rs index ef33b48..d0da630 100644 --- a/dashboard/src/app.rs +++ b/dashboard/src/app.rs @@ -185,42 +185,35 @@ impl Dashboard { // Check for new metrics if last_metrics_check.elapsed() >= metrics_check_interval { - if let Ok(Some(metric_message)) = self.zmq_consumer.receive_metrics().await { + if let Ok(Some(agent_data)) = self.zmq_consumer.receive_agent_data().await { debug!( - "Received metrics from {}: {} metrics", - metric_message.hostname, - metric_message.metrics.len() + "Received agent data from {}", 
+ agent_data.hostname ); // Track first contact with host (no command needed - agent sends data every 2s) let is_new_host = !self .initial_commands_sent - .contains(&metric_message.hostname); + .contains(&agent_data.hostname); if is_new_host { info!( "First contact with host {} - data will update automatically", - metric_message.hostname + agent_data.hostname ); self.initial_commands_sent - .insert(metric_message.hostname.clone()); + .insert(agent_data.hostname.clone()); } // Show raw data if requested (before processing) if self.raw_data { - println!("RAW METRICS FROM {}: {} metrics", metric_message.hostname, metric_message.metrics.len()); - for metric in &metric_message.metrics { - println!(" {}: {:?} ({:?})", metric.name, metric.value, metric.status); - if let Some(desc) = &metric.description { - println!(" └─ {}", desc); - } - } + println!("RAW AGENT DATA FROM {}:", agent_data.hostname); + println!("{}", serde_json::to_string_pretty(&agent_data).unwrap_or_else(|e| format!("Serialization error: {}", e))); println!("{}", "─".repeat(80)); } - // Update metric store - self.metric_store - .update_metrics(&metric_message.hostname, metric_message.metrics); + // Update data store + self.metric_store.process_agent_data(agent_data); // Check for agent version mismatches across hosts if let Some((current_version, outdated_hosts)) = self.metric_store.get_version_mismatches() { diff --git a/dashboard/src/communication/mod.rs b/dashboard/src/communication/mod.rs index 2646a9b..7b9e463 100644 --- a/dashboard/src/communication/mod.rs +++ b/dashboard/src/communication/mod.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cm_dashboard_shared::{CommandOutputMessage, MessageEnvelope, MessageType, MetricMessage}; +use cm_dashboard_shared::{AgentData, CommandOutputMessage, MessageEnvelope, MessageType}; use tracing::{debug, error, info, warn}; use zmq::{Context, Socket, SocketType}; @@ -117,8 +117,8 @@ impl ZmqConsumer { } } - /// Receive metrics from any connected agent (non-blocking) - 
pub async fn receive_metrics(&mut self) -> Result> { + /// Receive agent data (non-blocking) + pub async fn receive_agent_data(&mut self) -> Result> { match self.subscriber.recv_bytes(zmq::DONTWAIT) { Ok(data) => { debug!("Received {} bytes from ZMQ", data.len()); @@ -129,29 +129,27 @@ impl ZmqConsumer { // Check message type match envelope.message_type { - MessageType::Metrics => { - let metrics = envelope - .decode_metrics() - .map_err(|e| anyhow::anyhow!("Failed to decode metrics: {}", e))?; + MessageType::AgentData => { + let agent_data = envelope + .decode_agent_data() + .map_err(|e| anyhow::anyhow!("Failed to decode agent data: {}", e))?; debug!( - "Received {} metrics from {}", - metrics.metrics.len(), - metrics.hostname + "Received agent data from host {}", + agent_data.hostname ); - - Ok(Some(metrics)) + Ok(Some(agent_data)) } MessageType::Heartbeat => { debug!("Received heartbeat"); - Ok(None) // Don't return heartbeats as metrics + Ok(None) // Don't return heartbeats } MessageType::CommandOutput => { debug!("Received command output (will be handled by receive_command_output)"); Ok(None) // Command output handled by separate method } _ => { - debug!("Received non-metrics message: {:?}", envelope.message_type); + debug!("Received unsupported message: {:?}", envelope.message_type); Ok(None) } } @@ -166,5 +164,6 @@ impl ZmqConsumer { } } } + } diff --git a/dashboard/src/metrics/store.rs b/dashboard/src/metrics/store.rs index 1ebc8b6..494ebb3 100644 --- a/dashboard/src/metrics/store.rs +++ b/dashboard/src/metrics/store.rs @@ -1,4 +1,4 @@ -use cm_dashboard_shared::Metric; +use cm_dashboard_shared::{AgentData, Metric}; use std::collections::HashMap; use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; @@ -76,6 +76,286 @@ impl MetricStore { ); } + /// Process structured agent data (temporary bridge - converts back to metrics) + /// TODO: Replace entire metric system with direct structured data processing + pub fn process_agent_data(&mut self, 
agent_data: AgentData) { + let metrics = self.convert_agent_data_to_metrics(&agent_data); + self.update_metrics(&agent_data.hostname, metrics); + } + + /// Convert structured agent data to legacy metrics (temporary bridge) + fn convert_agent_data_to_metrics(&self, agent_data: &AgentData) -> Vec { + use cm_dashboard_shared::{Metric, MetricValue, Status}; + + let mut metrics = Vec::new(); + + // Convert CPU data + metrics.push(Metric::new( + "cpu_load_1min".to_string(), + MetricValue::Float(agent_data.system.cpu.load_1min), + Status::Ok, + )); + metrics.push(Metric::new( + "cpu_load_5min".to_string(), + MetricValue::Float(agent_data.system.cpu.load_5min), + Status::Ok, + )); + metrics.push(Metric::new( + "cpu_load_15min".to_string(), + MetricValue::Float(agent_data.system.cpu.load_15min), + Status::Ok, + )); + metrics.push(Metric::new( + "cpu_frequency_mhz".to_string(), + MetricValue::Float(agent_data.system.cpu.frequency_mhz), + Status::Ok, + )); + if let Some(temp) = agent_data.system.cpu.temperature_celsius { + metrics.push(Metric::new( + "cpu_temperature_celsius".to_string(), + MetricValue::Float(temp), + Status::Ok, + )); + } + + // Convert Memory data + metrics.push(Metric::new( + "memory_usage_percent".to_string(), + MetricValue::Float(agent_data.system.memory.usage_percent), + Status::Ok, + )); + metrics.push(Metric::new( + "memory_total_gb".to_string(), + MetricValue::Float(agent_data.system.memory.total_gb), + Status::Ok, + )); + metrics.push(Metric::new( + "memory_used_gb".to_string(), + MetricValue::Float(agent_data.system.memory.used_gb), + Status::Ok, + )); + metrics.push(Metric::new( + "memory_available_gb".to_string(), + MetricValue::Float(agent_data.system.memory.available_gb), + Status::Ok, + )); + metrics.push(Metric::new( + "memory_swap_total_gb".to_string(), + MetricValue::Float(agent_data.system.memory.swap_total_gb), + Status::Ok, + )); + metrics.push(Metric::new( + "memory_swap_used_gb".to_string(), + 
MetricValue::Float(agent_data.system.memory.swap_used_gb), + Status::Ok, + )); + + // Convert tmpfs data + for tmpfs in &agent_data.system.memory.tmpfs { + if tmpfs.mount == "/tmp" { + metrics.push(Metric::new( + "memory_tmp_usage_percent".to_string(), + MetricValue::Float(tmpfs.usage_percent), + Status::Ok, + )); + metrics.push(Metric::new( + "memory_tmp_used_gb".to_string(), + MetricValue::Float(tmpfs.used_gb), + Status::Ok, + )); + metrics.push(Metric::new( + "memory_tmp_total_gb".to_string(), + MetricValue::Float(tmpfs.total_gb), + Status::Ok, + )); + } + } + + // Add agent metadata + metrics.push(Metric::new( + "agent_version".to_string(), + MetricValue::String(agent_data.agent_version.clone()), + Status::Ok, + )); + metrics.push(Metric::new( + "agent_heartbeat".to_string(), + MetricValue::Integer(agent_data.timestamp as i64), + Status::Ok, + )); + + // Convert storage data + for drive in &agent_data.system.storage.drives { + // Drive-level metrics + if let Some(temp) = drive.temperature_celsius { + metrics.push(Metric::new( + format!("disk_{}_temperature", drive.name), + MetricValue::Float(temp), + Status::Ok, + )); + } + if let Some(wear) = drive.wear_percent { + metrics.push(Metric::new( + format!("disk_{}_wear_percent", drive.name), + MetricValue::Float(wear), + Status::Ok, + )); + } + metrics.push(Metric::new( + format!("disk_{}_health", drive.name), + MetricValue::String(drive.health.clone()), + Status::Ok, + )); + + // Filesystem metrics + for fs in &drive.filesystems { + let fs_base = format!("disk_{}_fs_{}", drive.name, fs.mount.replace('/', "root")); + metrics.push(Metric::new( + format!("{}_usage_percent", fs_base), + MetricValue::Float(fs.usage_percent), + Status::Ok, + )); + metrics.push(Metric::new( + format!("{}_used_gb", fs_base), + MetricValue::Float(fs.used_gb), + Status::Ok, + )); + metrics.push(Metric::new( + format!("{}_total_gb", fs_base), + MetricValue::Float(fs.total_gb), + Status::Ok, + )); + } + } + + // Convert storage pools + for 
pool in &agent_data.system.storage.pools { + let pool_base = format!("disk_{}", pool.name); + metrics.push(Metric::new( + format!("{}_usage_percent", pool_base), + MetricValue::Float(pool.usage_percent), + Status::Ok, + )); + metrics.push(Metric::new( + format!("{}_used_gb", pool_base), + MetricValue::Float(pool.used_gb), + Status::Ok, + )); + metrics.push(Metric::new( + format!("{}_total_gb", pool_base), + MetricValue::Float(pool.total_gb), + Status::Ok, + )); + metrics.push(Metric::new( + format!("{}_pool_type", pool_base), + MetricValue::String(pool.pool_type.clone()), + Status::Ok, + )); + metrics.push(Metric::new( + format!("{}_mount_point", pool_base), + MetricValue::String(pool.mount.clone()), + Status::Ok, + )); + + // Pool drive data + for drive in &pool.data_drives { + if let Some(temp) = drive.temperature_celsius { + metrics.push(Metric::new( + format!("disk_{}_{}_temperature", pool.name, drive.name), + MetricValue::Float(temp), + Status::Ok, + )); + } + if let Some(wear) = drive.wear_percent { + metrics.push(Metric::new( + format!("disk_{}_{}_wear_percent", pool.name, drive.name), + MetricValue::Float(wear), + Status::Ok, + )); + } + } + for drive in &pool.parity_drives { + if let Some(temp) = drive.temperature_celsius { + metrics.push(Metric::new( + format!("disk_{}_{}_temperature", pool.name, drive.name), + MetricValue::Float(temp), + Status::Ok, + )); + } + if let Some(wear) = drive.wear_percent { + metrics.push(Metric::new( + format!("disk_{}_{}_wear_percent", pool.name, drive.name), + MetricValue::Float(wear), + Status::Ok, + )); + } + } + } + + // Convert service data + for service in &agent_data.services { + let service_base = format!("service_{}", service.name); + metrics.push(Metric::new( + format!("{}_status", service_base), + MetricValue::String(service.status.clone()), + Status::Ok, + )); + metrics.push(Metric::new( + format!("{}_memory_mb", service_base), + MetricValue::Float(service.memory_mb), + Status::Ok, + )); + 
metrics.push(Metric::new( + format!("{}_disk_gb", service_base), + MetricValue::Float(service.disk_gb), + Status::Ok, + )); + if service.user_stopped { + metrics.push(Metric::new( + format!("{}_user_stopped", service_base), + MetricValue::Boolean(true), + Status::Ok, + )); + } + } + + // Convert backup data + metrics.push(Metric::new( + "backup_status".to_string(), + MetricValue::String(agent_data.backup.status.clone()), + Status::Ok, + )); + if let Some(last_run) = agent_data.backup.last_run { + metrics.push(Metric::new( + "backup_last_run_timestamp".to_string(), + MetricValue::Integer(last_run as i64), + Status::Ok, + )); + } + if let Some(next_scheduled) = agent_data.backup.next_scheduled { + metrics.push(Metric::new( + "backup_next_scheduled_timestamp".to_string(), + MetricValue::Integer(next_scheduled as i64), + Status::Ok, + )); + } + if let Some(size) = agent_data.backup.total_size_gb { + metrics.push(Metric::new( + "backup_size_gb".to_string(), + MetricValue::Float(size), + Status::Ok, + )); + } + if let Some(health) = &agent_data.backup.repository_health { + metrics.push(Metric::new( + "backup_repository_health".to_string(), + MetricValue::String(health.clone()), + Status::Ok, + )); + } + + metrics + } + /// Get current metric for a specific host pub fn get_metric(&self, hostname: &str, metric_name: &str) -> Option<&Metric> { self.current_metrics.get(hostname)?.get(metric_name) diff --git a/dashboard/src/ui/widgets/system.rs b/dashboard/src/ui/widgets/system.rs index 9e24010..f0e1b8b 100644 --- a/dashboard/src/ui/widgets/system.rs +++ b/dashboard/src/ui/widgets/system.rs @@ -380,16 +380,18 @@ impl SystemWidget { } } - // Handle physical drive metrics: disk_{drive}_health and disk_{drive}_wear_percent + // Handle physical drive metrics: disk_{drive}_health, disk_{drive}_wear_percent, and disk_{drive}_temperature if (metric_name.ends_with("_health") && !metric_name.contains("_pool_health")) - || metric_name.ends_with("_wear_percent") { + || 
metric_name.ends_with("_wear_percent") + || metric_name.ends_with("_temperature") { // Count underscores to distinguish physical drive metrics (disk_{drive}_metric) // from pool drive metrics (disk_{pool}_{drive}_metric) let underscore_count = metric_name.matches('_').count(); // disk_nvme0n1_wear_percent has 3 underscores: disk_nvme0n1_wear_percent - if underscore_count == 3 { // disk_{drive}_wear_percent (where drive has underscores) + if underscore_count == 3 { // disk_{drive}_metric (where drive has underscores) if let Some(suffix_pos) = metric_name.rfind("_health") - .or_else(|| metric_name.rfind("_wear_percent")) { + .or_else(|| metric_name.rfind("_wear_percent")) + .or_else(|| metric_name.rfind("_temperature")) { return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_" } } @@ -468,15 +470,25 @@ impl SystemWidget { for pool in &self.storage_pools { // Pool header line with type and health let pool_label = if pool.pool_type.starts_with("drive (") { - // For physical drives, show the drive name with wear percentage if available - // Look for any drive with wear data (physical drives may have drives named after the pool) + // For physical drives, show the drive name with temperature and wear percentage if available + // Look for any drive with temp/wear data (physical drives may have drives named after the pool) + let temp_opt = pool.drives.iter() + .find_map(|d| d.temperature); let wear_opt = pool.drives.iter() .find_map(|d| d.wear_percent); + let mut drive_info = Vec::new(); + if let Some(temp) = temp_opt { + drive_info.push(format!("T: {:.0}°C", temp)); + } if let Some(wear) = wear_opt { - format!("{} W: {:.0}%:", pool.name, wear) - } else { + drive_info.push(format!("W: {:.0}%", wear)); + } + + if drive_info.is_empty() { format!("{}:", pool.name) + } else { + format!("{} {}:", pool.name, drive_info.join(" ")) } } else if pool.pool_type == "single" { format!("{}:", pool.mount_point) diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 
6dcc2b3..d5870d6 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-shared" -version = "0.1.130" +version = "0.1.131" edition = "2021" [dependencies] diff --git a/shared/src/agent_data.rs b/shared/src/agent_data.rs new file mode 100644 index 0000000..2707900 --- /dev/null +++ b/shared/src/agent_data.rs @@ -0,0 +1,161 @@ +use serde::{Deserialize, Serialize}; + +/// Complete structured data from an agent +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentData { + pub hostname: String, + pub agent_version: String, + pub timestamp: u64, + pub system: SystemData, + pub services: Vec, + pub backup: BackupData, +} + +/// System-level monitoring data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SystemData { + pub cpu: CpuData, + pub memory: MemoryData, + pub storage: StorageData, +} + +/// CPU monitoring data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CpuData { + pub load_1min: f32, + pub load_5min: f32, + pub load_15min: f32, + pub frequency_mhz: f32, + pub temperature_celsius: Option, +} + +/// Memory monitoring data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MemoryData { + pub usage_percent: f32, + pub total_gb: f32, + pub used_gb: f32, + pub available_gb: f32, + pub swap_total_gb: f32, + pub swap_used_gb: f32, + pub tmpfs: Vec, +} + +/// Tmpfs filesystem data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TmpfsData { + pub mount: String, + pub usage_percent: f32, + pub used_gb: f32, + pub total_gb: f32, +} + +/// Storage monitoring data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StorageData { + pub drives: Vec, + pub pools: Vec, +} + +/// Individual drive data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DriveData { + pub name: String, + pub health: String, + pub temperature_celsius: Option, + pub wear_percent: Option, + pub filesystems: Vec, +} + +/// Filesystem on a drive +#[derive(Debug, Clone, 
Serialize, Deserialize)] +pub struct FilesystemData { + pub mount: String, + pub usage_percent: f32, + pub used_gb: f32, + pub total_gb: f32, +} + +/// Storage pool (MergerFS, RAID, etc.) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PoolData { + pub name: String, + pub mount: String, + pub pool_type: String, // "mergerfs", "raid", etc. + pub health: String, + pub usage_percent: f32, + pub used_gb: f32, + pub total_gb: f32, + pub data_drives: Vec, + pub parity_drives: Vec, +} + +/// Drive in a storage pool +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PoolDriveData { + pub name: String, + pub temperature_celsius: Option, + pub wear_percent: Option, + pub health: String, +} + +/// Service monitoring data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServiceData { + pub name: String, + pub status: String, // "active", "inactive", "failed" + pub memory_mb: f32, + pub disk_gb: f32, + pub user_stopped: bool, +} + +/// Backup system data +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BackupData { + pub status: String, + pub last_run: Option, + pub next_scheduled: Option, + pub total_size_gb: Option, + pub repository_health: Option, +} + +impl AgentData { + /// Create new agent data with current timestamp + pub fn new(hostname: String, agent_version: String) -> Self { + Self { + hostname, + agent_version, + timestamp: chrono::Utc::now().timestamp() as u64, + system: SystemData { + cpu: CpuData { + load_1min: 0.0, + load_5min: 0.0, + load_15min: 0.0, + frequency_mhz: 0.0, + temperature_celsius: None, + }, + memory: MemoryData { + usage_percent: 0.0, + total_gb: 0.0, + used_gb: 0.0, + available_gb: 0.0, + swap_total_gb: 0.0, + swap_used_gb: 0.0, + tmpfs: Vec::new(), + }, + storage: StorageData { + drives: Vec::new(), + pools: Vec::new(), + }, + }, + services: Vec::new(), + backup: BackupData { + status: "unknown".to_string(), + last_run: None, + next_scheduled: None, + total_size_gb: None, + repository_health: None, 
+ }, + } + } +} \ No newline at end of file diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 9b5dbf9..c8de872 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -1,8 +1,10 @@ +pub mod agent_data; pub mod cache; pub mod error; pub mod metrics; pub mod protocol; +pub use agent_data::*; pub use cache::*; pub use error::*; pub use metrics::*; diff --git a/shared/src/protocol.rs b/shared/src/protocol.rs index b5c592a..8261120 100644 --- a/shared/src/protocol.rs +++ b/shared/src/protocol.rs @@ -1,13 +1,9 @@ -use crate::metrics::Metric; +use crate::agent_data::AgentData; use serde::{Deserialize, Serialize}; -/// Message sent from agent to dashboard via ZMQ -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct MetricMessage { - pub hostname: String, - pub timestamp: u64, - pub metrics: Vec, -} +/// Message sent from agent to dashboard via ZMQ +/// Always structured data - no legacy metrics support +pub type AgentMessage = AgentData; /// Command output streaming message #[derive(Debug, Clone, Serialize, Deserialize)] @@ -20,15 +16,6 @@ pub struct CommandOutputMessage { pub timestamp: u64, } -impl MetricMessage { - pub fn new(hostname: String, metrics: Vec) -> Self { - Self { - hostname, - timestamp: chrono::Utc::now().timestamp() as u64, - metrics, - } - } -} impl CommandOutputMessage { pub fn new(hostname: String, command_id: String, command_type: String, output_line: String, is_complete: bool) -> Self { @@ -59,8 +46,8 @@ pub enum Command { pub enum CommandResponse { /// Acknowledgment of command Ack, - /// Metrics response - Metrics(Vec), + /// Agent data response + AgentData(AgentData), /// Pong response to ping Pong, /// Error response @@ -76,7 +63,7 @@ pub struct MessageEnvelope { #[derive(Debug, Serialize, Deserialize)] pub enum MessageType { - Metrics, + AgentData, Command, CommandResponse, CommandOutput, @@ -84,10 +71,10 @@ pub enum MessageType { } impl MessageEnvelope { - pub fn metrics(message: MetricMessage) -> Result { + pub fn 
agent_data(data: AgentData) -> Result { Ok(Self { - message_type: MessageType::Metrics, - payload: serde_json::to_vec(&message)?, + message_type: MessageType::AgentData, + payload: serde_json::to_vec(&data)?, }) } @@ -119,11 +106,11 @@ impl MessageEnvelope { }) } - pub fn decode_metrics(&self) -> Result { + pub fn decode_agent_data(&self) -> Result { match self.message_type { - MessageType::Metrics => Ok(serde_json::from_slice(&self.payload)?), + MessageType::AgentData => Ok(serde_json::from_slice(&self.payload)?), _ => Err(crate::SharedError::Protocol { - message: "Expected metrics message".to_string(), + message: "Expected agent data message".to_string(), }), } }