Reliable Delivery Example
Demonstrates guaranteed message delivery using HDDS reliable QoS.
Overview
This example shows:
- Configuring reliable delivery
- Handling acknowledgments
- Blocking vs non-blocking writes
- Recovery from packet loss
How Reliable Delivery Works
Writer Reader
| |
|------ DATA (seq=1) -------------->|
|<----- ACKNACK (ack=2) ------------|
| |
|------ DATA (seq=2) -----X | (lost)
| |
|------ HEARTBEAT (seq=1-2) ------->|
|<----- ACKNACK (missing=2) --------|
| |
|------ DATA (seq=2) -------------->| (retransmit)
|<----- ACKNACK (ack=3) ------------|
IDL Definition
Command.idl
module control {
enum CommandType {
START,
STOP,
CONFIGURE,
RESET
};
@topic
struct Command {
@key uint32 device_id;
uint64 sequence_number;
CommandType command_type;
string parameters;
};
};
Publisher (Reliable)
src/command_sender.rs
use hdds::prelude::*;
use control::{Command, CommandType};
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = DomainParticipant::new(0)?;
let topic = participant.create_topic::<Command>("CommandTopic")?;
// Configure reliable delivery
let qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_secs(5),
})
.history(History::KeepAll)
.durability(Durability::TransientLocal);
let publisher = participant.create_publisher()?;
let writer = publisher.create_writer_with_qos(&topic, qos)?;
// Wait for at least one subscriber
println!("Waiting for subscriber...");
while writer.matched_subscriptions().is_empty() {
std::thread::sleep(Duration::from_millis(100));
}
println!("Subscriber connected!");
// Send commands
for seq in 1..=10 {
let cmd = Command {
device_id: 1,
sequence_number: seq,
command_type: CommandType::Configure,
parameters: format!("config_{}", seq),
};
// write() blocks until acknowledged or timeout
match writer.write(&cmd) {
Ok(()) => println!("Command {} sent and acknowledged", seq),
Err(HddsError::Timeout) => {
eprintln!("Command {} timed out!", seq);
}
Err(e) => return Err(e.into()),
}
}
println!("All commands delivered!");
Ok(())
}
Subscriber (Reliable)
src/command_receiver.rs
use hdds::prelude::*;
use control::Command;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = DomainParticipant::new(0)?;
let topic = participant.create_topic::<Command>("CommandTopic")?;
// Must match writer's reliability
let qos = DataReaderQos::default()
.reliability(Reliability::Reliable)
.history(History::KeepAll)
.durability(Durability::TransientLocal);
let subscriber = participant.create_subscriber()?;
let reader = subscriber.create_reader_with_qos(&topic, qos)?;
println!("Receiving commands...");
let mut received = std::collections::HashSet::new();
loop {
match reader.take() {
Ok(samples) => {
for sample in samples {
if received.insert(sample.sequence_number) {
println!(
"Received command {}: {:?} - {}",
sample.sequence_number,
sample.command_type,
sample.parameters
);
}
}
}
Err(HddsError::NoData) => {
std::thread::sleep(std::time::Duration::from_millis(10));
}
Err(e) => return Err(e.into()),
}
}
}
Non-Blocking Reliable Write
For applications that cannot block:
let qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::ZERO, // Non-blocking
})
.history(History::KeepLast { depth: 100 });
let writer = publisher.create_writer_with_qos(&topic, qos)?;
// Try to write, may fail if buffer full
match writer.write(&command) {
Ok(()) => println!("Queued for delivery"),
Err(HddsError::WouldBlock) => {
eprintln!("Buffer full, try again later");
}
Err(e) => return Err(e.into()),
}
Checking Delivery Status
// Get acknowledgment status
let status = writer.get_publication_matched_status()?;
println!("Matched readers: {}", status.current_count);
// Check for unacknowledged samples
let liveliness = writer.get_liveliness_lost_status()?;
if liveliness.total_count > 0 {
eprintln!("Lost connection to {} readers", liveliness.total_count);
}
Late Joiner Support
With TransientLocal durability, late subscribers receive historical data:
// Publisher: keep last 100 samples for late joiners
let writer_qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_secs(1),
})
.durability(Durability::TransientLocal)
.history(History::KeepLast { depth: 100 });
// Subscriber: request historical data
let reader_qos = DataReaderQos::default()
.reliability(Reliability::Reliable)
.durability(Durability::TransientLocal);
QoS Compatibility
| Writer | Reader | Result |
|---|---|---|
| Reliable | Reliable | Full reliability |
| Reliable | BestEffort | Works (reader ignores retransmits) |
| BestEffort | Reliable | Incompatible - no match |
| BestEffort | BestEffort | No reliability |
History Depth Tuning
// High-throughput: limit buffer
let qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.history(History::KeepLast { depth: 1000 });
// Command queue: keep all
let qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_secs(30),
})
.history(History::KeepAll);
Performance Considerations
| Factor | Impact |
|---|---|
| Network RTT | Affects acknowledgment latency |
| History depth | Memory usage, retransmit buffer |
| max_blocking_time | Trade-off: reliability vs responsiveness |
| Packet loss rate | More retransmits, higher latency |
Troubleshooting
Write Timeout
Error: Timeout("Write blocked for 5000ms")
Causes:
- Reader not running or crashed
- Network partition
- Reader can't keep up (slow consumer)
Solutions:
- Increase
max_blocking_time - Reduce publish rate
- Use KeepLast with smaller depth
QoS Mismatch
Warning: No matching subscriptions found
Check:
- Both sides use same reliability mode
- Durability is compatible (writer >= reader)
- Topic and type names match exactly
Next Steps
- Key Instance - Per-instance reliability
- Deadline QoS - Timing requirements
- Liveliness - Failure detection