Skip to main content

Throughput Optimization

Guide to achieving maximum throughput in HDDS applications.

Throughput Factors

Throughput = min(
Serialization rate,
Network bandwidth,
Writer capacity,
Reader capacity,
Transport efficiency
)

QoS Configuration

Best Effort (Maximum Throughput)

use hdds::QoS;

let qos = QoS::best_effort().keep_last(1).volatile();

High-Throughput Reliable

use hdds::QoS;

let qos = QoS::reliable()
.keep_last(1000) // Large buffer
.volatile();

A large history depth keeps the writer from blocking when a reader is slow.

Batching

Enable Batching

use hdds::{Participant, QoS, DDS, TransportMode};
use std::time::Duration;

let participant = Participant::builder("app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

let topic = participant.topic::<SensorData>("SensorTopic")?;
let writer = topic
.writer()
.qos(QoS::reliable()
.keep_last(1000)
.batching(true)
.max_batch_size(64 * 1024) // 64 KB batches
.batch_flush_period(Duration::from_millis(1)))
.build()?;

Manual Batching

// Gather the samples that make up one burst
let pending: Vec<SensorData> = collect_samples();

// Write the whole burst back-to-back, stopping at the first error
pending.iter().try_for_each(|sample| writer.write(sample))?;

// Flush explicitly once the burst has been written
writer.flush()?;

Writer Optimization

Asynchronous Write

use hdds::QoS;

// QoS for non-blocking writes (queue and continue)
let qos = QoS::reliable().keep_last(10000);

// Write without waiting; success means the sample was queued
if let Err(err) = writer.write(&sample) {
    match err {
        hdds::Error::WouldBlock => {
            // Buffer full, handle backpressure
        }
        other => return Err(other.into()),
    }
}

Pre-allocate Samples

// Reuse one sample across all writes to avoid allocating per iteration
let mut sample = SensorData::default();
// The sensor id is the same for every write, so set it once outside the loop
sample.sensor_id = 1;

for i in 0..1_000_000 {
    // Only the fields that change are updated on each pass
    sample.value = get_value(i);
    sample.timestamp = now();
    writer.write(&sample)?;
}

Multiple Writers

use hdds::{Participant, QoS, DDS, TransportMode};
use std::thread;

// Parallel writers for higher throughput: four threads, 250,000 writes each
let handles: Vec<_> = (0..4)
    .map(|_| {
        // Each thread receives its own clone of the participant
        let participant = participant.clone();
        thread::spawn(move || -> Result<(), hdds::Error> {
            let topic = participant.topic::<SensorData>("SensorTopic")?;
            let writer = topic
                .writer()
                .qos(QoS::reliable().keep_last(1000))
                .build()?;

            // Build the sample inside the thread so the closure does not rely
            // on a binding from the enclosing scope
            let sample = SensorData::default();
            for _ in 0..250_000 {
                writer.write(&sample)?;
            }
            Ok(())
        })
    })
    .collect();

// Wait for every writer thread and propagate the first write error
for handle in handles {
    handle.join().expect("writer thread panicked")?;
}

Reader Optimization

Batch Reading

// Take all available samples
while let Some(sample) = reader.try_take()? {
process(&sample);
}

// Or read without removing
for sample in reader.read()? {
process(&sample);
}

Parallel Processing

use rayon::prelude::*;

// Drain everything currently available. A reader error is propagated with `?`
// rather than being treated as "no more samples".
let mut samples = Vec::new();
while let Some(sample) = reader.try_take()? {
    samples.push(sample);
}

// Process in parallel
samples.par_iter().for_each(|sample| {
    process(sample);
});

Multiple Readers (Partitioned)

use hdds::{Participant, QoS, DDS, TransportMode};

// Create readers for different partitions
let topic = participant.topic::<SensorData>("SensorTopic")?;

let reader_a = topic
.reader()
.qos(QoS::reliable().partition(&["sensor_a"]))
.build()?;

let reader_b = topic
.reader()
.qos(QoS::reliable().partition(&["sensor_b"]))
.build()?;

// Process in parallel threads

Transport Optimization

Large Socket Buffers

use hdds::{Participant, TransportMode};

let participant = Participant::builder("app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.socket_buffer_size(16 * 1024 * 1024) // 16 MB
.build()?;

UDP with Fragmentation

use hdds::{Participant, TransportMode};

// Enable for payloads > 64 KB
let participant = Participant::builder("app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.max_message_size(256 * 1024) // 256 KB max
.build()?;

IntraProcess for Same-Process Testing

use hdds::{Participant, TransportMode};

let participant = Participant::builder("app")
.domain_id(0)
.with_transport(TransportMode::IntraProcess) // Zero-copy, same process only
.build()?;
Note: IntraProcess provides maximum throughput but only works within the same process. For cross-process or cross-host communication, use UdpMulticast.

Network Configuration

Linux Kernel Tuning

# Increase buffer limits
sysctl -w net.core.rmem_max=268435456
sysctl -w net.core.wmem_max=268435456
sysctl -w net.core.netdev_max_backlog=100000

# UDP buffer sizes
sysctl -w net.ipv4.udp_rmem_min=16384
sysctl -w net.ipv4.udp_wmem_min=16384

Network Interface

# Increase ring buffer
ethtool -G eth0 rx 4096 tx 4096

# Enable hardware offloading
ethtool -K eth0 gso on gro on tso on

# Set MTU (if supported)
ip link set eth0 mtu 9000 # Jumbo frames

Payload Optimization

Optimal Payload Sizes

| Payload Size | Efficiency | Use Case |
|---|---|---|
| < 64 bytes | Low (header overhead) | Status flags |
| 256-1024 bytes | Good | Sensor data |
| 4-16 KB | Optimal | Video frames |
| 64 KB+ | Fragmentation overhead | Bulk transfer |

Compression

use flate2::write::GzEncoder;
use flate2::Compression;
// `write_all` is a method of the `Write` trait, which must be in scope
use std::io::Write;

// Compress large payloads
let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder.write_all(&large_data)?;
// `finish` hands back the underlying Vec holding the compressed bytes
let compressed = encoder.finish()?;

sample.compressed_data = compressed;
writer.write(&sample)?;

Efficient Serialization

// Use arrays instead of sequences
struct FastData {
float values[1024]; // Fixed, no length prefix
};

// Use bounded strings
struct LimitedData {
string<64> name; // Max 64 chars
};

History and Resource Limits

Writer History

use hdds::QoS;

let qos = QoS::reliable()
.keep_last(10000)
.max_samples(10000)
.max_instances(1)
.max_samples_per_instance(10000);

Reader History

use hdds::QoS;

let qos = QoS::reliable()
.keep_last(1000)
.max_samples(100000)
.max_instances(100)
.max_samples_per_instance(1000);

Backpressure Handling

Flow Control

// Monitor write status
let status = writer.deadline_missed_status()?;
if status.total_count > 0 {
    // Slow down publishing
    publish_rate *= 0.9;
}

// Check for blocked writes
match writer.write(&sample) {
    Ok(()) => {}
    Err(hdds::Error::Timeout) => {
        eprintln!("Reader can't keep up!");
        // Reduce rate or drop samples
    }
    // Any other failure is not backpressure; surface it instead of dropping it
    Err(e) => return Err(e.into()),
}

Adaptive Rate

use hdds::DataWriter;

/// Wraps a `DataWriter` together with a publish rate that `publish` raises
/// after successful writes and lowers after `WouldBlock` errors.
struct AdaptivePublisher<T> {
/// Writer that `publish` forwards each sample to.
writer: DataWriter<T>,
/// Current rate; units are not defined here (presumably samples per second, to be confirmed).
rate: f64,
/// Lower bound that `rate` is clamped to when it is decreased.
min_rate: f64,
/// Upper bound that `rate` is clamped to when it is increased.
max_rate: f64,
}

impl<T: hdds::DDS> AdaptivePublisher<T> {
    /// Writes `sample` and adapts `rate` to the outcome.
    ///
    /// A successful write raises `rate` by 1%, capped at `max_rate`. A
    /// `WouldBlock` error lowers it by 10%, floored at `min_rate`. Any other
    /// error leaves `rate` unchanged. The write result is handed back to the
    /// caller in every case.
    fn publish(&mut self, sample: &T) -> Result<(), hdds::Error> {
        let outcome = self.writer.write(sample);

        match &outcome {
            // Increase rate on success
            Ok(()) => self.rate = (self.rate * 1.01).min(self.max_rate),
            // Decrease rate on backpressure
            Err(hdds::Error::WouldBlock) => {
                self.rate = (self.rate * 0.9).max(self.min_rate);
            }
            // Other errors do not affect the rate
            Err(_) => {}
        }

        outcome
    }
}

Measuring Throughput

use std::time::{Duration, Instant};

// Length of the measurement run
let window = Duration::from_secs(10);
// In-memory size of one sample, used as the per-message byte count
let sample_size = std::mem::size_of::<SensorData>() as u64;

let start = Instant::now();
let mut sent = 0u64;

// Write at least once, then keep going until the window has elapsed
loop {
    writer.write(&sample)?;
    sent += 1;

    if start.elapsed() >= window {
        break;
    }
}

// Every write contributes the same number of bytes
let total_bytes = sent * sample_size;

let elapsed = start.elapsed().as_secs_f64();
println!("Throughput: {:.0} msg/s", sent as f64 / elapsed);
println!("Bandwidth: {:.2} MB/s", total_bytes as f64 / elapsed / 1_000_000.0);

Throughput Checklist

  • Use BestEffort for non-critical high-rate data
  • Enable batching for small messages
  • Increase history depth for reliable delivery
  • Use large socket buffers
  • Enable shared memory for same-host
  • Pre-allocate and reuse samples
  • Use parallel writers/readers
  • Compress large payloads
  • Handle backpressure gracefully
  • Use optimal payload sizes (256 bytes to 16 KB)

Common Throughput Issues

| Issue | Symptom | Solution |
|---|---|---|
| Small payloads | Low bandwidth | Batch messages |
| Reliable blocking | Writer stalls | Increase history |
| Slow reader | Dropped samples | Add readers/partitions |
| Network saturation | Packet loss | Reduce rate |
| CPU bottleneck | Low throughput | Parallel processing |

Next Steps