# Congestion Control

Adaptive rate limiting and priority-based traffic management for HDDS.
## Overview

HDDS includes a sophisticated congestion control system that prevents network collapse under heavy load. The system uses:

- **AIMD Rate Control** - Additive Increase / Multiplicative Decrease
- **Priority Queues** - P0 (critical), P1 (normal), P2 (background)
- **Congestion Scoring** - EWMA-based state machine
- **ECN Support** - Explicit Congestion Notification (RFC 3168)
## Architecture

```text
┌─────────────────────────────────────────────────────────────┐
│                         Participant                         │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                 CongestionController                  │  │
│  │ ┌────────────┐ ┌────────────┐ ┌────────────────────┐  │  │
│  │ │ Scorer     │ │ RateCtrl   │ │ BudgetAllocator    │  │  │
│  │ │ - EWMA     │ │ - AIMD     │ │ - P0 reserve       │  │  │
│  │ │ - state    │ │ - cooldown │ │ - P1/P2 distribute │  │  │
│  │ └────────────┘ └────────────┘ └────────────────────┘  │  │
│  └───────────────────────────────────────────────────────┘  │
│                              │                              │
│  ┌───────────────────────────▼───────────────────────────┐  │
│  │                      DataWriter                       │  │
│  │  ┌─────────────────────────────────────────────────┐  │  │
│  │  │                   WriterPacer                   │  │  │
│  │  │ ┌─────────────┐  ┌────────────────────────┐     │  │  │
│  │  │ │ TokenBucket │  │         Queues         │     │  │  │
│  │  │ │  (budget)   │  │  ┌────┐ ┌────┐ ┌────┐  │     │  │  │
│  │  │ └─────────────┘  │  │ P0 │ │ P1 │ │ P2 │  │     │  │  │
│  │  │                  │  └────┘ └────┘ └────┘  │     │  │  │
│  │  │                  └────────────────────────┘     │  │  │
│  │  └─────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
```
## Priority Levels
| Priority | Name | Semantics |
|---|---|---|
| P0 | Critical | Never dropped, guaranteed minimum budget, can bypass rate limit |
| P1 | Normal | Best effort, drops oldest when queue full |
| P2 | Background | Coalesced by instance key ("last value wins"), dropped first |
### Use Cases
- P0: Safety-critical commands, heartbeats, control signals
- P1: Regular sensor data, status updates
- P2: Telemetry, diagnostics, logging, bulk transfers
## Configuration

### CongestionConfig

```rust
use hdds::congestion::{CongestionConfig, BackpressurePolicy};
use std::time::Duration;

let config = CongestionConfig::new()
    // Rate limits (bytes/sec)
    .with_min_rate(10_000)            // 10 KB/s floor
    .with_max_rate(100_000_000)       // 100 MB/s ceiling
    // AIMD parameters
    .with_ai_step(50_000)             // +50 KB/s per stable window
    .with_md_factors(0.5, 0.8)        // hard: -50%, soft: -20%
    // Queue sizes (samples)
    .with_queue_sizes(100, 500, 100)  // P0, P1, P2
    // P0 protection
    .with_p0_protection(0.2, 10_000)  // 20% share, 10 KB/s minimum
    // Scoring thresholds
    .with_thresholds(60, 20)          // decrease >= 60, increase <= 20
    // Backpressure policy
    .with_backpressure(BackpressurePolicy::BlockWithTimeout(
        Duration::from_millis(500),
    ));
```
### Default Values

| Parameter | Default | Description |
|---|---|---|
| `min_rate_bps` | 10,000 | Minimum guaranteed rate (10 KB/s) |
| `max_rate_bps` | 100,000,000 | Maximum allowed rate (100 MB/s) |
| `ai_step_bps` | 50,000 | Additive increase step (+50 KB/s) |
| `md_factor_hard` | 0.5 | Hard congestion decrease (-50%) |
| `md_factor_soft` | 0.8 | Soft congestion decrease (-20%) |
| `stable_window_ms` | 1000 | Stability window (1 s) |
| `cooldown_ms` | 300 | Cooldown after decrease (300 ms) |
| `decrease_threshold` | 60 | Score threshold for decrease |
| `increase_threshold` | 20 | Score threshold for increase |
| `p0_min_share` | 0.2 | P0 guaranteed share (20%) |
| `max_queue_p0` | 100 | P0 queue size |
| `max_queue_p1` | 500 | P1 queue size |
| `max_queue_p2` | 100 | P2 queue size (coalesced) |
| `p2_coalesce` | true | Enable P2 coalescing |
| `repair_budget_ratio` | 0.3 | Max budget for retransmissions (30%) |
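To make the AIMD interplay concrete, here is a standalone sketch of the rate update using the default values above. This is illustrative only; the function names are ours, not the `RateCtrl` API.

```rust
/// Additive increase: one step up per stable window, capped at max_rate.
fn increase(rate_bps: u64, ai_step: u64, max_rate: u64) -> u64 {
    (rate_bps + ai_step).min(max_rate)
}

/// Multiplicative decrease: scale by md_factor, floored at min_rate.
fn decrease(rate_bps: u64, md_factor: f64, min_rate: u64) -> u64 {
    ((rate_bps as f64 * md_factor) as u64).max(min_rate)
}

fn main() {
    let (min, max, step) = (10_000u64, 100_000_000u64, 50_000u64);
    let mut rate: u64 = 1_000_000;
    rate = decrease(rate, 0.5, min); // hard congestion: -50%
    assert_eq!(rate, 500_000);
    rate = decrease(rate, 0.8, min); // soft congestion: -20%
    assert_eq!(rate, 400_000);
    rate = increase(rate, step, max); // stable window: +50 KB/s
    assert_eq!(rate, 450_000);
}
```

Note the asymmetry: recovery is linear while backoff is multiplicative, which is what keeps the aggregate rate stable under contention.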
## Using WriterPacer

The `WriterPacer` manages per-writer rate limiting and priority queues:

```rust
use hdds::congestion::{CongestionConfig, WriterPacer, Priority, SendAction};

// Create a pacer with default config
let config = CongestionConfig::default();
let mut pacer = WriterPacer::new(config);

// Enqueue samples by priority
pacer.enqueue(critical_command, Priority::P0)?;
pacer.enqueue(sensor_reading, Priority::P1)?;
pacer.enqueue(telemetry_data, Priority::P2)?;

// Send in priority order with rate limiting
loop {
    match pacer.try_send() {
        SendAction::Send(sample) => {
            transport.send(&sample.data)?;
        }
        SendAction::Wait(duration) => {
            // Rate limited, wait before retry
            std::thread::sleep(duration);
        }
        SendAction::Empty => {
            // No more samples
            break;
        }
    }
}
```
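Conceptually, the pacer's `Wait` decision comes from a token bucket fed by the writer's budget. A stdlib-only model of that mechanism (the `TokenBucket` type here is an illustrative sketch, not the HDDS internal one):

```rust
/// Minimal token bucket: tokens accumulate at rate_bps and are
/// spent one per byte sent; an empty bucket means "wait".
struct TokenBucket {
    tokens: f64,
    rate_bps: f64,
    capacity: f64,
}

impl TokenBucket {
    /// Credit tokens for `elapsed_secs` of refill time, capped at capacity.
    fn refill(&mut self, elapsed_secs: f64) {
        self.tokens = (self.tokens + self.rate_bps * elapsed_secs).min(self.capacity);
    }

    /// Try to spend `bytes` tokens; false means the caller is rate limited.
    fn take(&mut self, bytes: f64) -> bool {
        if self.tokens >= bytes {
            self.tokens -= bytes;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut bucket = TokenBucket { tokens: 0.0, rate_bps: 10_000.0, capacity: 5_000.0 };
    bucket.refill(0.1);           // 100 ms at 10 KB/s -> 1000 tokens
    assert!(bucket.take(800.0));  // small sample goes out immediately
    assert!(!bucket.take(800.0)); // next one must wait for refill
}
```

The capacity cap bounds burst size: a writer that was idle cannot dump an unbounded backlog the instant it wakes up.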
## Using CongestionController

The `CongestionController` orchestrates all components at the participant level:

```rust
use hdds::congestion::{CongestionController, CongestionConfig, Priority};

let mut controller = CongestionController::new(CongestionConfig::default());

// Register writers with their priority
controller.register_writer(writer_id_1, Priority::P0);
controller.register_writer(writer_id_2, Priority::P1);
controller.register_writer(writer_id_3, Priority::P2);

// Call tick() periodically (every ~100ms)
if let Some(budget_updates) = controller.tick() {
    for update in budget_updates {
        // Apply new budget to writer
        writers[update.writer_id].set_budget(update.budget_bps);
    }
}

// Report congestion signals
controller.on_eagain();                   // EAGAIN/ENOBUFS from socket
controller.on_nack();                     // NACK from reader
controller.on_nacks(5);                   // Multiple NACKs
controller.on_rtt_sample(peer_id, 150.0); // RTT measurement (ms)

// Query state
let rate = controller.current_rate(); // Current rate (bytes/sec)
let score = controller.score();       // Congestion score (0-100)
let congested = controller.is_congested();
```
## Congestion Signals
The controller responds to multiple congestion signals:
| Signal | Type | Impulse | Description |
|---|---|---|---|
| EAGAIN/ENOBUFS | Hard | 60 | Socket buffer full |
| RTT Inflation | Soft | 20 | RTT > 2x baseline |
| High NACK Rate | Soft | 20 | > 10 NACKs/sec |
| ECN CE Mark | Soft | 20 | Congestion Experienced |
## Congestion States

```text
┌─────────┐  score >= 60  ┌───────────┐
│ Stable  │ ────────────► │ Congested │
│         │               │           │
└────┬────┘               └─────┬─────┘
     │                          │
     │ score <= 20              │ score <= 20 + cooldown
     │                          │
     ▼                          ▼
┌──────────┐              ┌───────────┐
│Increasing│◄─────────────│ Cooldown  │
└──────────┘              └───────────┘
```
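Read as code, the diagram above suggests a transition function along these lines. This is one plausible reading using the default thresholds (decrease >= 60, increase <= 20); the real state machine may handle edge cases differently.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum State {
    Stable,
    Congested,
    Cooldown,
    Increasing,
}

/// One evaluation step. `cooldown_done` models the cooldown timer
/// (300 ms by default) having expired.
fn next(state: State, score: u8, cooldown_done: bool) -> State {
    match state {
        State::Stable if score >= 60 => State::Congested,
        State::Stable if score <= 20 => State::Increasing,
        State::Congested if score <= 20 => State::Cooldown,
        State::Cooldown if cooldown_done => State::Increasing,
        s => s, // otherwise hold the current state
    }
}

fn main() {
    assert_eq!(next(State::Stable, 75, false), State::Congested);
    assert_eq!(next(State::Congested, 10, false), State::Cooldown);
    assert_eq!(next(State::Cooldown, 10, true), State::Increasing);
}
```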
## P2 Coalescing

The P2 queue uses "last value wins" semantics for the same instance key:

```rust
use hdds::congestion::{CoalescingQueue, InstanceKey};

let mut queue = CoalescingQueue::new(100); // Max 100 unique instances

// Same instance key -> later value replaces earlier
let key = InstanceKey::new(topic_id, instance_handle);
queue.insert(old_telemetry, key.clone());
queue.insert(new_telemetry, key.clone()); // Replaces old_telemetry

assert_eq!(queue.len(), 1);             // Only one sample
assert_eq!(queue.coalesced_count(), 1); // One replacement
```
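The "last value wins" rule can be modeled with a plain `HashMap`, since `HashMap::insert` returns the displaced value when a key is overwritten. An illustrative stdlib sketch, not the `CoalescingQueue` internals:

```rust
use std::collections::HashMap;

/// "Last value wins" store keyed by instance.
struct Coalescer<K, V> {
    map: HashMap<K, V>,
    replaced: u64,
}

impl<K: std::hash::Hash + Eq, V> Coalescer<K, V> {
    fn new() -> Self {
        Coalescer { map: HashMap::new(), replaced: 0 }
    }

    fn insert(&mut self, key: K, value: V) {
        // HashMap::insert returns Some(old) if the key was present.
        if self.map.insert(key, value).is_some() {
            self.replaced += 1;
        }
    }
}

fn main() {
    let mut c = Coalescer::new();
    c.insert("cpu_usage", 41.0);
    c.insert("cpu_usage", 42.0); // replaces the stale reading
    assert_eq!(c.map.len(), 1);
    assert_eq!(c.replaced, 1);
}
```

This is why P2 queues cannot build unbounded backlogs of stale telemetry: memory is bounded by the number of unique instances, not the publish rate.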
## ECN Support

ECN (Explicit Congestion Notification) allows routers to signal congestion without dropping packets:

```rust
use hdds::congestion::{CongestionConfig, CongestionController, EcnMode};

let config = CongestionConfig {
    ecn_mode: EcnMode::Opportunistic, // Use ECN if supported
    ..Default::default()
};
let mut controller = CongestionController::new(config);

// Report ECN TOS byte from received packets
let tos = received_packet.ip_tos();
if controller.on_ecn_tos(tos) {
    // Congestion signaled
}

// Or report CE mark directly
controller.on_ecn_ce();

// Check ECN statistics
if let Some(stats) = controller.ecn_stats() {
    println!("ECT0: {}, ECT1: {}, CE: {}",
        stats.ect0_received, stats.ect1_received, stats.ce_received);
}
```
### ECN Modes

| Mode | Description |
|---|---|
| `Off` | ECN disabled (default) |
| `Opportunistic` | Use ECN if OS and peers support it |
| `Mandatory` | Require ECN; error if unsupported |
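Per RFC 3168, the ECN codepoint occupies the two least-significant bits of the IP TOS / traffic-class byte: `00` Not-ECT, `01` ECT(1), `10` ECT(0), `11` CE. A standalone decoder sketch (the enum is ours for illustration, not the HDDS API):

```rust
#[derive(Debug, PartialEq)]
enum EcnCodepoint {
    NotEct, // 0b00: sender does not support ECN
    Ect1,   // 0b01: ECN-capable transport
    Ect0,   // 0b10: ECN-capable transport
    Ce,     // 0b11: Congestion Experienced, set by a router
}

/// Extract the ECN codepoint from the low two bits of the TOS byte.
fn decode_ecn(tos: u8) -> EcnCodepoint {
    match tos & 0b11 {
        0b00 => EcnCodepoint::NotEct,
        0b01 => EcnCodepoint::Ect1,
        0b10 => EcnCodepoint::Ect0,
        _ => EcnCodepoint::Ce,
    }
}

fn main() {
    assert_eq!(decode_ecn(0b0000_0010), EcnCodepoint::Ect0);
    assert_eq!(decode_ecn(0b0000_0011), EcnCodepoint::Ce);
}
```

A CE mark is the soft congestion signal fed back to the controller (impulse 20 in the signals table above).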
## Backpressure Policies

When queues or token buckets are exhausted:

```rust
use hdds::congestion::BackpressurePolicy;
use std::time::Duration;

// Return error immediately (default)
BackpressurePolicy::ReturnError

// Block until timeout
BackpressurePolicy::BlockWithTimeout(Duration::from_millis(500))

// Drop oldest sample (dangerous for P0!)
BackpressurePolicy::DropOldest
```
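The three policies differ only in how an enqueue is resolved against a full queue. A stdlib sketch of that decision (illustrative; the error values and the blocking behavior are simplified stand-ins):

```rust
use std::collections::VecDeque;
use std::time::Duration;

enum Policy {
    ReturnError,
    BlockWithTimeout(Duration),
    DropOldest,
}

/// Enqueue into a bounded queue, resolving overflow per policy.
fn enqueue<T>(
    q: &mut VecDeque<T>,
    cap: usize,
    item: T,
    policy: &Policy,
) -> Result<(), &'static str> {
    if q.len() < cap {
        q.push_back(item);
        return Ok(());
    }
    match policy {
        Policy::ReturnError => Err("queue full"),
        Policy::BlockWithTimeout(_timeout) => {
            // Real code would park the caller until space frees up or the
            // timeout expires; here we just report the timeout outcome.
            Err("timed out waiting for space")
        }
        Policy::DropOldest => {
            q.pop_front(); // the oldest sample is sacrificed
            q.push_back(item);
            Ok(())
        }
    }
}

fn main() {
    let mut q = VecDeque::from([1, 2]);
    assert!(enqueue(&mut q, 2, 3, &Policy::ReturnError).is_err());
    assert!(enqueue(&mut q, 2, 3, &Policy::DropOldest).is_ok());
    assert_eq!(q, VecDeque::from([2, 3]));
}
```

`DropOldest` is dangerous for P0 precisely because the dropped sample may be the critical command the queue exists to protect.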
## Metrics

```rust
let metrics = controller.metrics();
println!("Rate: {} bytes/sec", metrics.current_rate_bps);
println!("EAGAIN events: {}", metrics.eagain_total);
println!("Bytes sent: {}", metrics.bytes_sent);
println!("P0 sent: {}, P1 sent: {}, P2 sent: {}",
    metrics.p0_sent, metrics.p1_sent, metrics.p2_sent);

// Transport feedback
let feedback = controller.transport_feedback();
println!("OK: {}, EAGAIN: {}, Errors: {}",
    feedback.sends_ok, feedback.eagain_count, feedback.error_count);
```
## Disabling Congestion Control

For testing or specific use cases:

```rust
// At configuration time
let config = CongestionConfig::disabled();

// At runtime
controller.set_enabled(false);
```
## Best Practices

- **Assign priorities carefully** - Only use P0 for truly critical traffic
- **Monitor metrics** - Watch the EAGAIN rate and congestion score
- **Tune for your network** - Adjust `ai_step` and `md_factors` based on RTT
- **Use P2 for telemetry** - Coalescing prevents queue buildup
- **Set appropriate queue sizes** - Larger queues mean more latency
- **Enable ECN** - If your network supports it, use `Opportunistic` mode
## Example: Mixed Priority Traffic

```rust
use hdds::congestion::{CongestionConfig, CongestionController, WriterPacer, Priority};

// Configure for mixed traffic
let config = CongestionConfig::new()
    .with_queue_sizes(50, 200, 100)   // P0 small, P1 larger
    .with_p0_protection(0.3, 20_000)  // 30% for critical traffic
    .with_max_rate(10_000_000);       // 10 MB/s cap

let mut controller = CongestionController::new(config.clone());

// Create pacers per writer
let mut safety_writer = WriterPacer::new(config.clone());
let mut sensor_writer = WriterPacer::new(config.clone());
let mut telemetry_writer = WriterPacer::new(config);

// Enqueue with appropriate priorities
safety_writer.enqueue(estop_command, Priority::P0)?;
sensor_writer.enqueue(lidar_scan, Priority::P1)?;
telemetry_writer.enqueue(cpu_usage, Priority::P2)?;
```