Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| adf3b0f51c | |||
| 41ded0170c | |||
| 9b4191b2c3 | |||
| 53dbb43352 | |||
| ba03623110 | |||
| f24c4ed650 | |||
| 86501fd486 | |||
| 192eea6e0c | |||
| 43fb838c9b |
113
CLAUDE.md
113
CLAUDE.md
@@ -59,11 +59,85 @@ hostname2 = [
|
|||||||
|
|
||||||
## Core Architecture Principles
|
## Core Architecture Principles
|
||||||
|
|
||||||
### Individual Metrics Philosophy
|
### Structured Data Architecture (Planned Migration)
|
||||||
- Agent collects individual metrics, dashboard composes widgets
|
Current system uses string-based metrics with complex parsing. Planning migration to structured JSON data to eliminate fragile string manipulation.
|
||||||
- Each metric collected, transmitted, and stored individually
|
|
||||||
- Agent calculates status for each metric using thresholds
|
**Current (String Metrics):**
|
||||||
- Dashboard aggregates individual metric statuses for widget status
|
- Agent sends individual metrics with string names like `disk_nvme0n1_temperature`
|
||||||
|
- Dashboard parses metric names with underscore counting and string splitting
|
||||||
|
- Complex and error-prone metric filtering and extraction logic
|
||||||
|
|
||||||
|
**Target (Structured Data):**
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"hostname": "cmbox",
|
||||||
|
"agent_version": "v0.1.130",
|
||||||
|
"timestamp": 1763926877,
|
||||||
|
"system": {
|
||||||
|
"cpu": {
|
||||||
|
"load_1min": 3.50,
|
||||||
|
"load_5min": 3.57,
|
||||||
|
"load_15min": 3.58,
|
||||||
|
"frequency_mhz": 1500,
|
||||||
|
"temperature_celsius": 45.2
|
||||||
|
},
|
||||||
|
"memory": {
|
||||||
|
"usage_percent": 25.0,
|
||||||
|
"total_gb": 23.3,
|
||||||
|
"used_gb": 5.9,
|
||||||
|
"swap_total_gb": 10.7,
|
||||||
|
"swap_used_gb": 0.99,
|
||||||
|
"tmpfs": [
|
||||||
|
{"mount": "/tmp", "usage_percent": 15.0, "used_gb": 0.3, "total_gb": 2.0}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"storage": {
|
||||||
|
"drives": [
|
||||||
|
{
|
||||||
|
"name": "nvme0n1",
|
||||||
|
"health": "PASSED",
|
||||||
|
"temperature_celsius": 29.0,
|
||||||
|
"wear_percent": 1.0,
|
||||||
|
"filesystems": [
|
||||||
|
{"mount": "/", "usage_percent": 24.0, "used_gb": 224.9, "total_gb": 928.2}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"pools": [
|
||||||
|
{
|
||||||
|
"name": "srv_media",
|
||||||
|
"mount": "/srv/media",
|
||||||
|
"type": "mergerfs",
|
||||||
|
"health": "healthy",
|
||||||
|
"usage_percent": 63.0,
|
||||||
|
"used_gb": 2355.2,
|
||||||
|
"total_gb": 3686.4,
|
||||||
|
"data_drives": [
|
||||||
|
{"name": "sdb", "temperature_celsius": 24.0}
|
||||||
|
],
|
||||||
|
"parity_drives": [
|
||||||
|
{"name": "sdc", "temperature_celsius": 24.0}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"services": [
|
||||||
|
{"name": "sshd", "status": "active", "memory_mb": 4.5, "disk_gb": 0.0}
|
||||||
|
],
|
||||||
|
"backup": {
|
||||||
|
"status": "completed",
|
||||||
|
"last_run": 1763920000,
|
||||||
|
"next_scheduled": 1764006400,
|
||||||
|
"total_size_gb": 150.5,
|
||||||
|
"repository_health": "ok"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
- Agent sends structured JSON over ZMQ
|
||||||
|
- Dashboard accesses data directly: `data.system.storage.drives[0].temperature_celsius`
|
||||||
|
- Type safety eliminates all parsing bugs
|
||||||
|
|
||||||
|
|
||||||
### Maintenance Mode
|
### Maintenance Mode
|
||||||
- Agent checks for `/tmp/cm-maintenance` file before sending notifications
|
- Agent checks for `/tmp/cm-maintenance` file before sending notifications
|
||||||
@@ -293,12 +367,33 @@ Keep responses concise and focused. Avoid extensive implementation summaries unl
|
|||||||
- ✅ "Restructure storage widget with improved layout"
|
- ✅ "Restructure storage widget with improved layout"
|
||||||
- ✅ "Update CPU thresholds to production values"
|
- ✅ "Update CPU thresholds to production values"
|
||||||
|
|
||||||
|
## Planned Architecture Migration
|
||||||
|
|
||||||
|
### Phase 1: Structured Data Types (Shared Crate)
|
||||||
|
- Create Rust structs matching target JSON structure
|
||||||
|
- Replace `Metric` enum with typed data structures
|
||||||
|
- Add serde serialization/deserialization
|
||||||
|
|
||||||
|
### Phase 2: Agent Refactor
|
||||||
|
- Update collectors to return typed structs instead of `Vec<Metric>`
|
||||||
|
- Remove string metric name generation
|
||||||
|
- Send structured JSON over ZMQ
|
||||||
|
|
||||||
|
### Phase 3: Dashboard Refactor
|
||||||
|
- Replace metric parsing logic with direct field access
|
||||||
|
- Remove `extract_pool_name()`, `extract_drive_name()`, underscore counting
|
||||||
|
- Widgets access `data.system.storage.drives[0].temperature_celsius`
|
||||||
|
|
||||||
|
### Phase 4: Migration & Cleanup
|
||||||
|
- Support both formats during transition
|
||||||
|
- Gradual rollout with backward compatibility
|
||||||
|
- Remove legacy string metric system
|
||||||
|
|
||||||
## Implementation Rules
|
## Implementation Rules
|
||||||
|
|
||||||
1. **Individual Metrics**: Each metric is collected, transmitted, and stored individually
|
1. **Agent Status Authority**: Agent calculates status for each metric using thresholds
|
||||||
2. **Agent Status Authority**: Agent calculates status for each metric using thresholds
|
2. **Dashboard Composition**: Dashboard widgets subscribe to specific metrics by name
|
||||||
3. **Dashboard Composition**: Dashboard widgets subscribe to specific metrics by name
|
3. **Status Aggregation**: Dashboard aggregates individual metric statuses for widget status
|
||||||
4. **Status Aggregation**: Dashboard aggregates individual metric statuses for widget status
|
|
||||||
|
|
||||||
**NEVER:**
|
**NEVER:**
|
||||||
- Copy/paste ANY code from legacy implementations
|
- Copy/paste ANY code from legacy implementations
|
||||||
|
|||||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard"
|
name = "cm-dashboard"
|
||||||
version = "0.1.122"
|
version = "0.1.130"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
@@ -301,7 +301,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-agent"
|
name = "cm-dashboard-agent"
|
||||||
version = "0.1.122"
|
version = "0.1.130"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -324,7 +324,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-shared"
|
name = "cm-dashboard-shared"
|
||||||
version = "0.1.122"
|
version = "0.1.130"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard-agent"
|
name = "cm-dashboard-agent"
|
||||||
version = "0.1.122"
|
version = "0.1.131"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ use crate::metrics::MetricCollectionManager;
|
|||||||
use crate::notifications::NotificationManager;
|
use crate::notifications::NotificationManager;
|
||||||
use crate::service_tracker::UserStoppedServiceTracker;
|
use crate::service_tracker::UserStoppedServiceTracker;
|
||||||
use crate::status::HostStatusManager;
|
use crate::status::HostStatusManager;
|
||||||
use cm_dashboard_shared::{Metric, MetricMessage, MetricValue, Status};
|
use cm_dashboard_shared::{AgentData, Metric, MetricValue, Status, TmpfsData, DriveData, FilesystemData, ServiceData};
|
||||||
|
|
||||||
pub struct Agent {
|
pub struct Agent {
|
||||||
hostname: String,
|
hostname: String,
|
||||||
@@ -199,16 +199,310 @@ impl Agent {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Broadcasting {} cached metrics (including host status summary)", metrics.len());
|
debug!("Broadcasting {} cached metrics as structured data", metrics.len());
|
||||||
|
|
||||||
// Create and send message with all current data
|
// Convert metrics to structured data and send
|
||||||
let message = MetricMessage::new(self.hostname.clone(), metrics);
|
let agent_data = self.metrics_to_structured_data(&metrics)?;
|
||||||
self.zmq_handler.publish_metrics(&message).await?;
|
self.zmq_handler.publish_agent_data(&agent_data).await?;
|
||||||
|
|
||||||
debug!("Metrics broadcasted successfully");
|
debug!("Structured data broadcasted successfully");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Convert legacy metrics to structured data format
|
||||||
|
fn metrics_to_structured_data(&self, metrics: &[Metric]) -> Result<AgentData> {
|
||||||
|
let mut agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version());
|
||||||
|
|
||||||
|
// Parse metrics into structured data
|
||||||
|
for metric in metrics {
|
||||||
|
self.parse_metric_into_agent_data(&mut agent_data, metric)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(agent_data)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse a single metric into the appropriate structured data field
|
||||||
|
fn parse_metric_into_agent_data(&self, agent_data: &mut AgentData, metric: &Metric) -> Result<()> {
|
||||||
|
// CPU metrics
|
||||||
|
if metric.name == "cpu_load_1min" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.cpu.load_1min = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "cpu_load_5min" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.cpu.load_5min = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "cpu_load_15min" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.cpu.load_15min = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "cpu_frequency_mhz" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.cpu.frequency_mhz = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "cpu_temperature_celsius" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.cpu.temperature_celsius = Some(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Memory metrics
|
||||||
|
else if metric.name == "memory_usage_percent" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.memory.usage_percent = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "memory_total_gb" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.memory.total_gb = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "memory_used_gb" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.memory.used_gb = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "memory_available_gb" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.memory.available_gb = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "memory_swap_total_gb" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.memory.swap_total_gb = value;
|
||||||
|
}
|
||||||
|
} else if metric.name == "memory_swap_used_gb" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
agent_data.system.memory.swap_used_gb = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Tmpfs metrics
|
||||||
|
else if metric.name.starts_with("memory_tmp_") {
|
||||||
|
// For now, create a single /tmp tmpfs entry
|
||||||
|
if metric.name == "memory_tmp_usage_percent" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
|
||||||
|
tmpfs.usage_percent = value;
|
||||||
|
} else {
|
||||||
|
agent_data.system.memory.tmpfs.push(TmpfsData {
|
||||||
|
mount: "/tmp".to_string(),
|
||||||
|
usage_percent: value,
|
||||||
|
used_gb: 0.0,
|
||||||
|
total_gb: 0.0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if metric.name == "memory_tmp_used_gb" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
|
||||||
|
tmpfs.used_gb = value;
|
||||||
|
} else {
|
||||||
|
agent_data.system.memory.tmpfs.push(TmpfsData {
|
||||||
|
mount: "/tmp".to_string(),
|
||||||
|
usage_percent: 0.0,
|
||||||
|
used_gb: value,
|
||||||
|
total_gb: 0.0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if metric.name == "memory_tmp_total_gb" {
|
||||||
|
if let Some(value) = metric.value.as_f32() {
|
||||||
|
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
|
||||||
|
tmpfs.total_gb = value;
|
||||||
|
} else {
|
||||||
|
agent_data.system.memory.tmpfs.push(TmpfsData {
|
||||||
|
mount: "/tmp".to_string(),
|
||||||
|
usage_percent: 0.0,
|
||||||
|
used_gb: 0.0,
|
||||||
|
total_gb: value,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Storage metrics
|
||||||
|
else if metric.name.starts_with("disk_") {
|
||||||
|
if metric.name.contains("_temperature") {
|
||||||
|
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||||
|
if let Some(temp) = metric.value.as_f32() {
|
||||||
|
self.ensure_drive_exists(agent_data, &drive_name);
|
||||||
|
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
|
||||||
|
drive.temperature_celsius = Some(temp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if metric.name.contains("_wear_percent") {
|
||||||
|
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||||
|
if let Some(wear) = metric.value.as_f32() {
|
||||||
|
self.ensure_drive_exists(agent_data, &drive_name);
|
||||||
|
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
|
||||||
|
drive.wear_percent = Some(wear);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if metric.name.contains("_health") {
|
||||||
|
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||||
|
let health = metric.value.as_string();
|
||||||
|
self.ensure_drive_exists(agent_data, &drive_name);
|
||||||
|
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
|
||||||
|
drive.health = health;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if metric.name.contains("_fs_") {
|
||||||
|
// Filesystem metrics: disk_{pool}_fs_{filesystem}_{metric}
|
||||||
|
if let Some((pool_name, fs_name)) = self.extract_pool_and_filesystem(&metric.name) {
|
||||||
|
if metric.name.contains("_usage_percent") {
|
||||||
|
if let Some(usage) = metric.value.as_f32() {
|
||||||
|
self.ensure_filesystem_exists(agent_data, &pool_name, &fs_name, usage, 0.0, 0.0);
|
||||||
|
}
|
||||||
|
} else if metric.name.contains("_used_gb") {
|
||||||
|
if let Some(used) = metric.value.as_f32() {
|
||||||
|
self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.used_gb = used);
|
||||||
|
}
|
||||||
|
} else if metric.name.contains("_total_gb") {
|
||||||
|
if let Some(total) = metric.value.as_f32() {
|
||||||
|
self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.total_gb = total);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Service metrics
|
||||||
|
else if metric.name.starts_with("service_") {
|
||||||
|
if let Some(service_name) = self.extract_service_name(&metric.name) {
|
||||||
|
if metric.name.contains("_status") {
|
||||||
|
let status = metric.value.as_string();
|
||||||
|
self.ensure_service_exists(agent_data, &service_name, &status);
|
||||||
|
} else if metric.name.contains("_memory_mb") {
|
||||||
|
if let Some(memory) = metric.value.as_f32() {
|
||||||
|
self.update_service_field(agent_data, &service_name, |svc| svc.memory_mb = memory);
|
||||||
|
}
|
||||||
|
} else if metric.name.contains("_disk_gb") {
|
||||||
|
if let Some(disk) = metric.value.as_f32() {
|
||||||
|
self.update_service_field(agent_data, &service_name, |svc| svc.disk_gb = disk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Backup metrics
|
||||||
|
else if metric.name.starts_with("backup_") {
|
||||||
|
if metric.name == "backup_status" {
|
||||||
|
agent_data.backup.status = metric.value.as_string();
|
||||||
|
} else if metric.name == "backup_last_run_timestamp" {
|
||||||
|
if let Some(timestamp) = metric.value.as_i64() {
|
||||||
|
agent_data.backup.last_run = Some(timestamp as u64);
|
||||||
|
}
|
||||||
|
} else if metric.name == "backup_next_scheduled_timestamp" {
|
||||||
|
if let Some(timestamp) = metric.value.as_i64() {
|
||||||
|
agent_data.backup.next_scheduled = Some(timestamp as u64);
|
||||||
|
}
|
||||||
|
} else if metric.name == "backup_size_gb" {
|
||||||
|
if let Some(size) = metric.value.as_f32() {
|
||||||
|
agent_data.backup.total_size_gb = Some(size);
|
||||||
|
}
|
||||||
|
} else if metric.name == "backup_repository_health" {
|
||||||
|
agent_data.backup.repository_health = Some(metric.value.as_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract drive name from metric like "disk_nvme0n1_temperature"
|
||||||
|
fn extract_drive_name(&self, metric_name: &str) -> Option<String> {
|
||||||
|
if metric_name.starts_with("disk_") {
|
||||||
|
let suffixes = ["_temperature", "_wear_percent", "_health"];
|
||||||
|
for suffix in suffixes {
|
||||||
|
if let Some(suffix_pos) = metric_name.rfind(suffix) {
|
||||||
|
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract pool and filesystem from "disk_{pool}_fs_{filesystem}_{metric}"
|
||||||
|
fn extract_pool_and_filesystem(&self, metric_name: &str) -> Option<(String, String)> {
|
||||||
|
if let Some(fs_pos) = metric_name.find("_fs_") {
|
||||||
|
let pool_name = metric_name[5..fs_pos].to_string(); // Skip "disk_"
|
||||||
|
let after_fs = &metric_name[fs_pos + 4..]; // Skip "_fs_"
|
||||||
|
if let Some(metric_pos) = after_fs.find('_') {
|
||||||
|
let fs_name = after_fs[..metric_pos].to_string();
|
||||||
|
return Some((pool_name, fs_name));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract service name from "service_{name}_{metric}"
|
||||||
|
fn extract_service_name(&self, metric_name: &str) -> Option<String> {
|
||||||
|
if metric_name.starts_with("service_") {
|
||||||
|
let suffixes = ["_status", "_memory_mb", "_disk_gb"];
|
||||||
|
for suffix in suffixes {
|
||||||
|
if let Some(suffix_pos) = metric_name.rfind(suffix) {
|
||||||
|
return Some(metric_name[8..suffix_pos].to_string()); // Skip "service_"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ensure drive exists in agent_data
|
||||||
|
fn ensure_drive_exists(&self, agent_data: &mut AgentData, drive_name: &str) {
|
||||||
|
if !agent_data.system.storage.drives.iter().any(|d| d.name == drive_name) {
|
||||||
|
agent_data.system.storage.drives.push(DriveData {
|
||||||
|
name: drive_name.to_string(),
|
||||||
|
health: "UNKNOWN".to_string(),
|
||||||
|
temperature_celsius: None,
|
||||||
|
wear_percent: None,
|
||||||
|
filesystems: Vec::new(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ensure filesystem exists in the correct drive
|
||||||
|
fn ensure_filesystem_exists(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, usage_percent: f32, used_gb: f32, total_gb: f32) {
|
||||||
|
self.ensure_drive_exists(agent_data, pool_name);
|
||||||
|
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) {
|
||||||
|
if !drive.filesystems.iter().any(|fs| fs.mount == fs_name) {
|
||||||
|
drive.filesystems.push(FilesystemData {
|
||||||
|
mount: fs_name.to_string(),
|
||||||
|
usage_percent,
|
||||||
|
used_gb,
|
||||||
|
total_gb,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update filesystem field
|
||||||
|
fn update_filesystem_field<F>(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, update_fn: F)
|
||||||
|
where F: FnOnce(&mut FilesystemData) {
|
||||||
|
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) {
|
||||||
|
if let Some(fs) = drive.filesystems.iter_mut().find(|fs| fs.mount == fs_name) {
|
||||||
|
update_fn(fs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ensure service exists
|
||||||
|
fn ensure_service_exists(&self, agent_data: &mut AgentData, service_name: &str, status: &str) {
|
||||||
|
if !agent_data.services.iter().any(|s| s.name == service_name) {
|
||||||
|
agent_data.services.push(ServiceData {
|
||||||
|
name: service_name.to_string(),
|
||||||
|
status: status.to_string(),
|
||||||
|
memory_mb: 0.0,
|
||||||
|
disk_gb: 0.0,
|
||||||
|
user_stopped: false, // TODO: Get from service tracker
|
||||||
|
});
|
||||||
|
} else if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) {
|
||||||
|
service.status = status.to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update service field
|
||||||
|
fn update_service_field<F>(&self, agent_data: &mut AgentData, service_name: &str, update_fn: F)
|
||||||
|
where F: FnOnce(&mut ServiceData) {
|
||||||
|
if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) {
|
||||||
|
update_fn(service);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
|
async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
|
||||||
let mut status_changed = false;
|
let mut status_changed = false;
|
||||||
for metric in metrics {
|
for metric in metrics {
|
||||||
@@ -261,13 +555,11 @@ impl Agent {
|
|||||||
|
|
||||||
/// Send standalone heartbeat for connectivity detection
|
/// Send standalone heartbeat for connectivity detection
|
||||||
async fn send_heartbeat(&mut self) -> Result<()> {
|
async fn send_heartbeat(&mut self) -> Result<()> {
|
||||||
let heartbeat_metric = self.get_heartbeat_metric();
|
// Create minimal agent data with just heartbeat
|
||||||
let message = MetricMessage::new(
|
let agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version());
|
||||||
self.hostname.clone(),
|
// Heartbeat timestamp is already set in AgentData::new()
|
||||||
vec![heartbeat_metric],
|
|
||||||
);
|
self.zmq_handler.publish_agent_data(&agent_data).await?;
|
||||||
|
|
||||||
self.zmq_handler.publish_metrics(&message).await?;
|
|
||||||
debug!("Sent standalone heartbeat for connectivity detection");
|
debug!("Sent standalone heartbeat for connectivity detection");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -210,7 +210,7 @@ impl DiskCollector {
|
|||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Convert numeric references to actual mount points if needed
|
// Convert numeric references to actual mount points if needed
|
||||||
let member_paths = if raw_paths.iter().any(|path| !path.starts_with('/')) {
|
let mut member_paths = if raw_paths.iter().any(|path| !path.starts_with('/')) {
|
||||||
// Handle numeric format like "1:2" by finding corresponding /mnt/disk* paths
|
// Handle numeric format like "1:2" by finding corresponding /mnt/disk* paths
|
||||||
self.resolve_numeric_mergerfs_paths(&raw_paths)?
|
self.resolve_numeric_mergerfs_paths(&raw_paths)?
|
||||||
} else {
|
} else {
|
||||||
@@ -218,6 +218,10 @@ impl DiskCollector {
|
|||||||
raw_paths
|
raw_paths
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// For SnapRAID setups, include parity drives that are related to this pool's data drives
|
||||||
|
let related_parity_paths = self.discover_related_parity_drives(&member_paths)?;
|
||||||
|
member_paths.extend(related_parity_paths);
|
||||||
|
|
||||||
// Categorize as data vs parity drives
|
// Categorize as data vs parity drives
|
||||||
let (data_drives, parity_drives) = match self.categorize_pool_drives(&member_paths) {
|
let (data_drives, parity_drives) = match self.categorize_pool_drives(&member_paths) {
|
||||||
Ok(drives) => drives,
|
Ok(drives) => drives,
|
||||||
@@ -240,6 +244,38 @@ impl DiskCollector {
|
|||||||
Ok(pools)
|
Ok(pools)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Discover parity drives that are related to the given data drives
|
||||||
|
fn discover_related_parity_drives(&self, data_drives: &[String]) -> Result<Vec<String>> {
|
||||||
|
let mount_devices = self.get_mount_devices()?;
|
||||||
|
let mut related_parity = Vec::new();
|
||||||
|
|
||||||
|
// Find parity drives that share the same parent directory as the data drives
|
||||||
|
for data_path in data_drives {
|
||||||
|
if let Some(parent_dir) = self.get_parent_directory(data_path) {
|
||||||
|
// Look for parity drives in the same parent directory
|
||||||
|
for (mount_point, _device) in &mount_devices {
|
||||||
|
if mount_point.contains("parity") && mount_point.starts_with(&parent_dir) {
|
||||||
|
if !related_parity.contains(mount_point) {
|
||||||
|
related_parity.push(mount_point.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(related_parity)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get parent directory of a mount path (e.g., "/mnt/disk1" -> "/mnt")
|
||||||
|
fn get_parent_directory(&self, path: &str) -> Option<String> {
|
||||||
|
if let Some(last_slash) = path.rfind('/') {
|
||||||
|
if last_slash > 0 {
|
||||||
|
return Some(path[..last_slash].to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
/// Categorize pool member drives as data vs parity
|
/// Categorize pool member drives as data vs parity
|
||||||
fn categorize_pool_drives(&self, member_paths: &[String]) -> Result<(Vec<DriveInfo>, Vec<DriveInfo>)> {
|
fn categorize_pool_drives(&self, member_paths: &[String]) -> Result<(Vec<DriveInfo>, Vec<DriveInfo>)> {
|
||||||
let mut data_drives = Vec::new();
|
let mut data_drives = Vec::new();
|
||||||
@@ -463,6 +499,17 @@ impl DiskCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// NVMe format: "Temperature:" (capital T)
|
||||||
|
if line.contains("Temperature:") {
|
||||||
|
if let Some(temp_part) = line.split("Temperature:").nth(1) {
|
||||||
|
if let Some(temp_str) = temp_part.split_whitespace().next() {
|
||||||
|
if let Ok(temp) = temp_str.parse::<f32>() {
|
||||||
|
return Some(temp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Legacy format: "temperature:" (lowercase)
|
||||||
if line.contains("temperature:") {
|
if line.contains("temperature:") {
|
||||||
if let Some(temp_part) = line.split("temperature:").nth(1) {
|
if let Some(temp_part) = line.split("temperature:").nth(1) {
|
||||||
if let Some(temp_str) = temp_part.split_whitespace().next() {
|
if let Some(temp_str) = temp_part.split_whitespace().next() {
|
||||||
@@ -891,12 +938,12 @@ impl DiskCollector {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Individual drive metrics
|
// Individual drive metrics
|
||||||
for (i, drive) in pool.data_drives.iter().enumerate() {
|
for drive in &pool.data_drives {
|
||||||
self.generate_pool_drive_metrics(metrics, &pool_name, &format!("data_{}", i), drive, timestamp, status_tracker);
|
self.generate_pool_drive_metrics(metrics, &pool_name, &drive.device, drive, timestamp, status_tracker);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i, drive) in pool.parity_drives.iter().enumerate() {
|
for drive in &pool.parity_drives {
|
||||||
self.generate_pool_drive_metrics(metrics, &pool_name, &format!("parity_{}", i), drive, timestamp, status_tracker);
|
self.generate_pool_drive_metrics(metrics, &pool_name, &drive.device, drive, timestamp, status_tracker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use cm_dashboard_shared::{MessageEnvelope, MetricMessage};
|
use cm_dashboard_shared::{AgentData, MessageEnvelope};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
use zmq::{Context, Socket, SocketType};
|
use zmq::{Context, Socket, SocketType};
|
||||||
|
|
||||||
@@ -43,17 +43,17 @@ impl ZmqHandler {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Publish metrics message via ZMQ
|
|
||||||
pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> {
|
/// Publish agent data via ZMQ
|
||||||
|
pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
|
||||||
debug!(
|
debug!(
|
||||||
"Publishing {} metrics for host {}",
|
"Publishing agent data for host {}",
|
||||||
message.metrics.len(),
|
data.hostname
|
||||||
message.hostname
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Create message envelope
|
// Create message envelope for agent data
|
||||||
let envelope = MessageEnvelope::metrics(message.clone())
|
let envelope = MessageEnvelope::agent_data(data.clone())
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
|
||||||
|
|
||||||
// Serialize envelope
|
// Serialize envelope
|
||||||
let serialized = serde_json::to_vec(&envelope)?;
|
let serialized = serde_json::to_vec(&envelope)?;
|
||||||
@@ -61,11 +61,10 @@ impl ZmqHandler {
|
|||||||
// Send via ZMQ
|
// Send via ZMQ
|
||||||
self.publisher.send(&serialized, 0)?;
|
self.publisher.send(&serialized, 0)?;
|
||||||
|
|
||||||
debug!("Published metrics message ({} bytes)", serialized.len());
|
debug!("Published agent data message ({} bytes)", serialized.len());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Try to receive a command (non-blocking)
|
/// Try to receive a command (non-blocking)
|
||||||
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
||||||
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard"
|
name = "cm-dashboard"
|
||||||
version = "0.1.122"
|
version = "0.1.131"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -20,12 +20,13 @@ pub struct Dashboard {
|
|||||||
tui_app: Option<TuiApp>,
|
tui_app: Option<TuiApp>,
|
||||||
terminal: Option<Terminal<CrosstermBackend<io::Stdout>>>,
|
terminal: Option<Terminal<CrosstermBackend<io::Stdout>>>,
|
||||||
headless: bool,
|
headless: bool,
|
||||||
|
raw_data: bool,
|
||||||
initial_commands_sent: std::collections::HashSet<String>,
|
initial_commands_sent: std::collections::HashSet<String>,
|
||||||
config: DashboardConfig,
|
config: DashboardConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Dashboard {
|
impl Dashboard {
|
||||||
pub async fn new(config_path: Option<String>, headless: bool) -> Result<Self> {
|
pub async fn new(config_path: Option<String>, headless: bool, raw_data: bool) -> Result<Self> {
|
||||||
info!("Initializing dashboard");
|
info!("Initializing dashboard");
|
||||||
|
|
||||||
// Load configuration - try default path if not specified
|
// Load configuration - try default path if not specified
|
||||||
@@ -119,6 +120,7 @@ impl Dashboard {
|
|||||||
tui_app,
|
tui_app,
|
||||||
terminal,
|
terminal,
|
||||||
headless,
|
headless,
|
||||||
|
raw_data,
|
||||||
initial_commands_sent: std::collections::HashSet::new(),
|
initial_commands_sent: std::collections::HashSet::new(),
|
||||||
config,
|
config,
|
||||||
})
|
})
|
||||||
@@ -183,30 +185,35 @@ impl Dashboard {
|
|||||||
|
|
||||||
// Check for new metrics
|
// Check for new metrics
|
||||||
if last_metrics_check.elapsed() >= metrics_check_interval {
|
if last_metrics_check.elapsed() >= metrics_check_interval {
|
||||||
if let Ok(Some(metric_message)) = self.zmq_consumer.receive_metrics().await {
|
if let Ok(Some(agent_data)) = self.zmq_consumer.receive_agent_data().await {
|
||||||
debug!(
|
debug!(
|
||||||
"Received metrics from {}: {} metrics",
|
"Received agent data from {}",
|
||||||
metric_message.hostname,
|
agent_data.hostname
|
||||||
metric_message.metrics.len()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Track first contact with host (no command needed - agent sends data every 2s)
|
// Track first contact with host (no command needed - agent sends data every 2s)
|
||||||
let is_new_host = !self
|
let is_new_host = !self
|
||||||
.initial_commands_sent
|
.initial_commands_sent
|
||||||
.contains(&metric_message.hostname);
|
.contains(&agent_data.hostname);
|
||||||
|
|
||||||
if is_new_host {
|
if is_new_host {
|
||||||
info!(
|
info!(
|
||||||
"First contact with host {} - data will update automatically",
|
"First contact with host {} - data will update automatically",
|
||||||
metric_message.hostname
|
agent_data.hostname
|
||||||
);
|
);
|
||||||
self.initial_commands_sent
|
self.initial_commands_sent
|
||||||
.insert(metric_message.hostname.clone());
|
.insert(agent_data.hostname.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update metric store
|
// Show raw data if requested (before processing)
|
||||||
self.metric_store
|
if self.raw_data {
|
||||||
.update_metrics(&metric_message.hostname, metric_message.metrics);
|
println!("RAW AGENT DATA FROM {}:", agent_data.hostname);
|
||||||
|
println!("{}", serde_json::to_string_pretty(&agent_data).unwrap_or_else(|e| format!("Serialization error: {}", e)));
|
||||||
|
println!("{}", "─".repeat(80));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update data store
|
||||||
|
self.metric_store.process_agent_data(agent_data);
|
||||||
|
|
||||||
// Check for agent version mismatches across hosts
|
// Check for agent version mismatches across hosts
|
||||||
if let Some((current_version, outdated_hosts)) = self.metric_store.get_version_mismatches() {
|
if let Some((current_version, outdated_hosts)) = self.metric_store.get_version_mismatches() {
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use cm_dashboard_shared::{CommandOutputMessage, MessageEnvelope, MessageType, MetricMessage};
|
use cm_dashboard_shared::{AgentData, CommandOutputMessage, MessageEnvelope, MessageType};
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
use zmq::{Context, Socket, SocketType};
|
use zmq::{Context, Socket, SocketType};
|
||||||
|
|
||||||
@@ -117,8 +117,8 @@ impl ZmqConsumer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive metrics from any connected agent (non-blocking)
|
/// Receive agent data (non-blocking)
|
||||||
pub async fn receive_metrics(&mut self) -> Result<Option<MetricMessage>> {
|
pub async fn receive_agent_data(&mut self) -> Result<Option<AgentData>> {
|
||||||
match self.subscriber.recv_bytes(zmq::DONTWAIT) {
|
match self.subscriber.recv_bytes(zmq::DONTWAIT) {
|
||||||
Ok(data) => {
|
Ok(data) => {
|
||||||
debug!("Received {} bytes from ZMQ", data.len());
|
debug!("Received {} bytes from ZMQ", data.len());
|
||||||
@@ -129,29 +129,27 @@ impl ZmqConsumer {
|
|||||||
|
|
||||||
// Check message type
|
// Check message type
|
||||||
match envelope.message_type {
|
match envelope.message_type {
|
||||||
MessageType::Metrics => {
|
MessageType::AgentData => {
|
||||||
let metrics = envelope
|
let agent_data = envelope
|
||||||
.decode_metrics()
|
.decode_agent_data()
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to decode metrics: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to decode agent data: {}", e))?;
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Received {} metrics from {}",
|
"Received agent data from host {}",
|
||||||
metrics.metrics.len(),
|
agent_data.hostname
|
||||||
metrics.hostname
|
|
||||||
);
|
);
|
||||||
|
Ok(Some(agent_data))
|
||||||
Ok(Some(metrics))
|
|
||||||
}
|
}
|
||||||
MessageType::Heartbeat => {
|
MessageType::Heartbeat => {
|
||||||
debug!("Received heartbeat");
|
debug!("Received heartbeat");
|
||||||
Ok(None) // Don't return heartbeats as metrics
|
Ok(None) // Don't return heartbeats
|
||||||
}
|
}
|
||||||
MessageType::CommandOutput => {
|
MessageType::CommandOutput => {
|
||||||
debug!("Received command output (will be handled by receive_command_output)");
|
debug!("Received command output (will be handled by receive_command_output)");
|
||||||
Ok(None) // Command output handled by separate method
|
Ok(None) // Command output handled by separate method
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
debug!("Received non-metrics message: {:?}", envelope.message_type);
|
debug!("Received unsupported message: {:?}", envelope.message_type);
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -166,5 +164,6 @@ impl ZmqConsumer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -51,6 +51,10 @@ struct Cli {
|
|||||||
/// Run in headless mode (no TUI, just logging)
|
/// Run in headless mode (no TUI, just logging)
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
headless: bool,
|
headless: bool,
|
||||||
|
|
||||||
|
/// Show raw agent data in headless mode
|
||||||
|
#[arg(long)]
|
||||||
|
raw_data: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -86,7 +90,7 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create and run dashboard
|
// Create and run dashboard
|
||||||
let mut dashboard = Dashboard::new(cli.config, cli.headless).await?;
|
let mut dashboard = Dashboard::new(cli.config, cli.headless, cli.raw_data).await?;
|
||||||
|
|
||||||
// Setup graceful shutdown
|
// Setup graceful shutdown
|
||||||
let ctrl_c = async {
|
let ctrl_c = async {
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use cm_dashboard_shared::Metric;
|
use cm_dashboard_shared::{AgentData, Metric};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
@@ -76,6 +76,286 @@ impl MetricStore {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Process structured agent data (temporary bridge - converts back to metrics)
|
||||||
|
/// TODO: Replace entire metric system with direct structured data processing
|
||||||
|
pub fn process_agent_data(&mut self, agent_data: AgentData) {
|
||||||
|
let metrics = self.convert_agent_data_to_metrics(&agent_data);
|
||||||
|
self.update_metrics(&agent_data.hostname, metrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert structured agent data to legacy metrics (temporary bridge)
|
||||||
|
fn convert_agent_data_to_metrics(&self, agent_data: &AgentData) -> Vec<Metric> {
|
||||||
|
use cm_dashboard_shared::{Metric, MetricValue, Status};
|
||||||
|
|
||||||
|
let mut metrics = Vec::new();
|
||||||
|
|
||||||
|
// Convert CPU data
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"cpu_load_1min".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.cpu.load_1min),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"cpu_load_5min".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.cpu.load_5min),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"cpu_load_15min".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.cpu.load_15min),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"cpu_frequency_mhz".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.cpu.frequency_mhz),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
if let Some(temp) = agent_data.system.cpu.temperature_celsius {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"cpu_temperature_celsius".to_string(),
|
||||||
|
MetricValue::Float(temp),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert Memory data
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_usage_percent".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.memory.usage_percent),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_total_gb".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.memory.total_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_used_gb".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.memory.used_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_available_gb".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.memory.available_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_swap_total_gb".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.memory.swap_total_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_swap_used_gb".to_string(),
|
||||||
|
MetricValue::Float(agent_data.system.memory.swap_used_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
|
||||||
|
// Convert tmpfs data
|
||||||
|
for tmpfs in &agent_data.system.memory.tmpfs {
|
||||||
|
if tmpfs.mount == "/tmp" {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_tmp_usage_percent".to_string(),
|
||||||
|
MetricValue::Float(tmpfs.usage_percent),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_tmp_used_gb".to_string(),
|
||||||
|
MetricValue::Float(tmpfs.used_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"memory_tmp_total_gb".to_string(),
|
||||||
|
MetricValue::Float(tmpfs.total_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add agent metadata
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"agent_version".to_string(),
|
||||||
|
MetricValue::String(agent_data.agent_version.clone()),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"agent_heartbeat".to_string(),
|
||||||
|
MetricValue::Integer(agent_data.timestamp as i64),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
|
||||||
|
// Convert storage data
|
||||||
|
for drive in &agent_data.system.storage.drives {
|
||||||
|
// Drive-level metrics
|
||||||
|
if let Some(temp) = drive.temperature_celsius {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("disk_{}_temperature", drive.name),
|
||||||
|
MetricValue::Float(temp),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(wear) = drive.wear_percent {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("disk_{}_wear_percent", drive.name),
|
||||||
|
MetricValue::Float(wear),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("disk_{}_health", drive.name),
|
||||||
|
MetricValue::String(drive.health.clone()),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
|
||||||
|
// Filesystem metrics
|
||||||
|
for fs in &drive.filesystems {
|
||||||
|
let fs_base = format!("disk_{}_fs_{}", drive.name, fs.mount.replace('/', "root"));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_usage_percent", fs_base),
|
||||||
|
MetricValue::Float(fs.usage_percent),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_used_gb", fs_base),
|
||||||
|
MetricValue::Float(fs.used_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_total_gb", fs_base),
|
||||||
|
MetricValue::Float(fs.total_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert storage pools
|
||||||
|
for pool in &agent_data.system.storage.pools {
|
||||||
|
let pool_base = format!("disk_{}", pool.name);
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_usage_percent", pool_base),
|
||||||
|
MetricValue::Float(pool.usage_percent),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_used_gb", pool_base),
|
||||||
|
MetricValue::Float(pool.used_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_total_gb", pool_base),
|
||||||
|
MetricValue::Float(pool.total_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_pool_type", pool_base),
|
||||||
|
MetricValue::String(pool.pool_type.clone()),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_mount_point", pool_base),
|
||||||
|
MetricValue::String(pool.mount.clone()),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
|
||||||
|
// Pool drive data
|
||||||
|
for drive in &pool.data_drives {
|
||||||
|
if let Some(temp) = drive.temperature_celsius {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("disk_{}_{}_temperature", pool.name, drive.name),
|
||||||
|
MetricValue::Float(temp),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(wear) = drive.wear_percent {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("disk_{}_{}_wear_percent", pool.name, drive.name),
|
||||||
|
MetricValue::Float(wear),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for drive in &pool.parity_drives {
|
||||||
|
if let Some(temp) = drive.temperature_celsius {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("disk_{}_{}_temperature", pool.name, drive.name),
|
||||||
|
MetricValue::Float(temp),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(wear) = drive.wear_percent {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("disk_{}_{}_wear_percent", pool.name, drive.name),
|
||||||
|
MetricValue::Float(wear),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert service data
|
||||||
|
for service in &agent_data.services {
|
||||||
|
let service_base = format!("service_{}", service.name);
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_status", service_base),
|
||||||
|
MetricValue::String(service.status.clone()),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_memory_mb", service_base),
|
||||||
|
MetricValue::Float(service.memory_mb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_disk_gb", service_base),
|
||||||
|
MetricValue::Float(service.disk_gb),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
if service.user_stopped {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
format!("{}_user_stopped", service_base),
|
||||||
|
MetricValue::Boolean(true),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert backup data
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"backup_status".to_string(),
|
||||||
|
MetricValue::String(agent_data.backup.status.clone()),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
if let Some(last_run) = agent_data.backup.last_run {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"backup_last_run_timestamp".to_string(),
|
||||||
|
MetricValue::Integer(last_run as i64),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(next_scheduled) = agent_data.backup.next_scheduled {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"backup_next_scheduled_timestamp".to_string(),
|
||||||
|
MetricValue::Integer(next_scheduled as i64),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(size) = agent_data.backup.total_size_gb {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"backup_size_gb".to_string(),
|
||||||
|
MetricValue::Float(size),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(health) = &agent_data.backup.repository_health {
|
||||||
|
metrics.push(Metric::new(
|
||||||
|
"backup_repository_health".to_string(),
|
||||||
|
MetricValue::String(health.clone()),
|
||||||
|
Status::Ok,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics
|
||||||
|
}
|
||||||
|
|
||||||
/// Get current metric for a specific host
|
/// Get current metric for a specific host
|
||||||
pub fn get_metric(&self, hostname: &str, metric_name: &str) -> Option<&Metric> {
|
pub fn get_metric(&self, hostname: &str, metric_name: &str) -> Option<&Metric> {
|
||||||
self.current_metrics.get(hostname)?.get(metric_name)
|
self.current_metrics.get(hostname)?.get(metric_name)
|
||||||
|
|||||||
@@ -146,14 +146,14 @@ impl SystemWidget {
|
|||||||
self.agent_hash.as_ref()
|
self.agent_hash.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get mount point for a pool name
|
/// Get default mount point for a pool name (fallback only - should use actual mount_point metrics)
|
||||||
fn get_mount_point_for_pool(&self, pool_name: &str) -> String {
|
fn get_mount_point_for_pool(&self, pool_name: &str) -> String {
|
||||||
match pool_name {
|
// For device names, use the device name directly as display name
|
||||||
"root" => "/".to_string(),
|
if pool_name.starts_with("nvme") || pool_name.starts_with("sd") || pool_name.starts_with("hd") {
|
||||||
"steampool" => "/mnt/steampool".to_string(),
|
pool_name.to_string()
|
||||||
"steampool_1" => "/steampool_1".to_string(),
|
} else {
|
||||||
"steampool_2" => "/steampool_2".to_string(),
|
// For other pools, use the pool name as-is (will be overridden by mount_point metric)
|
||||||
_ => format!("/{}", pool_name), // Default fallback
|
pool_name.to_string()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -208,6 +208,13 @@ impl SystemWidget {
|
|||||||
pool.pool_health = Some(health.clone());
|
pool.pool_health = Some(health.clone());
|
||||||
pool.health_status = metric.status.clone();
|
pool.health_status = metric.status.clone();
|
||||||
}
|
}
|
||||||
|
} else if metric.name.contains("_health") && !metric.name.contains("_pool_health") {
|
||||||
|
// Handle physical drive health metrics (disk_{drive}_health)
|
||||||
|
if let MetricValue::String(health) = &metric.value {
|
||||||
|
// For physical drives, use the drive health as pool health
|
||||||
|
pool.pool_health = Some(health.clone());
|
||||||
|
pool.health_status = metric.status.clone();
|
||||||
|
}
|
||||||
} else if metric.name.contains("_temperature") {
|
} else if metric.name.contains("_temperature") {
|
||||||
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||||
// Find existing drive or create new one
|
// Find existing drive or create new one
|
||||||
@@ -225,12 +232,16 @@ impl SystemWidget {
|
|||||||
if let MetricValue::Float(temp) = metric.value {
|
if let MetricValue::Float(temp) = metric.value {
|
||||||
drive.temperature = Some(temp);
|
drive.temperature = Some(temp);
|
||||||
drive.status = metric.status.clone();
|
drive.status = metric.status.clone();
|
||||||
|
// For physical drives, if this is the main drive, also update pool health
|
||||||
|
if drive.name == pool.name && pool.health_status == Status::Unknown {
|
||||||
|
pool.health_status = metric.status.clone();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if metric.name.contains("_wear_percent") {
|
} else if metric.name.contains("_wear_percent") {
|
||||||
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||||
// Find existing drive or create new one
|
// For physical drives, ensure we create the drive object
|
||||||
let drive_exists = pool.drives.iter().any(|d| d.name == drive_name);
|
let drive_exists = pool.drives.iter().any(|d| d.name == drive_name);
|
||||||
if !drive_exists {
|
if !drive_exists {
|
||||||
pool.drives.push(StorageDrive {
|
pool.drives.push(StorageDrive {
|
||||||
@@ -245,6 +256,10 @@ impl SystemWidget {
|
|||||||
if let MetricValue::Float(wear) = metric.value {
|
if let MetricValue::Float(wear) = metric.value {
|
||||||
drive.wear_percent = Some(wear);
|
drive.wear_percent = Some(wear);
|
||||||
drive.status = metric.status.clone();
|
drive.status = metric.status.clone();
|
||||||
|
// For physical drives, if this is the main drive, also update pool health
|
||||||
|
if drive.name == pool.name && pool.health_status == Status::Unknown {
|
||||||
|
pool.health_status = metric.status.clone();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -349,52 +364,50 @@ impl SystemWidget {
|
|||||||
// Pattern: disk_{pool_name}_{various suffixes}
|
// Pattern: disk_{pool_name}_{various suffixes}
|
||||||
// Since pool_name can contain underscores, work backwards from known metric suffixes
|
// Since pool_name can contain underscores, work backwards from known metric suffixes
|
||||||
if metric_name.starts_with("disk_") {
|
if metric_name.starts_with("disk_") {
|
||||||
// Handle filesystem metrics: disk_{pool}_fs_{filesystem}_{metric}
|
// Handle filesystem metrics: disk_{pool}_fs_{filesystem}_{metric}
|
||||||
if metric_name.contains("_fs_") {
|
if metric_name.contains("_fs_") {
|
||||||
if let Some(fs_pos) = metric_name.find("_fs_") {
|
if let Some(fs_pos) = metric_name.find("_fs_") {
|
||||||
return Some(metric_name[5..fs_pos].to_string()); // Skip "disk_", extract pool name before "_fs_"
|
return Some(metric_name[5..fs_pos].to_string()); // Skip "disk_", extract pool name before "_fs_"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle pool-level metrics (usage_percent, used_gb, total_gb, mount_point, pool_type, pool_health)
|
// Handle pool-level metrics (usage_percent, used_gb, total_gb, mount_point, pool_type, pool_health)
|
||||||
else if let Some(suffix_pos) = metric_name.rfind("_usage_percent")
|
// Use rfind to get the last occurrence of these suffixes
|
||||||
.or_else(|| metric_name.rfind("_used_gb"))
|
let pool_suffixes = ["_usage_percent", "_used_gb", "_total_gb", "_available_gb", "_mount_point", "_pool_type", "_pool_health"];
|
||||||
.or_else(|| metric_name.rfind("_total_gb"))
|
for suffix in pool_suffixes {
|
||||||
.or_else(|| metric_name.rfind("_available_gb"))
|
if let Some(suffix_pos) = metric_name.rfind(suffix) {
|
||||||
.or_else(|| metric_name.rfind("_mount_point"))
|
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_"
|
||||||
.or_else(|| metric_name.rfind("_pool_type"))
|
|
||||||
.or_else(|| metric_name.rfind("_pool_health")) {
|
|
||||||
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_"
|
|
||||||
}
|
|
||||||
// Handle drive-specific metrics: disk_{pool}_{drive_role}_{metric} (for mergerfs) or disk_{pool}_{drive}_{metric} (for physical drives)
|
|
||||||
else if let Some(suffix_pos) = metric_name.rfind("_temperature")
|
|
||||||
.or_else(|| metric_name.rfind("_wear_percent"))
|
|
||||||
.or_else(|| metric_name.rfind("_health")) {
|
|
||||||
// For mergerfs pools, metrics look like: disk_srv_media_data_0_temperature or disk_srv_media_parity_0_temperature
|
|
||||||
// We need to extract just "srv_media" as the pool name
|
|
||||||
let before_suffix = &metric_name[..suffix_pos];
|
|
||||||
|
|
||||||
// Check if this looks like a mergerfs drive metric (contains data_ or parity_)
|
|
||||||
if before_suffix.contains("_data_") {
|
|
||||||
if let Some(data_pos) = before_suffix.find("_data_") {
|
|
||||||
return Some(metric_name[5..data_pos].to_string()); // Extract pool name before "_data_"
|
|
||||||
}
|
|
||||||
} else if before_suffix.contains("_parity_") {
|
|
||||||
if let Some(parity_pos) = before_suffix.find("_parity_") {
|
|
||||||
return Some(metric_name[5..parity_pos].to_string()); // Extract pool name before "_parity_"
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Fallback for physical drive metrics: find the second-to-last underscore
|
|
||||||
if let Some(drive_start) = before_suffix.rfind('_') {
|
// Handle physical drive metrics: disk_{drive}_health, disk_{drive}_wear_percent, and disk_{drive}_temperature
|
||||||
if drive_start > 5 {
|
if (metric_name.ends_with("_health") && !metric_name.contains("_pool_health"))
|
||||||
return Some(metric_name[5..drive_start].to_string()); // Skip "disk_"
|
|| metric_name.ends_with("_wear_percent")
|
||||||
|
|| metric_name.ends_with("_temperature") {
|
||||||
|
// Count underscores to distinguish physical drive metrics (disk_{drive}_metric)
|
||||||
|
// from pool drive metrics (disk_{pool}_{drive}_metric)
|
||||||
|
let underscore_count = metric_name.matches('_').count();
|
||||||
|
// disk_nvme0n1_wear_percent has 3 underscores: disk_nvme0n1_wear_percent
|
||||||
|
if underscore_count == 3 { // disk_{drive}_metric (where drive has underscores)
|
||||||
|
if let Some(suffix_pos) = metric_name.rfind("_health")
|
||||||
|
.or_else(|| metric_name.rfind("_wear_percent"))
|
||||||
|
.or_else(|| metric_name.rfind("_temperature")) {
|
||||||
|
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Fallback: extract first component after disk_ prefix
|
|
||||||
else if let Some(captures) = metric_name.strip_prefix("disk_") {
|
// Handle drive-specific metrics: disk_{pool}_{drive}_{metric}
|
||||||
if let Some(pos) = captures.find('_') {
|
let drive_suffixes = ["_temperature", "_health"];
|
||||||
return Some(captures[..pos].to_string());
|
for suffix in drive_suffixes {
|
||||||
|
if let Some(suffix_pos) = metric_name.rfind(suffix) {
|
||||||
|
// Extract pool name by finding the second-to-last underscore
|
||||||
|
let before_suffix = &metric_name[..suffix_pos];
|
||||||
|
if let Some(drive_start) = before_suffix.rfind('_') {
|
||||||
|
if drive_start > 5 {
|
||||||
|
return Some(metric_name[5..drive_start].to_string()); // Skip "disk_"
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -428,25 +441,22 @@ impl SystemWidget {
|
|||||||
|
|
||||||
/// Extract drive name from disk metric name
|
/// Extract drive name from disk metric name
|
||||||
fn extract_drive_name(&self, metric_name: &str) -> Option<String> {
|
fn extract_drive_name(&self, metric_name: &str) -> Option<String> {
|
||||||
// Pattern: disk_{pool_name}_{drive_name}_{metric_type}
|
// Pattern: disk_{pool_name}_{drive_name}_{metric_type} OR disk_{drive_name}_{metric_type}
|
||||||
// For mergerfs: disk_{pool_name}_{data|parity}_{index}_{metric_type}
|
// Pool drives: disk_srv_media_sdb_temperature
|
||||||
// Since pool_name can contain underscores, work backwards from known metric suffixes
|
// Physical drives: disk_nvme0n1_temperature
|
||||||
if metric_name.starts_with("disk_") {
|
if metric_name.starts_with("disk_") {
|
||||||
if let Some(suffix_pos) = metric_name.rfind("_temperature")
|
if let Some(suffix_pos) = metric_name.rfind("_temperature")
|
||||||
.or_else(|| metric_name.rfind("_wear_percent"))
|
.or_else(|| metric_name.rfind("_wear_percent"))
|
||||||
.or_else(|| metric_name.rfind("_health")) {
|
.or_else(|| metric_name.rfind("_health")) {
|
||||||
let before_suffix = &metric_name[..suffix_pos];
|
let before_suffix = &metric_name[..suffix_pos];
|
||||||
|
|
||||||
// For mergerfs drive metrics: extract the role_index part (e.g., "data_0", "parity_1")
|
// Extract the last component as drive name (e.g., "sdb", "sdc", "nvme0n1")
|
||||||
if before_suffix.contains("_data_") || before_suffix.contains("_parity_") {
|
|
||||||
if let Some(role_start) = before_suffix.rfind("_data_").or_else(|| before_suffix.rfind("_parity_")) {
|
|
||||||
return Some(before_suffix[role_start + 1..].to_string()); // e.g., "data_0" or "parity_1"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallback for physical drive metrics: get the last component
|
|
||||||
if let Some(drive_start) = before_suffix.rfind('_') {
|
if let Some(drive_start) = before_suffix.rfind('_') {
|
||||||
return Some(before_suffix[drive_start + 1..].to_string());
|
return Some(before_suffix[drive_start + 1..].to_string());
|
||||||
|
} else {
|
||||||
|
// Handle physical drive metrics: disk_{drive}_metric (no pool)
|
||||||
|
// Extract everything after "disk_" as the drive name
|
||||||
|
return Some(before_suffix[5..].to_string()); // Skip "disk_"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -459,7 +469,28 @@ impl SystemWidget {
|
|||||||
|
|
||||||
for pool in &self.storage_pools {
|
for pool in &self.storage_pools {
|
||||||
// Pool header line with type and health
|
// Pool header line with type and health
|
||||||
let pool_label = if pool.pool_type == "single" {
|
let pool_label = if pool.pool_type.starts_with("drive (") {
|
||||||
|
// For physical drives, show the drive name with temperature and wear percentage if available
|
||||||
|
// Look for any drive with temp/wear data (physical drives may have drives named after the pool)
|
||||||
|
let temp_opt = pool.drives.iter()
|
||||||
|
.find_map(|d| d.temperature);
|
||||||
|
let wear_opt = pool.drives.iter()
|
||||||
|
.find_map(|d| d.wear_percent);
|
||||||
|
|
||||||
|
let mut drive_info = Vec::new();
|
||||||
|
if let Some(temp) = temp_opt {
|
||||||
|
drive_info.push(format!("T: {:.0}°C", temp));
|
||||||
|
}
|
||||||
|
if let Some(wear) = wear_opt {
|
||||||
|
drive_info.push(format!("W: {:.0}%", wear));
|
||||||
|
}
|
||||||
|
|
||||||
|
if drive_info.is_empty() {
|
||||||
|
format!("{}:", pool.name)
|
||||||
|
} else {
|
||||||
|
format!("{} {}:", pool.name, drive_info.join(" "))
|
||||||
|
}
|
||||||
|
} else if pool.pool_type == "single" {
|
||||||
format!("{}:", pool.mount_point)
|
format!("{}:", pool.mount_point)
|
||||||
} else {
|
} else {
|
||||||
format!("{} ({}):", pool.mount_point, pool.pool_type)
|
format!("{} ({}):", pool.mount_point, pool.pool_type)
|
||||||
@@ -470,49 +501,32 @@ impl SystemWidget {
|
|||||||
);
|
);
|
||||||
lines.push(Line::from(pool_spans));
|
lines.push(Line::from(pool_spans));
|
||||||
|
|
||||||
// Pool health line (for multi-disk pools)
|
// Skip pool health line as discussed - removed
|
||||||
if pool.pool_type != "single" {
|
|
||||||
if let Some(health) = &pool.pool_health {
|
// Total usage line (only show for multi-drive pools, skip for single physical drives)
|
||||||
let health_text = match health.as_str() {
|
if !pool.pool_type.starts_with("drive (") {
|
||||||
"healthy" => format!("Pool Status: {} Healthy",
|
let usage_text = match (pool.usage_percent, pool.used_gb, pool.total_gb) {
|
||||||
if pool.drives.len() > 1 { format!("({} drives)", pool.drives.len()) } else { String::new() }),
|
(Some(pct), Some(used), Some(total)) => {
|
||||||
"degraded" => "Pool Status: ⚠ Degraded".to_string(),
|
format!("Total: {:.0}% {:.1}GB/{:.1}GB", pct, used, total)
|
||||||
"critical" => "Pool Status: ✗ Critical".to_string(),
|
}
|
||||||
"rebuilding" => "Pool Status: ⟳ Rebuilding".to_string(),
|
_ => "Total: —% —GB/—GB".to_string(),
|
||||||
_ => format!("Pool Status: ? {}", health),
|
};
|
||||||
};
|
|
||||||
|
let has_drives = !pool.drives.is_empty();
|
||||||
let mut health_spans = vec![
|
let has_filesystems = !pool.filesystems.is_empty();
|
||||||
Span::raw(" "),
|
let has_children = has_drives || has_filesystems;
|
||||||
Span::styled("├─ ", Typography::tree()),
|
let tree_symbol = if has_children { "├─" } else { "└─" };
|
||||||
];
|
let mut usage_spans = vec![
|
||||||
health_spans.extend(StatusIcons::create_status_spans(pool.health_status.clone(), &health_text));
|
Span::raw(" "),
|
||||||
lines.push(Line::from(health_spans));
|
Span::styled(tree_symbol, Typography::tree()),
|
||||||
}
|
Span::raw(" "),
|
||||||
|
];
|
||||||
|
usage_spans.extend(StatusIcons::create_status_spans(pool.status.clone(), &usage_text));
|
||||||
|
lines.push(Line::from(usage_spans));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Total usage line (always show for pools)
|
|
||||||
let usage_text = match (pool.usage_percent, pool.used_gb, pool.total_gb) {
|
|
||||||
(Some(pct), Some(used), Some(total)) => {
|
|
||||||
format!("Total: {:.0}% {:.1}GB/{:.1}GB", pct, used, total)
|
|
||||||
}
|
|
||||||
_ => "Total: —% —GB/—GB".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let has_drives = !pool.drives.is_empty();
|
|
||||||
let has_filesystems = !pool.filesystems.is_empty();
|
|
||||||
let has_children = has_drives || has_filesystems;
|
|
||||||
let tree_symbol = if has_children { "├─" } else { "└─" };
|
|
||||||
let mut usage_spans = vec![
|
|
||||||
Span::raw(" "),
|
|
||||||
Span::styled(tree_symbol, Typography::tree()),
|
|
||||||
Span::raw(" "),
|
|
||||||
];
|
|
||||||
usage_spans.extend(StatusIcons::create_status_spans(pool.status.clone(), &usage_text));
|
|
||||||
lines.push(Line::from(usage_spans));
|
|
||||||
|
|
||||||
// Drive lines with enhanced grouping
|
// Drive lines with enhanced grouping
|
||||||
if pool.pool_type != "single" && pool.drives.len() > 1 {
|
if pool.pool_type.contains("mergerfs") && pool.drives.len() > 1 {
|
||||||
// Group drives by type for mergerfs pools
|
// Group drives by type for mergerfs pools
|
||||||
let (data_drives, parity_drives): (Vec<_>, Vec<_>) = pool.drives.iter().enumerate()
|
let (data_drives, parity_drives): (Vec<_>, Vec<_>) = pool.drives.iter().enumerate()
|
||||||
.partition(|(_, drive)| {
|
.partition(|(_, drive)| {
|
||||||
@@ -521,7 +535,7 @@ impl SystemWidget {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Show data drives
|
// Show data drives
|
||||||
if !data_drives.is_empty() && pool.pool_type.contains("mergerfs") {
|
if !data_drives.is_empty() {
|
||||||
lines.push(Line::from(vec![
|
lines.push(Line::from(vec![
|
||||||
Span::raw(" "),
|
Span::raw(" "),
|
||||||
Span::styled("├─ ", Typography::tree()),
|
Span::styled("├─ ", Typography::tree()),
|
||||||
@@ -539,7 +553,7 @@ impl SystemWidget {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Show parity drives
|
// Show parity drives
|
||||||
if !parity_drives.is_empty() && pool.pool_type.contains("mergerfs") {
|
if !parity_drives.is_empty() {
|
||||||
lines.push(Line::from(vec![
|
lines.push(Line::from(vec![
|
||||||
Span::raw(" "),
|
Span::raw(" "),
|
||||||
Span::styled("└─ ", Typography::tree()),
|
Span::styled("└─ ", Typography::tree()),
|
||||||
@@ -554,43 +568,16 @@ impl SystemWidget {
|
|||||||
self.render_drive_line(&mut lines, drive, " ├─");
|
self.render_drive_line(&mut lines, drive, " ├─");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
// Regular drive listing for non-mergerfs pools
|
} else if pool.pool_type != "single" && pool.drives.len() > 1 {
|
||||||
for (i, drive) in pool.drives.iter().enumerate() {
|
// Regular drive listing for non-mergerfs multi-drive pools
|
||||||
let is_last = i == pool.drives.len() - 1;
|
for (i, drive) in pool.drives.iter().enumerate() {
|
||||||
let tree_symbol = if is_last { "└─" } else { "├─" };
|
let is_last = i == pool.drives.len() - 1;
|
||||||
self.render_drive_line(&mut lines, drive, tree_symbol);
|
let tree_symbol = if is_last { "└─" } else { "├─" };
|
||||||
}
|
self.render_drive_line(&mut lines, drive, tree_symbol);
|
||||||
}
|
}
|
||||||
} else if pool.pool_type.starts_with("drive (") {
|
} else if pool.pool_type.starts_with("drive (") {
|
||||||
// Physical drive pools: show drive info + filesystem children
|
// Physical drive pools: wear data shown in header, skip drive lines, show filesystems directly
|
||||||
// First show drive information
|
|
||||||
for drive in &pool.drives {
|
|
||||||
let mut drive_info = Vec::new();
|
|
||||||
if let Some(temp) = drive.temperature {
|
|
||||||
drive_info.push(format!("T: {:.0}°C", temp));
|
|
||||||
}
|
|
||||||
if let Some(wear) = drive.wear_percent {
|
|
||||||
drive_info.push(format!("W: {:.0}%", wear));
|
|
||||||
}
|
|
||||||
let drive_text = if drive_info.is_empty() {
|
|
||||||
format!("Drive: {}", drive.name)
|
|
||||||
} else {
|
|
||||||
format!("Drive: {}", drive_info.join(" "))
|
|
||||||
};
|
|
||||||
|
|
||||||
let has_filesystems = !pool.filesystems.is_empty();
|
|
||||||
let tree_symbol = if has_filesystems { "├─" } else { "└─" };
|
|
||||||
let mut drive_spans = vec![
|
|
||||||
Span::raw(" "),
|
|
||||||
Span::styled(tree_symbol, Typography::tree()),
|
|
||||||
Span::raw(" "),
|
|
||||||
];
|
|
||||||
drive_spans.extend(StatusIcons::create_status_spans(drive.status.clone(), &drive_text));
|
|
||||||
lines.push(Line::from(drive_spans));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then show filesystem children
|
|
||||||
for (i, filesystem) in pool.filesystems.iter().enumerate() {
|
for (i, filesystem) in pool.filesystems.iter().enumerate() {
|
||||||
let is_last = i == pool.filesystems.len() - 1;
|
let is_last = i == pool.filesystems.len() - 1;
|
||||||
let tree_symbol = if is_last { "└─" } else { "├─" };
|
let tree_symbol = if is_last { "└─" } else { "├─" };
|
||||||
@@ -641,10 +628,12 @@ impl SystemWidget {
|
|||||||
if let Some(wear) = drive.wear_percent {
|
if let Some(wear) = drive.wear_percent {
|
||||||
drive_info.push(format!("W: {:.0}%", wear));
|
drive_info.push(format!("W: {:.0}%", wear));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Always show drive name with info, or just name if no info available
|
||||||
let drive_text = if drive_info.is_empty() {
|
let drive_text = if drive_info.is_empty() {
|
||||||
drive.name.clone()
|
drive.name.clone()
|
||||||
} else {
|
} else {
|
||||||
format!("{} {}", drive.name, drive_info.join(" • "))
|
format!("{} {}", drive.name, drive_info.join(" "))
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut drive_spans = vec![
|
let mut drive_spans = vec![
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard-shared"
|
name = "cm-dashboard-shared"
|
||||||
version = "0.1.122"
|
version = "0.1.131"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
161
shared/src/agent_data.rs
Normal file
161
shared/src/agent_data.rs
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Complete structured data from an agent
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct AgentData {
|
||||||
|
pub hostname: String,
|
||||||
|
pub agent_version: String,
|
||||||
|
pub timestamp: u64,
|
||||||
|
pub system: SystemData,
|
||||||
|
pub services: Vec<ServiceData>,
|
||||||
|
pub backup: BackupData,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// System-level monitoring data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct SystemData {
|
||||||
|
pub cpu: CpuData,
|
||||||
|
pub memory: MemoryData,
|
||||||
|
pub storage: StorageData,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// CPU monitoring data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct CpuData {
|
||||||
|
pub load_1min: f32,
|
||||||
|
pub load_5min: f32,
|
||||||
|
pub load_15min: f32,
|
||||||
|
pub frequency_mhz: f32,
|
||||||
|
pub temperature_celsius: Option<f32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Memory monitoring data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct MemoryData {
|
||||||
|
pub usage_percent: f32,
|
||||||
|
pub total_gb: f32,
|
||||||
|
pub used_gb: f32,
|
||||||
|
pub available_gb: f32,
|
||||||
|
pub swap_total_gb: f32,
|
||||||
|
pub swap_used_gb: f32,
|
||||||
|
pub tmpfs: Vec<TmpfsData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tmpfs filesystem data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct TmpfsData {
|
||||||
|
pub mount: String,
|
||||||
|
pub usage_percent: f32,
|
||||||
|
pub used_gb: f32,
|
||||||
|
pub total_gb: f32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Storage monitoring data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct StorageData {
|
||||||
|
pub drives: Vec<DriveData>,
|
||||||
|
pub pools: Vec<PoolData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Individual drive data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct DriveData {
|
||||||
|
pub name: String,
|
||||||
|
pub health: String,
|
||||||
|
pub temperature_celsius: Option<f32>,
|
||||||
|
pub wear_percent: Option<f32>,
|
||||||
|
pub filesystems: Vec<FilesystemData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Filesystem on a drive
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct FilesystemData {
|
||||||
|
pub mount: String,
|
||||||
|
pub usage_percent: f32,
|
||||||
|
pub used_gb: f32,
|
||||||
|
pub total_gb: f32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Storage pool (MergerFS, RAID, etc.)
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct PoolData {
|
||||||
|
pub name: String,
|
||||||
|
pub mount: String,
|
||||||
|
pub pool_type: String, // "mergerfs", "raid", etc.
|
||||||
|
pub health: String,
|
||||||
|
pub usage_percent: f32,
|
||||||
|
pub used_gb: f32,
|
||||||
|
pub total_gb: f32,
|
||||||
|
pub data_drives: Vec<PoolDriveData>,
|
||||||
|
pub parity_drives: Vec<PoolDriveData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drive in a storage pool
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct PoolDriveData {
|
||||||
|
pub name: String,
|
||||||
|
pub temperature_celsius: Option<f32>,
|
||||||
|
pub wear_percent: Option<f32>,
|
||||||
|
pub health: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Service monitoring data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct ServiceData {
|
||||||
|
pub name: String,
|
||||||
|
pub status: String, // "active", "inactive", "failed"
|
||||||
|
pub memory_mb: f32,
|
||||||
|
pub disk_gb: f32,
|
||||||
|
pub user_stopped: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Backup system data
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct BackupData {
|
||||||
|
pub status: String,
|
||||||
|
pub last_run: Option<u64>,
|
||||||
|
pub next_scheduled: Option<u64>,
|
||||||
|
pub total_size_gb: Option<f32>,
|
||||||
|
pub repository_health: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AgentData {
|
||||||
|
/// Create new agent data with current timestamp
|
||||||
|
pub fn new(hostname: String, agent_version: String) -> Self {
|
||||||
|
Self {
|
||||||
|
hostname,
|
||||||
|
agent_version,
|
||||||
|
timestamp: chrono::Utc::now().timestamp() as u64,
|
||||||
|
system: SystemData {
|
||||||
|
cpu: CpuData {
|
||||||
|
load_1min: 0.0,
|
||||||
|
load_5min: 0.0,
|
||||||
|
load_15min: 0.0,
|
||||||
|
frequency_mhz: 0.0,
|
||||||
|
temperature_celsius: None,
|
||||||
|
},
|
||||||
|
memory: MemoryData {
|
||||||
|
usage_percent: 0.0,
|
||||||
|
total_gb: 0.0,
|
||||||
|
used_gb: 0.0,
|
||||||
|
available_gb: 0.0,
|
||||||
|
swap_total_gb: 0.0,
|
||||||
|
swap_used_gb: 0.0,
|
||||||
|
tmpfs: Vec::new(),
|
||||||
|
},
|
||||||
|
storage: StorageData {
|
||||||
|
drives: Vec::new(),
|
||||||
|
pools: Vec::new(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
services: Vec::new(),
|
||||||
|
backup: BackupData {
|
||||||
|
status: "unknown".to_string(),
|
||||||
|
last_run: None,
|
||||||
|
next_scheduled: None,
|
||||||
|
total_size_gb: None,
|
||||||
|
repository_health: None,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
|
pub mod agent_data;
|
||||||
pub mod cache;
|
pub mod cache;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
|
|
||||||
|
pub use agent_data::*;
|
||||||
pub use cache::*;
|
pub use cache::*;
|
||||||
pub use error::*;
|
pub use error::*;
|
||||||
pub use metrics::*;
|
pub use metrics::*;
|
||||||
|
|||||||
@@ -1,13 +1,9 @@
|
|||||||
use crate::metrics::Metric;
|
use crate::agent_data::AgentData;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
/// Message sent from agent to dashboard via ZMQ
|
/// Message sent from agent to dashboard via ZMQ
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
/// Always structured data - no legacy metrics support
|
||||||
pub struct MetricMessage {
|
pub type AgentMessage = AgentData;
|
||||||
pub hostname: String,
|
|
||||||
pub timestamp: u64,
|
|
||||||
pub metrics: Vec<Metric>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Command output streaming message
|
/// Command output streaming message
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
@@ -20,15 +16,6 @@ pub struct CommandOutputMessage {
|
|||||||
pub timestamp: u64,
|
pub timestamp: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MetricMessage {
|
|
||||||
pub fn new(hostname: String, metrics: Vec<Metric>) -> Self {
|
|
||||||
Self {
|
|
||||||
hostname,
|
|
||||||
timestamp: chrono::Utc::now().timestamp() as u64,
|
|
||||||
metrics,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CommandOutputMessage {
|
impl CommandOutputMessage {
|
||||||
pub fn new(hostname: String, command_id: String, command_type: String, output_line: String, is_complete: bool) -> Self {
|
pub fn new(hostname: String, command_id: String, command_type: String, output_line: String, is_complete: bool) -> Self {
|
||||||
@@ -59,8 +46,8 @@ pub enum Command {
|
|||||||
pub enum CommandResponse {
|
pub enum CommandResponse {
|
||||||
/// Acknowledgment of command
|
/// Acknowledgment of command
|
||||||
Ack,
|
Ack,
|
||||||
/// Metrics response
|
/// Agent data response
|
||||||
Metrics(Vec<Metric>),
|
AgentData(AgentData),
|
||||||
/// Pong response to ping
|
/// Pong response to ping
|
||||||
Pong,
|
Pong,
|
||||||
/// Error response
|
/// Error response
|
||||||
@@ -76,7 +63,7 @@ pub struct MessageEnvelope {
|
|||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum MessageType {
|
pub enum MessageType {
|
||||||
Metrics,
|
AgentData,
|
||||||
Command,
|
Command,
|
||||||
CommandResponse,
|
CommandResponse,
|
||||||
CommandOutput,
|
CommandOutput,
|
||||||
@@ -84,10 +71,10 @@ pub enum MessageType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl MessageEnvelope {
|
impl MessageEnvelope {
|
||||||
pub fn metrics(message: MetricMessage) -> Result<Self, crate::SharedError> {
|
pub fn agent_data(data: AgentData) -> Result<Self, crate::SharedError> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
message_type: MessageType::Metrics,
|
message_type: MessageType::AgentData,
|
||||||
payload: serde_json::to_vec(&message)?,
|
payload: serde_json::to_vec(&data)?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -119,11 +106,11 @@ impl MessageEnvelope {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn decode_metrics(&self) -> Result<MetricMessage, crate::SharedError> {
|
pub fn decode_agent_data(&self) -> Result<AgentData, crate::SharedError> {
|
||||||
match self.message_type {
|
match self.message_type {
|
||||||
MessageType::Metrics => Ok(serde_json::from_slice(&self.payload)?),
|
MessageType::AgentData => Ok(serde_json::from_slice(&self.payload)?),
|
||||||
_ => Err(crate::SharedError::Protocol {
|
_ => Err(crate::SharedError::Protocol {
|
||||||
message: "Expected metrics message".to_string(),
|
message: "Expected agent data message".to_string(),
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user