Skip to content

Conversation

@datagutt
Copy link
Member

@datagutt datagutt commented Jan 7, 2026

Summary by CodeRabbit

  • New Features

    • Batched UDP send/receive and queued packet sender to reduce syscall overhead.
    • Public sequence-tracking and toggle-snapshot APIs; cached quality-multiplier accessor.
  • Performance Improvements

    • Fixed-size, O(1) sequence tracking, fewer atomic loads, batching across send/recv, and faster selection/NAK/keepalive paths.
  • Documentation

    • Added a Performance Optimization Plan.
  • Tests

    • New/updated unit and integration tests for batching, sequence tracking, and recovery logic.

✏️ Tip: You can customize this high-level summary in your review settings.

datagutt and others added 24 commits January 5, 2026 02:09
**Problem**: `now_ms()` calls `SystemTime::now()` which is a syscall. Currently called 3-5 times per packet in:
- `connection/mod.rs:340` - `register_packet()`
- `packet_handler.rs:318,338` - `forward_via_connection()`
- `connection/mod.rs:375` - `handle_srt_ack()`
- `quality.rs:52` - `calculate_quality_multiplier()`

**Current Code**:
```rust
// Called multiple times per packet
pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|_| std::time::Duration::from_millis(0))
        .as_millis() as u64
}
```

**Solution**: Capture timestamp once at packet receive and pass through function chain.

```rust
// In handle_srt_packet, capture once at entry:
let packet_time_ms = now_ms();
let packet_instant = Instant::now();

// Pass to all downstream functions
forward_via_connection(
    sel_idx, pkt, seq, connections,
    last_selected_idx, last_switch_time_ms,
    seq_to_conn, seq_order,
    packet_time_ms,  // Add parameter
).await;

// Update register_packet signature
pub fn register_packet(&mut self, seq: i32, send_time_ms: u64) {
    let idx = self.packet_idx % PKT_LOG_SIZE;
    self.packet_log[idx] = seq;
    self.packet_send_times_ms[idx] = send_time_ms;  // Use passed value
    // ...
}
```

**Files to Modify**:
- `src/utils.rs` - Document hot path usage
- `src/sender/packet_handler.rs` - Capture timestamp at entry
- `src/connection/mod.rs` - Update `register_packet()`, `send_data_with_tracking()`
- `src/sender/selection/quality.rs` - Accept timestamp parameter

**Estimated Impact**: 3-4 fewer syscalls per packet = ~15-20% CPU reduction in hot path
**Problem**: 3 atomic loads per packet.

**Current Code** (`packet_handler.rs:245-253`):
```rust
let enable_quality = toggles.quality_scoring_enabled.load(Ordering::Relaxed);
let enable_explore = toggles.exploration_enabled.load(Ordering::Relaxed);
let classic = toggles.classic_mode.load(Ordering::Relaxed);
```

**Solution**: Create a snapshot struct, load once at start of select iteration.

```rust
#[derive(Clone, Copy)]
pub struct ToggleSnapshot {
    pub classic_mode: bool,
    pub quality_scoring_enabled: bool,
    pub exploration_enabled: bool,
}

impl DynamicToggles {
    #[inline]
    pub fn snapshot(&self) -> ToggleSnapshot {
        ToggleSnapshot {
            classic_mode: self.classic_mode.load(Ordering::Relaxed),
            quality_scoring_enabled: self.quality_scoring_enabled.load(Ordering::Relaxed),
            exploration_enabled: self.exploration_enabled.load(Ordering::Relaxed),
        }
    }
}

// In main select loop:
loop {
    let toggle_snap = toggles.snapshot(); // Once per iteration

    tokio::select! {
        res = local_listener.recv_from(&mut recv_buf) => {
            handle_srt_packet(..., &toggle_snap).await;
        }
        // ...
    }
}
```
…oved performance; try synchronous send first
…on all connections.

Moblin does this with standard 10-byte keepalives; we use extended 38-byte keepalives to provide the receiver with telemetry (window, RTT, NAKs, bitrate).

Extended keepalives should always be sent for best performance; unlike C version which only sends keepalive for idle connections
…story

Previously, connections that never received NAKs (last_nak_time_ms == 0)
were excluded from time-based window recovery. This caused connections
to get stuck at their initial window size (20,000) after reconnection
when they didn't receive enough traffic for ACK-based growth.

This fix treats 'never had NAK' as 'perfect connection' by using
u64::MAX for time_since_last_nak, enabling aggressive recovery (200%
rate) for these healthy connections.

Note: A similar change was made earlier and then reverted in commit
a48aa8b. The reason for that revert
is unclear, but the current implementation causes a feedback loop where
connections can become permanently stuck at low throughput, especially
when combined with receiver-side ACK throttling based on extended
keepalive telemetry.
@coderabbitai
Copy link

coderabbitai bot commented Jan 7, 2026

Warning

Rate limit exceeded

@datagutt has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 21 minutes and 0 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 9ad4fa6 and 0c2248f.

📒 Files selected for processing (2)
  • src/connection/mod.rs
  • src/test_helpers.rs

Walkthrough

Adds cross-platform batched UDP I/O (recv/send), a fixed-size ring-buffer SequenceTracker, batch-aware sender and connection APIs, quality caching and toggle snapshots, time-based congestion-recovery tuning, and corresponding tests, helpers, and docs. Multiple public signatures updated to thread new abstractions through hot paths.

Changes

Cohort / File(s) Summary
Dependencies
Cargo.toml
Added rustc-hash = "2.1" and unix-only libc = "0.2".
Documentation
docs/PERFORMANCE_OPTIMIZATION_PLAN.md
New performance optimization plan with phases, sketches, profiling commands, and dependency recommendations.
Batch I/O
src/connection/batch_recv.rs, src/connection/batch_send.rs
New cross-platform BatchUdpSocket and RecvMmsgBuffer (Linux recvmmsg + fallback); BatchSender buffers up to 16 packets and flushes on threshold/time. Exposes BatchUdpSocket, RecvMmsgBuffer, BatchSender, BATCH_RECV_SIZE, BATCH_SIZE_THRESHOLD.
Connection Core
src/connection/mod.rs
SrtlaConnection now uses Arc<BatchUdpSocket>, FxHashMap packet_log, highest_acked_seq, last_keepalive_sent, quality_cache, and batch_sender. New APIs: queue_data_packet, needs_batch_flush, has_queued_packets, flush_batch, get_cached_quality_multiplier, and updated reconnect/reset behavior.
Uplink Reader
src/sender/uplink.rs
Reader switched to BatchUdpSocket::recv_batch + shared RecvMmsgBuffer; spawn_reader accepts Arc<BatchUdpSocket>; batch errors signaled differently with backoff.
Batch Sender Integration
src/sender/packet_handler.rs, src/sender/mod.rs
Integrated batching: per-connection queues, periodic 15ms flush timer, flush_all_batches; threaded SequenceTracker and ToggleSnapshot through hot paths; removed prior seq map/VecDeque usage.
Sequence Tracking
src/sender/sequence.rs, src/sender/mod.rs
New zero-allocation fixed-size SequenceTracker with SEQ_TRACKING_SIZE, SEQUENCE_TRACKING_MAX_AGE_MS, and methods insert, get, remove_connection; old HashMap/VecDeque cleanup removed.
Selection & Quality
src/sender/selection/*
Selection functions now accept &mut [SrtlaConnection] and flags (enable_quality, enable_explore, classic); quality functions take current_time_ms; caching and cold-path logging added; inline hints applied.
Congestion Control
src/connection/congestion/enhanced.rs
perform_window_recovery treats missing NAK history as long-ago, adds explicit time thresholds (>10s, >7s, >5s, else) to scale recovery, unconditionally evaluates NAK-burst logic; tests added.
Housekeeping & Sender API
src/sender/housekeeping.rs, src/sender/packet_handler.rs
Removed periodic sequence-cleanup; handle_housekeeping signature changed to accept packet_tx and updated to use SequenceTracker.
Toggles
src/toggles.rs
Added ToggleSnapshot and DynamicToggles::snapshot() to capture toggles in one relaxed read for hot paths.
Tests & Helpers
src/test_helpers.rs, src/tests/*
New test constructors using BatchUdpSocket/BatchSender and FxHashMap packet_log; tests updated to use SequenceTracker, mutable connection slices, time-parameterized quality calls, and register_packet(seq, timestamp).

Sequence Diagram(s)

sequenceDiagram
    rect rgba(235,245,255,0.6)
    participant Client as Uplink Producer
    participant Handler as Packet Handler
    participant Selector as Selection Logic
    participant Conn as SrtlaConnection
    participant Batch as BatchSender
    participant UDP as BatchUdpSocket
    end

    Client->>Handler: handle_uplink_packet(packet)
    Handler->>Selector: select_connection_idx(&mut conns, ..., toggle_snap)
    Selector->>Conn: get_cached_quality_multiplier(now_ms)
    alt cache valid
        Conn-->>Selector: multiplier
    else
        Conn->>Conn: calculate_quality_multiplier_uncached(now_ms)
        Conn-->>Selector: multiplier (and cache)
    end
    Selector-->>Handler: chosen connection idx
    Handler->>Conn: queue_data_packet(data, seq, time)
    Conn->>Batch: queue_packet(...)
    Note over Batch: buffer until threshold or timer (15ms)
    alt flush condition
        Conn->>Batch: flush(socket)
        Batch->>UDP: send() × N packets
        UDP-->>Batch: send results
        Batch-->>Conn: Vec<(seq, enqueue_time)>
        Conn->>Conn: register_packet per flushed seq
    end
Loading
sequenceDiagram
    participant SRT as SRT Stack
    participant Conn as SrtlaConnection
    participant Seq as SequenceTracker
    participant Cong as Congestion Control

    SRT->>Conn: handle_srt_ack(ack_seq)
    Conn->>Seq: get(ack_seq, now_ms)
    alt entry found
        Seq-->>Conn: conn_id
        Conn->>Conn: handle_srtla_ack_specific(conn_id)
        Conn->>Cong: update_ack_state()
    else
        Seq-->>Conn: None (expired/miss)
        Conn->>Conn: fallback handling
    end

    SRT->>Conn: handle_nak(seq)
    Conn->>Cong: perform_window_recovery(now_ms)
    note right of Cong: time_since_last_nak used to scale recovery (>10s, >7s, >5s, else)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

🐰 Batches hop, buffers hum, and rings spin light,
O(1) lookups find their target in the night.
Quality cached each fifty ticks so bright,
Toggles snap once and packets take flight.
A rabbit cheers: fast paths hop into sight! 🚀

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 inconclusive)
Check name Status Explanation Resolution
Title check ❓ Inconclusive The title 'Refactor/optimizations' is vague and generic, using non-descriptive terms that don't clearly convey the main objectives of the substantial changeset. Consider using a more specific title that highlights the primary optimization focus, such as 'Add batch UDP send/receive and performance optimizations' or 'Implement packet batching and quality caching for improved throughput'.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch refactor/optimizations

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Fix all issues with AI agents
In @Cargo.toml:
- Around line 22-24: The cross-platform crates (tokio, tracing,
tracing-subscriber, socket2, bytes, chrono, smallvec, mimalloc) are incorrectly
placed under [target.'cfg(unix)'.dependencies]; move each of these entries out
of that target-specific table into the main [dependencies] table so they are
available on all platforms, keeping their versions and feature lists intact, and
leave only truly Unix-only crates (e.g., libc) under
[target.'cfg(unix)'.dependencies].

In @src/connection/batch_recv.rs:
- Around line 21-22: The module `unix_impl` is incorrectly enabled for all Unix
targets but uses Linux-only syscalls (`recvmmsg`, `mmsghdr`); change its cfg
from `#[cfg(unix)]` to a Linux-only guard such as `#[cfg(target_os = "linux")]`,
and update any related re-exports (the items re-exporting from `unix_impl`) to
the same Linux-specific cfg so macOS builds won't reference Linux-only symbols;
locate the `mod unix_impl { ... }` declaration and corresponding `pub use` or
`cfg` lines and replace their `unix` cfg with `target_os = "linux"`.
- Around line 259-264: The Default impl for RecvMmsgBuffer uses unsafe
std::mem::zeroed() leaving internal iov/mmsghdr pointers null and producing an
unusable buffer; fix by removing the Default impl entirely or by making Default
produce a properly-initialized instance instead of zeroed memory. Concretely:
either delete the impl Default for RecvMmsgBuffer block, or add/adjust an
initializer so Default::default() delegates to a safe constructor (e.g., change
RecvMmsgBuffer::new() to return Self or add RecvMmsgBuffer::new_unboxed() that
returns Self) and have Default call that initializer rather than using zeroed().
- Around line 326-340: The non-Unix fallback_impl module references
tokio::net::UdpSocket (used in struct BatchUdpSocket) but tokio is only declared
under the unix-target deps, so the crate won't compile on Windows; either move
tokio to the top-level [dependencies] in Cargo.toml so tokio::net::UdpSocket is
available for the fallback_impl, or rewrite the fallback_impl/BatchUdpSocket to
avoid tokio (e.g., use std::net::UdpSocket with a blocking-to-async bridge or a
Windows-specific async API) and adjust any usages of MTU/BatchUdpSocket to match
the new API.
- Around line 46-48: Remove the manual unsafe impl blocks for Send and Sync on
BatchUdpSocket: delete the two lines `unsafe impl Send for BatchUdpSocket {}`
and `unsafe impl Sync for BatchUdpSocket {}` since AsyncFd<Socket> already
implements Send + Sync when socket2::Socket is Send + Sync; no other changes to
BatchUdpSocket or its usages are needed.

In @src/connection/batch_send.rs:
- Around line 88-123: The flush method can fail mid-loop and return before
cleaning up, causing duplicates and sequence mismatches; modify flush to track
how many packets were successfully sent (e.g., a sent_count incremented inside
the loop over self.queue) and if socket.send(...) returns an Err, remove the
first sent_count items from self.queue and the corresponding entries from
self.sequences and self.queue_times (or clear all three consistently), update
self.last_flush_time, then return the error; reference symbols: flush,
socket.send, self.queue, self.sequences, self.queue_times, last_flush_time,
packet_count.
🧹 Nitpick comments (4)
docs/PERFORMANCE_OPTIMIZATION_PLAN.md (1)

497-501: Minor version inconsistency with Cargo.toml.

The documentation shows rustc-hash = "2.0" but Cargo.toml uses version "2.1". Consider updating the documentation to match the actual dependency version for consistency.

📝 Suggested update
 # Cargo.toml
 [dependencies]
-rustc-hash = "2.0"  # Fast non-cryptographic hash
+rustc-hash = "2.1"  # Fast non-cryptographic hash
src/sender/selection/enhanced.rs (1)

42-50: Consider using #[inline] instead of #[inline(always)] for this function.

This function is ~100 lines with multiple branches and conditionals. #[inline(always)] forces inlining regardless of size, which can increase binary size and potentially hurt instruction cache performance. Regular #[inline] lets the compiler make the decision based on call site context.

♻️ Suggested change
-#[inline(always)]
+#[inline]
 pub fn select_connection(
src/test_helpers.rs (1)

19-22: Conn ID counters are fragmented across functions, risking ID collisions in tests.

NEXT_TEST_CONN_ID is defined as a static inside each function separately. On a single platform, calling both create_test_connection() and create_test_connections() will generate overlapping IDs (both starting from 1000). If any tests depend on unique conn_id values, this could cause flakiness.

♻️ Suggested fix: Hoist the counter to module level
+use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Shared test connection ID counter for all helper functions
+static NEXT_TEST_CONN_ID: AtomicU64 = AtomicU64::new(1000);
+
 #[cfg(unix)]
 pub async fn create_test_connection() -> SrtlaConnection {
-    use std::sync::atomic::{AtomicU64, Ordering};
-    static NEXT_TEST_CONN_ID: AtomicU64 = AtomicU64::new(1000);
-
     use socket2::{Domain, Protocol, Socket, Type};
     // ... rest unchanged

Apply similar removal to the other three function definitions.

Also applies to: 62-63, 99-100, 148-149

src/connection/mod.rs (1)

717-778: Code duplication between Unix and non-Unix reconnect methods.

The Unix (lines 717-746) and non-Unix (lines 748-778) reconnect methods are nearly identical except for socket creation. This duplication could be reduced by extracting common reset logic.

♻️ Optional: Extract common reconnect logic
fn reset_connection_state(&mut self) {
    self.connected = false;
    self.last_received = None;
    self.window = WINDOW_DEF * WINDOW_MULT;
    self.in_flight_packets = 0;
    self.packet_log.clear();
    self.highest_acked_seq = i32::MIN;
    self.congestion.reset();
    self.rtt.reset();
    self.bitrate.reset();
    self.batch_sender.reset();
    self.reconnection.last_reconnect_attempt_ms = now_ms();
    self.reconnection.reconnect_failure_count = 0;
    self.mark_reconnect_success();
    self.reconnection.reset_startup_grace();
}

Then both reconnect variants would just do socket creation + reset_connection_state().

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0ee7682 and a35a7c1.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • Cargo.toml
  • docs/PERFORMANCE_OPTIMIZATION_PLAN.md
  • src/connection/batch_recv.rs
  • src/connection/batch_send.rs
  • src/connection/congestion/enhanced.rs
  • src/connection/mod.rs
  • src/sender/housekeeping.rs
  • src/sender/mod.rs
  • src/sender/packet_handler.rs
  • src/sender/selection/classic.rs
  • src/sender/selection/enhanced.rs
  • src/sender/selection/mod.rs
  • src/sender/selection/quality.rs
  • src/sender/sequence.rs
  • src/sender/uplink.rs
  • src/test_helpers.rs
  • src/tests/connection_tests.rs
  • src/tests/sender_tests.rs
  • src/toggles.rs
🧰 Additional context used
📓 Path-based instructions (4)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: All Rust code must pass cargo fmt --all -- --check
All Rust code must pass clippy with -D warnings (no warnings allowed)
Use anyhow::Result for error propagation
Group imports as std → external → crate with module-level granularity (imports_granularity = "Module", group_imports = "StdExternalCrate")
Follow naming conventions: constants SCREAMING_SNAKE_CASE
Follow naming conventions: structs PascalCase
Follow naming conventions: functions snake_case
Follow naming conventions: modules snake_case (file/module names)
Use tracing macros (debug!, info!, warn!, etc.) for logging
Use Tokio for async runtime (net, time, io, signal) and async operations
Keep code self-documenting; add doc comments for public API when necessary

Files:

  • src/sender/selection/classic.rs
  • src/sender/selection/mod.rs
  • src/toggles.rs
  • src/connection/batch_send.rs
  • src/connection/congestion/enhanced.rs
  • src/test_helpers.rs
  • src/sender/uplink.rs
  • src/sender/selection/enhanced.rs
  • src/sender/mod.rs
  • src/sender/housekeeping.rs
  • src/sender/sequence.rs
  • src/sender/selection/quality.rs
  • src/tests/connection_tests.rs
  • src/sender/packet_handler.rs
  • src/connection/batch_recv.rs
  • src/tests/sender_tests.rs
  • src/connection/mod.rs
src/**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

Expose internal fields for testing only behind #[cfg(feature = "test-internals")]

Files:

  • src/sender/selection/classic.rs
  • src/sender/selection/mod.rs
  • src/toggles.rs
  • src/connection/batch_send.rs
  • src/connection/congestion/enhanced.rs
  • src/test_helpers.rs
  • src/sender/uplink.rs
  • src/sender/selection/enhanced.rs
  • src/sender/mod.rs
  • src/sender/housekeeping.rs
  • src/sender/sequence.rs
  • src/sender/selection/quality.rs
  • src/tests/connection_tests.rs
  • src/sender/packet_handler.rs
  • src/connection/batch_recv.rs
  • src/tests/sender_tests.rs
  • src/connection/mod.rs
src/tests/**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

Place integration, end-to-end, and protocol tests under src/tests/

Files:

  • src/tests/connection_tests.rs
  • src/tests/sender_tests.rs
Cargo.toml

📄 CodeRabbit inference engine (AGENTS.md)

Cargo.toml: Set Rust edition to 2024 and target MSRV 1.87 (requires nightly toolchain)
Maintain build profiles: dev, release-debug (thin LTO), release-lto (full LTO, stripped)

Files:

  • Cargo.toml
🧠 Learnings (8)
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to Cargo.toml : Set Rust edition to 2024 and target MSRV 1.87 (requires nightly toolchain)

Applied to files:

  • Cargo.toml
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to **/*.rs : Use Tokio for async runtime (net, time, io, signal) and async operations

Applied to files:

  • Cargo.toml
  • src/connection/mod.rs
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Use Rust nightly toolchain for development and CI

Applied to files:

  • Cargo.toml
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to rustfmt.toml : Use the project's rustfmt configuration with unstable features enabled

Applied to files:

  • Cargo.toml
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to **/*.rs : All Rust code must pass clippy with -D warnings (no warnings allowed)

Applied to files:

  • Cargo.toml
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to **/*.rs : Use anyhow::Result for error propagation

Applied to files:

  • src/sender/packet_handler.rs
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to src/**/*.rs : Expose internal fields for testing only behind #[cfg(feature = "test-internals")]

Applied to files:

  • src/tests/sender_tests.rs
  • src/connection/mod.rs
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to src/tests/**/*.rs : Place integration, end-to-end, and protocol tests under src/tests/

Applied to files:

  • src/tests/sender_tests.rs
🧬 Code graph analysis (7)
src/connection/batch_send.rs (1)
src/connection/mod.rs (1)
  • has_queued_packets (239-241)
src/connection/congestion/enhanced.rs (2)
src/connection/congestion/mod.rs (1)
  • perform_window_recovery (147-158)
src/utils.rs (1)
  • now_ms (15-20)
src/test_helpers.rs (5)
src/sender/mod.rs (1)
  • tokio (93-93)
src/utils.rs (1)
  • now_ms (15-20)
src/connection/batch_send.rs (2)
  • new (50-57)
  • default (43-45)
src/registration.rs (2)
  • new (53-71)
  • default (47-49)
src/connection/mod.rs (1)
  • default (49-54)
src/sender/selection/enhanced.rs (2)
src/connection/mod.rs (3)
  • total_nak_count (666-668)
  • time_since_last_nak_ms (662-664)
  • nak_burst_count (670-672)
src/connection/congestion/mod.rs (1)
  • time_since_last_nak_ms (161-167)
src/tests/connection_tests.rs (3)
src/utils.rs (1)
  • now_ms (15-20)
src/connection/mod.rs (3)
  • needs_keepalive (594-606)
  • nak_burst_count (670-672)
  • time_since_last_nak_ms (662-664)
src/test_helpers.rs (2)
  • create_test_connection (19-58)
  • create_test_connection (61-95)
src/sender/packet_handler.rs (2)
src/utils.rs (1)
  • now_ms (15-20)
src/sender/selection/mod.rs (1)
  • select_connection_idx (49-72)
src/tests/sender_tests.rs (4)
src/sender/selection/mod.rs (1)
  • select_connection_idx (49-72)
src/sender/sequence.rs (2)
  • new (65-70)
  • get (99-108)
src/utils.rs (1)
  • now_ms (15-20)
src/sender/selection/quality.rs (1)
  • calculate_quality_multiplier (60-62)
🪛 GitHub Check: Test (macOS)
src/connection/batch_recv.rs

[failure] 220-220:
cannot find function recvmmsg in crate libc


[failure] 187-187:
cannot find struct, variant or union type mmsghdr in crate libc


[failure] 151-151:
cannot find type mmsghdr in crate libc

🪛 GitHub Check: Test (Windows)
src/sender/housekeeping.rs

[failure] 6-6:
unresolved import tracing


[failure] 5-5:
failed to resolve: use of unresolved module or unlinked crate tokio


[failure] 4-4:
failed to resolve: use of unresolved module or unlinked crate tokio

src/connection/batch_recv.rs

[failure] 331-331:
failed to resolve: use of unresolved module or unlinked crate tokio

🪛 LanguageTool
docs/PERFORMANCE_OPTIMIZATION_PLAN.md

[uncategorized] ~54-~54: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...- Medium impact, good ROI - Minor - Low impact but good practice - Architectural -...

(EN_COMPOUND_ADJECTIVE_INTERNAL)


[grammar] ~287-~287: Ensure spelling is correct
Context: ...ion, recalculate periodically (every 50-100ms). ```rust pub struct CachedQuality { ...

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test
🔇 Additional comments (47)
src/sender/selection/classic.rs (1)

22-23: LGTM!

The #[inline(always)] hint is appropriate for this hot-path selection function. The function is compact and called frequently during packet processing, making it a good candidate for forced inlining.

docs/PERFORMANCE_OPTIMIZATION_PLAN.md (1)

1-7: Well-structured optimization plan.

This documentation provides excellent guidance for the performance improvements, with clear prioritization, impact estimates, and code sketches. The phased approach and measurement plan will help track progress effectively.

src/connection/congestion/enhanced.rs (3)

85-98: Good fix for no-NAK connection recovery.

Using u64::MAX as a sentinel for connections without NAK history elegantly ensures they receive aggressive recovery. The comment clearly explains the rationale—connections shouldn't get stuck at low windows just because they've been healthy.


132-138: Verify integer division ordering for slow/minimal recovery rates.

The calculations WINDOW_INCR * fast_mode_bonus / 2 and WINDOW_INCR * fast_mode_bonus / 4 perform multiplication before division, which is correct. However, when fast_mode_bonus = 1:

  • Line 134: 30 * 1 / 2 = 15
  • Line 137: 30 * 1 / 4 = 7

The ordering is correct and avoids truncation issues. The implementation looks good.


229-264: Good test coverage for no-NAK history scenario.

The test validates that connections without NAK history receive aggressive recovery (200% rate = WINDOW_INCR * 2), preventing the stuck-window issue described in the comments.

src/toggles.rs (2)

11-18: Well-designed snapshot struct for hot-path optimization.

The ToggleSnapshot struct with Copy trait enables zero-cost passing through the packet processing pipeline. Good documentation explains the purpose.


49-60: LGTM!

The snapshot() method correctly uses Ordering::Relaxed since these toggles are independent configuration flags without cross-field synchronization requirements. The #[inline] hint is appropriate for this hot-path accessor.

src/sender/uplink.rs (2)

36-59: Clean batch receive implementation.

The batch receive pattern correctly:

  • Allocates the buffer once outside the loop
  • Processes all received packets before the next batch
  • Handles empty data defensively
  • Properly exits on channel closure

Good optimization that reduces syscall overhead on Unix platforms.


64-80: Appropriate error backoff strategy.

The 100ms sleep on receive errors prevents CPU-intensive tight loops while allowing quick recovery. The comment clearly explains the rationale for this specific duration.

src/tests/connection_tests.rs (3)

35-46: Tests correctly updated for new register_packet signature.

The tests properly capture current_time = now_ms() and pass it to register_packet. The HashMap-based packet_log verification using contains_key and get aligns with the new data structure.


356-366: Keepalive test correctly uses last_keepalive_sent.

The test properly exercises the new last_keepalive_sent field for keepalive timing logic, matching the updated needs_keepalive() implementation.


547-567: Good coverage for HashMap-based packet log behavior.

This test verifies that the HashMap grows dynamically beyond PKT_LOG_SIZE (unlike the previous fixed-size array) and that cumulative ACKs correctly remove acknowledged packets from the log.

src/sender/selection/mod.rs (1)

48-72: LGTM! Clean API evolution with appropriate inlining.

The signature change to accept &mut [SrtlaConnection] enables in-place quality cache updates in enhanced mode. The #[inline(always)] is appropriate for this thin dispatch function, and the boolean flags provide clear mode control.

src/sender/selection/enhanced.rs (1)

155-178: Good use of #[cold] and #[inline(never)] to optimize the hot path.

Moving debug logging to a separate cold function is an effective optimization. The compiler will place this code in a separate section, keeping the hot path compact.

src/connection/batch_send.rs (1)

1-11: Good documentation with clear performance expectations.

The module-level documentation clearly explains the batching strategy, references the Moblin implementation, and provides concrete performance numbers. This helps future maintainers understand the design rationale.

src/sender/housekeeping.rs (1)

137-138: Good documentation of the design change.

The comment clearly explains why periodic cleanup is no longer needed with the ring buffer approach. This helps future maintainers understand the architectural decision.

src/sender/sequence.rs (2)

76-90: Well-designed O(1) insertion with appropriate documentation.

The bitwise mask for modulo is correct since SEQ_TRACKING_SIZE is validated as power-of-two in tests. The count field tracking is appropriately documented as "approximate, for logging only" since it only tracks insertions into empty slots, not overwrites.


45-55: Good design: heap-allocated array avoids stack overflow.

Using Box<[SequenceTrackingEntry; SEQ_TRACKING_SIZE]> (~320KB) on the heap instead of a stack array is the correct choice. The #[allow(clippy::len_without_is_empty)] is appropriate since an "empty" concept doesn't apply to this fixed-size ring buffer.

src/sender/selection/quality.rs (2)

59-62: Clean API design with explicit timestamp parameter.

Accepting current_time_ms as a parameter rather than calling now_ms() internally is a good performance optimization. When processing multiple connections in a loop, the caller can obtain the timestamp once and reuse it, avoiding repeated system calls.


115-139: Well-documented RTT bonus calculation.

The inline documentation with concrete RTT examples (50ms, 100ms, 200ms, 400ms) makes the behavior clear. The two-stage clamping (line 135 caps the bonus, line 138 ensures no penalty) is defensive and correct.

src/tests/sender_tests.rs (6)

1-14: LGTM - Imports and test setup are correct.

The imports correctly bring in the new SequenceTracker via the crate::sender::* glob import, and the test helpers are properly used for connection creation.


16-28: LGTM - Classic mode test correctly uses updated API.

The test properly validates that classic mode selects the connection with highest score (lowest in-flight packets).


30-240: LGTM - Connection selection and dampening tests are comprehensive.

The tests properly exercise:

  • Quality scoring with NAK penalties (lines 30-52)
  • Burst NAK penalties (lines 54-75)
  • Time-based switch dampening within cooldown (lines 77-106)
  • Dampening bypass after cooldown (lines 108-137)
  • Immediate switch when current connection is invalid (lines 139-173)
  • Exploration blocking during cooldown (lines 175-206)
  • Classic mode ignoring dampening (lines 208-240)

242-279: LGTM - NAK attribution test validates correct connection tracking.

The test properly verifies that NAKs increment the count only for the connection that originally sent the packet.


316-354: LGTM - Sequence tracking tests validate ring buffer behavior.

The tests correctly validate:

  • Cleanup of sequence entries when connections are removed (lines 316-354)
  • Ring buffer collision behavior where newer sequences overwrite older ones at the same index (lines 396-417)

Also applies to: 396-417


461-533: LGTM - Quality multiplier test is comprehensive.

The test thoroughly validates the exponential decay formula at multiple time points and covers edge cases like no-NAK bonus (1.1x) and burst NAK penalties (0.7x additional multiplier).

src/sender/mod.rs (5)

114-122: LGTM - Batch flush timer configuration is appropriate.

The 15ms interval with MissedTickBehavior::Skip is a good choice - it allows packet batching while maintaining low latency, and skipping missed ticks prevents burst processing under load.


162-174: LGTM - Toggle snapshot caching is a good optimization.

Creating the snapshot once per select! iteration rather than per-packet reduces atomic load overhead significantly on the hot path.


434-489: LGTM - Connection changes properly clean up sequence tracker.

The function correctly:

  1. Collects connection IDs before removal
  2. Removes stale connections from the vector
  3. Cleans up corresponding sequence tracker entries
  4. Resets selection state when connections change

524-527: LGTM - Early exit optimization for logging.

The tracing::enabled! check prevents all computation when INFO logging is disabled, which is excellent for production environments where verbose logging may be turned off.


299-415: LGTM - Non-Unix loop is consistent with Unix loop.

The non-Unix implementation correctly mirrors the Unix loop's handling of packet reception, uplink events, housekeeping, and batch flushing, with SIGHUP handling appropriately omitted.

src/sender/packet_handler.rs (6)

10-15: LGTM - Imports correctly reference new types.

The imports for SequenceTracker and ToggleSnapshot are appropriate for the refactored packet handling.


70-90: LGTM - NAK handling with O(1) sequence lookup.

The NAK processing correctly:

  1. Captures time once for batch processing
  2. Uses O(1) ring buffer lookup for sequence → connection mapping
  3. Falls back to linear connection scan if entry expired (edge case handling)

The O(n) find by conn_id is acceptable since connections are typically few (3-4).


152-190: LGTM - Bounded packet queue draining.

The MAX_DRAIN_PACKETS = 64 limit effectively prevents CPU spikes from large accumulated queues while ensuring remaining packets are processed on subsequent event loop iterations.


313-324: LGTM - Batch flush on connection switch ensures ordering.

Flushing the previous connection's queued packets before switching is critical for maintaining packet ordering across connections. The error handling with warn! is appropriate.


346-350: LGTM - Sequence tracking at queue time is correct.

Tracking packets immediately when queued (not when flushed) ensures accurate NAK attribution even if NAKs arrive during the batch interval.


365-388: LGTM - Two-pass flush optimization.

The two-pass design is intentional and correct:

  1. First pass (read-only) enables early exit when no work is needed
  2. Second pass (mutable) performs actual flushes

This avoids acquiring mutable references when traffic is idle, which is the common case on the 15ms timer.

src/connection/batch_recv.rs (3)

212-236: LGTM - recvmmsg syscall implementation.

The implementation correctly:

  • Uses MSG_DONTWAIT for non-blocking operation
  • Returns std::io::Error::last_os_error() on failure
  • Stores the packet count for iterator access

295-319: LGTM - Socket address conversion is correct.

The function properly handles both IPv4 and IPv6 address families using standard pointer casts to interpret the kernel-filled sockaddr_storage.


326-454: LGTM - Non-Unix fallback provides compatible API.

The fallback implementation correctly provides the same API surface with single-packet receives, which is the expected behavior on platforms without recvmmsg.

src/connection/mod.rs (7)

1-2: LGTM - Module structure and re-exports.

The new batch modules are properly declared and their public types are re-exported for use by other parts of the codebase.

Also applies to: 15-16


35-55: LGTM - Quality multiplier caching.

Caching the quality multiplier with 50ms granularity reduces expensive exp() calculations from every packet to ~20 times per second, which is a significant optimization on the hot path.


86-97: LGTM - FxHashMap for packet log with highest_acked_seq optimization.

The combination of:

  1. FxHashMap<i32, u64> for O(1) insert/remove instead of O(256) array scan
  2. highest_acked_seq to skip redundant ACK processing

...significantly improves performance for packet tracking operations.


474-512: LGTM - Optimized cumulative ACK handling.

The implementation cleverly:

  1. Skips duplicate/out-of-order ACKs via highest_acked_seq
  2. Uses targeted removal for small ranges (O(k) where k ≤ 64)
  3. Falls back to retain() for large gaps (e.g., after reconnect)

This significantly reduces work for the common case of sequential ACKs.


410-421: LGTM - ACK fast path optimization.

Trying synchronous try_send_to before falling back to the channel avoids unnecessary task context switches for the common case where the socket is ready.


246-264: LGTM - Batch flush with packet registration.

The implementation correctly:

  1. Early-exits when no packets are queued
  2. Flushes via batch sender
  3. Registers all sent packets for in-flight tracking
  4. Updates last_sent timestamp

133-168: LGTM - Connection initialization with batching infrastructure.

The Unix connect_from_ip properly initializes:

  • BatchUdpSocket via socket2::Socket
  • FxHashMap with pre-allocated capacity
  • highest_acked_seq to i32::MIN
  • batch_sender and quality_cache

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @src/connection/batch_recv.rs:
- Line 318: Remove the unused std::sync::Arc import from the fallback_impl
module: locate the import statement referencing Arc in the top of the
fallback_impl module (the `use std::sync::Arc;` line) and delete it so the
module no longer contains the unused Arc import.
🧹 Nitpick comments (1)
src/connection/batch_recv.rs (1)

154-157: Enhance the safety documentation for unsafe impl Send.

The unsafe impl Send is necessary because libc::iovec and libc::mmsghdr contain raw pointers that don't automatically implement Send. However, the current comment could be more explicit about why this is safe.

📝 Suggested documentation improvement
-    // Safety: All fields are either Copy types or raw pointers that point
-    // to data within this struct. The struct is self-contained.
+    // Safety: This struct contains raw pointers in `iov` and `mmsghdr` fields,
+    // which don't auto-implement Send. However, these pointers are self-referential
+    // (they only point to data within `addr_storage` and `buffers` in the same struct).
+    // Since the struct is always heap-allocated (Box) and the pointers are internal,
+    // it's safe to move across thread boundaries. The struct is self-contained and
+    // moving it doesn't invalidate the internal pointer relationships.
     unsafe impl Send for RecvMmsgBuffer {}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a35a7c1 and 0b00369.

📒 Files selected for processing (3)
  • Cargo.toml
  • src/connection/batch_recv.rs
  • src/connection/batch_send.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • Cargo.toml
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: All Rust code must pass cargo fmt --all -- --check
All Rust code must pass clippy with -D warnings (no warnings allowed)
Use anyhow::Result for error propagation
Group imports as std → external → crate with module-level granularity (imports_granularity = "Module", group_imports = "StdExternalCrate")
Follow naming conventions: constants SCREAMING_SNAKE_CASE
Follow naming conventions: structs PascalCase
Follow naming conventions: functions snake_case
Follow naming conventions: modules snake_case (file/module names)
Use tracing macros (debug!, info!, warn!, etc.) for logging
Use Tokio for async runtime (net, time, io, signal) and async operations
Keep code self-documenting; add doc comments for public API when necessary

Files:

  • src/connection/batch_send.rs
  • src/connection/batch_recv.rs
src/**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

Expose internal fields for testing only behind #[cfg(feature = "test-internals")]

Files:

  • src/connection/batch_send.rs
  • src/connection/batch_recv.rs
🧠 Learnings (1)
📚 Learning: 2025-10-15T14:24:40.523Z
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to **/*.rs : Use Tokio for async runtime (net, time, io, signal) and async operations

Applied to files:

  • src/connection/batch_recv.rs
🧬 Code graph analysis (2)
src/connection/batch_send.rs (1)
src/connection/mod.rs (1)
  • has_queued_packets (239-241)
src/connection/batch_recv.rs (3)
src/sender/mod.rs (1)
  • tokio (93-93)
src/connection/batch_send.rs (2)
  • new (50-57)
  • default (43-45)
src/connection/mod.rs (1)
  • default (49-54)
🪛 GitHub Actions: CI
src/connection/batch_recv.rs

[error] 318-318: unused import: std::sync::Arc

🪛 GitHub Check: Test (macOS)
src/connection/batch_recv.rs

[failure] 318-318:
unused import: std::sync::Arc

🪛 GitHub Check: Test (Windows)
src/connection/batch_recv.rs

[failure] 318-318:
unused import: std::sync::Arc

🔇 Additional comments (13)
src/connection/batch_recv.rs (7)

1-16: LGTM!

The module documentation clearly explains the batch receive optimization and fallback strategy, and the batch size constant (32) is well-chosen for balancing syscall reduction with memory usage.


42-138: LGTM!

The BatchUdpSocket implementation correctly wraps socket2::Socket in AsyncFd for async readiness polling. The poll_recv_batch, recv_batch, and send methods properly handle WouldBlock errors and follow tokio's async patterns. The unsafe transmute in try_recv (line 123) is sound for the socket2 API.


256-308: LGTM!

The iterator implementation correctly traverses the received packets, and the sockaddr_storage_to_socket_addr converter properly handles both IPv4 and IPv6 addresses with correct byte-order conversions. The unsafe code is standard FFI pattern for working with libc socket structures.


416-425: LGTM!

The Default implementation for the fallback RecvMmsgBuffer correctly initializes all fields with safe values. This addresses the issue from an earlier version that used unsafe { std::mem::zeroed() } with uninitialized pointers.


445-449: LGTM!

The conditional re-exports correctly use target_os = "linux" instead of the broader unix cfg, properly addressing the platform compatibility issue from earlier reviews where macOS builds would incorrectly reference Linux-only syscalls.


451-471: LGTM!

The tests appropriately validate buffer initialization and size constraints. The Linux-specific buffer size test ensures the batch buffer stays within reasonable memory bounds.


320-320: Tokio is unavailable on non-Unix platforms, causing Windows build failures.

According to the pipeline failures and past review comments, tokio is declared only under [target.'cfg(unix)'.dependencies] in Cargo.toml, making it unavailable on Windows. However, the non-Linux fallback (line 320) imports and uses tokio::net::UdpSocket, which causes compilation failures on Windows as confirmed by the Test (Windows) pipeline failure.

The past review comment suggested two solutions:

  1. Move tokio to the main [dependencies] section to make it available on all platforms, or
  2. Implement the Windows fallback without tokio using a platform-specific approach

Since this is a duplicate of an existing unfixed issue, please prioritize addressing this to unblock Windows builds.

⛔ Skipped due to learnings
Learnt from: CR
Repo: irlserver/srtla_send PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-15T14:24:40.523Z
Learning: Applies to **/*.rs : Use Tokio for async runtime (net, time, io, signal) and async operations
src/connection/batch_send.rs (6)

1-19: LGTM!

The module documentation clearly explains the batching optimization with concrete performance metrics (15x syscall reduction), and all imports are properly utilized throughout the code.


20-40: LGTM!

The constants align with the documented Moblin strategy, and the parallel vector design efficiently tracks per-packet metadata. The use of SmallVec<u8, 1500> is well-chosen to avoid heap allocations for typical MTU-sized packets.


42-82: LGTM!

The initialization and queueing methods are well-implemented. Pre-allocation in new() avoids repeated reallocations, and queue_packet efficiently uses SmallVec::from_slice_copy. The needs_time_flush logic correctly combines both emptiness and timing checks.


88-133: LGTM! Partial send failure handling is now correct.

The flush method now properly handles partial send failures (lines 97-112). The sent_count tracker ensures that if socket.send() fails mid-batch, only the successfully sent packets are removed from the queue via drain() (lines 106-108), while unsent packets remain for the next retry. This correctly addresses the issue from the previous review where partial failures could cause duplicate sends and sequence tracking inconsistencies.

The decision not to update last_flush_time on partial failure (line 126 only updates on success) is intentional and correct—it allows needs_time_flush() to trigger retry attempts soon.


135-141: LGTM!

The reset method correctly clears all internal state and resets the flush timer, which is appropriate for reconnection scenarios.


144-189: LGTM!

The tests comprehensively validate the key batching behaviors: threshold-based flushing, time-based flushing, and state reset. All test logic is correct and covers the essential functionality.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants