Latency Optimization
Guide to achieving minimal latency in HDDS applications.
Latency Components
Total Latency = Serialization + Queue + Network + Deserialization + Delivery
Typical breakdown (64-byte payload, same host):
Serialization: 50 ns (0.6%)
Queue/History: 500 ns (6%)
Network (UDP): 5 us (63%)
Deserialization: 40 ns (0.5%)
Delivery: 2.4 us (30%)
─────────────────────────────────
Total: ~8 us
Transport Selection
Shared Memory (Lowest Latency)
let config = DomainParticipantConfig::default()
.transport(TransportConfig::default()
.enable_shared_memory(true)
.prefer_shared_memory(true)); // Use SHM when available
Latency: 1-5 us for small payloads
UDP Unicast
let config = DomainParticipantConfig::default()
.transport(TransportConfig::default()
.enable_udp_unicast(true)
.disable_multicast(true)); // Avoid multicast overhead
Latency: 20-100 us depending on network
QoS Configuration
Best Effort (Fastest)
let qos = DataWriterQos::default()
.reliability(Reliability::BestEffort)
.history(History::KeepLast { depth: 1 })
.durability(Durability::Volatile);
No acknowledgment overhead, but may lose samples.
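Reliability is request/offer matched, so the reader must also request best effort; a reliable reader will not match a best-effort writer. A minimal sketch, assuming a DataReaderQos builder that mirrors the writer-side API:
// Sketch: matching best-effort reader QoS (DataReaderQos builder assumed)
let reader_qos = DataReaderQos::default()
    .reliability(Reliability::BestEffort)
    .history(History::KeepLast { depth: 1 })
    .durability(Durability::Volatile);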
Low-Latency Reliable
let qos = DataWriterQos::default()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_micros(100), // Short timeout
})
.history(History::KeepLast { depth: 1 }) // Minimal buffering
.durability(Durability::Volatile);
Disable Unnecessary Features
let qos = DataWriterQos::default()
// No deadline monitoring (saves timer overhead)
.deadline(Deadline::infinite())
// No lifespan expiration
.lifespan(Lifespan::infinite())
// Automatic liveliness (no manual assertions)
.liveliness(Liveliness::Automatic {
lease_duration: Duration::from_secs(30),
});
Writer Optimization
Batching Disabled
let config = DataWriterConfig::default()
.batching_enabled(false) // Send immediately
.flush_on_write(true); // No internal buffering
Pre-Register Instances
// Pre-register the key once, outside the hot path (keyed topics only)
let mut sample = SensorData { sensor_id: 1, value: 0.0, timestamp: now() };
let handle = writer.register_instance(&sample)?;
// Fast write path
loop {
    sample.value = get_sensor_value();
    writer.write_w_handle(&sample, handle)?; // Skip key lookup
}
Zero-Copy Write
// Loan buffer from writer (avoids copy)
let mut loan = writer.loan_sample()?;
*loan = SensorData {
sensor_id: 1,
value: 42.5,
timestamp: now(),
};
loan.write()?; // Direct to transport buffer
Reader Optimization
Polling Mode
// Tight polling loop (lowest latency)
loop {
if let Ok(samples) = reader.try_take() {
for sample in samples {
process(sample);
}
}
// spin_loop() only hints the CPU that this is a busy-wait; the core still
// runs at 100%. Use std::thread::yield_now() to give up the time slice.
std::hint::spin_loop();
}
WaitSet with Short Timeout
let waitset = WaitSet::new()?;
waitset.attach(reader.status_condition())?;
loop {
// Short wait, fast wakeup
if waitset.wait(Duration::from_micros(100)).is_ok() {
let samples = reader.take()?;
for sample in samples {
process(sample);
}
}
}
Take vs Read
// take() is faster (no copy, removes from cache)
let samples = reader.take()?;
// read() is slower (copies, keeps in cache)
let samples = reader.read()?;
Threading Model
Dedicated Reader Thread
use std::thread;
// Dedicated reader thread; pin it to a core (see CPU Affinity below) for cache locality
let reader_thread = thread::Builder::new()
.name("dds-reader".into())
.spawn(move || {
// Raise scheduling priority via a negative nice value (Linux treats nice as per-thread; requires CAP_SYS_NICE or root)
#[cfg(target_os = "linux")]
unsafe {
libc::setpriority(libc::PRIO_PROCESS, 0, -20);
}
loop {
if let Ok(samples) = reader.try_take() {
for sample in samples {
// Process inline, no channel overhead
process_sample(&sample);
}
}
}
})?;
CPU Affinity
#[cfg(target_os = "linux")]
fn pin_to_core(core_id: usize) {
use libc::{cpu_set_t, sched_setaffinity, CPU_SET};
unsafe {
let mut set: cpu_set_t = std::mem::zeroed();
CPU_SET(core_id, &mut set);
sched_setaffinity(0, std::mem::size_of::<cpu_set_t>(), &set);
}
}
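Call pin_to_core from inside the dedicated reader thread, before entering the polling loop. The core number below is only an illustration; it should point at a core isolated from the general scheduler (for example via isolcpus):
// Inside the dedicated reader thread, before the polling loop
#[cfg(target_os = "linux")]
pin_to_core(2); // example core; pick one reserved for this thread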
Network Tuning
Socket Buffer Sizes
let config = TransportConfig::default()
.socket_receive_buffer_size(4 * 1024 * 1024) // 4 MB
.socket_send_buffer_size(4 * 1024 * 1024);
Linux Kernel Parameters
# Increase socket buffers
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.wmem_max=16777216
sysctl -w net.core.rmem_default=4194304
sysctl -w net.core.wmem_default=4194304
# Reduce TCP latency (no-op on kernels 4.14+, where this knob was removed)
sysctl -w net.ipv4.tcp_low_latency=1
# Nagle's algorithm does not apply to UDP. For a TCP transport, disable it
# per socket with the TCP_NODELAY socket option; there is no sysctl for it.
Network Interface
# Disable interrupt coalescing
ethtool -C eth0 rx-usecs 0 tx-usecs 0
# Enable busy polling
sysctl -w net.core.busy_poll=50
sysctl -w net.core.busy_read=50
Serialization Optimization
Use Fixed-Size Types
// Faster (no length prefix)
struct FastSensor {
uint32 sensor_id;
float values[8]; // Fixed array
};
// Slower (variable length)
struct SlowSensor {
uint32 sensor_id;
sequence<float> values; // Dynamic sequence
};
Minimize Payload Size
// Use smallest types that fit
struct CompactSensor {
uint16 sensor_id; // Not uint32
int16 value_x10; // Fixed-point, not float
uint32 timestamp_ms; // Not uint64 nanoseconds
};
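The value_x10 field trades precision for size (one decimal digit, roughly ±3276.7 in original units). A small pair of plain-Rust helpers, shown only to illustrate the encoding, keeps the conversion in one place:
// Illustrative fixed-point conversion for the value_x10 field
fn encode_value_x10(value: f32) -> i16 {
    (value * 10.0).round().clamp(i16::MIN as f32, i16::MAX as f32) as i16
}

fn decode_value_x10(raw: i16) -> f32 {
    raw as f32 / 10.0
}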
Discovery Optimization
Fast Discovery
let config = DomainParticipantConfig::default()
// Faster initial announcements
.initial_announcement_period(Duration::from_millis(50))
// Shorter intervals
.announcement_period(Duration::from_millis(500))
// Pre-configure peers (skip multicast discovery)
.initial_peers(vec!["192.168.1.100:7400".parse()?]);
Static Discovery
// No discovery overhead after startup
let config = DomainParticipantConfig::default()
.enable_spdp(false) // Disable participant discovery
.static_endpoints(vec![
StaticEndpoint::writer("sensor", "SensorTopic", "SensorData"),
StaticEndpoint::reader("controller", "SensorTopic", "SensorData"),
]);
Measuring Latency
Instrumentation
use std::time::Instant;
// Publisher side: time the write() call itself
let start = Instant::now();
writer.write(&sample)?;
let write_time = start.elapsed();
// Subscriber side: one-way latency requires a send timestamp carried in the
// payload and clocks synchronized across hosts (e.g. via PTP); otherwise
// measure a round trip and halve it (see below)
let samples = reader.take()?;
for sample in samples {
    let delivery_time = now() - sample.timestamp; // both ends on the same clock
}
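Without synchronized clocks, a round trip is the more robust measurement: publish a sample, have the remote side echo it back on a second topic, and halve the elapsed time. A sketch, assuming an echo_reader for a dedicated echo topic that the remote participant writes back to:
// Round-trip measurement; no cross-host clock synchronization required
let start = Instant::now();
writer.write(&sample)?;
'wait: loop {
    if let Ok(samples) = echo_reader.try_take() {
        for _echo in samples {
            break 'wait; // first echoed sample ends the wait
        }
    }
}
let one_way_estimate = start.elapsed() / 2; // assumes roughly symmetric paths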
Latency Histogram
struct LatencyStats {
samples: Vec<Duration>,
}
impl LatencyStats {
fn add(&mut self, latency: Duration) {
self.samples.push(latency);
}
fn percentile(&self, p: f64) -> Duration {
let mut sorted = self.samples.clone();
sorted.sort();
let idx = (sorted.len() as f64 * p / 100.0) as usize;
sorted[idx.min(sorted.len() - 1)]
}
fn report(&self) {
println!("Latency p50: {:?}", self.percentile(50.0));
println!("Latency p95: {:?}", self.percentile(95.0));
println!("Latency p99: {:?}", self.percentile(99.0));
}
}
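Feeding per-sample measurements into the histogram reports tail latency rather than a single average. A usage sketch, where round_trip_once() is a stand-in for the round-trip loop above:
// Hypothetical harness: round_trip_once() performs one ping/echo exchange
// and returns the measured round-trip time as a Duration.
let mut stats = LatencyStats { samples: Vec::new() };
for _ in 0..10_000 {
    let rtt = round_trip_once()?;
    stats.add(rtt / 2); // record the one-way estimate
}
stats.report();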
Latency Checklist
- Use shared memory transport when possible
- Set BestEffort reliability for non-critical data
- Use KeepLast(1) history
- Disable batching
- Pre-register instances
- Use take() instead of read()
- Pin threads to CPU cores
- Tune socket buffer sizes
- Use fixed-size types in IDL
- Minimize payload size
Common Latency Issues
| Issue | Symptom | Solution |
|---|---|---|
| Allocation/GC pauses | Periodic latency spikes | Pre-allocate and reuse buffers (see sketch below) |
| Lock contention | Inconsistent latency | Reduce shared state |
| Large payloads | High baseline | Fragment or compress |
| Multicast | Added delay | Use unicast |
| Reliable ACKs | Writes block waiting for acknowledgments | Increase history depth or max_blocking_time |
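For the pre-allocated buffer fix, allocate the sample and any heap-backed fields once and reuse them across writes so the hot path never touches the allocator. A sketch; the LidarScan type and fill_readings helper are hypothetical:
// Allocate once, reuse every iteration; the hot path performs no allocation
struct LidarScan {
    sensor_id: u32,
    readings: Vec<f32>,
}

let mut scan = LidarScan {
    sensor_id: 1,
    readings: Vec::with_capacity(1024), // capacity reserved up front
};

loop {
    scan.readings.clear();             // keeps capacity, no reallocation
    fill_readings(&mut scan.readings); // hypothetical acquisition helper
    writer.write(&scan)?;
}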
Next Steps
- Throughput Tuning - Maximize bandwidth
- Benchmarks - Performance baselines