Files
symon/src/exporter.rs
alex 2e950506b7 fix network metrics & grafana dashboard
number of top process consumer in config
2025-11-07 20:24:51 +01:00

197 lines
7.3 KiB
Rust

use crate::collector::SystemMetrics;
use crate::config::OtlpConfig;
use anyhow::{Context, Result};
use opentelemetry::metrics::MeterProvider;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::Resource;
/// Exports collected system metrics through an OpenTelemetry meter provider.
///
/// Holds the provider itself (so it can be shut down explicitly through
/// [`MetricsExporter::shutdown`]) and the gauge instruments that
/// [`MetricsExporter::export`] records into.
pub struct MetricsExporter {
    /// Provider the gauge instruments were created from.
    meter_provider: SdkMeterProvider,
    /// One gauge per exported metric.
    gauges: MetricInstruments,
}
/// The gauge instruments the exporter records into, one per metric.
///
/// Percentages and temperatures are `f64` gauges; byte counts and byte rates
/// are `u64` gauges.
struct MetricInstruments {
    // CPU
    /// CPU usage percentage, recorded per core.
    cpu_usage: opentelemetry::metrics::Gauge<f64>,

    // Memory and swap
    /// Memory in use, in bytes.
    memory_usage: opentelemetry::metrics::Gauge<u64>,
    /// Total memory, in bytes.
    memory_total: opentelemetry::metrics::Gauge<u64>,
    /// Swap in use, in bytes.
    swap_usage: opentelemetry::metrics::Gauge<u64>,
    /// Total swap, in bytes.
    swap_total: opentelemetry::metrics::Gauge<u64>,

    // Network
    /// Bytes received per second, recorded per interface.
    network_rx: opentelemetry::metrics::Gauge<u64>,
    /// Bytes transmitted per second, recorded per interface.
    network_tx: opentelemetry::metrics::Gauge<u64>,

    // Disk
    /// Disk space in use, in bytes, recorded per device and mount point.
    disk_usage: opentelemetry::metrics::Gauge<u64>,
    /// Total disk space, in bytes, recorded per device and mount point.
    disk_total: opentelemetry::metrics::Gauge<u64>,

    // Processes
    /// Process CPU usage percentage, recorded per pid and process name.
    process_cpu: opentelemetry::metrics::Gauge<f64>,
    /// Process memory usage, in bytes, recorded per pid and process name.
    process_memory: opentelemetry::metrics::Gauge<u64>,

    // Sensors
    /// Temperature in degrees Celsius, recorded per sensor.
    temperature: opentelemetry::metrics::Gauge<f64>,
}
impl MetricsExporter {
    /// Builds an exporter backed by a tonic-based OTLP metrics exporter.
    ///
    /// The resource attached to the meter provider carries `service.name` and
    /// `service.version` taken from `config`, followed by every entry of
    /// `config.resource_attributes`. A periodic reader is configured with
    /// `config.export_interval()`, and `config.export_timeout()` is passed to
    /// the OTLP exporter as its timeout.
    ///
    /// NOTE(review): the periodic reader is built with
    /// `opentelemetry_sdk::runtime::Tokio`, so this presumably has to be called
    /// from inside a Tokio runtime — confirm against the SDK documentation.
    ///
    /// The body contains no `.await`; the function stays `async` so that
    /// existing callers keep compiling.
    ///
    /// # Errors
    ///
    /// Returns an error if the OTLP metrics exporter cannot be built.
    pub async fn new(config: &OtlpConfig) -> Result<Self> {
        // Resource attributes: the two service fields first, then the custom
        // entries in whatever order the config yields them.
        let mut attributes = vec![
            KeyValue::new("service.name", config.service_name.clone()),
            KeyValue::new("service.version", config.service_version.clone()),
        ];
        attributes.extend(
            (&config.resource_attributes)
                .into_iter()
                .map(|(key, value)| KeyValue::new(key.clone(), value.clone())),
        );
        let resource = Resource::new(attributes);

        // OTLP exporter over tonic, using the SDK's default temporality selector.
        let temporality =
            Box::new(opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector::default());
        let exporter = opentelemetry_otlp::new_exporter()
            .tonic()
            .with_endpoint(&config.endpoint)
            .with_timeout(config.export_timeout())
            .build_metrics_exporter(temporality)
            .context("Failed to build OTLP metrics exporter")?;

        // Meter provider: a periodic reader around the exporter plus the resource.
        let meter_provider = SdkMeterProvider::builder()
            .with_reader(
                PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio)
                    .with_interval(config.export_interval())
                    .build(),
            )
            .with_resource(resource)
            .build();

        // Every instrument is a gauge with a name and a description, so the
        // builder chain is written once per value type.
        let meter = meter_provider.meter("symon");
        let gauge_f64 = |name: &'static str, description: &'static str| {
            meter.f64_gauge(name).with_description(description).init()
        };
        let gauge_u64 = |name: &'static str, description: &'static str| {
            meter.u64_gauge(name).with_description(description).init()
        };

        let gauges = MetricInstruments {
            cpu_usage: gauge_f64("system_cpu_usage_percent", "CPU usage percentage per core"),
            memory_usage: gauge_u64("system_memory_usage_bytes", "Memory usage in bytes"),
            memory_total: gauge_u64("system_memory_total_bytes", "Total memory in bytes"),
            swap_usage: gauge_u64("system_swap_usage_bytes", "Swap usage in bytes"),
            swap_total: gauge_u64("system_swap_total_bytes", "Total swap in bytes"),
            network_rx: gauge_u64("system_network_rx_bytes_per_sec", "Bytes received per second"),
            network_tx: gauge_u64(
                "system_network_tx_bytes_per_sec",
                "Bytes transmitted per second",
            ),
            disk_usage: gauge_u64("system_disk_usage_bytes", "Disk usage in bytes"),
            disk_total: gauge_u64("system_disk_total_bytes", "Total disk space in bytes"),
            process_cpu: gauge_f64(
                "system_process_cpu_usage_percent",
                "Process CPU usage percentage",
            ),
            process_memory: gauge_u64(
                "system_process_memory_usage_bytes",
                "Process memory usage in bytes",
            ),
            temperature: gauge_f64("system_temperature_celsius", "Temperature in Celsius"),
        };

        Ok(Self {
            meter_provider,
            gauges,
        })
    }

    /// Records one snapshot of `metrics` into the gauge instruments.
    ///
    /// Each metric group on `metrics` is optional; a group that is `None` is
    /// skipped. Memory and swap values are recorded without attributes; every
    /// other group records one value per element with identifying attributes
    /// (`cpu_id`, `interface`, `device`/`mount`, `pid`/`name`, `sensor`).
    pub fn export(&self, metrics: &SystemMetrics) {
        let gauges = &self.gauges;

        // CPU: one value per core.
        for core in metrics.cpu.iter().flatten() {
            let attrs = [KeyValue::new("cpu_id", core.core_index as i64)];
            gauges.cpu_usage.record(core.usage_percent as f64, &attrs);
        }

        // Memory and swap: system-wide values, no attributes.
        if let Some(mem) = metrics.memory.as_ref() {
            gauges.memory_usage.record(mem.used_bytes, &[]);
            gauges.memory_total.record(mem.total_bytes, &[]);
            gauges.swap_usage.record(mem.swap_used_bytes, &[]);
            gauges.swap_total.record(mem.swap_total_bytes, &[]);
        }

        // Network: receive/transmit rate per interface.
        for iface in metrics.network.iter().flatten() {
            let attrs = [KeyValue::new("interface", iface.interface_name.clone())];
            gauges.network_rx.record(iface.rx_bytes_per_sec, &attrs);
            gauges.network_tx.record(iface.tx_bytes_per_sec, &attrs);
        }

        // Disk: used/total per device and mount point.
        for volume in metrics.disk.iter().flatten() {
            let attrs = [
                KeyValue::new("device", volume.device_name.clone()),
                KeyValue::new("mount", volume.mount_point.clone()),
            ];
            gauges.disk_usage.record(volume.used_bytes, &attrs);
            gauges.disk_total.record(volume.total_bytes, &attrs);
        }

        // Processes: CPU and memory per pid/name pair.
        for proc_info in metrics.processes.iter().flatten() {
            let attrs = [
                KeyValue::new("pid", proc_info.pid as i64),
                KeyValue::new("name", proc_info.name.clone()),
            ];
            gauges
                .process_cpu
                .record(proc_info.cpu_usage_percent as f64, &attrs);
            gauges.process_memory.record(proc_info.memory_bytes, &attrs);
        }

        // Temperature: one value per sensor.
        for reading in metrics.temperature.iter().flatten() {
            let attrs = [KeyValue::new("sensor", reading.sensor_name.clone())];
            gauges
                .temperature
                .record(reading.temperature_celsius as f64, &attrs);
        }
    }

    /// Consumes the exporter and shuts down its meter provider.
    ///
    /// The body contains no `.await`; the function stays `async` so that
    /// existing callers keep compiling.
    ///
    /// # Errors
    ///
    /// Returns an error if the meter provider reports a failure while
    /// shutting down.
    pub async fn shutdown(self) -> Result<()> {
        self.meter_provider
            .shutdown()
            .context("Failed to shutdown meter provider")
    }
}