Restructure into workspace with dashboard and agent
This commit is contained in:
477
dashboard/src/main.rs
Normal file
477
dashboard/src/main.rs
Normal file
@@ -0,0 +1,477 @@
|
||||
mod app;
|
||||
mod config;
|
||||
mod data;
|
||||
mod ui;
|
||||
|
||||
use std::fs;
|
||||
use std::io::{self, Stdout};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::data::metrics::{BackupMetrics, ServiceMetrics, SmartMetrics};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use clap::{ArgAction, Parser, Subcommand};
|
||||
use cm_dashboard_shared::envelope::{AgentType, MetricsEnvelope};
|
||||
use crossterm::event::{self, Event};
|
||||
use crossterm::terminal::{disable_raw_mode, enable_raw_mode};
|
||||
use crossterm::{execute, terminal};
|
||||
use ratatui::backend::CrosstermBackend;
|
||||
use ratatui::Terminal;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::mpsc::{
|
||||
error::TryRecvError, unbounded_channel, UnboundedReceiver, UnboundedSender,
|
||||
};
|
||||
use tokio::task::{spawn_blocking, JoinHandle};
|
||||
use tracing::{debug, warn};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use zmq::{Context as NativeZmqContext, Message as NativeZmqMessage};
|
||||
|
||||
use crate::app::{App, AppEvent, AppOptions, ZmqContext};
|
||||
|
||||
// Keeps the tracing-appender worker guard alive for the lifetime of the
// process so buffered log lines keep flushing; installed once by
// `prepare_log_writer`.
static LOG_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
|
||||
|
||||
// Command-line arguments for the dashboard binary.
//
// NOTE: `///` comments on fields become clap help text, so reviewer notes
// below deliberately use plain `//` comments to leave the CLI output
// unchanged.
#[derive(Parser, Debug)]
#[command(
    name = "cm-dashboard",
    version,
    about = "Infrastructure monitoring TUI for CMTEC"
)]
struct Cli {
    // Optional subcommand; `None` runs the interactive TUI.
    #[command(subcommand)]
    command: Option<Command>,
    /// Optional path to configuration TOML file
    #[arg(long, value_name = "FILE")]
    config: Option<PathBuf>,

    /// Limit dashboard to a single host
    #[arg(short = 'H', long, value_name = "HOST")]
    host: Option<String>,

    /// Interval (ms) to refresh dashboard when idle
    // Clamped to a minimum of 16 ms in `main` before use.
    #[arg(long, default_value_t = 250)]
    tick_rate: u64,

    /// Increase logging verbosity (-v, -vv)
    #[arg(short, long, action = ArgAction::Count)]
    verbose: u8,

    /// Override ZMQ endpoints (comma-separated)
    #[arg(long, value_delimiter = ',', value_name = "ENDPOINT")]
    zmq_endpoint: Vec<String>,
}
|
||||
|
||||
// Subcommands that run once and exit instead of starting the TUI.
#[derive(Subcommand, Debug)]
enum Command {
    /// Generate default configuration files
    InitConfig {
        #[arg(long, value_name = "DIR", default_value = "config")]
        dir: PathBuf,
        /// Overwrite existing files if they already exist
        #[arg(long, action = ArgAction::SetTrue)]
        force: bool,
    },
}
|
||||
|
||||
/// Entry point: parses the CLI, handles one-shot subcommands, then wires up
/// logging, the app state, the optional ZMQ metrics task, and the TUI loop.
#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();

    // `init-config` is a one-shot command: write templates and exit without
    // starting the TUI.
    if let Some(Command::InitConfig { dir, force }) = cli.command.as_ref() {
        init_tracing(cli.verbose)?;
        generate_config_templates(dir, *force)?;
        return Ok(());
    }

    // Make sure a configuration exists before `App::new` tries to load it.
    ensure_default_config(&cli)?;

    let options = AppOptions {
        config: cli.config,
        host: cli.host,
        // Clamp the idle refresh interval to at least 16 ms.
        tick_rate: Duration::from_millis(cli.tick_rate.max(16)),
        verbosity: cli.verbose,
        zmq_endpoints_override: cli.zmq_endpoint,
    };

    init_tracing(options.verbosity)?;

    let mut app = App::new(options)?;
    let (event_tx, mut event_rx) = unbounded_channel();

    // Spawn the ZMQ subscriber only when the app actually has a ZMQ context
    // configured.
    let zmq_task = if let Some(context) = app.zmq_context() {
        Some(spawn_metrics_task(context, event_tx.clone()))
    } else {
        None
    };

    let mut terminal = setup_terminal()?;
    let result = run_app(&mut terminal, &mut app, &mut event_rx);
    // Restore the terminal before surfacing any error so the shell stays
    // usable.
    teardown_terminal(terminal)?;
    // NOTE(review): this Shutdown event is sent after `run_app` has stopped
    // draining the channel, so nothing appears to consume it — presumably a
    // belt-and-braces signal; confirm intent.
    let _ = event_tx.send(AppEvent::Shutdown);
    if let Some(handle) = zmq_task {
        handle.abort();
    }
    result
}
|
||||
|
||||
fn setup_terminal() -> Result<Terminal<CrosstermBackend<Stdout>>> {
|
||||
enable_raw_mode()?;
|
||||
let mut stdout = io::stdout();
|
||||
execute!(stdout, terminal::EnterAlternateScreen)?;
|
||||
let backend = CrosstermBackend::new(stdout);
|
||||
let terminal = Terminal::new(backend)?;
|
||||
Ok(terminal)
|
||||
}
|
||||
|
||||
fn teardown_terminal(mut terminal: Terminal<CrosstermBackend<Stdout>>) -> Result<()> {
|
||||
disable_raw_mode()?;
|
||||
execute!(terminal.backend_mut(), terminal::LeaveAlternateScreen)?;
|
||||
terminal.show_cursor()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Main UI loop: apply queued events, draw a frame, then wait up to one tick
/// for keyboard input; on timeout, advance the app's periodic state instead.
///
/// Returns when `app.should_quit()` becomes true, or on the first draw/input
/// error.
fn run_app(
    terminal: &mut Terminal<CrosstermBackend<Stdout>>,
    app: &mut App,
    event_rx: &mut UnboundedReceiver<AppEvent>,
) -> Result<()> {
    let tick_rate = app.tick_rate();

    while !app.should_quit() {
        // Drain background events *before* drawing so the frame reflects the
        // latest metrics.
        drain_app_events(app, event_rx);
        terminal.draw(|frame| ui::render(frame, app))?;

        // Block for at most `tick_rate` waiting for terminal input; only key
        // events are handled, everything else (resize, mouse) is ignored
        // here and picked up on the next draw.
        if event::poll(tick_rate)? {
            if let Event::Key(key) = event::read()? {
                app.handle_key_event(key);
            }
        } else {
            app.on_tick();
        }
    }

    Ok(())
}
|
||||
|
||||
/// Feeds every queued `AppEvent` to the app, stopping as soon as the channel
/// is empty or all senders are gone.
fn drain_app_events(app: &mut App, receiver: &mut UnboundedReceiver<AppEvent>) {
    // `try_recv` errors on both `Empty` and `Disconnected`, and the original
    // loop broke on either, so a plain `while let Ok` is an exact match.
    while let Ok(event) = receiver.try_recv() {
        app.handle_app_event(event);
    }
}
|
||||
|
||||
/// Installs the global tracing subscriber, writing compact non-ANSI logs via
/// the non-blocking file writer from `prepare_log_writer`.
///
/// `RUST_LOG`, when set and parseable, overrides the verbosity-derived
/// default filter.
///
/// # Errors
/// Fails if the log writer cannot be prepared, or if a global subscriber was
/// already installed (`try_init`).
fn init_tracing(verbosity: u8) -> Result<()> {
    // -v count maps to a default level, capped at "trace".
    let level = match verbosity {
        0 => "warn",
        1 => "info",
        2 => "debug",
        _ => "trace",
    };

    // An unparsable RUST_LOG is silently ignored in favor of `level`; we
    // cannot log the problem because tracing is not initialized yet.
    let env_filter = std::env::var("RUST_LOG")
        .ok()
        .and_then(|value| EnvFilter::try_new(value).ok())
        .unwrap_or_else(|| EnvFilter::new(level));

    let writer = prepare_log_writer()?;

    // ANSI disabled because the output target is a file, not a terminal.
    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(false)
        .with_ansi(false)
        .with_writer(writer)
        .compact()
        .try_init()
        .map_err(|err| anyhow!(err))?;

    Ok(())
}
|
||||
|
||||
fn prepare_log_writer() -> Result<tracing_appender::non_blocking::NonBlocking> {
|
||||
let logs_dir = Path::new("logs");
|
||||
if !logs_dir.exists() {
|
||||
fs::create_dir_all(logs_dir).with_context(|| {
|
||||
format!("failed to create logs directory at {}", logs_dir.display())
|
||||
})?;
|
||||
}
|
||||
|
||||
let file_appender = tracing_appender::rolling::never(logs_dir, "cm-dashboard.log");
|
||||
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
|
||||
LOG_GUARD.get_or_init(|| guard);
|
||||
Ok(non_blocking)
|
||||
}
|
||||
|
||||
fn spawn_metrics_task(context: ZmqContext, sender: UnboundedSender<AppEvent>) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
match spawn_blocking(move || metrics_blocking_loop(context, sender)).await {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(error)) => warn!(%error, "ZMQ metrics worker exited with error"),
|
||||
Err(join_error) => warn!(%join_error, "ZMQ metrics worker panicked"),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Blocking ZMQ subscriber: connects one SUB socket to every configured
/// endpoint, applies the subscription filter, then forwards each received
/// frame to the app via `handle_zmq_message`.
///
/// NOTE(review): `recv_msg(0)` blocks with no timeout, so this loop never
/// exits on its own; `main` calls `JoinHandle::abort`, which presumably does
/// not interrupt a thread parked in a native recv — the worker thread may
/// linger until process exit. Consider a recv timeout if clean shutdown
/// matters — TODO confirm.
fn metrics_blocking_loop(context: ZmqContext, sender: UnboundedSender<AppEvent>) -> Result<()> {
    let zmq_context = NativeZmqContext::new();
    let socket = zmq_context
        .socket(zmq::SUB)
        .context("failed to create ZMQ SUB socket")?;

    // One SUB socket fans in from all endpoints.
    for endpoint in context.endpoints() {
        debug!(%endpoint, "connecting to ZMQ endpoint");
        socket
            .connect(endpoint)
            .with_context(|| format!("failed to connect to {endpoint}"))?;
    }

    // An empty subscription prefix means "receive every topic".
    if let Some(prefix) = context.subscription() {
        socket
            .set_subscribe(prefix.as_bytes())
            .context("failed to set ZMQ subscription")?;
    } else {
        socket
            .set_subscribe(b"")
            .context("failed to subscribe to all ZMQ topics")?;
    }

    loop {
        match socket.recv_msg(0) {
            Ok(message) => {
                // Per-message failures are logged and skipped so a single
                // malformed frame cannot kill the subscriber.
                if let Err(error) = handle_zmq_message(&message, &sender) {
                    warn!(%error, "failed to handle ZMQ message");
                }
            }
            Err(error) => {
                // Back off briefly on socket errors instead of spinning.
                warn!(%error, "ZMQ receive error");
                std::thread::sleep(Duration::from_secs(1));
            }
        }
    }
}
|
||||
|
||||
fn handle_zmq_message(
|
||||
message: &NativeZmqMessage,
|
||||
sender: &UnboundedSender<AppEvent>,
|
||||
) -> Result<()> {
|
||||
let bytes = message.to_vec();
|
||||
|
||||
let envelope: MetricsEnvelope =
|
||||
serde_json::from_slice(&bytes).with_context(|| "failed to deserialize metrics envelope")?;
|
||||
let timestamp = Utc
|
||||
.timestamp_opt(envelope.timestamp as i64, 0)
|
||||
.single()
|
||||
.unwrap_or_else(|| Utc::now());
|
||||
|
||||
let host = envelope.hostname.clone();
|
||||
|
||||
let mut payload = envelope.metrics;
|
||||
if let Some(obj) = payload.as_object_mut() {
|
||||
obj.entry("timestamp")
|
||||
.or_insert_with(|| Value::String(timestamp.to_rfc3339()));
|
||||
}
|
||||
|
||||
match envelope.agent_type {
|
||||
AgentType::Smart => match serde_json::from_value::<SmartMetrics>(payload.clone()) {
|
||||
Ok(metrics) => {
|
||||
let _ = sender.send(AppEvent::MetricsUpdated {
|
||||
host,
|
||||
smart: Some(metrics),
|
||||
services: None,
|
||||
backup: None,
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(%error, "failed to parse smart metrics");
|
||||
let _ = sender.send(AppEvent::MetricsFailed {
|
||||
host,
|
||||
error: format!("smart metrics parse error: {error:#}"),
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
},
|
||||
AgentType::Service => match serde_json::from_value::<ServiceMetrics>(payload.clone()) {
|
||||
Ok(metrics) => {
|
||||
let _ = sender.send(AppEvent::MetricsUpdated {
|
||||
host,
|
||||
smart: None,
|
||||
services: Some(metrics),
|
||||
backup: None,
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(%error, "failed to parse service metrics");
|
||||
let _ = sender.send(AppEvent::MetricsFailed {
|
||||
host,
|
||||
error: format!("service metrics parse error: {error:#}"),
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
},
|
||||
AgentType::Backup => match serde_json::from_value::<BackupMetrics>(payload.clone()) {
|
||||
Ok(metrics) => {
|
||||
let _ = sender.send(AppEvent::MetricsUpdated {
|
||||
host,
|
||||
smart: None,
|
||||
services: None,
|
||||
backup: Some(metrics),
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
Err(error) => {
|
||||
warn!(%error, "failed to parse backup metrics");
|
||||
let _ = sender.send(AppEvent::MetricsFailed {
|
||||
host,
|
||||
error: format!("backup metrics parse error: {error:#}"),
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_default_config(cli: &Cli) -> Result<()> {
|
||||
if let Some(path) = cli.config.as_ref() {
|
||||
ensure_config_at(path, false)?;
|
||||
} else {
|
||||
let default_path = Path::new("config/dashboard.toml");
|
||||
if !default_path.exists() {
|
||||
generate_config_templates(Path::new("config"), false)?;
|
||||
println!("Created default configuration in ./config");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_config_at(path: &Path, force: bool) -> Result<()> {
|
||||
if path.exists() && !force {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(parent) = path.parent() {
|
||||
if !parent.exists() {
|
||||
fs::create_dir_all(parent)
|
||||
.with_context(|| format!("failed to create directory {}", parent.display()))?;
|
||||
}
|
||||
|
||||
write_template(path.to_path_buf(), DASHBOARD_TEMPLATE, force, "dashboard")?;
|
||||
|
||||
let hosts_path = parent.join("hosts.toml");
|
||||
if !hosts_path.exists() || force {
|
||||
write_template(hosts_path, HOSTS_TEMPLATE, force, "hosts")?;
|
||||
}
|
||||
println!(
|
||||
"Created configuration templates in {} (dashboard: {})",
|
||||
parent.display(),
|
||||
path.display()
|
||||
);
|
||||
} else {
|
||||
return Err(anyhow!("invalid configuration path {}", path.display()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn generate_config_templates(target_dir: &Path, force: bool) -> Result<()> {
|
||||
if !target_dir.exists() {
|
||||
fs::create_dir_all(target_dir)
|
||||
.with_context(|| format!("failed to create directory {}", target_dir.display()))?;
|
||||
}
|
||||
|
||||
write_template(
|
||||
target_dir.join("dashboard.toml"),
|
||||
DASHBOARD_TEMPLATE,
|
||||
force,
|
||||
"dashboard",
|
||||
)?;
|
||||
write_template(
|
||||
target_dir.join("hosts.toml"),
|
||||
HOSTS_TEMPLATE,
|
||||
force,
|
||||
"hosts",
|
||||
)?;
|
||||
|
||||
println!(
|
||||
"Configuration templates written to {}",
|
||||
target_dir.display()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_template(path: PathBuf, contents: &str, force: bool, name: &str) -> Result<()> {
|
||||
if path.exists() && !force {
|
||||
return Err(anyhow!(
|
||||
"{} template already exists at {} (use --force to overwrite)",
|
||||
name,
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
fs::write(&path, contents)
|
||||
.with_context(|| format!("failed to write {} template to {}", name, path.display()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const DASHBOARD_TEMPLATE: &str = r#"# CM Dashboard configuration
|
||||
|
||||
[hosts]
|
||||
# default_host = "srv01"
|
||||
|
||||
[[hosts.hosts]]
|
||||
name = "srv01"
|
||||
enabled = true
|
||||
# metadata = { rack = "R1" }
|
||||
|
||||
[[hosts.hosts]]
|
||||
name = "labbox"
|
||||
enabled = true
|
||||
|
||||
[dashboard]
|
||||
tick_rate_ms = 250
|
||||
history_duration_minutes = 60
|
||||
|
||||
[[dashboard.widgets]]
|
||||
id = "nvme"
|
||||
enabled = true
|
||||
|
||||
[[dashboard.widgets]]
|
||||
id = "services"
|
||||
enabled = true
|
||||
|
||||
[[dashboard.widgets]]
|
||||
id = "backup"
|
||||
enabled = true
|
||||
|
||||
[[dashboard.widgets]]
|
||||
id = "alerts"
|
||||
enabled = true
|
||||
|
||||
[filesystem]
|
||||
# cache_dir = "/var/lib/cm-dashboard/cache"
|
||||
# history_dir = "/var/lib/cm-dashboard/history"
|
||||
"#;
|
||||
|
||||
const HOSTS_TEMPLATE: &str = r#"# Optional separate hosts configuration
|
||||
|
||||
[hosts]
|
||||
# default_host = "srv01"
|
||||
|
||||
[[hosts.hosts]]
|
||||
name = "srv01"
|
||||
enabled = true
|
||||
|
||||
[[hosts.hosts]]
|
||||
name = "labbox"
|
||||
enabled = true
|
||||
"#;
|
||||
Reference in New Issue
Block a user