Skip to main content

Multi-Topic Example

Demonstrates handling multiple topics with different types in a single application.

Overview

Real-world DDS applications typically work with multiple topics:

  • Sensor data (high frequency, best effort)
  • Commands (low frequency, reliable)
  • Status updates (periodic, transient local)
  • Alarms (sporadic, reliable + durable)

IDL Definitions

RobotTypes.idl
// Data types shared by the robot node and the monitor. Each struct marked
// @topic is published on its own DDS topic; the @key member identifies the
// instance within that topic.
module robot {
// High-frequency sensor data
// One sample carries a fixed array of 8 float readings for the sensor
// identified by sensor_id.
@topic
struct SensorData {
@key uint32 sensor_id;
uint64 timestamp;
float values[8];
};

// Command interface
// Kinds of command that can be sent to a robot.
enum CommandType { MOVE, STOP, HOME, CALIBRATE };

// A command addressed to one robot (robot_id), with a sequence number and
// up to 4 float parameters whose meaning is not defined in this file.
@topic
struct Command {
@key uint32 robot_id;
uint64 sequence;
CommandType cmd;
float parameters[4];
};

// Status feedback
// Operating states a robot can report.
enum RobotState { IDLE, BUSY, ERROR, MAINTENANCE };

// Per-robot status: state, a 3-component position and a battery value
// (the example publisher writes 85.0 and the monitor prints it as a percentage).
@topic
struct Status {
@key uint32 robot_id;
RobotState state;
float position[3];
float battery;
};

// Alarm notifications
// Severity levels attached to an alarm.
enum AlarmSeverity { INFO, WARNING, CRITICAL };

// An alarm is keyed by its own alarm_id rather than by robot, so every alarm
// is a separate instance; robot_id records which robot raised it. The message
// is a bounded string of at most 256 characters.
@topic
struct Alarm {
@key uint32 alarm_id;
uint32 robot_id;
AlarmSeverity severity;
string<256> message;
uint64 timestamp;
};
};

Multi-Topic Publisher

src/robot_node.rs
use hdds::prelude::*;
use robot::*;
use std::time::Duration;

/// A robot-side DDS node: three writers (sensor data, status, alarms) and one
/// reader (commands), all created from a single domain participant.
struct RobotNode {
// Participant that created every endpoint below. It is stored but not read by
// the methods in this example; presumably kept so it outlives the endpoints —
// confirm against the hdds ownership rules.
participant: DomainParticipant,
// Writer for the "SensorData" topic.
sensor_writer: DataWriter<SensorData>,
// Writer for the "RobotStatus" topic.
status_writer: DataWriter<Status>,
// Writer for the "Alarms" topic.
alarm_writer: DataWriter<Alarm>,
// Reader for the "Commands" topic.
command_reader: DataReader<Command>,
}

impl RobotNode {
    /// Creates a participant on `domain_id` together with one publisher, one
    /// subscriber and the four endpoints of the node, each with its own QoS.
    ///
    /// `robot_id` is accepted for symmetry with [`RobotNode::run`], but no
    /// endpoint depends on it.
    ///
    /// # Errors
    ///
    /// Returns the first `HddsError` reported while creating the participant,
    /// a topic or an endpoint.
    fn new(domain_id: u32, robot_id: u32) -> Result<Self, HddsError> {
        // None of the endpoints is robot-specific; the id is supplied to `run`.
        // Consume it explicitly so the signature stays intact without an
        // unused-variable warning.
        let _ = robot_id;

        let participant = DomainParticipant::new(domain_id)?;
        let publisher = participant.create_publisher()?;
        let subscriber = participant.create_subscriber()?;

        // Sensor topic: high frequency, best effort. Only the newest sample is
        // kept, since a stale reading is of no use once a fresh one exists.
        let sensor_topic = participant.create_topic::<SensorData>("SensorData")?;
        let sensor_qos = DataWriterQos::default()
            .reliability(Reliability::BestEffort)
            .history(History::KeepLast { depth: 1 });
        let sensor_writer = publisher.create_writer_with_qos(&sensor_topic, sensor_qos)?;

        // Status topic: periodic, transient local so that late joiners receive
        // the most recent status.
        let status_topic = participant.create_topic::<Status>("RobotStatus")?;
        let status_qos = DataWriterQos::default()
            .reliability(Reliability::Reliable {
                max_blocking_time: Duration::from_millis(100),
            })
            .durability(Durability::TransientLocal)
            .history(History::KeepLast { depth: 1 });
        let status_writer = publisher.create_writer_with_qos(&status_topic, status_qos)?;

        // Alarm topic: reliable + durable, and every alarm is retained.
        let alarm_topic = participant.create_topic::<Alarm>("Alarms")?;
        let alarm_qos = DataWriterQos::default()
            .reliability(Reliability::Reliable {
                max_blocking_time: Duration::from_secs(5),
            })
            .durability(Durability::TransientLocal)
            .history(History::KeepAll);
        let alarm_writer = publisher.create_writer_with_qos(&alarm_topic, alarm_qos)?;

        // Command topic: reliable subscriber that keeps every command.
        // NOTE(review): `Reliability::Reliable` is written here without the
        // `max_blocking_time` field used for the writers above — confirm which
        // form the hdds API expects for reader QoS.
        let command_topic = participant.create_topic::<Command>("Commands")?;
        let command_qos = DataReaderQos::default()
            .reliability(Reliability::Reliable)
            .history(History::KeepAll);
        let command_reader = subscriber.create_reader_with_qos(&command_topic, command_qos)?;

        Ok(Self {
            participant,
            sensor_writer,
            status_writer,
            alarm_writer,
            command_reader,
        })
    }

    /// Runs the node's main loop for the robot identified by `robot_id`.
    ///
    /// Every iteration publishes one sensor sample, publishes a status sample
    /// on every tenth iteration, handles pending commands and then sleeps for
    /// 10 ms. The rates are therefore nominally 100 Hz and 10 Hz; the real
    /// period is 10 ms plus the time spent in the loop body.
    ///
    /// # Errors
    ///
    /// The loop never finishes on its own; the function only returns when a
    /// publish or command-processing step fails.
    fn run(&mut self, robot_id: u32) -> Result<(), HddsError> {
        let mut tick = 0u64;

        loop {
            // Publish sensor data at 100 Hz
            self.publish_sensors(robot_id, tick)?;

            // Publish status at 10 Hz
            if tick % 10 == 0 {
                self.publish_status(robot_id)?;
            }

            // Check for commands
            self.process_commands(robot_id)?;

            tick += 1;
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    /// Publishes one `SensorData` sample for `sensor_id`, stamped with the
    /// caller's loop tick and filled with placeholder zero readings.
    fn publish_sensors(&self, sensor_id: u32, tick: u64) -> Result<(), HddsError> {
        let data = SensorData {
            sensor_id,
            timestamp: tick,
            values: [0.0; 8], // Actual sensor readings
        };
        self.sensor_writer.write(&data)
    }

    /// Publishes a `Status` sample for `robot_id` with fixed placeholder
    /// values (idle, at the origin, battery 85.0).
    fn publish_status(&self, robot_id: u32) -> Result<(), HddsError> {
        let status = Status {
            robot_id,
            state: RobotState::Idle,
            position: [0.0, 0.0, 0.0],
            battery: 85.0,
        };
        self.status_writer.write(&status)
    }

    /// Publishes a warning-level `Alarm` for `robot_id` carrying `msg`.
    ///
    /// Alarm ids come from a process-wide counter starting at 0, so they are
    /// unique within this process only.
    fn publish_alarm(&self, robot_id: u32, msg: &str) -> Result<(), HddsError> {
        static ALARM_ID: std::sync::atomic::AtomicU32 =
            std::sync::atomic::AtomicU32::new(0);

        let alarm = Alarm {
            alarm_id: ALARM_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
            robot_id,
            severity: AlarmSeverity::Warning,
            message: msg.to_string(),
            timestamp: Self::unix_time_nanos(),
        };
        self.alarm_writer.write(&alarm)
    }

    /// Returns the wall-clock time in nanoseconds since the Unix epoch.
    ///
    /// Raising an alarm must not itself bring the node down, so this never
    /// panics: a clock that reads earlier than the epoch yields 0, and a value
    /// that does not fit into 64 bits saturates at `u64::MAX` instead of being
    /// silently truncated.
    fn unix_time_nanos() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| {
                // Clamp first, so the narrowing cast below cannot lose bits.
                elapsed.as_nanos().min(u128::from(u64::MAX)) as u64
            })
    }

    /// Takes whatever commands are currently available and handles those
    /// addressed to `robot_id`; commands for other robots are ignored.
    ///
    /// # Errors
    ///
    /// `HddsError::NoData` is treated as "nothing to do"; any other reader
    /// error is returned to the caller.
    fn process_commands(&mut self, robot_id: u32) -> Result<(), HddsError> {
        match self.command_reader.try_take() {
            Ok(commands) => {
                for cmd in commands {
                    if cmd.robot_id == robot_id {
                        println!("Received command: {:?}", cmd.cmd);
                        // Execute command...
                    }
                }
            }
            Err(HddsError::NoData) => {}
            Err(e) => return Err(e),
        }
        Ok(())
    }
}

/// Entry point of the robot node: joins domain 0 as robot 1 and runs the
/// publish/command loop until it fails.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The same id is used to construct the node and to drive its loop.
    let robot_id = 1;

    let mut node = RobotNode::new(0, robot_id)?;
    node.run(robot_id)?;

    Ok(())
}

Multi-Topic Subscriber (Monitoring)

src/monitor.rs
use hdds::prelude::*;
use robot::*;
use std::sync::mpsc;
use std::thread;

/// A sample received on any of the monitored topics, tagged with its origin
/// so that all topics can travel through one channel.
enum Event {
// A sample read from the "SensorData" topic.
Sensor(SensorData),
// A sample read from the "RobotStatus" topic.
Status(Status),
// A sample read from the "Alarms" topic.
Alarm(Alarm),
}

/// Monitors the sensor, status and alarm topics on domain 0.
///
/// One polling thread per topic takes samples from its reader and forwards
/// them as [`Event`]s over a shared channel; the main thread consumes the
/// channel and prints status and alarm events.
///
/// # Errors
///
/// Returns an error if the participant, subscriber, a topic or a reader
/// cannot be created.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = DomainParticipant::new(0)?;
    let subscriber = participant.create_subscriber()?;

    // Channel for unified event handling
    let (tx, rx) = mpsc::channel::<Event>();

    // Sensor data: default reader QoS, polled every millisecond.
    let sensor_tx = tx.clone();
    let sensor_topic = participant.create_topic::<SensorData>("SensorData")?;
    let sensor_reader = subscriber.create_reader(&sensor_topic)?;
    thread::spawn(move || loop {
        if let Ok(samples) = sensor_reader.take() {
            for sample in samples {
                // A failed send means the receiver is gone: stop polling.
                if sensor_tx.send(Event::Sensor(sample)).is_err() {
                    return;
                }
            }
        }
        thread::sleep(std::time::Duration::from_millis(1));
    });

    // Status: the writer publishes with Reliable + TransientLocal so that late
    // joiners get the latest status. The reader has to request the same
    // policies, otherwise a monitor started after the robot would not receive
    // the retained sample.
    let status_tx = tx.clone();
    let status_topic = participant.create_topic::<Status>("RobotStatus")?;
    let status_qos = DataReaderQos::default()
        .reliability(Reliability::Reliable)
        .durability(Durability::TransientLocal);
    let status_reader = subscriber.create_reader_with_qos(&status_topic, status_qos)?;
    thread::spawn(move || loop {
        if let Ok(samples) = status_reader.take() {
            for sample in samples {
                if status_tx.send(Event::Status(sample)).is_err() {
                    return;
                }
            }
        }
        thread::sleep(std::time::Duration::from_millis(10));
    });

    // Alarms: reliable and durable, matching the alarm writer.
    let alarm_tx = tx.clone();
    let alarm_topic = participant.create_topic::<Alarm>("Alarms")?;
    let alarm_qos = DataReaderQos::default()
        .reliability(Reliability::Reliable)
        .durability(Durability::TransientLocal);
    let alarm_reader = subscriber.create_reader_with_qos(&alarm_topic, alarm_qos)?;
    thread::spawn(move || loop {
        if let Ok(samples) = alarm_reader.take() {
            for sample in samples {
                if alarm_tx.send(Event::Alarm(sample)).is_err() {
                    return;
                }
            }
        }
        thread::sleep(std::time::Duration::from_millis(10));
    });

    // Only the reader threads should hold senders; dropping the original lets
    // the loop below finish if every reader thread has exited.
    drop(tx);

    // Unified event loop
    println!("Monitoring all topics...");
    for event in rx {
        match event {
            Event::Sensor(_sample) => {
                // High volume, only log occasionally
            }
            Event::Status(status) => {
                println!("Robot {} status: {:?}, battery: {:.1}%",
                    status.robot_id, status.state, status.battery);
            }
            Event::Alarm(alarm) => {
                println!("[{:?}] Robot {}: {}",
                    alarm.severity, alarm.robot_id, alarm.message);
            }
        }
    }

    Ok(())
}

Using Waitsets

Instead of polling each reader on a timer, a WaitSet lets a single thread block until any of the attached readers has unread data:

use hdds::prelude::*;
use std::time::Duration;

let waitset = WaitSet::new()?;

// Create one read condition per reader and keep a handle to each, so that the
// conditions returned by `wait` can be told apart below.
// NOTE(review): assumes read conditions are cheaply cloneable handles —
// confirm against the hdds API.
let sensor_condition = sensor_reader.read_condition(SampleState::NotRead)?;
let status_condition = status_reader.read_condition(SampleState::NotRead)?;
let alarm_condition = alarm_reader.read_condition(SampleState::NotRead)?;

// Attach read conditions
waitset.attach(sensor_condition.clone())?;
waitset.attach(status_condition.clone())?;
waitset.attach(alarm_condition.clone())?;

loop {
    // Wait for any topic to have data
    let triggered = waitset.wait(Duration::from_secs(1))?;

    for condition in triggered {
        if condition == sensor_condition {
            // Process sensor data
        } else if condition == status_condition {
            // Process status
        } else if condition == alarm_condition {
            // Process alarms
        }
    }
}

Topic Organization Patterns

By Function

/sensors/lidar
/sensors/camera
/sensors/imu
/control/commands
/control/feedback
/diagnostics/health
/diagnostics/alarms

By Robot/Device

/robot_001/sensors
/robot_001/commands
/robot_002/sensors
/robot_002/commands

Using Partitions

// Separate traffic with partitions: the publisher joins two named partitions.
let publisher_partitions = Partition::new(vec!["zone_a", "sensors"]);
let pub_qos = PublisherQos::default().partition(publisher_partitions);

// The subscriber names its partition with a pattern instead of a fixed name.
let subscriber_partitions = Partition::new(vec!["zone_*"]); // Wildcard match
let sub_qos = SubscriberQos::default().partition(subscriber_partitions);

QoS Profile Summary

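| Topic Type    | Reliability | Durability     | History     |
|---------------|-------------|----------------|-------------|
| Sensor stream | BestEffort  | Volatile       | KeepLast(1) |
| Commands      | Reliable    | Volatile       | KeepAll     |
| Status        | Reliable    | TransientLocal | KeepLast(1) |
| Alarms        | Reliable    | TransientLocal | KeepAll     |
| Configuration | Reliable    | Persistent     | KeepLast(1) |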

Next Steps