Skip to main content

DDS-RPC Overview

DDS-RPC implements the OMG DDS-RPC specification (formal/17-04-01), providing request/reply communication on top of DDS pub/sub.

What is DDS-RPC?

DDS-RPC enables service-oriented architectures where:

  • Clients send requests and wait for replies
  • Servers receive requests, process them, and send replies
  • Communication uses standard DDS topics with correlation
Client                          Server
│ │
├─► request_writer ──────► request_reader ──► handler()
│ (rq/service) │
│ ▼
└◄─ reply_reader ◄────────── reply_writer ◄──┘
(rr/service)

Feature Flag

DDS-RPC is behind a feature gate. Enable it in your Cargo.toml:

[dependencies]
hdds = { path = "../hdds/crates/hdds", features = ["rpc"] }

Quick Example

Server

use hdds::Participant;
use hdds::rpc::{ServiceServer, RemoteExceptionCode, SampleIdentity};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("calculator-server").build()?;

// Define request handler
let handler = |_id: SampleIdentity, payload: &[u8]| {
// Parse request (2 i32 values)
if payload.len() < 8 {
return Err((RemoteExceptionCode::InvalidArgument, "need 8 bytes".into()));
}

let a = i32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
let b = i32::from_le_bytes([payload[4], payload[5], payload[6], payload[7]]);

// Compute result
let result = a + b;

// Return reply
Ok(result.to_le_bytes().to_vec())
};

let server = ServiceServer::new(&participant, "calculator", handler)?;
server.spin().await;

Ok(())
}

Client

use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("calculator-client").build()?;

let client = ServiceClient::new(&participant, "calculator")?;

// Build request payload
let a: i32 = 10;
let b: i32 = 20;
let mut payload = Vec::new();
payload.extend_from_slice(&a.to_le_bytes());
payload.extend_from_slice(&b.to_le_bytes());

// Call service
let reply = client.call_raw(&payload, Duration::from_secs(5)).await?;

// Parse result
let result = i32::from_le_bytes([reply[0], reply[1], reply[2], reply[3]]);
println!("10 + 20 = {}", result);

Ok(())
}

Topic Naming Convention

For a service named Calculator:

TopicDirectionPurpose
rq/CalculatorClient → ServerRequest messages
rr/CalculatorServer → ClientReply messages

QoS Configuration

DDS-RPC uses optimized QoS for reliable request/reply:

use hdds::rpc::rpc_qos;

let qos = rpc_qos();
// Equivalent to:
// QoS::reliable().keep_all().volatile()
QoS PolicyValueRationale
ReliabilityReliableEnsures requests/replies are not lost
HistoryKeepAllMaintains all samples for correlation
DurabilityVolatileNo persistence needed for transient RPC

Key Features

  • Async/await - Non-blocking request/reply
  • Correlation - Automatic request/reply matching via SampleIdentity
  • Timeout handling - Configurable per-call timeouts
  • Error codes - Standard RemoteExceptionCode for failures
  • Multiple clients - Many clients can call the same service
  • Shutdown - Graceful shutdown with cleanup