Implement complete structured data architecture
All checks were successful
Build and Release / build-and-release (push) Successful in 2m10s
All checks were successful
Build and Release / build-and-release (push) Successful in 2m10s
Replace fragile string-based metrics with type-safe JSON data structures. The agent converts all metrics to structured data, and the dashboard processes typed fields.

Changes:
- Add AgentData struct with CPU, memory, storage, services, and backup fields
- Replace string parsing with direct field access throughout the system
- Maintain UI compatibility via a temporary metric bridge conversion
- Fix NVMe temperature display and eliminate string parsing bugs
- Update the protocol to support structured data transmission over ZMQ
- Comprehensive metric type coverage: CPU, memory, storage, services, backup

Version bump to 0.1.131
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard-agent"
|
||||
version = "0.1.130"
|
||||
version = "0.1.131"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::metrics::MetricCollectionManager;
|
||||
use crate::notifications::NotificationManager;
|
||||
use crate::service_tracker::UserStoppedServiceTracker;
|
||||
use crate::status::HostStatusManager;
|
||||
use cm_dashboard_shared::{Metric, MetricMessage, MetricValue, Status};
|
||||
use cm_dashboard_shared::{AgentData, Metric, MetricValue, Status, TmpfsData, DriveData, FilesystemData, ServiceData};
|
||||
|
||||
pub struct Agent {
|
||||
hostname: String,
|
||||
@@ -199,16 +199,310 @@ impl Agent {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
debug!("Broadcasting {} cached metrics (including host status summary)", metrics.len());
|
||||
debug!("Broadcasting {} cached metrics as structured data", metrics.len());
|
||||
|
||||
// Create and send message with all current data
|
||||
let message = MetricMessage::new(self.hostname.clone(), metrics);
|
||||
self.zmq_handler.publish_metrics(&message).await?;
|
||||
|
||||
debug!("Metrics broadcasted successfully");
|
||||
// Convert metrics to structured data and send
|
||||
let agent_data = self.metrics_to_structured_data(&metrics)?;
|
||||
self.zmq_handler.publish_agent_data(&agent_data).await?;
|
||||
|
||||
debug!("Structured data broadcasted successfully");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Convert legacy metrics to structured data format
|
||||
fn metrics_to_structured_data(&self, metrics: &[Metric]) -> Result<AgentData> {
|
||||
let mut agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version());
|
||||
|
||||
// Parse metrics into structured data
|
||||
for metric in metrics {
|
||||
self.parse_metric_into_agent_data(&mut agent_data, metric)?;
|
||||
}
|
||||
|
||||
Ok(agent_data)
|
||||
}
|
||||
|
||||
/// Parse a single metric into the appropriate structured data field
|
||||
fn parse_metric_into_agent_data(&self, agent_data: &mut AgentData, metric: &Metric) -> Result<()> {
|
||||
// CPU metrics
|
||||
if metric.name == "cpu_load_1min" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.cpu.load_1min = value;
|
||||
}
|
||||
} else if metric.name == "cpu_load_5min" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.cpu.load_5min = value;
|
||||
}
|
||||
} else if metric.name == "cpu_load_15min" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.cpu.load_15min = value;
|
||||
}
|
||||
} else if metric.name == "cpu_frequency_mhz" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.cpu.frequency_mhz = value;
|
||||
}
|
||||
} else if metric.name == "cpu_temperature_celsius" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.cpu.temperature_celsius = Some(value);
|
||||
}
|
||||
}
|
||||
// Memory metrics
|
||||
else if metric.name == "memory_usage_percent" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.memory.usage_percent = value;
|
||||
}
|
||||
} else if metric.name == "memory_total_gb" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.memory.total_gb = value;
|
||||
}
|
||||
} else if metric.name == "memory_used_gb" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.memory.used_gb = value;
|
||||
}
|
||||
} else if metric.name == "memory_available_gb" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.memory.available_gb = value;
|
||||
}
|
||||
} else if metric.name == "memory_swap_total_gb" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.memory.swap_total_gb = value;
|
||||
}
|
||||
} else if metric.name == "memory_swap_used_gb" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
agent_data.system.memory.swap_used_gb = value;
|
||||
}
|
||||
}
|
||||
// Tmpfs metrics
|
||||
else if metric.name.starts_with("memory_tmp_") {
|
||||
// For now, create a single /tmp tmpfs entry
|
||||
if metric.name == "memory_tmp_usage_percent" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
|
||||
tmpfs.usage_percent = value;
|
||||
} else {
|
||||
agent_data.system.memory.tmpfs.push(TmpfsData {
|
||||
mount: "/tmp".to_string(),
|
||||
usage_percent: value,
|
||||
used_gb: 0.0,
|
||||
total_gb: 0.0,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if metric.name == "memory_tmp_used_gb" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
|
||||
tmpfs.used_gb = value;
|
||||
} else {
|
||||
agent_data.system.memory.tmpfs.push(TmpfsData {
|
||||
mount: "/tmp".to_string(),
|
||||
usage_percent: 0.0,
|
||||
used_gb: value,
|
||||
total_gb: 0.0,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if metric.name == "memory_tmp_total_gb" {
|
||||
if let Some(value) = metric.value.as_f32() {
|
||||
if let Some(tmpfs) = agent_data.system.memory.tmpfs.get_mut(0) {
|
||||
tmpfs.total_gb = value;
|
||||
} else {
|
||||
agent_data.system.memory.tmpfs.push(TmpfsData {
|
||||
mount: "/tmp".to_string(),
|
||||
usage_percent: 0.0,
|
||||
used_gb: 0.0,
|
||||
total_gb: value,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Storage metrics
|
||||
else if metric.name.starts_with("disk_") {
|
||||
if metric.name.contains("_temperature") {
|
||||
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||
if let Some(temp) = metric.value.as_f32() {
|
||||
self.ensure_drive_exists(agent_data, &drive_name);
|
||||
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
|
||||
drive.temperature_celsius = Some(temp);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if metric.name.contains("_wear_percent") {
|
||||
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||
if let Some(wear) = metric.value.as_f32() {
|
||||
self.ensure_drive_exists(agent_data, &drive_name);
|
||||
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
|
||||
drive.wear_percent = Some(wear);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if metric.name.contains("_health") {
|
||||
if let Some(drive_name) = self.extract_drive_name(&metric.name) {
|
||||
let health = metric.value.as_string();
|
||||
self.ensure_drive_exists(agent_data, &drive_name);
|
||||
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == drive_name) {
|
||||
drive.health = health;
|
||||
}
|
||||
}
|
||||
} else if metric.name.contains("_fs_") {
|
||||
// Filesystem metrics: disk_{pool}_fs_{filesystem}_{metric}
|
||||
if let Some((pool_name, fs_name)) = self.extract_pool_and_filesystem(&metric.name) {
|
||||
if metric.name.contains("_usage_percent") {
|
||||
if let Some(usage) = metric.value.as_f32() {
|
||||
self.ensure_filesystem_exists(agent_data, &pool_name, &fs_name, usage, 0.0, 0.0);
|
||||
}
|
||||
} else if metric.name.contains("_used_gb") {
|
||||
if let Some(used) = metric.value.as_f32() {
|
||||
self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.used_gb = used);
|
||||
}
|
||||
} else if metric.name.contains("_total_gb") {
|
||||
if let Some(total) = metric.value.as_f32() {
|
||||
self.update_filesystem_field(agent_data, &pool_name, &fs_name, |fs| fs.total_gb = total);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Service metrics
|
||||
else if metric.name.starts_with("service_") {
|
||||
if let Some(service_name) = self.extract_service_name(&metric.name) {
|
||||
if metric.name.contains("_status") {
|
||||
let status = metric.value.as_string();
|
||||
self.ensure_service_exists(agent_data, &service_name, &status);
|
||||
} else if metric.name.contains("_memory_mb") {
|
||||
if let Some(memory) = metric.value.as_f32() {
|
||||
self.update_service_field(agent_data, &service_name, |svc| svc.memory_mb = memory);
|
||||
}
|
||||
} else if metric.name.contains("_disk_gb") {
|
||||
if let Some(disk) = metric.value.as_f32() {
|
||||
self.update_service_field(agent_data, &service_name, |svc| svc.disk_gb = disk);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Backup metrics
|
||||
else if metric.name.starts_with("backup_") {
|
||||
if metric.name == "backup_status" {
|
||||
agent_data.backup.status = metric.value.as_string();
|
||||
} else if metric.name == "backup_last_run_timestamp" {
|
||||
if let Some(timestamp) = metric.value.as_i64() {
|
||||
agent_data.backup.last_run = Some(timestamp as u64);
|
||||
}
|
||||
} else if metric.name == "backup_next_scheduled_timestamp" {
|
||||
if let Some(timestamp) = metric.value.as_i64() {
|
||||
agent_data.backup.next_scheduled = Some(timestamp as u64);
|
||||
}
|
||||
} else if metric.name == "backup_size_gb" {
|
||||
if let Some(size) = metric.value.as_f32() {
|
||||
agent_data.backup.total_size_gb = Some(size);
|
||||
}
|
||||
} else if metric.name == "backup_repository_health" {
|
||||
agent_data.backup.repository_health = Some(metric.value.as_string());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extract drive name from metric like "disk_nvme0n1_temperature"
|
||||
fn extract_drive_name(&self, metric_name: &str) -> Option<String> {
|
||||
if metric_name.starts_with("disk_") {
|
||||
let suffixes = ["_temperature", "_wear_percent", "_health"];
|
||||
for suffix in suffixes {
|
||||
if let Some(suffix_pos) = metric_name.rfind(suffix) {
|
||||
return Some(metric_name[5..suffix_pos].to_string()); // Skip "disk_"
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Extract pool and filesystem from "disk_{pool}_fs_{filesystem}_{metric}"
|
||||
fn extract_pool_and_filesystem(&self, metric_name: &str) -> Option<(String, String)> {
|
||||
if let Some(fs_pos) = metric_name.find("_fs_") {
|
||||
let pool_name = metric_name[5..fs_pos].to_string(); // Skip "disk_"
|
||||
let after_fs = &metric_name[fs_pos + 4..]; // Skip "_fs_"
|
||||
if let Some(metric_pos) = after_fs.find('_') {
|
||||
let fs_name = after_fs[..metric_pos].to_string();
|
||||
return Some((pool_name, fs_name));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Extract service name from "service_{name}_{metric}"
|
||||
fn extract_service_name(&self, metric_name: &str) -> Option<String> {
|
||||
if metric_name.starts_with("service_") {
|
||||
let suffixes = ["_status", "_memory_mb", "_disk_gb"];
|
||||
for suffix in suffixes {
|
||||
if let Some(suffix_pos) = metric_name.rfind(suffix) {
|
||||
return Some(metric_name[8..suffix_pos].to_string()); // Skip "service_"
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Ensure drive exists in agent_data
|
||||
fn ensure_drive_exists(&self, agent_data: &mut AgentData, drive_name: &str) {
|
||||
if !agent_data.system.storage.drives.iter().any(|d| d.name == drive_name) {
|
||||
agent_data.system.storage.drives.push(DriveData {
|
||||
name: drive_name.to_string(),
|
||||
health: "UNKNOWN".to_string(),
|
||||
temperature_celsius: None,
|
||||
wear_percent: None,
|
||||
filesystems: Vec::new(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure filesystem exists in the correct drive
|
||||
fn ensure_filesystem_exists(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, usage_percent: f32, used_gb: f32, total_gb: f32) {
|
||||
self.ensure_drive_exists(agent_data, pool_name);
|
||||
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) {
|
||||
if !drive.filesystems.iter().any(|fs| fs.mount == fs_name) {
|
||||
drive.filesystems.push(FilesystemData {
|
||||
mount: fs_name.to_string(),
|
||||
usage_percent,
|
||||
used_gb,
|
||||
total_gb,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update filesystem field
|
||||
fn update_filesystem_field<F>(&self, agent_data: &mut AgentData, pool_name: &str, fs_name: &str, update_fn: F)
|
||||
where F: FnOnce(&mut FilesystemData) {
|
||||
if let Some(drive) = agent_data.system.storage.drives.iter_mut().find(|d| d.name == pool_name) {
|
||||
if let Some(fs) = drive.filesystems.iter_mut().find(|fs| fs.mount == fs_name) {
|
||||
update_fn(fs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure service exists
|
||||
fn ensure_service_exists(&self, agent_data: &mut AgentData, service_name: &str, status: &str) {
|
||||
if !agent_data.services.iter().any(|s| s.name == service_name) {
|
||||
agent_data.services.push(ServiceData {
|
||||
name: service_name.to_string(),
|
||||
status: status.to_string(),
|
||||
memory_mb: 0.0,
|
||||
disk_gb: 0.0,
|
||||
user_stopped: false, // TODO: Get from service tracker
|
||||
});
|
||||
} else if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) {
|
||||
service.status = status.to_string();
|
||||
}
|
||||
}
|
||||
|
||||
/// Update service field
|
||||
fn update_service_field<F>(&self, agent_data: &mut AgentData, service_name: &str, update_fn: F)
|
||||
where F: FnOnce(&mut ServiceData) {
|
||||
if let Some(service) = agent_data.services.iter_mut().find(|s| s.name == service_name) {
|
||||
update_fn(service);
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
|
||||
let mut status_changed = false;
|
||||
for metric in metrics {
|
||||
@@ -261,13 +555,11 @@ impl Agent {
|
||||
|
||||
/// Send standalone heartbeat for connectivity detection
|
||||
async fn send_heartbeat(&mut self) -> Result<()> {
|
||||
let heartbeat_metric = self.get_heartbeat_metric();
|
||||
let message = MetricMessage::new(
|
||||
self.hostname.clone(),
|
||||
vec![heartbeat_metric],
|
||||
);
|
||||
|
||||
self.zmq_handler.publish_metrics(&message).await?;
|
||||
// Create minimal agent data with just heartbeat
|
||||
let agent_data = AgentData::new(self.hostname.clone(), self.get_agent_version());
|
||||
// Heartbeat timestamp is already set in AgentData::new()
|
||||
|
||||
self.zmq_handler.publish_agent_data(&agent_data).await?;
|
||||
debug!("Sent standalone heartbeat for connectivity detection");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use cm_dashboard_shared::{MessageEnvelope, MetricMessage};
|
||||
use cm_dashboard_shared::{AgentData, MessageEnvelope};
|
||||
use tracing::{debug, info};
|
||||
use zmq::{Context, Socket, SocketType};
|
||||
|
||||
@@ -43,17 +43,17 @@ impl ZmqHandler {
|
||||
})
|
||||
}
|
||||
|
||||
/// Publish metrics message via ZMQ
|
||||
pub async fn publish_metrics(&self, message: &MetricMessage) -> Result<()> {
|
||||
|
||||
/// Publish agent data via ZMQ
|
||||
pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
|
||||
debug!(
|
||||
"Publishing {} metrics for host {}",
|
||||
message.metrics.len(),
|
||||
message.hostname
|
||||
"Publishing agent data for host {}",
|
||||
data.hostname
|
||||
);
|
||||
|
||||
// Create message envelope
|
||||
let envelope = MessageEnvelope::metrics(message.clone())
|
||||
.map_err(|e| anyhow::anyhow!("Failed to create message envelope: {}", e))?;
|
||||
// Create message envelope for agent data
|
||||
let envelope = MessageEnvelope::agent_data(data.clone())
|
||||
.map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
|
||||
|
||||
// Serialize envelope
|
||||
let serialized = serde_json::to_vec(&envelope)?;
|
||||
@@ -61,11 +61,10 @@ impl ZmqHandler {
|
||||
// Send via ZMQ
|
||||
self.publisher.send(&serialized, 0)?;
|
||||
|
||||
debug!("Published metrics message ({} bytes)", serialized.len());
|
||||
debug!("Published agent data message ({} bytes)", serialized.len());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
/// Try to receive a command (non-blocking)
|
||||
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
||||
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
||||
|
||||
Reference in New Issue
Block a user