From b444c88ea081f04cb59342e879149e66077c528e Mon Sep 17 00:00:00 2001 From: Christoffer Martinsson Date: Fri, 28 Nov 2025 11:27:33 +0100 Subject: [PATCH] Replace external commands with native Rust APIs Significant performance improvements by eliminating subprocess spawning: - Replace 'ip' commands with rtnetlink for network interface discovery - Replace 'docker ps/images' with bollard Docker API client - Replace 'systemctl list-units' with zbus D-Bus for systemd interaction - Replace 'df' with statvfs() syscall for filesystem statistics - Replace 'lsblk' with /proc/mounts parsing Add interval-based caching to collectors: - DiskCollector now respects interval_seconds configuration - SystemdCollector now respects interval_seconds configuration - CpuCollector now respects interval_seconds configuration Remove unused command communication infrastructure: - Remove port 6131 ZMQ command receiver - Clean up unused AgentCommand types Dependencies added: - rtnetlink = "0.14" - netlink-packet-route = "0.19" - bollard = "0.17" - zbus = "4.0" - nix (fs features for statvfs) --- Cargo.lock | 1078 ++++++++++++++++++++++++++++++- agent/Cargo.toml | 17 +- agent/src/agent.rs | 40 +- agent/src/collectors/cpu.rs | 62 +- agent/src/collectors/disk.rs | 172 +++-- agent/src/collectors/memory.rs | 73 +-- agent/src/collectors/network.rs | 222 ++++--- agent/src/collectors/systemd.rs | 260 +++----- agent/src/communication/mod.rs | 47 +- agent/src/config/mod.rs | 1 - agent/src/config/validation.rs | 8 - dashboard/Cargo.toml | 2 +- shared/Cargo.toml | 2 +- 13 files changed, 1483 insertions(+), 501 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b2c955b..716aaae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,137 @@ dependencies = [ "object", ] +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497c00e0fd83a72a79a39fcbd8e3e2f055d6f6c7e025f3b3d91f4f8e76527fb8" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "pin-project-lite", + "slab", +] + +[[package]] +name = "async-fs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8034a681df4aed8b8edbd7fbe472401ecf009251c8b40556b304567052e294c5" +dependencies = [ + "async-lock", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" +dependencies = [ + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "windows-sys 0.61.2", +] + +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-process" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc50921ec0055cdd8a16de48773bfeec5c972598674347252c0399676be7da75" +dependencies = [ + "async-channel", + "async-io", + "async-lock", + "async-signal", + "async-task", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "rustix", +] + +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-signal" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43c070bbf59cd3570b6b2dd54cd772527c7c3620fce8be898406dd3ed6adc64c" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix", + "signal-hook-registry", + "slab", + "windows-sys 0.61.2", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -114,6 +245,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.5.0" @@ -144,12 +281,84 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + +[[package]] +name = "bollard" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d41711ad46fda47cd701f6908e59d1bd6b9a2b7464c0d0aeab95c6d37096ff8a" +dependencies = [ + "base64 0.22.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "http 1.4.0", + "http-body-util", + "hyper 1.8.1", + "hyper-named-pipe", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.45.0-rc.26.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7c5415e3a6bc6d3e99eff6268e488fd4ee25e7b28c10f08fa6760bd9de16e4" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + [[package]] name = "bumpalo" version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.0" @@ -190,6 +399,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.42" @@ -278,7 +493,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cm-dashboard" -version = "0.1.196" +version = "0.1.197" dependencies = [ "anyhow", "chrono", @@ -289,7 +504,7 @@ dependencies = [ "ratatui", "serde", "serde_json", - "thiserror", + "thiserror 1.0.69", "tokio", "toml", "tracing", @@ -300,35 +515,42 @@ dependencies = [ [[package]] name = "cm-dashboard-agent" -version = "0.1.196" +version = "0.1.197" dependencies = [ "anyhow", "async-trait", + "bollard", "chrono", "chrono-tz", "clap", "cm-dashboard-shared", + "futures", "gethostname", "lettre", + "libc", + "netlink-packet-route", + "nix 0.29.0", "reqwest", + "rtnetlink", "serde", "serde_json", - "thiserror", + "thiserror 1.0.69", "tokio", "toml", "tracing", "tracing-subscriber", + "zbus", "zmq", ] [[package]] name = "cm-dashboard-shared" -version = "0.1.196" +version = "0.1.197" dependencies = [ "chrono", "serde", "serde_json", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -337,6 +559,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -353,6 +584,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam" version = "0.8.4" @@ -434,6 +674,36 @@ dependencies = [ "winapi", ] +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "deranged" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +dependencies = [ + "powerfmt", + "serde_core", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dircpy" version = "0.3.19" @@ -456,6 +726,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -487,6 +763,33 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endi" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66b7e2430c6dff6a955451e2cfc438f09cea1965a9d6f87f7e3b90decc014099" + +[[package]] +name = "enumflags2" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1027f7680c853e056ebcec683615fb6fbbc07dbaa13b4d5d9442b146ded4ecef" +dependencies = [ + "enumflags2_derive", + "serde", +] + +[[package]] +name = "enumflags2_derive" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67c78a4d8fdf9953a5c9d458f9efe940fd97a0cab0941c075a813ac594733827" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -503,6 +806,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -551,6 +875,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -558,6 +897,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -566,12 +906,47 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -590,8 +965,11 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -599,6 +977,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "gethostname" version = "0.4.3" @@ -609,6 +997,17 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -632,14 +1031,20 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", - "indexmap", + "http 0.2.12", + "indexmap 2.12.1", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -679,6 +1084,18 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.12" @@ -690,6 +1107,16 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -697,7 +1124,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.4.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -724,8 +1174,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -737,6 +1187,43 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.8.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -744,12 +1231,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.32", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.8.1", + "libc", + "pin-project-lite", + "socket2 0.6.1", + "tokio", + "tower-service", + "tracing", +] + +[[package]] +name = "hyperlocal" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.64" @@ -876,6 +1399,17 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", + "serde", +] + [[package]] name = "indexmap" version = "2.12.1" @@ -884,6 +1418,8 @@ checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -928,7 +1464,7 @@ version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" dependencies = [ - "getrandom", + "getrandom 0.3.4", "libc", ] @@ -1037,6 +1573,15 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1083,6 +1628,94 @@ dependencies = [ "tempfile", ] +[[package]] +name = "netlink-packet-core" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72724faf704479d67b388da142b186f916188505e7e0b26719019c525882eda4" +dependencies = [ + "anyhow", + "byteorder", + "netlink-packet-utils", +] + +[[package]] +name = "netlink-packet-route" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74c171cd77b4ee8c7708da746ce392440cb7bcf618d122ec9ecc607b12938bf4" +dependencies = [ + "anyhow", + "byteorder", + "libc", + "log", + "netlink-packet-core", + "netlink-packet-utils", +] + +[[package]] +name = "netlink-packet-utils" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ede8a08c71ad5a95cdd0e4e52facd37190977039a4704eb82a283f713747d34" +dependencies = [ + "anyhow", + "byteorder", + "paste", + "thiserror 1.0.69", +] + +[[package]] +name = "netlink-proto" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72452e012c2f8d612410d89eea01e2d9b56205274abb35d53f60200b2ec41d60" +dependencies = [ + "bytes", + "futures", + "log", + "netlink-packet-core", + "netlink-sys", + "thiserror 2.0.17", +] + +[[package]] +name = "netlink-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c903aa70590cb93691bf97a767c8d1d6122d2cc9070433deb3bbf36ce8bd23" +dependencies = [ + "bytes", + "futures", + "libc", + "log", + "tokio", +] + +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "libc", +] + +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "cfg_aliases", + "libc", + "memoffset", +] + [[package]] name = "nom" version = "8.0.0" @@ -1101,6 +1734,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.19" @@ -1175,6 +1814,22 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-stream" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aa2b01e1d916879f73a53d01d1d6cee68adbb31d6d9177a8cfce093cced1d50" +dependencies = [ + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -1269,12 +1924,37 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "polling" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "potential_utf" version = "0.1.4" @@ -1284,6 +1964,30 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro-crate" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" +dependencies = [ + "toml_edit 0.23.7", +] + [[package]] name = "proc-macro2" version = "1.0.103" @@ -1330,6 +2034,18 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", "rand_core", ] @@ -1338,6 +2054,9 @@ name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", +] [[package]] name = "ratatui" @@ -1386,6 +2105,26 @@ dependencies = [ "bitflags 2.10.0", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "regex" version = "1.12.2" @@ -1427,9 +2166,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.32", "hyper-tls", "ipnet", "js-sys", @@ -1455,6 +2194,24 @@ dependencies = [ "winreg", ] +[[package]] +name = "rtnetlink" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b684475344d8df1859ddb2d395dd3dac4f8f3422a1aa0725993cb375fc5caba5" +dependencies = [ + "futures", + "log", + "netlink-packet-core", + "netlink-packet-route", + "netlink-packet-utils", + "netlink-proto", + "netlink-sys", + "nix 0.27.1", + "thiserror 1.0.69", + "tokio", +] + [[package]] name = "rustix" version = "1.1.2" @@ -1507,6 +2264,30 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "schemars" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + +[[package]] +name = "schemars" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1579,6 +2360,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -1600,6 +2392,35 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fa237f2807440d238e0364a218270b98f767a00d3dada77b1c53ae88940e2e7" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.12.1", + "schemars 0.9.0", + "schemars 1.1.0", + "serde_core", + "serde_json", + "time", +] + +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1702,6 +2523,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" @@ -1805,7 +2632,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.3.4", "once_cell", "rustix", "windows-sys 0.61.2", @@ -1817,7 +2644,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl 2.0.17", ] [[package]] @@ -1831,6 +2667,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -1840,6 +2687,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + +[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -1909,8 +2787,8 @@ checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" dependencies = [ "serde", "serde_spanned", - "toml_datetime", - "toml_edit", + "toml_datetime 0.6.11", + "toml_edit 0.22.27", ] [[package]] @@ -1922,20 +2800,50 @@ dependencies = [ "serde", ] +[[package]] +name = "toml_datetime" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +dependencies = [ + "serde_core", +] + [[package]] name = "toml_edit" version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.12.1", "serde", "serde_spanned", - "toml_datetime", + "toml_datetime 0.6.11", "toml_write", "winnow", ] +[[package]] +name = "toml_edit" +version = "0.23.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" +dependencies = [ + "indexmap 2.12.1", + "toml_datetime 0.7.3", + "toml_parser", + "winnow", +] + +[[package]] +name = "toml_parser" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +dependencies = [ + "winnow", +] + [[package]] name = "toml_write" version = "0.1.2" @@ -2015,6 +2923,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + +[[package]] +name = "uds_windows" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89daebc3e6fd160ac4aa9fc8b3bf71e1f74fbf92367ae71fb83a037e8bf164b9" +dependencies = [ + "memoffset", + "tempfile", + "winapi", +] + [[package]] name = "unicode-ident" version = "1.0.22" @@ -2541,6 +3466,16 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xdg-home" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec1cdab258fb55c0da61328dc52c8764709b249011b2cad0454c72f0bf10a1f6" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "yoke" version = "0.8.1" @@ -2564,6 +3499,68 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zbus" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb97012beadd29e654708a0fdb4c84bc046f537aecfde2c3ee0a9e4b4d48c725" +dependencies = [ + "async-broadcast", + "async-executor", + "async-fs", + "async-io", + "async-lock", + "async-process", + "async-recursion", + "async-task", + "async-trait", + "blocking", + "enumflags2", + "event-listener", + "futures-core", + "futures-sink", + "futures-util", + "hex", + "nix 0.29.0", + "ordered-stream", + "rand", + "serde", + "serde_repr", + "sha1", + "static_assertions", + "tracing", + "uds_windows", + "windows-sys 0.52.0", + "xdg-home", + "zbus_macros", + "zbus_names", + "zvariant", +] + +[[package]] +name = "zbus_macros" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267db9407081e90bbfa46d841d3cbc60f59c0351838c4bc65199ecd79ab1983e" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", + "zvariant_utils", +] + +[[package]] +name = "zbus_names" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b9b1fef7d021261cc16cba64c351d291b715febe0fa10dc3a443ac5a5022e6c" +dependencies = [ + "serde", + "static_assertions", + "zvariant", +] + [[package]] name = "zerocopy" version = "0.8.30" @@ -2669,3 +3666,40 @@ dependencies = [ "system-deps", "zeromq-src", ] + +[[package]] +name = "zvariant" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2084290ab9a1c471c38fc524945837734fbf124487e105daec2bb57fd48c81fe" +dependencies = [ + "endi", + "enumflags2", + "serde", + "static_assertions", + "zvariant_derive", +] + +[[package]] +name = "zvariant_derive" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73e2ba546bda683a90652bac4a279bc146adad1386f25379cf73200d2002c449" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", + "zvariant_utils", +] + +[[package]] +name = "zvariant_utils" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c51bcff7cc3dbb5055396bcf774748c3dab426b4b8659046963523cee4808340" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 6691dad..268a9d4 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-agent" -version = "0.1.196" +version = "0.1.197" edition = "2021" [dependencies] @@ -20,4 +20,17 @@ gethostname = { workspace = true } chrono-tz = "0.8" toml = { workspace = true } async-trait = "0.1" -reqwest = { version = "0.11", features = ["json", "blocking"] } \ No newline at end of file +reqwest = { version = "0.11", features = ["json", "blocking"] } + +# Native system APIs +nix = { version = "0.29", features = ["fs"] } +rtnetlink = "0.14" +netlink-packet-route = "0.19" +futures = "0.3" +libc = "0.2" + +# Docker API client +bollard = "0.17" + +# D-Bus client for systemd +zbus = "4.0" \ No newline at end of file diff --git a/agent/src/agent.rs b/agent/src/agent.rs index d74a4c7..e6eeb47 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -4,7 +4,7 @@ use std::time::Duration; use tokio::time::interval; use tracing::{debug, error, info}; -use crate::communication::{AgentCommand, ZmqHandler}; +use crate::communication::ZmqHandler; use crate::config::AgentConfig; use crate::collectors::{ Collector, @@ -134,12 +134,6 @@ impl Agent { // NOTE: With structured data, we might need to implement status tracking differently // For now, we skip this until status evaluation is migrated } - // Handle incoming commands (check periodically) - _ = tokio::time::sleep(Duration::from_millis(100)) => { - if let Err(e) = self.handle_commands().await { - error!("Error handling commands: {}", e); - } - } _ = &mut shutdown_rx => { info!("Shutdown signal received, stopping agent loop"); break; @@ -259,36 +253,4 @@ impl Agent { Ok(()) } - /// Handle incoming commands from dashboard - async fn handle_commands(&mut self) -> Result<()> { - // Try to receive a command (non-blocking) - if let Ok(Some(command)) = self.zmq_handler.try_receive_command() { - info!("Received command: {:?}", command); - - match command { - AgentCommand::CollectNow => { - info!("Received immediate collection request"); - if let Err(e) = self.collect_and_broadcast().await { - error!("Failed to collect on demand: {}", e); - } - } - AgentCommand::SetInterval { seconds } => { - info!("Received interval change request: {}s", seconds); - // Note: This would require more complex handling to update the interval - // For now, just acknowledge - } - AgentCommand::ToggleCollector { name, enabled } => { - info!("Received collector toggle request: {} -> {}", name, enabled); - // Note: This would require more complex handling to enable/disable collectors - // For now, just acknowledge - } - AgentCommand::Ping => { - info!("Received ping command"); - // Maybe send back a pong or status - } - } - } - Ok(()) - } - } \ No newline at end of file diff --git a/agent/src/collectors/cpu.rs b/agent/src/collectors/cpu.rs index 6c870de..6d86c76 100644 --- a/agent/src/collectors/cpu.rs +++ b/agent/src/collectors/cpu.rs @@ -1,22 +1,25 @@ use async_trait::async_trait; -use cm_dashboard_shared::{AgentData, Status, HysteresisThresholds}; +use cm_dashboard_shared::{AgentData, Status, HysteresisThresholds, CpuData}; +use std::sync::RwLock; +use std::time::Instant; use tracing::debug; use super::{utils, Collector, CollectorError}; use crate::config::CpuConfig; -/// Extremely efficient CPU metrics collector -/// -/// EFFICIENCY OPTIMIZATIONS: -/// - Single /proc/loadavg read for all load metrics -/// - Single /proc/stat read for CPU usage -/// - Minimal string allocations -/// - No process spawning -/// - <0.1ms collection time target +/// Extremely efficient CPU metrics collector with interval-based caching pub struct CpuCollector { load_thresholds: HysteresisThresholds, temperature_thresholds: HysteresisThresholds, + config: CpuConfig, + state: RwLock, +} + +#[derive(Debug, Clone)] +struct CpuCacheState { + last_collection: Option, + cached_data: CpuData, } impl CpuCollector { @@ -26,15 +29,39 @@ impl CpuCollector { config.load_warning_threshold, config.load_critical_threshold, ); - + let temperature_thresholds = HysteresisThresholds::new( config.temperature_warning_threshold, config.temperature_critical_threshold, ); - + Self { load_thresholds, temperature_thresholds, + config, + state: RwLock::new(CpuCacheState { + last_collection: None, + cached_data: CpuData { + load_1min: 0.0, + load_5min: 0.0, + load_15min: 0.0, + frequency_mhz: 0.0, + temperature_celsius: None, + load_status: Status::Unknown, + temperature_status: Status::Unknown, + }, + }), + } + } + + fn should_update_cache(&self) -> bool { + let state = self.state.read().unwrap(); + match state.last_collection { + None => true, + Some(last) => { + let cache_duration = std::time::Duration::from_secs(self.config.interval_seconds); + last.elapsed() > cache_duration + } } } @@ -156,6 +183,14 @@ impl CpuCollector { #[async_trait] impl Collector for CpuCollector { async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> { + // Check if cache is valid + if !self.should_update_cache() { + let state = self.state.read().unwrap(); + agent_data.system.cpu = state.cached_data.clone(); + debug!("Using cached CPU data (interval: {}s)", self.config.interval_seconds); + return Ok(()); + } + debug!("Collecting CPU metrics"); let start = std::time::Instant::now(); @@ -187,6 +222,11 @@ impl Collector for CpuCollector { Status::Unknown }; + // Update cache + let mut state = self.state.write().unwrap(); + state.last_collection = Some(Instant::now()); + state.cached_data = agent_data.system.cpu.clone(); + Ok(()) } } diff --git a/agent/src/collectors/disk.rs b/agent/src/collectors/disk.rs index 71c53cf..31cc945 100644 --- a/agent/src/collectors/disk.rs +++ b/agent/src/collectors/disk.rs @@ -6,6 +6,7 @@ use crate::config::DiskConfig; use std::process::Command; use std::time::Instant; use std::collections::HashMap; +use std::sync::RwLock; use tracing::debug; use super::{Collector, CollectorError}; @@ -14,6 +15,19 @@ use super::{Collector, CollectorError}; pub struct DiskCollector { config: DiskConfig, temperature_thresholds: HysteresisThresholds, + /// Cached state with thread-safe interior mutability + state: RwLock, +} + +/// Internal state for disk caching +#[derive(Debug, Clone)] +struct DiskCacheState { + /// Last collection time for performance tracking + last_collection: Option, + /// Cached drive data + cached_drives: Vec, + /// Cached pool data + cached_pools: Vec, } /// A physical drive with its filesystems @@ -58,10 +72,17 @@ impl DiskCollector { config.temperature_warning_celsius, config.temperature_critical_celsius, ); - + + let state = DiskCacheState { + last_collection: None, + cached_drives: Vec::new(), + cached_pools: Vec::new(), + }; + Self { config, temperature_thresholds, + state: RwLock::new(state), } } @@ -104,40 +125,70 @@ impl DiskCollector { self.populate_drives_data(&physical_drives, &smart_data, agent_data)?; self.populate_pools_data(&mergerfs_pools, &smart_data, agent_data)?; + // Step 7: Update cache with fresh data + { + let mut state = self.state.write().unwrap(); + state.cached_drives = agent_data.system.storage.drives.clone(); + state.cached_pools = agent_data.system.storage.pools.clone(); + state.last_collection = Some(Instant::now()); + } + let elapsed = start_time.elapsed(); debug!("Storage collection completed in {:?}", elapsed); Ok(()) } - /// Get block devices and their mount points using lsblk + /// Check if disk collection cache should be updated + fn should_update_cache(&self) -> bool { + let state = self.state.read().unwrap(); + + match state.last_collection { + None => true, + Some(last) => { + let cache_duration = std::time::Duration::from_secs(self.config.interval_seconds); + last.elapsed() > cache_duration + } + } + } + + /// Get cached disk data if available and fresh + fn get_cached_data(&self) -> Option<(Vec, Vec)> { + if !self.should_update_cache() { + let state = self.state.read().unwrap(); + Some((state.cached_drives.clone(), state.cached_pools.clone())) + } else { + None + } + } + + /// Get block devices and their mount points by reading /proc/mounts async fn get_mount_devices(&self) -> Result, CollectorError> { - use super::run_command_with_timeout; - - let mut cmd = Command::new("lsblk"); - cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]); - - let output = run_command_with_timeout(cmd, 2).await + let content = std::fs::read_to_string("/proc/mounts") .map_err(|e| CollectorError::SystemRead { - path: "block devices".to_string(), + path: "/proc/mounts".to_string(), error: e.to_string(), })?; let mut mount_devices = HashMap::new(); - for line in String::from_utf8_lossy(&output.stdout).lines() { + + for line in content.lines() { let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 2 { - let device_name = parts[0]; + if parts.len() >= 3 { + let device = parts[0]; let mount_point = parts[1]; - - // Skip swap partitions and unmounted devices - if mount_point == "[SWAP]" || mount_point.is_empty() { + let fs_type = parts[2]; + + // Skip pseudo filesystems and fuse mounts + if fs_type.starts_with("fuse") || + matches!(fs_type, "proc" | "sysfs" | "tmpfs" | "devtmpfs" | + "devpts" | "cgroup" | "cgroup2" | "pstore" | "bpf" | + "tracefs" | "debugfs" | "securityfs" | "hugetlbfs" | + "mqueue" | "configfs" | "autofs") { continue; } - - // Convert device name to full path - let device_path = format!("/dev/{}", device_name); - mount_devices.insert(mount_point.to_string(), device_path); + + mount_devices.insert(mount_point.to_string(), device.to_string()); } } @@ -187,44 +238,20 @@ impl DiskCollector { Ok(()) } - /// Get filesystem info for a single mount point + /// Get filesystem info for a single mount point using statvfs syscall fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> { - let output = std::process::Command::new("timeout") - .args(&["2", "df", "--block-size=1", mount_point]) - .output() - .map_err(|e| CollectorError::SystemRead { - path: format!("df {}", mount_point), - error: e.to_string(), - })?; + use nix::sys::statvfs::statvfs; - let output_str = String::from_utf8_lossy(&output.stdout); - let lines: Vec<&str> = output_str.lines().collect(); - - if lines.len() < 2 { - return Err(CollectorError::Parse { - value: output_str.to_string(), - error: "Expected at least 2 lines from df output".to_string(), - }); - } - - // Parse the data line (skip header) - let parts: Vec<&str> = lines[1].split_whitespace().collect(); - if parts.len() < 4 { - return Err(CollectorError::Parse { - value: lines[1].to_string(), - error: "Expected at least 4 fields in df output".to_string(), - }); - } - - let total_bytes: u64 = parts[1].parse().map_err(|e| CollectorError::Parse { - value: parts[1].to_string(), - error: format!("Failed to parse total bytes: {}", e), + let stat = statvfs(mount_point).map_err(|e| CollectorError::SystemRead { + path: mount_point.to_string(), + error: format!("statvfs failed: {}", e), })?; - let used_bytes: u64 = parts[2].parse().map_err(|e| CollectorError::Parse { - value: parts[2].to_string(), - error: format!("Failed to parse used bytes: {}", e), - })?; + // Calculate total and used bytes + let block_size = stat.fragment_size() as u64; + let total_bytes = stat.blocks() as u64 * block_size; + let available_bytes = stat.blocks_available() as u64 * block_size; + let used_bytes = total_bytes - available_bytes; Ok((total_bytes, used_bytes)) } @@ -760,32 +787,29 @@ impl DiskCollector { Ok((data_drives, parity_drives)) } - /// Get drive information for a mount path + /// Get drive information for a mount path by reading /proc/mounts fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result { - // Use lsblk to find the backing device with timeout - let output = Command::new("timeout") - .args(&["2", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"]) - .output() - .map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?; - - let output_str = String::from_utf8_lossy(&output.stdout); + // Read /proc/mounts to find the backing device + let content = std::fs::read_to_string("/proc/mounts") + .map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?; + let mut device = String::new(); - - for line in output_str.lines() { + + for line in content.lines() { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 2 && parts[1] == path { device = parts[0].to_string(); break; } } - + if device.is_empty() { return Err(anyhow::anyhow!("Could not find device for path {}", path)); } - - // Extract base device name (e.g., "sda1" -> "sda") - let base_device = self.extract_base_device(&format!("/dev/{}", device)); - + + // Extract base device name (e.g., "/dev/sda1" -> "sda") + let base_device = self.extract_base_device(&device); + // Get temperature from SMART data if available let temperature = if let Ok(smart_data) = tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(self.get_smart_data(&base_device)) @@ -794,7 +818,7 @@ impl DiskCollector { } else { None }; - + Ok(PoolDrive { name: base_device, mount_point: path.to_string(), @@ -838,7 +862,15 @@ impl DiskCollector { #[async_trait] impl Collector for DiskCollector { async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> { - self.collect_storage_data(agent_data).await + // Use cached data if available and fresh + if let Some((cached_drives, cached_pools)) = self.get_cached_data() { + agent_data.system.storage.drives = cached_drives; + agent_data.system.storage.pools = cached_pools; + Ok(()) + } else { + // Collect fresh data + self.collect_storage_data(agent_data).await + } } } diff --git a/agent/src/collectors/memory.rs b/agent/src/collectors/memory.rs index e186704..03d613e 100644 --- a/agent/src/collectors/memory.rs +++ b/agent/src/collectors/memory.rs @@ -95,62 +95,47 @@ impl MemoryCollector { Ok(()) } - /// Populate tmpfs data into AgentData + /// Populate tmpfs data into AgentData using statvfs syscall async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> { + use nix::sys::statvfs::statvfs; + // Discover all tmpfs mount points let tmpfs_mounts = self.discover_tmpfs_mounts()?; - + if tmpfs_mounts.is_empty() { debug!("No tmpfs mounts found to monitor"); return Ok(()); } - // Get usage data for all tmpfs mounts at once using df (with 2 second timeout) - let mut df_args = vec!["2", "df", "--output=target,size,used", "--block-size=1"]; - df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str())); + // Get usage data for each tmpfs mount using statvfs syscall + for mount_point in tmpfs_mounts { + match statvfs(mount_point.as_str()) { + Ok(stat) => { + let block_size = stat.fragment_size() as u64; + let total_bytes = stat.blocks() as u64 * block_size; + let available_bytes = stat.blocks_available() as u64 * block_size; + let used_bytes = total_bytes - available_bytes; - let df_output = std::process::Command::new("timeout") - .args(&df_args[..]) - .output() - .map_err(|e| CollectorError::SystemRead { - path: "tmpfs mounts".to_string(), - error: e.to_string(), - })?; + if total_bytes == 0 { + continue; + } - let df_str = String::from_utf8_lossy(&df_output.stdout); - let df_lines: Vec<&str> = df_str.lines().skip(1).collect(); // Skip header + let total_gb = total_bytes as f32 / (1024.0 * 1024.0 * 1024.0); + let used_gb = used_bytes as f32 / (1024.0 * 1024.0 * 1024.0); + let usage_percent = (used_bytes as f32 / total_bytes as f32) * 100.0; - // Process each tmpfs mount - for (i, mount_point) in tmpfs_mounts.iter().enumerate() { - if i >= df_lines.len() { - debug!("Not enough df output lines for tmpfs mount: {}", mount_point); - continue; + // Add to tmpfs list + agent_data.system.memory.tmpfs.push(TmpfsData { + mount: mount_point.clone(), + usage_percent, + used_gb, + total_gb, + }); + } + Err(e) => { + debug!("Failed to get stats for tmpfs mount {}: {}", mount_point, e); + } } - - let parts: Vec<&str> = df_lines[i].split_whitespace().collect(); - if parts.len() < 3 { - debug!("Invalid df output for tmpfs mount: {}", mount_point); - continue; - } - - let total_bytes: u64 = parts[1].parse().unwrap_or(0); - let used_bytes: u64 = parts[2].parse().unwrap_or(0); - - if total_bytes == 0 { - continue; - } - - let total_gb = total_bytes as f32 / (1024.0 * 1024.0 * 1024.0); - let used_gb = used_bytes as f32 / (1024.0 * 1024.0 * 1024.0); - let usage_percent = (used_bytes as f32 / total_bytes as f32) * 100.0; - - // Add to tmpfs list - agent_data.system.memory.tmpfs.push(TmpfsData { - mount: mount_point.clone(), - usage_percent, - used_gb, - total_gb, - }); } // Sort tmpfs mounts by mount point for consistent display order diff --git a/agent/src/collectors/network.rs b/agent/src/collectors/network.rs index fd4dbe2..302cb89 100644 --- a/agent/src/collectors/network.rs +++ b/agent/src/collectors/network.rs @@ -1,7 +1,12 @@ use async_trait::async_trait; use cm_dashboard_shared::{AgentData, NetworkInterfaceData, Status}; -use std::process::Command; use tracing::debug; +use futures::stream::TryStreamExt; +use rtnetlink::{new_connection, IpVersion}; +use netlink_packet_route::link::LinkAttribute; +use netlink_packet_route::address::AddressAttribute; +use netlink_packet_route::route::RouteAttribute; +use std::net::IpAddr; use super::{Collector, CollectorError}; use crate::config::NetworkConfig; @@ -49,36 +54,52 @@ impl NetworkCollector { } } - /// Get the primary physical interface (the one with default route) - fn get_primary_physical_interface() -> Option { - match Command::new("timeout").args(["2", "ip", "route", "show", "default"]).output() { - Ok(output) if output.status.success() => { - let output_str = String::from_utf8_lossy(&output.stdout); - // Parse: "default via 192.168.1.1 dev eno1 ..." - for line in output_str.lines() { - if line.starts_with("default") { - if let Some(dev_pos) = line.find(" dev ") { - let after_dev = &line[dev_pos + 5..]; - if let Some(space_pos) = after_dev.find(' ') { - let interface = &after_dev[..space_pos]; - // Only return if it's a physical interface - if Self::is_physical_interface(interface) { - return Some(interface.to_string()); - } + /// Get the primary physical interface (the one with default route) using rtnetlink + async fn get_primary_physical_interface() -> Option { + let (connection, handle, _) = match new_connection() { + Ok(conn) => conn, + Err(e) => { + debug!("Failed to create netlink connection: {}", e); + return None; + } + }; + + tokio::spawn(connection); + + // Get default route + let mut routes = handle.route().get(IpVersion::V4).execute(); + + while let Ok(Some(route)) = routes.try_next().await { + // Check if this is a default route (destination is 0.0.0.0/0) + if route.header.destination_prefix_length == 0 { + // Find the output interface (OIF) attribute + if let Some(oif) = route.attributes.iter().find_map(|attr| { + if let RouteAttribute::Oif(index) = attr { + Some(*index) + } else { + None + } + }) { + // Get interface name from index + let mut link = handle.link().get().match_index(oif).execute(); + if let Ok(Some(link_msg)) = link.try_next().await { + if let Some(name) = link_msg.attributes.iter().find_map(|attr| { + if let LinkAttribute::IfName(n) = attr { + Some(n.to_string()) } else { - // No space after interface name (end of line) - let interface = after_dev.trim(); - if Self::is_physical_interface(interface) { - return Some(interface.to_string()); - } + None + } + }) { + if Self::is_physical_interface(&name) { + return Some(name); } } } } - None } - _ => None, } + + None } /// Parse VLAN configuration from /proc/net/vlan/config @@ -103,102 +124,105 @@ impl NetworkCollector { vlan_map } - /// Collect network interfaces using ip command + /// Collect network interfaces using rtnetlink async fn collect_interfaces(&self) -> Vec { let mut interfaces = Vec::new(); // Parse VLAN configuration let vlan_map = Self::parse_vlan_config(); - match Command::new("timeout").args(["2", "ip", "-j", "addr"]).output() { - Ok(output) if output.status.success() => { - let json_str = String::from_utf8_lossy(&output.stdout); + // Create netlink connection + let (connection, handle, _) = match new_connection() { + Ok(conn) => conn, + Err(e) => { + debug!("Failed to create netlink connection: {}", e); + return interfaces; + } + }; - if let Ok(json_data) = serde_json::from_str::(&json_str) { - if let Some(ifaces) = json_data.as_array() { - for iface in ifaces { - let name = iface["ifname"].as_str().unwrap_or("").to_string(); + tokio::spawn(connection); - // Skip loopback, empty names, and ifb* interfaces - if name.is_empty() || name == "lo" || name.starts_with("ifb") { - continue; - } + // Get all links + let mut links = handle.link().get().execute(); - // Parse parent interface from @parent notation (e.g., lan@enp0s31f6) - let (interface_name, parent_interface) = if let Some(at_pos) = name.find('@') { - let (child, parent) = name.split_at(at_pos); - (child.to_string(), Some(parent[1..].to_string())) - } else { - (name.clone(), None) - }; + while let Ok(Some(link)) = links.try_next().await { + // Get interface name + let name = match link.attributes.iter().find_map(|attr| { + if let LinkAttribute::IfName(n) = attr { + Some(n.to_string()) + } else { + None + } + }) { + Some(n) => n, + None => continue, + }; - let mut ipv4_addresses = Vec::new(); - let mut ipv6_addresses = Vec::new(); + // Skip loopback and ifb interfaces + if name == "lo" || name.starts_with("ifb") { + continue; + } - // Extract IP addresses - if let Some(addr_info) = iface["addr_info"].as_array() { - for addr in addr_info { - if let Some(family) = addr["family"].as_str() { - if let Some(local) = addr["local"].as_str() { - match family { - "inet" => ipv4_addresses.push(local.to_string()), - "inet6" => { - // Skip link-local IPv6 addresses (fe80::) - if !local.starts_with("fe80:") { - ipv6_addresses.push(local.to_string()); - } - } - _ => {} - } - } - } + // Parse parent interface from @parent notation (e.g., lan@enp0s31f6) + let (interface_name, parent_interface) = if let Some(at_pos) = name.find('@') { + let (child, parent) = name.split_at(at_pos); + (child.to_string(), Some(parent[1..].to_string())) + } else { + (name.clone(), None) + }; + + // Get IP addresses for this interface + let mut ipv4_addresses = Vec::new(); + let mut ipv6_addresses = Vec::new(); + + let mut addrs = handle.address().get().set_link_index_filter(link.header.index).execute(); + while let Ok(Some(addr)) = addrs.try_next().await { + for nla in &addr.attributes { + if let AddressAttribute::Address(ip) = nla { + match ip { + IpAddr::V4(ipv4) => ipv4_addresses.push(ipv4.to_string()), + IpAddr::V6(ipv6) => { + // Skip link-local IPv6 addresses (fe80::) + if !ipv6.to_string().starts_with("fe80:") { + ipv6_addresses.push(ipv6.to_string()); } } - - // Determine if physical and get status - let is_physical = Self::is_physical_interface(&interface_name); - - // Only filter out virtual interfaces without IPs - // Physical interfaces should always be shown even if down/no IPs - if !is_physical && ipv4_addresses.is_empty() && ipv6_addresses.is_empty() { - continue; - } - - let link_status = if is_physical { - Self::get_link_status(&name) - } else { - Status::Unknown // Virtual interfaces don't have meaningful link status - }; - - // Look up VLAN ID from the map (use original name before @ parsing) - let vlan_id = vlan_map.get(&name).copied(); - - interfaces.push(NetworkInterfaceData { - name: interface_name, - ipv4_addresses, - ipv6_addresses, - is_physical, - link_status, - parent_interface, - vlan_id, - }); } } } } - Err(e) => { - debug!("Failed to execute ip command: {}", e); - } - Ok(output) => { - debug!("ip command failed with status: {}", output.status); + + // Determine if physical + let is_physical = Self::is_physical_interface(&interface_name); + + // Only filter out virtual interfaces without IPs + if !is_physical && ipv4_addresses.is_empty() && ipv6_addresses.is_empty() { + continue; } + + let link_status = if is_physical { + Self::get_link_status(&name) + } else { + Status::Unknown + }; + + // Look up VLAN ID + let vlan_id = vlan_map.get(&name).copied(); + + interfaces.push(NetworkInterfaceData { + name: interface_name, + ipv4_addresses, + ipv6_addresses, + is_physical, + link_status, + parent_interface, + vlan_id, + }); } - // Assign primary physical interface as parent to virtual interfaces without explicit parent - let primary_interface = Self::get_primary_physical_interface(); - if let Some(primary) = primary_interface { + // Assign primary physical interface as parent to virtual interfaces + if let Some(primary) = Self::get_primary_physical_interface().await { for interface in interfaces.iter_mut() { - // Only assign parent to virtual interfaces that don't already have one if !interface.is_physical && interface.parent_interface.is_none() { interface.parent_interface = Some(primary.clone()); } diff --git a/agent/src/collectors/systemd.rs b/agent/src/collectors/systemd.rs index bcaa6be..9affb64 100644 --- a/agent/src/collectors/systemd.rs +++ b/agent/src/collectors/systemd.rs @@ -5,6 +5,9 @@ use std::process::Command; use std::sync::RwLock; use std::time::Instant; use tracing::{debug, warn}; +use bollard::Docker; +use bollard::container::ListContainersOptions; +use zbus::Connection; use super::{Collector, CollectorError}; use crate::config::SystemdConfig; @@ -74,7 +77,7 @@ impl SystemdCollector { debug!("Collecting systemd services metrics"); // Get cached services (discovery only happens when needed) - let monitored_services = match self.get_monitored_services() { + let monitored_services = match self.get_monitored_services().await { Ok(services) => services, Err(e) => { debug!("Failed to get monitored services: {}", e); @@ -119,7 +122,7 @@ impl SystemdCollector { } if service_name.contains("docker") && active_status == "active" { - let docker_containers = self.get_docker_containers(); + let docker_containers = self.get_docker_containers().await; for (container_name, container_status) in docker_containers { // For now, docker containers have no additional metrics // Future: could add memory_mb, cpu_percent, restart_count, etc. @@ -134,7 +137,7 @@ impl SystemdCollector { } // Add Docker images - let docker_images = self.get_docker_images(); + let docker_images = self.get_docker_images().await; for (image_name, image_status, image_size_mb) in docker_images { let mut metrics = Vec::new(); metrics.push(SubServiceMetric { @@ -190,7 +193,7 @@ impl SystemdCollector { } /// Get monitored services, discovering them if needed or cache is expired - fn get_monitored_services(&self) -> Result> { + async fn get_monitored_services(&self) -> Result> { // Check if we need discovery without holding the lock let needs_discovery = { let state = self.state.read().unwrap(); @@ -205,7 +208,7 @@ impl SystemdCollector { if needs_discovery { debug!("Discovering systemd services (cache expired or first run)"); - match self.discover_services_internal() { + match self.discover_services_internal().await { Ok((services, status_cache)) => { if let Ok(mut state) = self.state.write() { state.monitored_services = services.clone(); @@ -252,72 +255,46 @@ impl SystemdCollector { state.nginx_site_metrics.clone() } - /// Auto-discover interesting services to monitor - fn discover_services_internal(&self) -> Result<(Vec, std::collections::HashMap)> { - // First: Get all service unit files (with 3 second timeout) - let unit_files_output = Command::new("timeout") - .args(&["3", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"]) - .output()?; + /// Auto-discover interesting services to monitor using D-Bus + async fn discover_services_internal(&self) -> Result<(Vec, std::collections::HashMap)> { + // Connect to system D-Bus + let connection = Connection::system().await?; - if !unit_files_output.status.success() { - return Err(anyhow::anyhow!("systemctl list-unit-files command failed")); + // Get systemd manager proxy + let proxy = zbus::Proxy::new( + &connection, + "org.freedesktop.systemd1", + "/org/freedesktop/systemd1", + "org.freedesktop.systemd1.Manager", + ).await?; + + // List all units via D-Bus + let units: Vec<(String, String, String, String, String, String, zbus::zvariant::OwnedObjectPath, u32, String, zbus::zvariant::OwnedObjectPath)> = + proxy.call("ListUnits", &()).await?; + + let mut all_service_names = std::collections::HashSet::new(); + let mut service_status_cache = std::collections::HashMap::new(); + + // Parse D-Bus response for services only + for unit in units { + let (unit_name, _description, load_state, active_state, sub_state, _followed, _unit_path, _job_id, _job_type, _job_path) = unit; + + if unit_name.ends_with(".service") { + let service_name = unit_name.trim_end_matches(".service"); + all_service_names.insert(service_name.to_string()); + + service_status_cache.insert(service_name.to_string(), ServiceStatusInfo { + load_state: load_state.clone(), + active_state: active_state.clone(), + sub_state: sub_state.clone(), + }); + } } - // Second: Get runtime status of all units (with 3 second timeout) - let units_status_output = Command::new("timeout") - .args(&["3", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"]) - .output()?; - - if !units_status_output.status.success() { - return Err(anyhow::anyhow!("systemctl list-units command failed")); - } - - let unit_files_str = String::from_utf8(unit_files_output.stdout)?; - let units_status_str = String::from_utf8(units_status_output.stdout)?; let mut services = Vec::new(); - let excluded_services = &self.config.excluded_services; let service_name_filters = &self.config.service_name_filters; - // Parse all service unit files - let mut all_service_names = std::collections::HashSet::new(); - for line in unit_files_str.lines() { - let fields: Vec<&str> = line.split_whitespace().collect(); - if fields.len() >= 2 && fields[0].ends_with(".service") { - let service_name = fields[0].trim_end_matches(".service"); - all_service_names.insert(service_name.to_string()); - } - } - - // Parse runtime status for all units - let mut status_cache = std::collections::HashMap::new(); - for line in units_status_str.lines() { - let fields: Vec<&str> = line.split_whitespace().collect(); - if fields.len() >= 4 && fields[0].ends_with(".service") { - let service_name = fields[0].trim_end_matches(".service"); - let load_state = fields.get(1).unwrap_or(&"unknown").to_string(); - let active_state = fields.get(2).unwrap_or(&"unknown").to_string(); - let sub_state = fields.get(3).unwrap_or(&"unknown").to_string(); - - status_cache.insert(service_name.to_string(), ServiceStatusInfo { - load_state, - active_state, - sub_state, - }); - } - } - - // For services found in unit files but not in runtime status, set default inactive status - for service_name in &all_service_names { - if !status_cache.contains_key(service_name) { - status_cache.insert(service_name.to_string(), ServiceStatusInfo { - load_state: "not-loaded".to_string(), - active_state: "inactive".to_string(), - sub_state: "dead".to_string(), - }); - } - } - // Process all discovered services and apply filters for service_name in &all_service_names { // Skip excluded services first @@ -342,7 +319,7 @@ impl SystemdCollector { } } - Ok((services, status_cache)) + Ok((services, service_status_cache)) } /// Get service status from cache (if available) or fallback to systemctl @@ -541,7 +518,7 @@ impl SystemdCollector { match state.last_collection { None => true, Some(last) => { - let cache_duration = std::time::Duration::from_secs(30); + let cache_duration = std::time::Duration::from_secs(self.config.interval_seconds); last.elapsed() > cache_duration } } @@ -781,94 +758,91 @@ impl SystemdCollector { } } - /// Get docker containers as sub-services - fn get_docker_containers(&self) -> Vec<(String, String)> { + /// Get docker containers as sub-services using bollard API + async fn get_docker_containers(&self) -> Vec<(String, String)> { let mut containers = Vec::new(); - // Check if docker is available (cm-agent user is in docker group) - // Use -a to show ALL containers (running and stopped) with 3 second timeout - let output = Command::new("timeout") - .args(&["3", "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"]) - .output(); - - let output = match output { - Ok(out) if out.status.success() => out, - _ => return containers, // Docker not available or failed + // Connect to Docker daemon + let docker = match Docker::connect_with_local_defaults() { + Ok(d) => d, + Err(e) => { + debug!("Failed to connect to Docker daemon: {}", e); + return containers; + } }; - let output_str = match String::from_utf8(output.stdout) { - Ok(s) => s, - Err(_) => return containers, + // List all containers (running and stopped) + let list_options = Some(ListContainersOptions:: { + all: true, + ..Default::default() + }); + + let container_list = match docker.list_containers(list_options).await { + Ok(list) => list, + Err(e) => { + debug!("Failed to list Docker containers: {}", e); + return containers; + } }; - for line in output_str.lines() { - if line.trim().is_empty() { - continue; - } + for container in container_list { + // Get container name (remove leading slash if present) + let container_name = container.names + .and_then(|names| names.first().map(|n| n.trim_start_matches('/').to_string())) + .unwrap_or_else(|| container.id.clone().unwrap_or_default()); - let parts: Vec<&str> = line.split(',').collect(); - if parts.len() >= 2 { - let container_name = parts[0].trim(); - let status_str = parts[1].trim(); + // Map container state to service status + let container_status = match container.state.as_deref() { + Some("running") => "active", + Some("exited") | Some("created") => "inactive", + _ => "failed", // restarting, paused, dead, etc. + }; - let container_status = if status_str.contains("Up") { - "active" - } else if status_str.contains("Exited") || status_str.contains("Created") { - "inactive" // Stopped/created containers are inactive - } else { - "failed" // Other states (restarting, paused, dead) → failed - }; - - containers.push((format!("docker_{}", container_name), container_status.to_string())); - } + containers.push((format!("docker_{}", container_name), container_status.to_string())); } containers } - /// Get docker images as sub-services - fn get_docker_images(&self) -> Vec<(String, String, f32)> { + /// Get docker images as sub-services using bollard API + async fn get_docker_images(&self) -> Vec<(String, String, f32)> { let mut images = Vec::new(); - // Check if docker is available (cm-agent user is in docker group) with 3 second timeout - let output = Command::new("timeout") - .args(&["3", "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"]) - .output(); - let output = match output { - Ok(out) if out.status.success() => out, - Ok(_) => { - return images; - } - Err(_) => { + // Connect to Docker daemon + let docker = match Docker::connect_with_local_defaults() { + Ok(d) => d, + Err(e) => { + debug!("Failed to connect to Docker daemon: {}", e); return images; } }; - let output_str = match String::from_utf8(output.stdout) { - Ok(s) => s, - Err(_) => return images, + // List all images + let image_list = match docker.list_images::(None).await { + Ok(list) => list, + Err(e) => { + debug!("Failed to list Docker images: {}", e); + return images; + } }; - for line in output_str.lines() { - if line.trim().is_empty() { - continue; + for image in image_list { + // Get image name from repo tags + let image_names: Vec = image.repo_tags + .into_iter() + .filter(|tag| !tag.contains("")) + .collect(); + + if image_names.is_empty() { + continue; // Skip untagged images } - let parts: Vec<&str> = line.split(',').collect(); - if parts.len() >= 2 { - let image_name = parts[0].trim(); - let size_str = parts[1].trim(); - - // Skip : images (dangling images) - if image_name.contains("") { - continue; - } - - // Parse size to MB (sizes come as "142MB", "1.5GB", "512kB", etc.) - let size_mb = self.parse_docker_size(size_str); + // Get size in MB + let size_mb = image.size as f32 / (1024.0 * 1024.0); + for image_name in image_names { images.push(( - image_name.to_string(), + image_name, "inactive".to_string(), // Images are informational - use inactive for neutral display size_mb )); @@ -877,34 +851,6 @@ impl SystemdCollector { images } - - /// Parse Docker size string to MB - fn parse_docker_size(&self, size_str: &str) -> f32 { - let size_upper = size_str.to_uppercase(); - - // Extract numeric part and unit - let mut num_str = String::new(); - let mut unit = String::new(); - - for ch in size_upper.chars() { - if ch.is_ascii_digit() || ch == '.' { - num_str.push(ch); - } else if ch.is_alphabetic() { - unit.push(ch); - } - } - - let value: f32 = num_str.parse().unwrap_or(0.0); - - // Convert to MB - match unit.as_str() { - "KB" | "K" => value / 1024.0, - "MB" | "M" => value, - "GB" | "G" => value * 1024.0, - "TB" | "T" => value * 1024.0 * 1024.0, - _ => value, // Assume bytes if no unit - } - } } #[async_trait] diff --git a/agent/src/communication/mod.rs b/agent/src/communication/mod.rs index c364f7c..af24350 100644 --- a/agent/src/communication/mod.rs +++ b/agent/src/communication/mod.rs @@ -5,10 +5,9 @@ use zmq::{Context, Socket, SocketType}; use crate::config::ZmqConfig; -/// ZMQ communication handler for publishing metrics and receiving commands +/// ZMQ communication handler for publishing metrics pub struct ZmqHandler { publisher: Socket, - command_receiver: Socket, } impl ZmqHandler { @@ -26,20 +25,8 @@ impl ZmqHandler { publisher.set_sndhwm(1000)?; // High water mark for outbound messages publisher.set_linger(1000)?; // Linger time on close - // Create command receiver socket (PULL socket to receive commands from dashboard) - let command_receiver = context.socket(SocketType::PULL)?; - let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port); - command_receiver.bind(&cmd_bind_address)?; - - info!("ZMQ command receiver bound to {}", cmd_bind_address); - - // Set non-blocking mode for command receiver - command_receiver.set_rcvtimeo(0)?; // Non-blocking receive - command_receiver.set_linger(1000)?; - Ok(Self { publisher, - command_receiver, }) } @@ -65,36 +52,4 @@ impl ZmqHandler { Ok(()) } - /// Try to receive a command (non-blocking) - pub fn try_receive_command(&self) -> Result> { - match self.command_receiver.recv_bytes(zmq::DONTWAIT) { - Ok(bytes) => { - debug!("Received command message ({} bytes)", bytes.len()); - - let command: AgentCommand = serde_json::from_slice(&bytes) - .map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?; - - debug!("Parsed command: {:?}", command); - Ok(Some(command)) - } - Err(zmq::Error::EAGAIN) => { - // No message available (non-blocking) - Ok(None) - } - Err(e) => Err(anyhow::anyhow!("ZMQ receive error: {}", e)), - } - } -} - -/// Commands that can be sent to the agent -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub enum AgentCommand { - /// Request immediate metric collection - CollectNow, - /// Change collection interval - SetInterval { seconds: u64 }, - /// Enable/disable a collector - ToggleCollector { name: String, enabled: bool }, - /// Request status/health check - Ping, } diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index 8593b54..0dd8f70 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -20,7 +20,6 @@ pub struct AgentConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ZmqConfig { pub publisher_port: u16, - pub command_port: u16, pub bind_address: String, pub transmission_interval_seconds: u64, /// Heartbeat transmission interval in seconds for host connectivity detection diff --git a/agent/src/config/validation.rs b/agent/src/config/validation.rs index 2747418..410770d 100644 --- a/agent/src/config/validation.rs +++ b/agent/src/config/validation.rs @@ -7,14 +7,6 @@ pub fn validate_config(config: &AgentConfig) -> Result<()> { bail!("ZMQ publisher port cannot be 0"); } - if config.zmq.command_port == 0 { - bail!("ZMQ command port cannot be 0"); - } - - if config.zmq.publisher_port == config.zmq.command_port { - bail!("ZMQ publisher and command ports cannot be the same"); - } - if config.zmq.bind_address.is_empty() { bail!("ZMQ bind address cannot be empty"); } diff --git a/dashboard/Cargo.toml b/dashboard/Cargo.toml index da9e534..680323c 100644 --- a/dashboard/Cargo.toml +++ b/dashboard/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard" -version = "0.1.196" +version = "0.1.197" edition = "2021" [dependencies] diff --git a/shared/Cargo.toml b/shared/Cargo.toml index af42308..397ef3c 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-shared" -version = "0.1.196" +version = "0.1.197" edition = "2021" [dependencies]