Telemetry
Real-time metrics collection and export for HDDS monitoring.
Overview
HDDS includes a telemetry system for observability:
- Thread-safe metrics - Atomic counters, latency histograms
- HDMX binary format - Compact wire protocol for streaming
- Live TCP streaming - Push metrics to monitoring tools at 10 Hz
- Admin API integration - Metrics snapshots via Admin API
Architecture
┌─────────────────────────────────────────────────────────────┐
│ HDDS Application │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ MetricsCollector │ │
│ │ ┌─────────────────┐ ┌──────────────────────────────┐ │ │
│ │ │ Atomic Counters │ │ Latency Histogram │ │ │
│ │ │ - messages_sent │ │ - ring buffer (10k samples) │ │ │
│ │ │ - messages_recv │ │ - p50, p99, p999 percentiles │ │ │
│ │ │ - bytes_sent │ └──────────────────────────────┘ │ │
│ │ │ - would_block │ │ │
│ │ └─────────────────┘ │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Exporter │ │
│ │ TCP Server (port 4242) ──► HDMX binary frames @ 10 Hz │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
└──────────────────────────────┼───────────────────────────────┘
▼
┌───────────────────┐
│ HDDS Viewer │
│ (monitoring) │
└───────────────────┘
Quick Start
use hdds::telemetry::{init_metrics, init_exporter};
use std::time::Instant;
fn main() {
    // Initialize the global metrics collector
    let metrics = init_metrics();
    // Start the TCP exporter (optional) for live streaming; `.ok()` keeps the app running if the port is unavailable
    let _exporter = init_exporter("127.0.0.1", 4242).ok();
    // Application loop...
    loop {
        // Record metrics during DDS operations
        metrics.increment_sent(1);
        metrics.increment_bytes_sent(1024);
        // Record a latency sample: add_latency_sample(start_ns, end_ns) records end_ns - start_ns
        let start = Instant::now();
        // ... operation ...
        let elapsed_ns = start.elapsed().as_nanos() as u64;
        metrics.add_latency_sample(0, elapsed_ns);
    }
}
MetricsCollector API
Initialization
use hdds::telemetry::{init_metrics, get_metrics, get_metrics_opt, MetricsCollector};
// Initialize global metrics (creates singleton)
let metrics = init_metrics();
// Get existing collector (creates if not initialized)
let metrics = get_metrics();
// Get existing collector (returns None if not initialized)
if let Some(metrics) = get_metrics_opt() {
    // Use metrics
}
// Separate collector instance with a custom latency-buffer capacity (default: 10,000 samples)
let metrics = MetricsCollector::with_capacity(50_000);
Counter Methods
// Message counters
metrics.increment_sent(1); // Messages sent
metrics.increment_received(1); // Messages received
metrics.increment_dropped(1); // Messages dropped
// Byte counter
metrics.increment_bytes_sent(1024); // Bytes sent
// Error counters
metrics.increment_would_block(1); // EAGAIN/EWOULDBLOCK
metrics.increment_merge_full(1); // Merge queue full
metrics.increment_cache_insert_errors(1); // HistoryCache errors
metrics.increment_transport_errors(1); // Transport errors
// Read counters
let sent = metrics.messages_sent();
let dropped = metrics.messages_dropped();
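The counters above are described as lock-free atomics with Relaxed ordering. As an illustration of that design (not the hdds source), here is a minimal sketch of a hypothetical `Counters` struct built on `AtomicU64`; the struct name and its reduced set of fields are assumptions.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical subset of a metrics collector: each counter is an
/// independent AtomicU64, so increments never take a lock.
#[derive(Default)]
pub struct Counters {
    messages_sent: AtomicU64,
    messages_dropped: AtomicU64,
    bytes_sent: AtomicU64,
}

impl Counters {
    pub fn increment_sent(&self, n: u64) {
        // Relaxed is enough: each counter only needs to be exact on its own,
        // not ordered relative to other memory operations.
        self.messages_sent.fetch_add(n, Ordering::Relaxed);
    }

    pub fn increment_dropped(&self, n: u64) {
        self.messages_dropped.fetch_add(n, Ordering::Relaxed);
    }

    pub fn increment_bytes_sent(&self, n: u64) {
        self.bytes_sent.fetch_add(n, Ordering::Relaxed);
    }

    pub fn messages_sent(&self) -> u64 {
        self.messages_sent.load(Ordering::Relaxed)
    }

    pub fn messages_dropped(&self) -> u64 {
        self.messages_dropped.load(Ordering::Relaxed)
    }

    pub fn bytes_sent(&self) -> u64 {
        self.bytes_sent.load(Ordering::Relaxed)
    }
}
```

Because `fetch_add` is an atomic read-modify-write, no increment is lost even under contention; Relaxed only gives up ordering guarantees between different counters.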
Latency Histogram
// Add a latency sample; the collector records end_ns - start_ns (nanoseconds)
let start = std::time::Instant::now();
// ... operation ...
let elapsed_ns = start.elapsed().as_nanos() as u64;
metrics.add_latency_sample(0, elapsed_ns);
// Or pass two nanosecond timestamps taken from the same clock
metrics.add_latency_sample(start_timestamp_ns, end_timestamp_ns);
// Check sample count
let count = metrics.latency_sample_count();
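The architecture describes the histogram as a 10k-sample ring buffer with p50/p99/p999 percentiles. A minimal sketch of such a structure follows, assuming a Mutex-protected `VecDeque` and nearest-rank percentiles; the type `LatencyRing` and its `percentile` method are hypothetical, and HDDS may compute percentiles differently.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical fixed-capacity latency ring: once full, each new sample
/// evicts the oldest one, so percentiles reflect the most recent window.
pub struct LatencyRing {
    capacity: usize,
    samples: Mutex<VecDeque<u64>>,
}

impl LatencyRing {
    pub fn with_capacity(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        LatencyRing { capacity, samples: Mutex::new(VecDeque::with_capacity(capacity)) }
    }

    /// Records end_ns - start_ns; saturating, so a backwards clock yields 0.
    pub fn add_latency_sample(&self, start_ns: u64, end_ns: u64) {
        let mut s = self.samples.lock().unwrap();
        if s.len() == self.capacity {
            s.pop_front(); // evict the oldest sample
        }
        s.push_back(end_ns.saturating_sub(start_ns));
    }

    pub fn latency_sample_count(&self) -> usize {
        self.samples.lock().unwrap().len()
    }

    /// Nearest-rank percentile; `per_mille` is 500 for p50, 990 for p99, 999 for p999.
    pub fn percentile(&self, per_mille: u64) -> Option<u64> {
        // Copy under the lock, then sort outside it to keep the critical section short.
        let mut sorted: Vec<u64> = self.samples.lock().unwrap().iter().copied().collect();
        if sorted.is_empty() {
            return None;
        }
        sorted.sort_unstable();
        let n = sorted.len() as u64;
        // rank = ceil(per_mille * n / 1000), clamped to [1, n]
        let rank = ((per_mille * n + 999) / 1000).clamp(1, n);
        Some(sorted[(rank - 1) as usize])
    }
}
```

Integer per-mille arithmetic avoids floating-point rounding at rank boundaries. With samples 1..=100, p50 is 50 and p999 is 100.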
Snapshots
use hdds::telemetry::Frame;
// Get current metrics snapshot
let frame: Frame = metrics.snapshot();
// Frame contains timestamp and fields
println!("Timestamp: {} ns", frame.ts_ns);
for field in &frame.fields {
    println!("Tag: {}, Value: {}", field.tag, field.value_u64);
}
Metric Tags
| Tag | Name | Type | Description |
|---|---|---|---|
| 1 | PARTICIPANT_ID | u64 | Participant identifier |
| 10 | MESSAGES_SENT | u64 | Total messages sent |
| 11 | MESSAGES_RECEIVED | u64 | Total messages received |
| 12 | MESSAGES_DROPPED | u64 | Total messages dropped |
| 13 | BYTES_SENT | u64 | Total bytes sent |
| 20 | LATENCY_P50 | u64 | 50th percentile latency (ns) |
| 21 | LATENCY_P99 | u64 | 99th percentile latency (ns) |
| 22 | LATENCY_P999 | u64 | 99.9th percentile latency (ns) |
| 40 | MERGE_FULL_COUNT | u64 | Merge queue full events |
| 41 | WOULD_BLOCK_COUNT | u64 | EAGAIN/EWOULDBLOCK events |
| 42 | CACHE_INSERT_ERRORS | u64 | HistoryCache insert errors |
| 43 | TRANSPORT_ERRORS | u64 | Transport errors |
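To print snapshot fields by name instead of by number, a tool can map tags back to the names in the table above. The lookup below is a sketch. `METRIC_TAGS`, `tag_name`, and `describe` are hypothetical helpers, and the real crate may expose tag constants differently. The `u16` tag type follows the 2-byte Tag field in the HDMX layout.

```rust
/// Tag values copied from the Metric Tags table.
pub const METRIC_TAGS: &[(u16, &str)] = &[
    (1, "PARTICIPANT_ID"),
    (10, "MESSAGES_SENT"),
    (11, "MESSAGES_RECEIVED"),
    (12, "MESSAGES_DROPPED"),
    (13, "BYTES_SENT"),
    (20, "LATENCY_P50"),
    (21, "LATENCY_P99"),
    (22, "LATENCY_P999"),
    (40, "MERGE_FULL_COUNT"),
    (41, "WOULD_BLOCK_COUNT"),
    (42, "CACHE_INSERT_ERRORS"),
    (43, "TRANSPORT_ERRORS"),
];

/// Returns the table name for a tag, or None for tags not in the table.
pub fn tag_name(tag: u16) -> Option<&'static str> {
    METRIC_TAGS.iter().find(|(t, _)| *t == tag).map(|(_, name)| *name)
}

/// Formats one field as NAME=value, falling back to the numeric tag.
pub fn describe(tag: u16, value: u64) -> String {
    match tag_name(tag) {
        Some(name) => format!("{}={}", name, value),
        None => format!("tag{}={}", tag, value),
    }
}
```

Unknown tags fall back to their number instead of being dropped, so a viewer built against an older table still shows newer metrics.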
Live Streaming with Exporter
Starting the Exporter
use hdds::telemetry::{init_exporter, get_exporter, get_metrics};
// Start TCP server on port 4242 (loopback only)
let exporter = init_exporter("127.0.0.1", 4242)?;
// Or bind to all interfaces so remote viewers can connect
let exporter = init_exporter("0.0.0.0", 4242)?;
// Get the existing exporter from anywhere in the application
if let Some(exporter) = get_exporter() {
    // Push a frame manually
    let frame = get_metrics().snapshot();
    exporter.push(&frame);
}
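Under Thread Safety, exporter clients are said to live in a Mutex-protected vector. The following is a sketch of that shape using only `std::net`. `MiniExporter` is a hypothetical stand-in, not the hdds `Exporter`. The per-client write timeout and the non-blocking accept loop are assumptions made to keep the sketch well-behaved.

```rust
use std::io::{self, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical minimal exporter: an accept thread adds clients to a
/// Mutex-protected Vec, and push() writes one encoded frame to each client.
pub struct MiniExporter {
    clients: Mutex<Vec<TcpStream>>,
    running: AtomicBool,
    local_addr: SocketAddr,
}

impl MiniExporter {
    pub fn bind(addr: &str) -> io::Result<Arc<Self>> {
        let listener = TcpListener::bind(addr)?;
        // Non-blocking accept lets the accept thread notice shutdown() promptly.
        listener.set_nonblocking(true)?;
        let exporter = Arc::new(MiniExporter {
            clients: Mutex::new(Vec::new()),
            running: AtomicBool::new(true),
            local_addr: listener.local_addr()?,
        });
        let acceptor = Arc::clone(&exporter);
        thread::spawn(move || {
            while acceptor.running.load(Ordering::Acquire) {
                match listener.accept() {
                    Ok((stream, _peer)) => {
                        // Some platforms let accepted sockets inherit non-blocking mode;
                        // force blocking writes, bounded by a timeout so a stalled
                        // client cannot hang push() forever.
                        let ready = stream.set_nonblocking(false).is_ok()
                            && stream.set_write_timeout(Some(Duration::from_millis(100))).is_ok();
                        if ready {
                            acceptor.clients.lock().unwrap().push(stream);
                        }
                    }
                    // WouldBlock (no pending client) or a transient error: back off briefly.
                    Err(_) => thread::sleep(Duration::from_millis(10)),
                }
            }
        });
        Ok(exporter)
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    pub fn client_count(&self) -> usize {
        self.clients.lock().unwrap().len()
    }

    /// Sends `frame` to every client; a client whose write fails or times out is dropped.
    pub fn push(&self, frame: &[u8]) {
        self.clients.lock().unwrap().retain_mut(|c| c.write_all(frame).is_ok());
    }

    /// Stops accepting and closes every connection (dropping a TcpStream closes it).
    pub fn shutdown(&self) {
        self.running.store(false, Ordering::Release);
        self.clients.lock().unwrap().clear();
    }
}
```

Dropping a client on its first failed write keeps one slow or disconnected viewer from stalling pushes to the others, at the cost of forcing that viewer to reconnect.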
Push Metrics
use hdds::telemetry::{init_metrics, init_exporter};
use std::time::Duration;
use std::thread;
// Spawn a thread that pushes a snapshot every 100 ms (10 Hz)
let metrics = init_metrics();
let exporter = init_exporter("127.0.0.1", 4242)?;
thread::spawn(move || {
    loop {
        let frame = metrics.snapshot();
        exporter.push(&frame);
        thread::sleep(Duration::from_millis(100));
    }
});
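The loop above runs forever and drifts slightly, because each push adds its own duration to the 100 ms sleep. A sketch of a stoppable, deadline-based variant follows. `spawn_push_loop` is a hypothetical helper that takes the push action as a closure, so it makes no assumptions about the hdds types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Hypothetical helper: calls `push` every `period` until `stop` is set,
/// then returns how many pushes were made. Sleeping until the next deadline
/// keeps the average rate close to 1 / period even when a push is slow.
pub fn spawn_push_loop<F>(period: Duration, stop: Arc<AtomicBool>, mut push: F) -> JoinHandle<u64>
where
    F: FnMut() + Send + 'static,
{
    thread::spawn(move || {
        let mut pushes = 0u64;
        let mut next = Instant::now();
        while !stop.load(Ordering::Acquire) {
            push();
            pushes += 1;
            next += period;
            let now = Instant::now();
            if next > now {
                thread::sleep(next - now);
            } else {
                // Fell behind (e.g. a slow push): restart the schedule instead of bursting.
                next = now;
            }
        }
        pushes
    })
}
```

With hdds, the closure would presumably be something like `move || exporter.push(&metrics.snapshot())` with a 100 ms period. On shutdown, set the flag, join the thread, and then call `exporter.shutdown()` so that no push races with it.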
Shutdown
// Graceful shutdown
exporter.shutdown();
HDMX Binary Format
The HDMX (HDDS Metrics eXchange) format is a compact binary protocol: each frame is a 16-byte header followed by Field Count fields.
Header (16 bytes)
| Offset | Size | Field | Description |
|---|---|---|---|
| 0 | 4 | Magic | 0x48444D58 ("HDMX" read as a big-endian u32); stored little-endian, so the wire bytes are 58 4D 44 48 |
| 4 | 4 | Version | 0x00000100 (v1.0.0) |
| 8 | 4 | Frame Length | Total frame size (bytes) |
| 12 | 4 | Field Count | Number of fields |
Field (4 + N bytes)
| Offset | Size | Field | Description |
|---|---|---|---|
| 0 | 2 | Tag | Field identifier |
| 2 | 1 | DType | Data type (0=u64, 1=i64, 2=f64, 3=u32, 4=bytes) |
| 3 | 1 | Length | Value length in bytes (8 for u64/i64/f64, 4 for u32, variable up to 255 for bytes) |
| 4 | N | Value | Field value (little-endian) |
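The two tables can be read as a byte-level recipe. The sketch below encodes and decodes frames that contain only u64 fields. It assumes that all header integers are little-endian, that Frame Length includes the 16-byte header, and that fields are packed with no padding. `encode_u64_frame` and `decode_u64_frame` are hypothetical names, not the crate's `encode_frame`/`decode_frame`.

```rust
pub const MAGIC: u32 = 0x4844_4D58;
pub const VERSION: u32 = 0x0000_0100;
pub const HEADER_LEN: usize = 16;
const DTYPE_U64: u8 = 0;

/// Encodes (tag, value) pairs as an HDMX frame of u64 fields.
pub fn encode_u64_frame(fields: &[(u16, u64)]) -> Vec<u8> {
    let frame_len = HEADER_LEN + fields.len() * (4 + 8);
    let mut buf = Vec::with_capacity(frame_len);
    buf.extend_from_slice(&MAGIC.to_le_bytes());
    buf.extend_from_slice(&VERSION.to_le_bytes());
    buf.extend_from_slice(&(frame_len as u32).to_le_bytes());
    buf.extend_from_slice(&(fields.len() as u32).to_le_bytes());
    for &(tag, value) in fields {
        buf.extend_from_slice(&tag.to_le_bytes()); // Tag (2 bytes)
        buf.push(DTYPE_U64); // DType (1 byte)
        buf.push(8); // Length (1 byte)
        buf.extend_from_slice(&value.to_le_bytes()); // Value (8 bytes)
    }
    buf
}

/// Decodes a frame; fields with other dtypes are skipped in this sketch.
pub fn decode_u64_frame(buf: &[u8]) -> Result<Vec<(u16, u64)>, String> {
    if buf.len() < HEADER_LEN {
        return Err("truncated header".into());
    }
    let word = |i: usize| u32::from_le_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]);
    if word(0) != MAGIC {
        return Err("bad magic".into());
    }
    if word(8) as usize != buf.len() {
        return Err("frame length mismatch".into());
    }
    let count = word(12) as usize;
    let mut fields = Vec::new(); // not sized from `count`, which is untrusted input
    let mut pos = HEADER_LEN;
    for _ in 0..count {
        if pos + 4 > buf.len() {
            return Err("truncated field header".into());
        }
        let tag = u16::from_le_bytes([buf[pos], buf[pos + 1]]);
        let (dtype, len) = (buf[pos + 2], buf[pos + 3] as usize);
        let value = buf.get(pos + 4..pos + 4 + len).ok_or("truncated value")?;
        if dtype == DTYPE_U64 && len == 8 {
            let mut bytes = [0u8; 8];
            bytes.copy_from_slice(value);
            fields.push((tag, u64::from_le_bytes(bytes)));
        }
        pos += 4 + len;
    }
    Ok(fields)
}
```

Each u64 field is 12 bytes (4 + 8), so a snapshot with the 12 tags from the table would be 16 + 12 × 12 = 160 bytes under these assumptions.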
Encoding/Decoding
use hdds::telemetry::{encode_frame, decode_frame, Frame, MAGIC, VERSION};
// Encode frame to bytes
let frame = metrics.snapshot();
let bytes: Vec<u8> = encode_frame(&frame)?;
// Decode bytes to frame
let decoded: Frame = decode_frame(&bytes)?;
// Check magic and version
assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), MAGIC);
Performance
| Operation | Target Latency |
|---|---|
| Counter increment | < 5 ns |
| Latency sample | < 500 ns |
| Snapshot | < 1 µs |
| Frame encode | < 5 µs |
| TCP push | < 10 µs |
Thread Safety
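- Counters: Lock-free atomics (Relaxed ordering); each counter is exact on its own, but a snapshot reads them one at a time, so values from different counters may not reflect a single instant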
- Latency histogram: Mutex-protected ring buffer
- Exporter clients: Mutex-protected vector
Disabling Telemetry
Via environment variable:
# Disable telemetry export
export HDDS_EXPORTER_DISABLE=1
./my_dds_app
Or skip initialization:
// Simply don't call init_exporter()
let metrics = init_metrics(); // Metrics still collected locally
// No exporter = no TCP streaming
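If the application itself should honor `HDDS_EXPORTER_DISABLE`, for example to skip spawning its push thread, a guard might look like the sketch below. It assumes that only the documented value `1` disables export; the exact values HDDS accepts are not specified here, and both function names are hypothetical.

```rust
use std::env;

/// Pure check, kept separate from the environment lookup so it is easy to test.
/// Assumption: only the documented value "1" (surrounding whitespace ignored) disables export.
fn exporter_disabled_value(value: Option<&str>) -> bool {
    value.map(str::trim) == Some("1")
}

/// Reads HDDS_EXPORTER_DISABLE from the process environment.
pub fn exporter_disabled() -> bool {
    exporter_disabled_value(env::var("HDDS_EXPORTER_DISABLE").ok().as_deref())
}
```

Typical use would be `let exporter = if exporter_disabled() { None } else { init_exporter("127.0.0.1", 4242).ok() };`, which leaves local metrics collection untouched.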
Integration with HDDS Viewer
HDDS Viewer connects to the exporter port for live monitoring:
# Start your DDS application with telemetry
export HDDS_TELEMETRY_PORT=4242
./my_dds_app
# Connect with HDDS Viewer
hdds-viewer --connect 127.0.0.1:4242
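A client such as HDDS Viewer receives a continuous TCP byte stream with no message boundaries, so it has to read the header first to learn how many bytes belong to each frame. The sketch below shows that framing step. It assumes little-endian header integers and a Frame Length that includes the header. `read_frame` and `MAX_FRAME_LEN` are hypothetical, and the size cap is arbitrary, not part of the format.

```rust
use std::io::{self, Read};

pub const MAGIC: u32 = 0x4844_4D58;
pub const HEADER_LEN: usize = 16;
pub const MAX_FRAME_LEN: usize = 1 << 20; // arbitrary sanity cap, not from the spec

/// Reads exactly one HDMX frame (header + fields) from any byte stream, e.g. a TcpStream.
pub fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut frame = vec![0u8; HEADER_LEN];
    reader.read_exact(&mut frame)?;
    let word = |i: usize| u32::from_le_bytes([frame[i], frame[i + 1], frame[i + 2], frame[i + 3]]);
    if word(0) != MAGIC {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad HDMX magic"));
    }
    let frame_len = word(8) as usize;
    if frame_len < HEADER_LEN || frame_len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad frame length"));
    }
    // The header says how many more bytes belong to this frame.
    frame.resize(frame_len, 0);
    reader.read_exact(&mut frame[HEADER_LEN..])?;
    Ok(frame)
}
```

Against a live exporter this would run in a loop over `TcpStream::connect("127.0.0.1:4242")?`. A `read_frame` error other than end-of-stream means the client has lost sync and should reconnect.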
Example: Complete Instrumentation
use hdds::telemetry::{init_metrics, init_exporter};
use hdds::{Participant, QoS};
use std::time::{Duration, Instant};
use std::thread;
// SensorData is the application's DDS topic type (definition not shown here)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize telemetry; "0.0.0.0" exposes the exporter on all interfaces
    let metrics = init_metrics();
    let _exporter = init_exporter("0.0.0.0", 4242)?;
    // Create DDS participant, topic, and reliable writer
    let participant = Participant::builder("instrumented-app").build()?;
    let topic = participant.topic::<SensorData>("sensor_data")?;
    let writer = participant.publisher()?.create_writer(&topic, QoS::reliable())?;
    // Spawn a metrics push thread (10 Hz)
    let metrics_clone = metrics.clone();
    thread::spawn(move || {
        loop {
            if let Some(exporter) = hdds::telemetry::get_exporter() {
                exporter.push(&metrics_clone.snapshot());
            }
            thread::sleep(Duration::from_millis(100));
        }
    });
    // Application loop with instrumentation
    loop {
        let start = Instant::now();
        // Publish data
        let data = SensorData { value: 42.0 };
        match writer.write(&data) {
            Ok(_) => {
                metrics.increment_sent(1);
                // In-memory size, used as an approximation of the serialized payload size
                metrics.increment_bytes_sent(std::mem::size_of::<SensorData>() as u64);
            }
            Err(_) => {
                metrics.increment_dropped(1);
            }
        }
        // Record write latency
        let elapsed_ns = start.elapsed().as_nanos() as u64;
        metrics.add_latency_sample(0, elapsed_ns);
        thread::sleep(Duration::from_millis(10));
    }
}
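The example times each write by hand with `Instant::now()` and `elapsed()`. One way to avoid repeating that pattern is an RAII guard that records the elapsed time when it goes out of scope. `LatencyGuard` is a hypothetical helper, and the recording action is passed in as a closure, so the sketch depends only on `std`.

```rust
use std::time::Instant;

/// Hypothetical RAII timer: measures from construction to drop and hands
/// the elapsed nanoseconds to `record`.
pub struct LatencyGuard<F: FnMut(u64)> {
    start: Instant,
    record: F,
}

impl<F: FnMut(u64)> LatencyGuard<F> {
    pub fn new(record: F) -> Self {
        LatencyGuard { start: Instant::now(), record }
    }
}

impl<F: FnMut(u64)> Drop for LatencyGuard<F> {
    fn drop(&mut self) {
        let elapsed_ns = self.start.elapsed().as_nanos() as u64;
        (self.record)(elapsed_ns);
    }
}
```

In the loop above this might read `let _timer = LatencyGuard::new(|ns| metrics.add_latency_sample(0, ns));` at the top of the iteration. Drop runs on every exit path, including early `continue` or `?`, so no sample is silently skipped. Bind the guard to a named variable such as `_timer`: binding it to `_` drops it immediately and records about zero.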
Related
- Admin API - Debug snapshots
- Congestion Control - Rate metrics
- Environment Variables - Configuration