init
This commit is contained in:
196
src/exporter.rs
Normal file
196
src/exporter.rs
Normal file
@@ -0,0 +1,196 @@
|
||||
use crate::collector::SystemMetrics;
|
||||
use crate::config::OtlpConfig;
|
||||
use anyhow::{Context, Result};
|
||||
use opentelemetry::metrics::MeterProvider;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
|
||||
use opentelemetry_sdk::Resource;
|
||||
|
||||
pub struct MetricsExporter {
|
||||
meter_provider: SdkMeterProvider,
|
||||
gauges: MetricInstruments,
|
||||
}
|
||||
|
||||
struct MetricInstruments {
|
||||
cpu_usage: opentelemetry::metrics::Gauge<f64>,
|
||||
memory_usage: opentelemetry::metrics::Gauge<u64>,
|
||||
memory_total: opentelemetry::metrics::Gauge<u64>,
|
||||
swap_usage: opentelemetry::metrics::Gauge<u64>,
|
||||
swap_total: opentelemetry::metrics::Gauge<u64>,
|
||||
network_rx: opentelemetry::metrics::Counter<u64>,
|
||||
network_tx: opentelemetry::metrics::Counter<u64>,
|
||||
disk_usage: opentelemetry::metrics::Gauge<u64>,
|
||||
disk_total: opentelemetry::metrics::Gauge<u64>,
|
||||
process_cpu: opentelemetry::metrics::Gauge<f64>,
|
||||
process_memory: opentelemetry::metrics::Gauge<u64>,
|
||||
temperature: opentelemetry::metrics::Gauge<f64>,
|
||||
}
|
||||
|
||||
impl MetricsExporter {
|
||||
pub async fn new(config: &OtlpConfig) -> Result<Self> {
|
||||
// Build resource with service information
|
||||
let mut resource_kvs = vec![
|
||||
KeyValue::new("service.name", config.service_name.clone()),
|
||||
KeyValue::new("service.version", config.service_version.clone()),
|
||||
];
|
||||
|
||||
// Add custom resource attributes
|
||||
for (key, value) in &config.resource_attributes {
|
||||
resource_kvs.push(KeyValue::new(key.clone(), value.clone()));
|
||||
}
|
||||
|
||||
let resource = Resource::new(resource_kvs);
|
||||
|
||||
// Build OTLP exporter using new pipeline API
|
||||
let exporter = opentelemetry_otlp::new_exporter()
|
||||
.tonic()
|
||||
.with_endpoint(&config.endpoint)
|
||||
.with_timeout(config.export_timeout())
|
||||
.build_metrics_exporter(
|
||||
Box::new(opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector::default())
|
||||
)
|
||||
.context("Failed to build OTLP metrics exporter")?;
|
||||
|
||||
// Build meter provider
|
||||
let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio)
|
||||
.with_interval(config.export_interval())
|
||||
.build();
|
||||
|
||||
let meter_provider = SdkMeterProvider::builder()
|
||||
.with_reader(reader)
|
||||
.with_resource(resource)
|
||||
.build();
|
||||
|
||||
// Create meter and instruments
|
||||
let meter = meter_provider.meter("symon");
|
||||
|
||||
let gauges = MetricInstruments {
|
||||
cpu_usage: meter
|
||||
.f64_gauge("system_cpu_usage_percent")
|
||||
.with_description("CPU usage percentage per core")
|
||||
.init(),
|
||||
memory_usage: meter
|
||||
.u64_gauge("system_memory_usage_bytes")
|
||||
.with_description("Memory usage in bytes")
|
||||
.init(),
|
||||
memory_total: meter
|
||||
.u64_gauge("system_memory_total_bytes")
|
||||
.with_description("Total memory in bytes")
|
||||
.init(),
|
||||
swap_usage: meter
|
||||
.u64_gauge("system_swap_usage_bytes")
|
||||
.with_description("Swap usage in bytes")
|
||||
.init(),
|
||||
swap_total: meter
|
||||
.u64_gauge("system_swap_total_bytes")
|
||||
.with_description("Total swap in bytes")
|
||||
.init(),
|
||||
network_rx: meter
|
||||
.u64_counter("system_network_rx_bytes_total")
|
||||
.with_description("Total bytes received")
|
||||
.init(),
|
||||
network_tx: meter
|
||||
.u64_counter("system_network_tx_bytes_total")
|
||||
.with_description("Total bytes transmitted")
|
||||
.init(),
|
||||
disk_usage: meter
|
||||
.u64_gauge("system_disk_usage_bytes")
|
||||
.with_description("Disk usage in bytes")
|
||||
.init(),
|
||||
disk_total: meter
|
||||
.u64_gauge("system_disk_total_bytes")
|
||||
.with_description("Total disk space in bytes")
|
||||
.init(),
|
||||
process_cpu: meter
|
||||
.f64_gauge("system_process_cpu_usage_percent")
|
||||
.with_description("Process CPU usage percentage")
|
||||
.init(),
|
||||
process_memory: meter
|
||||
.u64_gauge("system_process_memory_usage_bytes")
|
||||
.with_description("Process memory usage in bytes")
|
||||
.init(),
|
||||
temperature: meter
|
||||
.f64_gauge("system_temperature_celsius")
|
||||
.with_description("Temperature in Celsius")
|
||||
.init(),
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
meter_provider,
|
||||
gauges,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn export(&self, metrics: &SystemMetrics) {
|
||||
// Export CPU metrics
|
||||
if let Some(cpu_metrics) = &metrics.cpu {
|
||||
for cpu in cpu_metrics {
|
||||
self.gauges.cpu_usage.record(
|
||||
cpu.usage_percent as f64,
|
||||
&[KeyValue::new("cpu_id", cpu.core_index as i64)],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Export memory metrics
|
||||
if let Some(memory) = &metrics.memory {
|
||||
self.gauges.memory_usage.record(memory.used_bytes, &[]);
|
||||
self.gauges.memory_total.record(memory.total_bytes, &[]);
|
||||
self.gauges.swap_usage.record(memory.swap_used_bytes, &[]);
|
||||
self.gauges.swap_total.record(memory.swap_total_bytes, &[]);
|
||||
}
|
||||
|
||||
// Export network metrics
|
||||
if let Some(network_metrics) = &metrics.network {
|
||||
for net in network_metrics {
|
||||
let attrs = &[KeyValue::new("interface", net.interface_name.clone())];
|
||||
self.gauges.network_rx.add(net.rx_bytes_total, attrs);
|
||||
self.gauges.network_tx.add(net.tx_bytes_total, attrs);
|
||||
}
|
||||
}
|
||||
|
||||
// Export disk metrics
|
||||
if let Some(disk_metrics) = &metrics.disk {
|
||||
for disk in disk_metrics {
|
||||
let attrs = &[
|
||||
KeyValue::new("device", disk.device_name.clone()),
|
||||
KeyValue::new("mount", disk.mount_point.clone()),
|
||||
];
|
||||
self.gauges.disk_usage.record(disk.used_bytes, attrs);
|
||||
self.gauges.disk_total.record(disk.total_bytes, attrs);
|
||||
}
|
||||
}
|
||||
|
||||
// Export process metrics
|
||||
if let Some(process_metrics) = &metrics.processes {
|
||||
for process in process_metrics {
|
||||
let attrs = &[
|
||||
KeyValue::new("pid", process.pid as i64),
|
||||
KeyValue::new("name", process.name.clone()),
|
||||
];
|
||||
self.gauges
|
||||
.process_cpu
|
||||
.record(process.cpu_usage_percent as f64, attrs);
|
||||
self.gauges.process_memory.record(process.memory_bytes, attrs);
|
||||
}
|
||||
}
|
||||
|
||||
// Export temperature metrics
|
||||
if let Some(temp_metrics) = &metrics.temperature {
|
||||
for temp in temp_metrics {
|
||||
self.gauges.temperature.record(
|
||||
temp.temperature_celsius as f64,
|
||||
&[KeyValue::new("sensor", temp.sensor_name.clone())],
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shutdown(self) -> Result<()> {
|
||||
self.meter_provider
|
||||
.shutdown()
|
||||
.context("Failed to shutdown meter provider")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user