Add support for priorities to TaskQueue.

BUG=webrtc:7216

Review-Url: https://codereview.webrtc.org/2708353003
Cr-Commit-Position: refs/heads/master@{#16834}
This commit is contained in:
tommi 2017-02-24 10:42:14 -08:00 committed by Commit bot
parent 3ba1a8cd1b
commit c9bb7918f6
4 changed files with 79 additions and 9 deletions

View File

@ -161,8 +161,16 @@ static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure,
// so assumptions about lifetimes of pending tasks should not be made. // so assumptions about lifetimes of pending tasks should not be made.
class LOCKABLE TaskQueue { class LOCKABLE TaskQueue {
public: public:
explicit TaskQueue(const char* queue_name); // TaskQueue priority levels. On some platforms these will map to thread
// TODO(tommi): Implement move semantics? // priorities, on others such as Mac and iOS, GCD queue priorities.
enum class Priority {
NORMAL = 0,
HIGH,
LOW,
};
explicit TaskQueue(const char* queue_name,
Priority priority = Priority::NORMAL);
~TaskQueue(); ~TaskQueue();
static TaskQueue* Current(); static TaskQueue* Current();
@ -275,8 +283,11 @@ class LOCKABLE TaskQueue {
class WorkerThread : public PlatformThread { class WorkerThread : public PlatformThread {
public: public:
WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name) WorkerThread(ThreadRunFunction func,
: PlatformThread(func, obj, thread_name) {} void* obj,
const char* thread_name,
ThreadPriority priority)
: PlatformThread(func, obj, thread_name, priority) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
return PlatformThread::QueueAPC(apc_function, data); return PlatformThread::QueueAPC(apc_function, data);

View File

@ -21,6 +21,22 @@
#include "webrtc/base/task_queue_posix.h" #include "webrtc/base/task_queue_posix.h"
namespace rtc { namespace rtc {
namespace {
using Priority = TaskQueue::Priority;
int TaskQueuePriorityToGCD(Priority priority) {
switch (priority) {
case Priority::NORMAL:
return DISPATCH_QUEUE_PRIORITY_DEFAULT;
case Priority::HIGH:
return DISPATCH_QUEUE_PRIORITY_HIGH;
case Priority::LOW:
return DISPATCH_QUEUE_PRIORITY_LOW;
}
}
}
using internal::GetQueuePtrTls; using internal::GetQueuePtrTls;
using internal::AutoSetCurrentQueuePtr; using internal::AutoSetCurrentQueuePtr;
@ -94,7 +110,7 @@ struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext {
dispatch_queue_t reply_queue_; dispatch_queue_t reply_queue_;
}; };
TaskQueue::TaskQueue(const char* queue_name) TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)),
context_(new QueueContext(this)) { context_(new QueueContext(this)) {
RTC_DCHECK(queue_name); RTC_DCHECK(queue_name);
@ -104,6 +120,9 @@ TaskQueue::TaskQueue(const char* queue_name)
// to the queue is released. This may run after the TaskQueue object has // to the queue is released. This may run after the TaskQueue object has
// been deleted. // been deleted.
dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext); dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext);
dispatch_set_target_queue(
queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0));
} }
TaskQueue::~TaskQueue() { TaskQueue::~TaskQueue() {

View File

@ -30,6 +30,8 @@ static const char kQuit = 1;
static const char kRunTask = 2; static const char kRunTask = 2;
static const char kRunReplyTask = 3; static const char kRunReplyTask = 3;
using Priority = TaskQueue::Priority;
// This ignores the SIGPIPE signal on the calling thread. // This ignores the SIGPIPE signal on the calling thread.
// This signal can be fired when trying to write() to a pipe that's being // This signal can be fired when trying to write() to a pipe that's being
// closed or while closing a pipe that's being written to. // closed or while closing a pipe that's being written to.
@ -84,6 +86,21 @@ void EventAssign(struct event* ev,
RTC_CHECK_EQ(0, event_base_set(base, ev)); RTC_CHECK_EQ(0, event_base_set(base, ev));
#endif #endif
} }
ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
switch (priority) {
case Priority::HIGH:
return kRealtimePriority;
case Priority::LOW:
return kLowPriority;
case Priority::NORMAL:
return kNormalPriority;
default:
RTC_NOTREACHED();
break;
}
return kNormalPriority;
}
} // namespace } // namespace
struct TaskQueue::QueueContext { struct TaskQueue::QueueContext {
@ -201,10 +218,13 @@ class TaskQueue::SetTimerTask : public QueuedTask {
const uint32_t posted_; const uint32_t posted_;
}; };
TaskQueue::TaskQueue(const char* queue_name) TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: event_base_(event_base_new()), : event_base_(event_base_new()),
wakeup_event_(new event()), wakeup_event_(new event()),
thread_(&TaskQueue::ThreadMain, this, queue_name) { thread_(&TaskQueue::ThreadMain,
this,
queue_name,
TaskQueuePriorityToThreadPriority(priority)) {
RTC_DCHECK(queue_name); RTC_DCHECK(queue_name);
int fds[2]; int fds[2];
RTC_CHECK(pipe(fds) == 0); RTC_CHECK(pipe(fds) == 0);

View File

@ -24,6 +24,8 @@ namespace {
#define WM_RUN_TASK WM_USER + 1 #define WM_RUN_TASK WM_USER + 1
#define WM_QUEUE_DELAYED_TASK WM_USER + 2 #define WM_QUEUE_DELAYED_TASK WM_USER + 2
using Priority = TaskQueue::Priority;
DWORD g_queue_ptr_tls = 0; DWORD g_queue_ptr_tls = 0;
BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
@ -49,6 +51,21 @@ void CALLBACK InitializeQueueThread(ULONG_PTR param) {
::TlsSetValue(GetQueuePtrTls(), data->thread_context); ::TlsSetValue(GetQueuePtrTls(), data->thread_context);
data->started->Set(); data->started->Set();
} }
ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
switch (priority) {
case Priority::HIGH:
return kRealtimePriority;
case Priority::LOW:
return kLowPriority;
case Priority::NORMAL:
return kNormalPriority;
default:
RTC_NOTREACHED();
break;
}
return kNormalPriority;
}
} // namespace } // namespace
class TaskQueue::MultimediaTimer { class TaskQueue::MultimediaTimer {
@ -145,8 +162,11 @@ class TaskQueue::MultimediaTimer {
RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
}; };
TaskQueue::TaskQueue(const char* queue_name) TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: thread_(&TaskQueue::ThreadMain, this, queue_name) { : thread_(&TaskQueue::ThreadMain,
this,
queue_name,
TaskQueuePriorityToThreadPriority(priority)) {
RTC_DCHECK(queue_name); RTC_DCHECK(queue_name);
thread_.Start(); thread_.Start();
Event event(false, false); Event event(false, false);