Compare commits

14 Commits

Author SHA1 Message Date
adf3b0f51c Implement complete structured data architecture
All checks were successful
Build and Release / build-and-release (push) Successful in 2m10s
Replace fragile string-based metrics with type-safe JSON data structures.
Agent converts all metrics to structured data, dashboard processes typed fields.

Changes:
- Add AgentData struct with CPU, memory, storage, services, backup fields
- Replace string parsing with direct field access throughout system
- Maintain UI compatibility via temporary metric bridge conversion
- Fix NVMe temperature display and eliminate string parsing bugs
- Update protocol to support structured data transmission over ZMQ
- Comprehensive metric type coverage: CPU, memory, storage, services, backup

Version bump to 0.1.131
2025-11-23 21:32:00 +01:00
41ded0170c Add wear percentage display and NVMe temperature collection
All checks were successful
Build and Release / build-and-release (push) Successful in 2m9s
- Display wear percentage in storage headers for single physical drives
- Remove redundant drive type indicators, show wear data instead
- Fix wear metric parsing for physical drives (underscore count issue)
- Add NVMe temperature parsing support (Temperature: format)
- Add raw metrics debugging functionality for troubleshooting
- Clean up physical drive display to remove redundant information
2025-11-23 20:29:24 +01:00
9b4191b2c3 Fix physical drive name and health status display
All checks were successful
Build and Release / build-and-release (push) Successful in 2m13s
- Display actual drive name (e.g., nvme0n1) instead of mount point for physical drives
- Fix health status parsing for physical drives to show proper status icons
- Update pool name extraction to handle disk_{drive}_health metrics correctly
- Improve storage widget rendering for physical drive identification
2025-11-23 19:25:45 +01:00
53dbb43352 Fix SnapRAID parity association using directory-based discovery
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
- Replace blanket parity drive inclusion with smart relationship detection
- Only associate parity drives from same parent directory as data drives
- Prevent incorrect exclusion of nvme0n1 physical drives from grouping
- Maintain zero-configuration auto-discovery without hardcoded paths
2025-11-23 18:42:48 +01:00
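The directory-based association described in this commit can be sketched as follows. This is a minimal illustration, not the collector's actual code: the function name `related_parity_mounts` and the plain string slices are assumptions, and the comparison here is stricter than a prefix match (the parent directories must be equal and the final path component must contain "parity").

```rust
use std::path::Path;

/// Return the parity mounts that live in the same parent directory as at
/// least one data mount. Hypothetical helper for illustration only.
pub fn related_parity_mounts(data_mounts: &[&str], all_mounts: &[&str]) -> Vec<String> {
    let mut related: Vec<String> = Vec::new();
    for &mount in all_mounts {
        let path = Path::new(mount);
        // Assumption: a parity mount is recognised by "parity" in its last component.
        let is_parity = path
            .file_name()
            .and_then(|name| name.to_str())
            .map_or(false, |name| name.contains("parity"));
        // A mount directly at "/" has no parent and never matches.
        let shares_parent = data_mounts.iter().any(|&data| {
            let data_parent = Path::new(data).parent();
            data_parent.is_some() && data_parent == path.parent()
        });
        if is_parity && shares_parent && !related.iter().any(|r| r.as_str() == mount) {
            related.push(mount.to_string());
        }
    }
    related
}
```

With data drives under `/mnt`, a mount such as `/mnt/parity1` is associated, while `/srv/parity` is left out because its parent directory differs.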
ba03623110 Remove hardcoded pool mount point mappings for true auto-discovery
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
- Eliminate hardcoded mappings like 'root' -> '/' and 'steampool' -> '/mnt/steampool'
- Use device names directly for physical drives
- Rely on mount_point metrics from agent for actual mount paths
- Implement zero-configuration architecture as specified in CLAUDE.md
2025-11-23 18:34:45 +01:00
f24c4ed650 Fix pool name extraction to prevent wrong physical drive naming
All checks were successful
Build and Release / build-and-release (push) Successful in 2m10s
- Remove fallback logic that could extract incorrect pool names
- Simplify pool suffix matching to use explicit arrays
- Ensure only valid metric patterns create pools
2025-11-23 18:24:39 +01:00
86501fd486 Fix display format to match CLAUDE.md specification
All checks were successful
Build and Release / build-and-release (push) Successful in 1m17s
- Use actual device names (sdb, sdc) instead of data_0, parity_0
- Fix physical drive naming to show device names instead of mount points
- Update pool name extraction to handle new device-based naming
- Ensure Drive: line shows temperature and wear data for physical drives
2025-11-23 18:13:35 +01:00
192eea6e0c Integrate SnapRAID parity drives into mergerfs pools
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
- Add SnapRAID parity drive detection to mergerfs discovery
- Remove Pool Status health line as discussed
- Update drive display to always show wear data when available
- Include /mnt/parity drives as part of mergerfs pool structure
2025-11-23 18:05:19 +01:00
43fb838c9b Fix duplicate drive display in mergerfs pools
All checks were successful
Build and Release / build-and-release (push) Successful in 2m9s
- Restructure storage rendering logic to prevent drive duplication
- Use specific mergerfs check instead of generic multi-drive condition
- Ensure drives only appear once under organized data/parity sections
2025-11-23 17:46:09 +01:00
54483653f9 Fix mergerfs drive metric parsing for proper pool consolidation
All checks were successful
Build and Release / build-and-release (push) Successful in 2m11s
- Update extract_pool_name to handle data_/parity_ drive metrics correctly
- Fix extract_drive_name to parse mergerfs drive roles properly
- Prevent srv_media_data from being parsed as separate pool
2025-11-23 17:40:12 +01:00
e47803b705 Fix mergerfs pool consolidation and naming
All checks were successful
Build and Release / build-and-release (push) Successful in 1m18s
- Improve pool name extraction in dashboard parsing
- Use consistent mergerfs pool naming in agent
- Add mount_point metric parsing to use actual mount paths
- Fix pool consolidation to prevent duplicate entries
2025-11-23 17:35:23 +01:00
439d0d9af6 Fix mergerfs numeric reference parsing for proper pool detection
All checks were successful
Build and Release / build-and-release (push) Successful in 2m11s
Add support for numeric mergerfs references like "1:2" by mapping them
to actual mount points (/mnt/disk1, /mnt/disk2). This enables proper
mergerfs pool detection and hides individual member drives as intended.
2025-11-23 17:27:45 +01:00
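A rough sketch of the numeric-reference mapping described in this commit, under two assumptions that the commit message does not state: reference `N` is 1-based, and candidate mounts are ordered by their trailing number (so `/mnt/disk10` comes after `/mnt/disk2`, which a plain string sort would not guarantee). The names `split_trailing_number` and `resolve_numeric_refs` are hypothetical.

```rust
/// Split a mount path into its stem and trailing number,
/// e.g. "/mnt/disk10" -> ("/mnt/disk", 10).
fn split_trailing_number(path: &str) -> (&str, u64) {
    let stem = path.trim_end_matches(|c: char| c.is_ascii_digit());
    // Paths without a parsable numeric suffix sort as 0.
    let number = path[stem.len()..].parse().unwrap_or(0);
    (stem, number)
}

/// Resolve a mergerfs source string of 1-based numeric references (e.g. "1:2")
/// against candidate mount points. References that are not numeric, are zero,
/// or are out of range are skipped.
pub fn resolve_numeric_refs(sources: &str, mounts: &[&str]) -> Vec<String> {
    let mut ordered: Vec<&str> = mounts.to_vec();
    // Order by (stem, number) so numbering follows numeric, not lexicographic, order.
    ordered.sort_by(|a, b| split_trailing_number(a).cmp(&split_trailing_number(b)));
    sources
        .split(':')
        .filter_map(|s| s.trim().parse::<usize>().ok())
        .filter_map(|index| index.checked_sub(1)) // 1-based -> 0-based
        .filter_map(|index| ordered.get(index))
        .map(|mount| mount.to_string())
        .collect()
}
```

Whether the caller should fall back to the raw references when nothing resolves is a separate design decision and is left out of this sketch.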
2242b5ddfe Make mergerfs detection more robust to prevent discovery failures
All checks were successful
Build and Release / build-and-release (push) Successful in 2m9s
Skip mergerfs pools with numeric device references (e.g., "1:2")
instead of crashing. This allows regular drive detection to work
even when mergerfs uses non-standard mount formats.

Preserves existing functionality for standard mergerfs setups.
2025-11-23 17:19:15 +01:00
9d0f42d55c Fix filesystem usage_percent parsing and remove hardcoded status
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
1. Add missing _fs_ filter to usage_percent parsing in dashboard
2. Fix agent to use calculated fs_status instead of hardcoded Status::Ok

This completes the disk collector auto-discovery by ensuring filesystem
usage percentages and status indicators display correctly.
2025-11-23 16:47:20 +01:00
16 changed files with 1164 additions and 222 deletions

113
CLAUDE.md

@@ -59,11 +59,85 @@ hostname2 = [
## Core Architecture Principles ## Core Architecture Principles
### Individual Metrics Philosophy ### Structured Data Architecture (Planned Migration)
- Agent collects individual metrics, dashboard composes widgets Current system uses string-based metrics with complex parsing. Planning migration to structured JSON data to eliminate fragile string manipulation.
- Each metric collected, transmitted, and stored individually
- Agent calculates status for each metric using thresholds **Current (String Metrics):**
- Dashboard aggregates individual metric statuses for widget status - Agent sends individual metrics with string names like `disk_nvme0n1_temperature`
- Dashboard parses metric names with underscore counting and string splitting
- Complex and error-prone metric filtering and extraction logic
**Target (Structured Data):**
```json
{
"hostname": "cmbox",
"agent_version": "v0.1.130",
"timestamp": 1763926877,
"system": {
"cpu": {
"load_1min": 3.50,
"load_5min": 3.57,
"load_15min": 3.58,
"frequency_mhz": 1500,
"temperature_celsius": 45.2
},
"memory": {
"usage_percent": 25.0,
"total_gb": 23.3,
"used_gb": 5.9,
"swap_total_gb": 10.7,
"swap_used_gb": 0.99,
"tmpfs": [
{"mount": "/tmp", "usage_percent": 15.0, "used_gb": 0.3, "total_gb": 2.0}
]
},
"storage": {
"drives": [
{
"name": "nvme0n1",
"health": "PASSED",
"temperature_celsius": 29.0,
"wear_percent": 1.0,
"filesystems": [
{"mount": "/", "usage_percent": 24.0, "used_gb": 224.9, "total_gb": 928.2}
]
}
],
"pools": [
{
"name": "srv_media",
"mount": "/srv/media",
"type": "mergerfs",
"health": "healthy",
"usage_percent": 63.0,
"used_gb": 2355.2,
"total_gb": 3686.4,
"data_drives": [
{"name": "sdb", "temperature_celsius": 24.0}
],
"parity_drives": [
{"name": "sdc", "temperature_celsius": 24.0}
]
}
]
}
},
"services": [
{"name": "sshd", "status": "active", "memory_mb": 4.5, "disk_gb": 0.0}
],
"backup": {
"status": "completed",
"last_run": 1763920000,
"next_scheduled": 1764006400,
"total_size_gb": 150.5,
"repository_health": "ok"
}
}
```
- Agent sends structured JSON over ZMQ
- Dashboard accesses data directly: `data.system.storage.drives[0].temperature_celsius`
- Type safety eliminates all parsing bugs
### Maintenance Mode ### Maintenance Mode
- Agent checks for `/tmp/cm-maintenance` file before sending notifications - Agent checks for `/tmp/cm-maintenance` file before sending notifications
@@ -293,12 +367,33 @@ Keep responses concise and focused. Avoid extensive implementation summaries unl
- ✅ "Restructure storage widget with improved layout" - ✅ "Restructure storage widget with improved layout"
- ✅ "Update CPU thresholds to production values" - ✅ "Update CPU thresholds to production values"
## Planned Architecture Migration
### Phase 1: Structured Data Types (Shared Crate)
- Create Rust structs matching target JSON structure
- Replace `Metric` enum with typed data structures
- Add serde serialization/deserialization
### Phase 2: Agent Refactor
- Update collectors to return typed structs instead of `Vec<Metric>`
- Remove string metric name generation
- Send structured JSON over ZMQ
### Phase 3: Dashboard Refactor
- Replace metric parsing logic with direct field access
- Remove `extract_pool_name()`, `extract_drive_name()`, underscore counting
- Widgets access `data.system.storage.drives[0].temperature_celsius`
### Phase 4: Migration & Cleanup
- Support both formats during transition
- Gradual rollout with backward compatibility
- Remove legacy string metric system
## Implementation Rules ## Implementation Rules
1. **Individual Metrics**: Each metric is collected, transmitted, and stored individually 1. **Agent Status Authority**: Agent calculates status for each metric using thresholds
2. **Agent Status Authority**: Agent calculates status for each metric using thresholds 2. **Dashboard Composition**: Dashboard widgets subscribe to specific metrics by name
3. **Dashboard Composition**: Dashboard widgets subscribe to specific metrics by name 3. **Status Aggregation**: Dashboard aggregates individual metric statuses for widget status
4. **Status Aggregation**: Dashboard aggregates individual metric statuses for widget status
**NEVER:** **NEVER:**
- Copy/paste ANY code from legacy implementations - Copy/paste ANY code from legacy implementations
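To illustrate the "direct field access" goal from the CLAUDE.md changes above, here is a minimal sketch of typed storage structs. The field names follow the target JSON, with the optional sensor readings modelled as `Option<f32>`; the serde derives are omitted so the example needs only the standard library, and the `drive_header` function and its output format are assumptions made for illustration.

```rust
/// Simplified subset of the target structure (serde derives omitted here).
#[derive(Debug, Clone, PartialEq)]
pub struct FilesystemData {
    pub mount: String,
    pub usage_percent: f32,
    pub used_gb: f32,
    pub total_gb: f32,
}

#[derive(Debug, Clone, PartialEq)]
pub struct DriveData {
    pub name: String,
    pub health: String,
    pub temperature_celsius: Option<f32>,
    pub wear_percent: Option<f32>,
    pub filesystems: Vec<FilesystemData>,
}

/// Build a header line from typed fields; no metric-name parsing is involved.
/// The "T:" / "W:" layout is hypothetical.
pub fn drive_header(drive: &DriveData) -> String {
    let mut parts = vec![drive.name.clone()];
    if let Some(temp) = drive.temperature_celsius {
        parts.push(format!("T: {:.0}C", temp));
    }
    if let Some(wear) = drive.wear_percent {
        parts.push(format!("W: {:.0}%", wear));
    }
    parts.join(" ")
}
```

Missing readings are represented as `None` rather than as absent metric names, so a widget can skip them without any string matching.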

6
Cargo.lock generated

@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]] [[package]]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.117" version = "0.1.130"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
@@ -301,7 +301,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.117" version = "0.1.130"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -324,7 +324,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.117" version = "0.1.130"
dependencies = [ dependencies = [
"chrono", "chrono",
"serde", "serde",


@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.117" version = "0.1.131"
edition = "2021" edition = "2021"
[dependencies] [dependencies]


@@ -10,7 +10,7 @@ use crate::metrics::MetricCollectionManager;
use crate::notifications::NotificationManager; use crate::notifications::NotificationManager;
use crate::service_tracker::UserStoppedServiceTracker; use crate::service_tracker::UserStoppedServiceTracker;
use crate::status::HostStatusManager; use crate::status::HostStatusManager;
use cm_dashboard_shared::{Metric, MetricMessage, MetricValue, Status}; use cm_dashboard_shared::{AgentData, Metric, MetricValue, Status, TmpfsData, DriveData, FilesystemData, ServiceData};
pub struct Agent { pub struct Agent {
hostname: String, hostname: String,
@@ -199,16 +199,310 @@ impl Agent {
return Ok(()); return Ok(());
} }
debug!("Broadcasting {} cached metrics (including host status summary)", metrics.len()); debug!("Broadcasting {} cached metrics as structured data", metrics.len());
// Create and send message with all current data // Convert metrics to structured data and send
let message = MetricMessage::new(self.hostname.clone(), metrics); let agent_data = self.metrics_to_structured_data(&metrics)?;
self.zmq_handler.publish_metrics(&message).await?; self.zmq_handler.publish_agent_data(&agent_data).await?;
debug!("Metrics broadcasted successfully"); debug!("Structured data broadcasted successfully");
Ok(()) Ok(())
} }
/// Convert legacy metrics to structured data format
fn metrics_to_structured_data(&self, metrics: &[Metric]) -> Result<AgentData> {
let mut agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version());
// Parse metrics into structured data
for metric in metrics {
self.parse_metric_into_agent_data(&mut agent_data, metric)?;
}
Ok(agent_data)
}
/// Parse a single metric into the appropriate structured data field
fn parse_metric_into_agent_data(&self, agent_data: &mut AgentData, metric: &Metric) -> Result<()> {
// CPU metrics
if metric.name == "cpu_load_1min" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.cpu.load_1min = value;
}
} else if metric.name == "cpu_load_5min" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.cpu.load_5min = value;
}
} else if metric.name == "cpu_load_15min" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.cpu.load_15min = value;
}
} else if metric.name == "cpu_frequency_mhz" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.cpu.frequency_mhz = value;
}
} else if metric.name == "cpu_temperature_celsius" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.cpu.temperature_celsius = Some(value);
}
}
// Memory metrics
else if metric.name == "memory_usage_percent" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.memory.usage_percent = value;
}
} else if metric.name == "memory_total_gb" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.memory.total_gb = value;
}
} else if metric.name == "memory_used_gb" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.memory.used_gb = value;
}
} else if metric.name == "memory_available_gb" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.memory.available_gb = value;
}
} else if metric.name == "memory_swap_total_gb" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.memory.swap_total_gb = value;
}
} else if metric.name == "memory_swap_used_gb" {
if let Some(value) = metric.value.as_f32() {
agent_data.system.memory.swap_used_gb = value;
}
}
// Tmpfs metrics
else if metric.name.starts_with("memory_tmp_") {
// For now, create a single /tmp tmpfs entry
if metric.name == "memory_tmp_usage_percent" {
if let Some(value) = metric.value.as_f32() {
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
tmpfs.usage_percent = value;
} else {
agent_data.system.memory.tmpfs.push(TmpfsData {
mount: "/tmp".to_string(),
usage_percent: value,
used_gb: 0.0,
total_gb: 0.0,
});
}
}
} else if metric.name == "memory_tmp_used_gb" {
if let Some(value) = metric.value.as_f32() {
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
tmpfs.used_gb = value;
} else {
agent_data.system.memory.tmpfs.push(TmpfsData {
mount: "/tmp".to_string(),
usage_percent: 0.0,
used_gb: value,
total_gb: 0.0,
});
}
}
} else if metric.name == "memory_tmp_total_gb" {
if let Some(value) = metric.value.as_f32() {
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
tmpfs.total_gb = value;
} else {
agent_data.system.memory.tmpfs.push(TmpfsData {
mount: "/tmp".to_string(),
usage_percent: 0.0,
used_gb: 0.0,
total_gb: value,
});
}
}
}
}
// Storage metrics
else if metric.name.starts_with("disk_") {
if metric.name.contains("_temperature") {
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
if let Some(temp) = metric.value.as_f32() {
self.ensure_drive_exists(agent_data, &drive_name);
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
drive.temperature_celsius = Some(temp);
}
}
}
} else if metric.name.contains("_wear_percent") {
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
if let Some(wear) = metric.value.as_f32() {
self.ensure_drive_exists(agent_data, &drive_name);
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
drive.wear_percent = Some(wear);
}
}
}
} else if metric.name.contains("_health") {
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
let health = metric.value.as_string();
self.ensure_drive_exists(agent_data, &drive_name);
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
drive.health = health;
}
}
} else if metric.name.contains("_fs_") {
// Filesystem metrics: disk_{pool}_fs_{filesystem}_{metric}
if let Some((pool_name, fs_name)) = self.extract_pool_and_filesystem(&metric.name) {
if metric.name.contains("_usage_percent") {
if let Some(usage) = metric.value.as_f32() {
self.ensure_filesystem_exists(agent_data, &pool_name, &fs_name, usage, 0.0, 0.0);
}
} else if metric.name.contains("_used_gb") {
if let Some(used) = metric.value.as_f32() {
self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.used_gb = used);
}
} else if metric.name.contains("_total_gb") {
if let Some(total) = metric.value.as_f32() {
self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.total_gb = total);
}
}
}
}
}
// Service metrics
else if metric.name.starts_with("service_") {
if let Some(service_name) = self.extract_service_name(&metric.name) {
if metric.name.contains("_status") {
let status = metric.value.as_string();
self.ensure_service_exists(agent_data, &service_name, &status);
} else if metric.name.contains("_memory_mb") {
if let Some(memory) = metric.value.as_f32() {
self.update_service_field(agent_data, &service_name, |svc| svc.memory_mb = memory);
}
} else if metric.name.contains("_disk_gb") {
if let Some(disk) = metric.value.as_f32() {
self.update_service_field(agent_data, &service_name, |svc| svc.disk_gb = disk);
}
}
}
}
// Backup metrics
else if metric.name.starts_with("backup_") {
if metric.name == "backup_status" {
agent_data.backup.status = metric.value.as_string();
} else if metric.name == "backup_last_run_timestamp" {
if let Some(timestamp) = metric.value.as_i64() {
agent_data.backup.last_run = Some(timestamp as u64);
}
} else if metric.name == "backup_next_scheduled_timestamp" {
if let Some(timestamp) = metric.value.as_i64() {
agent_data.backup.next_scheduled = Some(timestamp as u64);
}
} else if metric.name == "backup_size_gb" {
if let Some(size) = metric.value.as_f32() {
agent_data.backup.total_size_gb = Some(size);
}
} else if metric.name == "backup_repository_health" {
agent_data.backup.repository_health = Some(metric.value.as_string());
}
}
Ok(())
}
/// Extract drive name from metric like "disk_nvme0n1_temperature"
fn extract_drive_name(&self, metric_name: &str) -> Option<String> {
if metric_name.starts_with("disk_") {
let suffixes = ["_temperature", "_wear_percent", "_health"];
for suffix in suffixes {
if let Some(suffix_pos) = metric_name.rfind(suffix) {
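// Skip "disk_"; `get` returns None instead of panicking when the suffix overlaps the prefix (e.g. "disk_temperature")
if let Some(name) = metric_name.get(5..suffix_pos) {
return Some(name.to_string());
}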
}
}
}
None
}
/// Extract pool and filesystem from "disk_{pool}_fs_{filesystem}_{metric}"
fn extract_pool_and_filesystem(&self, metric_name: &str) -> Option<(String, String)> {
if let Some(fs_pos) = metric_name.find("_fs_") {
let pool_name = metric_name.get(5..fs_pos)?.to_string(); // Skip "disk_"; None if "_fs_" overlaps the prefix
let after_fs = &metric_name[fs_pos + 4..]; // Skip "_fs_"
if let Some(metric_pos) = after_fs.find('_') {
let fs_name = after_fs[..metric_pos].to_string();
return Some((pool_name, fs_name));
}
}
None
}
/// Extract service name from "service_{name}_{metric}"
fn extract_service_name(&self, metric_name: &str) -> Option<String> {
if metric_name.starts_with("service_") {
let suffixes = ["_status", "_memory_mb", "_disk_gb"];
for suffix in suffixes {
if let Some(suffix_pos) = metric_name.rfind(suffix) {
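// Skip "service_"; `get` returns None instead of panicking when the suffix overlaps the prefix (e.g. "service_status")
if let Some(name) = metric_name.get(8..suffix_pos) {
return Some(name.to_string());
}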
}
}
}
None
}
/// Ensure drive exists in agent_data
fn ensure_drive_exists(&self, agent_data: &mut AgentData, drive_name: &str) {
if !agent_data.system.storage.drives.iter().any(|d| d.name == drive_name) {
agent_data.system.storage.drives.push(DriveData {
name: drive_name.to_string(),
health: "UNKNOWN".to_string(),
temperature_celsius: None,
wear_percent: None,
filesystems: Vec::new(),
});
}
}
/// Ensure filesystem exists in the correct drive
fn ensure_filesystem_exists(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, usage_percent: f32, used_gb: f32, total_gb: f32) {
self.ensure_drive_exists(agent_data, pool_name);
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) {
if !drive.filesystems.iter().any(|fs| fs.mount == fs_name) {
drive.filesystems.push(FilesystemData {
mount: fs_name.to_string(),
usage_percent,
used_gb,
total_gb,
});
}
}
}
/// Update filesystem field
fn update_filesystem_field<F>(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, update_fn: F)
where F: FnOnce(&mut FilesystemData) {
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) {
if let Some(fs) = drive.filesystems.iter_mut().find(|fs| fs.mount == fs_name) {
update_fn(fs);
}
}
}
/// Ensure service exists
fn ensure_service_exists(&self, agent_data: &mut AgentData, service_name: &str, status: &str) {
if !agent_data.services.iter().any(|s| s.name == service_name) {
agent_data.services.push(ServiceData {
name: service_name.to_string(),
status: status.to_string(),
memory_mb: 0.0,
disk_gb: 0.0,
user_stopped: false, // TODO: Get from service tracker
});
} else if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) {
service.status = status.to_string();
}
}
/// Update service field
fn update_service_field<F>(&self, agent_data: &mut AgentData, service_name: &str, update_fn: F)
where F: FnOnce(&mut ServiceData) {
if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) {
update_fn(service);
}
}
async fn process_metrics(&mut self, metrics: &[Metric]) -> bool { async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
let mut status_changed = false; let mut status_changed = false;
for metric in metrics { for metric in metrics {
@@ -261,13 +555,11 @@ impl Agent {
/// Send standalone heartbeat for connectivity detection /// Send standalone heartbeat for connectivity detection
async fn send_heartbeat(&mut self) -> Result<()> { async fn send_heartbeat(&mut self) -> Result<()> {
let heartbeat_metric = self.get_heartbeat_metric(); // Create minimal agent data with just heartbeat
let message = MetricMessage::new( let agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version());
self.hostname.clone(), // Heartbeat timestamp is already set in AgentData::new()
vec![heartbeat_metric],
); self.zmq_handler.publish_agent_data(&agent_data).await?;
self.zmq_handler.publish_metrics(&message).await?;
debug!("Sent standalone heartbeat for connectivity detection"); debug!("Sent standalone heartbeat for connectivity detection");
Ok(()) Ok(())
} }
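One point worth noting about the bridge code above: `update_filesystem_field` only updates an entry that already exists, so a `used_gb` or `total_gb` metric processed before the matching `usage_percent` metric is dropped. A possible alternative, shown here as a sketch and not as the author's implementation, is a single "upsert" helper that creates the entry on first sight. The helper name and the `Default` derive are assumptions.

```rust
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FilesystemData {
    pub mount: String,
    pub usage_percent: f32,
    pub used_gb: f32,
    pub total_gb: f32,
}

/// Apply `update` to the filesystem with the given mount, creating a
/// zero-initialised entry first if none exists yet. Hypothetical helper.
pub fn upsert_filesystem<F>(filesystems: &mut Vec<FilesystemData>, mount: &str, update: F)
where
    F: FnOnce(&mut FilesystemData),
{
    let index = match filesystems.iter().position(|fs| fs.mount == mount) {
        Some(index) => index,
        None => {
            filesystems.push(FilesystemData {
                mount: mount.to_string(),
                ..Default::default()
            });
            filesystems.len() - 1
        }
    };
    update(&mut filesystems[index]);
}
```

With this shape the result no longer depends on the order in which the legacy metrics arrive.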


@@ -202,15 +202,34 @@ impl DiskCollector {
let (total_bytes, used_bytes) = self.get_filesystem_info(&mount_point) let (total_bytes, used_bytes) = self.get_filesystem_info(&mount_point)
.unwrap_or((0, 0)); .unwrap_or((0, 0));
// Parse member paths // Parse member paths - handle both full paths and numeric references
let member_paths: Vec<String> = device_sources let raw_paths: Vec<String> = device_sources
.split(':') .split(':')
.map(|s| s.trim().to_string()) .map(|s| s.trim().to_string())
.filter(|s| !s.is_empty()) .filter(|s| !s.is_empty())
.collect(); .collect();
// Convert numeric references to actual mount points if needed
let mut member_paths = if raw_paths.iter().any(|path| !path.starts_with('/')) {
// Handle numeric format like "1:2" by finding corresponding /mnt/disk* paths
self.resolve_numeric_mergerfs_paths(&raw_paths)?
} else {
// Already full paths
raw_paths
};
// For SnapRAID setups, include parity drives that are related to this pool's data drives
let related_parity_paths = self.discover_related_parity_drives(&member_paths)?;
member_paths.extend(related_parity_paths);
// Categorize as data vs parity drives // Categorize as data vs parity drives
let (data_drives, parity_drives) = self.categorize_pool_drives(&member_paths)?; let (data_drives, parity_drives) = match self.categorize_pool_drives(&member_paths) {
Ok(drives) => drives,
Err(e) => {
debug!("Failed to categorize drives for pool {}: {}. Skipping.", mount_point, e);
continue;
}
};
pools.push(MergerfsPool { pools.push(MergerfsPool {
mount_point, mount_point,
@@ -225,6 +244,38 @@ impl DiskCollector {
Ok(pools) Ok(pools)
} }
/// Discover parity drives that are related to the given data drives
fn discover_related_parity_drives(&self, data_drives: &[String]) -> Result<Vec<String>> {
let mount_devices = self.get_mount_devices()?;
let mut related_parity = Vec::new();
// Find parity drives that share the same parent directory as the data drives
for data_path in data_drives {
if let Some(parent_dir) = self.get_parent_directory(data_path) {
// Look for parity drives in the same parent directory
for (mount_point, _device) in &mount_devices {
if mount_point.contains("parity") && mount_point.starts_with(&parent_dir) {
if !related_parity.contains(mount_point) {
related_parity.push(mount_point.clone());
}
}
}
}
}
Ok(related_parity)
}
/// Get parent directory of a mount path (e.g., "/mnt/disk1" -> "/mnt")
fn get_parent_directory(&self, path: &str) -> Option<String> {
if let Some(last_slash) = path.rfind('/') {
if last_slash > 0 {
return Some(path[..last_slash].to_string());
}
}
None
}
/// Categorize pool member drives as data vs parity /// Categorize pool member drives as data vs parity
fn categorize_pool_drives(&self, member_paths: &[String]) -> Result<(Vec<DriveInfo>, Vec<DriveInfo>)> { fn categorize_pool_drives(&self, member_paths: &[String]) -> Result<(Vec<DriveInfo>, Vec<DriveInfo>)> {
let mut data_drives = Vec::new(); let mut data_drives = Vec::new();
@@ -286,6 +337,35 @@ impl DiskCollector {
}) })
} }
/// Resolve numeric mergerfs references like "1:2" to actual mount paths
fn resolve_numeric_mergerfs_paths(&self, numeric_refs: &[String]) -> Result<Vec<String>> {
let mut resolved_paths = Vec::new();
// Get all mount points that look like /mnt/disk* or /mnt/parity*
let mount_devices = self.get_mount_devices()?;
let mut disk_mounts: Vec<String> = mount_devices.keys()
.filter(|path| path.starts_with("/mnt/disk") || path.starts_with("/mnt/parity"))
.cloned()
.collect();
disk_mounts.sort(); // Consistent (lexicographic) ordering; note that "/mnt/disk10" sorts before "/mnt/disk2"
for num_ref in numeric_refs {
if let Ok(index) = num_ref.parse::<usize>() {
// Convert 1-based index to 0-based
if index > 0 && index <= disk_mounts.len() {
resolved_paths.push(disk_mounts[index - 1].clone());
}
}
}
// Fallback: if we couldn't resolve, return the original paths
if resolved_paths.is_empty() {
resolved_paths = numeric_refs.to_vec();
}
Ok(resolved_paths)
}
/// Extract base device name from partition (e.g., "nvme0n1p2" -> "nvme0n1", "sda1" -> "sda") /// Extract base device name from partition (e.g., "nvme0n1p2" -> "nvme0n1", "sda1" -> "sda")
fn extract_base_device(&self, device_name: &str) -> String { fn extract_base_device(&self, device_name: &str) -> String {
// Handle NVMe devices (nvme0n1p1 -> nvme0n1) // Handle NVMe devices (nvme0n1p1 -> nvme0n1)
@@ -419,6 +499,17 @@ impl DiskCollector {
} }
} }
} }
// NVMe format: "Temperature:" (capital T)
if line.contains("Temperature:") {
if let Some(temp_part) = line.split("Temperature:").nth(1) {
if let Some(temp_str) = temp_part.split_whitespace().next() {
if let Ok(temp) = temp_str.parse::<f32>() {
return Some(temp);
}
}
}
}
// Legacy format: "temperature:" (lowercase)
if line.contains("temperature:") { if line.contains("temperature:") {
if let Some(temp_part) = line.split("temperature:").nth(1) { if let Some(temp_part) = line.split("temperature:").nth(1) {
if let Some(temp_str) = temp_part.split_whitespace().next() { if let Some(temp_str) = temp_part.split_whitespace().next() {
@@ -506,7 +597,7 @@ impl Collector for DiskCollector {
let topology = match self.discover_storage() { let topology = match self.discover_storage() {
Ok(topology) => topology, Ok(topology) => topology,
Err(e) => { Err(e) => {
debug!("Storage discovery failed: {}", e); tracing::error!("Storage discovery failed: {}", e);
return Ok(metrics); return Ok(metrics);
} }
}; };
@@ -695,7 +786,7 @@ impl DiskCollector {
value: MetricValue::Float(self.bytes_to_gb(filesystem.used_bytes)), value: MetricValue::Float(self.bytes_to_gb(filesystem.used_bytes)),
unit: Some("GB".to_string()), unit: Some("GB".to_string()),
description: Some(format!("{}: {}", filesystem.mount_point, self.bytes_to_human_readable(filesystem.used_bytes))), description: Some(format!("{}: {}", filesystem.mount_point, self.bytes_to_human_readable(filesystem.used_bytes))),
status: Status::Ok, status: fs_status.clone(),
timestamp, timestamp,
}); });
@@ -704,7 +795,7 @@ impl DiskCollector {
value: MetricValue::Float(self.bytes_to_gb(filesystem.total_bytes)), value: MetricValue::Float(self.bytes_to_gb(filesystem.total_bytes)),
unit: Some("GB".to_string()), unit: Some("GB".to_string()),
description: Some(format!("{}: {}", filesystem.mount_point, self.bytes_to_human_readable(filesystem.total_bytes))), description: Some(format!("{}: {}", filesystem.mount_point, self.bytes_to_human_readable(filesystem.total_bytes))),
status: Status::Ok, status: fs_status.clone(),
timestamp, timestamp,
}); });
@@ -737,7 +828,13 @@ impl DiskCollector {
timestamp: u64, timestamp: u64,
status_tracker: &mut StatusTracker status_tracker: &mut StatusTracker
) { ) {
let pool_name = pool.mount_point.trim_start_matches('/').replace('/', "_"); // Use consistent pool naming: extract mount point without leading slash
let pool_name = if pool.mount_point == "/" {
"root".to_string()
} else {
pool.mount_point.trim_start_matches('/').replace('/', "_")
};
if pool_name.is_empty() { if pool_name.is_empty() {
return; return;
} }
@@ -841,12 +938,12 @@ impl DiskCollector {
}); });
// Individual drive metrics // Individual drive metrics
for (i, drive) in pool.data_drives.iter().enumerate() { for drive in &pool.data_drives {
self.generate_pool_drive_metrics(metrics, &pool_name, &format!("data_{}", i), drive, timestamp, status_tracker); self.generate_pool_drive_metrics(metrics, &pool_name, &drive.device, drive, timestamp, status_tracker);
} }
for (i, drive) in pool.parity_drives.iter().enumerate() { for drive in &pool.parity_drives {
self.generate_pool_drive_metrics(metrics, &pool_name, &format!("parity_{}", i), drive, timestamp, status_tracker); self.generate_pool_drive_metrics(metrics, &pool_name, &drive.device, drive, timestamp, status_tracker);
} }
} }
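The temperature hunk above handles the capitalised NVMe label and the lowercase legacy label in two separate branches. As a sketch of one way to fold them together (an alternative, not the collector's code), the label can be matched case-insensitively; the function name `parse_temperature_line` is hypothetical.

```rust
/// Parse the first numeric token after a "temperature:" label, matching the
/// label case-insensitively. Returns None if the label or number is missing.
pub fn parse_temperature_line(line: &str) -> Option<f32> {
    const LABEL: &str = "temperature:";
    // ASCII lowercasing keeps byte offsets identical to the original line.
    let lower = line.to_ascii_lowercase();
    let start = lower.find(LABEL)? + LABEL.len();
    line[start..].split_whitespace().next()?.parse().ok()
}
```

Lines such as `Temperature Sensor 1: ...` do not contain the exact label followed by a colon and are therefore ignored by this sketch.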


@@ -1,5 +1,5 @@
use anyhow::Result; use anyhow::Result;
use cm_dashboard_shared::{MessageEnvelope, MetricMessage}; use cm_dashboard_shared::{AgentData, MessageEnvelope};
use tracing::{debug, info}; use tracing::{debug, info};
use zmq::{Context, Socket, SocketType}; use zmq::{Context, Socket, SocketType};
@@ -43,17 +43,17 @@ impl ZmqHandler {
}) })
} }
/// Publish metrics message via ZMQ
pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> { /// Publish agent data via ZMQ
pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
debug!( debug!(
"Publishing {} metrics for host {}", "Publishing agent data for host {}",
message.metrics.len(), data.hostname
message.hostname
); );
// Create message envelope // Create message envelope for agent data
let envelope = MessageEnvelope::metrics(message.clone()) let envelope = MessageEnvelope::agent_data(data.clone())
.map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
// Serialize envelope // Serialize envelope
let serialized = serde_json::to_vec(&envelope)?; let serialized = serde_json::to_vec(&envelope)?;
@@ -61,11 +61,10 @@ impl ZmqHandler {
// Send via ZMQ // Send via ZMQ
self.publisher.send(&serialized, 0)?; self.publisher.send(&serialized, 0)?;
debug!("Published metrics message ({} bytes)", serialized.len()); debug!("Published agent data message ({} bytes)", serialized.len());
Ok(()) Ok(())
} }
/// Try to receive a command (non-blocking) /// Try to receive a command (non-blocking)
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> { pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
match self.command_receiver.recv_bytes(zmq::DONTWAIT) { match self.command_receiver.recv_bytes(zmq::DONTWAIT) {


@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.117" version = "0.1.131"
edition = "2021" edition = "2021"
[dependencies] [dependencies]


@@ -20,12 +20,13 @@ pub struct Dashboard {
tui_app: Option<TuiApp>, tui_app: Option<TuiApp>,
terminal: Option<Terminal<CrosstermBackend<io::Stdout>>>, terminal: Option<Terminal<CrosstermBackend<io::Stdout>>>,
headless: bool, headless: bool,
raw_data: bool,
initial_commands_sent: std::collections::HashSet<String>, initial_commands_sent: std::collections::HashSet<String>,
config: DashboardConfig, config: DashboardConfig,
} }
impl Dashboard { impl Dashboard {
pub async fn new(config_path: Option<String>, headless: bool) -> Result<Self> { pub async fn new(config_path: Option<String>, headless: bool, raw_data: bool) -> Result<Self> {
info!("Initializing dashboard"); info!("Initializing dashboard");
// Load configuration - try default path if not specified // Load configuration - try default path if not specified
@@ -119,6 +120,7 @@ impl Dashboard {
tui_app, tui_app,
terminal, terminal,
headless, headless,
raw_data,
initial_commands_sent: std::collections::HashSet::new(), initial_commands_sent: std::collections::HashSet::new(),
config, config,
}) })
@@ -183,30 +185,35 @@ impl Dashboard {
// Check for new metrics // Check for new metrics
if last_metrics_check.elapsed() >= metrics_check_interval { if last_metrics_check.elapsed() >= metrics_check_interval {
if let Ok(Some(metric_message)) = self.zmq_consumer.receive_metrics().await { if let Ok(Some(agent_data)) = self.zmq_consumer.receive_agent_data().await {
debug!( debug!(
"Received metrics from {}: {} metrics", "Received agent data from {}",
metric_message.hostname, agent_data.hostname
metric_message.metrics.len()
); );
// Track first contact with host (no command needed - agent sends data every 2s) // Track first contact with host (no command needed - agent sends data every 2s)
let is_new_host = !self let is_new_host = !self
.initial_commands_sent .initial_commands_sent
.contains(&metric_message.hostname); .contains(&agent_data.hostname);
if is_new_host { if is_new_host {
info!( info!(
"First contact with host {} - data will update automatically", "First contact with host {} - data will update automatically",
metric_message.hostname agent_data.hostname
); );
self.initial_commands_sent self.initial_commands_sent
.insert(metric_message.hostname.clone()); .insert(agent_data.hostname.clone());
} }
// Update metric store // Show raw data if requested (before processing)
self.metric_store if self.raw_data {
.update_metrics(&metric_message.hostname, metric_message.metrics); println!("RAW AGENT DATA FROM {}:", agent_data.hostname);
println!("{}", serde_json::to_string_pretty(&agent_data).unwrap_or_else(|e| format!("Serialization error: {}", e)));
println!("{}", "-".repeat(80));
}
// Update data store
self.metric_store.process_agent_data(agent_data);
// Check for agent version mismatches across hosts // Check for agent version mismatches across hosts
if let Some((current_version, outdated_hosts)) = self.metric_store.get_version_mismatches() { if let Some((current_version, outdated_hosts)) = self.metric_store.get_version_mismatches() {


@@ -1,5 +1,5 @@
use anyhow::Result; use anyhow::Result;
use cm_dashboard_shared::{CommandOutputMessage, MessageEnvelope, MessageType, MetricMessage}; use cm_dashboard_shared::{AgentData, CommandOutputMessage, MessageEnvelope, MessageType};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use zmq::{Context, Socket, SocketType}; use zmq::{Context, Socket, SocketType};
@@ -117,8 +117,8 @@ impl ZmqConsumer {
} }
} }
/// Receive metrics from any connected agent (non-blocking) /// Receive agent data (non-blocking)
pub async fn receive_metrics(&mut self) -> Result<Option<MetricMessage>> { pub async fn receive_agent_data(&mut self) -> Result<Option<AgentData>> {
match self.subscriber.recv_bytes(zmq::DONTWAIT) { match self.subscriber.recv_bytes(zmq::DONTWAIT) {
Ok(data) => { Ok(data) => {
debug!("Received {} bytes from ZMQ", data.len()); debug!("Received {} bytes from ZMQ", data.len());
@@ -129,29 +129,27 @@ impl ZmqConsumer {
// Check message type // Check message type
match envelope.message_type { match envelope.message_type {
MessageType::Metrics => { MessageType::AgentData => {
let metrics = envelope let agent_data = envelope
.decode_metrics() .decode_agent_data()
.map_err(|e| anyhow::anyhow!("Failed to decode metrics: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to decode agent data: {}", e))?;
debug!( debug!(
"Received {} metrics from {}", "Received agent data from host {}",
metrics.metrics.len(), agent_data.hostname
metrics.hostname
); );
Ok(Some(agent_data))
Ok(Some(metrics))
} }
MessageType::Heartbeat => { MessageType::Heartbeat => {
debug!("Received heartbeat"); debug!("Received heartbeat");
Ok(None) // Don't return heartbeats as metrics Ok(None) // Don't return heartbeats
} }
MessageType::CommandOutput => { MessageType::CommandOutput => {
debug!("Received command output (will be handled by receive_command_output)"); debug!("Received command output (will be handled by receive_command_output)");
Ok(None) // Command output handled by separate method Ok(None) // Command output handled by separate method
} }
_ => { _ => {
debug!("Received non-metrics message: {:?}", envelope.message_type); debug!("Received unsupported message: {:?}", envelope.message_type);
Ok(None) Ok(None)
} }
} }
@@ -166,5 +164,6 @@ impl ZmqConsumer {
} }
} }
} }
} }
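Note on `receive_agent_data` above: only an `AgentData` envelope produces `Some(...)`; heartbeats, command output, and anything else are acknowledged with `Ok(None)`, and a decode failure becomes an error. The following is a dependency-free sketch of that dispatch shape. `Envelope`, `MessageType::Other`, the `String` payload, and `dispatch` are stand-ins invented for illustration, not the types in `cm_dashboard_shared`.

```rust
/// Hypothetical stand-in for the shared message type enum.
#[derive(Debug, PartialEq)]
enum MessageType {
    AgentData,
    Heartbeat,
    CommandOutput,
    Other,
}

/// Hypothetical envelope: the payload is a plain string instead of serialized agent data.
struct Envelope {
    message_type: MessageType,
    payload: String,
}

/// Return the payload only for agent data; every other message type maps to Ok(None).
fn dispatch(envelope: Envelope) -> Result<Option<String>, String> {
    match envelope.message_type {
        MessageType::AgentData => {
            // Stand-in for the decode step, which can fail.
            if envelope.payload.is_empty() {
                return Err("Failed to decode agent data: empty payload".to_string());
            }
            Ok(Some(envelope.payload))
        }
        // Heartbeats and command output are handled elsewhere, so the caller sees "nothing new".
        MessageType::Heartbeat | MessageType::CommandOutput | MessageType::Other => Ok(None),
    }
}
```

Keeping the non-data cases as `Ok(None)` lets the polling loop treat "no agent data this tick" and "a message of another kind arrived" the same way.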


@@ -51,6 +51,10 @@ struct Cli {
/// Run in headless mode (no TUI, just logging) /// Run in headless mode (no TUI, just logging)
#[arg(long)] #[arg(long)]
headless: bool, headless: bool,
/// Show raw agent data in headless mode
#[arg(long)]
raw_data: bool,
} }
#[tokio::main] #[tokio::main]
@@ -86,7 +90,7 @@ async fn main() -> Result<()> {
} }
// Create and run dashboard // Create and run dashboard
let mut dashboard = Dashboard::new(cli.config, cli.headless).await?; let mut dashboard = Dashboard::new(cli.config, cli.headless, cli.raw_data).await?;
// Setup graceful shutdown // Setup graceful shutdown
let ctrl_c = async { let ctrl_c = async {


@@ -1,4 +1,4 @@
use cm_dashboard_shared::Metric; use cm_dashboard_shared::{AgentData, Metric};
use std::collections::HashMap; use std::collections::HashMap;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
@@ -76,6 +76,286 @@ impl MetricStore {
); );
} }
/// Process structured agent data (temporary bridge - converts back to metrics)
/// TODO: Replace entire metric system with direct structured data processing
pub fn process_agent_data(&mut self, agent_data: AgentData) {
let metrics = self.convert_agent_data_to_metrics(&agent_data);
self.update_metrics(&agent_data.hostname, metrics);
}
/// Convert structured agent data to legacy metrics (temporary bridge)
fn convert_agent_data_to_metrics(&self, agent_data: &AgentData) -> Vec<Metric> {
use cm_dashboard_shared::{Metric, MetricValue, Status};
let mut metrics = Vec::new();
// Convert CPU data
metrics.push(Metric::new(
"cpu_load_1min".to_string(),
MetricValue::Float(agent_data.system.cpu.load_1min),
Status::Ok,
));
metrics.push(Metric::new(
"cpu_load_5min".to_string(),
MetricValue::Float(agent_data.system.cpu.load_5min),
Status::Ok,
));
metrics.push(Metric::new(
"cpu_load_15min".to_string(),
MetricValue::Float(agent_data.system.cpu.load_15min),
Status::Ok,
));
metrics.push(Metric::new(
"cpu_frequency_mhz".to_string(),
MetricValue::Float(agent_data.system.cpu.frequency_mhz),
Status::Ok,
));
if let Some(temp) = agent_data.system.cpu.temperature_celsius {
metrics.push(Metric::new(
"cpu_temperature_celsius".to_string(),
MetricValue::Float(temp),
Status::Ok,
));
}
// Convert Memory data
metrics.push(Metric::new(
"memory_usage_percent".to_string(),
MetricValue::Float(agent_data.system.memory.usage_percent),
Status::Ok,
));
metrics.push(Metric::new(
"memory_total_gb".to_string(),
MetricValue::Float(agent_data.system.memory.total_gb),
Status::Ok,
));
metrics.push(Metric::new(
"memory_used_gb".to_string(),
MetricValue::Float(agent_data.system.memory.used_gb),
Status::Ok,
));
metrics.push(Metric::new(
"memory_available_gb".to_string(),
MetricValue::Float(agent_data.system.memory.available_gb),
Status::Ok,
));
metrics.push(Metric::new(
"memory_swap_total_gb".to_string(),
MetricValue::Float(agent_data.system.memory.swap_total_gb),
Status::Ok,
));
metrics.push(Metric::new(
"memory_swap_used_gb".to_string(),
MetricValue::Float(agent_data.system.memory.swap_used_gb),
Status::Ok,
));
// Convert tmpfs data
for tmpfs in &agent_data.system.memory.tmpfs {
if tmpfs.mount == "/tmp" {
metrics.push(Metric::new(
"memory_tmp_usage_percent".to_string(),
MetricValue::Float(tmpfs.usage_percent),
Status::Ok,
));
metrics.push(Metric::new(
"memory_tmp_used_gb".to_string(),
MetricValue::Float(tmpfs.used_gb),
Status::Ok,
));
metrics.push(Metric::new(
"memory_tmp_total_gb".to_string(),
MetricValue::Float(tmpfs.total_gb),
Status::Ok,
));
}
}
// Add agent metadata
metrics.push(Metric::new(
"agent_version".to_string(),
MetricValue::String(agent_data.agent_version.clone()),
Status::Ok,
));
metrics.push(Metric::new(
"agent_heartbeat".to_string(),
MetricValue::Integer(agent_data.timestamp as i64),
Status::Ok,
));
// Convert storage data
for drive in &agent_data.system.storage.drives {
// Drive-level metrics
if let Some(temp) = drive.temperature_celsius {
metrics.push(Metric::new(
format!("disk_{}_temperature", drive.name),
MetricValue::Float(temp),
Status::Ok,
));
}
if let Some(wear) = drive.wear_percent {
metrics.push(Metric::new(
format!("disk_{}_wear_percent", drive.name),
MetricValue::Float(wear),
Status::Ok,
));
}
metrics.push(Metric::new(
format!("disk_{}_health", drive.name),
MetricValue::String(drive.health.clone()),
Status::Ok,
));
// Filesystem metrics
for fs in &drive.filesystems {
let fs_base = format!("disk_{}_fs_{}", drive.name, fs.mount.replace('/', "root"));
metrics.push(Metric::new(
format!("{}_usage_percent", fs_base),
MetricValue::Float(fs.usage_percent),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_used_gb", fs_base),
MetricValue::Float(fs.used_gb),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_total_gb", fs_base),
MetricValue::Float(fs.total_gb),
Status::Ok,
));
}
}
// Convert storage pools
for pool in &agent_data.system.storage.pools {
let pool_base = format!("disk_{}", pool.name);
metrics.push(Metric::new(
format!("{}_usage_percent", pool_base),
MetricValue::Float(pool.usage_percent),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_used_gb", pool_base),
MetricValue::Float(pool.used_gb),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_total_gb", pool_base),
MetricValue::Float(pool.total_gb),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_pool_type", pool_base),
MetricValue::String(pool.pool_type.clone()),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_mount_point", pool_base),
MetricValue::String(pool.mount.clone()),
Status::Ok,
));
// Pool drive data
for drive in &pool.data_drives {
if let Some(temp) = drive.temperature_celsius {
metrics.push(Metric::new(
format!("disk_{}_{}_temperature", pool.name, drive.name),
MetricValue::Float(temp),
Status::Ok,
));
}
if let Some(wear) = drive.wear_percent {
metrics.push(Metric::new(
format!("disk_{}_{}_wear_percent", pool.name, drive.name),
MetricValue::Float(wear),
Status::Ok,
));
}
}
for drive in &pool.parity_drives {
if let Some(temp) = drive.temperature_celsius {
metrics.push(Metric::new(
format!("disk_{}_{}_temperature", pool.name, drive.name),
MetricValue::Float(temp),
Status::Ok,
));
}
if let Some(wear) = drive.wear_percent {
metrics.push(Metric::new(
format!("disk_{}_{}_wear_percent", pool.name, drive.name),
MetricValue::Float(wear),
Status::Ok,
));
}
}
}
// Convert service data
for service in &agent_data.services {
let service_base = format!("service_{}", service.name);
metrics.push(Metric::new(
format!("{}_status", service_base),
MetricValue::String(service.status.clone()),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_memory_mb", service_base),
MetricValue::Float(service.memory_mb),
Status::Ok,
));
metrics.push(Metric::new(
format!("{}_disk_gb", service_base),
MetricValue::Float(service.disk_gb),
Status::Ok,
));
if service.user_stopped {
metrics.push(Metric::new(
format!("{}_user_stopped", service_base),
MetricValue::Boolean(true),
Status::Ok,
));
}
}
// Convert backup data
metrics.push(Metric::new(
"backup_status".to_string(),
MetricValue::String(agent_data.backup.status.clone()),
Status::Ok,
));
if let Some(last_run) = agent_data.backup.last_run {
metrics.push(Metric::new(
"backup_last_run_timestamp".to_string(),
MetricValue::Integer(last_run as i64),
Status::Ok,
));
}
if let Some(next_scheduled) = agent_data.backup.next_scheduled {
metrics.push(Metric::new(
"backup_next_scheduled_timestamp".to_string(),
MetricValue::Integer(next_scheduled as i64),
Status::Ok,
));
}
if let Some(size) = agent_data.backup.total_size_gb {
metrics.push(Metric::new(
"backup_size_gb".to_string(),
MetricValue::Float(size),
Status::Ok,
));
}
if let Some(health) = &agent_data.backup.repository_health {
metrics.push(Metric::new(
"backup_repository_health".to_string(),
MetricValue::String(health.clone()),
Status::Ok,
));
}
metrics
}
/// Get current metric for a specific host /// Get current metric for a specific host
pub fn get_metric(&self, hostname: &str, metric_name: &str) -> Option<&Metric> { pub fn get_metric(&self, hostname: &str, metric_name: &str) -> Option<&Metric> {
self.current_metrics.get(hostname)?.get(metric_name) self.current_metrics.get(hostname)?.get(metric_name)
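Note on the filesystem metric names in the bridge above: `str::replace` substitutes every occurrence of the pattern, so `fs.mount.replace('/', "root")` maps `/` to `root` but maps `/boot` to `rootboot`. Whether the widget relies on that exact form is not visible in this diff. The sketch below reproduces the expression from the diff next to one possible alternative that mirrors the pool-naming rule. Both function names are hypothetical.

```rust
/// Metric base name exactly as the bridge in the diff builds it.
fn fs_key_as_in_diff(drive: &str, mount: &str) -> String {
    // Every '/' becomes "root", not only a lone "/".
    format!("disk_{}_fs_{}", drive, mount.replace('/', "root"))
}

/// Hypothetical alternative (an assumption, not the project's code):
/// special-case "/" and turn the remaining slashes into underscores.
fn fs_key_alternative(drive: &str, mount: &str) -> String {
    let fs_name = if mount == "/" {
        "root".to_string()
    } else {
        mount.trim_start_matches('/').replace('/', "_")
    };
    format!("disk_{}_fs_{}", drive, fs_name)
}
```

The two functions agree for the root filesystem and differ for every other mount point, so switching would also require the consumer side to expect the new names.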


@@ -146,14 +146,14 @@ impl SystemWidget {
self.agent_hash.as_ref() self.agent_hash.as_ref()
} }
/// Get mount point for a pool name /// Get default mount point for a pool name (fallback only - should use actual mount_point metrics)
fn get_mount_point_for_pool(&self, pool_name: &str) -> String { fn get_mount_point_for_pool(&self, pool_name: &str) -> String {
match pool_name { // For device names, use the device name directly as display name
"root" => "/".to_string(), if pool_name.starts_with("nvme") || pool_name.starts_with("sd") || pool_name.starts_with("hd") {
"steampool" => "/mnt/steampool".to_string(), pool_name.to_string()
"steampool_1" => "/steampool_1".to_string(), } else {
"steampool_2" => "/steampool_2".to_string(), // For other pools, use the pool name as-is (will be overridden by mount_point metric)
_ => format!("/{}", pool_name), // Default fallback pool_name.to_string()
} }
} }
@@ -164,10 +164,9 @@ impl SystemWidget {
for metric in metrics { for metric in metrics {
if metric.name.starts_with("disk_") { if metric.name.starts_with("disk_") {
if let Some(pool_name) = self.extract_pool_name(&metric.name) { if let Some(pool_name) = self.extract_pool_name(&metric.name) {
let mount_point = self.get_mount_point_for_pool(&pool_name);
let pool = pools.entry(pool_name.clone()).or_insert_with(|| StoragePool { let pool = pools.entry(pool_name.clone()).or_insert_with(|| StoragePool {
name: pool_name.clone(), name: pool_name.clone(),
mount_point: mount_point.clone(), mount_point: self.get_mount_point_for_pool(&pool_name), // Default fallback
pool_type: "single".to_string(), // Default, will be updated pool_type: "single".to_string(), // Default, will be updated
pool_health: None, pool_health: None,
drives: Vec::new(), drives: Vec::new(),
@@ -180,7 +179,8 @@ impl SystemWidget {
}); });
// Parse different metric types // Parse different metric types
if metric.name.contains("_usage_percent") { if metric.name.contains("_usage_percent") && !metric.name.contains("_fs_") {
// Only use drive-level metrics for pool totals, not filesystem metrics
if let MetricValue::Float(usage) = metric.value { if let MetricValue::Float(usage) = metric.value {
pool.usage_percent = Some(usage); pool.usage_percent = Some(usage);
pool.status = metric.status.clone(); pool.status = metric.status.clone();
@@ -195,6 +195,10 @@ impl SystemWidget {
if let MetricValue::Float(total) = metric.value { if let MetricValue::Float(total) = metric.value {
pool.total_gb = Some(total); pool.total_gb = Some(total);
} }
} else if metric.name.contains("_mount_point") {
if let MetricValue::String(mount_point) = &metric.value {
pool.mount_point = mount_point.clone();
}
} else if metric.name.contains("_pool_type") { } else if metric.name.contains("_pool_type") {
if let MetricValue::String(pool_type) = &metric.value { if let MetricValue::String(pool_type) = &metric.value {
pool.pool_type = pool_type.clone(); pool.pool_type = pool_type.clone();
@@ -204,6 +208,13 @@ impl SystemWidget {
pool.pool_health = Some(health.clone()); pool.pool_health = Some(health.clone());
pool.health_status = metric.status.clone(); pool.health_status = metric.status.clone();
} }
} else if metric.name.contains("_health") && !metric.name.contains("_pool_health") {
// Handle physical drive health metrics (disk_{drive}_health)
if let MetricValue::String(health) = &metric.value {
// For physical drives, use the drive health as pool health
pool.pool_health = Some(health.clone());
pool.health_status = metric.status.clone();
}
} else if metric.name.contains("_temperature") { } else if metric.name.contains("_temperature") {
if let Some(drive_name) = self.extract_drive_name(&metric.name) { if let Some(drive_name) = self.extract_drive_name(&metric.name) {
// Find existing drive or create new one // Find existing drive or create new one
@@ -221,12 +232,16 @@ impl SystemWidget {
if let MetricValue::Float(temp) = metric.value { if let MetricValue::Float(temp) = metric.value {
drive.temperature = Some(temp); drive.temperature = Some(temp);
drive.status = metric.status.clone(); drive.status = metric.status.clone();
// For physical drives, if this is the main drive, also update pool health
if drive.name == pool.name && pool.health_status == Status::Unknown {
pool.health_status = metric.status.clone();
}
} }
} }
} }
} else if metric.name.contains("_wear_percent") { } else if metric.name.contains("_wear_percent") {
if let Some(drive_name) = self.extract_drive_name(&metric.name) { if let Some(drive_name) = self.extract_drive_name(&metric.name) {
// Find existing drive or create new one // For physical drives, ensure we create the drive object
let drive_exists = pool.drives.iter().any(|d| d.name == drive_name); let drive_exists = pool.drives.iter().any(|d| d.name == drive_name);
if !drive_exists { if !drive_exists {
pool.drives.push(StorageDrive { pool.drives.push(StorageDrive {
@@ -241,6 +256,10 @@ impl SystemWidget {
if let MetricValue::Float(wear) = metric.value { if let MetricValue::Float(wear) = metric.value {
drive.wear_percent = Some(wear); drive.wear_percent = Some(wear);
drive.status = metric.status.clone(); drive.status = metric.status.clone();
// For physical drives, if this is the main drive, also update pool health
if drive.name == pool.name && pool.health_status == Status::Unknown {
pool.health_status = metric.status.clone();
}
} }
} }
} }
@@ -342,38 +361,53 @@ impl SystemWidget {
/// Extract pool name from disk metric name /// Extract pool name from disk metric name
fn extract_pool_name(&self, metric_name: &str) -> Option<String> { fn extract_pool_name(&self, metric_name: &str) -> Option<String> {
// Pattern: disk_{pool_name}_{drive_name}_{metric_type} // Pattern: disk_{pool_name}_{various suffixes}
// Since pool_name can contain underscores, work backwards from known metric suffixes // Since pool_name can contain underscores, work backwards from known metric suffixes
if metric_name.starts_with("disk_") { if metric_name.starts_with("disk_") {
// First try drive-specific metrics that have device names // Handle filesystem metrics: disk_{pool}_fs_{filesystem}_{metric}
if let Some(suffix_pos) = metric_name.rfind("_temperature") if metric_name.contains("_fs_") {
.or_else(|| metric_name.rfind("_wear_percent"))
.or_else(|| metric_name.rfind("_health")) {
// Find the second-to-last underscore to get pool name
let before_suffix = &metric_name[..suffix_pos];
if let Some(drive_start) = before_suffix.rfind('_') {
if drive_start > 5 {
return Some(metric_name[5..drive_start].to_string()); // Skip "disk_"
}
}
}
// Handle filesystem metrics: disk_{pool}_fs_{filesystem}_{metric}
else if metric_name.contains("_fs_") {
if let Some(fs_pos) = metric_name.find("_fs_") { if let Some(fs_pos) = metric_name.find("_fs_") {
return Some(metric_name[5..fs_pos].to_string()); // Skip "disk_", extract pool name before "_fs_" return Some(metric_name[5..fs_pos].to_string()); // Skip "disk_", extract pool name before "_fs_"
} }
} }
// For pool-level metrics (usage_percent, used_gb, total_gb), take everything before the metric suffix
else if let Some(suffix_pos) = metric_name.rfind("_usage_percent") // Handle pool-level metrics (usage_percent, used_gb, total_gb, mount_point, pool_type, pool_health)
.or_else(|| metric_name.rfind("_used_gb")) // Use rfind to get the last occurrence of these suffixes
.or_else(|| metric_name.rfind("_total_gb")) let pool_suffixes = ["_usage_percent", "_used_gb", "_total_gb", "_available_gb", "_mount_point", "_pool_type", "_pool_health"];
.or_else(|| metric_name.rfind("_available_gb")) { for suffix in pool_suffixes {
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_" if let Some(suffix_pos) = metric_name.rfind(suffix) {
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_"
}
} }
// Fallback to old behavior for unknown patterns
else if let Some(captures) = metric_name.strip_prefix("disk_") { // Handle physical drive metrics: disk_{drive}_health, disk_{drive}_wear_percent, and disk_{drive}_temperature
if let Some(pos) = captures.find('_') { if (metric_name.ends_with("_health") && !metric_name.contains("_pool_health"))
return Some(captures[..pos].to_string()); || metric_name.ends_with("_wear_percent")
|| metric_name.ends_with("_temperature") {
// Count underscores to distinguish physical drive metrics (disk_{drive}_metric)
// from pool drive metrics (disk_{pool}_{drive}_metric)
let underscore_count = metric_name.matches('_').count();
// e.g. disk_nvme0n1_wear_percent has 3 underscores, 2 of them from the "_wear_percent" suffix
if underscore_count == 3 { // disk_{drive}_wear_percent, where {drive} itself has no underscores
if let Some(suffix_pos) = metric_name.rfind("_health")
.or_else(|| metric_name.rfind("_wear_percent"))
.or_else(|| metric_name.rfind("_temperature")) {
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_"
}
}
}
// Handle drive-specific metrics: disk_{pool}_{drive}_{metric}
let drive_suffixes = ["_temperature", "_health"];
for suffix in drive_suffixes {
if let Some(suffix_pos) = metric_name.rfind(suffix) {
// Extract pool name by finding the second-to-last underscore
let before_suffix = &metric_name[..suffix_pos];
if let Some(drive_start) = before_suffix.rfind('_') {
if drive_start > 5 {
return Some(metric_name[5..drive_start].to_string()); // Skip "disk_"
}
}
} }
} }
} }
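Note on the underscore-count check in `extract_pool_name` above: the total count depends on the metric suffix as well as on the drive name. `disk_nvme0n1_wear_percent` contains 3 underscores, while `disk_nvme0n1_temperature` and `disk_nvme0n1_health` contain 2. A sketch of a suffix-independent variant follows: strip the prefix and the known suffix first, then look only at what remains. It assumes physical drive names contain no underscores and that pool drive metrics follow `disk_{pool}_{drive}_{metric}`. The function name `split_drive_metric` and its return shape are hypothetical.

```rust
const DRIVE_SUFFIXES: [&str; 3] = ["_temperature", "_wear_percent", "_health"];

/// Split a drive-level metric name into (pool, drive).
/// For physical drives (no pool component) the pool is named after the drive.
fn split_drive_metric(metric_name: &str) -> Option<(String, String)> {
    // Pool-level health is not a drive metric even though it ends with "_health".
    if metric_name.ends_with("_pool_health") {
        return None;
    }
    let rest = metric_name.strip_prefix("disk_")?;
    // Remove whichever known suffix matches; bail out if none does.
    let body = DRIVE_SUFFIXES.iter().find_map(|s| rest.strip_suffix(*s))?;
    if body.is_empty() {
        return None;
    }
    match body.rsplit_once('_') {
        // disk_{pool}_{drive}_{metric}: the last component is the drive.
        Some((pool, drive)) if !pool.is_empty() && !drive.is_empty() => {
            Some((pool.to_string(), drive.to_string()))
        }
        Some(_) => None,
        // disk_{drive}_{metric}: no pool component.
        None => Some((body.to_string(), body.to_string())),
    }
}
```

Under these assumptions `disk_srv_media_sdb_temperature` splits into pool `srv_media` and drive `sdb`, and all three suffixes behave the same for `nvme0n1`.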
@@ -407,16 +441,22 @@ impl SystemWidget {
/// Extract drive name from disk metric name /// Extract drive name from disk metric name
fn extract_drive_name(&self, metric_name: &str) -> Option<String> { fn extract_drive_name(&self, metric_name: &str) -> Option<String> {
// Pattern: disk_{pool_name}_{drive_name}_{metric_type} // Pattern: disk_{pool_name}_{drive_name}_{metric_type} OR disk_{drive_name}_{metric_type}
// Since pool_name can contain underscores, work backwards from known metric suffixes // Pool drives: disk_srv_media_sdb_temperature
// Physical drives: disk_nvme0n1_temperature
if metric_name.starts_with("disk_") { if metric_name.starts_with("disk_") {
if let Some(suffix_pos) = metric_name.rfind("_temperature") if let Some(suffix_pos) = metric_name.rfind("_temperature")
.or_else(|| metric_name.rfind("_wear_percent")) .or_else(|| metric_name.rfind("_wear_percent"))
.or_else(|| metric_name.rfind("_health")) { .or_else(|| metric_name.rfind("_health")) {
// Find the second-to-last underscore to get the drive name
let before_suffix = &metric_name[..suffix_pos]; let before_suffix = &metric_name[..suffix_pos];
// Extract the last component as drive name (e.g., "sdb", "sdc", "nvme0n1")
if let Some(drive_start) = before_suffix.rfind('_') { if let Some(drive_start) = before_suffix.rfind('_') {
return Some(before_suffix[drive_start + 1..].to_string()); return Some(before_suffix[drive_start + 1..].to_string());
} else {
// Handle physical drive metrics: disk_{drive}_metric (no pool)
// Extract everything after "disk_" as the drive name
return Some(before_suffix[5..].to_string()); // Skip "disk_"
} }
} }
} }
@@ -429,7 +469,28 @@ impl SystemWidget {
for pool in &self.storage_pools { for pool in &self.storage_pools {
// Pool header line with type and health // Pool header line with type and health
let pool_label = if pool.pool_type == "single" { let pool_label = if pool.pool_type.starts_with("drive (") {
// For physical drives, show the drive name with temperature and wear percentage if available
// Look for any drive with temp/wear data (physical drives may have drives named after the pool)
let temp_opt = pool.drives.iter()
.find_map(|d| d.temperature);
let wear_opt = pool.drives.iter()
.find_map(|d| d.wear_percent);
let mut drive_info = Vec::new();
if let Some(temp) = temp_opt {
drive_info.push(format!("T: {:.0}°C", temp));
}
if let Some(wear) = wear_opt {
drive_info.push(format!("W: {:.0}%", wear));
}
if drive_info.is_empty() {
format!("{}:", pool.name)
} else {
format!("{} {}:", pool.name, drive_info.join(" "))
}
} else if pool.pool_type == "single" {
format!("{}:", pool.mount_point) format!("{}:", pool.mount_point)
} else { } else {
format!("{} ({}):", pool.mount_point, pool.pool_type) format!("{} ({}):", pool.mount_point, pool.pool_type)
@@ -440,49 +501,32 @@ impl SystemWidget {
); );
lines.push(Line::from(pool_spans)); lines.push(Line::from(pool_spans));
// Pool health line (for multi-disk pools) // Pool health line intentionally omitted
if pool.pool_type != "single" {
if let Some(health) = &pool.pool_health { // Total usage line (only show for multi-drive pools, skip for single physical drives)
let health_text = match health.as_str() { if !pool.pool_type.starts_with("drive (") {
"healthy" => format!("Pool Status: {} Healthy", let usage_text = match (pool.usage_percent, pool.used_gb, pool.total_gb) {
if pool.drives.len() > 1 { format!("({} drives)", pool.drives.len()) } else { String::new() }), (Some(pct), Some(used), Some(total)) => {
"degraded" => "Pool Status: ⚠ Degraded".to_string(), format!("Total: {:.0}% {:.1}GB/{:.1}GB", pct, used, total)
"critical" => "Pool Status: ✗ Critical".to_string(), }
"rebuilding" => "Pool Status: ⟳ Rebuilding".to_string(), _ => "Total: —% —GB/—GB".to_string(),
_ => format!("Pool Status: ? {}", health), };
};
let has_drives = !pool.drives.is_empty();
let mut health_spans = vec![ let has_filesystems = !pool.filesystems.is_empty();
Span::raw(" "), let has_children = has_drives || has_filesystems;
Span::styled("├─ ", Typography::tree()), let tree_symbol = if has_children { "├─" } else { "└─" };
]; let mut usage_spans = vec![
health_spans.extend(StatusIcons::create_status_spans(pool.health_status.clone(), &health_text)); Span::raw(" "),
lines.push(Line::from(health_spans)); Span::styled(tree_symbol, Typography::tree()),
} Span::raw(" "),
];
usage_spans.extend(StatusIcons::create_status_spans(pool.status.clone(), &usage_text));
lines.push(Line::from(usage_spans));
} }
// Total usage line (always show for pools)
let usage_text = match (pool.usage_percent, pool.used_gb, pool.total_gb) {
(Some(pct), Some(used), Some(total)) => {
format!("Total: {:.0}% {:.1}GB/{:.1}GB", pct, used, total)
}
_ => "Total: —% —GB/—GB".to_string(),
};
let has_drives = !pool.drives.is_empty();
let has_filesystems = !pool.filesystems.is_empty();
let has_children = has_drives || has_filesystems;
let tree_symbol = if has_children { "├─" } else { "└─" };
let mut usage_spans = vec![
Span::raw(" "),
Span::styled(tree_symbol, Typography::tree()),
Span::raw(" "),
];
usage_spans.extend(StatusIcons::create_status_spans(pool.status.clone(), &usage_text));
lines.push(Line::from(usage_spans));
// Drive lines with enhanced grouping // Drive lines with enhanced grouping
if pool.pool_type != "single" && pool.drives.len() > 1 { if pool.pool_type.contains("mergerfs") && pool.drives.len() > 1 {
// Group drives by type for mergerfs pools // Group drives by type for mergerfs pools
let (data_drives, parity_drives): (Vec<_>, Vec<_>) = pool.drives.iter().enumerate() let (data_drives, parity_drives): (Vec<_>, Vec<_>) = pool.drives.iter().enumerate()
.partition(|(_, drive)| { .partition(|(_, drive)| {
@@ -491,7 +535,7 @@ impl SystemWidget {
}); });
// Show data drives // Show data drives
if !data_drives.is_empty() && pool.pool_type.contains("mergerfs") { if !data_drives.is_empty() {
lines.push(Line::from(vec![ lines.push(Line::from(vec![
Span::raw(" "), Span::raw(" "),
Span::styled("├─ ", Typography::tree()), Span::styled("├─ ", Typography::tree()),
@@ -509,7 +553,7 @@ impl SystemWidget {
} }
// Show parity drives // Show parity drives
if !parity_drives.is_empty() && pool.pool_type.contains("mergerfs") { if !parity_drives.is_empty() {
lines.push(Line::from(vec![ lines.push(Line::from(vec![
Span::raw(" "), Span::raw(" "),
Span::styled("└─ ", Typography::tree()), Span::styled("└─ ", Typography::tree()),
@@ -524,43 +568,16 @@ impl SystemWidget {
self.render_drive_line(&mut lines, drive, " ├─"); self.render_drive_line(&mut lines, drive, " ├─");
} }
} }
} else { }
// Regular drive listing for non-mergerfs pools } else if pool.pool_type != "single" && pool.drives.len() > 1 {
for (i, drive) in pool.drives.iter().enumerate() { // Regular drive listing for non-mergerfs multi-drive pools
let is_last = i == pool.drives.len() - 1; for (i, drive) in pool.drives.iter().enumerate() {
let tree_symbol = if is_last { "└─" } else { "├─" }; let is_last = i == pool.drives.len() - 1;
self.render_drive_line(&mut lines, drive, tree_symbol); let tree_symbol = if is_last { "└─" } else { "├─" };
} self.render_drive_line(&mut lines, drive, tree_symbol);
} }
} else if pool.pool_type.starts_with("drive (") { } else if pool.pool_type.starts_with("drive (") {
// Physical drive pools: show drive info + filesystem children // Physical drive pools: wear data shown in header, skip drive lines, show filesystems directly
// First show drive information
for drive in &pool.drives {
let mut drive_info = Vec::new();
if let Some(temp) = drive.temperature {
drive_info.push(format!("T: {:.0}°C", temp));
}
if let Some(wear) = drive.wear_percent {
drive_info.push(format!("W: {:.0}%", wear));
}
let drive_text = if drive_info.is_empty() {
format!("Drive: {}", drive.name)
} else {
format!("Drive: {}", drive_info.join(" "))
};
let has_filesystems = !pool.filesystems.is_empty();
let tree_symbol = if has_filesystems { "├─" } else { "└─" };
let mut drive_spans = vec![
Span::raw(" "),
Span::styled(tree_symbol, Typography::tree()),
Span::raw(" "),
];
drive_spans.extend(StatusIcons::create_status_spans(drive.status.clone(), &drive_text));
lines.push(Line::from(drive_spans));
}
// Then show filesystem children
for (i, filesystem) in pool.filesystems.iter().enumerate() { for (i, filesystem) in pool.filesystems.iter().enumerate() {
let is_last = i == pool.filesystems.len() - 1; let is_last = i == pool.filesystems.len() - 1;
let tree_symbol = if is_last { "└─" } else { "├─" }; let tree_symbol = if is_last { "└─" } else { "├─" };
@@ -611,10 +628,12 @@ impl SystemWidget {
if let Some(wear) = drive.wear_percent { if let Some(wear) = drive.wear_percent {
drive_info.push(format!("W: {:.0}%", wear)); drive_info.push(format!("W: {:.0}%", wear));
} }
// Always show drive name with info, or just name if no info available
let drive_text = if drive_info.is_empty() { let drive_text = if drive_info.is_empty() {
drive.name.clone() drive.name.clone()
} else { } else {
format!("{} {}", drive.name, drive_info.join(" ")) format!("{} {}", drive.name, drive_info.join(" "))
}; };
let mut drive_spans = vec![ let mut drive_spans = vec![


@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.117" version = "0.1.131"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

shared/src/agent_data.rs Normal file

@@ -0,0 +1,161 @@
use serde::{Deserialize, Serialize};
/// Complete structured data from an agent
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentData {
pub hostname: String,
pub agent_version: String,
pub timestamp: u64,
pub system: SystemData,
pub services: Vec<ServiceData>,
pub backup: BackupData,
}
/// System-level monitoring data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemData {
pub cpu: CpuData,
pub memory: MemoryData,
pub storage: StorageData,
}
/// CPU monitoring data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuData {
pub load_1min: f32,
pub load_5min: f32,
pub load_15min: f32,
pub frequency_mhz: f32,
pub temperature_celsius: Option<f32>,
}
/// Memory monitoring data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryData {
pub usage_percent: f32,
pub total_gb: f32,
pub used_gb: f32,
pub available_gb: f32,
pub swap_total_gb: f32,
pub swap_used_gb: f32,
pub tmpfs: Vec<TmpfsData>,
}
/// Tmpfs filesystem data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TmpfsData {
pub mount: String,
pub usage_percent: f32,
pub used_gb: f32,
pub total_gb: f32,
}
/// Storage monitoring data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageData {
pub drives: Vec<DriveData>,
pub pools: Vec<PoolData>,
}
/// Individual drive data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriveData {
pub name: String,
pub health: String,
pub temperature_celsius: Option<f32>,
pub wear_percent: Option<f32>,
pub filesystems: Vec<FilesystemData>,
}
/// Filesystem on a drive
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FilesystemData {
pub mount: String,
pub usage_percent: f32,
pub used_gb: f32,
pub total_gb: f32,
}
/// Storage pool (MergerFS, RAID, etc.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolData {
pub name: String,
pub mount: String,
pub pool_type: String, // "mergerfs", "raid", etc.
pub health: String,
pub usage_percent: f32,
pub used_gb: f32,
pub total_gb: f32,
pub data_drives: Vec<PoolDriveData>,
pub parity_drives: Vec<PoolDriveData>,
}
/// Drive in a storage pool
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolDriveData {
pub name: String,
pub temperature_celsius: Option<f32>,
pub wear_percent: Option<f32>,
pub health: String,
}
/// Service monitoring data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceData {
pub name: String,
pub status: String, // "active", "inactive", "failed"
pub memory_mb: f32,
pub disk_gb: f32,
pub user_stopped: bool,
}
/// Backup system data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupData {
pub status: String,
pub last_run: Option<u64>,
pub next_scheduled: Option<u64>,
pub total_size_gb: Option<f32>,
pub repository_health: Option<String>,
}
impl AgentData {
/// Create new agent data with current timestamp
pub fn new(hostname: String, agent_version: String) -> Self {
Self {
hostname,
agent_version,
timestamp: chrono::Utc::now().timestamp() as u64,
system: SystemData {
cpu: CpuData {
load_1min: 0.0,
load_5min: 0.0,
load_15min: 0.0,
frequency_mhz: 0.0,
temperature_celsius: None,
},
memory: MemoryData {
usage_percent: 0.0,
total_gb: 0.0,
used_gb: 0.0,
available_gb: 0.0,
swap_total_gb: 0.0,
swap_used_gb: 0.0,
tmpfs: Vec::new(),
},
storage: StorageData {
drives: Vec::new(),
pools: Vec::new(),
},
},
services: Vec::new(),
backup: BackupData {
status: "unknown".to_string(),
last_run: None,
next_scheduled: None,
total_size_gb: None,
repository_health: None,
},
}
}
}
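
A minimal sketch of how typed fields like the ones in `PoolDriveData` can be turned into a display label by direct field access, with no string parsing of metric names. The struct here is a trimmed copy without the serde derives so it compiles with the standard library alone, and `drive_label` is a hypothetical helper, not a function from this commit; the label format follows the `T:` / `W:` formatting seen in the widget diff.

```rust
// Trimmed stand-in for PoolDriveData (serde derives omitted so the sketch
// needs no external crates). Field names match the struct in agent_data.rs.
#[derive(Debug, Clone)]
pub struct PoolDriveData {
    pub name: String,
    pub temperature_celsius: Option<f32>,
    pub wear_percent: Option<f32>,
    pub health: String,
}

/// Hypothetical helper: build a one-line drive label from typed fields.
/// Missing readings (None) are skipped rather than parsed out of a string.
pub fn drive_label(drive: &PoolDriveData) -> String {
    let mut info = Vec::new();
    if let Some(temp) = drive.temperature_celsius {
        info.push(format!("T: {:.0}°C", temp));
    }
    if let Some(wear) = drive.wear_percent {
        info.push(format!("W: {:.0}%", wear));
    }
    if info.is_empty() {
        // No sensor data available: show the bare drive name.
        drive.name.clone()
    } else {
        format!("{} {}", drive.name, info.join(" "))
    }
}
```

Because absent readings are `None` rather than sentinel strings, a drive without SMART data falls through to the bare name without any special-case parsing.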


@@ -1,8 +1,10 @@
pub mod agent_data;
pub mod cache; pub mod cache;
pub mod error; pub mod error;
pub mod metrics; pub mod metrics;
pub mod protocol; pub mod protocol;
pub use agent_data::*;
pub use cache::*; pub use cache::*;
pub use error::*; pub use error::*;
pub use metrics::*; pub use metrics::*;


@@ -1,13 +1,9 @@
use crate::metrics::Metric; use crate::agent_data::AgentData;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Message sent from agent to dashboard via ZMQ /// Message sent from agent to dashboard via ZMQ
#[derive(Debug, Clone, Serialize, Deserialize)] /// Always structured data - no legacy metrics support
pub struct MetricMessage { pub type AgentMessage = AgentData;
pub hostname: String,
pub timestamp: u64,
pub metrics: Vec<Metric>,
}
/// Command output streaming message /// Command output streaming message
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -20,15 +16,6 @@ pub struct CommandOutputMessage {
pub timestamp: u64, pub timestamp: u64,
} }
impl MetricMessage {
pub fn new(hostname: String, metrics: Vec<Metric>) -> Self {
Self {
hostname,
timestamp: chrono::Utc::now().timestamp() as u64,
metrics,
}
}
}
impl CommandOutputMessage { impl CommandOutputMessage {
pub fn new(hostname: String, command_id: String, command_type: String, output_line: String, is_complete: bool) -> Self { pub fn new(hostname: String, command_id: String, command_type: String, output_line: String, is_complete: bool) -> Self {
@@ -59,8 +46,8 @@ pub enum Command {
pub enum CommandResponse { pub enum CommandResponse {
/// Acknowledgment of command /// Acknowledgment of command
Ack, Ack,
/// Metrics response /// Agent data response
Metrics(Vec<Metric>), AgentData(AgentData),
/// Pong response to ping /// Pong response to ping
Pong, Pong,
/// Error response /// Error response
@@ -76,7 +63,7 @@ pub struct MessageEnvelope {
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum MessageType { pub enum MessageType {
Metrics, AgentData,
Command, Command,
CommandResponse, CommandResponse,
CommandOutput, CommandOutput,
@@ -84,10 +71,10 @@ pub enum MessageType {
} }
impl MessageEnvelope { impl MessageEnvelope {
pub fn metrics(message: MetricMessage) -> Result<Self, crate::SharedError> { pub fn agent_data(data: AgentData) -> Result<Self, crate::SharedError> {
Ok(Self { Ok(Self {
message_type: MessageType::Metrics, message_type: MessageType::AgentData,
payload: serde_json::to_vec(&message)?, payload: serde_json::to_vec(&data)?,
}) })
} }
@@ -119,11 +106,11 @@ impl MessageEnvelope {
}) })
} }
pub fn decode_metrics(&self) -> Result<MetricMessage, crate::SharedError> { pub fn decode_agent_data(&self) -> Result<AgentData, crate::SharedError> {
match self.message_type { match self.message_type {
MessageType::Metrics => Ok(serde_json::from_slice(&self.payload)?), MessageType::AgentData => Ok(serde_json::from_slice(&self.payload)?),
_ => Err(crate::SharedError::Protocol { _ => Err(crate::SharedError::Protocol {
message: "Expected metrics message".to_string(), message: "Expected agent data message".to_string(),
}), }),
} }
} }
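
The tag check in `decode_agent_data` can be illustrated with a simplified, standard-library-only sketch. The types below are stand-ins, not the crate's real definitions: the payload is a plain UTF-8 hostname instead of a `serde_json`-encoded `AgentData`, the error type is a `String` instead of `SharedError`, and only two `MessageType` variants are modeled.

```rust
// Simplified stand-ins for the protocol types. The real envelope serializes
// AgentData with serde_json; a UTF-8 hostname is used here as an assumption
// so the sketch has no external dependencies.
#[derive(Debug, PartialEq)]
pub enum MessageType {
    AgentData,
    Command,
}

#[derive(Debug)]
pub struct MessageEnvelope {
    pub message_type: MessageType,
    pub payload: Vec<u8>,
}

impl MessageEnvelope {
    /// Wrap a hostname as an AgentData payload (stand-in for the full struct).
    pub fn agent_data(hostname: &str) -> Self {
        Self {
            message_type: MessageType::AgentData,
            payload: hostname.as_bytes().to_vec(),
        }
    }

    /// Decode the payload only when the tag says it is agent data;
    /// any other tag is rejected before the bytes are interpreted.
    pub fn decode_agent_data(&self) -> Result<String, String> {
        match self.message_type {
            MessageType::AgentData => {
                String::from_utf8(self.payload.clone()).map_err(|e| e.to_string())
            }
            _ => Err("Expected agent data message".to_string()),
        }
    }
}
```

Checking the tag first means a dashboard that receives, say, a command envelope gets a protocol error instead of a confusing deserialization failure.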