Multi-Topic Example
Demonstrates handling multiple topics with different types in a single application.
Overview
Real-world DDS applications typically work with multiple topics:
- Sensor data (high frequency, best effort)
- Commands (low frequency, reliable)
- Status updates (periodic, transient local)
- Alarms (sporadic, reliable + durable)
IDL Definitions
RobotTypes.idl
module robot {
// High-frequency sensor data
@topic
struct SensorData {
@key uint32 sensor_id;
uint64 timestamp;
float values[8];
};
// Command interface
enum CommandType { MOVE, STOP, HOME, CALIBRATE };
@topic
struct Command {
@key uint32 robot_id;
uint64 sequence;
CommandType cmd;
float parameters[4];
};
// Status feedback
enum RobotState { IDLE, BUSY, ERROR, MAINTENANCE };
@topic
struct Status {
@key uint32 robot_id;
RobotState state;
float position[3];
float battery;
};
// Alarm notifications
enum AlarmSeverity { INFO, WARNING, CRITICAL };
@topic
struct Alarm {
@key uint32 alarm_id;
uint32 robot_id;
AlarmSeverity severity;
string<256> message;
uint64 timestamp;
};
};
Multi-Topic Publisher
src/robot_node.rs
use hdds::prelude::*;
use robot::*;
use std::time::Duration;
struct RobotNode {
participant: DomainParticipant,
sensor_writer: DataWriter<SensorData>,
status_writer: DataWriter<Status>,
alarm_writer: DataWriter<Alarm>,
command_reader: DataReader<Command>,
}
impl RobotNode {
fn new(domain_id: u32, robot_id: u32) -> Result<Self, HddsError> {
let participant = DomainParticipant::new(domain_id)?;
let publisher = participant.create_publisher()?;
let subscriber = participant.create_subscriber()?;
// Sensor topic: high frequency, best effort
let sensor_topic = participant.create_topic::<SensorData>("SensorData")?;
let sensor_qos = DataWriterQos::default()
.reliability(Reliability::BestEffort)
.history(History::KeepLast { depth: 1 });
let sensor_writer = publisher.create_writer_with_qos(&sensor_topic, sensor_qos)?;
// Status topic: periodic, transient local for late joiners
let status_topic = participant.create_topic::<Status>("RobotStatus")?;
let status_qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.durability(Durability::TransientLocal)
.history(History::KeepLast { depth: 1 });
let status_writer = publisher.create_writer_with_qos(&status_topic, status_qos)?;
// Alarm topic: reliable + durable for important events
let alarm_topic = participant.create_topic::<Alarm>("Alarms")?;
let alarm_qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_secs(5),
})
.durability(Durability::TransientLocal)
.history(History::KeepAll);
let alarm_writer = publisher.create_writer_with_qos(&alarm_topic, alarm_qos)?;
// Command topic: reliable subscriber
let command_topic = participant.create_topic::<Command>("Commands")?;
let command_qos = DataReaderQos::default()
.reliability(Reliability::Reliable)
.history(History::KeepAll);
let command_reader = subscriber.create_reader_with_qos(&command_topic, command_qos)?;
Ok(Self {
participant,
sensor_writer,
status_writer,
alarm_writer,
command_reader,
})
}
fn run(&mut self, robot_id: u32) -> Result<(), HddsError> {
let mut tick = 0u64;
loop {
// Publish sensor data at 100 Hz
self.publish_sensors(robot_id, tick)?;
// Publish status at 10 Hz
if tick % 10 == 0 {
self.publish_status(robot_id)?;
}
// Check for commands
self.process_commands(robot_id)?;
tick += 1;
std::thread::sleep(Duration::from_millis(10));
}
}
fn publish_sensors(&self, sensor_id: u32, tick: u64) -> Result<(), HddsError> {
let data = SensorData {
sensor_id,
timestamp: tick,
values: [0.0; 8], // Actual sensor readings
};
self.sensor_writer.write(&data)
}
fn publish_status(&self, robot_id: u32) -> Result<(), HddsError> {
let status = Status {
robot_id,
state: RobotState::Idle,
position: [0.0, 0.0, 0.0],
battery: 85.0,
};
self.status_writer.write(&status)
}
fn publish_alarm(&self, robot_id: u32, msg: &str) -> Result<(), HddsError> {
static ALARM_ID: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(0);
let alarm = Alarm {
alarm_id: ALARM_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
robot_id,
severity: AlarmSeverity::Warning,
message: msg.to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64,
};
self.alarm_writer.write(&alarm)
}
fn process_commands(&mut self, robot_id: u32) -> Result<(), HddsError> {
match self.command_reader.try_take() {
Ok(commands) => {
for cmd in commands {
if cmd.robot_id == robot_id {
println!("Received command: {:?}", cmd.cmd);
// Execute command...
}
}
}
Err(HddsError::NoData) => {}
Err(e) => return Err(e),
}
Ok(())
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut node = RobotNode::new(0, 1)?;
node.run(1)?;
Ok(())
}
Multi-Topic Subscriber (Monitoring)
src/monitor.rs
use hdds::prelude::*;
use robot::*;
use std::sync::mpsc;
use std::thread;
enum Event {
Sensor(SensorData),
Status(Status),
Alarm(Alarm),
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = DomainParticipant::new(0)?;
let subscriber = participant.create_subscriber()?;
// Channel for unified event handling
let (tx, rx) = mpsc::channel::<Event>();
// Spawn reader threads for each topic
let tx1 = tx.clone();
let sensor_topic = participant.create_topic::<SensorData>("SensorData")?;
let sensor_reader = subscriber.create_reader(&sensor_topic)?;
thread::spawn(move || {
loop {
if let Ok(samples) = sensor_reader.take() {
for s in samples {
tx1.send(Event::Sensor(s)).ok();
}
}
thread::sleep(std::time::Duration::from_millis(1));
}
});
let tx2 = tx.clone();
let status_topic = participant.create_topic::<Status>("RobotStatus")?;
let status_reader = subscriber.create_reader(&status_topic)?;
thread::spawn(move || {
loop {
if let Ok(samples) = status_reader.take() {
for s in samples {
tx2.send(Event::Status(s)).ok();
}
}
thread::sleep(std::time::Duration::from_millis(10));
}
});
let tx3 = tx.clone();
let alarm_topic = participant.create_topic::<Alarm>("Alarms")?;
let alarm_qos = DataReaderQos::default()
.reliability(Reliability::Reliable)
.durability(Durability::TransientLocal);
let alarm_reader = subscriber.create_reader_with_qos(&alarm_topic, alarm_qos)?;
thread::spawn(move || {
loop {
if let Ok(samples) = alarm_reader.take() {
for s in samples {
tx3.send(Event::Alarm(s)).ok();
}
}
thread::sleep(std::time::Duration::from_millis(10));
}
});
// Unified event loop
println!("Monitoring all topics...");
for event in rx {
match event {
Event::Sensor(s) => {
// High volume, only log occasionally
}
Event::Status(s) => {
println!("Robot {} status: {:?}, battery: {:.1}%",
s.robot_id, s.state, s.battery);
}
Event::Alarm(a) => {
println!("[{:?}] Robot {}: {}",
a.severity, a.robot_id, a.message);
}
}
}
Ok(())
}
Using Waitsets
For efficient multi-topic waiting:
use hdds::prelude::*;
let waitset = WaitSet::new()?;
// Attach read conditions
waitset.attach(sensor_reader.read_condition(SampleState::NotRead)?)?;
waitset.attach(status_reader.read_condition(SampleState::NotRead)?)?;
waitset.attach(alarm_reader.read_condition(SampleState::NotRead)?)?;
loop {
// Wait for any topic to have data
let triggered = waitset.wait(Duration::from_secs(1))?;
for condition in triggered {
if condition == sensor_condition {
// Process sensor data
} else if condition == status_condition {
// Process status
} else if condition == alarm_condition {
// Process alarms
}
}
}
Topic Organization Patterns
By Function
/sensors/lidar
/sensors/camera
/sensors/imu
/control/commands
/control/feedback
/diagnostics/health
/diagnostics/alarms
By Robot/Device
/robot_001/sensors
/robot_001/commands
/robot_002/sensors
/robot_002/commands
Using Partitions
// Separate traffic with partitions
let pub_qos = PublisherQos::default()
.partition(Partition::new(vec!["zone_a", "sensors"]));
let sub_qos = SubscriberQos::default()
.partition(Partition::new(vec!["zone_*"])); // Wildcard match
QoS Profile Summary
| Topic Type | Reliability | Durability | History |
|---|---|---|---|
| Sensor stream | BestEffort | Volatile | KeepLast(1) |
| Commands | Reliable | Volatile | KeepAll |
| Status | Reliable | TransientLocal | KeepLast(1) |
| Alarms | Reliable | TransientLocal | KeepAll |
| Configuration | Reliable | Persistent | KeepLast(1) |
Next Steps
- Cross-Vendor - Interoperability example
- Partitions - Topic filtering
- WaitSets - Efficient multi-topic waiting