Reliability Internals

RTPS reliable delivery protocol implementation.

Overview

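With QoS::reliable(), HDDS retransmits lost samples so that every matched reader receives each published sample still held in the writer's history, even under packet loss. Samples the writer can no longer supply are reported to the reader as lost through GAP messages. This module implements: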

  • HeartbeatTx/Rx - Writer announces available samples, reader detects gaps
  • NackScheduler - Reader schedules NACK messages with exponential backoff
  • GapTracker - Reader tracks missing sequence numbers
  • HistoryCache - Writer stores samples for retransmission
  • ReliableMetrics - Observability counters (lock-free atomics)

Protocol Flow

Writer                                    Reader
|                                          |
|--- DATA (seq=1) ------------------------>|
|--- DATA (seq=2) ----------X (lost)       |
|--- DATA (seq=3) ------------------------>|
|                                          |
|--- HEARTBEAT (first=1, last=3) --------->|
|                                          | (detects gap: seq=2 missing)
|<-- ACKNACK (missing: [2]) ---------------|
|                                          |
|--- DATA (seq=2) [retransmit] ----------->|
|                                          | (gap filled!)
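
The exchange above can be traced with a small standalone simulation. This is only a sketch built on the standard library: `Writer`, `Reader`, `heartbeat`, `missing`, and `simulate` are hypothetical stand-ins for illustration, not HDDS types, and the "network" is a plain loop that drops seq=2.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical writer: keeps every sample for retransmission.
struct Writer {
    history: BTreeMap<u64, Vec<u8>>,
}

/// Hypothetical reader: remembers which sequence numbers have arrived.
struct Reader {
    received: BTreeSet<u64>,
}

impl Writer {
    /// HEARTBEAT contents: (first, last) available sequence numbers.
    fn heartbeat(&self) -> Option<(u64, u64)> {
        let first = *self.history.keys().next()?;
        let last = *self.history.keys().next_back()?;
        Some((first, last))
    }
}

impl Reader {
    /// Sequence numbers announced by the heartbeat but never received.
    fn missing(&self, first: u64, last: u64) -> Vec<u64> {
        (first..=last).filter(|s| !self.received.contains(s)).collect()
    }
}

/// Runs the exchange from the diagram.
/// Returns the ACKNACK contents and the reader's final set of samples.
fn simulate() -> (Vec<u64>, Vec<u64>) {
    let mut writer = Writer { history: BTreeMap::new() };
    let mut reader = Reader { received: BTreeSet::new() };

    // Writer publishes seq 1..=3; the simulated network drops seq 2.
    for seq in 1..=3u64 {
        writer.history.insert(seq, vec![seq as u8]);
        if seq != 2 {
            reader.received.insert(seq);
        }
    }

    // HEARTBEAT (first=1, last=3) lets the reader detect the gap.
    let (first, last) = writer.heartbeat().expect("history is not empty");
    let nack = reader.missing(first, last);

    // ACKNACK triggers retransmission of whatever is still in history.
    for seq in &nack {
        if writer.history.contains_key(seq) {
            reader.received.insert(*seq);
        }
    }
    (nack, reader.received.iter().copied().collect())
}
```

Calling `simulate()` yields the NACKed sequence numbers together with the samples the reader holds once the retransmission has arrived.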

Writer-Side Components

HeartbeatTx

Periodic heartbeat transmission with configurable jitter:

use std::time::Instant;

use hdds::reliability::{HeartbeatTx, DEFAULT_PERIOD_MS, DEFAULT_JITTER_PCT};

// Default: 100ms period, 10% jitter
let mut hb_tx = HeartbeatTx::new();

// Custom: 50ms period, 5% jitter
let mut hb_tx = HeartbeatTx::with_period_ms(50, 5);

// Check if heartbeat is due
if Instant::now() >= hb_tx.next_deadline() {
    // Build and send heartbeat
    let hb = hb_tx.build_heartbeat(first_seq, last_seq);
    // send(hb)...
}

// Heartbeat counter (monotonic)
println!("Heartbeats sent: {}", hb_tx.count());
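
How the jitter percentage might translate into a deadline can be sketched as follows. The page does not say how HDDS applies jitter, so this assumes it is symmetric around the period (100ms at 10% gives 90ms to 110ms). `jittered_period` and `next_deadline` are hypothetical helpers, and the random input is passed in as `unit_permille` (a value in -1000..=1000 from the caller's RNG) so the example needs no external crate.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: the period shifted by a fraction of the jitter spread.
/// `unit_permille` is a random value in -1000..=1000 supplied by the caller.
/// Assumption: jitter is symmetric around the period.
fn jittered_period(period_ms: u64, jitter_pct: u64, unit_permille: i64) -> Duration {
    let unit = unit_permille.clamp(-1000, 1000);
    // Work in integer microseconds to avoid floating-point rounding.
    let period_us = period_ms as i64 * 1000;
    let spread_us = period_us * jitter_pct as i64 / 100;
    let jittered_us = period_us + spread_us * unit / 1000;
    Duration::from_micros(jittered_us.max(0) as u64)
}

/// Hypothetical helper: the next heartbeat deadline measured from `now`.
fn next_deadline(now: Instant, period_ms: u64, jitter_pct: u64, unit_permille: i64) -> Instant {
    now + jittered_period(period_ms, jitter_pct, unit_permille)
}
```

Jitter of this kind is commonly used to keep many writers from sending heartbeats in lockstep.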

GapTx

Writer sends GAP messages when requested samples are no longer available:

use hdds::reliability::{GapTx, RtpsRange};

let mut gap_tx = GapTx::new();

// Build GAP for contiguous range
let range = RtpsRange::new(5, 10); // [5..10)
let gaps = gap_tx.build_gap(range);

// Build GAP from explicit sequence list
let missing = vec![3, 5, 7, 9];
let gaps = gap_tx.build_gap_from_sequences(&missing);

// Statistics
println!("GAP messages sent: {}", gap_tx.gap_count());
println!("Total sequences marked lost: {}", gap_tx.total_lost());
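
Turning an explicit sequence list into ranges is a small coalescing step. The sketch below shows one way to do it with standard-library `Range<u64>` values; `coalesce` is a hypothetical helper, and whether `GapTx` emits one GAP per range or packs several into one message is not stated on this page.

```rust
use std::ops::Range;

/// Hypothetical helper: collapse sequence numbers into half-open ranges.
/// Input may be unsorted and may contain duplicates.
fn coalesce(seqs: &[u64]) -> Vec<Range<u64>> {
    let mut sorted = seqs.to_vec();
    sorted.sort_unstable();
    sorted.dedup();

    let mut out: Vec<Range<u64>> = Vec::new();
    for seq in sorted {
        // Extend the previous range when this number is adjacent to it.
        if let Some(last) = out.last_mut() {
            if last.end == seq {
                last.end = seq + 1;
                continue;
            }
        }
        // Otherwise start a new single-element range.
        out.push(seq..seq + 1);
    }
    out
}
```

With the list from the snippet above, `[3, 5, 7, 9]`, no two numbers are adjacent, so each one becomes its own single-element range.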

HistoryCache

Writer-side sample storage for retransmission:

use hdds::reliability::HistoryCache;
use hdds::qos::{History, ResourceLimits};

// Create cache from QoS limits
let limits = ResourceLimits {
    max_samples: 100,
    max_quota_bytes: 10_000_000,
    ..Default::default()
};
let cache = HistoryCache::new_with_history(slabs, &limits, History::KeepLast(100));

// Insert sample
cache.insert(seq_num, &payload)?;

// Retrieve for retransmission
if let Some(data) = cache.get(seq_num) {
    // Retransmit...
}

// Cache state
println!("Cached samples: {}", cache.len());
println!("Memory usage: {} bytes", cache.quota_bytes());
println!("Oldest seq: {:?}", cache.oldest_seq());
println!("Newest seq: {:?}", cache.newest_seq());
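
The eviction behavior listed under Capacity Limits (FIFO eviction for KeepLast, rejection for KeepAll) can be illustrated with a toy cache. This is a sketch only: `SketchCache` and `HistoryKind` are hypothetical names, a `VecDeque` stands in for the slab-backed storage, and byte quotas and locking are left out.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the History QoS.
#[derive(Clone, Copy)]
enum HistoryKind {
    KeepLast(usize),
    KeepAll,
}

/// Hypothetical toy cache: samples are kept in insertion order.
struct SketchCache {
    samples: VecDeque<(u64, Vec<u8>)>,
    max_samples: usize,
    history: HistoryKind,
}

impl SketchCache {
    fn new(max_samples: usize, history: HistoryKind) -> Self {
        Self { samples: VecDeque::new(), max_samples, history }
    }

    /// KeepLast evicts the oldest sample when full; KeepAll rejects the insert.
    fn insert(&mut self, seq: u64, payload: &[u8]) -> Result<(), &'static str> {
        let depth = match self.history {
            // A KeepLast depth of zero is treated as one in this sketch.
            HistoryKind::KeepLast(n) => n.min(self.max_samples).max(1),
            HistoryKind::KeepAll => self.max_samples,
        };
        if self.samples.len() >= depth {
            match self.history {
                HistoryKind::KeepLast(_) => {
                    self.samples.pop_front();
                }
                HistoryKind::KeepAll => return Err("cache full"),
            }
        }
        self.samples.push_back((seq, payload.to_vec()));
        Ok(())
    }

    /// Linear search by sequence number.
    fn get(&self, seq: u64) -> Option<&[u8]> {
        self.samples.iter().find(|(s, _)| *s == seq).map(|(_, p)| p.as_slice())
    }

    fn oldest_seq(&self) -> Option<u64> {
        self.samples.front().map(|(s, _)| *s)
    }

    fn newest_seq(&self) -> Option<u64> {
        self.samples.back().map(|(s, _)| *s)
    }
}
```

Once a sample has been evicted, `get` returns `None` for it, which is the situation in which a writer would answer a NACK with a GAP instead of a retransmission.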

WriterRetransmitHandler

Processes NACK messages and coordinates retransmission:

use hdds::reliability::{WriterRetransmitHandler, NackMsg};

let handler = WriterRetransmitHandler::new(&cache, &mut gap_tx, &metrics);

// Process NACK from reader
let (retransmits, gaps) = handler.on_nack(&nack_msg);

// retransmits: Vec<(seq, payload)> - samples to retransmit
// gaps: Vec<GapMsg> - samples no longer available
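
The split between retransmits and gaps amounts to a lookup per requested sequence number. A minimal sketch, assuming the history is a plain `BTreeMap` and NACK ranges are half-open `Range<u64>` values; `split_nack` is a hypothetical function and returns bare sequence numbers where the real handler returns `GapMsg` values.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Hypothetical helper: decide, per requested sequence number, whether the
/// sample can be retransmitted or must be declared unavailable.
fn split_nack(
    history: &BTreeMap<u64, Vec<u8>>,
    requested: &[Range<u64>],
) -> (Vec<(u64, Vec<u8>)>, Vec<u64>) {
    let mut retransmits = Vec::new();
    let mut unavailable = Vec::new();
    for range in requested {
        for seq in range.clone() {
            match history.get(&seq) {
                // Still cached: queue the payload for retransmission.
                Some(payload) => retransmits.push((seq, payload.clone())),
                // Evicted or never written: report it in a GAP.
                None => unavailable.push(seq),
            }
        }
    }
    (retransmits, unavailable)
}
```

The `unavailable` list could then be collapsed into ranges before building GAP messages.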

Reader-Side Components

HeartbeatRx

Reader processes heartbeats and detects missing samples:

use hdds::reliability::HeartbeatRx;

let mut hb_rx = HeartbeatRx::new();

// Process incoming heartbeat
if let Some(missing_ranges) = hb_rx.on_heartbeat(&heartbeat_msg, reader_last_seen) {
    // Schedule NACKs for missing ranges
    for range in missing_ranges {
        println!("Missing: {:?}", range);
    }
}

// Last heartbeat count seen
println!("Last HB count: {:?}", hb_rx.last_count());
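
One plausible reading of `on_heartbeat` is sketched below: ignore heartbeats whose counter is not newer than the last one seen, then report everything between the reader's last seen sequence and the writer's announced last sequence. `HeartbeatState` and its logic are assumptions for illustration. Counter wraparound is ignored, and only the tail range is computed here; gaps below `reader_last_seen` are assumed to be the job of `GapTracker`.

```rust
use std::ops::Range;

/// Hypothetical reader-side heartbeat state.
struct HeartbeatState {
    last_count: Option<u32>,
}

impl HeartbeatState {
    /// Returns the missing tail range, or None when nothing is missing
    /// or the heartbeat is stale.
    fn on_heartbeat(
        &mut self,
        first_seq: u64,
        last_seq: u64,
        count: u32,
        reader_last_seen: u64,
    ) -> Option<Vec<Range<u64>>> {
        // Drop duplicate or reordered heartbeats (counter not newer).
        if self.last_count.map_or(false, |prev| count <= prev) {
            return None;
        }
        self.last_count = Some(count);

        // Samples older than first_seq are gone, so never request them.
        let start = first_seq.max(reader_last_seen.saturating_add(1));
        let end = last_seq.saturating_add(1);
        if start < end {
            Some(vec![start..end])
        } else {
            None
        }
    }
}
```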

GapTracker

Tracks missing sequence numbers with automatic range merging:

use hdds::reliability::{GapTracker, RtpsRange};

let mut tracker = GapTracker::new();

// Process received sequences
tracker.on_receive(1); // last_seen = 1
tracker.on_receive(2); // last_seen = 2, no gap
tracker.on_receive(5); // last_seen = 5, gap [3..5)

// Check pending gaps
assert_eq!(tracker.pending_gaps(), &[3..5]);

// Fill gap (out-of-order delivery)
tracker.on_receive(3);
assert_eq!(tracker.pending_gaps(), &[4..5]);

// Mark as lost (writer sent GAP)
tracker.mark_lost(RtpsRange::new(4, 5));
assert!(tracker.pending_gaps().is_empty());

// Statistics
println!("Last seen: {}", tracker.last_seen());
println!("Total missing: {}", tracker.total_missing());

GapTracker Algorithm

on_receive(seq):
  1. If seq == last_seen + 1 → contiguous, no gap; last_seen advances to seq
  2. If seq > last_seen + 1 → gap detected: [last_seen+1..seq); last_seen advances to seq
  3. If seq <= last_seen → out-of-order: fills a pending gap, or is a duplicate
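
The three cases can be sketched in a few lines. This is an illustrative model, not the HDDS implementation: `Tracker` is a hypothetical type, sequence numbers are assumed to start at 1 (so `last_seen` starts at 0), and the 100-range capacity limit is not modeled.

```rust
use std::ops::Range;

/// Hypothetical gap tracker following the three cases above.
#[derive(Default)]
struct Tracker {
    last_seen: u64,
    gaps: Vec<Range<u64>>,
}

impl Tracker {
    fn on_receive(&mut self, seq: u64) {
        if seq > self.last_seen {
            // Case 2: anything skipped between last_seen and seq is a gap.
            if seq > self.last_seen + 1 {
                self.gaps.push(self.last_seen + 1..seq);
            }
            // Cases 1 and 2: advance the high-water mark.
            self.last_seen = seq;
        } else {
            // Case 3: fills a pending gap, or is a duplicate (no-op).
            self.remove(seq..seq + 1);
        }
    }

    /// Writer declared the range lost, so stop waiting for it.
    fn mark_lost(&mut self, lost: Range<u64>) {
        self.remove(lost);
    }

    /// Remove `range` from the pending gaps, splitting a gap when the
    /// removed span falls in its middle.
    fn remove(&mut self, range: Range<u64>) {
        let mut next = Vec::with_capacity(self.gaps.len() + 1);
        for gap in self.gaps.drain(..) {
            if range.end <= gap.start || range.start >= gap.end {
                next.push(gap); // no overlap, keep as is
                continue;
            }
            if gap.start < range.start {
                next.push(gap.start..range.start);
            }
            if range.end < gap.end {
                next.push(range.end..gap.end);
            }
        }
        self.gaps = next;
    }
}
```

Because new gaps always start above `last_seen`, they are appended in ascending order and never overlap, so this sketch needs splitting but no merging.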

NackScheduler

Time-windowed NACK coalescing with exponential backoff:

use hdds::reliability::NackScheduler;

// Default: 20ms coalescing window
let mut scheduler = NackScheduler::new();

// Custom window
let mut scheduler = NackScheduler::with_window_ms(50);

// Attach metrics
scheduler.set_metrics(metrics.clone());

// Process received sequence (detects gaps)
scheduler.on_receive(seq);

// Try to flush pending NACKs
if let Some(gaps) = scheduler.try_flush() {
    // Send NACK for gaps
    send_nack(&gaps);
    scheduler.on_nack_sent();
}

// Mark retransmit received
scheduler.on_data_received(seq);

// Mark sequences as lost (writer sent GAP)
scheduler.mark_lost(range);

// State
println!("Pending gaps: {:?}", scheduler.pending_gaps());
println!("Retry count: {}", scheduler.retry_count());
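
The coalescing window can be pictured as a timer that starts with the first pending gap and holds back the NACK until it expires, so several gaps travel in one message. The sketch below is a simplified model with hypothetical names (`Coalescer`, `add_gap`). Time is passed in explicitly for testability, and flushed gaps are drained, whereas the real scheduler presumably keeps them pending until they are filled or marked lost.

```rust
use std::ops::Range;
use std::time::{Duration, Instant};

/// Hypothetical model of a NACK coalescing window.
struct Coalescer {
    window: Duration,
    window_start: Option<Instant>,
    pending: Vec<Range<u64>>,
}

impl Coalescer {
    fn new(window_ms: u64) -> Self {
        Self {
            window: Duration::from_millis(window_ms),
            window_start: None,
            pending: Vec::new(),
        }
    }

    /// Record a gap; the first gap after a flush opens a new window.
    fn add_gap(&mut self, gap: Range<u64>, now: Instant) {
        if self.window_start.is_none() {
            self.window_start = Some(now);
        }
        self.pending.push(gap);
    }

    /// Returns the batched gaps once the window has elapsed.
    fn try_flush(&mut self, now: Instant) -> Option<Vec<Range<u64>>> {
        let start = self.window_start?;
        if now.duration_since(start) < self.window {
            return None; // still collecting
        }
        self.window_start = None;
        Some(std::mem::take(&mut self.pending))
    }
}
```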

NACK Scheduling Parameters

Parameter         Default   Description
window            20ms      Coalescing window (batch multiple gaps)
initial_backoff   50ms      Initial retry interval
max_retries       5         Max NACK retries before giving up

Exponential Backoff

Retry 1: wait 50ms
Retry 2: wait 100ms
Retry 3: wait 200ms
Retry 4: wait 400ms
Retry 5: wait 800ms (then give up)
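
The schedule above is the initial backoff doubled on each retry, that is, 50ms × 2^(n-1) for retry n. A minimal sketch, with the two defaults from the parameter table as constants; `backoff` is a hypothetical helper, not an HDDS function.

```rust
use std::time::Duration;

const INITIAL_BACKOFF_MS: u64 = 50;
const MAX_RETRIES: u32 = 5;

/// Hypothetical helper: wait before retry `n` (1-based).
/// Returns None once the retry budget is exhausted.
fn backoff(retry: u32) -> Option<Duration> {
    if retry == 0 || retry > MAX_RETRIES {
        return None;
    }
    // Doubling per retry: 50, 100, 200, 400, 800 ms.
    Some(Duration::from_millis(INITIAL_BACKOFF_MS << (retry - 1)))
}
```

Summed over all five retries, the reader waits 1550ms in total before giving up on a gap under these defaults.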

GapRx

Reader processes GAP messages from writer:

use hdds::reliability::GapRx;

let mut gap_rx = GapRx::new();

// Process GAP message
let lost_ranges = gap_rx.on_gap(&gap_msg);
for range in lost_ranges {
    println!("Sequences lost: {:?}", range);
}

// Statistics
println!("GAP messages received: {}", gap_rx.gap_count());
println!("Total sequences lost: {}", gap_rx.total_lost());

Metrics

ReliableMetrics

Lock-free atomic counters for observability:

use hdds::reliability::ReliableMetrics;

let metrics = ReliableMetrics::new();

// Record events
metrics.record_gap(3); // Gap of 3 sequences detected
metrics.increment_out_of_order(1); // Out-of-order packet
metrics.increment_nacks_sent(1); // NACK sent
metrics.increment_heartbeats_sent(1); // Heartbeat sent
metrics.increment_retransmit_sent(1); // Retransmit sent (writer)
metrics.increment_retransmit_received(1); // Retransmit received (reader)

// Read counters
println!("Gaps detected: {}", metrics.gaps_detected());
println!("Max gap size: {}", metrics.max_gap_size());
println!("Out-of-order: {}", metrics.out_of_order());

// Export for telemetry
let frame = metrics.snapshot(timestamp_ns);
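
A lock-free counter pair of this kind can be built from `fetch_add` for the total and a compare-exchange loop for the running maximum, which matches the note in the Performance table. The sketch below uses only `std::sync::atomic`; `SketchMetrics` and its field names are hypothetical, and relaxed ordering is assumed to be enough because the counters are purely observational.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical lock-free counters.
#[derive(Default)]
struct SketchMetrics {
    gaps: AtomicU64,
    max_gap: AtomicU64,
}

impl SketchMetrics {
    fn record_gap(&self, size: u64) {
        self.gaps.fetch_add(1, Ordering::Relaxed);

        // Raise the maximum with a compare-exchange loop: retry only
        // while another thread has not already stored a larger value.
        let mut current = self.max_gap.load(Ordering::Relaxed);
        while size > current {
            match self.max_gap.compare_exchange_weak(
                current,
                size,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(observed) => current = observed,
            }
        }
    }

    fn gaps_detected(&self) -> u64 {
        self.gaps.load(Ordering::Relaxed)
    }

    fn max_gap_size(&self) -> u64 {
        self.max_gap.load(Ordering::Relaxed)
    }
}
```

The standard library also offers `AtomicU64::fetch_max`, which would replace the loop with a single call.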

Metric Tags

Tag                       ID    Description
TAG_GAPS_DETECTED         100   Total gaps detected
TAG_MAX_GAP_SIZE          101   Maximum gap size observed
TAG_OUT_OF_ORDER          102   Out-of-order packets
TAG_RETRANSMIT_SENT       103   Retransmissions sent (writer)
TAG_RETRANSMIT_RECEIVED   104   Retransmissions received (reader)
TAG_NACKS_SENT            105   NACKs sent
TAG_HEARTBEATS_SENT       106   Heartbeats sent

Message Types

HeartbeatMsg

Writer announces available sample range:

pub struct HeartbeatMsg {
    pub reader_id: EntityId,
    pub writer_id: EntityId,
    pub first_seq: u64, // Oldest available sequence
    pub last_seq: u64,  // Newest available sequence
    pub count: u32,     // Monotonic counter
}

NackMsg

Reader requests missing samples:

pub struct NackMsg {
    pub reader_id: EntityId,
    pub writer_id: EntityId,
    pub ranges: Vec<RtpsRange>, // Missing sequence ranges
    pub count: u32,             // Monotonic counter
}

GapMsg

Writer declares samples as unavailable:

pub struct GapMsg {
    pub reader_id: EntityId,
    pub writer_id: EntityId,
    pub gap_start: u64,              // First unavailable sequence
    pub gap_list: SequenceNumberSet, // Additional unavailable sequences
}

Configuration

QoS Settings

use hdds::QoS;

// Reliable with history
let qos = QoS::reliable()
    .history_keep_last(100); // Keep 100 samples for retransmission

// Best-effort (no reliability protocol)
let qos = QoS::best_effort();

ResourceLimits

use hdds::qos::ResourceLimits;

let limits = ResourceLimits {
    max_samples: 1000,            // Max samples in cache
    max_instances: 1,             // For non-keyed types
    max_samples_per_instance: 1000,
    max_quota_bytes: 100_000_000, // 100 MB memory limit
};

Performance

Operation              Time       Notes
metrics.record_gap()   < 10 ns    Atomic fetch_add + compare-exchange
tracker.on_receive()   < 100 ns   Vec push + optional merge
cache.insert()         < 500 ns   Slab allocation + mutex
cache.get()            < 200 ns   Mutex + linear search

Capacity Limits

Component       Limit          Behavior
GapTracker      100 ranges     Oldest dropped (FIFO)
HistoryCache    Configurable   FIFO eviction (KeepLast) or reject (KeepAll)
NackScheduler   5 retries      Give up after 5 attempts