
Congestion Control

Adaptive rate limiting and priority-based traffic management for HDDS.

Overview

HDDS includes a congestion control system that adapts the send rate to network conditions so that heavy load does not collapse the network. It combines:

  • AIMD Rate Control - Additive Increase / Multiplicative Decrease
  • Priority Queues - P0 (critical), P1 (normal), P2 (background)
  • Congestion Scoring - EWMA-based state machine
  • ECN Support - Explicit Congestion Notification (RFC 3168)

Architecture

┌─────────────────────────────────────────────────────────────┐
│ Participant                                                 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ CongestionController                                    │ │
│ │ ┌────────────┐ ┌────────────┐ ┌───────────────────────┐ │ │
│ │ │ Scorer     │ │ RateCtrl   │ │ BudgetAllocator       │ │ │
│ │ │ - EWMA     │ │ - AIMD     │ │ - P0 reserve          │ │ │
│ │ │ - state    │ │ - cooldown │ │ - P1/P2 distribute    │ │ │
│ │ └────────────┘ └────────────┘ └───────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│                              │                              │
│ ┌────────────────────────────▼────────────────────────────┐ │
│ │ DataWriter                                              │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ WriterPacer                                         │ │ │
│ │ │ ┌─────────────┐ ┌─────────────────────────────────┐ │ │ │
│ │ │ │ TokenBucket │ │ Queues                          │ │ │ │
│ │ │ │ (budget)    │ │ ┌────┐ ┌────┐ ┌────┐            │ │ │ │
│ │ │ └─────────────┘ │ │ P0 │ │ P1 │ │ P2 │            │ │ │ │
│ │ │                 │ └────┘ └────┘ └────┘            │ │ │ │
│ │ │                 └─────────────────────────────────┘ │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

Priority Levels

| Priority | Name | Semantics |
|---|---|---|
| P0 | Critical | Never dropped, guaranteed minimum budget, can bypass rate limit |
| P1 | Normal | Best effort, drops oldest when queue full |
| P2 | Background | Coalesced by instance key ("last value wins"), dropped first |
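To make the per-priority semantics concrete, here is a minimal, self-contained sketch in plain Rust. It is not the HDDS implementation: `PriorityQueues`, `EnqueueError`, and the choice to reject a new P0 sample when the P0 queue is full (so nothing already queued at P0 is ever discarded) are assumptions for illustration. P2 is treated like P1 here (drop oldest); coalescing and the "dropped first" behavior under budget pressure are not modeled.

```rust
use std::collections::VecDeque;

/// Priority levels mirroring the table above (local stand-in, not the hdds type).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Priority {
    P0,
    P1,
    P2,
}

#[derive(Debug, PartialEq, Eq)]
pub enum EnqueueError {
    /// The P0 queue is full; P0 is never dropped silently, so the caller must decide.
    P0Full,
}

/// Hypothetical set of three bounded FIFO queues; capacities are in samples.
pub struct PriorityQueues<T> {
    queues: [VecDeque<T>; 3],
    caps: [usize; 3],
    pub dropped: [u64; 3],
}

impl<T> PriorityQueues<T> {
    pub fn new(p0: usize, p1: usize, p2: usize) -> Self {
        Self {
            queues: [VecDeque::new(), VecDeque::new(), VecDeque::new()],
            caps: [p0, p1, p2],
            dropped: [0; 3],
        }
    }

    pub fn enqueue(&mut self, sample: T, prio: Priority) -> Result<(), EnqueueError> {
        let i = prio as usize;
        if self.queues[i].len() >= self.caps[i] {
            match prio {
                // P0: refuse the new sample instead of discarding a queued one.
                Priority::P0 => return Err(EnqueueError::P0Full),
                // P1/P2: best effort, evict the oldest queued sample to make room.
                Priority::P1 | Priority::P2 => {
                    if self.queues[i].pop_front().is_some() {
                        self.dropped[i] += 1;
                    }
                }
            }
        }
        self.queues[i].push_back(sample);
        Ok(())
    }

    /// Strict priority order: P0 first, then P1, then P2.
    pub fn dequeue(&mut self) -> Option<(Priority, T)> {
        for p in [Priority::P0, Priority::P1, Priority::P2] {
            if let Some(sample) = self.queues[p as usize].pop_front() {
                return Some((p, sample));
            }
        }
        None
    }
}
```

Dequeuing is strict priority, so a steady P0 stream can starve P1 and P2; that is one reason to reserve P0 for genuinely critical traffic.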

Use Cases

  • P0: Safety-critical commands, heartbeats, control signals
  • P1: Regular sensor data, status updates
  • P2: Telemetry, diagnostics, logging, bulk transfers

Configuration

CongestionConfig

use hdds::congestion::{CongestionConfig, Priority, BackpressurePolicy};
use std::time::Duration;

let config = CongestionConfig::new()
    // Rate limits (bytes/sec)
    .with_min_rate(10_000)              // 10 KB/s floor
    .with_max_rate(100_000_000)         // 100 MB/s ceiling

    // AIMD parameters
    .with_ai_step(50_000)               // +50 KB/s per stable window
    .with_md_factors(0.5, 0.8)          // hard: -50%, soft: -20%

    // Queue sizes (samples)
    .with_queue_sizes(100, 500, 100)    // P0, P1, P2

    // P0 protection
    .with_p0_protection(0.2, 10_000)    // 20% share, 10 KB/s minimum

    // Scoring thresholds
    .with_thresholds(60, 20)            // decrease >= 60, increase <= 20

    // Backpressure policy
    .with_backpressure(BackpressurePolicy::BlockWithTimeout(
        Duration::from_millis(500),
    ));

Default Values

| Parameter | Default | Description |
|---|---|---|
| min_rate_bps | 10,000 | Minimum guaranteed rate (10 KB/s) |
| max_rate_bps | 100,000,000 | Maximum allowed rate (100 MB/s) |
| ai_step_bps | 50,000 | Additive increase step (+50 KB/s) |
| md_factor_hard | 0.5 | Hard congestion decrease (-50%) |
| md_factor_soft | 0.8 | Soft congestion decrease (-20%) |
| stable_window_ms | 1000 | Stability window (1 s) |
| cooldown_ms | 300 | Cooldown after decrease (300 ms) |
| decrease_threshold | 60 | Score threshold for decrease |
| increase_threshold | 20 | Score threshold for increase |
| p0_min_share | 0.2 | P0 guaranteed share (20%) |
| max_queue_p0 | 100 | P0 queue size (samples) |
| max_queue_p1 | 500 | P1 queue size (samples) |
| max_queue_p2 | 100 | P2 queue size (samples, coalesced) |
| p2_coalesce | true | Enable P2 coalescing |
| repair_budget_ratio | 0.3 | Max budget for retransmissions (30%) |
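The following sketch shows how the AIMD defaults above could interact. The update rules are an assumption, not taken from HDDS: a hard or soft signal multiplies the rate by `md_factor_hard` or `md_factor_soft` (never below `min_rate_bps`), further signals within `cooldown_ms` of a decrease are ignored, and each full `stable_window_ms` without a decrease adds `ai_step_bps` (never above `max_rate_bps`). The `Aimd` type and its methods are hypothetical; time is passed in as milliseconds to keep the logic deterministic.

```rust
/// Congestion severity, matching the Hard/Soft signal types.
#[derive(Clone, Copy, Debug)]
pub enum Severity {
    Hard,
    Soft,
}

/// Minimal AIMD sketch; field names echo the defaults table, the logic is assumed.
pub struct Aimd {
    pub rate_bps: u64,
    pub min_rate_bps: u64,
    pub max_rate_bps: u64,
    pub ai_step_bps: u64,
    pub md_factor_hard: f64,
    pub md_factor_soft: f64,
    pub stable_window_ms: u64,
    pub cooldown_ms: u64,
    last_decrease_ms: Option<u64>,
    window_start_ms: u64,
}

impl Aimd {
    /// Uses the documented defaults; `initial_rate_bps` is clamped to [min, max].
    pub fn new(initial_rate_bps: u64, now_ms: u64) -> Self {
        let (min, max) = (10_000, 100_000_000);
        Self {
            rate_bps: initial_rate_bps.clamp(min, max),
            min_rate_bps: min,
            max_rate_bps: max,
            ai_step_bps: 50_000,
            md_factor_hard: 0.5,
            md_factor_soft: 0.8,
            stable_window_ms: 1_000,
            cooldown_ms: 300,
            last_decrease_ms: None,
            window_start_ms: now_ms,
        }
    }

    /// Multiplicative decrease; ignored during the cooldown after the previous one.
    pub fn on_congestion(&mut self, severity: Severity, now_ms: u64) {
        if let Some(t) = self.last_decrease_ms {
            if now_ms.saturating_sub(t) < self.cooldown_ms {
                return;
            }
        }
        let factor = match severity {
            Severity::Hard => self.md_factor_hard,
            Severity::Soft => self.md_factor_soft,
        };
        self.rate_bps = ((self.rate_bps as f64 * factor) as u64).max(self.min_rate_bps);
        self.last_decrease_ms = Some(now_ms);
        self.window_start_ms = now_ms; // a decrease restarts the stability window
    }

    /// Additive increase: +ai_step once per full stable window without a decrease.
    pub fn on_tick(&mut self, now_ms: u64) {
        if now_ms.saturating_sub(self.window_start_ms) >= self.stable_window_ms {
            self.rate_bps = (self.rate_bps + self.ai_step_bps).min(self.max_rate_bps);
            self.window_start_ms = now_ms;
        }
    }
}
```

Because decreases inside the cooldown are ignored in this sketch, a burst of EAGAIN errors from one full socket buffer halves the rate once instead of driving it straight to the floor.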

Using WriterPacer

The WriterPacer manages per-writer rate limiting and priority queues:

use hdds::congestion::{CongestionConfig, WriterPacer, Priority, SendAction};

// Create a pacer with default config
let config = CongestionConfig::default();
let mut pacer = WriterPacer::new(config);

// Enqueue samples by priority
pacer.enqueue(critical_command, Priority::P0)?;
pacer.enqueue(sensor_reading, Priority::P1)?;
pacer.enqueue(telemetry_data, Priority::P2)?;

// Send in priority order with rate limiting
loop {
    match pacer.try_send() {
        SendAction::Send(sample) => {
            transport.send(&sample.data)?;
        }
        SendAction::Wait(duration) => {
            // Rate limited, wait before retry
            std::thread::sleep(duration);
        }
        SendAction::Empty => {
            // No more samples
            break;
        }
    }
}
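The `Send`/`Wait`/`Empty` loop above follows the classic token-bucket pattern. The sketch below is a hypothetical single-queue pacer, not `WriterPacer` itself: tokens are bytes, they refill at the current budget up to a burst size, and `Wait` reports how long until the head-of-line sample fits. `TokenBucketPacer`, `Action`, the burst size, and the choice to let an oversized sample go out on a full bucket (running the balance negative) are all assumptions.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Outcome of one send attempt, mirroring the three `SendAction` cases above.
#[derive(Debug, PartialEq)]
pub enum Action {
    Send(Vec<u8>),
    Wait(Duration),
    Empty,
}

/// Hypothetical single-queue token-bucket pacer (priorities omitted for brevity).
pub struct TokenBucketPacer {
    rate_bps: f64, // refill rate in bytes/sec
    burst: f64,    // bucket capacity in bytes
    tokens: f64,   // may go negative after an oversized sample
    last_ms: u64,
    queue: VecDeque<Vec<u8>>,
}

impl TokenBucketPacer {
    pub fn new(rate_bps: u64, burst_bytes: u64, now_ms: u64) -> Self {
        Self {
            rate_bps: rate_bps.max(1) as f64, // avoid division by zero in Wait
            burst: burst_bytes as f64,
            tokens: burst_bytes as f64, // start with a full bucket
            last_ms: now_ms,
            queue: VecDeque::new(),
        }
    }

    pub fn enqueue(&mut self, sample: Vec<u8>) {
        self.queue.push_back(sample);
    }

    /// New budget from the controller; used from the next try_send() on.
    pub fn set_budget(&mut self, rate_bps: u64) {
        self.rate_bps = rate_bps.max(1) as f64;
    }

    pub fn try_send(&mut self, now_ms: u64) -> Action {
        // Refill for the elapsed time, never beyond the burst size.
        let elapsed_s = now_ms.saturating_sub(self.last_ms) as f64 / 1000.0;
        self.tokens = (self.tokens + elapsed_s * self.rate_bps).min(self.burst);
        self.last_ms = now_ms;

        let size = match self.queue.front() {
            None => return Action::Empty,
            Some(sample) => sample.len() as f64,
        };
        // A sample larger than the burst may go out once the bucket is full;
        // the balance then goes negative and later refills pay off the debt.
        let needed = size.min(self.burst);
        if self.tokens >= needed {
            self.tokens -= size;
            Action::Send(self.queue.pop_front().expect("front() was Some"))
        } else {
            let deficit = needed - self.tokens;
            Action::Wait(Duration::from_secs_f64(deficit / self.rate_bps))
        }
    }
}
```

In this sketch, calling `set_budget` with each budget the controller hands out is what would connect the participant-level controller to the per-writer bucket.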

Using CongestionController

The CongestionController orchestrates all components at the participant level:

use hdds::congestion::{CongestionController, CongestionConfig, Priority};

let mut controller = CongestionController::new(CongestionConfig::default());

// Register writers with their priority
controller.register_writer(writer_id_1, Priority::P0);
controller.register_writer(writer_id_2, Priority::P1);
controller.register_writer(writer_id_3, Priority::P2);

// Call tick() periodically (every ~100ms)
if let Some(budget_updates) = controller.tick() {
    for update in budget_updates {
        // Apply new budget to writer
        writers[update.writer_id].set_budget(update.budget_bps);
    }
}

// Report congestion signals
controller.on_eagain(); // EAGAIN/ENOBUFS from socket
controller.on_nack(); // NACK from reader
controller.on_nacks(5); // Multiple NACKs
controller.on_rtt_sample(peer_id, 150.0); // RTT measurement (ms)

// Query state
let rate = controller.current_rate(); // Current rate (bytes/sec)
let score = controller.score(); // Congestion score (0-100)
let congested = controller.is_congested();
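The budget updates returned by `tick()` come from the BudgetAllocator in the architecture diagram. A plausible allocation rule, sketched under stated assumptions: P0 writers share a reserve of `max(rate * p0_min_share, P0 minimum)`, capped at the total rate, and the remainder is split among P1 and P2 writers in proportion to weights. The `allocate` function and the P1/P2 weights are hypothetical; HDDS may distribute differently.

```rust
/// Priority levels (local stand-in, not the hdds type).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Priority {
    P0,
    P1,
    P2,
}

/// Returns one budget (bytes/sec) per entry in `writers`.
pub fn allocate(
    total_bps: u64,
    p0_share: f64,
    p0_min_bps: u64,
    writers: &[Priority],
    p1_weight: u64, // hypothetical weights for splitting the non-P0 budget
    p2_weight: u64,
) -> Vec<u64> {
    let weight = |p: Priority| match p {
        Priority::P0 => 0,
        Priority::P1 => p1_weight,
        Priority::P2 => p2_weight,
    };
    let n0 = writers.iter().filter(|&&p| p == Priority::P0).count() as u64;
    let total_weight: u64 = writers.iter().map(|&p| weight(p)).sum();

    let reserve = if n0 == 0 {
        0 // no P0 writers: nothing to reserve
    } else if total_weight == 0 {
        total_bps // only P0 writers: they share everything
    } else {
        // max(share, floor), but never more than the whole budget
        ((total_bps as f64 * p0_share) as u64).max(p0_min_bps).min(total_bps)
    };
    let rest = total_bps - reserve;

    writers
        .iter()
        .map(|&p| match p {
            Priority::P0 => reserve / n0,
            _ if total_weight == 0 => 0,
            _ => rest * weight(p) / total_weight,
        })
        .collect()
}
```

Integer division can leave a few bytes/sec unassigned; a production allocator would hand that remainder out somewhere.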

Congestion Signals

The controller responds to multiple congestion signals:

| Signal | Type | Impulse | Description |
|---|---|---|---|
| EAGAIN/ENOBUFS | Hard | 60 | Socket buffer full |
| RTT Inflation | Soft | 20 | RTT > 2x baseline |
| High NACK Rate | Soft | 20 | > 10 NACKs/sec |
| ECN CE Mark | Soft | 20 | Congestion Experienced |
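One way to turn these impulses into the 0-100 score, sketched as an assumption rather than the HDDS algorithm: each signal adds its impulse (capped at 100), and every tick the score decays by a constant factor, which between events is an exponentially weighted moving average toward zero. `Scorer`, `Signal`, `Decision`, and the decay factor are hypothetical. Under this model, a single EAGAIN (60) reaches the decrease threshold on its own, while soft signals (20 each) must accumulate.

```rust
/// Rate decision derived from the score, using the documented thresholds.
#[derive(Debug, PartialEq, Eq)]
pub enum Decision {
    Decrease, // score >= 60
    Hold,
    Increase, // score <= 20
}

/// Congestion signals from the table above (local stand-in, not the hdds type).
#[derive(Clone, Copy, Debug)]
pub enum Signal {
    Eagain,
    RttInflation,
    HighNackRate,
    EcnCe,
}

impl Signal {
    /// Impulse values from the table: hard = 60, soft = 20.
    fn impulse(self) -> f64 {
        match self {
            Signal::Eagain => 60.0,
            Signal::RttInflation | Signal::HighNackRate | Signal::EcnCe => 20.0,
        }
    }
}

/// Hypothetical scorer: impulses add to the score, which decays every tick.
pub struct Scorer {
    pub score: f64,
    keep: f64, // fraction of the score retained per tick (assumed parameter)
}

impl Scorer {
    pub fn new(keep: f64) -> Self {
        Self { score: 0.0, keep }
    }

    pub fn on_signal(&mut self, signal: Signal) {
        self.score = (self.score + signal.impulse()).min(100.0);
    }

    /// Called periodically: decide on the current score, then decay it.
    pub fn tick(&mut self) -> Decision {
        let decision = if self.score >= 60.0 {
            Decision::Decrease
        } else if self.score <= 20.0 {
            Decision::Increase
        } else {
            Decision::Hold
        };
        self.score *= self.keep;
        decision
    }
}
```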

Congestion States

┌────────────┐  score >= 60   ┌────────────┐
│   Stable   │ ─────────────► │ Congested  │
└─────┬──────┘                └─────┬──────┘
      │                             │
      │ score <= 20                 │ score <= 20 + cooldown
      │                             │
      ▼                             ▼
┌────────────┐                ┌────────────┐
│ Increasing │ ◄───────────── │  Cooldown  │
└────────────┘                └────────────┘
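The diagram leaves some transitions implicit, so the function below encodes one reading of it, labeled as an assumption: Congested moves to Cooldown once the score falls to 20 or below, Cooldown moves to Increasing once `cooldown_ms` has elapsed, and Increasing returns to Congested if the score climbs back to 60 (that edge is not drawn). `State` and `next_state` are hypothetical names, not HDDS APIs.

```rust
/// Controller states from the diagram (local stand-in).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    Stable,
    Congested,
    Cooldown,
    Increasing,
}

/// One step of the state machine under the assumptions stated above.
/// `ms_in_state` is how long the controller has been in `state`.
pub fn next_state(state: State, score: f64, ms_in_state: u64, cooldown_ms: u64) -> State {
    match state {
        // Assumed: Increasing reacts to congestion the same way Stable does.
        State::Stable | State::Increasing if score >= 60.0 => State::Congested,
        State::Stable if score <= 20.0 => State::Increasing,
        State::Congested if score <= 20.0 => State::Cooldown,
        // Assumed: leave Cooldown only after the full cooldown period.
        State::Cooldown if ms_in_state >= cooldown_ms => State::Increasing,
        unchanged => unchanged,
    }
}
```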

P2 Coalescing

The P2 queue uses "last value wins" semantics: a newer sample with the same instance key replaces the one already queued:

use hdds::congestion::{CoalescingQueue, InstanceKey};

let mut queue = CoalescingQueue::new(100); // Max 100 unique instances

// Same instance key -> later value replaces earlier
let key = InstanceKey::new(topic_id, instance_handle);

queue.insert(old_telemetry, key.clone());
queue.insert(new_telemetry, key.clone()); // Replaces old_telemetry

assert_eq!(queue.len(), 1); // Only one sample
assert_eq!(queue.coalesced_count(), 1); // One replacement
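A "last value wins" queue can be built from a FIFO of keys plus a map from key to the newest value. The sketch below is a hypothetical stand-in for `CoalescingQueue`, with a generic key in place of `InstanceKey`; keeping a coalesced sample at its original queue position, and evicting the oldest instance when the queue is full, are assumptions.

```rust
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

/// Hypothetical "last value wins" queue: one slot per key, FIFO across keys.
pub struct Coalescing<K, T> {
    order: VecDeque<K>,    // keys in order of their first queued sample
    latest: HashMap<K, T>, // newest value per key
    capacity: usize,       // maximum number of distinct keys
    coalesced: u64,
    dropped: u64,
}

impl<K: Eq + Hash + Clone, T> Coalescing<K, T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            order: VecDeque::new(),
            latest: HashMap::new(),
            capacity,
            coalesced: 0,
            dropped: 0,
        }
    }

    pub fn insert(&mut self, value: T, key: K) {
        if let Some(slot) = self.latest.get_mut(&key) {
            // Key already queued: overwrite the value and keep its position.
            *slot = value;
            self.coalesced += 1;
            return;
        }
        if self.order.len() >= self.capacity {
            // Full of distinct keys: drop the oldest one to make room.
            if let Some(oldest) = self.order.pop_front() {
                self.latest.remove(&oldest);
                self.dropped += 1;
            }
        }
        self.order.push_back(key.clone());
        self.latest.insert(key, value);
    }

    pub fn pop(&mut self) -> Option<(K, T)> {
        let key = self.order.pop_front()?;
        let value = self.latest.remove(&key)?;
        Some((key, value))
    }

    pub fn len(&self) -> usize {
        self.order.len()
    }

    pub fn coalesced_count(&self) -> u64 {
        self.coalesced
    }

    pub fn dropped_count(&self) -> u64 {
        self.dropped
    }
}
```

Because the queue is bounded by distinct keys rather than samples, a high-rate telemetry stream for a fixed set of instances cannot grow it.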

ECN Support

ECN (Explicit Congestion Notification) allows routers to signal congestion without dropping packets:

use hdds::congestion::{CongestionConfig, CongestionController, EcnMode};

let config = CongestionConfig {
    ecn_mode: EcnMode::Opportunistic, // Use ECN if supported
    ..Default::default()
};

let mut controller = CongestionController::new(config);

// Report ECN TOS byte from received packets
let tos = received_packet.ip_tos();
if controller.on_ecn_tos(tos) {
    // Congestion signaled
}

// Or report CE mark directly
controller.on_ecn_ce();

// Check ECN statistics
if let Some(stats) = controller.ecn_stats() {
    println!("ECT0: {}, ECT1: {}, CE: {}",
        stats.ect0_received, stats.ect1_received, stats.ce_received);
}
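The ECN field occupies the two low-order bits of the IPv4 TOS byte (or the IPv6 Traffic Class byte), with codepoints defined in RFC 3168: `00` Not-ECT, `10` ECT(0), `01` ECT(1), and `11` CE. The sketch below decodes them and counts each kind, returning `true` on a CE mark in the way `on_ecn_tos` appears to signal congestion above. `Ecn`, `ecn_from_tos`, and `EcnCounters` are hypothetical names, not HDDS APIs.

```rust
/// ECN codepoints from RFC 3168.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Ecn {
    NotEct, // 0b00: sender is not ECN-capable
    Ect1,   // 0b01: ECN-capable transport, codepoint 1
    Ect0,   // 0b10: ECN-capable transport, codepoint 0
    Ce,     // 0b11: Congestion Experienced (set by a router)
}

/// Extracts the ECN codepoint; the upper six bits (DSCP) are ignored.
pub fn ecn_from_tos(tos: u8) -> Ecn {
    match tos & 0b11 {
        0b00 => Ecn::NotEct,
        0b01 => Ecn::Ect1,
        0b10 => Ecn::Ect0,
        _ => Ecn::Ce,
    }
}

#[derive(Debug, Default)]
pub struct EcnCounters {
    pub ect0: u64,
    pub ect1: u64,
    pub ce: u64,
}

impl EcnCounters {
    /// Counts the codepoint; returns true when the packet carries a CE mark.
    pub fn record(&mut self, tos: u8) -> bool {
        match ecn_from_tos(tos) {
            Ecn::Ect0 => self.ect0 += 1,
            Ecn::Ect1 => self.ect1 += 1,
            Ecn::Ce => {
                self.ce += 1;
                return true;
            }
            Ecn::NotEct => {}
        }
        false
    }
}
```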

ECN Modes

| Mode | Description |
|---|---|
| Off | ECN disabled (default) |
| Opportunistic | Use ECN if OS and peers support it |
| Mandatory | Require ECN, error if unsupported |

Backpressure Policies

The backpressure policy decides what happens when a queue or token bucket is exhausted:

use hdds::congestion::BackpressurePolicy;
use std::time::Duration;

// Return an error immediately (default)
let policy = BackpressurePolicy::ReturnError;

// Block until space frees up or the timeout expires
let policy = BackpressurePolicy::BlockWithTimeout(Duration::from_millis(500));

// Drop the oldest queued sample (dangerous for P0!)
let policy = BackpressurePolicy::DropOldest;
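The three policies differ only in what happens when a bounded queue is full. The sketch below applies a local copy of the policy enum to a `Mutex`/`Condvar` queue; `Bounded`, `Policy`, and `PushError` are hypothetical and not part of HDDS. Note that `DropOldest` evicts whatever sits at the head of the queue, which is why applying it to a P0 queue can discard critical samples.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Local stand-in for `BackpressurePolicy` (hypothetical, not the hdds type).
#[derive(Clone, Copy, Debug)]
pub enum Policy {
    ReturnError,
    BlockWithTimeout(Duration),
    DropOldest,
}

#[derive(Debug, PartialEq, Eq)]
pub enum PushError {
    Full,
    Timeout,
}

/// Bounded queue shared between application threads and a sender thread.
pub struct Bounded<T> {
    items: Mutex<VecDeque<T>>,
    not_full: Condvar,
    cap: usize,
}

impl<T> Bounded<T> {
    pub fn new(cap: usize) -> Self {
        Self { items: Mutex::new(VecDeque::new()), not_full: Condvar::new(), cap }
    }

    /// On success, returns the sample evicted by `DropOldest`, if any.
    pub fn push(&self, item: T, policy: Policy) -> Result<Option<T>, PushError> {
        let mut q = self.items.lock().unwrap();
        if q.len() >= self.cap {
            match policy {
                Policy::ReturnError => return Err(PushError::Full),
                Policy::DropOldest => {
                    let evicted = q.pop_front();
                    q.push_back(item);
                    return Ok(evicted);
                }
                Policy::BlockWithTimeout(timeout) => {
                    // Sleep until pop() frees a slot or the timeout expires.
                    let (guard, result) = self
                        .not_full
                        .wait_timeout_while(q, timeout, |q| q.len() >= self.cap)
                        .unwrap();
                    if result.timed_out() {
                        return Err(PushError::Timeout);
                    }
                    q = guard;
                }
            }
        }
        q.push_back(item);
        Ok(None)
    }

    pub fn pop(&self) -> Option<T> {
        let item = self.items.lock().unwrap().pop_front();
        if item.is_some() {
            self.not_full.notify_one(); // wake one blocked producer
        }
        item
    }
}
```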

Metrics

let metrics = controller.metrics();

println!("Rate: {} bytes/sec", metrics.current_rate_bps);
println!("EAGAIN events: {}", metrics.eagain_total);
println!("Bytes sent: {}", metrics.bytes_sent);
println!("P0 sent: {}, P1 sent: {}, P2 sent: {}",
    metrics.p0_sent, metrics.p1_sent, metrics.p2_sent);

// Transport feedback
let feedback = controller.transport_feedback();
println!("OK: {}, EAGAIN: {}, Errors: {}",
    feedback.sends_ok, feedback.eagain_count, feedback.error_count);
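A raw EAGAIN count grows forever, so the rate over an interval is usually more informative. Assuming the transport counters are cumulative and that every send attempt ends in exactly one of OK, EAGAIN, or error, the rate can be derived from two snapshots. `Feedback` and `eagain_ratio` are hypothetical local names that reuse the field names from the example.

```rust
/// Snapshot of transport counters (hypothetical local copy of the fields above).
#[derive(Clone, Copy, Debug, Default)]
pub struct Feedback {
    pub sends_ok: u64,
    pub eagain_count: u64,
    pub error_count: u64,
}

/// Fraction of send attempts between two snapshots that hit EAGAIN/ENOBUFS.
pub fn eagain_ratio(prev: Feedback, now: Feedback) -> f64 {
    // saturating_sub guards against a counter reset between snapshots.
    let ok = now.sends_ok.saturating_sub(prev.sends_ok);
    let eagain = now.eagain_count.saturating_sub(prev.eagain_count);
    let errors = now.error_count.saturating_sub(prev.error_count);
    let attempts = ok + eagain + errors;
    if attempts == 0 {
        0.0
    } else {
        eagain as f64 / attempts as f64
    }
}
```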

Disabling Congestion Control

Congestion control can be turned off for testing or other special cases, either at configuration time or at runtime:

// At configuration time
let config = CongestionConfig::disabled();

// At runtime
controller.set_enabled(false);

Best Practices

  1. Assign priorities carefully - Only use P0 for truly critical traffic
  2. Monitor metrics - Watch EAGAIN rate and score
  3. Tune for your network - Adjust ai_step and md_factors based on RTT
  4. Use P2 for telemetry - Coalescing prevents queue buildup
  5. Size queues deliberately - Larger queues absorb bursts but add latency
  6. Enable ECN - If your network supports it, use Opportunistic mode
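Tip 3 matters because additive increase is slow at high rates. Assuming one `ai_step_bps` increment per `stable_window_ms` and no further congestion, the time to undo a single decrease is `ceil((rate - rate * md_factor) / ai_step_bps) * stable_window_ms`. The helper below is hypothetical; with the defaults, a hard decrease at 100 MB/s takes 1,000 windows (about 17 minutes) to recover, and a soft one 400 windows.

```rust
/// Milliseconds of additive increase needed to undo one multiplicative decrease.
/// Assumes one `ai_step_bps` increment per `stable_window_ms` and no new congestion.
pub fn recovery_ms(rate_bps: u64, md_factor: f64, ai_step_bps: u64, stable_window_ms: u64) -> u64 {
    let step = ai_step_bps.max(1); // avoid division by zero
    let after_decrease = (rate_bps as f64 * md_factor) as u64;
    let deficit = rate_bps.saturating_sub(after_decrease);
    // Ceiling division: a partial step still costs a whole window.
    let windows = (deficit + step - 1) / step;
    windows * stable_window_ms
}
```

Raising `ai_step` tenfold cuts that recovery time tenfold, at the cost of probing for bandwidth more aggressively.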

Example: Mixed Priority Traffic

use hdds::congestion::{CongestionConfig, CongestionController, WriterPacer, Priority};

// Configure for mixed traffic
let config = CongestionConfig::new()
    .with_queue_sizes(50, 200, 100)     // P0 small, P1 larger
    .with_p0_protection(0.3, 20_000)    // P0: 30% share, 20 KB/s minimum
    .with_max_rate(10_000_000);         // 10 MB/s cap

let mut controller = CongestionController::new(config.clone());

// Create pacers per writer
let mut safety_writer = WriterPacer::new(config.clone());
let mut sensor_writer = WriterPacer::new(config.clone());
let mut telemetry_writer = WriterPacer::new(config);

// Enqueue with appropriate priorities
safety_writer.enqueue(estop_command, Priority::P0)?;
sensor_writer.enqueue(lidar_scan, Priority::P1)?;
telemetry_writer.enqueue(cpu_usage, Priority::P2)?;