Reliability Internals
RTPS reliable delivery protocol implementation.
Overview
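With QoS::reliable(), HDDS retransmits lost samples so that every matched reader receives every published sample that is still held in the writer's history cache, even in the presence of packet loss. Samples that are no longer available for retransmission are reported to the reader with a GAP message. This module implements: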
- HeartbeatTx/Rx - Writer announces available samples, reader detects gaps
- NackScheduler - Reader schedules NACK messages with exponential backoff
- GapTracker - Reader tracks missing sequence numbers
- HistoryCache - Writer stores samples for retransmission
- GapTx/GapRx - Writer declares samples unavailable, reader marks them as lost
- WriterRetransmitHandler - Writer processes NACKs and coordinates retransmission
- ReliableMetrics - Observability counters (lock-free atomics)
Protocol Flow
Writer                                      Reader
  |                                           |
  |--- DATA (seq=1) ------------------------->|
  |--- DATA (seq=2) ----------X (lost)        |
  |--- DATA (seq=3) ------------------------->|
  |                                           |
  |--- HEARTBEAT (first=1, last=3) ---------->|
  |                                           | (detects gap: seq=2 missing)
  |<-- ACKNACK (missing: [2]) ----------------|
  |                                           |
  |--- DATA (seq=2) [retransmit] ------------>|
  |                                           | (gap filled!)
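The exchange above can be replayed in a few lines. This is a minimal standalone sketch, not HDDS code: `missing_after_heartbeat` and `replay_flow` are hypothetical helpers, and a `BTreeSet` stands in for the reader's receive state.

```rust
use std::collections::BTreeSet;

/// Sequence numbers in `first..=last` that the reader has not received.
/// Hypothetical helper for illustration; not part of the hdds API.
fn missing_after_heartbeat(first: u64, last: u64, received: &BTreeSet<u64>) -> Vec<u64> {
    (first..=last).filter(|seq| !received.contains(seq)).collect()
}

/// Replays the diagram and returns what the reader holds at the end.
fn replay_flow() -> BTreeSet<u64> {
    // DATA 1 and DATA 3 arrive; DATA 2 is lost in transit.
    let mut received = BTreeSet::new();
    received.insert(1);
    received.insert(3);

    // HEARTBEAT(first=1, last=3) lets the reader detect the gap.
    let nack = missing_after_heartbeat(1, 3, &received);
    assert_eq!(nack, vec![2]);

    // The writer answers the ACKNACK by retransmitting each requested sample.
    for seq in nack {
        received.insert(seq);
    }
    received
}
```

After the retransmit, a second heartbeat for the same range would report nothing missing.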
Writer-Side Components
HeartbeatTx
Periodic heartbeat transmission with configurable jitter:
use std::time::Instant;
use hdds::reliability::{HeartbeatTx, DEFAULT_PERIOD_MS, DEFAULT_JITTER_PCT};
// Default: 100ms period, 10% jitter (DEFAULT_PERIOD_MS, DEFAULT_JITTER_PCT)
let mut hb_tx = HeartbeatTx::new();
// Custom: 50ms period, 5% jitter
let mut hb_tx = HeartbeatTx::with_period_ms(50, 5);
// Check if heartbeat is due
if Instant::now() >= hb_tx.next_deadline() {
    // Build and send heartbeat; first_seq and last_seq are the oldest and
    // newest sequence numbers the writer can still serve
    let hb = hb_tx.build_heartbeat(first_seq, last_seq);
    // send(hb)...
}
// Heartbeat counter (monotonic)
println!("Heartbeats sent: {}", hb_tx.count());
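To make the jitter parameter concrete, here is a minimal sketch of how a jittered period could be derived. It assumes symmetric jitter around the nominal period, which this page does not confirm; `jittered_period` is a hypothetical helper, and the caller supplies the random value so the function stays deterministic.

```rust
use std::time::Duration;

/// Period with symmetric jitter applied (assumption: +/- jitter_pct percent).
/// `unit` is a caller-supplied random value in [0.0, 1.0].
fn jittered_period(period_ms: u64, jitter_pct: u64, unit: f64) -> Duration {
    let period_us = period_ms * 1_000;
    // Largest deviation from the nominal period, capped at 100%.
    let span_us = period_us * jitter_pct.min(100) / 100;
    // Map `unit` onto an offset in [0, 2 * span], then shift down by span.
    let offset_us = (unit.clamp(0.0, 1.0) * (2 * span_us) as f64).round() as u64;
    Duration::from_micros(period_us - span_us + offset_us)
}
```

With a 100ms period and 10% jitter, the result ranges from 90ms (`unit = 0.0`) to 110ms (`unit = 1.0`). Jitter of this kind keeps many writers from sending heartbeats in lockstep.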
GapTx
Writer sends GAP messages when requested samples are no longer available:
use hdds::reliability::{GapTx, RtpsRange};
let mut gap_tx = GapTx::new();
// Build GAP for contiguous range
let range = RtpsRange::new(5, 10); // half-open: sequences 5 through 9
let gaps = gap_tx.build_gap(range);
// Build GAP from explicit sequence list
let missing = vec![3, 5, 7, 9];
let gaps = gap_tx.build_gap_from_sequences(&missing);
// Statistics
println!("GAP messages sent: {}", gap_tx.gap_count());
println!("Total sequences marked lost: {}", gap_tx.total_lost());
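One plausible way to turn an explicit sequence list into GAP ranges is to merge runs of consecutive numbers. The sketch below assumes that approach; `coalesce` is a hypothetical helper that uses `std::ops::Range` in place of RtpsRange, and it makes no claim about how `build_gap_from_sequences` is implemented.

```rust
use std::ops::Range;

/// Collapse sequence numbers into sorted, half-open, contiguous ranges.
fn coalesce(seqs: &[u64]) -> Vec<Range<u64>> {
    let mut sorted = seqs.to_vec();
    sorted.sort_unstable();
    sorted.dedup();

    let mut ranges: Vec<Range<u64>> = Vec::new();
    for seq in sorted {
        if let Some(last) = ranges.last_mut() {
            if last.end == seq {
                // `seq` directly follows the current run: extend it.
                last.end = seq + 1;
                continue;
            }
        }
        // Otherwise start a new single-element run.
        ranges.push(seq..seq + 1);
    }
    ranges
}
```

For the list `[3, 5, 7, 9]` used above, no two numbers are adjacent, so this yields four single-element ranges; a list such as `[5, 6, 7, 8, 9]` collapses to the single range `5..10`.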
HistoryCache
Writer-side sample storage for retransmission:
use hdds::reliability::HistoryCache;
use hdds::qos::{History, ResourceLimits};
// Create cache from QoS limits
let limits = ResourceLimits {
    max_samples: 100,
    max_quota_bytes: 10_000_000,
    ..Default::default()
};
// `slabs` is the slab pool backing the cache; its construction is not shown here
let cache = HistoryCache::new_with_history(slabs, &limits, History::KeepLast(100));
// Insert sample (the `?` requires an enclosing function that returns Result)
cache.insert(seq_num, &payload)?;
// Retrieve for retransmission
if let Some(data) = cache.get(seq_num) {
    // Retransmit...
}
// Cache state
println!("Cached samples: {}", cache.len());
println!("Memory usage: {} bytes", cache.quota_bytes());
println!("Oldest seq: {:?}", cache.oldest_seq());
println!("Newest seq: {:?}", cache.newest_seq());
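The KeepLast behaviour can be illustrated with a bounded FIFO. This is a toy model under stated assumptions, not the HDDS implementation: `KeepLastCache` is a hypothetical type, it ignores the byte quota and slab storage, and it keeps payloads in a plain `VecDeque`.

```rust
use std::collections::VecDeque;

/// Toy KeepLast cache: a bounded FIFO of (sequence, payload) pairs.
struct KeepLastCache {
    depth: usize,
    samples: VecDeque<(u64, Vec<u8>)>,
}

impl KeepLastCache {
    fn new(depth: usize) -> Self {
        let depth = depth.max(1); // a depth of zero would never hold a sample
        Self { depth, samples: VecDeque::with_capacity(depth) }
    }

    /// Insert a sample, evicting the oldest one once `depth` is reached.
    /// Returns the evicted sequence number, if any.
    fn insert(&mut self, seq: u64, payload: &[u8]) -> Option<u64> {
        let evicted = if self.samples.len() >= self.depth {
            self.samples.pop_front().map(|(old, _)| old)
        } else {
            None
        };
        self.samples.push_back((seq, payload.to_vec()));
        evicted
    }

    /// Linear search by sequence number.
    fn get(&self, seq: u64) -> Option<&[u8]> {
        self.samples.iter().find(|(s, _)| *s == seq).map(|(_, p)| p.as_slice())
    }

    fn oldest_seq(&self) -> Option<u64> {
        self.samples.front().map(|(s, _)| *s)
    }

    fn newest_seq(&self) -> Option<u64> {
        self.samples.back().map(|(s, _)| *s)
    }
}
```

Once a sequence number has been evicted, `get` returns `None` for it, which is the situation in which a writer has to answer a NACK with a GAP instead of a retransmission.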
WriterRetransmitHandler
Processes NACK messages and coordinates retransmission:
use hdds::reliability::{WriterRetransmitHandler, NackMsg};
let handler = WriterRetransmitHandler::new(&cache, &mut gap_tx, &metrics);
// Process NACK from reader
let (retransmits, gaps) = handler.on_nack(&nack_msg);
// retransmits: Vec<(seq, payload)> - samples to retransmit
// gaps: Vec<GapMsg> - samples no longer available
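The split between retransmits and gaps can be sketched as a lookup per requested sequence number. The code below is illustrative only: `split_nack` is a hypothetical function, a `BTreeMap` stands in for the history cache, and lost sequences are returned as plain numbers instead of GapMsg values.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Split NACKed ranges into samples to resend and sequences to report as lost.
/// `cache` maps sequence number to payload.
fn split_nack(
    cache: &BTreeMap<u64, Vec<u8>>,
    nacked: &[Range<u64>],
) -> (Vec<(u64, Vec<u8>)>, Vec<u64>) {
    let mut retransmits = Vec::new();
    let mut lost = Vec::new();
    for range in nacked {
        for seq in range.clone() {
            match cache.get(&seq) {
                // Still cached: resend the payload.
                Some(payload) => retransmits.push((seq, payload.clone())),
                // Already evicted: the reader must be told via GAP.
                None => lost.push(seq),
            }
        }
    }
    (retransmits, lost)
}
```

If the cache holds sequences 3 and 4 and the reader NACKs `2..5`, the result is two retransmits (3 and 4) and one lost sequence (2).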
Reader-Side Components
HeartbeatRx
Reader processes heartbeats and detects missing samples:
use hdds::reliability::HeartbeatRx;
let mut hb_rx = HeartbeatRx::new();
// Process incoming heartbeat
if let Some(missing_ranges) = hb_rx.on_heartbeat(&heartbeat_msg, reader_last_seen) {
    // Schedule NACKs for missing ranges
    for range in missing_ranges {
        println!("Missing: {:?}", range);
    }
}
// Last heartbeat count seen
println!("Last HB count: {:?}", hb_rx.last_count());
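As a rough model of heartbeat processing, the sketch below compares the announced range with the last sequence the reader has seen. `HbRx` is a hypothetical type, and two behaviours are assumptions not stated on this page: heartbeats whose count is not greater than the last one seen are ignored, and counter wrap-around is not handled.

```rust
use std::ops::Range;

/// Toy heartbeat receiver that remembers the last heartbeat count.
struct HbRx {
    last_count: Option<u32>,
}

impl HbRx {
    fn new() -> Self {
        Self { last_count: None }
    }

    /// Returns the half-open range of sequences the writer still offers
    /// beyond `last_seen`, or None if nothing is missing or the heartbeat is stale.
    fn on_heartbeat(
        &mut self,
        first_seq: u64,
        last_seq: u64,
        count: u32,
        last_seen: u64,
    ) -> Option<Range<u64>> {
        // Assumption: duplicate or reordered heartbeats are dropped.
        if self.last_count.map_or(false, |prev| count <= prev) {
            return None;
        }
        self.last_count = Some(count);

        // Only sequences the writer can still serve are worth requesting.
        let start = first_seq.max(last_seen + 1);
        if start <= last_seq {
            Some(start..last_seq + 1)
        } else {
            None
        }
    }
}
```

In the protocol flow shown earlier, a reader that had seen only seq=1 would get `2..4` from HEARTBEAT(first=1, last=3); gaps below `last_seen` are left to the gap tracker.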
GapTracker
Tracks missing sequence numbers with automatic range merging:
use hdds::reliability::{GapTracker, RtpsRange};
let mut tracker = GapTracker::new();
// Process received sequences
tracker.on_receive(1); // last_seen = 1
tracker.on_receive(2); // last_seen = 2, no gap
tracker.on_receive(5); // last_seen = 5, gap [3..5)
// Check pending gaps
assert_eq!(tracker.pending_gaps(), &[3..5]);
// Fill gap (out-of-order delivery)
tracker.on_receive(3);
assert_eq!(tracker.pending_gaps(), &[4..5]);
// Mark as lost (writer sent GAP)
tracker.mark_lost(RtpsRange::new(4, 5));
assert!(tracker.pending_gaps().is_empty());
// Statistics
println!("Last seen: {}", tracker.last_seen());
println!("Total missing: {}", tracker.total_missing());
GapTracker Algorithm
on_receive(seq):
1. If seq == last_seen + 1 → contiguous, no gap
2. If seq > last_seen + 1 → gap detected: [last_seen+1..seq)
3. If seq <= last_seen → out-of-order (fill gap or duplicate)
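The three cases above can be sketched as follows. This is a simplified stand-in for GapTracker, not its actual code: `Tracker` is a hypothetical type, it assumes sequence numbers start at 1 (so `last_seen` starts at 0), and it omits the range-count limit.

```rust
use std::ops::Range;

/// Toy gap tracker: `gaps` stays sorted, disjoint, and half-open.
#[derive(Default)]
struct Tracker {
    last_seen: u64,
    gaps: Vec<Range<u64>>,
}

impl Tracker {
    fn on_receive(&mut self, seq: u64) {
        if seq > self.last_seen {
            if seq > self.last_seen + 1 {
                // Case 2: everything between last_seen and seq is missing.
                self.gaps.push((self.last_seen + 1)..seq);
            }
            // Case 1 (contiguous) reaches here without recording a gap.
            self.last_seen = seq;
        } else {
            // Case 3: out-of-order arrival, either a gap fill or a duplicate.
            self.fill(seq);
        }
    }

    /// Remove `seq` from the gap that contains it, splitting the gap if needed.
    fn fill(&mut self, seq: u64) {
        if let Some(i) = self.gaps.iter().position(|g| g.contains(&seq)) {
            let gap = self.gaps.remove(i);
            // Re-insert the right part first so the left part lands before it.
            if seq + 1 < gap.end {
                self.gaps.insert(i, (seq + 1)..gap.end);
            }
            if gap.start < seq {
                self.gaps.insert(i, gap.start..seq);
            }
        }
    }
}
```

Receiving 1, 2, 5 and then 3 leaves the single gap `4..5`, matching the GapTracker example above. A duplicate falls through `fill` without changing any state.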
NackScheduler
Time-windowed NACK coalescing with exponential backoff:
use hdds::reliability::NackScheduler;
// Default: 20ms coalescing window
let mut scheduler = NackScheduler::new();
// Custom window
let mut scheduler = NackScheduler::with_window_ms(50);
// Attach metrics
scheduler.set_metrics(metrics.clone());
// Process received sequence (detects gaps)
scheduler.on_receive(seq);
// Try to flush pending NACKs
if let Some(gaps) = scheduler.try_flush() {
    // Send NACK for gaps
    send_nack(&gaps);
    scheduler.on_nack_sent();
}
// Mark retransmit received
scheduler.on_data_received(seq);
// Mark sequences as lost (writer sent GAP)
scheduler.mark_lost(range);
// State
println!("Pending gaps: {:?}", scheduler.pending_gaps());
println!("Retry count: {}", scheduler.retry_count());
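Coalescing means holding back the first detected gap for a short window so that later gaps can share one NACK. The sketch below shows that idea in isolation; `Coalescer` is a hypothetical type, it takes the current time as a parameter for testability, and it leaves out retries and backoff.

```rust
use std::time::{Duration, Instant};

/// Toy NACK coalescer: batches missing sequences for one window.
struct Coalescer {
    window: Duration,
    window_start: Option<Instant>,
    pending: Vec<u64>,
}

impl Coalescer {
    fn new(window: Duration) -> Self {
        Self { window, window_start: None, pending: Vec::new() }
    }

    /// Record a missing sequence; the first one opens the window.
    fn add_gap(&mut self, seq: u64, now: Instant) {
        if self.window_start.is_none() {
            self.window_start = Some(now);
        }
        self.pending.push(seq);
    }

    /// Returns the batch once the window has elapsed, otherwise None.
    fn try_flush(&mut self, now: Instant) -> Option<Vec<u64>> {
        let start = self.window_start?;
        if now.saturating_duration_since(start) < self.window {
            return None;
        }
        self.window_start = None;
        Some(std::mem::take(&mut self.pending))
    }
}
```

With a 20ms window, two gaps detected 5ms apart are flushed together once 20ms have passed since the first one, so the writer receives a single NACK.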
NACK Scheduling Parameters
| Parameter | Default | Description |
|---|---|---|
| window | 20ms | Coalescing window (batch multiple gaps) |
| initial_backoff | 50ms | Initial retry interval |
| max_retries | 5 | Max NACK retries before giving up |
Exponential Backoff
Retry 1: wait 50ms
Retry 2: wait 100ms
Retry 3: wait 200ms
Retry 4: wait 400ms
Retry 5: wait 800ms (then give up)
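The schedule above doubles the wait on each retry, so retry n waits 50ms × 2^(n-1). A minimal sketch of that arithmetic, using the defaults from the parameter table; `backoff` and the two constants are hypothetical names, not hdds identifiers:

```rust
use std::time::Duration;

const INITIAL_BACKOFF_MS: u64 = 50;
const MAX_RETRIES: u32 = 5;

/// Wait before the given retry (1-based), or None once retries are exhausted.
fn backoff(retry: u32) -> Option<Duration> {
    if retry == 0 || retry > MAX_RETRIES {
        return None;
    }
    // Shifting left by (retry - 1) doubles the initial interval each time.
    Some(Duration::from_millis(INITIAL_BACKOFF_MS << (retry - 1)))
}

/// Total time spent waiting across all retries.
fn total_backoff() -> Duration {
    (1..=MAX_RETRIES).filter_map(backoff).sum()
}
```

Summed over the five retries, the waits add up to 1550ms before the reader gives up on a gap.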
GapRx
Reader processes GAP messages from writer:
use hdds::reliability::GapRx;
let mut gap_rx = GapRx::new();
// Process GAP message
let lost_ranges = gap_rx.on_gap(&gap_msg);
for range in lost_ranges {
    println!("Sequences lost: {:?}", range);
}
// Statistics
println!("GAP messages received: {}", gap_rx.gap_count());
println!("Total sequences lost: {}", gap_rx.total_lost());
Metrics
ReliableMetrics
Lock-free atomic counters for observability:
use hdds::reliability::ReliableMetrics;
let metrics = ReliableMetrics::new();
// Record events
metrics.record_gap(3); // Gap of 3 sequences detected
metrics.increment_out_of_order(1); // Out-of-order packet
metrics.increment_nacks_sent(1); // NACK sent
metrics.increment_heartbeats_sent(1); // Heartbeat sent
metrics.increment_retransmit_sent(1); // Retransmit sent (writer)
metrics.increment_retransmit_received(1); // Retransmit received (reader)
// Read counters
println!("Gaps detected: {}", metrics.gaps_detected());
println!("Max gap size: {}", metrics.max_gap_size());
println!("Out-of-order: {}", metrics.out_of_order());
// Export for telemetry
let frame = metrics.snapshot(timestamp_ns);
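Lock-free counters of this kind are usually built from an atomic add plus a compare-exchange loop for the running maximum. The sketch below shows that pattern with `std::sync::atomic`; `GapCounters` is a hypothetical type, and it assumes that the gap counter counts gaps (not missing sequences) and that relaxed ordering is sufficient for statistics.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Toy metrics holder: shared by reference, no mutex required.
#[derive(Default)]
struct GapCounters {
    gaps_detected: AtomicU64,
    max_gap_size: AtomicU64,
}

impl GapCounters {
    fn record_gap(&self, size: u64) {
        self.gaps_detected.fetch_add(1, Ordering::Relaxed);

        // Raise the maximum only if `size` exceeds it; retry if another
        // thread updated the value between our load and our exchange.
        let mut current = self.max_gap_size.load(Ordering::Relaxed);
        while size > current {
            match self.max_gap_size.compare_exchange_weak(
                current,
                size,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(observed) => current = observed,
            }
        }
    }

    fn gaps_detected(&self) -> u64 {
        self.gaps_detected.load(Ordering::Relaxed)
    }

    fn max_gap_size(&self) -> u64 {
        self.max_gap_size.load(Ordering::Relaxed)
    }
}
```

Because `record_gap` takes `&self`, the same instance can be shared across threads behind an `Arc` without any locking.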
Metric Tags
| Tag | ID | Description |
|---|---|---|
| TAG_GAPS_DETECTED | 100 | Total gaps detected |
| TAG_MAX_GAP_SIZE | 101 | Maximum gap size observed |
| TAG_OUT_OF_ORDER | 102 | Out-of-order packets |
| TAG_RETRANSMIT_SENT | 103 | Retransmissions sent (writer) |
| TAG_RETRANSMIT_RECEIVED | 104 | Retransmissions received (reader) |
| TAG_NACKS_SENT | 105 | NACKs sent |
| TAG_HEARTBEATS_SENT | 106 | Heartbeats sent |
Message Types
HeartbeatMsg
Writer announces available sample range:
pub struct HeartbeatMsg {
    pub reader_id: EntityId,
    pub writer_id: EntityId,
    pub first_seq: u64, // Oldest available sequence
    pub last_seq: u64,  // Newest available sequence
    pub count: u32,     // Monotonic counter
}
NackMsg
Reader requests missing samples:
pub struct NackMsg {
    pub reader_id: EntityId,
    pub writer_id: EntityId,
    pub ranges: Vec<RtpsRange>, // Missing sequence ranges
    pub count: u32,             // Monotonic counter
}
GapMsg
Writer declares samples as unavailable:
pub struct GapMsg {
    pub reader_id: EntityId,
    pub writer_id: EntityId,
    pub gap_start: u64,              // First unavailable sequence
    pub gap_list: SequenceNumberSet, // Additional unavailable sequences
}
Configuration
QoS Settings
use hdds::QoS;
// Reliable with history
let qos = QoS::reliable()
    .history_keep_last(100); // Keep 100 samples for retransmission
// Best-effort (no reliability protocol)
let qos = QoS::best_effort();
ResourceLimits
use hdds::qos::ResourceLimits;
let limits = ResourceLimits {
    max_samples: 1000,            // Max samples in cache
    max_instances: 1,             // For non-keyed types
    max_samples_per_instance: 1000,
    max_quota_bytes: 100_000_000, // 100 MB memory limit
};
Performance
| Operation | Time | Notes |
|---|---|---|
| metrics.record_gap() | < 10 ns | Atomic fetch_add + compare-exchange |
| tracker.on_receive() | < 100 ns | Vec push + optional merge |
| cache.insert() | < 500 ns | Slab allocation + mutex |
| cache.get() | < 200 ns | Mutex + linear search |
Capacity Limits
| Component | Limit | Behavior |
|---|---|---|
| GapTracker | 100 ranges | Oldest dropped (FIFO) |
| HistoryCache | Configurable | FIFO eviction (KeepLast) or reject (KeepAll) |
| NackScheduler | 5 retries | Give up after 5 attempts |
Related
- QoS: Reliability - QoS configuration
- QoS: History - History depth
- Congestion Control - Rate limiting
- Telemetry - Metrics export