RTI Connext Interoperability Example
Working example of HDDS and RTI Connext DDS communicating.
Shared IDL
Both sides must use the same type definition:
// sensor_data.idl
module sensors {
@appendable
struct SensorData {
@key unsigned long sensor_id;
float value;
unsigned long long timestamp;
};
};
Generate Types
For RTI Connext
rtiddsgen -language C++11 -d gen sensor_data.idl
Generated files:
gen/sensor_data.cxx/gen/sensor_data.hppgen/sensor_dataPlugin.cxx/gen/sensor_dataPlugin.hpp
For HDDS
hdds-gen -l rust sensor_data.idl
Generated file:
src/sensor_data.rs
HDDS Publisher
// hdds_publisher.rs
use hdds::*;
use std::time::Duration;
mod sensor_data;
use sensor_data::SensorData;
fn main() -> Result<(), HddsError> {
// Create participant on domain 0
let participant = DomainParticipant::new(0)?;
// Create topic
let topic = participant.create_topic::<SensorData>("SensorTopic")?;
// Create publisher with QoS
let publisher = participant.create_publisher()?;
let writer_qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.durability(Durability::TransientLocal)
.history(History::KeepLast { depth: 10 });
let writer = publisher.create_datawriter_with_qos(&topic, writer_qos)?;
println!("HDDS Publisher started. Waiting for RTI Connext subscriber...");
// Wait for subscriber
while writer.publication_matched_status()?.current_count == 0 {
std::thread::sleep(Duration::from_millis(100));
}
println!("Subscriber connected!");
// Publish samples
for i in 0..100 {
let sample = SensorData {
sensor_id: 1,
value: 20.0 + (i as f32 * 0.5),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64,
};
writer.write(&sample)?;
println!("Published: sensor_id={}, value={:.1}", sample.sensor_id, sample.value);
std::thread::sleep(Duration::from_millis(100));
}
Ok(())
}
RTI Connext Subscriber (Modern C++)
// rti_subscriber.cxx
#include <iostream>
#include <dds/dds.hpp>
#include "sensor_data.hpp"
using namespace dds::core;
using namespace dds::domain;
using namespace dds::topic;
using namespace dds::sub;
int main() {
// Create participant
DomainParticipant participant(0);
// Create topic
Topic<sensors::SensorData> topic(participant, "SensorTopic");
// Create subscriber
Subscriber subscriber(participant);
// Create reader with matching QoS
DataReaderQos rqos = subscriber.default_datareader_qos();
rqos << dds::core::policy::Reliability::Reliable(
dds::core::Duration::from_millisecs(100))
<< dds::core::policy::Durability::TransientLocal()
<< dds::core::policy::History::KeepLast(100);
DataReader<sensors::SensorData> reader(subscriber, topic, rqos);
std::cout << "RTI Connext Subscriber started. Waiting for HDDS publisher..." << std::endl;
// Read loop
while (true) {
LoanedSamples<sensors::SensorData> samples = reader.take();
for (const auto& sample : samples) {
if (sample.info().valid()) {
std::cout << "Received from HDDS: sensor_id="
<< sample.data().sensor_id()
<< ", value=" << sample.data().value()
<< std::endl;
}
}
rti::util::sleep(dds::core::Duration::from_millisecs(10));
}
return 0;
}
RTI Connext Subscriber (Traditional C++)
// rti_subscriber_traditional.cxx
#include <iostream>
#include <ndds/ndds_cpp.h>
#include "sensor_data.h"
#include "sensor_dataSupport.h"
int main() {
// Create participant
DDSDomainParticipant* participant =
DDSTheParticipantFactory->create_participant(
0, DDS_PARTICIPANT_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE);
// Register type
sensors_SensorDataTypeSupport::register_type(
participant, sensors_SensorDataTypeSupport::get_type_name());
// Create topic
DDSTopic* topic = participant->create_topic(
"SensorTopic",
sensors_SensorDataTypeSupport::get_type_name(),
DDS_TOPIC_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE);
// Create subscriber
DDSSubscriber* subscriber = participant->create_subscriber(
DDS_SUBSCRIBER_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE);
// Create reader with matching QoS
DDS_DataReaderQos rqos;
subscriber->get_default_datareader_qos(rqos);
rqos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
rqos.reliability.max_blocking_time.sec = 0;
rqos.reliability.max_blocking_time.nanosec = 100000000;
rqos.durability.kind = DDS_TRANSIENT_LOCAL_DURABILITY_QOS;
rqos.history.kind = DDS_KEEP_LAST_HISTORY_QOS;
rqos.history.depth = 100;
DDSDataReader* reader_base = subscriber->create_datareader(
topic, rqos, NULL, DDS_STATUS_MASK_NONE);
sensors_SensorDataDataReader* reader =
sensors_SensorDataDataReader::narrow(reader_base);
std::cout << "RTI Connext Subscriber started." << std::endl;
// Read loop
sensors_SensorDataSeq data_seq;
DDS_SampleInfoSeq info_seq;
while (true) {
DDS_ReturnCode_t retcode = reader->take(
data_seq, info_seq, DDS_LENGTH_UNLIMITED,
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
if (retcode == DDS_RETCODE_OK) {
for (int i = 0; i < data_seq.length(); i++) {
if (info_seq[i].valid_data) {
std::cout << "Received: sensor_id=" << data_seq[i].sensor_id
<< ", value=" << data_seq[i].value << std::endl;
}
}
reader->return_loan(data_seq, info_seq);
}
NDDSUtility::sleep(DDS_Duration_t{0, 10000000});
}
return 0;
}
HDDS Subscriber
// hdds_subscriber.rs
use hdds::*;
use std::time::Duration;
mod sensor_data;
use sensor_data::SensorData;
fn main() -> Result<(), HddsError> {
let participant = DomainParticipant::new(0)?;
let topic = participant.create_topic::<SensorData>("SensorTopic")?;
let subscriber = participant.create_subscriber()?;
let reader_qos = DataReaderQos::default()
.reliability(Reliability::Reliable)
.durability(Durability::TransientLocal)
.history(History::KeepLast { depth: 100 });
let reader = subscriber.create_datareader_with_qos(&topic, reader_qos)?;
println!("HDDS Subscriber started. Waiting for RTI Connext publisher...");
loop {
match reader.take() {
Ok(samples) => {
for (sample, info) in samples {
if info.valid_data {
println!("Received from RTI: sensor_id={}, value={:.1}",
sample.sensor_id, sample.value);
}
}
}
Err(HddsError::NoData) => {}
Err(e) => eprintln!("Error: {:?}", e),
}
std::thread::sleep(Duration::from_millis(10));
}
}
RTI Connext Publisher (Modern C++)
// rti_publisher.cxx
#include <iostream>
#include <chrono>
#include <dds/dds.hpp>
#include "sensor_data.hpp"
using namespace dds::core;
using namespace dds::domain;
using namespace dds::topic;
using namespace dds::pub;
int main() {
// Create participant
DomainParticipant participant(0);
// Create topic
Topic<sensors::SensorData> topic(participant, "SensorTopic");
// Create publisher
Publisher publisher(participant);
// Create writer with QoS
DataWriterQos wqos = publisher.default_datawriter_qos();
wqos << dds::core::policy::Reliability::Reliable(
dds::core::Duration::from_millisecs(100))
<< dds::core::policy::Durability::TransientLocal()
<< dds::core::policy::History::KeepLast(10);
DataWriter<sensors::SensorData> writer(publisher, topic, wqos);
std::cout << "RTI Connext Publisher started. Waiting for HDDS subscriber..." << std::endl;
// Wait for match
while (writer.publication_matched_status().current_count() == 0) {
rti::util::sleep(dds::core::Duration::from_millisecs(100));
}
std::cout << "Subscriber connected!" << std::endl;
// Publish
sensors::SensorData sample;
sample.sensor_id(2);
for (int i = 0; i < 100; i++) {
sample.value(25.0f + static_cast<float>(i % 20));
auto now = std::chrono::system_clock::now().time_since_epoch();
sample.timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(now).count());
writer.write(sample);
std::cout << "Published: sensor_id=" << sample.sensor_id()
<< ", value=" << sample.value() << std::endl;
rti::util::sleep(dds::core::Duration::from_millisecs(100));
}
return 0;
}
Build Instructions
RTI Connext (CMake)
# CMakeLists.txt
cmake_minimum_required(VERSION 3.16)
project(rti_interop)
# RTI Connext
set(CONNEXTDDS_DIR $ENV{NDDSHOME})
list(APPEND CMAKE_MODULE_PATH "${CONNEXTDDS_DIR}/resource/cmake")
find_package(RTIConnextDDS REQUIRED)
# Generate types
connextdds_rtiddsgen_run(
IDL_FILE sensor_data.idl
OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/gen
LANG C++11
)
add_executable(rti_publisher rti_publisher.cxx ${GENERATED_SRC})
target_link_libraries(rti_publisher RTIConnextDDS::cpp2_api)
target_include_directories(rti_publisher PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/gen)
add_executable(rti_subscriber rti_subscriber.cxx ${GENERATED_SRC})
target_link_libraries(rti_subscriber RTIConnextDDS::cpp2_api)
target_include_directories(rti_subscriber PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/gen)
source $NDDSHOME/resource/scripts/rtisetenv_x64Linux4gcc7.3.0.bash
mkdir build && cd build
cmake ..
make
HDDS (Cargo)
# Cargo.toml
[package]
name = "hdds_interop"
version = "0.1.0"
edition = "2021"
[dependencies]
hdds = "1.0"
[build-dependencies]
hdds-gen = "1.0"
// build.rs
fn main() {
hdds_gen::compile(&["sensor_data.idl"]).unwrap();
}
cargo build --release
Configuration
RTI Connext XML Profile
Create USER_QOS_PROFILES.xml:
<?xml version="1.0"?>
<dds xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://community.rti.com/schema/current/rti_dds_qos_profiles.xsd">
<qos_library name="InteropLibrary">
<qos_profile name="InteropProfile" is_default_qos="true">
<!-- Standard RTPS for interop -->
<participant_qos>
<transport_builtin>
<mask>UDPv4</mask>
</transport_builtin>
<discovery>
<initial_peers>
<element>builtin.udpv4://239.255.0.1</element>
</initial_peers>
</discovery>
</participant_qos>
<datawriter_qos>
<reliability>
<kind>RELIABLE_RELIABILITY_QOS</kind>
</reliability>
<durability>
<kind>TRANSIENT_LOCAL_DURABILITY_QOS</kind>
</durability>
<history>
<kind>KEEP_LAST_HISTORY_QOS</kind>
<depth>10</depth>
</history>
</datawriter_qos>
<datareader_qos>
<reliability>
<kind>RELIABLE_RELIABILITY_QOS</kind>
</reliability>
<durability>
<kind>TRANSIENT_LOCAL_DURABILITY_QOS</kind>
</durability>
<history>
<kind>KEEP_LAST_HISTORY_QOS</kind>
<depth>100</depth>
</history>
</datareader_qos>
</qos_profile>
</qos_library>
</dds>
Environment Setup
# RTI Connext license
export RTI_LICENSE_FILE=/path/to/rti_license.dat
# RTI environment
source $NDDSHOME/resource/scripts/rtisetenv_x64Linux4gcc7.3.0.bash
# QoS profile
export NDDS_QOS_PROFILES=USER_QOS_PROFILES.xml
Running the Example
Terminal 1: RTI Connext Subscriber
source $NDDSHOME/resource/scripts/rtisetenv_x64Linux4gcc7.3.0.bash
./build/rti_subscriber
Terminal 2: HDDS Publisher
./target/release/hdds_publisher
Expected output (RTI Connext):
RTI Connext Subscriber started. Waiting for HDDS publisher...
Received from HDDS: sensor_id=1, value=20
Received from HDDS: sensor_id=1, value=20.5
Received from HDDS: sensor_id=1, value=21
...
Reverse Direction
Terminal 1: HDDS Subscriber
./target/release/hdds_subscriber
Terminal 2: RTI Connext Publisher
./build/rti_publisher
Troubleshooting
No Discovery
# Enable RTI Connext discovery logging
export NDDS_DISCOVERY_PEERS=builtin.udpv4://239.255.0.1
# Check RTI Admin Console
$NDDSHOME/bin/rtiadminconsole
License Issues
# Verify license
$NDDSHOME/bin/rtilmutil -check
# Set license path
export RTI_LICENSE_FILE=/path/to/rti_license.dat
Type Mismatch
Ensure type compatibility:
# Regenerate with type consistency
rtiddsgen -language C++11 -typeCheckingOnMatch sensor_data.idl
RTI Connext XML for type consistency:
<type_consistency>
<kind>ALLOW_TYPE_COERCION</kind>
<ignore_sequence_bounds>true</ignore_sequence_bounds>
<ignore_string_bounds>true</ignore_string_bounds>
</type_consistency>
Wire Protocol Issues
Ensure standard RTPS wire protocol:
<participant_qos>
<wire_protocol>
<rtps_auto_id_kind>RTPS_AUTO_ID_FROM_UUID</rtps_auto_id_kind>
</wire_protocol>
</participant_qos>
RTI Admin Console
Use RTI Admin Console for debugging:
$NDDSHOME/bin/rtiadminconsole
Features:
- View discovered participants
- Monitor topics and endpoints
- Inspect QoS policies
- Capture data samples
Performance Tuning
Shared Memory (Same Host)
RTI Connext shared memory is not compatible with HDDS. For same-host:
<participant_qos>
<transport_builtin>
<!-- Use UDP loopback for cross-vendor -->
<mask>UDPv4</mask>
</transport_builtin>
</participant_qos>
Large Data
For large samples:
<datawriter_qos>
<publish_mode>
<kind>ASYNCHRONOUS_PUBLISH_MODE_QOS</kind>
</publish_mode>
</datawriter_qos>
Next Steps
- Setup - Configuration details
- QoS Mapping - QoS translation
- FastDDS Example - FastDDS interop