Throughput Optimization

Guide to achieving maximum throughput in HDDS applications.

Throughput Factors

Throughput = min(
    serialization rate,
    network bandwidth,
    writer capacity,
    reader capacity,
    transport efficiency
)
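
Whichever term is smallest sets the ceiling. For example, a 1 GbE link carries at most about 125 MB/s; with 1 KB samples that caps throughput near 125,000 msg/s (ignoring header overhead), so faster serialization alone gains nothing until the network bound is raised.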

QoS Configuration

Best Effort (Maximum Throughput)

let qos = DataWriterQos::default()
    .reliability(Reliability::BestEffort)
    .history(History::KeepLast { depth: 1 })
    .durability(Durability::Volatile);
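
Best-effort delivery never retransmits: samples dropped under congestion are simply lost. This is the right trade-off for high-rate data where the next sample supersedes the previous one.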

High-Throughput Reliable

let qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::from_secs(1),
    })
    .history(History::KeepLast { depth: 1000 }) // Large buffer
    .durability(Durability::Volatile);

A large history depth lets the writer buffer unacknowledged samples, so it does not block when a reader temporarily falls behind.

Batching

Enable Batching

let config = DataWriterConfig::default()
    .batching_enabled(true)
    .max_batch_size(64 * 1024) // 64 KB batches
    .batch_flush_period(Duration::from_millis(1));
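
The flush period bounds the latency that batching adds: with the settings above, a sample should wait at most about 1 ms before its batch goes out, even if the batch is not full.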

Manual Batching

// Collect samples
let batch: Vec<SensorData> = collect_samples();

// Write in a burst
for sample in batch {
    writer.write(&sample)?;
}

// Explicit flush
writer.flush()?;

Writer Optimization

Asynchronous Write

// Non-blocking write (queue and continue)
let qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::ZERO, // Non-blocking
    })
    .history(History::KeepLast { depth: 10000 });

// Write without waiting
match writer.write(&sample) {
    Ok(()) => { /* queued */ }
    Err(HddsError::WouldBlock) => {
        // Buffer full, handle backpressure
    }
    Err(e) => return Err(e.into()),
}

Pre-allocate Samples

// Reuse sample to avoid allocation
let mut sample = SensorData::default();

for i in 0..1_000_000 {
    sample.sensor_id = 1;
    sample.value = get_value(i);
    sample.timestamp = now();
    writer.write(&sample)?;
}

Multiple Writers

// Parallel writers for higher throughput
use std::thread;

let handles: Vec<_> = (0..4)
    .map(|i| {
        let participant = participant.clone();
        let topic = topic.clone(); // assumes the topic handle is Clone
        thread::spawn(move || {
            let publisher = participant.create_publisher()?;
            let writer = publisher.create_writer(&topic)?;
            let sample = make_sample(i); // hypothetical helper: build this thread's sample

            for _ in 0..250_000 {
                writer.write(&sample)?;
            }
            Ok::<_, HddsError>(())
        })
    })
    .collect();

for handle in handles {
    handle.join().unwrap()?;
}

Reader Optimization

Batch Reading

// Read multiple samples at once
let samples = reader.take()?; // Takes all available

// Or with explicit limit
let samples = reader.take_max(1000)?; // Up to 1000 samples

Parallel Processing

use rayon::prelude::*;

let samples = reader.take()?;

// Process in parallel
samples.par_iter().for_each(|sample| {
    process(sample);
});

Multiple Readers (Partitioned)

// Create readers for different partitions
let reader_a = subscriber.create_reader_with_qos(
    &topic,
    DataReaderQos::default().partition(Partition::new(vec!["sensor_a"])),
)?;

let reader_b = subscriber.create_reader_with_qos(
    &topic,
    DataReaderQos::default().partition(Partition::new(vec!["sensor_b"])),
)?;

Each reader can then be drained on its own thread. A minimal sketch, assuming the readers are Send and reusing take() and the process handler from the examples above:
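
let t_a = std::thread::spawn(move || -> Result<(), HddsError> {
    // Drain partition "sensor_a" independently of "sensor_b"
    for sample in reader_a.take()? {
        process(&sample);
    }
    Ok(())
});
let t_b = std::thread::spawn(move || -> Result<(), HddsError> {
    for sample in reader_b.take()? {
        process(&sample);
    }
    Ok(())
});

t_a.join().unwrap()?;
t_b.join().unwrap()?;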

Transport Optimization

Large Socket Buffers

let config = TransportConfig::default()
    .socket_receive_buffer_size(16 * 1024 * 1024) // 16 MB
    .socket_send_buffer_size(16 * 1024 * 1024);

UDP with Fragmentation

// Enable for payloads > 64 KB
let config = TransportConfig::default()
    .max_message_size(256 * 1024) // 256 KB max
    .fragment_size(64 * 1024); // 64 KB fragments
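
Keep in mind that under best-effort QoS, losing any single fragment discards the whole sample, so fragmented payloads pair best with reliable writers or low-loss links.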

Shared Memory for Same-Host

let config = TransportConfig::default()
    .enable_shared_memory(true)
    .shared_memory_segment_size(256 * 1024 * 1024); // 256 MB

Network Configuration

Linux Kernel Tuning

# Increase buffer limits
sysctl -w net.core.rmem_max=268435456
sysctl -w net.core.wmem_max=268435456
sysctl -w net.core.netdev_max_backlog=100000

# UDP buffer sizes
sysctl -w net.ipv4.udp_rmem_min=16384
sysctl -w net.ipv4.udp_wmem_min=16384

Network Interface

# Increase ring buffer
ethtool -G eth0 rx 4096 tx 4096

# Enable hardware offloading
ethtool -K eth0 gso on gro on tso on

# Set MTU (if supported)
ip link set eth0 mtu 9000 # Jumbo frames

Payload Optimization

Optimal Payload Sizes

Payload Size      Efficiency               Use Case
< 64 bytes        Low (header overhead)    Status flags
256-1024 bytes    Good                     Sensor data
4-16 KB           Optimal                  Video frames
64 KB+            Fragmentation overhead   Bulk transfer
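
If your natural samples fall in the inefficient sub-64-byte range, pack several readings into one sample at the data-model level. A sketch (SensorReading and SensorBatch are illustrative types, not part of HDDS):

struct SensorReading {
    sensor_id: u16,
    value: f32,
}

struct SensorBatch {
    timestamp: u64,
    readings: Vec<SensorReading>, // e.g. 32 readings lands near the efficient range
}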

Compression

use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write; // for write_all

// Compress large payloads
let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder.write_all(&large_data)?;
let compressed = encoder.finish()?;

sample.compressed_data = compressed;
writer.write(&sample)?;
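
For completeness, the matching reader side decompresses before use:

use flate2::read::GzDecoder;
use std::io::Read; // for read_to_end

let mut decoder = GzDecoder::new(&sample.compressed_data[..]);
let mut data = Vec::new();
decoder.read_to_end(&mut data)?;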

Efficient Serialization

// Use arrays instead of sequences (IDL)
struct FastData {
    float values[1024]; // Fixed size, no length prefix
};

// Use bounded strings (IDL)
struct LimitedData {
    string<64> name; // Max 64 chars
};
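
As a rough guide, a fixed IDL array surfaces on the Rust side as a fixed-size type (the exact mapping depends on your code generator):

// Hypothetical generated Rust for FastData above:
struct FastData {
    values: [f32; 1024], // known wire size, no per-sample length prefix
}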

History and Resource Limits

Writer History

let qos = DataWriterQos::default()
    .history(History::KeepLast { depth: 10000 })
    .resource_limits(ResourceLimits {
        max_samples: 10000,
        max_instances: 1,
        max_samples_per_instance: 10000,
    });

Reader History

let qos = DataReaderQos::default()
    .history(History::KeepLast { depth: 1000 })
    .resource_limits(ResourceLimits {
        max_samples: 100000,
        max_instances: 100,
        max_samples_per_instance: 1000,
    });

Backpressure Handling

Flow Control

// Monitor write status
let status = writer.get_offered_deadline_missed_status()?;
if status.total_count > 0 {
    // Slow down publishing
    publish_rate *= 0.9;
}

// Check for blocked writes
match writer.write(&sample) {
    Err(HddsError::Timeout) => {
        eprintln!("Reader can't keep up!");
        // Reduce rate or drop samples
    }
    _ => {}
}

Adaptive Rate

struct AdaptivePublisher {
    writer: DataWriter<Data>,
    rate: f64,
    min_rate: f64,
    max_rate: f64,
}

impl AdaptivePublisher {
    fn publish(&mut self, sample: &Data) -> Result<(), HddsError> {
        match self.writer.write(sample) {
            Ok(()) => {
                // Increase rate on success
                self.rate = (self.rate * 1.01).min(self.max_rate);
                Ok(())
            }
            Err(HddsError::WouldBlock) => {
                // Decrease rate on backpressure
                self.rate = (self.rate * 0.9).max(self.min_rate);
                Err(HddsError::WouldBlock)
            }
            Err(e) => Err(e),
        }
    }
}
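
The rate field only has an effect if the publish loop honors it. One way to drive it (a sketch; the numeric values are illustrative):

let mut publisher = AdaptivePublisher {
    writer,
    rate: 10_000.0, // start at 10k msg/s
    min_rate: 100.0,
    max_rate: 100_000.0,
};

loop {
    let _ = publisher.publish(&sample); // adjusts rate internally
    std::thread::sleep(Duration::from_secs_f64(1.0 / publisher.rate));
}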

Measuring Throughput

use std::time::Instant;

let start = Instant::now();
let mut count = 0u64;
let mut bytes = 0u64;
let sample_size = 1024u64; // serialized size of one sample (illustrative)

loop {
    writer.write(&sample)?;
    count += 1;
    bytes += sample_size;

    if start.elapsed() >= Duration::from_secs(10) {
        break;
    }
}

let elapsed = start.elapsed().as_secs_f64();
println!("Throughput: {:.0} msg/s", count as f64 / elapsed);
println!("Bandwidth: {:.2} MB/s", bytes as f64 / elapsed / 1_000_000.0);

Throughput Checklist

  • Use BestEffort for non-critical high-rate data
  • Enable batching for small messages
  • Increase history depth for reliable delivery
  • Use large socket buffers
  • Enable shared memory for same-host
  • Pre-allocate and reuse samples
  • Use parallel writers/readers
  • Compress large payloads
  • Handle backpressure gracefully
  • Use payloads in the efficient 256 bytes to 16 KB range

Common Throughput Issues

Issue                 Symptom           Solution
Small payloads        Low bandwidth     Batch messages
Reliable blocking     Writer stalls     Increase history
Slow reader           Dropped samples   Add readers/partitions
Network saturation    Packet loss       Reduce rate
CPU bottleneck        Low throughput    Parallel processing

Next Steps