Skip to main content

Latency Optimization

Guide to achieving minimal latency in HDDS applications.

Latency Components

Total Latency = Serialization + Queue + Network + Deserialization + Delivery

Typical breakdown (64-byte payload, same host):
Serialization: 50 ns (0.6%)
Queue/History: 500 ns (6%)
Network (UDP): 5 us (63%)
Deserialization: 40 ns (0.5%)
Delivery: 2.4 us (30%)
─────────────────────────────────
Total: ~8 us

Transport Selection

Shared Memory (Lowest Latency)

let config = DomainParticipantConfig::default()
.transport(TransportConfig::default()
.enable_shared_memory(true)
.prefer_shared_memory(true)); // Use SHM when available

Latency: 1-5 us for small payloads

UDP Unicast

let config = DomainParticipantConfig::default()
.transport(TransportConfig::default()
.enable_udp_unicast(true)
.disable_multicast(true)); // Avoid multicast overhead

Latency: 20-100 us depending on network

QoS Configuration

Best Effort (Fastest)

let qos = DataWriterQos::default()
.reliability(Reliability::BestEffort)
.history(History::KeepLast { depth: 1 })
.durability(Durability::Volatile);

No acknowledgment overhead, but may lose samples.

Low-Latency Reliable

let qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_micros(100), // Short timeout
})
.history(History::KeepLast { depth: 1 }) // Minimal buffering
.durability(Durability::Volatile);

Disable Unnecessary Features

let qos = DataWriterQos::default()
// No deadline monitoring (saves timer overhead)
.deadline(Deadline::infinite())

// No lifespan expiration
.lifespan(Lifespan::infinite())

// Automatic liveliness (no manual assertions)
.liveliness(Liveliness::Automatic {
lease_duration: Duration::from_secs(30),
});

Writer Optimization

Batching Disabled

let config = DataWriterConfig::default()
.batching_enabled(false) // Send immediately
.flush_on_write(true); // No internal buffering

Pre-Register Instances

// Pre-register for keyed topics
let handle = writer.register_instance(&sample)?;

// Fast write path
loop {
sample.value = get_sensor_value();
writer.write_w_handle(&sample, handle)?; // Skip key lookup
}

Zero-Copy Write

// Loan buffer from writer (avoids copy)
let mut loan = writer.loan_sample()?;
*loan = SensorData {
sensor_id: 1,
value: 42.5,
timestamp: now(),
};
loan.write()?; // Direct to transport buffer

Reader Optimization

Polling Mode

// Tight polling loop (lowest latency)
loop {
if let Ok(samples) = reader.try_take() {
for sample in samples {
process(sample);
}
}
// Optional: yield to prevent 100% CPU
std::hint::spin_loop();
}

WaitSet with Short Timeout

let waitset = WaitSet::new()?;
waitset.attach(reader.status_condition())?;

loop {
// Short wait, fast wakeup
if waitset.wait(Duration::from_micros(100)).is_ok() {
let samples = reader.take()?;
for sample in samples {
process(sample);
}
}
}

Take vs Read

// take() is faster (no copy, removes from cache)
let samples = reader.take()?;

// read() is slower (copies, keeps in cache)
let samples = reader.read()?;

Threading Model

Dedicated Reader Thread

use std::thread;

// Pin to CPU core for cache locality
let reader_thread = thread::Builder::new()
.name("dds-reader".into())
.spawn(move || {
// Set thread priority (Linux)
#[cfg(target_os = "linux")]
unsafe {
libc::setpriority(libc::PRIO_PROCESS, 0, -20);
}

loop {
if let Ok(samples) = reader.try_take() {
for sample in samples {
// Process inline, no channel overhead
process_sample(&sample);
}
}
}
})?;

CPU Affinity

#[cfg(target_os = "linux")]
/// Pins the calling thread to the given CPU core (Linux only).
///
/// Pinning keeps the reader's working set hot in one core's cache and
/// avoids migration jitter. Failure is non-fatal — the application still
/// runs, just unpinned — but it is reported instead of silently ignored
/// so misconfiguration (e.g. an out-of-range `core_id`) is visible.
fn pin_to_core(core_id: usize) {
    use libc::{cpu_set_t, sched_setaffinity, CPU_SET};
    // SAFETY: cpu_set_t is a plain bitmask, so the all-zeroes value from
    // `zeroed()` is a valid empty set; CPU_SET then marks exactly one core.
    unsafe {
        let mut set: cpu_set_t = std::mem::zeroed();
        CPU_SET(core_id, &mut set);
        // pid 0 = the calling thread.
        let rc = sched_setaffinity(0, std::mem::size_of::<cpu_set_t>(), &set);
        if rc != 0 {
            eprintln!(
                "pin_to_core({core_id}): sched_setaffinity failed: {}",
                std::io::Error::last_os_error()
            );
        }
    }
}

Network Tuning

Socket Buffer Sizes

let config = TransportConfig::default()
.socket_receive_buffer_size(4 * 1024 * 1024) // 4 MB
.socket_send_buffer_size(4 * 1024 * 1024);

Linux Kernel Parameters

# Increase socket buffers
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.wmem_max=16777216
sysctl -w net.core.rmem_default=4194304
sysctl -w net.core.wmem_default=4194304

# Reduce latency
# (note: net.ipv4.tcp_low_latency has been a no-op since Linux 4.14)
sysctl -w net.ipv4.tcp_low_latency=1

# Disable Nagle's algorithm (already off for UDP).
# For TCP transport: TCP_NODELAY is a per-socket option set with
# setsockopt(), not a sysctl — the DDS TCP transport must set it itself.

Network Interface

# Disable interrupt coalescing
ethtool -C eth0 rx-usecs 0 tx-usecs 0

# Enable busy polling
sysctl -w net.core.busy_poll=50
sysctl -w net.core.busy_read=50

Serialization Optimization

Use Fixed-Size Types

// Faster (no length prefix)
struct FastSensor {
uint32 sensor_id;
float values[8]; // Fixed array
};

// Slower (variable length)
struct SlowSensor {
uint32 sensor_id;
sequence<float> values; // Dynamic sequence
};

Minimize Payload Size

// Use smallest types that fit
struct CompactSensor {
uint16 sensor_id; // Not uint32
int16 value_x10; // Fixed-point, not float
uint32 timestamp_ms; // Not uint64 nanoseconds
};

Discovery Optimization

Fast Discovery

let config = DomainParticipantConfig::default()
// Faster initial announcements
.initial_announcement_period(Duration::from_millis(50))

// Shorter intervals
.announcement_period(Duration::from_millis(500))

// Pre-configure peers (skip multicast discovery)
.initial_peers(vec!["192.168.1.100:7400".parse()?]);

Static Discovery

// No discovery overhead after startup
let config = DomainParticipantConfig::default()
.enable_spdp(false) // Disable participant discovery
.static_endpoints(vec![
StaticEndpoint::writer("sensor", "SensorTopic", "SensorData"),
StaticEndpoint::reader("controller", "SensorTopic", "SensorData"),
]);

Measuring Latency

Instrumentation

use std::time::Instant;

// Publisher side
let start = Instant::now();
writer.write(&sample)?;
let write_time = start.elapsed();

// Subscriber side. Note: Instant is a monotonic, process-local clock and
// cannot be compared with a timestamp produced on another host or process;
// use SystemTime with synchronized clocks, or measure round-trip time.
let receive_time = Instant::now();
let samples = reader.take()?;
let delivery_time = receive_time.duration_since(sample.timestamp);

Latency Histogram

/// Accumulates observed latencies and reports percentile statistics.
struct LatencyStats {
    // Raw observations in insertion order; sorted on demand in percentile().
    samples: Vec<Duration>,
}

impl LatencyStats {
    /// Records one latency observation.
    fn add(&mut self, latency: Duration) {
        self.samples.push(latency);
    }

    /// Returns the p-th percentile (p in 0.0..=100.0) of recorded latencies.
    ///
    /// Returns `Duration::ZERO` when no samples have been recorded; the
    /// previous version underflowed `len() - 1` and panicked on an empty set,
    /// which made `report()` unusable before the first sample arrived.
    fn percentile(&self, p: f64) -> Duration {
        if self.samples.is_empty() {
            return Duration::ZERO;
        }
        let mut sorted = self.samples.clone();
        // sort_unstable: faster and allocation-free; stability is
        // irrelevant for plain Duration values.
        sorted.sort_unstable();
        // Nearest-rank style index, clamped to the last element so
        // p == 100.0 stays in bounds.
        let idx = (sorted.len() as f64 * p / 100.0) as usize;
        sorted[idx.min(sorted.len() - 1)]
    }

    /// Prints the p50/p95/p99 latency percentiles.
    fn report(&self) {
        println!("Latency p50: {:?}", self.percentile(50.0));
        println!("Latency p95: {:?}", self.percentile(95.0));
        println!("Latency p99: {:?}", self.percentile(99.0));
    }
}

Latency Checklist

  • Use shared memory transport when possible
  • Set BestEffort reliability for non-critical data
  • Use KeepLast(1) history
  • Disable batching
  • Pre-register instances
  • Use take() instead of read()
  • Pin threads to CPU cores
  • Tune socket buffer sizes
  • Use fixed-size types in IDL
  • Minimize payload size

Common Latency Issues

| Issue           | Symptom              | Solution                  |
|-----------------|----------------------|---------------------------|
| GC pauses       | Periodic spikes      | Use pre-allocated buffers |
| Lock contention | Inconsistent latency | Reduce shared state       |
| Large payloads  | High baseline        | Fragment or compress      |
| Multicast       | Added delay          | Use unicast               |
| Reliable ACKs   | Blocking writes      | Increase history depth    |

Next Steps