DDS-RPC Overview
The hdds rpc module implements the OMG RPC over DDS (DDS-RPC) specification (formal/17-04-01), providing request/reply communication on top of DDS pub/sub.
What is DDS-RPC?
DDS-RPC enables service-oriented architectures where:
- Clients send requests and wait for replies
- Servers receive requests, process them, and send replies
- Requests and replies travel over standard DDS topics, and each reply is matched to its request through a correlation identifier (SampleIdentity)
Client                                       Server
│                                            │
├─► request_writer ──────► request_reader ──► handler()
│     (rq/service)                           │
│                                            ▼
└◄─ reply_reader ◄────────── reply_writer ◄──┘
      (rr/service)
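The correlation step in the diagram can be sketched without any DDS machinery. The following is a minimal, standard-library-only illustration; `RequestId`, `Reply`, and `Correlator` are hypothetical names invented for this sketch and are not part of the hdds API. It only shows the idea of tagging each reply with the identity of the request it answers.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a sample identity: which writer sent the
/// request, and that writer's sequence number for it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RequestId {
    pub writer: u32,
    pub sequence: u64,
}

/// A reply tagged with the identity of the request it answers.
#[derive(Debug, Clone, PartialEq)]
pub struct Reply {
    pub related_request: RequestId,
    pub payload: Vec<u8>,
}

/// Client-side bookkeeping of requests that still await a reply.
pub struct Correlator {
    writer: u32,
    next_sequence: u64,
    pending: HashSet<RequestId>,
}

impl Correlator {
    pub fn new(writer: u32) -> Self {
        Self { writer, next_sequence: 1, pending: HashSet::new() }
    }

    /// Allocate an identity for an outgoing request and remember it.
    pub fn register(&mut self) -> RequestId {
        let id = RequestId { writer: self.writer, sequence: self.next_sequence };
        self.next_sequence += 1;
        self.pending.insert(id);
        id
    }

    /// Accept a reply only if it answers a request that is still pending;
    /// replies for other clients or duplicate replies yield `None`.
    pub fn accept(&mut self, reply: Reply) -> Option<Vec<u8>> {
        if self.pending.remove(&reply.related_request) {
            Some(reply.payload)
        } else {
            None
        }
    }
}
```

Because every client subscribes to the same reply topic, a filter of this kind is what lets each client ignore replies addressed to other clients.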
Feature Flag
DDS-RPC is behind a feature gate. Enable it in your Cargo.toml:
[dependencies]
hdds = { path = "../hdds/crates/hdds", features = ["rpc"] }
Quick Example
Server
use hdds::Participant;
use hdds::rpc::{ServiceServer, RemoteExceptionCode, SampleIdentity};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Participant::builder("calculator-server").build()?;

    // Define request handler
    let handler = |_id: SampleIdentity, payload: &[u8]| {
        // Parse request (2 i32 values, little-endian)
        if payload.len() < 8 {
            return Err((RemoteExceptionCode::InvalidArgument, "need 8 bytes".into()));
        }
        let a = i32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
        let b = i32::from_le_bytes([payload[4], payload[5], payload[6], payload[7]]);

        // Compute result
        let result = a + b;

        // Return reply
        Ok(result.to_le_bytes().to_vec())
    };

    let server = ServiceServer::new(&participant, "calculator", handler)?;
    server.spin().await;
    Ok(())
}
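The handler logic can also be written as a plain function so it is testable without a participant or a running server. This is a sketch under stated assumptions: `ExceptionCode` is a hypothetical local stand-in for `RemoteExceptionCode` (only the variant used above is modeled), and reporting overflow as an invalid argument is an illustrative choice, not something the library prescribes.

```rust
/// Hypothetical stand-in for `hdds::rpc::RemoteExceptionCode`; only the
/// variant used in the server example is modeled here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExceptionCode {
    InvalidArgument,
}

pub type HandlerResult = Result<Vec<u8>, (ExceptionCode, String)>;

/// Pure request logic: decode two little-endian i32 operands, add them,
/// and encode the sum. Overflow is reported as an error instead of
/// panicking (debug builds) or wrapping (release builds).
pub fn handle_add(payload: &[u8]) -> HandlerResult {
    if payload.len() < 8 {
        return Err((ExceptionCode::InvalidArgument, "need 8 bytes".to_string()));
    }
    let a = i32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
    let b = i32::from_le_bytes([payload[4], payload[5], payload[6], payload[7]]);

    let sum = a
        .checked_add(b)
        .ok_or((ExceptionCode::InvalidArgument, "sum overflows i32".to_string()))?;

    Ok(sum.to_le_bytes().to_vec())
}
```

A closure passed to the server could then delegate to such a function, keeping the transport wiring and the request logic separate.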
Client
use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Participant::builder("calculator-client").build()?;
    let client = ServiceClient::new(&participant, "calculator")?;

    // Build request payload (2 i32 values, little-endian)
    let a: i32 = 10;
    let b: i32 = 20;
    let mut payload = Vec::new();
    payload.extend_from_slice(&a.to_le_bytes());
    payload.extend_from_slice(&b.to_le_bytes());

    // Call service, waiting at most 5 seconds for the reply
    let reply = client.call_raw(&payload, Duration::from_secs(5)).await?;

    // Parse result; guard against a short reply instead of panicking on indexing
    if reply.len() < 4 {
        return Err("reply too short: expected 4 bytes".into());
    }
    let result = i32::from_le_bytes([reply[0], reply[1], reply[2], reply[3]]);
    println!("{} + {} = {}", a, b, result);
    Ok(())
}
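The byte layout used by the client above (two little-endian i32 values in the request, one in the reply) can be captured in small helpers. This is a standard-library-only sketch; the function names are hypothetical and not part of hdds.

```rust
/// Read a little-endian i32 at `offset`, or `None` if the slice is too short.
fn read_i32_le(bytes: &[u8], offset: usize) -> Option<i32> {
    let end = offset.checked_add(4)?;
    let chunk = bytes.get(offset..end)?;
    Some(i32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
}

/// Encode two operands as 8 little-endian bytes (`a` first, then `b`).
pub fn encode_add_request(a: i32, b: i32) -> Vec<u8> {
    let mut payload = Vec::with_capacity(8);
    payload.extend_from_slice(&a.to_le_bytes());
    payload.extend_from_slice(&b.to_le_bytes());
    payload
}

/// Decode the 4-byte little-endian result from a reply payload.
/// Returns `None` when the reply holds fewer than 4 bytes.
pub fn decode_add_reply(reply: &[u8]) -> Option<i32> {
    read_i32_le(reply, 0)
}
```

With helpers like these, the request payload would be built with `encode_add_request(10, 20)` and the reply parsed with `decode_add_reply(&reply)`, turning a malformed reply into a recoverable `None` rather than an out-of-bounds panic.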
Topic Naming Convention
For a service named Calculator:
| Topic | Direction | Purpose |
|---|---|---|
| rq/Calculator | Client → Server | Request messages |
| rr/Calculator | Server → Client | Reply messages |
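The naming rule in the table amounts to prefixing the service name. A minimal sketch, with hypothetical helper names that are not part of the hdds API:

```rust
/// Topic on which clients publish requests for `service`.
pub fn request_topic(service: &str) -> String {
    format!("rq/{}", service)
}

/// Topic on which the server publishes replies for `service`.
pub fn reply_topic(service: &str) -> String {
    format!("rr/{}", service)
}
```

For example, `request_topic("Calculator")` yields `rq/Calculator`, matching the first row of the table.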
QoS Configuration
DDS-RPC uses a preset QoS profile, returned by rpc_qos(), chosen for reliable request/reply:
use hdds::rpc::rpc_qos;
let qos = rpc_qos();
// Equivalent to:
// QoS::reliable().keep_all().volatile()
| QoS Policy | Value | Rationale |
|---|---|---|
| Reliability | Reliable | Ensures requests/replies are not lost |
| History | KeepAll | Maintains all samples for correlation |
| Durability | Volatile | No persistence needed for transient RPC |
Key Features
- Async/await - Non-blocking request/reply
- Correlation - Automatic request/reply matching via SampleIdentity
- Timeout handling - Configurable per-call timeouts
- Error codes - Standard RemoteExceptionCode for failures
- Multiple clients - Many clients can call the same service
- Shutdown - Graceful shutdown with cleanup
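To make the per-call timeout idea concrete, here is a blocking, standard-library-only sketch. It is not how hdds implements timeouts (the library API is async); `CallError` and `wait_for_reply` are hypothetical names, and a `std::sync::mpsc` channel stands in for the reply reader.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type for a call that did not produce a reply.
#[derive(Debug, PartialEq, Eq)]
pub enum CallError {
    /// No reply arrived within the caller's deadline.
    Timeout,
    /// The reply source went away, so no reply can ever arrive.
    Disconnected,
}

/// Wait for one reply payload, giving up after `timeout`.
pub fn wait_for_reply(
    replies: &Receiver<Vec<u8>>,
    timeout: Duration,
) -> Result<Vec<u8>, CallError> {
    replies.recv_timeout(timeout).map_err(|err| match err {
        RecvTimeoutError::Timeout => CallError::Timeout,
        RecvTimeoutError::Disconnected => CallError::Disconnected,
    })
}
```

Distinguishing a timeout from a disconnected source lets the caller decide whether retrying is worthwhile: a timed-out call may succeed later, while a disconnected one will not.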