
Reliable Delivery Example

Demonstrates guaranteed message delivery using HDDS reliable QoS.

Overview

This example shows:

  • Configuring reliable delivery
  • Handling acknowledgments
  • Blocking vs non-blocking writes
  • Recovery from packet loss

How Reliable Delivery Works

Writer                              Reader
|                                   |
|------ DATA (seq=1) -------------->|
|<----- ACKNACK (ack=2) ------------|
|                                   |
|------ DATA (seq=2) -----X         |  (lost)
|                                   |
|------ HEARTBEAT (seq=1-2) ------->|
|<----- ACKNACK (missing=2) --------|
|                                   |
|------ DATA (seq=2) -------------->|  (retransmit)
|<----- ACKNACK (ack=3) ------------|
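
The exchange above can be sketched without any networking. This is a simplified, self-contained model, not HDDS code: `ReaderState`, `on_data`, `on_heartbeat`, and `simulate` are hypothetical names, and real protocol messages carry more state than a bare sequence number.

```rust
use std::collections::BTreeSet;

/// Reader-side bookkeeping (hypothetical): which sequence numbers have arrived.
#[derive(Default)]
struct ReaderState {
    received: BTreeSet<u64>,
}

impl ReaderState {
    /// Record an incoming DATA message.
    fn on_data(&mut self, seq: u64) {
        self.received.insert(seq);
    }

    /// Answer a HEARTBEAT announcing that `first..=last` is available:
    /// list every sequence number in that range that has not arrived yet.
    fn on_heartbeat(&self, first: u64, last: u64) -> Vec<u64> {
        (first..=last)
            .filter(|seq| !self.received.contains(seq))
            .collect()
    }
}

/// Send `1..=count` over a link that drops the numbers in `lost` on the
/// first attempt, then repair the gaps with a single heartbeat round.
/// Returns (retransmitted sequence numbers, total samples delivered).
fn simulate(count: u64, lost: &[u64]) -> (Vec<u64>, usize) {
    let mut reader = ReaderState::default();

    // First transmission: samples listed in `lost` never reach the reader.
    for seq in 1..=count {
        if !lost.contains(&seq) {
            reader.on_data(seq);
        }
    }

    // HEARTBEAT: the reader reports its gaps, the writer resends them.
    let missing = reader.on_heartbeat(1, count);
    for &seq in &missing {
        reader.on_data(seq);
    }

    (missing, reader.received.len())
}

fn main() {
    // Same scenario as the diagram: two samples, the second one is lost.
    let (retransmitted, delivered) = simulate(2, &[2]);
    println!("retransmitted {:?}, delivered {}", retransmitted, delivered);
}
```

The sketch assumes retransmissions always arrive; a real writer keeps sending heartbeats until every matched reader has acknowledged the sample.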

IDL Definition

Command.idl
module control {
    enum CommandType {
        START,
        STOP,
        CONFIGURE,
        RESET
    };

    @topic
    struct Command {
        @key uint32 device_id;
        uint64 sequence_number;
        CommandType command_type;
        string parameters;
    };
};

Publisher (Reliable)

src/command_sender.rs
use hdds::prelude::*;
use control::{Command, CommandType};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = DomainParticipant::new(0)?;
    let topic = participant.create_topic::<Command>("CommandTopic")?;

    // Configure reliable delivery
    let qos = DataWriterQos::default()
        .reliability(Reliability::Reliable {
            max_blocking_time: Duration::from_secs(5),
        })
        .history(History::KeepAll)
        .durability(Durability::TransientLocal);

    let publisher = participant.create_publisher()?;
    let writer = publisher.create_writer_with_qos(&topic, qos)?;

    // Wait for at least one subscriber
    println!("Waiting for subscriber...");
    while writer.matched_subscriptions().is_empty() {
        std::thread::sleep(Duration::from_millis(100));
    }
    println!("Subscriber connected!");

    // Send commands
    for seq in 1..=10 {
        let cmd = Command {
            device_id: 1,
            sequence_number: seq,
            command_type: CommandType::Configure,
            parameters: format!("config_{}", seq),
        };

        // write() blocks until acknowledged or timeout
        match writer.write(&cmd) {
            Ok(()) => println!("Command {} sent and acknowledged", seq),
            Err(HddsError::Timeout) => {
                // A timed-out command is reported but does not stop the loop
                eprintln!("Command {} timed out!", seq);
            }
            Err(e) => return Err(e.into()),
        }
    }

    // Timed-out commands are only logged above, so do not claim full delivery
    println!("Finished sending commands.");
    Ok(())
}

Subscriber (Reliable)

src/command_receiver.rs
use hdds::prelude::*;
use control::Command;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = DomainParticipant::new(0)?;
    let topic = participant.create_topic::<Command>("CommandTopic")?;

    // Must match writer's reliability
    let qos = DataReaderQos::default()
        .reliability(Reliability::Reliable)
        .history(History::KeepAll)
        .durability(Durability::TransientLocal);

    let subscriber = participant.create_subscriber()?;
    let reader = subscriber.create_reader_with_qos(&topic, qos)?;

    println!("Receiving commands...");

    let mut received = std::collections::HashSet::new();

    loop {
        match reader.take() {
            Ok(samples) => {
                for sample in samples {
                    if received.insert(sample.sequence_number) {
                        println!(
                            "Received command {}: {:?} - {}",
                            sample.sequence_number,
                            sample.command_type,
                            sample.parameters
                        );
                    }
                }
            }
            Err(HddsError::NoData) => {
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            Err(e) => return Err(e.into()),
        }
    }
}

Non-Blocking Reliable Write

For applications that cannot block:

let qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::ZERO, // Non-blocking
    })
    .history(History::KeepLast { depth: 100 });

let writer = publisher.create_writer_with_qos(&topic, qos)?;

// Try to write, may fail if buffer full
match writer.write(&command) {
    Ok(()) => println!("Queued for delivery"),
    Err(HddsError::WouldBlock) => {
        eprintln!("Buffer full, try again later");
    }
    Err(e) => return Err(e.into()),
}
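
"Try again later" usually means a bounded retry with a growing delay. The following is a minimal sketch of that pattern using only the standard library. `WriteError` and `write_with_retry` are hypothetical stand-ins rather than HDDS types; in a real application the closure would wrap `writer.write(&command)` and map `HddsError::WouldBlock` to the retryable case.

```rust
use std::thread;
use std::time::Duration;

/// Stand-in for the error a non-blocking write can return (hypothetical type).
#[derive(Debug, PartialEq)]
enum WriteError {
    WouldBlock,
    Other(String),
}

/// Call `attempt` until it succeeds, retrying only on `WouldBlock`.
/// The delay doubles after each failed attempt, starting at `initial_delay`.
/// On success, returns how many attempts were made.
fn write_with_retry<F>(
    mut attempt: F,
    max_attempts: u32,
    initial_delay: Duration,
) -> Result<u32, WriteError>
where
    F: FnMut() -> Result<(), WriteError>,
{
    let mut delay = initial_delay;
    for n in 1..=max_attempts {
        match attempt() {
            Ok(()) => return Ok(n),
            // Buffer full and attempts remain: back off, then try again.
            Err(WriteError::WouldBlock) if n < max_attempts => {
                thread::sleep(delay);
                delay *= 2;
            }
            // Last attempt failed, or the error is not retryable.
            Err(e) => return Err(e),
        }
    }
    // Only reached when `max_attempts` is zero.
    Err(WriteError::WouldBlock)
}

fn main() {
    // Simulated writer whose buffer frees up on the third attempt.
    let mut calls = 0;
    let result = write_with_retry(
        || {
            calls += 1;
            if calls < 3 {
                Err(WriteError::WouldBlock)
            } else {
                Ok(())
            }
        },
        5,
        Duration::from_millis(1),
    );
    println!("{:?}", result);
}
```

Capping the number of attempts keeps the caller responsive, which is the reason for choosing a non-blocking write in the first place.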

Checking Delivery Status

// Number of readers currently matched with this writer
let status = writer.get_publication_matched_status()?;
println!("Matched readers: {}", status.current_count);

// Liveliness-lost status counts how often this writer failed to assert
// its own liveliness in time; it does not report unacknowledged samples
let liveliness = writer.get_liveliness_lost_status()?;
if liveliness.total_count > 0 {
    eprintln!("Writer liveliness lost {} time(s)", liveliness.total_count);
}

Late Joiner Support

With TransientLocal durability, late subscribers receive historical data:

// Publisher: keep last 100 samples for late joiners
let writer_qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::from_secs(1),
    })
    .durability(Durability::TransientLocal)
    .history(History::KeepLast { depth: 100 });

// Subscriber: request historical data
let reader_qos = DataReaderQos::default()
    .reliability(Reliability::Reliable)
    .durability(Durability::TransientLocal);

QoS Compatibility

Writer      Reader      Result
Reliable    Reliable    Full reliability
Reliable    BestEffort  Works (reader ignores retransmits)
BestEffort  Reliable    Incompatible - no match
BestEffort  BestEffort  No reliability
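
The table above boils down to one rule: the writer must offer at least what the reader requests. A small sketch of that rule follows. `Level`, `Match`, and `match_result` are illustrative names, not part of the HDDS API.

```rust
/// Reliability level offered by a writer or requested by a reader
/// (hypothetical type for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Level {
    BestEffort,
    Reliable,
}

/// Outcome of matching one writer with one reader.
#[derive(Debug, PartialEq, Eq)]
enum Match {
    /// Lost samples are retransmitted.
    Reliable,
    /// Endpoints match, but lost samples stay lost.
    BestEffort,
    /// Endpoints do not match at all.
    Incompatible,
}

/// Encode the four rows of the compatibility table.
fn match_result(writer: Level, reader: Level) -> Match {
    match (writer, reader) {
        (Level::Reliable, Level::Reliable) => Match::Reliable,
        (Level::Reliable, Level::BestEffort) => Match::BestEffort,
        (Level::BestEffort, Level::BestEffort) => Match::BestEffort,
        // The reader asks for more than the writer offers.
        (Level::BestEffort, Level::Reliable) => Match::Incompatible,
    }
}

fn main() {
    for writer in [Level::Reliable, Level::BestEffort] {
        for reader in [Level::Reliable, Level::BestEffort] {
            println!(
                "{:?} writer + {:?} reader -> {:?}",
                writer,
                reader,
                match_result(writer, reader)
            );
        }
    }
}
```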

History Depth Tuning

// High-throughput: limit buffer
let qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::from_millis(100),
    })
    .history(History::KeepLast { depth: 1000 });

// Command queue: keep all
let qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::from_secs(30),
    })
    .history(History::KeepAll);
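
To make the trade-off concrete, here is a toy model of a writer-side sample cache under each policy. It follows the general DDS behaviour (KeepLast overwrites the oldest sample, KeepAll refuses new samples once a resource limit is hit) and is an assumption about how HDDS behaves, not a description of its internals. `WriterCache`, `Push`, and the `max_samples` limit are hypothetical, and `depth` is assumed to be at least 1.

```rust
use std::collections::VecDeque;

/// History policy (hypothetical mirror of the QoS setting).
#[derive(Debug, Clone, Copy)]
enum History {
    KeepLast { depth: usize },
    /// `max_samples` stands in for a resource limit (an assumption).
    KeepAll { max_samples: usize },
}

/// What happened to a sample offered to the cache.
#[derive(Debug, PartialEq)]
enum Push {
    Stored,
    /// Stored, but the oldest sample (given here) was dropped to make room.
    EvictedOldest(u64),
    /// Not stored: a real writer would block up to max_blocking_time here.
    Full,
}

struct WriterCache {
    policy: History,
    samples: VecDeque<u64>,
}

impl WriterCache {
    fn new(policy: History) -> Self {
        WriterCache { policy, samples: VecDeque::new() }
    }

    /// Offer sequence number `seq` to the cache.
    fn push(&mut self, seq: u64) -> Push {
        match self.policy {
            History::KeepLast { depth } => {
                // Make room by discarding the oldest sample if needed.
                let evicted = if self.samples.len() >= depth {
                    self.samples.pop_front()
                } else {
                    None
                };
                self.samples.push_back(seq);
                match evicted {
                    Some(old) => Push::EvictedOldest(old),
                    None => Push::Stored,
                }
            }
            History::KeepAll { max_samples } => {
                if self.samples.len() >= max_samples {
                    Push::Full
                } else {
                    self.samples.push_back(seq);
                    Push::Stored
                }
            }
        }
    }
}

fn main() {
    let mut last = WriterCache::new(History::KeepLast { depth: 2 });
    let mut all = WriterCache::new(History::KeepAll { max_samples: 2 });
    for seq in 1..=3 {
        let a = last.push(seq);
        let b = all.push(seq);
        println!("seq {}: KeepLast -> {:?}, KeepAll -> {:?}", seq, a, b);
    }
}
```

The model never removes acknowledged samples, so it only shows the worst case of a reader that has stopped acknowledging: KeepLast trades completeness for a bounded buffer, while KeepAll trades writer responsiveness for completeness.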

Performance Considerations

Factor             Impact
Network RTT        Affects acknowledgment latency
History depth      Memory usage, retransmit buffer
max_blocking_time  Trade-off: reliability vs responsiveness
Packet loss rate   More retransmits, higher latency

Troubleshooting

Write Timeout

Error: Timeout("Write blocked for 5000ms")

Causes:

  • Reader not running or crashed
  • Network partition
  • Reader can't keep up (slow consumer)

Solutions:

  • Increase max_blocking_time
  • Reduce publish rate
  • Use KeepLast with smaller depth

QoS Mismatch

Warning: No matching subscriptions found

Check:

  • Reliability is compatible (a BestEffort writer does not match a Reliable reader)
  • Durability is compatible (writer >= reader)
  • Topic and type names match exactly
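
The checklist can be turned into a small diagnostic helper. The sketch below is illustrative only: `Endpoint`, `Durability`, and `mismatches` are hypothetical and do not query HDDS. `Volatile` is assumed as the weaker durability level, as in the DDS specification; only `TransientLocal` appears in this example.

```rust
/// Durability levels ordered from weakest to strongest, so that the derived
/// comparison expresses "writer >= reader" (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Durability {
    Volatile,
    TransientLocal,
}

/// The settings that must line up for a writer and reader to match.
struct Endpoint<'a> {
    topic: &'a str,
    type_name: &'a str,
    reliable: bool,
    durability: Durability,
}

/// Return a human-readable reason for every check that fails.
/// An empty list means the endpoints should match.
fn mismatches(writer: &Endpoint, reader: &Endpoint) -> Vec<&'static str> {
    let mut problems = Vec::new();
    if writer.topic != reader.topic {
        problems.push("topic name differs");
    }
    if writer.type_name != reader.type_name {
        problems.push("type name differs");
    }
    if reader.reliable && !writer.reliable {
        problems.push("reader requests Reliable but writer offers BestEffort");
    }
    if writer.durability < reader.durability {
        problems.push("writer durability is weaker than the reader requests");
    }
    problems
}

fn main() {
    let writer = Endpoint {
        topic: "CommandTopic",
        type_name: "control::Command",
        reliable: true,
        durability: Durability::Volatile,
    };
    // Names are compared exactly, so a case difference is a mismatch.
    let reader = Endpoint {
        topic: "commandtopic",
        type_name: "control::Command",
        reliable: true,
        durability: Durability::TransientLocal,
    };
    for problem in mismatches(&writer, &reader) {
        println!("mismatch: {}", problem);
    }
}
```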

Next Steps