# Throughput Optimization

Guide to achieving maximum throughput in HDDS applications.
## Throughput Factors

End-to-end throughput is limited by the slowest stage in the pipeline:

```text
Throughput = min(
    Serialization rate,
    Network bandwidth,
    Writer capacity,
    Reader capacity,
    Transport efficiency
)
```
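A quick back-of-the-envelope check against the network term alone tells you whether a target rate is even feasible. The numbers below are illustrative only and ignore RTPS/UDP/IP header overhead and CPU limits:

```rust
// Network ceiling for 1 KiB payloads on a 1 Gbit/s link (illustrative).
let link_bytes_per_sec = 1_000_000_000.0 / 8.0; // 1 Gbit/s = 125 MB/s
let payload_bytes = 1024.0;
let ceiling_msgs_per_sec = link_bytes_per_sec / payload_bytes;
println!("Network ceiling: ~{:.0} msg/s", ceiling_msgs_per_sec); // ~122,000 msg/s
```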
## QoS Configuration

### Best Effort (Maximum Throughput)

```rust
let qos = DataWriterQos::default()
    .reliability(Reliability::BestEffort)
    .history(History::KeepLast { depth: 1 })
    .durability(Durability::Volatile);
```
### High-Throughput Reliable

```rust
let qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::from_secs(1),
    })
    .history(History::KeepLast { depth: 1000 }) // Large buffer
    .durability(Durability::Volatile);
```
A large history depth lets the writer keep buffering samples instead of blocking when a reliable reader falls behind.
## Batching

### Enable Batching

```rust
let config = DataWriterConfig::default()
    .batching_enabled(true)
    .max_batch_size(64 * 1024) // 64 KB batches
    .batch_flush_period(Duration::from_millis(1));
```
### Manual Batching

```rust
// Collect samples
let batch: Vec<SensorData> = collect_samples();

// Write in a burst
for sample in batch {
    writer.write(&sample)?;
}

// Explicit flush
writer.flush()?;
```
## Writer Optimization

### Asynchronous Write

```rust
// Non-blocking write (queue and continue)
let qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::ZERO, // Non-blocking
    })
    .history(History::KeepLast { depth: 10000 });

// Write without waiting
match writer.write(&sample) {
    Ok(()) => { /* queued */ }
    Err(HddsError::WouldBlock) => {
        // Buffer full, handle backpressure
    }
    Err(e) => return Err(e.into()),
}
```
### Pre-allocate Samples

```rust
// Reuse one sample to avoid per-message allocation
let mut sample = SensorData::default();
for i in 0..1_000_000 {
    sample.sensor_id = 1;
    sample.value = get_value(i);
    sample.timestamp = now();
    writer.write(&sample)?;
}
```
### Multiple Writers

```rust
// Parallel writers for higher throughput
use std::thread;

let handles: Vec<_> = (0..4)
    .map(|_| {
        // Clone what each thread needs before moving it into the closure
        // (assumes the topic handle and sample type implement Clone)
        let participant = participant.clone();
        let topic = topic.clone();
        let sample = sample.clone();
        thread::spawn(move || {
            let publisher = participant.create_publisher()?;
            let writer = publisher.create_writer(&topic)?;
            for _ in 0..250_000 {
                writer.write(&sample)?;
            }
            Ok::<_, HddsError>(())
        })
    })
    .collect();

for handle in handles {
    handle.join().unwrap()?;
}
```
## Reader Optimization

### Batch Reading

```rust
// Read multiple samples at once
let samples = reader.take()?; // Takes all available samples

// Or with an explicit limit
let samples = reader.take_max(1000)?; // Up to 1000 samples
```
### Parallel Processing

```rust
use rayon::prelude::*;

let samples = reader.take()?;

// Process samples in parallel
samples.par_iter().for_each(|sample| {
    process(sample);
});
```
### Multiple Readers (Partitioned)

```rust
// Create readers for different partitions
let reader_a = subscriber.create_reader_with_qos(
    &topic,
    DataReaderQos::default().partition(Partition::new(vec!["sensor_a"])),
)?;

let reader_b = subscriber.create_reader_with_qos(
    &topic,
    DataReaderQos::default().partition(Partition::new(vec!["sensor_b"])),
)?;

// Each reader can then be drained on its own thread (see the sketch below)
```
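A minimal sketch of that per-partition fan-out, assuming the readers can be moved into threads and reusing the `process` handler from the parallel-processing example:

```rust
use std::thread;
use std::time::Duration;

// One thread per partitioned reader; each drains its reader independently.
let workers: Vec<_> = [reader_a, reader_b]
    .into_iter()
    .map(|reader| {
        thread::spawn(move || -> Result<(), HddsError> {
            loop {
                let samples = reader.take()?;
                if samples.is_empty() {
                    thread::sleep(Duration::from_millis(1)); // avoid busy-spinning
                    continue;
                }
                for sample in &samples {
                    process(sample);
                }
            }
        })
    })
    .collect();
// Join `workers` on shutdown if the application ever stops reading.
```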
## Transport Optimization

### Large Socket Buffers

```rust
let config = TransportConfig::default()
    .socket_receive_buffer_size(16 * 1024 * 1024) // 16 MB
    .socket_send_buffer_size(16 * 1024 * 1024);
```

### UDP with Fragmentation

```rust
// Enable for payloads larger than 64 KB
let config = TransportConfig::default()
    .max_message_size(256 * 1024) // 256 KB max message
    .fragment_size(64 * 1024);    // 64 KB fragments
```

### Shared Memory for Same-Host

```rust
let config = TransportConfig::default()
    .enable_shared_memory(true)
    .shared_memory_segment_size(256 * 1024 * 1024); // 256 MB
```
## Network Configuration

### Linux Kernel Tuning

```bash
# Increase buffer limits
sysctl -w net.core.rmem_max=268435456
sysctl -w net.core.wmem_max=268435456
sysctl -w net.core.netdev_max_backlog=100000

# UDP buffer sizes
sysctl -w net.ipv4.udp_rmem_min=16384
sysctl -w net.ipv4.udp_wmem_min=16384
```

### Network Interface

```bash
# Increase ring buffer sizes
ethtool -G eth0 rx 4096 tx 4096

# Enable hardware offloading
ethtool -K eth0 gso on gro on tso on

# Set MTU (if the network supports it)
ip link set eth0 mtu 9000 # Jumbo frames
```
## Payload Optimization

### Optimal Payload Sizes
| Payload Size | Efficiency | Use Case |
|---|---|---|
| < 64 bytes | Low (header overhead) | Status flags |
| 256-1024 bytes | Good | Sensor data |
| 4-16 KB | Optimal | Video frames |
| 64 KB+ | Fragmentation overhead | Bulk transfer |
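If your messages fall in the small-payload row, application-level aggregation can move them into the efficient range. A rough sketch; `SensorBatch`, its `readings` field, and `pending_readings` are hypothetical, not part of HDDS:

```rust
// Pack many small readings into one mid-sized sample before writing.
// SensorBatch / readings / pending_readings are illustrative names only.
let mut batch = SensorBatch::default();
for reading in pending_readings.drain(..) {
    batch.readings.push(reading);
    if batch.readings.len() == 256 { // roughly a few KB per sample
        writer.write(&batch)?;
        batch.readings.clear();
    }
}
if !batch.readings.is_empty() {
    writer.write(&batch)?; // flush the final partial batch
}
```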
### Compression

```rust
use std::io::Write; // for write_all
use flate2::write::GzEncoder;
use flate2::Compression;

// Compress large payloads before publishing
let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder.write_all(&large_data)?;
let compressed = encoder.finish()?;

sample.compressed_data = compressed;
writer.write(&sample)?;
```
### Efficient Serialization

```idl
// Use fixed-size arrays instead of sequences
struct FastData {
    float values[1024]; // Fixed size, no length prefix
};

// Use bounded strings
struct LimitedData {
    string<64> name; // Max 64 characters
};
```
## History and Resource Limits

### Writer History

```rust
let qos = DataWriterQos::default()
    .history(History::KeepLast { depth: 10000 })
    .resource_limits(ResourceLimits {
        max_samples: 10000,
        max_instances: 1,
        max_samples_per_instance: 10000,
    });
```

### Reader History

```rust
let qos = DataReaderQos::default()
    .history(History::KeepLast { depth: 1000 })
    .resource_limits(ResourceLimits {
        max_samples: 100000,
        max_instances: 100,
        max_samples_per_instance: 1000,
    });
```
## Backpressure Handling

### Flow Control

```rust
// Monitor write status
let status = writer.get_offered_deadline_missed_status()?;
if status.total_count > 0 {
    // Slow down publishing
    publish_rate *= 0.9;
}

// Check for blocked writes
match writer.write(&sample) {
    Err(HddsError::Timeout) => {
        eprintln!("Reader can't keep up!");
        // Reduce rate or drop samples
    }
    _ => {}
}
```
### Adaptive Rate

```rust
struct AdaptivePublisher {
    writer: DataWriter<Data>,
    rate: f64,
    min_rate: f64,
    max_rate: f64,
}

impl AdaptivePublisher {
    fn publish(&mut self, sample: &Data) -> Result<(), HddsError> {
        match self.writer.write(sample) {
            Ok(()) => {
                // Increase rate on success
                self.rate = (self.rate * 1.01).min(self.max_rate);
                Ok(())
            }
            Err(HddsError::WouldBlock) => {
                // Decrease rate on backpressure
                self.rate = (self.rate * 0.9).max(self.min_rate);
                Err(HddsError::WouldBlock)
            }
            Err(e) => Err(e),
        }
    }
}
```
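One way to put the adaptive rate to work, as a sketch: let the current rate pace the publish loop. The rate bounds below are arbitrary starting points, not recommended values:

```rust
use std::time::Duration;

let mut adaptive = AdaptivePublisher {
    writer,
    rate: 1_000.0,      // initial guess, messages per second
    min_rate: 100.0,
    max_rate: 100_000.0,
};

loop {
    // WouldBlock already lowered the rate inside publish(); just keep pacing.
    let _ = adaptive.publish(&sample);
    std::thread::sleep(Duration::from_secs_f64(1.0 / adaptive.rate));
}
```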
## Measuring Throughput

```rust
use std::time::Instant;

let start = Instant::now();
let mut count = 0u64;
let mut bytes = 0u64;

loop {
    writer.write(&sample)?;
    count += 1;
    bytes += sample_size;
    if start.elapsed() >= Duration::from_secs(10) {
        break;
    }
}

let elapsed = start.elapsed().as_secs_f64();
println!("Throughput: {:.0} msg/s", count as f64 / elapsed);
println!("Bandwidth: {:.2} MB/s", bytes as f64 / elapsed / 1_000_000.0);
```
## Throughput Checklist
- Use BestEffort for non-critical high-rate data
- Enable batching for small messages
- Increase history depth for reliable delivery
- Use large socket buffers
- Enable shared memory for same-host
- Pre-allocate and reuse samples
- Use parallel writers/readers
- Compress large payloads
- Handle backpressure gracefully
- Use optimal payload sizes (256 bytes to 16 KB)
## Common Throughput Issues
| Issue | Symptom | Solution |
|---|---|---|
| Small payloads | Low bandwidth | Batch messages |
| Reliable blocking | Writer stalls | Increase history |
| Slow reader | Dropped samples | Add readers/partitions |
| Network saturation | Packet loss | Reduce rate |
| CPU bottleneck | Low throughput | Parallel processing |
## Next Steps
- Latency Tuning - Minimize latency
- Benchmarks - Performance baselines