From b9ba02c025bcb9987f22182bd2947834a6d69d40 Mon Sep 17 00:00:00 2001 From: Per K Date: Tue, 19 Dec 2023 12:42:55 +0100 Subject: [PATCH] Prioritize audio resend before video resend and implement TTL. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds separate priorities for audio and video retranmission. Done by adding an original type to RtpPacketToSend. Add possiblity to set TTL for audio nack, video nack and video packet separately. Oldest packet for these types are dropped when a new packet of that type is pushed to the pacer, or when the pacer switch current priority type to that priority. Effect is that: -pacer queue does not grow unlimited for these types if a TTL has been set. -an old packet is not sent. Bug: webrtc:15740 Change-Id: I38718bc570aebca54eacbded69824905f3694f41 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/331823 Commit-Queue: Per Kjellander Reviewed-by: Erik Språng Cr-Commit-Position: refs/heads/main@{#41414} --- modules/pacing/BUILD.gn | 1 + modules/pacing/pacing_controller.cc | 22 +-- modules/pacing/pacing_controller.h | 44 ++++-- modules/pacing/pacing_controller_unittest.cc | 38 +++++ modules/pacing/prioritized_packet_queue.cc | 127 +++++++++++++-- modules/pacing/prioritized_packet_queue.h | 26 ++- .../prioritized_packet_queue_unittest.cc | 148 ++++++++++++++++-- modules/rtp_rtcp/source/rtp_packet_to_send.cc | 11 ++ modules/rtp_rtcp/source/rtp_packet_to_send.h | 10 +- 9 files changed, 382 insertions(+), 45 deletions(-) diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index ea80c8c819..87498817b6 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -63,6 +63,7 @@ rtc_library("pacing") { ] absl_deps = [ "//third_party/abseil-cpp/absl/cleanup", + "//third_party/abseil-cpp/absl/container:inlined_vector", "//third_party/abseil-cpp/absl/memory", "//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/types:optional", diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 5b81207d56..41f97a37fb 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -19,11 +19,11 @@ #include "absl/strings/match.h" #include "api/units/data_size.h" #include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "modules/pacing/bitrate_prober.h" -#include "modules/pacing/interval_budget.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" -#include "rtc_base/time_utils.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -44,8 +44,6 @@ bool IsEnabled(const FieldTrialsView& field_trials, absl::string_view key) { } // namespace -const TimeDelta PacingController::kMaxExpectedQueueLength = - TimeDelta::Millis(2000); const TimeDelta PacingController::kPausedProcessInterval = kCongestedPacketInterval; const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1); @@ -57,11 +55,13 @@ const TimeDelta PacingController::kMaxEarlyProbeProcessing = PacingController::PacingController(Clock* clock, PacketSender* packet_sender, - const FieldTrialsView& field_trials) + const FieldTrialsView& field_trials, + Configuration configuration) : clock_(clock), packet_sender_(packet_sender), field_trials_(field_trials), drain_large_queues_( + configuration.drain_large_queues && !IsDisabled(field_trials_, "WebRTC-Pacer-DrainQueue")), send_padding_if_silent_( IsEnabled(field_trials_, "WebRTC-Pacer-PadInSilence")), @@ -71,9 +71,10 @@ PacingController::PacingController(Clock* clock, fast_retransmissions_( IsEnabled(field_trials_, "WebRTC-Pacer-FastRetransmissions")), keyframe_flushing_( + configuration.keyframe_flushing || IsEnabled(field_trials_, "WebRTC-Pacer-KeyframeFlushing")), transport_overhead_per_packet_(DataSize::Zero()), - send_burst_interval_(kDefaultBurstInterval), + send_burst_interval_(configuration.send_burst_interval), last_timestamp_(clock_->CurrentTime()), paused_(false), media_debt_(DataSize::Zero()), @@ -86,9 +87,11 @@ PacingController::PacingController(Clock* clock, last_process_time_(clock->CurrentTime()), last_send_time_(last_process_time_), seen_first_packet_(false), - packet_queue_(/*creation_time=*/last_process_time_), + packet_queue_(/*creation_time=*/last_process_time_, + configuration.prioritize_audio_retransmission, + configuration.packet_queue_ttl), congested_(false), - queue_time_limit_(kMaxExpectedQueueLength), + queue_time_limit_(configuration.queue_time_limit), account_for_audio_(false), include_overhead_(false), circuit_breaker_threshold_(1 << 16) { @@ -710,8 +713,7 @@ Timestamp PacingController::NextUnpacedSendTime() const { } if (fast_retransmissions_) { Timestamp leading_retransmission_send_time = - packet_queue_.LeadingPacketEnqueueTime( - RtpPacketMediaType::kRetransmission); + packet_queue_.LeadingPacketEnqueueTimeForRetransmission(); if (leading_retransmission_send_time.IsFinite()) { return leading_retransmission_send_time; } diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index 04e0a820f9..fe6ee737a9 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -67,11 +67,6 @@ class PacingController { } }; - // Expected max pacer delay. If ExpectedQueueTime() is higher than - // this value, the packet producers should wait (eg drop frames rather than - // encoding them). Bitrate sent may temporarily exceed target set by - // UpdateBitrate() so that this limit will be upheld. - static const TimeDelta kMaxExpectedQueueLength; // If no media or paused, wake up at least every `kPausedProcessIntervalMs` in // order to send a keep-alive packet so we don't get stuck in a bad state due // to lack of feedback. @@ -93,14 +88,45 @@ class PacingController { // the send burst interval. // Ex: max send burst interval = 63Kb / 10Mbit/s = 50ms. static constexpr DataSize kMaxBurstSize = DataSize::Bytes(63 * 1000); - // The pacer is allowed to send enqued packets in bursts and can build up a - // packet "debt" that correspond to approximately the send rate during - // the burst interval. + + // Configuration default values. static constexpr TimeDelta kDefaultBurstInterval = TimeDelta::Millis(40); + static constexpr TimeDelta kMaxExpectedQueueLength = TimeDelta::Millis(2000); + + struct Configuration { + // If the pacer queue grows longer than the configured max queue limit, + // pacer sends at the minimum rate needed to keep the max queue limit and + // ignore the current bandwidth estimate. + bool drain_large_queues = true; + // Expected max pacer delay. If ExpectedQueueTime() is higher than + // this value, the packet producers should wait (eg drop frames rather than + // encoding them). Bitrate sent may temporarily exceed target set by + // SetPacingRates() so that this limit will be upheld if + // `drain_large_queues` is set. + TimeDelta queue_time_limit = kMaxExpectedQueueLength; + // If the first packet of a keyframe is enqueued on a RTP stream, pacer + // skips forward to that packet and drops other enqueued packets on that + // stream, unless a keyframe is already being paced. + bool keyframe_flushing = false; + // Audio retransmission is prioritized before video retransmission packets. + bool prioritize_audio_retransmission = false; + // Configure separate timeouts per priority. After a timeout, a packet of + // that sort will not be paced and instead dropped. + // Note: to set TTL on audio retransmission, + // `prioritize_audio_retransmission` must be true. + PacketQueueTTL packet_queue_ttl; + // The pacer is allowed to send enqueued packets in bursts and can build up + // a packet "debt" that correspond to approximately the send rate during the + // burst interval. + TimeDelta send_burst_interval = kDefaultBurstInterval; + }; + + static Configuration DefaultConfiguration() { return Configuration{}; } PacingController(Clock* clock, PacketSender* packet_sender, - const FieldTrialsView& field_trials); + const FieldTrialsView& field_trials, + Configuration configuration = DefaultConfiguration()); ~PacingController(); diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index 9e6ede6dc0..2c3a71b369 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -2348,5 +2348,43 @@ TEST_F(PacingControllerTest, FlushesPacketsOnKeyFrames) { pacer->ProcessPackets(); } +TEST_F(PacingControllerTest, CanControlQueueSizeUsingTtl) { + const uint32_t kSsrc = 12345; + const uint32_t kAudioSsrc = 2345; + uint16_t sequence_number = 1234; + + PacingController::Configuration config; + config.drain_large_queues = false; + config.packet_queue_ttl.video = TimeDelta::Millis(500); + auto pacer = + std::make_unique(&clock_, &callback_, trials_, config); + pacer->SetPacingRates(DataRate::BitsPerSec(100'000), DataRate::Zero()); + + Timestamp send_time = Timestamp::Zero(); + for (int i = 0; i < 100; ++i) { + // Enqueue a new audio and video frame every 33ms. + if (clock_.CurrentTime() - send_time > TimeDelta::Millis(33)) { + for (int j = 0; j < 3; ++j) { + auto packet = BuildPacket(RtpPacketMediaType::kVideo, kSsrc, + /*sequence_number=*/++sequence_number, + /*capture_time_ms=*/2, + /*size_bytes=*/1000); + pacer->EnqueuePacket(std::move(packet)); + } + auto packet = BuildPacket(RtpPacketMediaType::kAudio, kAudioSsrc, + /*sequence_number=*/++sequence_number, + /*capture_time_ms=*/2, + /*size_bytes=*/100); + pacer->EnqueuePacket(std::move(packet)); + send_time = clock_.CurrentTime(); + } + + EXPECT_LE(clock_.CurrentTime() - pacer->OldestPacketEnqueueTime(), + TimeDelta::Millis(500)); + clock_.AdvanceTime(pacer->NextSendTime() - clock_.CurrentTime()); + pacer->ProcessPackets(); + } +} + } // namespace } // namespace webrtc diff --git a/modules/pacing/prioritized_packet_queue.cc b/modules/pacing/prioritized_packet_queue.cc index ea211ea683..15964848c2 100644 --- a/modules/pacing/prioritized_packet_queue.cc +++ b/modules/pacing/prioritized_packet_queue.cc @@ -10,41 +10,70 @@ #include "modules/pacing/prioritized_packet_queue.h" +#include +#include #include +#include "absl/container/inlined_vector.h" +#include "absl/types/optional.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "rtc_base/checks.h" +#include "rtc_base/logging.h" namespace webrtc { namespace { constexpr int kAudioPrioLevel = 0; -int GetPriorityForType(RtpPacketMediaType type) { +int GetPriorityForType( + RtpPacketMediaType type, + absl::optional original_type) { // Lower number takes priority over higher. switch (type) { case RtpPacketMediaType::kAudio: // Audio is always prioritized over other packet types. return kAudioPrioLevel; case RtpPacketMediaType::kRetransmission: - // Send retransmissions before new media. + // Send retransmissions before new media. If original_type is set, audio + // retransmission is prioritized more than video retransmission. + if (original_type == RtpPacketToSend::OriginalType::kVideo) { + return kAudioPrioLevel + 2; + } return kAudioPrioLevel + 1; case RtpPacketMediaType::kVideo: case RtpPacketMediaType::kForwardErrorCorrection: // Video has "normal" priority, in the old speak. // Send redundancy concurrently to video. If it is delayed it might have a // lower chance of being useful. - return kAudioPrioLevel + 2; + return kAudioPrioLevel + 3; case RtpPacketMediaType::kPadding: // Packets that are in themselves likely useless, only sent to keep the // BWE high. - return kAudioPrioLevel + 3; + return kAudioPrioLevel + 4; } RTC_CHECK_NOTREACHED(); } } // namespace +absl::InlinedVector +PrioritizedPacketQueue::ToTtlPerPrio(PacketQueueTTL packet_queue_ttl) { + absl::InlinedVector + ttl_per_prio(kNumPriorityLevels, TimeDelta::PlusInfinity()); + ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kAudio)] = + packet_queue_ttl.audio_retransmission; + ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kVideo)] = + packet_queue_ttl.video_retransmission; + ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kVideo, absl::nullopt)] = + packet_queue_ttl.video; + return ttl_per_prio; +} + DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const { return DataSize::Bytes(packet->payload_size() + packet->padding_size()); } @@ -109,8 +138,13 @@ PrioritizedPacketQueue::StreamQueue::DequeueAll() { return packets_by_prio; } -PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time) - : queue_time_sum_(TimeDelta::Zero()), +PrioritizedPacketQueue::PrioritizedPacketQueue( + Timestamp creation_time, + bool prioritize_audio_retransmission, + PacketQueueTTL packet_queue_ttl) + : prioritize_audio_retransmission_(prioritize_audio_retransmission), + time_to_live_per_prio_(ToTtlPerPrio(packet_queue_ttl)), + queue_time_sum_(TimeDelta::Zero()), pause_time_sum_(TimeDelta::Zero()), size_packets_(0), size_packets_per_media_type_({}), @@ -133,7 +167,11 @@ void PrioritizedPacketQueue::Push(Timestamp enqueue_time, enqueue_times_.insert(enqueue_times_.end(), enqueue_time); RTC_DCHECK(packet->packet_type().has_value()); RtpPacketMediaType packet_type = packet->packet_type().value(); - int prio_level = GetPriorityForType(packet_type); + int prio_level = + GetPriorityForType(packet_type, prioritize_audio_retransmission_ + ? packet->original_packet_type() + : absl::nullopt); + PurgeOldPacketsAtPriorityLevel(prio_level, enqueue_time); RTC_DCHECK_GE(prio_level, 0); RTC_DCHECK_LT(prio_level, kNumPriorityLevels); QueuedPacket queued_packed = {.packet = std::move(packet), @@ -214,7 +252,8 @@ PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const { Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime( RtpPacketMediaType type) const { - const int priority_level = GetPriorityForType(type); + RTC_DCHECK(type != RtpPacketMediaType::kRetransmission); + const int priority_level = GetPriorityForType(type, absl::nullopt); if (streams_by_prio_[priority_level].empty()) { return Timestamp::MinusInfinity(); } @@ -222,6 +261,39 @@ Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime( priority_level); } +Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTimeForRetransmission() + const { + if (!prioritize_audio_retransmission_) { + const int priority_level = + GetPriorityForType(RtpPacketMediaType::kRetransmission, absl::nullopt); + if (streams_by_prio_[priority_level].empty()) { + return Timestamp::PlusInfinity(); + } + return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime( + priority_level); + } + const int audio_priority_level = + GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kAudio); + const int video_priority_level = + GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kVideo); + + Timestamp next_audio = + streams_by_prio_[audio_priority_level].empty() + ? Timestamp::PlusInfinity() + : streams_by_prio_[audio_priority_level] + .front() + ->LeadingPacketEnqueueTime(audio_priority_level); + Timestamp next_video = + streams_by_prio_[video_priority_level].empty() + ? Timestamp::PlusInfinity() + : streams_by_prio_[video_priority_level] + .front() + ->LeadingPacketEnqueueTime(video_priority_level); + return std::min(next_audio, next_video); +} + Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const { return enqueue_times_.empty() ? Timestamp::MinusInfinity() : enqueue_times_.front(); @@ -283,9 +355,6 @@ void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) { // Update the global top prio level if neccessary. RTC_DCHECK(streams_by_prio_[i].front() == &queue); streams_by_prio_[i].pop_front(); - if (i == top_active_prio_level_) { - MaybeUpdateTopPrioLevel(); - } } else { // More than stream had packets at this prio level, filter this one out. std::deque filtered_queue; @@ -298,6 +367,7 @@ void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) { } } } + MaybeUpdateTopPrioLevel(); } bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const { @@ -340,13 +410,15 @@ void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) { } void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() { - if (streams_by_prio_[top_active_prio_level_].empty()) { + if (top_active_prio_level_ == -1 || + streams_by_prio_[top_active_prio_level_].empty()) { // No stream queues have packets at this prio level, find top priority // that is not empty. if (size_packets_ == 0) { top_active_prio_level_ = -1; } else { for (int i = 0; i < kNumPriorityLevels; ++i) { + PurgeOldPacketsAtPriorityLevel(i, last_update_time_); if (!streams_by_prio_[i].empty()) { top_active_prio_level_ = i; break; @@ -356,4 +428,35 @@ void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() { } } +void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level, + Timestamp now) { + RTC_DCHECK(prio_level >= 0 && prio_level < kNumPriorityLevels); + TimeDelta time_to_live = time_to_live_per_prio_[prio_level]; + if (time_to_live.IsInfinite()) { + return; + } + + std::deque& queues = streams_by_prio_[prio_level]; + auto iter = queues.begin(); + while (iter != queues.end()) { + StreamQueue* queue_ptr = *iter; + while (queue_ptr->HasPacketsAtPrio(prio_level) && + (now - queue_ptr->LeadingPacketEnqueueTime(prio_level)) > + time_to_live) { + QueuedPacket packet = queue_ptr->DequeuePacket(prio_level); + RTC_LOG(LS_INFO) << "Dropping old packet on SSRC: " + << packet.packet->Ssrc() + << " seq:" << packet.packet->SequenceNumber() + << " time in queue:" << (now - packet.enqueue_time).ms() + << " ms"; + DequeuePacketInternal(packet); + } + if (!queue_ptr->HasPacketsAtPrio(prio_level)) { + iter = queues.erase(iter); + } else { + ++iter; + } + } +} + } // namespace webrtc diff --git a/modules/pacing/prioritized_packet_queue.h b/modules/pacing/prioritized_packet_queue.h index 935c530027..179ef104fe 100644 --- a/modules/pacing/prioritized_packet_queue.h +++ b/modules/pacing/prioritized_packet_queue.h @@ -18,8 +18,8 @@ #include #include #include -#include +#include "absl/container/inlined_vector.h" #include "api/units/data_size.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" @@ -27,9 +27,19 @@ namespace webrtc { +// Describes how long time a packet may stay in the queue before being dropped. +struct PacketQueueTTL { + TimeDelta audio_retransmission = TimeDelta::PlusInfinity(); + TimeDelta video_retransmission = TimeDelta::PlusInfinity(); + TimeDelta video = TimeDelta::PlusInfinity(); +}; + class PrioritizedPacketQueue { public: - explicit PrioritizedPacketQueue(Timestamp creation_time); + explicit PrioritizedPacketQueue( + Timestamp creation_time, + bool prioritize_audio_retransmission = false, + PacketQueueTTL packet_queue_ttl = PacketQueueTTL()); PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete; PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete; @@ -63,6 +73,7 @@ class PrioritizedPacketQueue { // method, for the given packet type. If queue has no packets, of that type, // returns Timestamp::MinusInfinity(). Timestamp LeadingPacketEnqueueTime(RtpPacketMediaType type) const; + Timestamp LeadingPacketEnqueueTimeForRetransmission() const; // Enqueue time of the oldest packet in the queue, // Timestamp::MinusInfinity() if queue is empty. @@ -90,7 +101,7 @@ class PrioritizedPacketQueue { bool HasKeyframePackets(uint32_t ssrc) const; private: - static constexpr int kNumPriorityLevels = 4; + static constexpr int kNumPriorityLevels = 5; class QueuedPacket { public: @@ -139,6 +150,15 @@ class PrioritizedPacketQueue { // if so move it to the lowest non-empty index. void MaybeUpdateTopPrioLevel(); + void PurgeOldPacketsAtPriorityLevel(int prio_level, Timestamp now); + + static absl::InlinedVector ToTtlPerPrio( + PacketQueueTTL); + + const bool prioritize_audio_retransmission_; + const absl::InlinedVector + time_to_live_per_prio_; + // Cumulative sum, over all packets, of time spent in the queue. TimeDelta queue_time_sum_; // Cumulative sum of time the queue has spent in a paused state. diff --git a/modules/pacing/prioritized_packet_queue_unittest.cc b/modules/pacing/prioritized_packet_queue_unittest.cc index 9ed19642c7..f0c5f0eb1c 100644 --- a/modules/pacing/prioritized_packet_queue_unittest.cc +++ b/modules/pacing/prioritized_packet_queue_unittest.cc @@ -10,6 +10,7 @@ #include "modules/pacing/prioritized_packet_queue.h" +#include #include #include "api/units/time_delta.h" @@ -26,18 +27,39 @@ constexpr uint32_t kDefaultSsrc = 123; constexpr int kDefaultPayloadSize = 789; std::unique_ptr CreatePacket(RtpPacketMediaType type, - uint16_t sequence_number, + uint16_t seq, uint32_t ssrc = kDefaultSsrc, bool is_key_frame = false) { auto packet = std::make_unique(/*extensions=*/nullptr); packet->set_packet_type(type); packet->SetSsrc(ssrc); - packet->SetSequenceNumber(sequence_number); + packet->SetSequenceNumber(seq); packet->SetPayloadSize(kDefaultPayloadSize); packet->set_is_key_frame(is_key_frame); return packet; } +std::unique_ptr CreateRetransmissionPacket( + RtpPacketMediaType original_type, + uint16_t seq, + uint32_t ssrc = kDefaultSsrc) { + auto packet = std::make_unique(/*extensions=*/nullptr); + packet->set_packet_type(original_type); + packet->set_packet_type(RtpPacketMediaType::kRetransmission); + RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kRetransmission); + if (original_type == RtpPacketMediaType::kVideo) { + RTC_DCHECK(packet->original_packet_type() == + RtpPacketToSend::OriginalType::kVideo); + } else { + RTC_DCHECK(packet->original_packet_type() == + RtpPacketToSend::OriginalType::kAudio); + } + packet->SetSsrc(ssrc); + packet->SetSequenceNumber(seq); + packet->SetPayloadSize(kDefaultPayloadSize); + return packet; +} + } // namespace TEST(PrioritizedPacketQueue, ReturnsPacketsInPrioritizedOrder) { @@ -49,18 +71,42 @@ TEST(PrioritizedPacketQueue, ReturnsPacketsInPrioritizedOrder) { queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2)); queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection, /*seq=*/3)); - queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/4)); - queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/5)); + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kVideo, /*seq=*/4)); + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/5)); + queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/6)); // Packets should be returned in high to low order. - EXPECT_EQ(queue.Pop()->SequenceNumber(), 5); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 6); + // Audio and video retransmission has same prio, but video was enqueued first. EXPECT_EQ(queue.Pop()->SequenceNumber(), 4); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 5); // Video and FEC prioritized equally - but video was enqueued first. EXPECT_EQ(queue.Pop()->SequenceNumber(), 2); EXPECT_EQ(queue.Pop()->SequenceNumber(), 3); EXPECT_EQ(queue.Pop()->SequenceNumber(), 1); } +TEST(PrioritizedPacketQueue, + PrioritizeAudioRetransmissionBeforeVideoRetransmissionIfConfigured) { + Timestamp now = Timestamp::Zero(); + PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true); + + // Add packets in low to high packet order. + queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3)); + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kVideo, /*seq=*/4)); + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/5)); + queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/6)); + + // Packets should be returned in high to low order. + EXPECT_EQ(queue.Pop()->SequenceNumber(), 6); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 5); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 4); +} + TEST(PrioritizedPacketQueue, ReturnsEqualPrioPacketsInRoundRobinOrder) { Timestamp now = Timestamp::Zero(); PrioritizedPacketQueue queue(now); @@ -251,6 +297,26 @@ TEST(PrioritizedPacketQueue, ReportsLeadingPacketEnqueueTime) { Timestamp::MinusInfinity()); } +TEST(PrioritizedPacketQueue, ReportsLeadingPacketEnqueueTimeForRetransmission) { + PrioritizedPacketQueue queue(/*creation_time=*/Timestamp::Zero(), + /*prioritize_audio_retransmission=*/true); + EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(), + Timestamp::PlusInfinity()); + + queue.Push(Timestamp::Millis(10), + CreateRetransmissionPacket(RtpPacketMediaType::kVideo, /*seq=*/1)); + queue.Push(Timestamp::Millis(11), + CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/2)); + EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(), + Timestamp::Millis(10)); + queue.Pop(); // Pop audio retransmission since it has higher prio. + EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(), + Timestamp::Millis(10)); + queue.Pop(); // Pop video retransmission. + EXPECT_EQ(queue.LeadingPacketEnqueueTimeForRetransmission(), + Timestamp::PlusInfinity()); +} + TEST(PrioritizedPacketQueue, PushAndPopUpdatesSizeInPacketsPerRtpPacketMediaType) { Timestamp now = Timestamp::Zero(); @@ -272,7 +338,7 @@ TEST(PrioritizedPacketQueue, RtpPacketMediaType::kVideo)], 1); - queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, 3)); + queue.Push(now, CreateRetransmissionPacket(RtpPacketMediaType::kVideo, 3)); EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast( RtpPacketMediaType::kRetransmission)], 1); @@ -326,6 +392,8 @@ TEST(PrioritizedPacketQueue, ClearsPackets) { // Remove all of them. queue.RemovePacketsForSsrc(kSsrc); EXPECT_TRUE(queue.Empty()); + queue.RemovePacketsForSsrc(kSsrc); + EXPECT_TRUE(queue.Empty()); } TEST(PrioritizedPacketQueue, ClearPacketsAffectsOnlySpecifiedSsrc) { @@ -338,16 +406,16 @@ TEST(PrioritizedPacketQueue, ClearPacketsAffectsOnlySpecifiedSsrc) { // ensuring they are first in line. queue.Push( now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/1, kRemovingSsrc)); - queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/2, - kRemovingSsrc)); + queue.Push(now, CreateRetransmissionPacket(RtpPacketMediaType::kVideo, + /*seq=*/2, kRemovingSsrc)); // Add a video packet and a retransmission for the SSRC that will remain. // The retransmission packets now both have pointers to their respective qeues // from the same prio level. queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3, kStayingSsrc)); - queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/4, - kStayingSsrc)); + queue.Push(now, CreateRetransmissionPacket(RtpPacketMediaType::kVideo, + /*seq=*/4, kStayingSsrc)); EXPECT_EQ(queue.SizeInPackets(), 4); @@ -413,4 +481,64 @@ TEST(PrioritizedPacketQueue, ReportsKeyframePackets) { EXPECT_FALSE(queue.HasKeyframePackets(kVideoSsrc2)); } +TEST(PrioritizedPacketQueue, PacketsDroppedIfNotPulledWithinTttl) { + Timestamp now = Timestamp::Zero(); + PacketQueueTTL ttls; + ttls.audio_retransmission = TimeDelta::Millis(200); + PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true, + ttls); + + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/1)); + now += ttls.audio_retransmission + TimeDelta::Millis(1); + EXPECT_EQ(queue.SizeInPackets(), 1); + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/2)); + EXPECT_EQ(queue.SizeInPackets(), 1); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 2); +} + +TEST(PrioritizedPacketQueue, DontSendPacketsAfterTttl) { + Timestamp now = Timestamp::Zero(); + PacketQueueTTL ttls; + ttls.audio_retransmission = TimeDelta::Millis(200); + PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true, + ttls); + + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/1)); + now += ttls.audio_retransmission + TimeDelta::Millis(1); + EXPECT_EQ(queue.SizeInPackets(), 1); + queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2)); + queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/3)); + // Expect the old packet to have been removed since it was not popped in time. + EXPECT_EQ(queue.SizeInPackets(), 3); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 3); + EXPECT_EQ(queue.SizeInPackets(), 1); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 2); + EXPECT_EQ(queue.SizeInPackets(), 0); +} + +TEST(PrioritizedPacketQueue, + SendsPacketsAfterTttlIfPrioHigherThanPushedPackets) { + Timestamp now = Timestamp::Zero(); + PacketQueueTTL ttls; + ttls.audio_retransmission = TimeDelta::Millis(200); + PrioritizedPacketQueue queue(now, /*prioritize_audio_retransmission=*/true, + ttls); + + queue.Push(now, + CreateRetransmissionPacket(RtpPacketMediaType::kAudio, /*seq=*/1)); + now += ttls.audio_retransmission + TimeDelta::Millis(1); + EXPECT_EQ(queue.SizeInPackets(), 1); + queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/2)); + + // This test just show that TTL is not enforced strictly. If a new audio + // packet had been queued before a packet was popped, the audio retransmission + // packet would have been dropped. + EXPECT_EQ(queue.SizeInPackets(), 2); + EXPECT_EQ(queue.Pop()->SequenceNumber(), 1); + EXPECT_EQ(queue.SizeInPackets(), 1); +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_packet_to_send.cc b/modules/rtp_rtcp/source/rtp_packet_to_send.cc index b55e74aaf0..691a243c5f 100644 --- a/modules/rtp_rtcp/source/rtp_packet_to_send.cc +++ b/modules/rtp_rtcp/source/rtp_packet_to_send.cc @@ -12,6 +12,8 @@ #include +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" + namespace webrtc { RtpPacketToSend::RtpPacketToSend(const ExtensionManager* extensions) @@ -28,4 +30,13 @@ RtpPacketToSend& RtpPacketToSend::operator=(RtpPacketToSend&& packet) = default; RtpPacketToSend::~RtpPacketToSend() = default; +void RtpPacketToSend::set_packet_type(RtpPacketMediaType type) { + if (packet_type_ == RtpPacketMediaType::kAudio) { + original_packet_type_ = OriginalType::kAudio; + } else if (packet_type_ == RtpPacketMediaType::kVideo) { + original_packet_type_ = OriginalType::kVideo; + } + packet_type_ = type; +} + } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_packet_to_send.h b/modules/rtp_rtcp/source/rtp_packet_to_send.h index 438ca354ed..64f9ee1ab1 100644 --- a/modules/rtp_rtcp/source/rtp_packet_to_send.h +++ b/modules/rtp_rtcp/source/rtp_packet_to_send.h @@ -49,11 +49,18 @@ class RtpPacketToSend : public RtpPacket { webrtc::Timestamp capture_time() const { return capture_time_; } void set_capture_time(webrtc::Timestamp time) { capture_time_ = time; } - void set_packet_type(RtpPacketMediaType type) { packet_type_ = type; } + void set_packet_type(RtpPacketMediaType type); + absl::optional packet_type() const { return packet_type_; } + enum class OriginalType { kAudio, kVideo }; + // Original type does not change if packet type is changed to kRetransmission. + absl::optional original_packet_type() const { + return original_packet_type_; + } + // If this is a retransmission, indicates the sequence number of the original // media packet that this packet represents. If RTX is used this will likely // be different from SequenceNumber(). @@ -133,6 +140,7 @@ class RtpPacketToSend : public RtpPacket { private: webrtc::Timestamp capture_time_ = webrtc::Timestamp::Zero(); absl::optional packet_type_; + absl::optional original_packet_type_; bool allow_retransmission_ = false; absl::optional retransmitted_sequence_number_; rtc::scoped_refptr additional_data_;