webrtc_m130/modules/pacing/task_queue_paced_sender.cc

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

321 lines
11 KiB
C++
Raw Normal View History

/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/task_queue_paced_sender.h"
#include <algorithm>
#include <utility>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
Reland "Add trace of enqueued and sent RTP packets" This reverts commit 45bb717a2866c2d836b5332a24af0d09f2b30714. Reason for revert: Use #if RTC_TRACE_EVENTS_ENABLED to avoid unused variable. Original change's description: > Revert "Add trace of enqueued and sent RTP packets" > > This reverts commit 45b9192ad981dcdc12ad4aef087fff2195bd030c. > > Reason for revert: When tracing is disabled, this results in a clang warning (unused variable), which results in a build error since Werror is enabled by default. > > Original change's description: > > Add trace of enqueued and sent RTP packets > > > > This is useful in debugging the latency from a packet > > is enqueued until it's sent. > > > > Bug: webrtc:11617 > > Change-Id: Ic2f194334a2e178de221df3a0838481035bb3505 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176231 > > Reviewed-by: Erik Språng <sprang@webrtc.org> > > Commit-Queue: Johannes Kron <kron@webrtc.org> > > Cr-Commit-Position: refs/heads/master@{#31381} > > TBR=sprang@webrtc.org,kron@webrtc.org > > # Not skipping CQ checks because original CL landed > 1 day ago. > > Bug: webrtc:11617 > Change-Id: I854c17e587c624691a0e5e3ec9fd38c2607eda84 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176380 > Commit-Queue: Casey Fischer <caseyfischer@google.com> > Reviewed-by: Adam Nathan <adamnathan@google.com> > Cr-Commit-Position: refs/heads/master@{#31399} TBR=sprang@webrtc.org,yujo@chromium.org,adamnathan@google.com,kron@webrtc.org,caseyfischer@google.com # Not skipping CQ checks because this is a reland. Bug: webrtc:11617 Change-Id: I9de7f7ed290481a51c161a693f5b2d5df7d2eae3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176367 Commit-Queue: Johannes Kron <kron@webrtc.org> Reviewed-by: Erik Språng <sprang@webrtc.org> Reviewed-by: Johannes Kron <kron@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31407}
2020-06-01 23:28:44 +00:00
#include "rtc_base/trace_event.h"
namespace webrtc {
namespace {
// If no calls to MaybeProcessPackets() happen, make sure we update stats
// at least every |kMaxTimeBetweenStatsUpdates| as long as the pacer isn't
// completely drained.
Use newer version of TimeDelta and TimeStamp factories in modules/ This change generated with following commands: find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::Micros<\(.*\)>()/TimeDelta::Micros(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::Millis<\(.*\)>()/TimeDelta::Millis(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::Seconds<\(.*\)>()/TimeDelta::Seconds(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::us/TimeDelta::Micros/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::ms/TimeDelta::Millis/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::seconds/TimeDelta::Seconds/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::Micros<\(.*\)>()/Timestamp::Micros(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::Millis<\(.*\)>()/Timestamp::Millis(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::Seconds<\(.*\)>()/Timestamp::Seconds(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::us/Timestamp::Micros/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::ms/Timestamp::Millis/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::seconds/Timestamp::Seconds/g" git cl format Bug: None Change-Id: I117d64a54950be040d996035c54bc0043310943a Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/168340 Reviewed-by: Karl Wiberg <kwiberg@webrtc.org> Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30489}
2020-02-07 14:53:52 +01:00
constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
// Don't call UpdateStats() more than |kMinTimeBetweenStatsUpdates| apart,
// for performance reasons.
Use newer version of TimeDelta and TimeStamp factories in modules/ This change generated with following commands: find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::Micros<\(.*\)>()/TimeDelta::Micros(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::Millis<\(.*\)>()/TimeDelta::Millis(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::Seconds<\(.*\)>()/TimeDelta::Seconds(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::us/TimeDelta::Micros/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::ms/TimeDelta::Millis/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/TimeDelta::seconds/TimeDelta::Seconds/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::Micros<\(.*\)>()/Timestamp::Micros(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::Millis<\(.*\)>()/Timestamp::Millis(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::Seconds<\(.*\)>()/Timestamp::Seconds(\1)/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::us/Timestamp::Micros/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::ms/Timestamp::Millis/g" find modules -type f \( -name "*.h" -o -name "*.cc" \) | xargs sed -i -e "s/Timestamp::seconds/Timestamp::Seconds/g" git cl format Bug: None Change-Id: I117d64a54950be040d996035c54bc0043310943a Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/168340 Reviewed-by: Karl Wiberg <kwiberg@webrtc.org> Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30489}
2020-02-07 14:53:52 +01:00
constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
} // namespace
TaskQueuePacedSender::TaskQueuePacedSender(
Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
TimeDelta hold_back_window)
: clock_(clock),
hold_back_window_(hold_back_window),
pacing_controller_(clock,
Reland "Lets PacingController call PacketRouter directly." This reverts commit 980cadd02c7384397a41c0e334e9f329f3cc5c65. Reason for revert: Problematic code now fix. Original change's description: > Revert "Lets PacingController call PacketRouter directly." > > This reverts commit 848ea9f0d3678118cb8926a2898454e5a4df58ae. > > Reason for revert: Part of changes that may cause deadlock > > Original change's description: > > Lets PacingController call PacketRouter directly. > > > > Since locking model has been cleaned up, PacingController can now call > > PacketRouter directly - without having to go via PacedSender or > > TaskQueuePacedSender. > > > > Bug: webrtc:10809 > > Change-Id: I181f04167d677c35395286f8b246aefb4c3e7ec7 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175909 > > Reviewed-by: Sebastian Jansson <srte@webrtc.org> > > Commit-Queue: Erik Språng <sprang@webrtc.org> > > Cr-Commit-Position: refs/heads/master@{#31342} > > TBR=sprang@webrtc.org,srte@webrtc.org > > # Not skipping CQ checks because original CL landed > 1 day ago. > > Bug: webrtc:10809 > Change-Id: I1d7d5217a03a51555b130ec5c2dd6a992b6e489e > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178021 > Reviewed-by: Erik Språng <sprang@webrtc.org> > Reviewed-by: Sebastian Jansson <srte@webrtc.org> > Commit-Queue: Erik Språng <sprang@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#31563} TBR=sprang@webrtc.org,srte@webrtc.org # Not skipping CQ checks because original CL landed > 1 day ago. Bug: webrtc:10809 Change-Id: I8bea1a5b1b1f618b697e4b09d83c9aac08099593 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178389 Reviewed-by: Sebastian Jansson <srte@webrtc.org> Commit-Queue: Erik Språng <sprang@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31600}
2020-06-30 11:53:37 +00:00
packet_router,
event_log,
field_trials,
PacingController::ProcessMode::kDynamic),
next_process_time_(Timestamp::MinusInfinity()),
stats_update_scheduled_(false),
last_stats_time_(Timestamp::MinusInfinity()),
is_shutdown_(false),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender",
TaskQueueFactory::Priority::NORMAL)) {}
TaskQueuePacedSender::~TaskQueuePacedSender() {
// Post an immediate task to mark the queue as shutting down.
// The rtc::TaskQueue destructor will wait for pending tasks to
// complete before continuing.
task_queue_.PostTask([&]() {
RTC_DCHECK_RUN_ON(&task_queue_);
is_shutdown_ = true;
});
}
void TaskQueuePacedSender::CreateProbeCluster(DataRate bitrate,
int cluster_id) {
task_queue_.PostTask([this, bitrate, cluster_id]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::Pause() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.Pause();
});
}
void TaskQueuePacedSender::Resume() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.Resume();
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::SetCongestionWindow(
DataSize congestion_window_size) {
task_queue_.PostTask([this, congestion_window_size]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetCongestionWindow(congestion_window_size);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
if (task_queue_.IsCurrent()) {
RTC_DCHECK_RUN_ON(&task_queue_);
// Fast path since this can be called once per sent packet while on the
// task queue.
pacing_controller_.UpdateOutstandingData(outstanding_data);
return;
}
task_queue_.PostTask([this, outstanding_data]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.UpdateOutstandingData(outstanding_data);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
DataRate padding_rate) {
task_queue_.PostTask([this, pacing_rate, padding_rate]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
Reland "Add trace of enqueued and sent RTP packets" This reverts commit 45bb717a2866c2d836b5332a24af0d09f2b30714. Reason for revert: Use #if RTC_TRACE_EVENTS_ENABLED to avoid unused variable. Original change's description: > Revert "Add trace of enqueued and sent RTP packets" > > This reverts commit 45b9192ad981dcdc12ad4aef087fff2195bd030c. > > Reason for revert: When tracing is disabled, this results in a clang warning (unused variable), which results in a build error since Werror is enabled by default. > > Original change's description: > > Add trace of enqueued and sent RTP packets > > > > This is useful in debugging the latency from a packet > > is enqueued until it's sent. > > > > Bug: webrtc:11617 > > Change-Id: Ic2f194334a2e178de221df3a0838481035bb3505 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176231 > > Reviewed-by: Erik Språng <sprang@webrtc.org> > > Commit-Queue: Johannes Kron <kron@webrtc.org> > > Cr-Commit-Position: refs/heads/master@{#31381} > > TBR=sprang@webrtc.org,kron@webrtc.org > > # Not skipping CQ checks because original CL landed > 1 day ago. > > Bug: webrtc:11617 > Change-Id: I854c17e587c624691a0e5e3ec9fd38c2607eda84 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176380 > Commit-Queue: Casey Fischer <caseyfischer@google.com> > Reviewed-by: Adam Nathan <adamnathan@google.com> > Cr-Commit-Position: refs/heads/master@{#31399} TBR=sprang@webrtc.org,yujo@chromium.org,adamnathan@google.com,kron@webrtc.org,caseyfischer@google.com # Not skipping CQ checks because this is a reland. Bug: webrtc:11617 Change-Id: I9de7f7ed290481a51c161a693f5b2d5df7d2eae3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176367 Commit-Queue: Johannes Kron <kron@webrtc.org> Reviewed-by: Erik Språng <sprang@webrtc.org> Reviewed-by: Johannes Kron <kron@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31407}
2020-06-01 23:28:44 +00:00
#if RTC_TRACE_EVENTS_ENABLED
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets");
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets::Loop",
"sequence_number", packet->SequenceNumber(), "rtp_timestamp",
packet->Timestamp());
}
#endif
task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
for (auto& packet : packets_) {
pacing_controller_.EnqueuePacket(std::move(packet));
}
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
task_queue_.PostTask([this, account_for_audio]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
});
}
Reland "Reland "Only include overhead if using send side bandwidth estimation."" This is a reland of 086055d0fd9b9b9efe8bcf85884324a019e9bd33 ANA was accitendly disabled even when transport sequence numbers were negotiated due to a bug in how the audio send stream is configured. To solve this we simply continue to always allow enabling ANA and leave it up to the application to ensure that it's not used together with receive side estimation. Original change's description: > Reland "Only include overhead if using send side bandwidth estimation." > > This is a reland of 8c79c6e1af354c526497082c79ccbe12af03a33e > > Original change's description: > > Only include overhead if using send side bandwidth estimation. > > > > Bug: webrtc:11298 > > Change-Id: Ia2daf690461b55d394c1b964d6a7977a98be8be2 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/166820 > > Reviewed-by: Oskar Sundbom <ossu@webrtc.org> > > Reviewed-by: Sam Zackrisson <saza@webrtc.org> > > Reviewed-by: Ali Tofigh <alito@webrtc.org> > > Reviewed-by: Erik Språng <sprang@webrtc.org> > > Commit-Queue: Sebastian Jansson <srte@webrtc.org> > > Cr-Commit-Position: refs/heads/master@{#30382} > > Bug: webrtc:11298 > Change-Id: I33205e869a8ae27c15ffe991f6d985973ed6d15a > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/167524 > Reviewed-by: Ali Tofigh <alito@webrtc.org> > Reviewed-by: Sam Zackrisson <saza@webrtc.org> > Reviewed-by: Erik Språng <sprang@webrtc.org> > Reviewed-by: Oskar Sundbom <ossu@webrtc.org> > Commit-Queue: Sebastian Jansson <srte@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#30390} Bug: webrtc:11298 Change-Id: If2ad91e17ebfc85dc51edcd9607996e18c5d1f13 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/167883 Commit-Queue: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Karl Wiberg <kwiberg@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30413}
2020-01-29 17:42:52 +01:00
void TaskQueuePacedSender::SetIncludeOverhead() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetIncludeOverhead();
});
}
Reland "Adds trial to use correct overhead calculation in pacer." This reverts commit 7affd9bcbb7a778408942d8afa4fe3ce29a8fc0b. Reason for revert: The perf issue has been addressed in the reland (https://webrtc-review.googlesource.com/c/src/+/167883). Original change's description: > Revert "Adds trial to use correct overhead calculation in pacer." > > This reverts commit 71a77c4b3b314a5e3b4e6b2f12d4886cff1b60d7. > > Reason for revert: https://webrtc-review.googlesource.com/c/src/+/167524 needs to be reverted and this CL causes a merge conflict. > > Original change's description: > > Adds trial to use correct overhead calculation in pacer. > > > > Bug: webrtc:9883 > > Change-Id: I1f25a235468678bf823ee1399ba31d94acf33be9 > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/166534 > > Reviewed-by: Erik Språng <sprang@webrtc.org> > > Commit-Queue: Sebastian Jansson <srte@webrtc.org> > > Cr-Commit-Position: refs/heads/master@{#30399} > > TBR=sprang@webrtc.org,srte@webrtc.org > > Change-Id: I7d3efa29f70aa0363311766980acae6d88bbcaaa > No-Presubmit: true > No-Tree-Checks: true > No-Try: true > Bug: webrtc:9883 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/167880 > Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> > Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#30409} TBR=mbonadei@webrtc.org,sprang@webrtc.org,srte@webrtc.org Change-Id: Iafdef81d08078000dc368e001f67bee660e2f5bc No-Presubmit: true No-Tree-Checks: true No-Try: true Bug: webrtc:9883 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/167861 Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30414}
2020-01-29 18:45:00 +00:00
void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
task_queue_.PostTask([this, overhead_per_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetTransportOverhead(overhead_per_packet);
});
}
void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
task_queue_.PostTask([this, limit]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetQueueTimeLimit(limit);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
return GetStats().expected_queue_time;
}
DataSize TaskQueuePacedSender::QueueSizeData() const {
return GetStats().queue_size;
}
absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
return GetStats().first_sent_packet_time;
}
TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
return GetStats().oldest_packet_wait_time;
}
void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
MutexLock lock(&stats_mutex_);
current_stats_ = stats;
}
void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
RTC_DCHECK_RUN_ON(&task_queue_);
if (is_shutdown_) {
return;
}
// Normally, run ProcessPackets() only if this is the scheduled task.
// If it is not but it is already time to process and there either is
// no scheduled task or the schedule has shifted forward in time, run
// anyway and clear any schedule.
Timestamp next_process_time = pacing_controller_.NextSendTime();
const Timestamp now = clock_->CurrentTime();
const bool is_scheduled_call = next_process_time_ == scheduled_process_time;
if (is_scheduled_call) {
// Indicate no pending scheduled call.
next_process_time_ = Timestamp::MinusInfinity();
}
if (is_scheduled_call ||
(now >= next_process_time && (next_process_time_.IsInfinite() ||
next_process_time < next_process_time_))) {
pacing_controller_.ProcessPackets();
next_process_time = pacing_controller_.NextSendTime();
}
absl::optional<TimeDelta> time_to_next_process;
if (pacing_controller_.IsProbing() &&
next_process_time != next_process_time_) {
// If we're probing and there isn't already a wakeup scheduled for the next
// process time, always post a task and just round sleep time down to
// nearest millisecond.
if (next_process_time.IsMinusInfinity()) {
time_to_next_process = TimeDelta::Zero();
} else {
time_to_next_process =
std::max(TimeDelta::Zero(),
(next_process_time - now).RoundDownTo(TimeDelta::Millis(1)));
}
} else if (next_process_time_.IsMinusInfinity() ||
next_process_time <= next_process_time_ - hold_back_window_) {
// Schedule a new task since there is none currently scheduled
// (|next_process_time_| is infinite), or the new process time is at least
// one holdback window earlier than whatever is currently scheduled.
time_to_next_process = std::max(next_process_time - now, hold_back_window_);
}
if (time_to_next_process) {
// Set a new scheduled process time and post a delayed task.
next_process_time_ = next_process_time;
task_queue_.PostDelayedTask(
[this, next_process_time]() { MaybeProcessPackets(next_process_time); },
time_to_next_process->ms<uint32_t>());
}
MaybeUpdateStats(false);
}
void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
if (is_shutdown_) {
if (is_scheduled_call) {
stats_update_scheduled_ = false;
}
return;
}
Timestamp now = clock_->CurrentTime();
if (is_scheduled_call) {
// Allow scheduled task to process packets to clear up an remaining debt
// level in an otherwise empty queue.
pacing_controller_.ProcessPackets();
} else {
if (now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
// Too frequent unscheduled stats update, return early.
return;
}
}
Stats new_stats;
new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
new_stats.oldest_packet_wait_time = pacing_controller_.OldestPacketWaitTime();
new_stats.queue_size = pacing_controller_.QueueSizeData();
OnStatsUpdated(new_stats);
last_stats_time_ = now;
bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
pacing_controller_.CurrentBufferLevel().IsZero();
// If there's anything interesting to get from the pacer and this is a
// scheduled call (or no scheduled call in flight), post a new scheduled stats
// update.
if (!pacer_drained) {
if (!stats_update_scheduled_) {
// There is no pending delayed task to update stats, add one.
// Treat this call as being scheduled in order to bootstrap scheduling
// loop.
stats_update_scheduled_ = true;
is_scheduled_call = true;
}
// Only if on the scheduled call loop do we want to schedule a new delayed
// task.
if (is_scheduled_call) {
task_queue_.PostDelayedTask(
[this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
MaybeUpdateStats(true);
},
kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
}
} else if (is_scheduled_call) {
// This is a scheduled call, signing out since there's nothing interesting
// left to check.
stats_update_scheduled_ = false;
}
}
TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
MutexLock lock(&stats_mutex_);
return current_stats_;
}
} // namespace webrtc