DDS-RPC Examples
Practical examples of using DDS-RPC for service-oriented communication.
Echo Service
The simplest RPC service - returns whatever it receives.
Server
use hdds::Participant;
use hdds::rpc::{ServiceServer, SampleIdentity};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("echo-server").build()?;
let handler = |_id: SampleIdentity, payload: &[u8]| {
// Simply return the payload as-is
Ok(payload.to_vec())
};
let server = ServiceServer::new(&participant, "echo", handler)?;
println!("Echo server running on rq/echo, rr/echo");
server.spin().await;
Ok(())
}
Client
use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("echo-client").build()?;
let client = ServiceClient::new(&participant, "echo")?;
let message = b"Hello, RPC!";
let reply = client.call_raw(message, Duration::from_secs(5)).await?;
assert_eq!(reply, message);
println!("Echo reply: {:?}", String::from_utf8_lossy(&reply));
Ok(())
}
Calculator Service
A service with multiple operations.
Shared Types
// Shared between client and server
#[repr(u8)]
enum Operation {
Add = 0,
Sub = 1,
Mul = 2,
Div = 3,
}
struct CalcRequest {
op: Operation,
a: i32,
b: i32,
}
impl CalcRequest {
fn encode(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(9);
buf.push(self.op as u8);
buf.extend_from_slice(&self.a.to_le_bytes());
buf.extend_from_slice(&self.b.to_le_bytes());
buf
}
fn decode(data: &[u8]) -> Option<Self> {
if data.len() < 9 {
return None;
}
Some(Self {
op: match data[0] {
0 => Operation::Add,
1 => Operation::Sub,
2 => Operation::Mul,
3 => Operation::Div,
_ => return None,
},
a: i32::from_le_bytes([data[1], data[2], data[3], data[4]]),
b: i32::from_le_bytes([data[5], data[6], data[7], data[8]]),
})
}
}
Server
use hdds::Participant;
use hdds::rpc::{ServiceServer, RemoteExceptionCode, SampleIdentity};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("calc-server").build()?;
let handler = |_id: SampleIdentity, payload: &[u8]| {
let req = CalcRequest::decode(payload)
.ok_or((RemoteExceptionCode::InvalidArgument, "invalid request".into()))?;
let result = match req.op {
Operation::Add => req.a + req.b,
Operation::Sub => req.a - req.b,
Operation::Mul => req.a * req.b,
Operation::Div => {
if req.b == 0 {
return Err((
RemoteExceptionCode::InvalidArgument,
"division by zero".into()
));
}
req.a / req.b
}
};
Ok(result.to_le_bytes().to_vec())
};
let server = ServiceServer::new(&participant, "calculator", handler)?;
server.spin().await;
Ok(())
}
Client
use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("calc-client").build()?;
let client = ServiceClient::new(&participant, "calculator")?;
// Add
let req = CalcRequest { op: Operation::Add, a: 10, b: 5 };
let reply = client.call_raw(&req.encode(), Duration::from_secs(5)).await?;
let result = i32::from_le_bytes([reply[0], reply[1], reply[2], reply[3]]);
println!("10 + 5 = {}", result);
// Multiply
let req = CalcRequest { op: Operation::Mul, a: 7, b: 8 };
let reply = client.call_raw(&req.encode(), Duration::from_secs(5)).await?;
let result = i32::from_le_bytes([reply[0], reply[1], reply[2], reply[3]]);
println!("7 * 8 = {}", result);
// Division by zero (error)
let req = CalcRequest { op: Operation::Div, a: 10, b: 0 };
match client.call_raw(&req.encode(), Duration::from_secs(5)).await {
Err(e) => println!("Expected error: {}", e),
Ok(_) => println!("Unexpected success"),
}
Ok(())
}
Key-Value Store
A stateful service example.
Server
use hdds::Participant;
use hdds::rpc::{ServiceServer, RemoteExceptionCode, SampleIdentity};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("kv-server").build()?;
// Shared state
let store: Arc<RwLock<HashMap<String, Vec<u8>>>> = Arc::new(RwLock::new(HashMap::new()));
let store_clone = store.clone();
let handler = move |_id: SampleIdentity, payload: &[u8]| {
if payload.is_empty() {
return Err((RemoteExceptionCode::InvalidArgument, "empty request".into()));
}
let op = payload[0];
let data = &payload[1..];
match op {
// GET: key_len (4 bytes) + key
0 => {
if data.len() < 4 {
return Err((RemoteExceptionCode::InvalidArgument, "invalid GET".into()));
}
let key_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
let key = String::from_utf8_lossy(&data[4..4+key_len]).to_string();
let store = store_clone.read().unwrap();
match store.get(&key) {
Some(value) => Ok(value.clone()),
None => Err((RemoteExceptionCode::InvalidArgument, "key not found".into())),
}
}
// SET: key_len + key + value
1 => {
if data.len() < 4 {
return Err((RemoteExceptionCode::InvalidArgument, "invalid SET".into()));
}
let key_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
let key = String::from_utf8_lossy(&data[4..4+key_len]).to_string();
let value = data[4+key_len..].to_vec();
let mut store = store_clone.write().unwrap();
store.insert(key, value);
Ok(vec![1]) // Success
}
// DELETE
2 => {
if data.len() < 4 {
return Err((RemoteExceptionCode::InvalidArgument, "invalid DELETE".into()));
}
let key_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
let key = String::from_utf8_lossy(&data[4..4+key_len]).to_string();
let mut store = store_clone.write().unwrap();
store.remove(&key);
Ok(vec![1]) // Success
}
_ => Err((RemoteExceptionCode::UnsupportedMethod, "unknown op".into())),
}
};
let server = ServiceServer::new(&participant, "kv", handler)?;
server.spin().await;
Ok(())
}
Timeout Handling
use hdds::rpc::{ServiceClient, RpcError};
use std::time::Duration;
async fn call_with_retry(
client: &ServiceClient,
payload: &[u8],
max_retries: u32,
) -> Result<Vec<u8>, RpcError> {
let mut last_error = RpcError::Timeout;
for attempt in 0..max_retries {
match client.call_raw(payload, Duration::from_secs(2)).await {
Ok(reply) => return Ok(reply),
Err(RpcError::Timeout) => {
println!("Attempt {} timed out, retrying...", attempt + 1);
last_error = RpcError::Timeout;
}
Err(e) => return Err(e), // Non-timeout errors fail immediately
}
}
Err(last_error)
}
Graceful Shutdown
use hdds::Participant;
use hdds::rpc::ServiceServer;
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Participant::builder("server").build()?;
let handler = |_, payload: &[u8]| Ok(payload.to_vec());
let server = ServiceServer::new(&participant, "service", handler)?;
// Spawn server task
let server_handle = tokio::spawn(async move {
server.spin().await;
});
// Wait for Ctrl+C
signal::ctrl_c().await?;
println!("Shutting down...");
// Server will shutdown when dropped
server_handle.abort();
Ok(())
}
Multiple Concurrent Clients
use hdds::Participant;
use hdds::rpc::ServiceClient;
use std::sync::Arc;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let participant = Arc::new(Participant::builder("multi-client").build()?);
let mut handles = vec![];
for i in 0..10 {
let p = participant.clone();
handles.push(tokio::spawn(async move {
let client = ServiceClient::new(&p, "echo").unwrap();
let msg = format!("Hello from client {}", i);
let reply = client
.call_raw(msg.as_bytes(), Duration::from_secs(5))
.await
.unwrap();
println!("Client {} got: {}", i, String::from_utf8_lossy(&reply));
}));
}
for h in handles {
h.await?;
}
Ok(())
}