Revert of New task queueing primitive for async tasks: TaskQueue. (patchset #8 id:330001 of https://codereview.webrtc.org/1927133004/ )

Reason for revert:
sigh.  Have to revert again as there seems to have have been some change made for pnacl and CrOS.

Original issue's description:
> Reland of New task queueing primitive for async tasks: TaskQueue. (patchset #1 id:1 of https://codereview.webrtc.org/1935483002/ )
>
> New task queueing primitive for async tasks: TaskQueue.
> TaskQueue is a new way to asynchronously execute tasks sequentially
> in a thread safe manner with minimal locking.  The implementation
> uses OS supported APIs to do this that are compatible with async IO
> notifications from things like sockets and files.
>
> This class is a part of rtc_base_approved, so can be used by both
> the webrtc and libjingle parts of the WebRTC library.  Moving forward,
> we can replace rtc::Thread and webrtc::ProcessThread with this implementation.
>
> NOTE: It should not be assumed that all tasks that execute on a TaskQueue,
> run on the same thread.  E.g. on Mac and iOS, we use GCD dispatch queues
> which means that tasks might execute on different threads depending on
> what's the most efficient thing to do.
>
> TBR=perkj@webrtc.org
>
> Committed: https://crrev.com/65d1f2aba216d077c6d22488f03e56984aef1c68
> Cr-Commit-Position: refs/heads/master@{#12737}

TBR=perkj@webrtc.org,phoglund@webrtc.org
# Skipping CQ checks because original CL landed less than 1 days ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true

Review-Url: https://codereview.webrtc.org/1981573002
Cr-Commit-Position: refs/heads/master@{#12738}
This commit is contained in:
tommi 2016-05-13 14:33:31 -07:00 committed by Commit bot
parent 65d1f2aba2
commit 3f90087ce8
11 changed files with 0 additions and 1337 deletions

View File

@ -86,7 +86,6 @@ if (rtc_build_ssl == 0) {
# The subset of rtc_base approved for use outside of libjingle.
static_library("rtc_base_approved") {
defines = []
deps = []
configs += [ "..:common_config" ]
public_configs = [ "..:common_inherited_config" ]
@ -148,12 +147,6 @@ static_library("rtc_base_approved") {
"swap_queue.h",
"systeminfo.cc",
"systeminfo.h",
"task_queue.h",
"task_queue_gcd.cc",
"task_queue_libevent.cc",
"task_queue_posix.cc",
"task_queue_posix.h",
"task_queue_win.cc",
"template_util.h",
"thread_annotations.h",
"thread_checker.h",
@ -179,19 +172,6 @@ static_library("rtc_base_approved") {
"logging_mac.mm",
]
}
if (!is_win && !is_mac && !is_ios) {
deps += [ "//base/third_party/libevent" ]
defines += [ "WEBRTC_BUILD_LIBEVENT" ]
}
if (is_mac || is_ios || is_win) {
sources -= [ "task_queue_libevent.cc" ]
}
if (is_linux || is_android || is_win) {
sources -= [ "task_queue_gcd.cc" ]
}
}
static_library("rtc_base") {

View File

@ -84,12 +84,6 @@
'swap_queue.h',
'systeminfo.cc',
'systeminfo.h',
'task_queue.h',
'task_queue_libevent.cc',
'task_queue_gcd.cc',
'task_queue_posix.cc',
'task_queue_posix.h',
'task_queue_win.cc',
'template_util.h',
'thread_annotations.h',
'thread_checker.h',
@ -117,19 +111,6 @@
'logging.h',
'logging_mac.mm',
],
'conditions': [
['build_libevent==1', {
'dependencies': [
'<(DEPTH)/base/third_party/libevent/libevent.gyp:libevent',
],
}],
],
}],
['build_libevent!=1', {
'sources!': [ 'task_queue_libevent.cc' ],
}],
['build_libevent==1 or OS=="linux" or OS=="android" or OS=="win"', {
'sources!': [ 'task_queue_gcd.cc' ],
}],
['OS=="mac" and build_with_chromium==0', {
'all_dependent_settings': {

View File

@ -108,7 +108,6 @@
'swap_queue_unittest.cc',
# TODO(ronghuawu): Reenable this test.
# 'systeminfo_unittest.cc',
'task_queue_unittest.cc',
'task_unittest.cc',
'testclient_unittest.cc',
'thread_checker_unittest.cc',

View File

@ -1,277 +0,0 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_BASE_TASK_QUEUE_H_
#define WEBRTC_BASE_TASK_QUEUE_H_
#include <list>
#include <memory>
#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT)
#include <dispatch/dispatch.h>
#endif
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/criticalsection.h"
#if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT)
#include "webrtc/base/platform_thread.h"
#endif
#if defined(WEBRTC_BUILD_LIBEVENT)
struct event_base;
struct event;
#endif
namespace rtc {
// Base interface for asynchronously executed tasks.
// The interface basically consists of a single function, Run(), that executes
// on the target queue. For more details see the Run() method and TaskQueue.
class QueuedTask {
public:
QueuedTask() {}
virtual ~QueuedTask() {}
// Main routine that will run when the task is executed on the desired queue.
// The task should return |true| to indicate that it should be deleted or
// |false| to indicate that the queue should consider ownership of the task
// having been transferred. Returning |false| can be useful if a task has
// re-posted itself to a different queue or is otherwise being re-used.
virtual bool Run() = 0;
private:
RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask);
};
// Simple implementation of QueuedTask for use with rtc::Bind and lambdas.
template <class Closure>
class ClosureTask : public QueuedTask {
public:
explicit ClosureTask(const Closure& closure) : closure_(closure) {}
private:
bool Run() override {
closure_();
return true;
}
Closure closure_;
};
// Extends ClosureTask to also allow specifying cleanup code.
// This is useful when using lambdas if guaranteeing cleanup, even if a task
// was dropped (queue is too full), is required.
template <class Closure, class Cleanup>
class ClosureTaskWithCleanup : public ClosureTask<Closure> {
public:
ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup)
: ClosureTask<Closure>(closure), cleanup_(cleanup) {}
~ClosureTaskWithCleanup() { cleanup_(); }
private:
Cleanup cleanup_;
};
// Convenience function to construct closures that can be passed directly
// to methods that support std::unique_ptr<QueuedTask> but not template
// based parameters.
template <class Closure>
static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure) {
return std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure));
}
template <class Closure, class Cleanup>
static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure,
const Cleanup& cleanup) {
return std::unique_ptr<QueuedTask>(
new ClosureTaskWithCleanup<Closure, Cleanup>(closure, cleanup));
}
// Implements a task queue that asynchronously executes tasks in a way that
// guarantees that they're executed in FIFO order and that tasks never overlap.
// Tasks may always execute on the same worker thread and they may not.
// To DCHECK that tasks are executing on a known task queue, use IsCurrent().
//
// Here are some usage examples:
//
// 1) Asynchronously running a lambda:
//
// class MyClass {
// ...
// TaskQueue queue_("MyQueue");
// };
//
// void MyClass::StartWork() {
// queue_.PostTask([]() { Work(); });
// ...
//
// 2) Doing work asynchronously on a worker queue and providing a notification
// callback on the current queue, when the work has been done:
//
// void MyClass::StartWorkAndLetMeKnowWhenDone(
// std::unique_ptr<QueuedTask> callback) {
// DCHECK(TaskQueue::Current()) << "Need to be running on a queue";
// queue_.PostTaskAndReply([]() { Work(); }, std::move(callback));
// }
// ...
// my_class->StartWorkAndLetMeKnowWhenDone(
// NewClosure([]() { LOG(INFO) << "The work is done!";}));
//
// 3) Posting a custom task on a timer. The task posts itself again after
// every running:
//
// class TimerTask : public QueuedTask {
// public:
// TimerTask() {}
// private:
// bool Run() override {
// ++count_;
// TaskQueue::Current()->PostDelayedTask(
// std::unique_ptr<QueuedTask>(this), 1000);
// // Ownership has been transferred to the next occurance,
// // so return false to prevent from being deleted now.
// return false;
// }
// int count_ = 0;
// };
// ...
// queue_.PostDelayedTask(
// std::unique_ptr<QueuedTask>(new TimerTask()), 1000);
//
// For more examples, see task_queue_unittests.cc.
//
// A note on destruction:
//
// When a TaskQueue is deleted, pending tasks will not be executed but they will
// be deleted. The deletion of tasks may happen asynchronously after the
// TaskQueue itself has been deleted or it may happen synchronously while the
// TaskQueue instance is being deleted. This may vary from one OS to the next
// so assumptions about lifetimes of pending tasks should not be made.
class TaskQueue {
public:
explicit TaskQueue(const char* queue_name);
// TODO(tommi): Implement move semantics?
~TaskQueue();
static TaskQueue* Current();
// Used for DCHECKing the current queue.
static bool IsCurrent(const char* queue_name);
bool IsCurrent() const;
// TODO(tommi): For better debuggability, implement FROM_HERE.
// Ownership of the task is passed to PostTask.
void PostTask(std::unique_ptr<QueuedTask> task);
void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue);
void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply);
void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
template <class Closure>
void PostTask(const Closure& closure) {
PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
}
template <class Closure>
void PostDelayedTask(const Closure& closure, uint32_t milliseconds) {
PostDelayedTask(
std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)),
milliseconds);
}
template <class Closure1, class Closure2>
void PostTaskAndReply(const Closure1& task,
const Closure2& reply,
TaskQueue* reply_queue) {
PostTaskAndReply(
std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)),
reply_queue);
}
template <class Closure>
void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
const Closure& reply) {
PostTaskAndReply(std::move(task), std::unique_ptr<QueuedTask>(
new ClosureTask<Closure>(reply)));
}
template <class Closure>
void PostTaskAndReply(const Closure& task,
std::unique_ptr<QueuedTask> reply) {
PostTaskAndReply(
std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(task)),
std::move(reply));
}
template <class Closure1, class Closure2>
void PostTaskAndReply(const Closure1& task, const Closure2& reply) {
PostTaskAndReply(
std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)));
}
private:
#if defined(WEBRTC_BUILD_LIBEVENT)
static bool ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT
static void RunTask(int fd, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
class PostAndReplyTask;
class SetTimerTask;
void PrepareReplyTask(PostAndReplyTask* reply_task);
void ReplyTaskDone(PostAndReplyTask* reply_task);
struct QueueContext;
int wakeup_pipe_in_ = -1;
int wakeup_pipe_out_ = -1;
event_base* event_base_;
std::unique_ptr<event> wakeup_event_;
PlatformThread thread_;
rtc::CriticalSection pending_lock_;
std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
#elif defined(WEBRTC_MAC)
struct QueueContext;
struct TaskContext;
struct PostTaskAndReplyContext;
dispatch_queue_t queue_;
QueueContext* const context_;
#elif defined(WEBRTC_WIN)
static bool ThreadMain(void* context);
class WorkerThread : public PlatformThread {
public:
WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name)
: PlatformThread(func, obj, thread_name) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
return PlatformThread::QueueAPC(apc_function, data);
}
};
WorkerThread thread_;
#else
#error not supported.
#endif
RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};
} // namespace rtc
#endif // WEBRTC_BASE_TASK_QUEUE_H_

View File

@ -1,167 +0,0 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
// This file contains the implementation of TaskQueue for Mac and iOS.
// The implementation uses Grand Central Dispatch queues (GCD) to
// do the actual task queuing.
#include "webrtc/base/task_queue.h"
#include <string.h>
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/task_queue_posix.h"
namespace rtc {
using internal::GetQueuePtrTls;
using internal::AutoSetCurrentQueuePtr;
struct TaskQueue::QueueContext {
explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
static void SetNotActive(void* context) {
QueueContext* qc = static_cast<QueueContext*>(context);
qc->is_active = false;
}
static void DeleteContext(void* context) {
QueueContext* qc = static_cast<QueueContext*>(context);
delete qc;
}
TaskQueue* const queue;
bool is_active;
};
struct TaskQueue::TaskContext {
TaskContext(QueueContext* queue_ctx, std::unique_ptr<QueuedTask> task)
: queue_ctx(queue_ctx), task(std::move(task)) {}
virtual ~TaskContext() {}
static void RunTask(void* context) {
std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(context));
if (tc->queue_ctx->is_active) {
AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue);
if (!tc->task->Run())
tc->task.release();
}
}
QueueContext* const queue_ctx;
std::unique_ptr<QueuedTask> task;
};
// Special case context for holding two tasks, a |first_task| + the task
// that's owned by the parent struct, TaskContext, that then becomes the
// second (i.e. 'reply') task.
struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext {
explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx,
std::unique_ptr<QueuedTask> first_task,
QueueContext* second_queue_ctx,
std::unique_ptr<QueuedTask> second_task)
: TaskContext(second_queue_ctx, std::move(second_task)),
first_queue_ctx(first_queue_ctx),
first_task(std::move(first_task)) {
// Retain the reply queue for as long as this object lives.
// If we don't, we may have memory leaks and/or failures.
dispatch_retain(first_queue_ctx->queue->queue_);
}
~PostTaskAndReplyContext() override {
dispatch_release(first_queue_ctx->queue->queue_);
}
static void RunTask(void* context) {
auto* rc = static_cast<PostTaskAndReplyContext*>(context);
if (rc->first_queue_ctx->is_active) {
AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue);
if (!rc->first_task->Run())
rc->first_task.release();
}
// Post the reply task. This hands the work over to the parent struct.
// This task will eventually delete |this|.
dispatch_async_f(rc->queue_ctx->queue->queue_, rc, &TaskContext::RunTask);
}
QueueContext* const first_queue_ctx;
std::unique_ptr<QueuedTask> first_task;
};
TaskQueue::TaskQueue(const char* queue_name)
: queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)),
context_(new QueueContext(this)) {
RTC_DCHECK(queue_name);
RTC_CHECK(queue_);
dispatch_set_context(queue_, context_);
// Assign a finalizer that will delete the context when the last reference
// to the queue is released. This may run after the TaskQueue object has
// been deleted.
dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext);
}
TaskQueue::~TaskQueue() {
RTC_DCHECK(!IsCurrent());
// Implementation/behavioral note:
// Dispatch queues are reference counted via calls to dispatch_retain and
// dispatch_release. Pending blocks submitted to a queue also hold a
// reference to the queue until they have finished. Once all references to a
// queue have been released, the queue will be deallocated by the system.
// This is why we check the context before running tasks.
// Use dispatch_sync to set the context to null to guarantee that there's not
// a race between checking the context and using it from a task.
dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive);
dispatch_release(queue_);
}
// static
TaskQueue* TaskQueue::Current() {
return static_cast<TaskQueue*>(pthread_getspecific(GetQueuePtrTls()));
}
// static
bool TaskQueue::IsCurrent(const char* queue_name) {
TaskQueue* current = Current();
return current &&
strcmp(queue_name, dispatch_queue_get_label(current->queue_)) == 0;
}
bool TaskQueue::IsCurrent() const {
RTC_DCHECK(queue_);
return this == Current();
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
auto* context = new TaskContext(context_, std::move(task));
dispatch_async_f(queue_, context, &TaskContext::RunTask);
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
auto* context = new TaskContext(context_, std::move(task));
dispatch_after_f(
dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_,
context, &TaskContext::RunTask);
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
auto* context = new PostTaskAndReplyContext(
context_, std::move(task), reply_queue->context_, std::move(reply));
dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask);
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) {
return PostTaskAndReply(std::move(task), std::move(reply), Current());
}
} // namespace rtc

View File

@ -1,318 +0,0 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/base/task_queue.h"
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include "base/third_party/libevent/event.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/task_queue_posix.h"
#include "webrtc/base/timeutils.h"
namespace rtc {
using internal::GetQueuePtrTls;
using internal::AutoSetCurrentQueuePtr;
namespace {
static const char kQuit = 1;
static const char kRunTask = 2;
struct TimerEvent {
explicit TimerEvent(std::unique_ptr<QueuedTask> task)
: task(std::move(task)) {}
~TimerEvent() { event_del(&ev); }
event ev;
std::unique_ptr<QueuedTask> task;
};
bool SetNonBlocking(int fd) {
const int flags = fcntl(fd, F_GETFL);
RTC_CHECK(flags != -1);
return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}
} // namespace
struct TaskQueue::QueueContext {
explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
TaskQueue* queue;
bool is_active;
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
};
class TaskQueue::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue)
: task_(std::move(task)),
reply_(std::move(reply)),
reply_queue_(reply_queue) {
reply_queue->PrepareReplyTask(this);
}
~PostAndReplyTask() override {
CritScope lock(&lock_);
if (reply_queue_)
reply_queue_->ReplyTaskDone(this);
}
void OnReplyQueueGone() {
CritScope lock(&lock_);
reply_queue_ = nullptr;
}
private:
bool Run() override {
if (!task_->Run())
task_.release();
CritScope lock(&lock_);
if (reply_queue_)
reply_queue_->PostTask(std::move(reply_));
return true;
}
CriticalSection lock_;
std::unique_ptr<QueuedTask> task_;
std::unique_ptr<QueuedTask> reply_;
TaskQueue* reply_queue_ GUARDED_BY(lock_);
};
class TaskQueue::SetTimerTask : public QueuedTask {
public:
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
: task_(std::move(task)),
milliseconds_(milliseconds),
posted_(Time32()) {}
private:
bool Run() override {
// Compensate for the time that has passed since construction
// and until we got here.
uint32_t post_time = Time32() - posted_;
TaskQueue::Current()->PostDelayedTask(
std::move(task_),
post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
return true;
}
std::unique_ptr<QueuedTask> task_;
const uint32_t milliseconds_;
const uint32_t posted_;
};
TaskQueue::TaskQueue(const char* queue_name)
: event_base_(event_base_new()),
wakeup_event_(new event()),
thread_(&TaskQueue::ThreadMain, this, queue_name) {
RTC_DCHECK(queue_name);
int fds[2];
RTC_CHECK(pipe(fds) == 0);
SetNonBlocking(fds[0]);
SetNonBlocking(fds[1]);
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
OnWakeup, this);
event_base_set(event_base_, wakeup_event_.get());
event_add(wakeup_event_.get(), 0);
thread_.Start();
}
TaskQueue::~TaskQueue() {
RTC_DCHECK(!IsCurrent());
struct timespec ts;
char message = kQuit;
while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
// The queue is full, so we have no choice but to wait and retry.
RTC_CHECK_EQ(EAGAIN, errno);
ts.tv_sec = 0;
ts.tv_nsec = 1000000;
nanosleep(&ts, nullptr);
}
thread_.Stop();
event_del(wakeup_event_.get());
close(wakeup_pipe_in_);
close(wakeup_pipe_out_);
wakeup_pipe_in_ = -1;
wakeup_pipe_out_ = -1;
{
// Synchronize against any pending reply tasks that might be running on
// other queues.
CritScope lock(&pending_lock_);
for (auto* reply : pending_replies_)
reply->OnReplyQueueGone();
pending_replies_.clear();
}
event_base_free(event_base_);
}
// static
TaskQueue* TaskQueue::Current() {
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
return ctx ? ctx->queue : nullptr;
}
// static
bool TaskQueue::IsCurrent(const char* queue_name) {
TaskQueue* current = Current();
return current && current->thread_.name().compare(queue_name) == 0;
}
bool TaskQueue::IsCurrent() const {
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
RTC_DCHECK(task.get());
// libevent isn't thread safe. This means that we can't use methods such
// as event_base_once to post tasks to the worker thread from a different
// thread. However, we can use it when posting from the worker thread itself.
if (IsCurrent()) {
if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
task.get(), nullptr) == 0) {
task.release();
}
} else {
QueuedTask* task_id = task.get(); // Only used for comparison.
{
CritScope lock(&pending_lock_);
pending_.push_back(std::move(task));
}
char message = kRunTask;
if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
LOG(WARNING) << "Failed to queue task.";
CritScope lock(&pending_lock_);
pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
return t.get() == task_id;
});
}
}
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
if (IsCurrent()) {
TimerEvent* timer = new TimerEvent(std::move(task));
evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
event_base_set(event_base_, &timer->ev);
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
ctx->pending_timers_.push_back(timer);
timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
event_add(&timer->ev, &tv);
} else {
PostTask(std::unique_ptr<QueuedTask>(
new SetTimerTask(std::move(task), milliseconds)));
}
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
std::unique_ptr<QueuedTask> wrapper_task(
new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
PostTask(std::move(wrapper_task));
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) {
return PostTaskAndReply(std::move(task), std::move(reply), Current());
}
// static
bool TaskQueue::ThreadMain(void* context) {
TaskQueue* me = static_cast<TaskQueue*>(context);
QueueContext queue_context(me);
pthread_setspecific(GetQueuePtrTls(), &queue_context);
while (queue_context.is_active)
event_base_loop(me->event_base_, 0);
pthread_setspecific(GetQueuePtrTls(), nullptr);
for (TimerEvent* timer : queue_context.pending_timers_)
delete timer;
return false;
}
// static
void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
char buf;
RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
switch (buf) {
case kQuit:
ctx->is_active = false;
event_base_loopbreak(ctx->queue->event_base_);
break;
case kRunTask: {
std::unique_ptr<QueuedTask> task;
{
CritScope lock(&ctx->queue->pending_lock_);
RTC_DCHECK(!ctx->queue->pending_.empty());
task = std::move(ctx->queue->pending_.front());
ctx->queue->pending_.pop_front();
RTC_DCHECK(task.get());
}
if (!task->Run())
task.release();
break;
}
default:
RTC_NOTREACHED();
break;
}
}
// static
void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
auto* task = static_cast<QueuedTask*>(context);
if (task->Run())
delete task;
}
// static
void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
TimerEvent* timer = static_cast<TimerEvent*>(context);
if (!timer->task->Run())
timer->task.release();
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
ctx->pending_timers_.remove(timer);
delete timer;
}
void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
pending_replies_.push_back(reply_task);
}
void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
CritScope lock(&pending_lock_);
pending_replies_.remove(reply_task);
}
} // namespace rtc

View File

@ -1,40 +0,0 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/base/task_queue_posix.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/task_queue.h"
namespace rtc {
namespace internal {
pthread_key_t g_queue_ptr_tls = 0;
void InitializeTls() {
RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0);
}
pthread_key_t GetQueuePtrTls() {
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0);
return g_queue_ptr_tls;
}
AutoSetCurrentQueuePtr::AutoSetCurrentQueuePtr(TaskQueue* q)
: prev_(TaskQueue::Current()) {
pthread_setspecific(GetQueuePtrTls(), q);
}
AutoSetCurrentQueuePtr::~AutoSetCurrentQueuePtr() {
pthread_setspecific(GetQueuePtrTls(), prev_);
}
} // namespace internal
} // namespace rtc

View File

@ -1,36 +0,0 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef WEBRTC_BASE_TASK_QUEUE_POSIX_H_
#define WEBRTC_BASE_TASK_QUEUE_POSIX_H_
#include <pthread.h>
namespace rtc {
class TaskQueue;
namespace internal {
class AutoSetCurrentQueuePtr {
public:
explicit AutoSetCurrentQueuePtr(TaskQueue* q);
~AutoSetCurrentQueuePtr();
private:
TaskQueue* const prev_;
};
pthread_key_t GetQueuePtrTls();
} // namespace internal
} // namespace rtc
#endif // WEBRTC_BASE_TASK_QUEUE_POSIX_H_

View File

@ -1,261 +0,0 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <memory>
#include <vector>
#include "webrtc/base/bind.h"
#include "webrtc/base/event.h"
#include "webrtc/base/gunit.h"
#include "webrtc/base/task_queue.h"
#include "webrtc/base/timeutils.h"
namespace rtc {
namespace {
void CheckCurrent(const char* expected_queue, Event* signal, TaskQueue* queue) {
EXPECT_TRUE(TaskQueue::IsCurrent(expected_queue));
EXPECT_TRUE(queue->IsCurrent());
if (signal)
signal->Set();
}
} // namespace
TEST(TaskQueueTest, Construct) {
static const char kQueueName[] = "Construct";
TaskQueue queue(kQueueName);
EXPECT_FALSE(queue.IsCurrent());
}
TEST(TaskQueueTest, PostAndCheckCurrent) {
static const char kQueueName[] = "PostAndCheckCurrent";
TaskQueue queue(kQueueName);
// We're not running a task, so there shouldn't be a current queue.
EXPECT_FALSE(queue.IsCurrent());
EXPECT_FALSE(TaskQueue::Current());
Event event(false, false);
queue.PostTask(Bind(&CheckCurrent, kQueueName, &event, &queue));
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostCustomTask) {
static const char kQueueName[] = "PostCustomImplementation";
TaskQueue queue(kQueueName);
Event event(false, false);
class CustomTask : public QueuedTask {
public:
explicit CustomTask(Event* event) : event_(event) {}
private:
bool Run() override {
event_->Set();
return false; // Never allows the task to be deleted by the queue.
}
Event* const event_;
} my_task(&event);
// Please don't do this in production code! :)
queue.PostTask(std::unique_ptr<QueuedTask>(&my_task));
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostLambda) {
static const char kQueueName[] = "PostLambda";
TaskQueue queue(kQueueName);
Event event(false, false);
queue.PostTask([&event]() { event.Set(); });
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostFromQueue) {
static const char kQueueName[] = "PostFromQueue";
TaskQueue queue(kQueueName);
Event event(false, false);
queue.PostTask(
[&event, &queue]() { queue.PostTask([&event]() { event.Set(); }); });
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostDelayed) {
static const char kQueueName[] = "PostDelayed";
TaskQueue queue(kQueueName);
Event event(false, false);
uint32_t start = Time();
queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
EXPECT_TRUE(event.Wait(1000));
uint32_t end = Time();
EXPECT_GE(end - start, 100u);
EXPECT_NEAR(end - start, 200u, 100u); // Accept 100-300.
}
TEST(TaskQueueTest, PostMultipleDelayed) {
static const char kQueueName[] = "PostMultipleDelayed";
TaskQueue queue(kQueueName);
std::vector<std::unique_ptr<Event>> events;
for (int i = 0; i < 10; ++i) {
events.push_back(std::unique_ptr<Event>(new Event(false, false)));
queue.PostDelayedTask(
Bind(&CheckCurrent, kQueueName, events.back().get(), &queue), 10);
}
for (const auto& e : events)
EXPECT_TRUE(e->Wait(100));
}
TEST(TaskQueueTest, PostDelayedAfterDestruct) {
static const char kQueueName[] = "PostDelayedAfterDestruct";
Event event(false, false);
{
TaskQueue queue(kQueueName);
queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
}
EXPECT_FALSE(event.Wait(200)); // Task should not run.
}
TEST(TaskQueueTest, PostAndReply) {
static const char kPostQueue[] = "PostQueue";
static const char kReplyQueue[] = "ReplyQueue";
TaskQueue post_queue(kPostQueue);
TaskQueue reply_queue(kReplyQueue);
Event event(false, false);
post_queue.PostTaskAndReply(
Bind(&CheckCurrent, kPostQueue, nullptr, &post_queue),
Bind(&CheckCurrent, kReplyQueue, &event, &reply_queue), &reply_queue);
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostAndReuse) {
static const char kPostQueue[] = "PostQueue";
static const char kReplyQueue[] = "ReplyQueue";
TaskQueue post_queue(kPostQueue);
TaskQueue reply_queue(kReplyQueue);
int call_count = 0;
class ReusedTask : public QueuedTask {
public:
ReusedTask(int* counter, TaskQueue* reply_queue, Event* event)
: counter_(counter), reply_queue_(reply_queue), event_(event) {
EXPECT_EQ(0, *counter_);
}
private:
bool Run() override {
if (++(*counter_) == 1) {
std::unique_ptr<QueuedTask> myself(this);
reply_queue_->PostTask(std::move(myself));
// At this point, the object is owned by reply_queue_ and it's
// theoratically possible that the object has been deleted (e.g. if
// posting wasn't possible). So, don't touch any member variables here.
// Indicate to the current queue that ownership has been transferred.
return false;
} else {
EXPECT_EQ(2, *counter_);
EXPECT_TRUE(reply_queue_->IsCurrent());
event_->Set();
return true; // Indicate that the object should be deleted.
}
}
int* const counter_;
TaskQueue* const reply_queue_;
Event* const event_;
};
Event event(false, false);
std::unique_ptr<QueuedTask> task(
new ReusedTask(&call_count, &reply_queue, &event));
post_queue.PostTask(std::move(task));
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostAndReplyLambda) {
static const char kPostQueue[] = "PostQueue";
static const char kReplyQueue[] = "ReplyQueue";
TaskQueue post_queue(kPostQueue);
TaskQueue reply_queue(kReplyQueue);
Event event(false, false);
bool my_flag = false;
post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; },
[&event]() { event.Set(); }, &reply_queue);
EXPECT_TRUE(event.Wait(1000));
EXPECT_TRUE(my_flag);
}
void TestPostTaskAndReply(TaskQueue* work_queue,
const char* work_queue_name,
Event* event) {
ASSERT_FALSE(work_queue->IsCurrent());
work_queue->PostTaskAndReply(
Bind(&CheckCurrent, work_queue_name, nullptr, work_queue),
NewClosure([event]() { event->Set(); }));
}
// Does a PostTaskAndReply from within a task to post and reply to the current
// queue. All in all there will be 3 tasks posted and run.
TEST(TaskQueueTest, PostAndReply2) {
static const char kQueueName[] = "PostAndReply2";
static const char kWorkQueueName[] = "PostAndReply2_Worker";
TaskQueue queue(kQueueName);
TaskQueue work_queue(kWorkQueueName);
Event event(false, false);
queue.PostTask(
Bind(&TestPostTaskAndReply, &work_queue, kWorkQueueName, &event));
EXPECT_TRUE(event.Wait(1000));
}
// Tests posting more messages than a queue can queue up.
// In situations like that, tasks will get dropped.
TEST(TaskQueueTest, PostALot) {
// To destruct the event after the queue has gone out of scope.
Event event(false, false);
int tasks_executed = 0;
int tasks_cleaned_up = 0;
static const int kTaskCount = 0xffff;
{
static const char kQueueName[] = "PostALot";
TaskQueue queue(kQueueName);
// On linux, the limit of pending bytes in the pipe buffer is 0xffff.
// So here we post a total of 0xffff+1 messages, which triggers a failure
// case inside of the libevent queue implementation.
queue.PostTask([&event]() { event.Wait(Event::kForever); });
for (int i = 0; i < kTaskCount; ++i)
queue.PostTask(NewClosure([&tasks_executed]() { ++tasks_executed; },
[&tasks_cleaned_up]() { ++tasks_cleaned_up; }));
event.Set(); // Unblock the first task.
}
EXPECT_GE(tasks_cleaned_up, tasks_executed);
EXPECT_EQ(kTaskCount, tasks_cleaned_up);
LOG(INFO) << "tasks executed: " << tasks_executed
<< ", tasks cleaned up: " << tasks_cleaned_up;
}
} // namespace rtc

View File

@ -1,184 +0,0 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/base/task_queue.h"
#include <string.h>
#include <unordered_map>
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
namespace rtc {
namespace {
#define WM_RUN_TASK WM_USER + 1
#define WM_QUEUE_DELAYED_TASK WM_USER + 2
DWORD g_queue_ptr_tls = 0;
BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
g_queue_ptr_tls = TlsAlloc();
return TRUE;
}
DWORD GetQueuePtrTls() {
static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
return g_queue_ptr_tls;
}
struct ThreadStartupData {
Event* started;
void* thread_context;
};
void CALLBACK InitializeQueueThread(ULONG_PTR param) {
MSG msg;
PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
TlsSetValue(GetQueuePtrTls(), data->thread_context);
data->started->Set();
}
} // namespace
TaskQueue::TaskQueue(const char* queue_name)
: thread_(&TaskQueue::ThreadMain, this, queue_name) {
RTC_DCHECK(queue_name);
thread_.Start();
Event event(false, false);
ThreadStartupData startup = {&event, this};
RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
reinterpret_cast<ULONG_PTR>(&startup)));
event.Wait(Event::kForever);
}
TaskQueue::~TaskQueue() {
RTC_DCHECK(!IsCurrent());
while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
RTC_CHECK(ERROR_NOT_ENOUGH_QUOTA == ::GetLastError());
Sleep(1);
}
thread_.Stop();
}
// static
TaskQueue* TaskQueue::Current() {
return static_cast<TaskQueue*>(TlsGetValue(GetQueuePtrTls()));
}
// static
bool TaskQueue::IsCurrent(const char* queue_name) {
TaskQueue* current = Current();
return current && current->thread_.name().compare(queue_name) == 0;
}
bool TaskQueue::IsCurrent() const {
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
reinterpret_cast<LPARAM>(task.get()))) {
task.release();
}
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
WPARAM wparam;
#if defined(_WIN64)
// GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
// so this compensation isn't that accurate, but since we have unused 32 bits
// on Win64, we might as well use them.
wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds;
#else
wparam = milliseconds;
#endif
if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
reinterpret_cast<LPARAM>(task.get()))) {
task.release();
}
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
QueuedTask* task_ptr = task.release();
QueuedTask* reply_task_ptr = reply.release();
DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
if (task_ptr->Run())
delete task_ptr;
// If the thread's message queue is full, we can't queue the task and will
// have to drop it (i.e. delete).
if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
reinterpret_cast<LPARAM>(reply_task_ptr))) {
delete reply_task_ptr;
}
});
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) {
return PostTaskAndReply(std::move(task), std::move(reply), Current());
}
// static
bool TaskQueue::ThreadMain(void* context) {
std::unordered_map<UINT_PTR, std::unique_ptr<QueuedTask>> delayed_tasks;
BOOL ret;
MSG msg;
while ((ret = GetMessage(&msg, nullptr, 0, 0)) != 0 && ret != -1) {
if (!msg.hwnd) {
switch (msg.message) {
case WM_RUN_TASK: {
QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
if (task->Run())
delete task;
break;
}
case WM_QUEUE_DELAYED_TASK: {
QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
#if defined(_WIN64)
// Subtract the time it took to queue the timer.
const DWORD now = GetTickCount();
DWORD post_time = now - (msg.wParam >> 32);
milliseconds =
post_time > milliseconds ? 0 : milliseconds - post_time;
#endif
UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr);
delayed_tasks.insert(std::make_pair(timer_id, task));
break;
}
case WM_TIMER: {
KillTimer(nullptr, msg.wParam);
auto found = delayed_tasks.find(msg.wParam);
RTC_DCHECK(found != delayed_tasks.end());
if (!found->second->Run())
found->second.release();
delayed_tasks.erase(found);
break;
}
default:
RTC_NOTREACHED();
break;
}
} else {
TranslateMessage(&msg);
DispatchMessage(&msg);
}
}
return false;
}
} // namespace rtc

View File

@ -41,18 +41,10 @@
'apk_tests_path%': '<(DEPTH)/webrtc/build/apk_tests.gyp',
'modules_java_gyp_path%': '<(DEPTH)/webrtc/modules/modules_java.gyp',
}],
# Controls whether we use libevent on posix platforms.
['OS=="win" or OS=="mac" or OS=="ios"', {
'build_libevent%': 0,
}, {
'build_libevent%': 1,
}],
],
},
'build_with_chromium%': '<(build_with_chromium)',
'build_with_mozilla%': '<(build_with_mozilla)',
'build_libevent%': '<(build_libevent)',
'webrtc_root%': '<(webrtc_root)',
'apk_tests_path%': '<(apk_tests_path)',
'modules_java_gyp_path%': '<(modules_java_gyp_path)',
@ -64,7 +56,6 @@
},
'build_with_chromium%': '<(build_with_chromium)',
'build_with_mozilla%': '<(build_with_mozilla)',
'build_libevent%': '<(build_libevent)',
'webrtc_root%': '<(webrtc_root)',
'apk_tests_path%': '<(apk_tests_path)',
'test_runner_path': '<(DEPTH)/webrtc/build/android/test_runner.py',
@ -327,11 +318,6 @@
}],
],
}],
['build_libevent==1', {
'defines': [
'WEBRTC_BUILD_LIBEVENT',
],
}],
['target_arch=="arm64"', {
'defines': [
'WEBRTC_ARCH_ARM64',