Skip to main content

DDS-RPC Examples

Practical examples of using DDS-RPC for service-oriented communication.

Echo Service

The simplest RPC service: it returns whatever payload it receives.

Server

use hdds::Participant;
use hdds::rpc::{ServiceServer, SampleIdentity};

/// Runs the echo service: each request payload is sent back unchanged.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Participant::builder("echo-server").build()?;

    // The reply is a copy of the request bytes; the sample identity is not used.
    let server = ServiceServer::new(
        &participant,
        "echo",
        |_id: SampleIdentity, payload: &[u8]| Ok(payload.to_vec()),
    )?;

    println!("Echo server running on rq/echo, rr/echo");
    server.spin().await;

    Ok(())
}

Client

use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::time::Duration;

/// Sends one message to the echo service and checks that the reply matches it.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Participant::builder("echo-client").build()?;
    let echo = ServiceClient::new(&participant, "echo")?;

    let request = b"Hello, RPC!";
    let timeout = Duration::from_secs(5);
    let response = echo.call_raw(request, timeout).await?;

    // An echo service must return exactly the bytes it was given.
    assert_eq!(response, request);
    println!("Echo reply: {:?}", String::from_utf8_lossy(&response));

    Ok(())
}

Calculator Service

A service with multiple operations.

Shared Types

// Shared between client and server

/// Arithmetic operation selector; the discriminant is the one-byte wire tag.
///
/// `Copy` is required so that `encode(&self)` can cast the field to `u8`
/// without moving it out of the borrowed request.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Operation {
    Add = 0,
    Sub = 1,
    Mul = 2,
    Div = 3,
}

impl Operation {
    /// Maps a wire tag back to an operation; returns `None` for an unknown tag.
    fn from_tag(tag: u8) -> Option<Self> {
        match tag {
            0 => Some(Operation::Add),
            1 => Some(Operation::Sub),
            2 => Some(Operation::Mul),
            3 => Some(Operation::Div),
            _ => None,
        }
    }
}

/// A calculator request: one operation applied to two signed 32-bit operands.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CalcRequest {
    op: Operation,
    a: i32,
    b: i32,
}

impl CalcRequest {
    /// Encoded size in bytes: a 1-byte tag plus two 4-byte little-endian operands.
    const ENCODED_LEN: usize = 9;

    /// Serializes the request as `tag | a (i32 LE) | b (i32 LE)`.
    fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(Self::ENCODED_LEN);
        buf.push(self.op as u8);
        buf.extend_from_slice(&self.a.to_le_bytes());
        buf.extend_from_slice(&self.b.to_le_bytes());
        buf
    }

    /// Parses a request from the first 9 bytes of `data`.
    ///
    /// Returns `None` when `data` is shorter than 9 bytes or the tag byte is
    /// not a known operation. Bytes after the ninth are ignored.
    fn decode(data: &[u8]) -> Option<Self> {
        match data {
            &[tag, a0, a1, a2, a3, b0, b1, b2, b3, ..] => Some(Self {
                op: Operation::from_tag(tag)?,
                a: i32::from_le_bytes([a0, a1, a2, a3]),
                b: i32::from_le_bytes([b0, b1, b2, b3]),
            }),
            _ => None,
        }
    }
}

Server

use hdds::Participant;
use hdds::rpc::{ServiceServer, RemoteExceptionCode, SampleIdentity};

/// Runs the calculator service.
///
/// Each request is parsed with `CalcRequest::decode`; the reply is the `i32`
/// result as 4 little-endian bytes. Malformed requests, division by zero, and
/// results that do not fit in an `i32` are all reported to the caller as
/// `RemoteExceptionCode::InvalidArgument` rather than panicking in the handler.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Participant::builder("calc-server").build()?;

    let handler = |_id: SampleIdentity, payload: &[u8]| {
        let req = match CalcRequest::decode(payload) {
            Some(req) => req,
            None => {
                return Err((RemoteExceptionCode::InvalidArgument, "invalid request".into()));
            }
        };

        // Operands come from the client, so use checked arithmetic: plain
        // `+`/`-`/`*` panic on overflow in debug builds, and `i32::MIN / -1`
        // panics in every build.
        let checked = match req.op {
            Operation::Add => req.a.checked_add(req.b),
            Operation::Sub => req.a.checked_sub(req.b),
            Operation::Mul => req.a.checked_mul(req.b),
            Operation::Div => {
                if req.b == 0 {
                    return Err((
                        RemoteExceptionCode::InvalidArgument,
                        "division by zero".into(),
                    ));
                }
                req.a.checked_div(req.b)
            }
        };

        let result = match checked {
            Some(value) => value,
            None => {
                return Err((
                    RemoteExceptionCode::InvalidArgument,
                    "arithmetic overflow".into(),
                ));
            }
        };

        Ok(result.to_le_bytes().to_vec())
    };

    let server = ServiceServer::new(&participant, "calculator", handler)?;
    server.spin().await;

    Ok(())
}

Client

use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("calc-client").build()?;
let client = ServiceClient::new(&participant, "calculator")?;

// Add
let req = CalcRequest { op: Operation::Add, a: 10, b: 5 };
let reply = client.call_raw(&req.encode(), Duration::from_secs(5)).await?;
let result = i32::from_le_bytes([reply[0], reply[1], reply[2], reply[3]]);
println!("10 + 5 = {}", result);

// Multiply
let req = CalcRequest { op: Operation::Mul, a: 7, b: 8 };
let reply = client.call_raw(&req.encode(), Duration::from_secs(5)).await?;
let result = i32::from_le_bytes([reply[0], reply[1], reply[2], reply[3]]);
println!("7 * 8 = {}", result);

// Division by zero (error)
let req = CalcRequest { op: Operation::Div, a: 10, b: 0 };
match client.call_raw(&req.encode(), Duration::from_secs(5)).await {
Err(e) => println!("Expected error: {}", e),
Ok(_) => println!("Unexpected success"),
}

Ok(())
}

Key-Value Store

A stateful service example.

Server

use hdds::Participant;
use hdds::rpc::{ServiceServer, RemoteExceptionCode, SampleIdentity};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Splits a `key_len (u32 LE) | key | rest` body into the key and the bytes
/// that follow it.
///
/// Returns `None` when the 4-byte length prefix is missing or claims more key
/// bytes than are actually present, so the caller can reject the request
/// instead of panicking on an out-of-range slice. Invalid UTF-8 in the key is
/// replaced lossily.
fn split_key(data: &[u8]) -> Option<(String, &[u8])> {
    if data.len() < 4 {
        return None;
    }
    let (len_bytes, rest) = data.split_at(4);
    let key_len =
        u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
    // The length comes from the client; never trust it beyond the bytes received.
    if key_len > rest.len() {
        return None;
    }
    let (key, tail) = rest.split_at(key_len);
    Some((String::from_utf8_lossy(key).into_owned(), tail))
}

/// Runs an in-memory key-value service.
///
/// Request layout: one opcode byte followed by `key_len (u32 LE) | key`, and
/// for SET the value bytes after the key.
/// Opcodes: `0` = GET, `1` = SET, `2` = DELETE.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Participant::builder("kv-server").build()?;

    // Shared state, moved into the handler closure.
    let store: Arc<RwLock<HashMap<String, Vec<u8>>>> = Arc::new(RwLock::new(HashMap::new()));

    let handler = move |_id: SampleIdentity, payload: &[u8]| {
        let (op, data) = match payload.split_first() {
            Some((op, data)) => (*op, data),
            None => {
                return Err((RemoteExceptionCode::InvalidArgument, "empty request".into()));
            }
        };

        match op {
            // GET: key_len (4 bytes) + key
            0 => {
                let key = match split_key(data) {
                    Some((key, _)) => key,
                    None => {
                        return Err((RemoteExceptionCode::InvalidArgument, "invalid GET".into()));
                    }
                };

                // `unwrap` panics only if the lock was poisoned by a panicking holder.
                let map = store.read().unwrap();
                match map.get(&key) {
                    Some(value) => Ok(value.clone()),
                    None => Err((RemoteExceptionCode::InvalidArgument, "key not found".into())),
                }
            }
            // SET: key_len + key + value
            1 => {
                let (key, value) = match split_key(data) {
                    Some((key, value)) => (key, value.to_vec()),
                    None => {
                        return Err((RemoteExceptionCode::InvalidArgument, "invalid SET".into()));
                    }
                };

                let mut map = store.write().unwrap();
                map.insert(key, value);
                Ok(vec![1]) // Success
            }
            // DELETE: key_len + key
            2 => {
                let key = match split_key(data) {
                    Some((key, _)) => key,
                    None => {
                        return Err((
                            RemoteExceptionCode::InvalidArgument,
                            "invalid DELETE".into(),
                        ));
                    }
                };

                let mut map = store.write().unwrap();
                map.remove(&key);
                Ok(vec![1]) // Success
            }
            _ => Err((RemoteExceptionCode::UnsupportedMethod, "unknown op".into())),
        }
    };

    let server = ServiceServer::new(&participant, "kv", handler)?;
    server.spin().await;

    Ok(())
}

Timeout Handling

use hdds::rpc::{ServiceClient, RpcError};
use std::time::Duration;

/// Calls the service with a 2-second timeout per attempt, making at most
/// `max_retries` attempts.
///
/// Returns the first successful reply. Any error other than
/// `RpcError::Timeout` is returned immediately. If every attempt times out,
/// or `max_retries` is 0 (in which case no call is made), the result is
/// `Err(RpcError::Timeout)`.
async fn call_with_retry(
    client: &ServiceClient,
    payload: &[u8],
    max_retries: u32,
) -> Result<Vec<u8>, RpcError> {
    let per_attempt = Duration::from_secs(2);

    for attempt in 1..=max_retries {
        match client.call_raw(payload, per_attempt).await {
            Err(RpcError::Timeout) => {
                println!("Attempt {} timed out, retrying...", attempt);
            }
            // Success, or a non-timeout error: hand it straight back.
            outcome => return outcome,
        }
    }

    Err(RpcError::Timeout)
}

Graceful Shutdown

use hdds::Participant;
use hdds::rpc::ServiceServer;
use tokio::signal;

/// Runs an echoing service in a background task until Ctrl+C is received,
/// then cancels that task.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Participant::builder("server").build()?;

    let server = ServiceServer::new(&participant, "service", |_, payload: &[u8]| {
        Ok(payload.to_vec())
    })?;

    // The spawned task takes ownership of the server and drives it.
    let spin_task = tokio::spawn(async move {
        server.spin().await;
    });

    // Block here until the user presses Ctrl+C.
    signal::ctrl_c().await?;
    println!("Shutting down...");

    // Cancel the spin task, which owns the server.
    spin_task.abort();

    Ok(())
}

Multiple Concurrent Clients

use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::sync::Arc;
use std::time::Duration;

/// Spawns ten tasks that share one participant; each creates its own client,
/// calls the echo service once, and prints the reply.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let participant = Arc::new(Participant::builder("multi-client").build()?);

    // All tasks are spawned up front so the calls run concurrently.
    let tasks: Vec<_> = (0..10)
        .map(|i| {
            let shared = participant.clone();
            tokio::spawn(async move {
                let client = ServiceClient::new(&shared, "echo").unwrap();

                let text = format!("Hello from client {}", i);
                let timeout = Duration::from_secs(5);
                let reply = client.call_raw(text.as_bytes(), timeout).await.unwrap();

                println!("Client {} got: {}", i, String::from_utf8_lossy(&reply));
            })
        })
        .collect();

    // A panic inside a task surfaces here as a join error.
    for task in tasks {
        task.await?;
    }

    Ok(())
}