webrtc_m130/rtc_base/thread.cc

1250 lines
35 KiB
C++
Raw Normal View History

/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/thread.h"
#if defined(WEBRTC_WIN)
#include <comdef.h>
#elif defined(WEBRTC_POSIX)
#include <time.h>
#else
#error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined."
#endif
#if defined(WEBRTC_WIN)
// Disable warning that we don't care about:
// warning C4722: destructor never returns, potential memory leak
#pragma warning(disable : 4722)
#endif
#include <stdio.h>
#include <utility>
#include "absl/algorithm/container.h"
#include "api/sequence_checker.h"
#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/event.h"
Reland "Refactor rtc_base build targets." This is a reland of 69241a93fb14f6527a26d5c94dde879013012d2a Fix: The problem was related to NO_MAIN_THREAD_WRAPPING, which affects https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/rtc_base/thread.cc;l=257-263;drc=7acc2d9fe3a6e3c4d8881d2bdfc9b8968a724cd5. The original CL didn't attach the definition of the macro NO_MAIN_THREAD_WRAPPING when building for Chromium (which doesn't have to be related to //rtc_base anymore but to //rtc_base:threading). Original change's description: > Refactor rtc_base build targets. > > The "//rtc_base:rtc_base" build target has historically been one of the > biggest targets in the WebRTC build. Big targets are the main source of > circular dependencies and non-API types leakage. > > This CL is a step forward into splitting "//rtc_base:rtc_base" into > smaller targets (as originally started in 2018). > > The only non-automated changes are (like re-wiring the build system): > * The creation of //rtc_base/async_resolver.{h,cc} which allows to > break a circular dependency (is has been extracted from > //rtc_base/net_helpers.{h,cc}). > * The creation of //rtc_base/internal/default_socket_server.{h,cc} to > break another circular dependency. > > Bug: webrtc:9987 > Change-Id: I0c8f5e7efe2c8fd8e6bffa0d6dd2dd494cf3df02 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/196903 > Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> > Reviewed-by: Harald Alvestrand <hta@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#32941} Bug: webrtc:9987 Change-Id: I7cdf49d2aac8357f1f50f90010bf2c2f62fa19f6 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202021 Reviewed-by: Niels Moller <nisse@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33001}
2021-01-15 10:41:01 +01:00
#include "rtc_base/internal/default_socket_server.h"
#include "rtc_base/logging.h"
#include "rtc_base/null_socket_server.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
#if defined(WEBRTC_MAC)
#include "rtc_base/system/cocoa_threading.h"
/*
* These are forward-declarations for methods that are part of the
* ObjC runtime. They are declared in the private header objc-internal.h.
* These calls are what clang inserts when using @autoreleasepool in ObjC,
* but here they are used directly in order to keep this file C++.
* https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support
*/
extern "C" {
void* objc_autoreleasePoolPush(void);
void objc_autoreleasePoolPop(void* pool);
}
namespace {
class ScopedAutoReleasePool {
public:
ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {}
~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); }
private:
void* const pool_;
};
} // namespace
#endif
namespace rtc {
namespace {
class MessageHandlerWithTask final : public MessageHandler {
public:
MessageHandlerWithTask() {}
void OnMessage(Message* msg) override {
static_cast<rtc_thread_internal::MessageLikeTask*>(msg->pdata)->Run();
delete msg->pdata;
}
private:
~MessageHandlerWithTask() override {}
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask);
};
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
public:
MarkProcessingCritScope(const RecursiveCriticalSection* cs,
size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
: cs_(cs), processing_(processing) {
cs_->Enter();
*processing_ += 1;
}
~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
*processing_ -= 1;
cs_->Leave();
}
private:
const RecursiveCriticalSection* const cs_;
size_t* processing_;
RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
};
} // namespace
ThreadManager* ThreadManager::Instance() {
static ThreadManager* const thread_manager = new ThreadManager();
return thread_manager;
}
ThreadManager::~ThreadManager() {
// By above RTC_DEFINE_STATIC_LOCAL.
RTC_NOTREACHED() << "ThreadManager should never be destructed.";
}
// static
void ThreadManager::Add(Thread* message_queue) {
return Instance()->AddInternal(message_queue);
}
void ThreadManager::AddInternal(Thread* message_queue) {
CritScope cs(&crit_);
// Prevent changes while the list of message queues is processed.
RTC_DCHECK_EQ(processing_, 0);
message_queues_.push_back(message_queue);
}
// static
void ThreadManager::Remove(Thread* message_queue) {
return Instance()->RemoveInternal(message_queue);
}
void ThreadManager::RemoveInternal(Thread* message_queue) {
{
CritScope cs(&crit_);
// Prevent changes while the list of message queues is processed.
RTC_DCHECK_EQ(processing_, 0);
std::vector<Thread*>::iterator iter;
iter = absl::c_find(message_queues_, message_queue);
if (iter != message_queues_.end()) {
message_queues_.erase(iter);
}
#if RTC_DCHECK_IS_ON
RemoveFromSendGraph(message_queue);
#endif
}
}
#if RTC_DCHECK_IS_ON
void ThreadManager::RemoveFromSendGraph(Thread* thread) {
for (auto it = send_graph_.begin(); it != send_graph_.end();) {
if (it->first == thread) {
it = send_graph_.erase(it);
} else {
it->second.erase(thread);
++it;
}
}
}
void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
Thread* target) {
RTC_DCHECK(source);
RTC_DCHECK(target);
CritScope cs(&crit_);
std::deque<Thread*> all_targets({target});
// We check the pre-existing who-sends-to-who graph for any path from target
// to source. This loop is guaranteed to terminate because per the send graph
// invariant, there are no cycles in the graph.
for (size_t i = 0; i < all_targets.size(); i++) {
const auto& targets = send_graph_[all_targets[i]];
all_targets.insert(all_targets.end(), targets.begin(), targets.end());
}
RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
<< " send loop between " << source->name() << " and " << target->name();
// We may now insert source -> target without creating a cycle, since there
// was no path from target to source per the prior CHECK.
send_graph_[source].insert(target);
}
#endif
// static
void ThreadManager::Clear(MessageHandler* handler) {
return Instance()->ClearInternal(handler);
}
void ThreadManager::ClearInternal(MessageHandler* handler) {
// Deleted objects may cause re-entrant calls to ClearInternal. This is
// allowed as the list of message queues does not change while queues are
// cleared.
MarkProcessingCritScope cs(&crit_, &processing_);
for (Thread* queue : message_queues_) {
queue->Clear(handler);
}
}
// static
void ThreadManager::ProcessAllMessageQueuesForTesting() {
return Instance()->ProcessAllMessageQueuesInternal();
}
void ThreadManager::ProcessAllMessageQueuesInternal() {
// This works by posting a delayed message at the current time and waiting
// for it to be dispatched on all queues, which will ensure that all messages
// that came before it were also dispatched.
volatile int queues_not_done = 0;
// This class is used so that whether the posted message is processed, or the
// message queue is simply cleared, queues_not_done gets decremented.
class ScopedIncrement : public MessageData {
public:
ScopedIncrement(volatile int* value) : value_(value) {
AtomicOps::Increment(value_);
}
~ScopedIncrement() override { AtomicOps::Decrement(value_); }
private:
volatile int* value_;
};
{
MarkProcessingCritScope cs(&crit_, &processing_);
for (Thread* queue : message_queues_) {
if (!queue->IsProcessingMessagesForTesting()) {
// If the queue is not processing messages, it can
// be ignored. If we tried to post a message to it, it would be dropped
// or ignored.
continue;
}
queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
new ScopedIncrement(&queues_not_done));
}
}
rtc::Thread* current = rtc::Thread::Current();
// Note: One of the message queues may have been on this thread, which is
// why we can't synchronously wait for queues_not_done to go to 0; we need
// to process messages as well.
while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
if (current) {
current->ProcessMessages(0);
}
}
}
// static
Thread* Thread::Current() {
ThreadManager* manager = ThreadManager::Instance();
Thread* thread = manager->CurrentThread();
#ifndef NO_MAIN_THREAD_WRAPPING
// Only autowrap the thread which instantiated the ThreadManager.
if (!thread && manager->IsMainThread()) {
Reland "Refactor rtc_base build targets." This is a reland of 69241a93fb14f6527a26d5c94dde879013012d2a Fix: The problem was related to NO_MAIN_THREAD_WRAPPING, which affects https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/rtc_base/thread.cc;l=257-263;drc=7acc2d9fe3a6e3c4d8881d2bdfc9b8968a724cd5. The original CL didn't attach the definition of the macro NO_MAIN_THREAD_WRAPPING when building for Chromium (which doesn't have to be related to //rtc_base anymore but to //rtc_base:threading). Original change's description: > Refactor rtc_base build targets. > > The "//rtc_base:rtc_base" build target has historically been one of the > biggest targets in the WebRTC build. Big targets are the main source of > circular dependencies and non-API types leakage. > > This CL is a step forward into splitting "//rtc_base:rtc_base" into > smaller targets (as originally started in 2018). > > The only non-automated changes are (like re-wiring the build system): > * The creation of //rtc_base/async_resolver.{h,cc} which allows to > break a circular dependency (is has been extracted from > //rtc_base/net_helpers.{h,cc}). > * The creation of //rtc_base/internal/default_socket_server.{h,cc} to > break another circular dependency. > > Bug: webrtc:9987 > Change-Id: I0c8f5e7efe2c8fd8e6bffa0d6dd2dd494cf3df02 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/196903 > Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> > Reviewed-by: Harald Alvestrand <hta@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#32941} Bug: webrtc:9987 Change-Id: I7cdf49d2aac8357f1f50f90010bf2c2f62fa19f6 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202021 Reviewed-by: Niels Moller <nisse@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33001}
2021-01-15 10:41:01 +01:00
thread = new Thread(CreateDefaultSocketServer());
thread->WrapCurrentWithThreadManager(manager, true);
}
#endif
return thread;
}
#if defined(WEBRTC_POSIX)
ThreadManager::ThreadManager() : main_thread_ref_(CurrentThreadRef()) {
#if defined(WEBRTC_MAC)
InitCocoaMultiThreading();
#endif
pthread_key_create(&key_, nullptr);
}
Thread* ThreadManager::CurrentThread() {
return static_cast<Thread*>(pthread_getspecific(key_));
}
void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
pthread_setspecific(key_, thread);
}
#endif
#if defined(WEBRTC_WIN)
ThreadManager::ThreadManager()
: key_(TlsAlloc()), main_thread_ref_(CurrentThreadRef()) {}
Thread* ThreadManager::CurrentThread() {
return static_cast<Thread*>(TlsGetValue(key_));
}
void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
TlsSetValue(key_, thread);
}
#endif
void ThreadManager::SetCurrentThread(Thread* thread) {
#if RTC_DLOG_IS_ON
if (CurrentThread() && thread) {
RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
}
#endif // RTC_DLOG_IS_ON
Reland "Make sure that "current" rtc::Thread instances are always current for TaskQueueBase." This reverts commit 28685dc08cb34f756f9200519fba3222ba3a66f2. Reason for revert: Speculative reland after looking into downstream failures. It's possible that carryover state from unrelated tests running in parallel was causing failures. Original change's description: > Revert "Make sure that "current" rtc::Thread instances are always current for TaskQueueBase." > > This reverts commit 46b3bc6c24c233fe41a2401ce6e8eb8204a2d5a8. > > Reason for revert: Speculative revert. Breaks downstream project > > Original change's description: > > Make sure that "current" rtc::Thread instances are always current for TaskQueueBase. > > > > This is a necessary part of fulfilling the TaskQueueBase > > interface. If a thread does not register as the current TQ, yet offers > > the TQ interface, TQ 'current' checks will not work as expected and > > code that relies them (TaskQueueBase::Current() and IsCurrent()) > > will run in unexpected ways. > > > > Bug: webrtc:11572 > > Change-Id: Iab747bc474e74e6ce4f9e914cfd5b0578b19d19c > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175080 > > Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> > > Commit-Queue: Tommi <tommi@webrtc.org> > > Cr-Commit-Position: refs/heads/master@{#31254} > > TBR=mbonadei@webrtc.org,tommi@webrtc.org > > Change-Id: I69ff3355f0ec447b25604bd95fdacbdb4d4f3f27 > No-Presubmit: true > No-Tree-Checks: true > No-Try: true > Bug: webrtc:11572 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175104 > Reviewed-by: Artem Titov <titovartem@webrtc.org> > Commit-Queue: Artem Titov <titovartem@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#31259} TBR=mbonadei@webrtc.org,tommi@webrtc.org,titovartem@webrtc.org # Not skipping CQ checks because this is a reland. Bug: webrtc:11572 Change-Id: I00c82d99af8e05851769e09cb682b5b73895a6f3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175133 Reviewed-by: Tommi <tommi@webrtc.org> Reviewed-by: Artem Titov <titovartem@webrtc.org> Commit-Queue: Tommi <tommi@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31273}
2020-05-15 10:11:56 +02:00
if (thread) {
thread->EnsureIsCurrentTaskQueue();
} else {
Thread* current = CurrentThread();
if (current) {
// The current thread is being cleared, e.g. as a result of
// UnwrapCurrent() being called or when a thread is being stopped
// (see PreRun()). This signals that the Thread instance is being detached
// from the thread, which also means that TaskQueue::Current() must not
// return a pointer to the Thread instance.
current->ClearCurrentTaskQueue();
}
}
SetCurrentThreadInternal(thread);
}
void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) {
SetCurrentThreadInternal(thread);
}
Thread* ThreadManager::WrapCurrentThread() {
Thread* result = CurrentThread();
if (nullptr == result) {
Reland "Refactor rtc_base build targets." This is a reland of 69241a93fb14f6527a26d5c94dde879013012d2a Fix: The problem was related to NO_MAIN_THREAD_WRAPPING, which affects https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/rtc_base/thread.cc;l=257-263;drc=7acc2d9fe3a6e3c4d8881d2bdfc9b8968a724cd5. The original CL didn't attach the definition of the macro NO_MAIN_THREAD_WRAPPING when building for Chromium (which doesn't have to be related to //rtc_base anymore but to //rtc_base:threading). Original change's description: > Refactor rtc_base build targets. > > The "//rtc_base:rtc_base" build target has historically been one of the > biggest targets in the WebRTC build. Big targets are the main source of > circular dependencies and non-API types leakage. > > This CL is a step forward into splitting "//rtc_base:rtc_base" into > smaller targets (as originally started in 2018). > > The only non-automated changes are (like re-wiring the build system): > * The creation of //rtc_base/async_resolver.{h,cc} which allows to > break a circular dependency (is has been extracted from > //rtc_base/net_helpers.{h,cc}). > * The creation of //rtc_base/internal/default_socket_server.{h,cc} to > break another circular dependency. > > Bug: webrtc:9987 > Change-Id: I0c8f5e7efe2c8fd8e6bffa0d6dd2dd494cf3df02 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/196903 > Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> > Reviewed-by: Harald Alvestrand <hta@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#32941} Bug: webrtc:9987 Change-Id: I7cdf49d2aac8357f1f50f90010bf2c2f62fa19f6 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202021 Reviewed-by: Niels Moller <nisse@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33001}
2021-01-15 10:41:01 +01:00
result = new Thread(CreateDefaultSocketServer());
result->WrapCurrentWithThreadManager(this, true);
}
return result;
}
void ThreadManager::UnwrapCurrentThread() {
Thread* t = CurrentThread();
if (t && !(t->IsOwned())) {
t->UnwrapCurrent();
delete t;
}
}
bool ThreadManager::IsMainThread() {
return IsThreadRefEqual(CurrentThreadRef(), main_thread_ref_);
}
Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
: thread_(Thread::Current()),
previous_state_(thread_->SetAllowBlockingCalls(false)) {}
Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
RTC_DCHECK(thread_->IsCurrent());
thread_->SetAllowBlockingCalls(previous_state_);
}
Add utility to count the number of blocking thread invokes. This is useful to understand how often we block in certain parts of the api and track improvements/regressions. There are two macros, both are only active for RTC_DCHECK_IS_ON builds: * RTC_LOG_THREAD_BLOCK_COUNT() Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); } When executing this function during a test, the output could be: (my_file.cc:2): Blocking MyFunction: total=1 (actual=1, would=0) The words 'actual' and 'would' reflect whether an actual thread switch was made, or if in the case of a test using the same thread for more than one role (e.g. signaling, worker, network are all the same thread) that an actual thread switch did not occur but it would have occurred in the case of having dedicated threads. The 'total' count is the sum. * RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); thread_->Invoke<void>([this](){ MoreStuff(); }); RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); } When a function is known to have blocking calls and we want to not regress from the currently known number of blocking calls, we can use this macro to state that at a certain point in a function, below where RTC_LOG_THREAD_BLOCK_COUNT() is called, there must have occurred no more than |x| (total) blocking calls. If more occur, a DCHECK will hit and print out what the actual number of calls was: # Fatal error in: my_file.cc, line 5 # last system error: 60 # Check failed: blocked_call_count_printer.GetTotalBlockedCallCount() <= 1 (2 vs. 1) Bug: webrtc:12649 Change-Id: Ibac4f85f00b89680601dba54a651eac95a0f45d3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/213782 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33632}
2021-04-07 10:08:28 +02:00
#if RTC_DCHECK_IS_ON
Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls(
std::function<void(uint32_t, uint32_t)> callback)
: thread_(Thread::Current()),
base_blocking_call_count_(thread_->GetBlockingCallCount()),
base_could_be_blocking_call_count_(
thread_->GetCouldBeBlockingCallCount()),
result_callback_(std::move(callback)) {}
Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() {
if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) {
result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount());
}
Add utility to count the number of blocking thread invokes. This is useful to understand how often we block in certain parts of the api and track improvements/regressions. There are two macros, both are only active for RTC_DCHECK_IS_ON builds: * RTC_LOG_THREAD_BLOCK_COUNT() Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); } When executing this function during a test, the output could be: (my_file.cc:2): Blocking MyFunction: total=1 (actual=1, would=0) The words 'actual' and 'would' reflect whether an actual thread switch was made, or if in the case of a test using the same thread for more than one role (e.g. signaling, worker, network are all the same thread) that an actual thread switch did not occur but it would have occurred in the case of having dedicated threads. The 'total' count is the sum. * RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); thread_->Invoke<void>([this](){ MoreStuff(); }); RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); } When a function is known to have blocking calls and we want to not regress from the currently known number of blocking calls, we can use this macro to state that at a certain point in a function, below where RTC_LOG_THREAD_BLOCK_COUNT() is called, there must have occurred no more than |x| (total) blocking calls. If more occur, a DCHECK will hit and print out what the actual number of calls was: # Fatal error in: my_file.cc, line 5 # last system error: 60 # Check failed: blocked_call_count_printer.GetTotalBlockedCallCount() <= 1 (2 vs. 1) Bug: webrtc:12649 Change-Id: Ibac4f85f00b89680601dba54a651eac95a0f45d3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/213782 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33632}
2021-04-07 10:08:28 +02:00
}
uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const {
return thread_->GetBlockingCallCount() - base_blocking_call_count_;
}
uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const {
return thread_->GetCouldBeBlockingCallCount() -
base_could_be_blocking_call_count_;
}
uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const {
return GetBlockingCallCount() + GetCouldBeBlockingCallCount();
}
#endif
Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {}
Thread::Thread(std::unique_ptr<SocketServer> ss)
: Thread(std::move(ss), /*do_init=*/true) {}
Thread::Thread(SocketServer* ss, bool do_init)
: fPeekKeep_(false),
delayed_next_num_(0),
fInitialized_(false),
fDestroyed_(false),
stop_(0),
ss_(ss) {
RTC_DCHECK(ss);
ss_->SetMessageQueue(this);
SetName("Thread", this); // default name
if (do_init) {
DoInit();
}
}
Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
: Thread(ss.get(), do_init) {
own_ss_ = std::move(ss);
}
Thread::~Thread() {
Stop();
DoDestroy();
}
void Thread::DoInit() {
if (fInitialized_) {
return;
}
fInitialized_ = true;
ThreadManager::Add(this);
}
void Thread::DoDestroy() {
if (fDestroyed_) {
return;
}
fDestroyed_ = true;
// The signal is done from here to ensure
// that it always gets called when the queue
// is going away.
if (ss_) {
ss_->SetMessageQueue(nullptr);
}
ThreadManager::Remove(this);
ClearInternal(nullptr, MQID_ANY, nullptr);
}
SocketServer* Thread::socketserver() {
return ss_;
}
void Thread::WakeUpSocketServer() {
ss_->WakeUp();
}
void Thread::Quit() {
AtomicOps::ReleaseStore(&stop_, 1);
WakeUpSocketServer();
}
bool Thread::IsQuitting() {
return AtomicOps::AcquireLoad(&stop_) != 0;
}
void Thread::Restart() {
AtomicOps::ReleaseStore(&stop_, 0);
}
bool Thread::Peek(Message* pmsg, int cmsWait) {
if (fPeekKeep_) {
*pmsg = msgPeek_;
return true;
}
if (!Get(pmsg, cmsWait))
return false;
msgPeek_ = *pmsg;
fPeekKeep_ = true;
return true;
}
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
// Return and clear peek if present
// Always return the peek if it exists so there is Peek/Get symmetry
if (fPeekKeep_) {
*pmsg = msgPeek_;
fPeekKeep_ = false;
return true;
}
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0;
int64_t msStart = TimeMillis();
int64_t msCurrent = msStart;
while (true) {
// Check for posted events
int64_t cmsDelayNext = kForever;
bool first_pass = true;
while (true) {
// All queue operations need to be locked, but nothing else in this loop
// (specifically handling disposed message) can happen inside the crit.
// Otherwise, disposed MessageHandlers will cause deadlocks.
{
CritScope cs(&crit_);
// On the first pass, check for delayed messages that have been
// triggered and calculate the next trigger time.
if (first_pass) {
first_pass = false;
while (!delayed_messages_.empty()) {
if (msCurrent < delayed_messages_.top().run_time_ms_) {
cmsDelayNext =
TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
}
}
// Pull a message off the message queue, if available.
if (messages_.empty()) {
break;
} else {
*pmsg = messages_.front();
messages_.pop_front();
}
} // crit_ is released here.
// If this was a dispose message, delete it and skip it.
if (MQID_DISPOSE == pmsg->message_id) {
RTC_DCHECK(nullptr == pmsg->phandler);
delete pmsg->pdata;
*pmsg = Message();
continue;
}
return true;
}
if (IsQuitting())
break;
// Which is shorter, the delay wait or the asked wait?
int64_t cmsNext;
if (cmsWait == kForever) {
cmsNext = cmsDelayNext;
} else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
cmsNext = cmsDelayNext;
}
{
// Wait and multiplex in the meantime
if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
return false;
}
// If the specified timeout expired, return
msCurrent = TimeMillis();
cmsElapsed = TimeDiff(msCurrent, msStart);
if (cmsWait != kForever) {
if (cmsElapsed >= cmsWait)
return false;
}
}
return false;
}
void Thread::Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
RTC_DCHECK(!time_sensitive);
if (IsQuitting()) {
delete pdata;
return;
}
// Keep thread safe
// Add the message to the end of the queue
// Signal for the multiplexer to return
{
CritScope cs(&crit_);
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
messages_.push_back(msg);
}
WakeUpSocketServer();
}
void Thread::PostDelayed(const Location& posted_from,
int delay_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id,
pdata);
}
void Thread::PostAt(const Location& posted_from,
int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id,
pdata);
}
void Thread::DoDelayPost(const Location& posted_from,
int64_t delay_ms,
int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
if (IsQuitting()) {
delete pdata;
return;
}
// Keep thread safe
// Add to the priority queue. Gets sorted soonest first.
// Signal for the multiplexer to return.
{
CritScope cs(&crit_);
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg);
delayed_messages_.push(delayed);
// If this message queue processes 1 message every millisecond for 50 days,
// we will wrap this number. Even then, only messages with identical times
// will be misordered, and then only briefly. This is probably ok.
++delayed_next_num_;
RTC_DCHECK_NE(0, delayed_next_num_);
}
WakeUpSocketServer();
}
int Thread::GetDelay() {
CritScope cs(&crit_);
if (!messages_.empty())
return 0;
if (!delayed_messages_.empty()) {
int delay = TimeUntil(delayed_messages_.top().run_time_ms_);
if (delay < 0)
delay = 0;
return delay;
}
return kForever;
}
void Thread::ClearInternal(MessageHandler* phandler,
uint32_t id,
MessageList* removed) {
// Remove messages with phandler
if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
if (removed) {
removed->push_back(msgPeek_);
} else {
delete msgPeek_.pdata;
}
fPeekKeep_ = false;
}
// Remove from ordered message queue
for (auto it = messages_.begin(); it != messages_.end();) {
if (it->Match(phandler, id)) {
if (removed) {
removed->push_back(*it);
} else {
delete it->pdata;
}
it = messages_.erase(it);
} else {
++it;
}
}
// Remove from priority queue. Not directly iterable, so use this approach
auto new_end = delayed_messages_.container().begin();
for (auto it = new_end; it != delayed_messages_.container().end(); ++it) {
if (it->msg_.Match(phandler, id)) {
if (removed) {
removed->push_back(it->msg_);
} else {
delete it->msg_.pdata;
}
} else {
*new_end++ = *it;
}
}
delayed_messages_.container().erase(new_end,
delayed_messages_.container().end());
delayed_messages_.reheap();
}
void Thread::Dispatch(Message* pmsg) {
TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
pmsg->posted_from.file_name(), "src_func",
pmsg->posted_from.function_name());
RTC_DCHECK_RUN_ON(this);
int64_t start_time = TimeMillis();
pmsg->phandler->OnMessage(pmsg);
int64_t end_time = TimeMillis();
int64_t diff = TimeDiff(end_time, start_time);
if (diff >= dispatch_warning_ms_) {
RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
<< "ms to dispatch. Posted from: "
<< pmsg->posted_from.ToString();
// To avoid log spew, move the warning limit to only give warning
// for delays that are larger than the one observed.
dispatch_warning_ms_ = diff + 1;
}
}
bool Thread::IsCurrent() const {
return ThreadManager::Instance()->CurrentThread() == this;
}
std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
Reland "Refactor rtc_base build targets." This is a reland of 69241a93fb14f6527a26d5c94dde879013012d2a Fix: The problem was related to NO_MAIN_THREAD_WRAPPING, which affects https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/rtc_base/thread.cc;l=257-263;drc=7acc2d9fe3a6e3c4d8881d2bdfc9b8968a724cd5. The original CL didn't attach the definition of the macro NO_MAIN_THREAD_WRAPPING when building for Chromium (which doesn't have to be related to //rtc_base anymore but to //rtc_base:threading). Original change's description: > Refactor rtc_base build targets. > > The "//rtc_base:rtc_base" build target has historically been one of the > biggest targets in the WebRTC build. Big targets are the main source of > circular dependencies and non-API types leakage. > > This CL is a step forward into splitting "//rtc_base:rtc_base" into > smaller targets (as originally started in 2018). > > The only non-automated changes are (like re-wiring the build system): > * The creation of //rtc_base/async_resolver.{h,cc} which allows to > break a circular dependency (is has been extracted from > //rtc_base/net_helpers.{h,cc}). > * The creation of //rtc_base/internal/default_socket_server.{h,cc} to > break another circular dependency. > > Bug: webrtc:9987 > Change-Id: I0c8f5e7efe2c8fd8e6bffa0d6dd2dd494cf3df02 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/196903 > Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> > Reviewed-by: Harald Alvestrand <hta@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#32941} Bug: webrtc:9987 Change-Id: I7cdf49d2aac8357f1f50f90010bf2c2f62fa19f6 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202021 Reviewed-by: Niels Moller <nisse@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33001}
2021-01-15 10:41:01 +01:00
return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
}
std::unique_ptr<Thread> Thread::Create() {
return std::unique_ptr<Thread>(
new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
}
bool Thread::SleepMs(int milliseconds) {
AssertBlockingIsAllowedOnCurrentThread();
#if defined(WEBRTC_WIN)
::Sleep(milliseconds);
return true;
#else
// POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
// so we use nanosleep() even though it has greater precision than necessary.
struct timespec ts;
ts.tv_sec = milliseconds / 1000;
ts.tv_nsec = (milliseconds % 1000) * 1000000;
int ret = nanosleep(&ts, nullptr);
if (ret != 0) {
RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early";
return false;
}
return true;
#endif
}
bool Thread::SetName(const std::string& name, const void* obj) {
RTC_DCHECK(!IsRunning());
name_ = name;
if (obj) {
// The %p specifier typically produce at most 16 hex digits, possibly with a
// 0x prefix. But format is implementation defined, so add some margin.
char buf[30];
snprintf(buf, sizeof(buf), " 0x%p", obj);
name_ += buf;
}
return true;
}
void Thread::SetDispatchWarningMs(int deadline) {
if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask(
[this, deadline]() { SetDispatchWarningMs(deadline); }));
return;
}
RTC_DCHECK_RUN_ON(this);
dispatch_warning_ms_ = deadline;
}
bool Thread::Start() {
RTC_DCHECK(!IsRunning());
if (IsRunning())
return false;
Restart(); // reset IsQuitting() if the thread is being restarted
// Make sure that ThreadManager is created on the main thread before
// we start a new thread.
ThreadManager::Instance();
owned_ = true;
#if defined(WEBRTC_WIN)
thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);
if (!thread_) {
return false;
}
#elif defined(WEBRTC_POSIX)
pthread_attr_t attr;
pthread_attr_init(&attr);
int error_code = pthread_create(&thread_, &attr, PreRun, this);
if (0 != error_code) {
RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
thread_ = 0;
return false;
}
RTC_DCHECK(thread_);
#endif
return true;
}
bool Thread::WrapCurrent() {
return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
}
void Thread::UnwrapCurrent() {
// Clears the platform-specific thread-specific storage.
ThreadManager::Instance()->SetCurrentThread(nullptr);
#if defined(WEBRTC_WIN)
if (thread_ != nullptr) {
if (!CloseHandle(thread_)) {
RTC_LOG_GLE(LS_ERROR)
<< "When unwrapping thread, failed to close handle.";
}
thread_ = nullptr;
thread_id_ = 0;
}
#elif defined(WEBRTC_POSIX)
thread_ = 0;
#endif
}
void Thread::SafeWrapCurrent() {
WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
}
void Thread::Join() {
if (!IsRunning())
return;
RTC_DCHECK(!IsCurrent());
if (Current() && !Current()->blocking_calls_allowed_) {
RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
"but blocking calls have been disallowed";
}
#if defined(WEBRTC_WIN)
RTC_DCHECK(thread_ != nullptr);
WaitForSingleObject(thread_, INFINITE);
CloseHandle(thread_);
thread_ = nullptr;
thread_id_ = 0;
#elif defined(WEBRTC_POSIX)
pthread_join(thread_, nullptr);
thread_ = 0;
#endif
}
bool Thread::SetAllowBlockingCalls(bool allow) {
RTC_DCHECK(IsCurrent());
bool previous = blocking_calls_allowed_;
blocking_calls_allowed_ = allow;
return previous;
}
// static
void Thread::AssertBlockingIsAllowedOnCurrentThread() {
#if !defined(NDEBUG)
Thread* current = Thread::Current();
RTC_DCHECK(!current || current->blocking_calls_allowed_);
#endif
}
// static
#if defined(WEBRTC_WIN)
DWORD WINAPI Thread::PreRun(LPVOID pv) {
#else
void* Thread::PreRun(void* pv) {
#endif
Thread* thread = static_cast<Thread*>(pv);
ThreadManager::Instance()->SetCurrentThread(thread);
rtc::SetCurrentThreadName(thread->name_.c_str());
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
thread->Run();
ThreadManager::Instance()->SetCurrentThread(nullptr);
#ifdef WEBRTC_WIN
return 0;
#else
return nullptr;
#endif
} // namespace rtc
void Thread::Run() {
ProcessMessages(kForever);
}
bool Thread::IsOwned() {
RTC_DCHECK(IsRunning());
return owned_;
}
void Thread::Stop() {
Thread::Quit();
Join();
}
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
RTC_DCHECK(!IsQuitting());
if (IsQuitting())
return;
// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (IsCurrent()) {
Add utility to count the number of blocking thread invokes. This is useful to understand how often we block in certain parts of the api and track improvements/regressions. There are two macros, both are only active for RTC_DCHECK_IS_ON builds: * RTC_LOG_THREAD_BLOCK_COUNT() Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); } When executing this function during a test, the output could be: (my_file.cc:2): Blocking MyFunction: total=1 (actual=1, would=0) The words 'actual' and 'would' reflect whether an actual thread switch was made, or if in the case of a test using the same thread for more than one role (e.g. signaling, worker, network are all the same thread) that an actual thread switch did not occur but it would have occurred in the case of having dedicated threads. The 'total' count is the sum. * RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); thread_->Invoke<void>([this](){ MoreStuff(); }); RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); } When a function is known to have blocking calls and we want to not regress from the currently known number of blocking calls, we can use this macro to state that at a certain point in a function, below where RTC_LOG_THREAD_BLOCK_COUNT() is called, there must have occurred no more than |x| (total) blocking calls. If more occur, a DCHECK will hit and print out what the actual number of calls was: # Fatal error in: my_file.cc, line 5 # last system error: 60 # Check failed: blocked_call_count_printer.GetTotalBlockedCallCount() <= 1 (2 vs. 1) Bug: webrtc:12649 Change-Id: Ibac4f85f00b89680601dba54a651eac95a0f45d3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/213782 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33632}
2021-04-07 10:08:28 +02:00
#if RTC_DCHECK_IS_ON
RTC_DCHECK(this->IsInvokeToThreadAllowed(this));
Add utility to count the number of blocking thread invokes. This is useful to understand how often we block in certain parts of the api and track improvements/regressions. There are two macros, both are only active for RTC_DCHECK_IS_ON builds: * RTC_LOG_THREAD_BLOCK_COUNT() Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); } When executing this function during a test, the output could be: (my_file.cc:2): Blocking MyFunction: total=1 (actual=1, would=0) The words 'actual' and 'would' reflect whether an actual thread switch was made, or if in the case of a test using the same thread for more than one role (e.g. signaling, worker, network are all the same thread) that an actual thread switch did not occur but it would have occurred in the case of having dedicated threads. The 'total' count is the sum. * RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); thread_->Invoke<void>([this](){ MoreStuff(); }); RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); } When a function is known to have blocking calls and we want to not regress from the currently known number of blocking calls, we can use this macro to state that at a certain point in a function, below where RTC_LOG_THREAD_BLOCK_COUNT() is called, there must have occurred no more than |x| (total) blocking calls. If more occur, a DCHECK will hit and print out what the actual number of calls was: # Fatal error in: my_file.cc, line 5 # last system error: 60 # Check failed: blocked_call_count_printer.GetTotalBlockedCallCount() <= 1 (2 vs. 1) Bug: webrtc:12649 Change-Id: Ibac4f85f00b89680601dba54a651eac95a0f45d3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/213782 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33632}
2021-04-07 10:08:28 +02:00
RTC_DCHECK_RUN_ON(this);
could_be_blocking_call_count_++;
#endif
msg.phandler->OnMessage(&msg);
return;
}
AssertBlockingIsAllowedOnCurrentThread();
Thread* current_thread = Thread::Current();
#if RTC_DCHECK_IS_ON
if (current_thread) {
Add utility to count the number of blocking thread invokes. This is useful to understand how often we block in certain parts of the api and track improvements/regressions. There are two macros, both are only active for RTC_DCHECK_IS_ON builds: * RTC_LOG_THREAD_BLOCK_COUNT() Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); } When executing this function during a test, the output could be: (my_file.cc:2): Blocking MyFunction: total=1 (actual=1, would=0) The words 'actual' and 'would' reflect whether an actual thread switch was made, or if in the case of a test using the same thread for more than one role (e.g. signaling, worker, network are all the same thread) that an actual thread switch did not occur but it would have occurred in the case of having dedicated threads. The 'total' count is the sum. * RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); thread_->Invoke<void>([this](){ MoreStuff(); }); RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); } When a function is known to have blocking calls and we want to not regress from the currently known number of blocking calls, we can use this macro to state that at a certain point in a function, below where RTC_LOG_THREAD_BLOCK_COUNT() is called, there must have occurred no more than |x| (total) blocking calls. If more occur, a DCHECK will hit and print out what the actual number of calls was: # Fatal error in: my_file.cc, line 5 # last system error: 60 # Check failed: blocked_call_count_printer.GetTotalBlockedCallCount() <= 1 (2 vs. 1) Bug: webrtc:12649 Change-Id: Ibac4f85f00b89680601dba54a651eac95a0f45d3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/213782 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33632}
2021-04-07 10:08:28 +02:00
RTC_DCHECK_RUN_ON(current_thread);
current_thread->blocking_call_count_++;
RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
this);
}
#endif
// Perhaps down the line we can get rid of this workaround and always require
// current_thread to be valid when Send() is called.
std::unique_ptr<rtc::Event> done_event;
if (!current_thread)
done_event.reset(new rtc::Event());
bool ready = false;
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); },
[this, &ready, current_thread, done = done_event.get()] {
if (current_thread) {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp();
} else {
done->Set();
}
}));
if (current_thread) {
bool waited = false;
crit_.Enter();
while (!ready) {
crit_.Leave();
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
}
crit_.Leave();
// Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send. Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B. While processing
// the message, thread B Posts a message to A. We consume the wakeup for
// that Post while waiting for the Send to complete, which means that when
// we exit this loop, we need to issue another WakeUp, or else the Posted
// message won't be processed in a timely manner.
if (waited) {
current_thread->socketserver()->WakeUp();
}
} else {
done_event->Wait(rtc::Event::kForever);
}
}
void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
"src_func", posted_from.function_name());
class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
: functor_(functor) {}
void OnMessage(Message* msg) override { functor_(); }
private:
rtc::FunctionView<void()> functor_;
} handler(functor);
Send(posted_from, &handler);
}
Reland "Make sure that "current" rtc::Thread instances are always current for TaskQueueBase." This reverts commit 28685dc08cb34f756f9200519fba3222ba3a66f2. Reason for revert: Speculative reland after looking into downstream failures. It's possible that carryover state from unrelated tests running in parallel was causing failures. Original change's description: > Revert "Make sure that "current" rtc::Thread instances are always current for TaskQueueBase." > > This reverts commit 46b3bc6c24c233fe41a2401ce6e8eb8204a2d5a8. > > Reason for revert: Speculative revert. Breaks downstream project > > Original change's description: > > Make sure that "current" rtc::Thread instances are always current for TaskQueueBase. > > > > This is a necessary part of fulfilling the TaskQueueBase > > interface. If a thread does not register as the current TQ, yet offers > > the TQ interface, TQ 'current' checks will not work as expected and > > code that relies them (TaskQueueBase::Current() and IsCurrent()) > > will run in unexpected ways. > > > > Bug: webrtc:11572 > > Change-Id: Iab747bc474e74e6ce4f9e914cfd5b0578b19d19c > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175080 > > Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> > > Commit-Queue: Tommi <tommi@webrtc.org> > > Cr-Commit-Position: refs/heads/master@{#31254} > > TBR=mbonadei@webrtc.org,tommi@webrtc.org > > Change-Id: I69ff3355f0ec447b25604bd95fdacbdb4d4f3f27 > No-Presubmit: true > No-Tree-Checks: true > No-Try: true > Bug: webrtc:11572 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175104 > Reviewed-by: Artem Titov <titovartem@webrtc.org> > Commit-Queue: Artem Titov <titovartem@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#31259} TBR=mbonadei@webrtc.org,tommi@webrtc.org,titovartem@webrtc.org # Not skipping CQ checks because this is a reland. Bug: webrtc:11572 Change-Id: I00c82d99af8e05851769e09cb682b5b73895a6f3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175133 Reviewed-by: Tommi <tommi@webrtc.org> Reviewed-by: Artem Titov <titovartem@webrtc.org> Commit-Queue: Tommi <tommi@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31273}
2020-05-15 10:11:56 +02:00
// Called by the ThreadManager when being set as the current thread.
void Thread::EnsureIsCurrentTaskQueue() {
task_queue_registration_ =
std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
}
// Called by the ThreadManager when being set as the current thread.
void Thread::ClearCurrentTaskQueue() {
task_queue_registration_.reset();
}
void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
RTC_DCHECK(msg);
auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
std::unique_ptr<webrtc::QueuedTask> task(data->Release());
// Thread expects handler to own Message::pdata when OnMessage is called
// Since MessageData is no longer needed, delete it.
delete data;
// QueuedTask interface uses Run return value to communicate who owns the
// task. false means QueuedTask took the ownership.
if (!task->Run())
task.release();
}
void Thread::AllowInvokesToThread(Thread* thread) {
#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask(
[thread, this]() { AllowInvokesToThread(thread); }));
return;
}
RTC_DCHECK_RUN_ON(this);
allowed_threads_.push_back(thread);
invoke_policy_enabled_ = true;
#endif
}
void Thread::DisallowAllInvokes() {
#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask([this]() { DisallowAllInvokes(); }));
return;
}
RTC_DCHECK_RUN_ON(this);
allowed_threads_.clear();
invoke_policy_enabled_ = true;
#endif
}
Add utility to count the number of blocking thread invokes. This is useful to understand how often we block in certain parts of the api and track improvements/regressions. There are two macros, both are only active for RTC_DCHECK_IS_ON builds: * RTC_LOG_THREAD_BLOCK_COUNT() Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); } When executing this function during a test, the output could be: (my_file.cc:2): Blocking MyFunction: total=1 (actual=1, would=0) The words 'actual' and 'would' reflect whether an actual thread switch was made, or if in the case of a test using the same thread for more than one role (e.g. signaling, worker, network are all the same thread) that an actual thread switch did not occur but it would have occurred in the case of having dedicated threads. The 'total' count is the sum. * RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(x) Example: void MyClass::MyFunction() { RTC_LOG_THREAD_BLOCK_COUNT(); thread_->Invoke<void>([this](){ DoStuff(); }); thread_->Invoke<void>([this](){ MoreStuff(); }); RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1); } When a function is known to have blocking calls and we want to not regress from the currently known number of blocking calls, we can use this macro to state that at a certain point in a function, below where RTC_LOG_THREAD_BLOCK_COUNT() is called, there must have occurred no more than |x| (total) blocking calls. If more occur, a DCHECK will hit and print out what the actual number of calls was: # Fatal error in: my_file.cc, line 5 # last system error: 60 # Check failed: blocked_call_count_printer.GetTotalBlockedCallCount() <= 1 (2 vs. 1) Bug: webrtc:12649 Change-Id: Ibac4f85f00b89680601dba54a651eac95a0f45d3 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/213782 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33632}
2021-04-07 10:08:28 +02:00
#if RTC_DCHECK_IS_ON
uint32_t Thread::GetBlockingCallCount() const {
RTC_DCHECK_RUN_ON(this);
return blocking_call_count_;
}
uint32_t Thread::GetCouldBeBlockingCallCount() const {
RTC_DCHECK_RUN_ON(this);
return could_be_blocking_call_count_;
}
#endif
// Returns true if no policies added or if there is at least one policy
// that permits invocation to `target` thread.
bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
RTC_DCHECK_RUN_ON(this);
if (!invoke_policy_enabled_) {
return true;
}
for (const auto* thread : allowed_threads_) {
if (thread == target) {
return true;
}
}
return false;
#else
return true;
#endif
}
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
// Though Post takes MessageData by raw pointer (last parameter), it still
// takes it with ownership.
Post(RTC_FROM_HERE, &queued_task_handler_,
/*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}
void Thread::PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) {
// Though PostDelayed takes MessageData by raw pointer (last parameter),
// it still takes it with ownership.
PostDelayed(RTC_FROM_HERE, milliseconds, &queued_task_handler_,
/*id=*/0,
new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}
void Thread::Delete() {
Stop();
delete this;
}
bool Thread::IsProcessingMessagesForTesting() {
return (owned_ || IsCurrent()) && !IsQuitting();
}
void Thread::Clear(MessageHandler* phandler,
uint32_t id,
MessageList* removed) {
CritScope cs(&crit_);
ClearInternal(phandler, id, removed);
}
bool Thread::ProcessMessages(int cmsLoop) {
// Using ProcessMessages with a custom clock for testing and a time greater
// than 0 doesn't work, since it's not guaranteed to advance the custom
// clock's time, and may get stuck in an infinite loop.
RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
cmsLoop == kForever);
int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
int cmsNext = cmsLoop;
while (true) {
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
Message msg;
if (!Get(&msg, cmsNext))
return !IsQuitting();
Dispatch(&msg);
if (cmsLoop != kForever) {
cmsNext = static_cast<int>(TimeUntil(msEnd));
if (cmsNext < 0)
return true;
}
}
}
bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
bool need_synchronize_access) {
RTC_DCHECK(!IsRunning());
#if defined(WEBRTC_WIN)
if (need_synchronize_access) {
// We explicitly ask for no rights other than synchronization.
// This gives us the best chance of succeeding.
thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
if (!thread_) {
RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
return false;
}
thread_id_ = GetCurrentThreadId();
}
#elif defined(WEBRTC_POSIX)
thread_ = pthread_self();
#endif
owned_ = false;
thread_manager->SetCurrentThread(this);
return true;
}
bool Thread::IsRunning() {
#if defined(WEBRTC_WIN)
return thread_ != nullptr;
#elif defined(WEBRTC_POSIX)
return thread_ != 0;
#endif
}
// static
MessageHandler* Thread::GetPostTaskMessageHandler() {
// Allocate at first call, never deallocate.
static MessageHandler* handler = new MessageHandlerWithTask;
return handler;
}
AutoThread::AutoThread()
Reland "Refactor rtc_base build targets." This is a reland of 69241a93fb14f6527a26d5c94dde879013012d2a Fix: The problem was related to NO_MAIN_THREAD_WRAPPING, which affects https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/rtc_base/thread.cc;l=257-263;drc=7acc2d9fe3a6e3c4d8881d2bdfc9b8968a724cd5. The original CL didn't attach the definition of the macro NO_MAIN_THREAD_WRAPPING when building for Chromium (which doesn't have to be related to //rtc_base anymore but to //rtc_base:threading). Original change's description: > Refactor rtc_base build targets. > > The "//rtc_base:rtc_base" build target has historically been one of the > biggest targets in the WebRTC build. Big targets are the main source of > circular dependencies and non-API types leakage. > > This CL is a step forward into splitting "//rtc_base:rtc_base" into > smaller targets (as originally started in 2018). > > The only non-automated changes are (like re-wiring the build system): > * The creation of //rtc_base/async_resolver.{h,cc} which allows to > break a circular dependency (is has been extracted from > //rtc_base/net_helpers.{h,cc}). > * The creation of //rtc_base/internal/default_socket_server.{h,cc} to > break another circular dependency. > > Bug: webrtc:9987 > Change-Id: I0c8f5e7efe2c8fd8e6bffa0d6dd2dd494cf3df02 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/196903 > Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> > Reviewed-by: Harald Alvestrand <hta@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#32941} Bug: webrtc:9987 Change-Id: I7cdf49d2aac8357f1f50f90010bf2c2f62fa19f6 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202021 Reviewed-by: Niels Moller <nisse@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33001}
2021-01-15 10:41:01 +01:00
: Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
if (!ThreadManager::Instance()->CurrentThread()) {
// DoInit registers with ThreadManager. Do that only if we intend to
// be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
// post a message to a queue that no running thread is serving.
DoInit();
ThreadManager::Instance()->SetCurrentThread(this);
}
}
AutoThread::~AutoThread() {
Stop();
DoDestroy();
if (ThreadManager::Instance()->CurrentThread() == this) {
ThreadManager::Instance()->SetCurrentThread(nullptr);
}
}
AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
: Thread(ss, /*do_init=*/false) {
DoInit();
old_thread_ = ThreadManager::Instance()->CurrentThread();
// Temporarily set the current thread to nullptr so that we can keep checks
// around that catch unintentional pointer overwrites.
rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
rtc::ThreadManager::Instance()->SetCurrentThread(this);
if (old_thread_) {
ThreadManager::Remove(old_thread_);
}
}
AutoSocketServerThread::~AutoSocketServerThread() {
RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this);
// Some tests post destroy messages to this thread. To avoid memory
// leaks, we have to process those messages. In particular
// P2PTransportChannelPingTest, relying on the message posted in
// cricket::Connection::Destroy.
ProcessMessages(0);
// Stop and destroy the thread before clearing it as the current thread.
// Sometimes there are messages left in the Thread that will be
// destroyed by DoDestroy, and sometimes the destructors of the message and/or
// its contents rely on this thread still being set as the current thread.
Stop();
DoDestroy();
rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
if (old_thread_) {
ThreadManager::Add(old_thread_);
}
}
} // namespace rtc