Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| de252d27b9 | |||
| db0e41a7d3 | |||
| ec460496d8 | |||
| 33e700529e | |||
| d644b7d40a | |||
| f635ba9c75 | |||
| 76b6e3373e | |||
| 0a13cab897 | |||
| d33ec5d225 | |||
| d31c2384df | |||
| c8db463204 | |||
| e8e50ef9bb | |||
| 0faed9309e | |||
| c980346d05 | |||
| 3e3d3f0c2b |
@@ -113,13 +113,13 @@ jobs:
|
||||
NIX_HASH="sha256-$(python3 -c "import base64, binascii; print(base64.b64encode(binascii.unhexlify('$NEW_HASH')).decode())")"
|
||||
|
||||
# Update the NixOS configuration
|
||||
sed -i "s|version = \"v[^\"]*\"|version = \"$VERSION\"|" hosts/common/cm-dashboard.nix
|
||||
sed -i "s|sha256 = \"sha256-[^\"]*\"|sha256 = \"$NIX_HASH\"|" hosts/common/cm-dashboard.nix
|
||||
sed -i "s|version = \"v[^\"]*\"|version = \"$VERSION\"|" hosts/services/cm-dashboard.nix
|
||||
sed -i "s|sha256 = \"sha256-[^\"]*\"|sha256 = \"$NIX_HASH\"|" hosts/services/cm-dashboard.nix
|
||||
|
||||
# Commit and push changes
|
||||
git config user.name "Gitea Actions"
|
||||
git config user.email "actions@gitea.cmtec.se"
|
||||
git add hosts/common/cm-dashboard.nix
|
||||
git add hosts/services/cm-dashboard.nix
|
||||
git commit -m "Auto-update cm-dashboard to $VERSION
|
||||
|
||||
- Update version to $VERSION with automated release
|
||||
|
||||
@@ -49,8 +49,12 @@ hostname2 = [
|
||||
### Navigation
|
||||
- **Tab**: Switch between hosts
|
||||
- **↑↓ or j/k**: Select services
|
||||
- **s**: Start selected service (UserStart)
|
||||
- **S**: Stop selected service (UserStop)
|
||||
- **J**: Show service logs (journalctl)
|
||||
- **L**: Show custom log files
|
||||
- **R**: Rebuild current host
|
||||
- **B**: Run backup on current host
|
||||
- **q**: Quit dashboard
|
||||
|
||||
## Core Architecture Principles
|
||||
@@ -115,7 +119,7 @@ This automatically:
|
||||
- Uploads binaries via Gitea API
|
||||
|
||||
### NixOS Configuration Updates
|
||||
Edit `~/projects/nixosbox/hosts/common/cm-dashboard.nix`:
|
||||
Edit `~/projects/nixosbox/hosts/services/cm-dashboard.nix`:
|
||||
|
||||
```nix
|
||||
version = "v0.1.X";
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -270,7 +270,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard"
|
||||
version = "0.1.59"
|
||||
version = "0.1.73"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
@@ -292,7 +292,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard-agent"
|
||||
version = "0.1.59"
|
||||
version = "0.1.73"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -315,7 +315,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard-shared"
|
||||
version = "0.1.59"
|
||||
version = "0.1.73"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"serde",
|
||||
|
||||
@@ -88,7 +88,9 @@ cm-dashboard • ● cmbox ● srv01 ● srv02 ● steambox
|
||||
- **s**: Start selected service (UserStart)
|
||||
- **S**: Stop selected service (UserStop)
|
||||
- **J**: Show service logs (journalctl in tmux popup)
|
||||
- **L**: Show custom log files (tail -f custom paths in tmux popup)
|
||||
- **R**: Rebuild current host
|
||||
- **B**: Run backup on current host
|
||||
- **q**: Quit
|
||||
|
||||
### Status Indicators
|
||||
@@ -173,9 +175,10 @@ subscriber_ports = [6130]
|
||||
[hosts]
|
||||
predefined_hosts = ["cmbox", "srv01", "srv02"]
|
||||
|
||||
[ui]
|
||||
ssh_user = "cm"
|
||||
[ssh]
|
||||
rebuild_user = "cm"
|
||||
rebuild_alias = "nixos-rebuild-cmtec"
|
||||
backup_alias = "cm-backup-run"
|
||||
```
|
||||
|
||||
## Technical Implementation
|
||||
@@ -329,7 +332,7 @@ This triggers automated:
|
||||
- Tarball upload to Gitea
|
||||
|
||||
### NixOS Integration
|
||||
Update `~/projects/nixosbox/hosts/common/cm-dashboard.nix`:
|
||||
Update `~/projects/nixosbox/hosts/services/cm-dashboard.nix`:
|
||||
|
||||
```nix
|
||||
version = "v0.1.43";
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard-agent"
|
||||
version = "0.1.60"
|
||||
version = "0.1.74"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::time::Duration;
|
||||
use tokio::time::interval;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::communication::{AgentCommand, ServiceAction, ZmqHandler};
|
||||
use crate::communication::{AgentCommand, ZmqHandler};
|
||||
use crate::config::AgentConfig;
|
||||
use crate::metrics::MetricCollectionManager;
|
||||
use crate::notifications::NotificationManager;
|
||||
@@ -78,10 +78,11 @@ impl Agent {
|
||||
info!("Initial metric collection completed - all data cached and ready");
|
||||
}
|
||||
|
||||
// Separate intervals for collection, transmission, and email notifications
|
||||
// Separate intervals for collection, transmission, heartbeat, and email notifications
|
||||
let mut collection_interval =
|
||||
interval(Duration::from_secs(self.config.collection_interval_seconds));
|
||||
let mut transmission_interval = interval(Duration::from_secs(self.config.zmq.transmission_interval_seconds));
|
||||
let mut heartbeat_interval = interval(Duration::from_secs(self.config.zmq.heartbeat_interval_seconds));
|
||||
let mut notification_interval = interval(Duration::from_secs(self.config.notifications.aggregation_interval_seconds));
|
||||
|
||||
loop {
|
||||
@@ -98,6 +99,12 @@ impl Agent {
|
||||
error!("Failed to broadcast metrics: {}", e);
|
||||
}
|
||||
}
|
||||
_ = heartbeat_interval.tick() => {
|
||||
// Send standalone heartbeat for host connectivity detection
|
||||
if let Err(e) = self.send_heartbeat().await {
|
||||
error!("Failed to send heartbeat: {}", e);
|
||||
}
|
||||
}
|
||||
_ = notification_interval.tick() => {
|
||||
// Process batched email notifications (separate from dashboard updates)
|
||||
if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await {
|
||||
@@ -206,7 +213,7 @@ impl Agent {
|
||||
let mut status_changed = false;
|
||||
for metric in metrics {
|
||||
// Filter excluded metrics from email notification processing only
|
||||
if self.config.exclude_email_metrics.contains(&metric.name) {
|
||||
if self.config.notifications.exclude_email_metrics.contains(&metric.name) {
|
||||
debug!("Excluding metric '{}' from email notification processing", metric.name);
|
||||
continue;
|
||||
}
|
||||
@@ -252,6 +259,19 @@ impl Agent {
|
||||
)
|
||||
}
|
||||
|
||||
/// Send standalone heartbeat for connectivity detection
|
||||
async fn send_heartbeat(&mut self) -> Result<()> {
|
||||
let heartbeat_metric = self.get_heartbeat_metric();
|
||||
let message = MetricMessage::new(
|
||||
self.hostname.clone(),
|
||||
vec![heartbeat_metric],
|
||||
);
|
||||
|
||||
self.zmq_handler.publish_metrics(&message).await?;
|
||||
debug!("Sent standalone heartbeat for connectivity detection");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_commands(&mut self) -> Result<()> {
|
||||
// Try to receive commands (non-blocking)
|
||||
match self.zmq_handler.try_receive_command() {
|
||||
@@ -295,75 +315,10 @@ impl Agent {
|
||||
info!("Processing Ping command - agent is alive");
|
||||
// Could send a response back via ZMQ if needed
|
||||
}
|
||||
AgentCommand::ServiceControl { service_name, action } => {
|
||||
info!("Processing ServiceControl command: {} {:?}", service_name, action);
|
||||
if let Err(e) = self.handle_service_control(&service_name, &action).await {
|
||||
error!("Failed to execute service control: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle systemd service control commands
|
||||
async fn handle_service_control(&mut self, service_name: &str, action: &ServiceAction) -> Result<()> {
|
||||
let (action_str, is_user_action) = match action {
|
||||
ServiceAction::Start => ("start", false),
|
||||
ServiceAction::Stop => ("stop", false),
|
||||
ServiceAction::Status => ("status", false),
|
||||
ServiceAction::UserStart => ("start", true),
|
||||
ServiceAction::UserStop => ("stop", true),
|
||||
};
|
||||
|
||||
info!("Executing systemctl {} {} (user action: {})", action_str, service_name, is_user_action);
|
||||
|
||||
// Handle user-stopped service tracking before systemctl execution (stop only)
|
||||
match action {
|
||||
ServiceAction::UserStop => {
|
||||
info!("Marking service '{}' as user-stopped", service_name);
|
||||
if let Err(e) = self.service_tracker.mark_user_stopped(service_name) {
|
||||
error!("Failed to mark service as user-stopped: {}", e);
|
||||
} else {
|
||||
// Sync to global tracker
|
||||
UserStoppedServiceTracker::update_global(&self.service_tracker);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let output = tokio::process::Command::new("sudo")
|
||||
.arg("systemctl")
|
||||
.arg(action_str)
|
||||
.arg(format!("{}.service", service_name))
|
||||
.output()
|
||||
.await?;
|
||||
|
||||
if output.status.success() {
|
||||
info!("Service {} {} completed successfully", service_name, action_str);
|
||||
if !output.stdout.is_empty() {
|
||||
debug!("stdout: {}", String::from_utf8_lossy(&output.stdout));
|
||||
}
|
||||
|
||||
// Note: User-stopped flag will be cleared by systemd collector
|
||||
// when service actually reaches 'active' state, not here
|
||||
} else {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
error!("Service {} {} failed: {}", service_name, action_str, stderr);
|
||||
return Err(anyhow::anyhow!("systemctl {} {} failed: {}", action_str, service_name, stderr));
|
||||
}
|
||||
|
||||
// Force refresh metrics after service control to update service status
|
||||
if matches!(action, ServiceAction::Start | ServiceAction::Stop | ServiceAction::UserStart | ServiceAction::UserStop) {
|
||||
info!("Triggering immediate metric refresh after service control");
|
||||
if let Err(e) = self.collect_metrics_only().await {
|
||||
error!("Failed to refresh metrics after service control: {}", e);
|
||||
} else {
|
||||
info!("Service status refreshed immediately after {} {}", action_str, service_name);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check metrics for user-stopped services that are now active and clear their flags
|
||||
fn clear_user_stopped_flags_for_active_services(&mut self, metrics: &[Metric]) {
|
||||
|
||||
@@ -98,19 +98,4 @@ pub enum AgentCommand {
|
||||
ToggleCollector { name: String, enabled: bool },
|
||||
/// Request status/health check
|
||||
Ping,
|
||||
/// Control systemd service
|
||||
ServiceControl {
|
||||
service_name: String,
|
||||
action: ServiceAction,
|
||||
},
|
||||
}
|
||||
|
||||
/// Service control actions
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub enum ServiceAction {
|
||||
Start,
|
||||
Stop,
|
||||
Status,
|
||||
UserStart, // User-initiated start (clears user-stopped flag)
|
||||
UserStop, // User-initiated stop (marks as user-stopped)
|
||||
}
|
||||
|
||||
@@ -17,9 +17,6 @@ pub struct AgentConfig {
|
||||
pub notifications: NotificationConfig,
|
||||
pub status_aggregation: HostStatusConfig,
|
||||
pub collection_interval_seconds: u64,
|
||||
/// List of metric names to exclude from email notifications
|
||||
#[serde(default)]
|
||||
pub exclude_email_metrics: Vec<String>,
|
||||
}
|
||||
|
||||
/// ZMQ communication configuration
|
||||
@@ -29,6 +26,9 @@ pub struct ZmqConfig {
|
||||
pub command_port: u16,
|
||||
pub bind_address: String,
|
||||
pub transmission_interval_seconds: u64,
|
||||
/// Heartbeat transmission interval in seconds for host connectivity detection
|
||||
#[serde(default = "default_heartbeat_interval_seconds")]
|
||||
pub heartbeat_interval_seconds: u64,
|
||||
}
|
||||
|
||||
/// Collector configuration
|
||||
@@ -147,9 +147,23 @@ pub struct NotificationConfig {
|
||||
pub rate_limit_minutes: u64,
|
||||
/// Email notification batching interval in seconds (default: 60)
|
||||
pub aggregation_interval_seconds: u64,
|
||||
/// List of metric names to exclude from email notifications
|
||||
#[serde(default)]
|
||||
pub exclude_email_metrics: Vec<String>,
|
||||
/// Path to maintenance mode file that suppresses email notifications when present
|
||||
#[serde(default = "default_maintenance_mode_file")]
|
||||
pub maintenance_mode_file: String,
|
||||
}
|
||||
|
||||
|
||||
fn default_heartbeat_interval_seconds() -> u64 {
|
||||
5
|
||||
}
|
||||
|
||||
fn default_maintenance_mode_file() -> String {
|
||||
"/tmp/cm-maintenance".to_string()
|
||||
}
|
||||
|
||||
impl AgentConfig {
|
||||
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
|
||||
loader::load_config(path)
|
||||
|
||||
@@ -59,6 +59,6 @@ impl NotificationManager {
|
||||
}
|
||||
|
||||
fn is_maintenance_mode(&self) -> bool {
|
||||
std::fs::metadata("/tmp/cm-maintenance").is_ok()
|
||||
std::fs::metadata(&self.config.maintenance_mode_file).is_ok()
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard"
|
||||
version = "0.1.60"
|
||||
version = "0.1.74"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -9,14 +9,13 @@ use std::io;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::communication::{AgentCommand, ServiceAction, ZmqCommandSender, ZmqConsumer};
|
||||
use crate::communication::{ZmqConsumer};
|
||||
use crate::config::DashboardConfig;
|
||||
use crate::metrics::MetricStore;
|
||||
use crate::ui::{TuiApp, UiCommand};
|
||||
|
||||
pub struct Dashboard {
|
||||
zmq_consumer: ZmqConsumer,
|
||||
zmq_command_sender: ZmqCommandSender,
|
||||
metric_store: MetricStore,
|
||||
tui_app: Option<TuiApp>,
|
||||
terminal: Option<Terminal<CrosstermBackend<io::Stdout>>>,
|
||||
@@ -58,20 +57,9 @@ impl Dashboard {
|
||||
}
|
||||
};
|
||||
|
||||
// Initialize ZMQ command sender
|
||||
let zmq_command_sender = match ZmqCommandSender::new(&config.zmq) {
|
||||
Ok(sender) => sender,
|
||||
Err(e) => {
|
||||
error!("Failed to initialize ZMQ command sender: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
// Connect to configured hosts from configuration
|
||||
let hosts: Vec<String> = config.hosts.keys().cloned().collect();
|
||||
|
||||
// Try to connect to hosts but don't fail if none are available
|
||||
match zmq_consumer.connect_to_predefined_hosts(&hosts).await {
|
||||
match zmq_consumer.connect_to_predefined_hosts(&config.hosts).await {
|
||||
Ok(_) => info!("Successfully connected to ZMQ hosts"),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
@@ -127,7 +115,6 @@ impl Dashboard {
|
||||
|
||||
Ok(Self {
|
||||
zmq_consumer,
|
||||
zmq_command_sender,
|
||||
metric_store,
|
||||
tui_app,
|
||||
terminal,
|
||||
@@ -137,18 +124,14 @@ impl Dashboard {
|
||||
})
|
||||
}
|
||||
|
||||
/// Send a command to a specific agent
|
||||
pub async fn send_command(&mut self, hostname: &str, command: AgentCommand) -> Result<()> {
|
||||
self.zmq_command_sender
|
||||
.send_command(hostname, command)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
info!("Starting dashboard main loop");
|
||||
|
||||
let mut last_metrics_check = Instant::now();
|
||||
let metrics_check_interval = Duration::from_millis(100); // Check for metrics every 100ms
|
||||
let mut last_heartbeat_check = Instant::now();
|
||||
let heartbeat_check_interval = Duration::from_secs(1); // Check for host connectivity every 1 second
|
||||
|
||||
loop {
|
||||
// Handle terminal events (keyboard input) only if not headless
|
||||
@@ -213,34 +196,18 @@ impl Dashboard {
|
||||
metric_message.metrics.len()
|
||||
);
|
||||
|
||||
// Check if this is the first time we've seen this host
|
||||
// Track first contact with host (no command needed - agent sends data every 2s)
|
||||
let is_new_host = !self
|
||||
.initial_commands_sent
|
||||
.contains(&metric_message.hostname);
|
||||
|
||||
if is_new_host {
|
||||
info!(
|
||||
"First contact with host {}, sending initial CollectNow command",
|
||||
"First contact with host {} - data will update automatically",
|
||||
metric_message.hostname
|
||||
);
|
||||
|
||||
// Send CollectNow command for immediate refresh
|
||||
if let Err(e) = self
|
||||
.send_command(&metric_message.hostname, AgentCommand::CollectNow)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
"Failed to send initial CollectNow command to {}: {}",
|
||||
metric_message.hostname, e
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"✓ Sent initial CollectNow command to {}",
|
||||
metric_message.hostname
|
||||
);
|
||||
self.initial_commands_sent
|
||||
.insert(metric_message.hostname.clone());
|
||||
}
|
||||
self.initial_commands_sent
|
||||
.insert(metric_message.hostname.clone());
|
||||
}
|
||||
|
||||
// Update metric store
|
||||
@@ -254,14 +221,8 @@ impl Dashboard {
|
||||
}
|
||||
}
|
||||
|
||||
// Update TUI with new hosts and metrics (only if not headless)
|
||||
// Update TUI with new metrics (only if not headless)
|
||||
if let Some(ref mut tui_app) = self.tui_app {
|
||||
let connected_hosts = self
|
||||
.metric_store
|
||||
.get_connected_hosts(Duration::from_secs(self.config.zmq.heartbeat_timeout_seconds));
|
||||
|
||||
|
||||
tui_app.update_hosts(connected_hosts);
|
||||
tui_app.update_metrics(&self.metric_store);
|
||||
}
|
||||
}
|
||||
@@ -280,6 +241,20 @@ impl Dashboard {
|
||||
last_metrics_check = Instant::now();
|
||||
}
|
||||
|
||||
// Check for host connectivity changes (heartbeat timeouts) periodically
|
||||
if last_heartbeat_check.elapsed() >= heartbeat_check_interval {
|
||||
let timeout = Duration::from_secs(self.config.zmq.heartbeat_timeout_seconds);
|
||||
|
||||
// Clean up metrics for offline hosts
|
||||
self.metric_store.cleanup_offline_hosts(timeout);
|
||||
|
||||
if let Some(ref mut tui_app) = self.tui_app {
|
||||
let connected_hosts = self.metric_store.get_connected_hosts(timeout);
|
||||
tui_app.update_hosts(connected_hosts);
|
||||
}
|
||||
last_heartbeat_check = Instant::now();
|
||||
}
|
||||
|
||||
// Render TUI (only if not headless)
|
||||
if !self.headless {
|
||||
if let Some(ref mut terminal) = self.terminal {
|
||||
@@ -305,22 +280,6 @@ impl Dashboard {
|
||||
/// Execute a UI command by sending it to the appropriate agent
|
||||
async fn execute_ui_command(&self, command: UiCommand) -> Result<()> {
|
||||
match command {
|
||||
UiCommand::ServiceStart { hostname, service_name } => {
|
||||
info!("Sending user start command for service {} on {}", service_name, hostname);
|
||||
let agent_command = AgentCommand::ServiceControl {
|
||||
service_name: service_name.clone(),
|
||||
action: ServiceAction::UserStart,
|
||||
};
|
||||
self.zmq_command_sender.send_command(&hostname, agent_command).await?;
|
||||
}
|
||||
UiCommand::ServiceStop { hostname, service_name } => {
|
||||
info!("Sending user stop command for service {} on {}", service_name, hostname);
|
||||
let agent_command = AgentCommand::ServiceControl {
|
||||
service_name: service_name.clone(),
|
||||
action: ServiceAction::UserStop,
|
||||
};
|
||||
self.zmq_command_sender.send_command(&hostname, agent_command).await?;
|
||||
}
|
||||
UiCommand::TriggerBackup { hostname } => {
|
||||
info!("Trigger backup requested for {}", hostname);
|
||||
// TODO: Implement backup trigger command
|
||||
|
||||
@@ -5,40 +5,6 @@ use zmq::{Context, Socket, SocketType};
|
||||
|
||||
use crate::config::ZmqConfig;
|
||||
|
||||
/// Commands that can be sent to agents
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub enum AgentCommand {
|
||||
/// Request immediate metric collection
|
||||
CollectNow,
|
||||
/// Change collection interval
|
||||
SetInterval { seconds: u64 },
|
||||
/// Enable/disable a collector
|
||||
ToggleCollector { name: String, enabled: bool },
|
||||
/// Request status/health check
|
||||
Ping,
|
||||
/// Control systemd service
|
||||
ServiceControl {
|
||||
service_name: String,
|
||||
action: ServiceAction,
|
||||
},
|
||||
/// Rebuild NixOS system
|
||||
SystemRebuild {
|
||||
git_url: String,
|
||||
git_branch: String,
|
||||
working_dir: String,
|
||||
api_key_file: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Service control actions
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub enum ServiceAction {
|
||||
Start,
|
||||
Stop,
|
||||
Status,
|
||||
UserStart, // User-initiated start (clears user-stopped flag)
|
||||
UserStop, // User-initiated stop (marks as user-stopped)
|
||||
}
|
||||
|
||||
/// ZMQ consumer for receiving metrics from agents
|
||||
pub struct ZmqConsumer {
|
||||
@@ -84,13 +50,14 @@ impl ZmqConsumer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect to predefined hosts
|
||||
pub async fn connect_to_predefined_hosts(&mut self, hosts: &[String]) -> Result<()> {
|
||||
|
||||
/// Connect to predefined hosts using their configuration
|
||||
pub async fn connect_to_predefined_hosts(&mut self, hosts: &std::collections::HashMap<String, crate::config::HostDetails>) -> Result<()> {
|
||||
let default_port = self.config.subscriber_ports[0];
|
||||
|
||||
for hostname in hosts {
|
||||
// Try to connect, but don't fail if some hosts are unreachable
|
||||
if let Err(e) = self.connect_to_host(hostname, default_port).await {
|
||||
for (hostname, host_details) in hosts {
|
||||
// Try to connect using configured IP, but don't fail if some hosts are unreachable
|
||||
if let Err(e) = self.connect_to_host_with_details(hostname, host_details, default_port).await {
|
||||
warn!("Could not connect to {}: {}", hostname, e);
|
||||
}
|
||||
}
|
||||
@@ -104,6 +71,15 @@ impl ZmqConsumer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Connect to a host using its configuration details
|
||||
pub async fn connect_to_host_with_details(&mut self, hostname: &str, host_details: &crate::config::HostDetails, port: u16) -> Result<()> {
|
||||
// Get primary connection IP only - no fallbacks
|
||||
let primary_ip = host_details.get_connection_ip(hostname);
|
||||
|
||||
// Connect directly without fallback attempts
|
||||
self.connect_to_host(&primary_ip, port).await
|
||||
}
|
||||
|
||||
/// Receive command output from any connected agent (non-blocking)
|
||||
pub async fn receive_command_output(&mut self) -> Result<Option<CommandOutputMessage>> {
|
||||
match self.subscriber.recv_bytes(zmq::DONTWAIT) {
|
||||
@@ -141,9 +117,9 @@ impl ZmqConsumer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive metrics from any connected agent (with timeout)
|
||||
/// Receive metrics from any connected agent (non-blocking)
|
||||
pub async fn receive_metrics(&mut self) -> Result<Option<MetricMessage>> {
|
||||
match self.subscriber.recv_bytes(0) {
|
||||
match self.subscriber.recv_bytes(zmq::DONTWAIT) {
|
||||
Ok(data) => {
|
||||
debug!("Received {} bytes from ZMQ", data.len());
|
||||
|
||||
@@ -192,42 +168,3 @@ impl ZmqConsumer {
|
||||
}
|
||||
}
|
||||
|
||||
/// ZMQ command sender for sending commands to agents
|
||||
pub struct ZmqCommandSender {
|
||||
context: Context,
|
||||
}
|
||||
|
||||
impl ZmqCommandSender {
|
||||
pub fn new(_config: &ZmqConfig) -> Result<Self> {
|
||||
let context = Context::new();
|
||||
|
||||
info!("ZMQ command sender initialized");
|
||||
|
||||
Ok(Self { context })
|
||||
}
|
||||
|
||||
/// Send a command to a specific agent
|
||||
pub async fn send_command(&self, hostname: &str, command: AgentCommand) -> Result<()> {
|
||||
// Create a new PUSH socket for this command (ZMQ best practice)
|
||||
let socket = self.context.socket(SocketType::PUSH)?;
|
||||
|
||||
// Set socket options
|
||||
socket.set_linger(1000)?; // Wait up to 1 second on close
|
||||
socket.set_sndtimeo(5000)?; // 5 second send timeout
|
||||
|
||||
// Connect to agent's command port (6131)
|
||||
let address = format!("tcp://{}:6131", hostname);
|
||||
socket.connect(&address)?;
|
||||
|
||||
// Serialize command
|
||||
let serialized = serde_json::to_vec(&command)?;
|
||||
|
||||
// Send command
|
||||
socket.send(&serialized, 0)?;
|
||||
|
||||
info!("Sent command {:?} to agent at {}", command, hostname);
|
||||
|
||||
// Socket will be automatically closed when dropped
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,17 @@ fn default_heartbeat_timeout_seconds() -> u64 {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HostDetails {
|
||||
pub mac_address: Option<String>,
|
||||
/// Primary IP address (local network)
|
||||
pub ip: Option<String>,
|
||||
}
|
||||
|
||||
|
||||
impl HostDetails {
|
||||
/// Get the IP address for connection (uses ip field or hostname as fallback)
|
||||
pub fn get_connection_ip(&self, hostname: &str) -> String {
|
||||
self.ip.as_ref().unwrap_or(&hostname.to_string()).clone()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// System configuration
|
||||
@@ -40,11 +51,12 @@ pub struct SystemConfig {
|
||||
pub nixos_config_api_key_file: Option<String>,
|
||||
}
|
||||
|
||||
/// SSH configuration for rebuild operations
|
||||
/// SSH configuration for rebuild and backup operations
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SshConfig {
|
||||
pub rebuild_user: String,
|
||||
pub rebuild_alias: String,
|
||||
pub backup_alias: String,
|
||||
}
|
||||
|
||||
/// Service log file configuration per host
|
||||
|
||||
@@ -109,6 +109,28 @@ impl MetricStore {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Clean up data for offline hosts
|
||||
pub fn cleanup_offline_hosts(&mut self, timeout: Duration) {
|
||||
let now = Instant::now();
|
||||
let mut hosts_to_cleanup = Vec::new();
|
||||
|
||||
// Find hosts that are offline (no recent heartbeat)
|
||||
for (hostname, &last_heartbeat) in &self.last_heartbeat {
|
||||
if now.duration_since(last_heartbeat) > timeout {
|
||||
hosts_to_cleanup.push(hostname.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Clear metrics for offline hosts
|
||||
for hostname in hosts_to_cleanup {
|
||||
if let Some(metrics) = self.current_metrics.remove(&hostname) {
|
||||
info!("Cleared {} metrics for offline host: {}", metrics.len(), hostname);
|
||||
}
|
||||
// Keep heartbeat timestamp for reconnection detection
|
||||
// Don't remove from last_heartbeat to track when host was last seen
|
||||
}
|
||||
}
|
||||
|
||||
/// Cleanup old data and enforce limits
|
||||
fn cleanup_host_data(&mut self, hostname: &str) {
|
||||
let now = Instant::now();
|
||||
|
||||
@@ -23,8 +23,6 @@ use widgets::{BackupWidget, ServicesWidget, SystemWidget, Widget};
|
||||
/// Commands that can be triggered from the UI
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum UiCommand {
|
||||
ServiceStart { hostname: String, service_name: String },
|
||||
ServiceStop { hostname: String, service_name: String },
|
||||
TriggerBackup { hostname: String },
|
||||
}
|
||||
|
||||
@@ -251,12 +249,14 @@ impl TuiApp {
|
||||
KeyCode::Char('r') => {
|
||||
// System rebuild command - works on any panel for current host
|
||||
if let Some(hostname) = self.current_host.clone() {
|
||||
let connection_ip = self.get_connection_ip(&hostname);
|
||||
// Create command that shows logo, rebuilds, and waits for user input
|
||||
let logo_and_rebuild = format!(
|
||||
"bash -c 'cat << \"EOF\"\nNixOS System Rebuild\nTarget: {}\n\nEOF\nssh -tt {}@{} \"bash -ic {}\"\necho\necho \"========================================\"\necho \"Rebuild completed. Press any key to close...\"\necho \"========================================\"\nread -n 1 -s\nexit'",
|
||||
"bash -c 'cat << \"EOF\"\nNixOS System Rebuild\nTarget: {} ({})\n\nEOF\nssh -tt {}@{} \"bash -ic {}\"\necho\necho \"========================================\"\necho \"Rebuild completed. Press any key to close...\"\necho \"========================================\"\nread -n 1 -s\nexit'",
|
||||
hostname,
|
||||
connection_ip,
|
||||
self.config.ssh.rebuild_user,
|
||||
hostname,
|
||||
connection_ip,
|
||||
self.config.ssh.rebuild_alias
|
||||
);
|
||||
|
||||
@@ -270,29 +270,94 @@ impl TuiApp {
|
||||
.ok(); // Ignore errors, tmux will handle them
|
||||
}
|
||||
}
|
||||
KeyCode::Char('B') => {
|
||||
// Backup command - works on any panel for current host
|
||||
if let Some(hostname) = self.current_host.clone() {
|
||||
let connection_ip = self.get_connection_ip(&hostname);
|
||||
// Create command that shows logo, runs backup, and waits for user input
|
||||
let logo_and_backup = format!(
|
||||
"bash -c 'cat << \"EOF\"\nBackup Operation\nTarget: {} ({})\n\nEOF\nssh -tt {}@{} \"bash -ic {}\"\necho\necho \"========================================\"\necho \"Backup completed. Press any key to close...\"\necho \"========================================\"\nread -n 1 -s\nexit'",
|
||||
hostname,
|
||||
connection_ip,
|
||||
self.config.ssh.rebuild_user,
|
||||
connection_ip,
|
||||
self.config.ssh.backup_alias
|
||||
);
|
||||
|
||||
std::process::Command::new("tmux")
|
||||
.arg("split-window")
|
||||
.arg("-v")
|
||||
.arg("-p")
|
||||
.arg("30")
|
||||
.arg(&logo_and_backup)
|
||||
.spawn()
|
||||
.ok(); // Ignore errors, tmux will handle them
|
||||
}
|
||||
}
|
||||
KeyCode::Char('s') => {
|
||||
// Service start command
|
||||
// Service start command via SSH with progress display
|
||||
if let (Some(service_name), Some(hostname)) = (self.get_selected_service(), self.current_host.clone()) {
|
||||
if self.start_command(&hostname, CommandType::ServiceStart, service_name.clone()) {
|
||||
return Ok(Some(UiCommand::ServiceStart { hostname, service_name }));
|
||||
}
|
||||
// Start transition tracking for visual feedback
|
||||
self.start_command(&hostname, CommandType::ServiceStart, service_name.clone());
|
||||
|
||||
let connection_ip = self.get_connection_ip(&hostname);
|
||||
let service_start_command = format!(
|
||||
"bash -c 'cat << \"EOF\"\nService Start: {}.service\nTarget: {} ({})\n\nEOF\nssh -tt {}@{} \"sudo systemctl start {}.service && echo \\\"Service started successfully\\\" && sudo systemctl status {}.service --no-pager -l\"\necho\necho \"========================================\"\necho \"Operation completed. Press any key to close...\"\necho \"========================================\"\nread -n 1 -s\nexit'",
|
||||
service_name,
|
||||
hostname,
|
||||
connection_ip,
|
||||
self.config.ssh.rebuild_user,
|
||||
connection_ip,
|
||||
service_name,
|
||||
service_name
|
||||
);
|
||||
|
||||
std::process::Command::new("tmux")
|
||||
.arg("split-window")
|
||||
.arg("-v")
|
||||
.arg("-p")
|
||||
.arg("30")
|
||||
.arg(&service_start_command)
|
||||
.spawn()
|
||||
.ok(); // Ignore errors, tmux will handle them
|
||||
}
|
||||
}
|
||||
KeyCode::Char('S') => {
|
||||
// Service stop command
|
||||
// Service stop command via SSH with progress display
|
||||
if let (Some(service_name), Some(hostname)) = (self.get_selected_service(), self.current_host.clone()) {
|
||||
if self.start_command(&hostname, CommandType::ServiceStop, service_name.clone()) {
|
||||
return Ok(Some(UiCommand::ServiceStop { hostname, service_name }));
|
||||
}
|
||||
// Start transition tracking for visual feedback
|
||||
self.start_command(&hostname, CommandType::ServiceStop, service_name.clone());
|
||||
|
||||
let connection_ip = self.get_connection_ip(&hostname);
|
||||
let service_stop_command = format!(
|
||||
"bash -c 'cat << \"EOF\"\nService Stop: {}.service\nTarget: {} ({})\n\nEOF\nssh -tt {}@{} \"sudo systemctl stop {}.service && echo \\\"Service stopped successfully\\\" && sudo systemctl status {}.service --no-pager -l\"\necho\necho \"========================================\"\necho \"Operation completed. Press any key to close...\"\necho \"========================================\"\nread -n 1 -s\nexit'",
|
||||
service_name,
|
||||
hostname,
|
||||
connection_ip,
|
||||
self.config.ssh.rebuild_user,
|
||||
connection_ip,
|
||||
service_name,
|
||||
service_name
|
||||
);
|
||||
|
||||
std::process::Command::new("tmux")
|
||||
.arg("split-window")
|
||||
.arg("-v")
|
||||
.arg("-p")
|
||||
.arg("30")
|
||||
.arg(&service_stop_command)
|
||||
.spawn()
|
||||
.ok(); // Ignore errors, tmux will handle them
|
||||
}
|
||||
}
|
||||
KeyCode::Char('J') => {
|
||||
// Show service logs via journalctl in tmux split window
|
||||
if let (Some(service_name), Some(hostname)) = (self.get_selected_service(), self.current_host.clone()) {
|
||||
let connection_ip = self.get_connection_ip(&hostname);
|
||||
let journalctl_command = format!(
|
||||
"bash -c \"ssh -tt {}@{} 'sudo journalctl -u {}.service -f --no-pager -n 50'; exit\"",
|
||||
self.config.ssh.rebuild_user,
|
||||
hostname,
|
||||
connection_ip,
|
||||
service_name
|
||||
);
|
||||
|
||||
@@ -312,10 +377,11 @@ impl TuiApp {
|
||||
// Check if this service has a custom log file configured
|
||||
if let Some(host_logs) = self.config.service_logs.get(&hostname) {
|
||||
if let Some(log_config) = host_logs.iter().find(|config| config.service_name == service_name) {
|
||||
let connection_ip = self.get_connection_ip(&hostname);
|
||||
let tail_command = format!(
|
||||
"bash -c \"ssh -tt {}@{} 'sudo tail -n 50 -f {}'; exit\"",
|
||||
self.config.ssh.rebuild_user,
|
||||
hostname,
|
||||
connection_ip,
|
||||
log_config.log_file_path
|
||||
);
|
||||
|
||||
@@ -365,6 +431,26 @@ impl TuiApp {
|
||||
}
|
||||
}
|
||||
}
|
||||
KeyCode::Char('t') => {
|
||||
// Open SSH terminal session in tmux window
|
||||
if let Some(hostname) = self.current_host.clone() {
|
||||
let connection_ip = self.get_connection_ip(&hostname);
|
||||
let ssh_command = format!(
|
||||
"ssh -tt {}@{}",
|
||||
self.config.ssh.rebuild_user,
|
||||
connection_ip
|
||||
);
|
||||
|
||||
std::process::Command::new("tmux")
|
||||
.arg("split-window")
|
||||
.arg("-v")
|
||||
.arg("-p")
|
||||
.arg("30") // Use 30% like other commands
|
||||
.arg(&ssh_command)
|
||||
.spawn()
|
||||
.ok(); // Ignore errors, tmux will handle them
|
||||
}
|
||||
}
|
||||
KeyCode::Tab => {
|
||||
// Tab cycles to next host
|
||||
self.navigate_host(1);
|
||||
@@ -558,6 +644,21 @@ impl TuiApp {
|
||||
])
|
||||
.split(main_chunks[1]); // main_chunks[1] is now the content area (between title and statusbar)
|
||||
|
||||
// Check if current host is offline
|
||||
let current_host_offline = if let Some(hostname) = self.current_host.clone() {
|
||||
self.calculate_host_status(&hostname, metric_store) == Status::Offline
|
||||
} else {
|
||||
true // No host selected is considered offline
|
||||
};
|
||||
|
||||
// If host is offline, render wake-up message instead of panels
|
||||
if current_host_offline {
|
||||
self.render_offline_host_message(frame, main_chunks[1]);
|
||||
self.render_btop_title(frame, main_chunks[0], metric_store);
|
||||
self.render_statusbar(frame, main_chunks[2]);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if backup panel should be shown
|
||||
let show_backup = if let Some(hostname) = self.current_host.clone() {
|
||||
let host_widgets = self.get_or_create_host_widgets(&hostname);
|
||||
@@ -790,8 +891,10 @@ impl TuiApp {
|
||||
let host_widgets = self.get_or_create_host_widgets(&hostname);
|
||||
host_widgets.system_scroll_offset
|
||||
};
|
||||
// Clone the config to avoid borrowing issues
|
||||
let config = self.config.clone();
|
||||
let host_widgets = self.get_or_create_host_widgets(&hostname);
|
||||
host_widgets.system_widget.render_with_scroll(frame, inner_area, scroll_offset, &hostname);
|
||||
host_widgets.system_widget.render_with_scroll(frame, inner_area, scroll_offset, &hostname, Some(&config));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -811,7 +914,87 @@ impl TuiApp {
|
||||
}
|
||||
}
|
||||
|
||||
/// Render offline host message with wake-up option
|
||||
fn render_offline_host_message(&self, frame: &mut Frame, area: Rect) {
|
||||
use ratatui::layout::Alignment;
|
||||
use ratatui::style::Modifier;
|
||||
use ratatui::text::{Line, Span};
|
||||
use ratatui::widgets::{Block, Borders, Paragraph};
|
||||
|
||||
// Get hostname for message
|
||||
let hostname = self.current_host.as_ref()
|
||||
.map(|h| h.as_str())
|
||||
.unwrap_or("Unknown");
|
||||
|
||||
// Check if host has MAC address for wake-on-LAN
|
||||
let has_mac = self.current_host.as_ref()
|
||||
.and_then(|hostname| self.config.hosts.get(hostname))
|
||||
.and_then(|details| details.mac_address.as_ref())
|
||||
.is_some();
|
||||
|
||||
// Create message content
|
||||
let mut lines = vec![
|
||||
Line::from(Span::styled(
|
||||
format!("Host '{}' is offline", hostname),
|
||||
Style::default().fg(Theme::muted_text()).add_modifier(Modifier::BOLD),
|
||||
)),
|
||||
Line::from(""),
|
||||
];
|
||||
|
||||
if has_mac {
|
||||
lines.push(Line::from(Span::styled(
|
||||
"Press 'w' to wake up host",
|
||||
Style::default().fg(Theme::primary_text()).add_modifier(Modifier::BOLD),
|
||||
)));
|
||||
} else {
|
||||
lines.push(Line::from(Span::styled(
|
||||
"No MAC address configured - cannot wake up",
|
||||
Style::default().fg(Theme::muted_text()),
|
||||
)));
|
||||
}
|
||||
|
||||
// Create centered message
|
||||
let message = Paragraph::new(lines)
|
||||
.block(Block::default()
|
||||
.borders(Borders::ALL)
|
||||
.border_style(Style::default().fg(Theme::muted_text()))
|
||||
.title(" Offline Host ")
|
||||
.title_style(Style::default().fg(Theme::muted_text()).add_modifier(Modifier::BOLD)))
|
||||
.style(Style::default().bg(Theme::background()).fg(Theme::primary_text()))
|
||||
.alignment(Alignment::Center);
|
||||
|
||||
// Center the message in the available area
|
||||
let popup_area = ratatui::layout::Layout::default()
|
||||
.direction(Direction::Vertical)
|
||||
.constraints([
|
||||
Constraint::Percentage(40),
|
||||
Constraint::Length(6),
|
||||
Constraint::Percentage(40),
|
||||
])
|
||||
.split(area)[1];
|
||||
|
||||
let popup_area = ratatui::layout::Layout::default()
|
||||
.direction(Direction::Horizontal)
|
||||
.constraints([
|
||||
Constraint::Percentage(25),
|
||||
Constraint::Percentage(50),
|
||||
Constraint::Percentage(25),
|
||||
])
|
||||
.split(popup_area)[1];
|
||||
|
||||
frame.render_widget(message, popup_area);
|
||||
}
|
||||
|
||||
/// Parse MAC address string (e.g., "AA:BB:CC:DD:EE:FF") to [u8; 6]
|
||||
/// Get the connection IP for a hostname based on host configuration
|
||||
fn get_connection_ip(&self, hostname: &str) -> String {
|
||||
if let Some(host_details) = self.config.hosts.get(hostname) {
|
||||
host_details.get_connection_ip(hostname)
|
||||
} else {
|
||||
hostname.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_mac_address(mac_str: &str) -> Result<[u8; 6], &'static str> {
|
||||
let parts: Vec<&str> = mac_str.split(':').collect();
|
||||
if parts.len() != 6 {
|
||||
|
||||
@@ -439,7 +439,7 @@ impl Widget for SystemWidget {
|
||||
|
||||
impl SystemWidget {
|
||||
/// Render with scroll offset support
|
||||
pub fn render_with_scroll(&mut self, frame: &mut Frame, area: Rect, scroll_offset: usize, hostname: &str) {
|
||||
pub fn render_with_scroll(&mut self, frame: &mut Frame, area: Rect, scroll_offset: usize, hostname: &str, config: Option<&crate::config::DashboardConfig>) {
|
||||
let mut lines = Vec::new();
|
||||
|
||||
// NixOS section
|
||||
@@ -457,6 +457,16 @@ impl SystemWidget {
|
||||
Span::styled(format!("Agent: {}", agent_version_text), Typography::secondary())
|
||||
]));
|
||||
|
||||
// Display detected connection IP
|
||||
if let Some(config) = config {
|
||||
if let Some(host_details) = config.hosts.get(hostname) {
|
||||
let detected_ip = host_details.get_connection_ip(hostname);
|
||||
lines.push(Line::from(vec![
|
||||
Span::styled(format!("IP: {}", detected_ip), Typography::secondary())
|
||||
]));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// CPU section
|
||||
lines.push(Line::from(vec![
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard-shared"
|
||||
version = "0.1.60"
|
||||
version = "0.1.74"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
Reference in New Issue
Block a user