Christoffer Martinsson 2581435b10 Implement per-service disk usage monitoring
Replaced system-wide disk usage reporting with accurate per-service tracking
that scans each service's own directories. Services like sshd now correctly
show their minimal disk usage instead of misleading system totals.

- Rename storage widget and add drive capacity/usage columns
- Move host display to main dashboard title for cleaner layout
- Replace separate alert displays with color-coded row highlighting
- Add per-service disk usage collection using du command (see the sketch below)
- Update services widget formatting to handle small disk values
- Restructure into workspace with dedicated agent and dashboard packages
2025-10-11 22:59:16 +02:00
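
The per-service collection itself lives in the new agent package rather than in this file. As a minimal sketch of the du-based approach, assuming a hypothetical service_disk_usage helper and a service data directory resolved elsewhere:

use std::path::Path;
use std::process::Command;

// Sketch only: shell out to `du -sb`, which prints "<bytes>\t<path>", and
// parse the byte count from the first whitespace-separated field. Returns
// None if `du` fails or its output is malformed.
fn service_disk_usage(dir: &Path) -> Option<u64> {
    let output = Command::new("du").arg("-sb").arg(dir).output().ok()?;
    if !output.status.success() {
        return None;
    }
    let stdout = String::from_utf8(output.stdout).ok()?;
    stdout.split_whitespace().next()?.parse::<u64>().ok()
}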

512 lines
14 KiB
Rust

mod app;
mod config;
mod data;
mod ui;

use std::fs;
use std::io::{self, Stdout};
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc, OnceLock,
};
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use chrono::{TimeZone, Utc};
use clap::{ArgAction, Parser, Subcommand};
use cm_dashboard_shared::envelope::{AgentType, MetricsEnvelope};
use crossterm::event::{self, Event};
use crossterm::terminal::{disable_raw_mode, enable_raw_mode};
use crossterm::{execute, terminal};
use ratatui::backend::CrosstermBackend;
use ratatui::Terminal;
use serde_json::Value;
use tokio::sync::mpsc::{
    error::TryRecvError, unbounded_channel, UnboundedReceiver, UnboundedSender,
};
use tokio::task::{spawn_blocking, JoinHandle};
use tracing::{debug, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::EnvFilter;
use zmq::{Context as NativeZmqContext, Message as NativeZmqMessage};

use crate::app::{App, AppEvent, AppOptions, ZmqContext};
use crate::data::metrics::{BackupMetrics, ServiceMetrics, SmartMetrics};
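
/// Keeps the non-blocking tracing writer's guard alive for the lifetime of
/// the process so the background logging thread keeps running.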
static LOG_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
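
/// Command-line arguments for the dashboard binary.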
#[derive(Parser, Debug)]
#[command(
    name = "cm-dashboard",
    version,
    about = "Infrastructure monitoring TUI for CMTEC"
)]
struct Cli {
    #[command(subcommand)]
    command: Option<Command>,
    /// Optional path to configuration TOML file
    #[arg(long, value_name = "FILE")]
    config: Option<PathBuf>,
    /// Limit dashboard to a single host
    #[arg(short = 'H', long, value_name = "HOST")]
    host: Option<String>,
    /// Interval (ms) to refresh dashboard when idle
    #[arg(long, default_value_t = 250)]
    tick_rate: u64,
    /// Increase logging verbosity (-v, -vv)
    #[arg(short, long, action = ArgAction::Count)]
    verbose: u8,
    /// Override ZMQ endpoints (comma-separated)
    #[arg(long, value_delimiter = ',', value_name = "ENDPOINT")]
    zmq_endpoint: Vec<String>,
}
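
/// Subcommands that run instead of the TUI.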
#[derive(Subcommand, Debug)]
enum Command {
    /// Generate default configuration files
    InitConfig {
        #[arg(long, value_name = "DIR", default_value = "config")]
        dir: PathBuf,
        /// Overwrite existing files if they already exist
        #[arg(long, action = ArgAction::SetTrue)]
        force: bool,
    },
}
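
/// Entry point: parses the CLI, wires up logging, the ZMQ metrics task, and
/// the terminal, then runs the UI loop until the user quits.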
#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();
    if let Some(Command::InitConfig { dir, force }) = cli.command.as_ref() {
        init_tracing(cli.verbose)?;
        generate_config_templates(dir, *force)?;
        return Ok(());
    }
    ensure_default_config(&cli)?;
    let options = AppOptions {
        config: cli.config,
        host: cli.host,
        tick_rate: Duration::from_millis(cli.tick_rate.max(16)),
        verbosity: cli.verbose,
        zmq_endpoints_override: cli.zmq_endpoint,
    };
    init_tracing(options.verbosity)?;
    let mut app = App::new(options)?;
    let (event_tx, mut event_rx) = unbounded_channel();
    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let zmq_task = if let Some(context) = app.zmq_context() {
        Some(spawn_metrics_task(
            context,
            event_tx.clone(),
            shutdown_flag.clone(),
        ))
    } else {
        None
    };
    let mut terminal = setup_terminal()?;
    let result = run_app(&mut terminal, &mut app, &mut event_rx);
    teardown_terminal(terminal)?;
    shutdown_flag.store(true, Ordering::Relaxed);
    let _ = event_tx.send(AppEvent::Shutdown);
    if let Some(handle) = zmq_task {
        if let Err(join_error) = handle.await {
            warn!(%join_error, "ZMQ metrics task ended unexpectedly");
        }
    }
    result
}
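
/// Switches the terminal into raw mode and the alternate screen for the TUI.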
fn setup_terminal() -> Result<Terminal<CrosstermBackend<Stdout>>> {
    enable_raw_mode()?;
    let mut stdout = io::stdout();
    execute!(stdout, terminal::EnterAlternateScreen)?;
    let backend = CrosstermBackend::new(stdout);
    let terminal = Terminal::new(backend)?;
    Ok(terminal)
}
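
/// Restores the terminal to its normal state and re-shows the cursor.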
fn teardown_terminal(mut terminal: Terminal<CrosstermBackend<Stdout>>) -> Result<()> {
    disable_raw_mode()?;
    execute!(terminal.backend_mut(), terminal::LeaveAlternateScreen)?;
    terminal.show_cursor()?;
    Ok(())
}
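
/// Main UI loop: drain pending app events, redraw, then either handle a key
/// event or advance the app by one tick when no input arrives in time.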
fn run_app(
    terminal: &mut Terminal<CrosstermBackend<Stdout>>,
    app: &mut App,
    event_rx: &mut UnboundedReceiver<AppEvent>,
) -> Result<()> {
    let tick_rate = app.tick_rate();
    while !app.should_quit() {
        drain_app_events(app, event_rx);
        terminal.draw(|frame| ui::render(frame, app))?;
        if event::poll(tick_rate)? {
            if let Event::Key(key) = event::read()? {
                app.handle_key_event(key);
            }
        } else {
            app.on_tick();
        }
    }
    Ok(())
}
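
/// Applies every event currently queued on the channel without blocking.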
fn drain_app_events(app: &mut App, receiver: &mut UnboundedReceiver<AppEvent>) {
    loop {
        match receiver.try_recv() {
            Ok(event) => app.handle_app_event(event),
            Err(TryRecvError::Empty) => break,
            Err(TryRecvError::Disconnected) => break,
        }
    }
}
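
/// Maps -v flags to a log level (RUST_LOG takes precedence) and installs a
/// file-backed tracing subscriber.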
fn init_tracing(verbosity: u8) -> Result<()> {
    let level = match verbosity {
        0 => "warn",
        1 => "info",
        2 => "debug",
        _ => "trace",
    };
    let env_filter = std::env::var("RUST_LOG")
        .ok()
        .and_then(|value| EnvFilter::try_new(value).ok())
        .unwrap_or_else(|| EnvFilter::new(level));
    let writer = prepare_log_writer()?;
    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(false)
        .with_ansi(false)
        .with_writer(writer)
        .compact()
        .try_init()
        .map_err(|err| anyhow!(err))?;
    Ok(())
}
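
/// Creates ./logs if needed and returns a non-blocking writer for
/// logs/cm-dashboard.log.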
fn prepare_log_writer() -> Result<tracing_appender::non_blocking::NonBlocking> {
    let logs_dir = Path::new("logs");
    if !logs_dir.exists() {
        fs::create_dir_all(logs_dir).with_context(|| {
            format!("failed to create logs directory at {}", logs_dir.display())
        })?;
    }
    let file_appender = tracing_appender::rolling::never(logs_dir, "cm-dashboard.log");
    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
    LOG_GUARD.get_or_init(|| guard);
    Ok(non_blocking)
}
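
/// Runs the blocking ZMQ subscriber loop on the blocking thread pool and
/// logs if it exits with an error or panics.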
fn spawn_metrics_task(
    context: ZmqContext,
    sender: UnboundedSender<AppEvent>,
    shutdown: Arc<AtomicBool>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        match spawn_blocking(move || metrics_blocking_loop(context, sender, shutdown)).await {
            Ok(Ok(())) => {}
            Ok(Err(error)) => warn!(%error, "ZMQ metrics worker exited with error"),
            Err(join_error) => warn!(%join_error, "ZMQ metrics worker panicked"),
        }
    })
}
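
/// Blocking subscriber loop: connects a SUB socket to all configured
/// endpoints and forwards each received envelope to the app until shutdown
/// is requested. The 1s receive timeout keeps the shutdown flag responsive.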
fn metrics_blocking_loop(
    context: ZmqContext,
    sender: UnboundedSender<AppEvent>,
    shutdown: Arc<AtomicBool>,
) -> Result<()> {
    let zmq_context = NativeZmqContext::new();
    let socket = zmq_context
        .socket(zmq::SUB)
        .context("failed to create ZMQ SUB socket")?;
    socket
        .set_linger(0)
        .context("failed to configure ZMQ linger")?;
    socket
        .set_rcvtimeo(1_000)
        .context("failed to configure ZMQ receive timeout")?;
    for endpoint in context.endpoints() {
        debug!(%endpoint, "connecting to ZMQ endpoint");
        socket
            .connect(endpoint)
            .with_context(|| format!("failed to connect to {endpoint}"))?;
    }
    if let Some(prefix) = context.subscription() {
        socket
            .set_subscribe(prefix.as_bytes())
            .context("failed to set ZMQ subscription")?;
    } else {
        socket
            .set_subscribe(b"")
            .context("failed to subscribe to all ZMQ topics")?;
    }
    while !shutdown.load(Ordering::Relaxed) {
        match socket.recv_msg(0) {
            Ok(message) => {
                if let Err(error) = handle_zmq_message(&message, &sender) {
                    warn!(%error, "failed to handle ZMQ message");
                }
            }
            Err(error) => {
                if error == zmq::Error::EAGAIN {
                    continue;
                }
                warn!(%error, "ZMQ receive error");
                std::thread::sleep(Duration::from_millis(250));
            }
        }
    }
    debug!("ZMQ metrics worker shutting down");
    Ok(())
}
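
/// Deserializes a metrics envelope, stamps the envelope timestamp into the
/// payload when missing, and dispatches it to the app as a MetricsUpdated or
/// MetricsFailed event keyed by agent type.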
fn handle_zmq_message(
    message: &NativeZmqMessage,
    sender: &UnboundedSender<AppEvent>,
) -> Result<()> {
    let bytes = message.to_vec();
    let envelope: MetricsEnvelope =
        serde_json::from_slice(&bytes).context("failed to deserialize metrics envelope")?;
    let timestamp = Utc
        .timestamp_opt(envelope.timestamp as i64, 0)
        .single()
        .unwrap_or_else(Utc::now);
    let host = envelope.hostname.clone();
    let mut payload = envelope.metrics;
    if let Some(obj) = payload.as_object_mut() {
        obj.entry("timestamp")
            .or_insert_with(|| Value::String(timestamp.to_rfc3339()));
    }
    match envelope.agent_type {
        AgentType::Smart => match serde_json::from_value::<SmartMetrics>(payload.clone()) {
            Ok(metrics) => {
                let _ = sender.send(AppEvent::MetricsUpdated {
                    host,
                    smart: Some(metrics),
                    services: None,
                    backup: None,
                    timestamp,
                });
            }
            Err(error) => {
                warn!(%error, "failed to parse smart metrics");
                let _ = sender.send(AppEvent::MetricsFailed {
                    host,
                    error: format!("smart metrics parse error: {error:#}"),
                    timestamp,
                });
            }
        },
        AgentType::Service => match serde_json::from_value::<ServiceMetrics>(payload.clone()) {
            Ok(metrics) => {
                let _ = sender.send(AppEvent::MetricsUpdated {
                    host,
                    smart: None,
                    services: Some(metrics),
                    backup: None,
                    timestamp,
                });
            }
            Err(error) => {
                warn!(%error, "failed to parse service metrics");
                let _ = sender.send(AppEvent::MetricsFailed {
                    host,
                    error: format!("service metrics parse error: {error:#}"),
                    timestamp,
                });
            }
        },
        AgentType::Backup => match serde_json::from_value::<BackupMetrics>(payload.clone()) {
            Ok(metrics) => {
                let _ = sender.send(AppEvent::MetricsUpdated {
                    host,
                    smart: None,
                    services: None,
                    backup: Some(metrics),
                    timestamp,
                });
            }
            Err(error) => {
                warn!(%error, "failed to parse backup metrics");
                let _ = sender.send(AppEvent::MetricsFailed {
                    host,
                    error: format!("backup metrics parse error: {error:#}"),
                    timestamp,
                });
            }
        },
    }
    Ok(())
}
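
/// Guarantees a configuration exists: either at the user-supplied path or as
/// freshly generated templates under ./config.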
fn ensure_default_config(cli: &Cli) -> Result<()> {
    if let Some(path) = cli.config.as_ref() {
        ensure_config_at(path, false)?;
    } else {
        let default_path = Path::new("config/dashboard.toml");
        if !default_path.exists() {
            generate_config_templates(Path::new("config"), false)?;
            println!("Created default configuration in ./config");
        }
    }
    Ok(())
}
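
/// Writes the dashboard template at `path` (plus a sibling hosts.toml)
/// unless it already exists and `force` is unset.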
fn ensure_config_at(path: &Path, force: bool) -> Result<()> {
    if path.exists() && !force {
        return Ok(());
    }
    if let Some(parent) = path.parent() {
        if !parent.exists() {
            fs::create_dir_all(parent)
                .with_context(|| format!("failed to create directory {}", parent.display()))?;
        }
        write_template(path.to_path_buf(), DASHBOARD_TEMPLATE, force, "dashboard")?;
        let hosts_path = parent.join("hosts.toml");
        if !hosts_path.exists() || force {
            write_template(hosts_path, HOSTS_TEMPLATE, force, "hosts")?;
        }
        println!(
            "Created configuration templates in {} (dashboard: {})",
            parent.display(),
            path.display()
        );
    } else {
        return Err(anyhow!("invalid configuration path {}", path.display()));
    }
    Ok(())
}
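
/// Writes both configuration templates into `target_dir`.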
fn generate_config_templates(target_dir: &Path, force: bool) -> Result<()> {
    if !target_dir.exists() {
        fs::create_dir_all(target_dir)
            .with_context(|| format!("failed to create directory {}", target_dir.display()))?;
    }
    write_template(
        target_dir.join("dashboard.toml"),
        DASHBOARD_TEMPLATE,
        force,
        "dashboard",
    )?;
    write_template(
        target_dir.join("hosts.toml"),
        HOSTS_TEMPLATE,
        force,
        "hosts",
    )?;
    println!(
        "Configuration templates written to {}",
        target_dir.display()
    );
    Ok(())
}
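
/// Writes a single template file, refusing to overwrite unless `force` is set.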
fn write_template(path: PathBuf, contents: &str, force: bool, name: &str) -> Result<()> {
    if path.exists() && !force {
        return Err(anyhow!(
            "{} template already exists at {} (use --force to overwrite)",
            name,
            path.display()
        ));
    }
    fs::write(&path, contents)
        .with_context(|| format!("failed to write {} template to {}", name, path.display()))?;
    Ok(())
}
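
/// Default contents for config/dashboard.toml.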
const DASHBOARD_TEMPLATE: &str = r#"# CM Dashboard configuration

[hosts]
# default_host = "srv01"

[[hosts.hosts]]
name = "srv01"
enabled = true
# metadata = { rack = "R1" }

[[hosts.hosts]]
name = "labbox"
enabled = true

[dashboard]
tick_rate_ms = 250
history_duration_minutes = 60

[[dashboard.widgets]]
id = "storage"
enabled = true

[[dashboard.widgets]]
id = "services"
enabled = true

[[dashboard.widgets]]
id = "backup"
enabled = true

[[dashboard.widgets]]
id = "alerts"
enabled = true

[filesystem]
# cache_dir = "/var/lib/cm-dashboard/cache"
# history_dir = "/var/lib/cm-dashboard/history"
"#;
const HOSTS_TEMPLATE: &str = r#"# Optional separate hosts configuration

[hosts]
# default_host = "srv01"

[[hosts.hosts]]
name = "srv01"
enabled = true

[[hosts.hosts]]
name = "labbox"
enabled = true
"#;