背景
最近在浏览React Native代码的时候发现有提到Main Queue和Main Thread的区别,很早就有阅读GCD源码的冲动,这回总算找到机会了。
阅读源码之前先给个结论:Main Thread 和 Main Queue是两个不同的东西。
- Main Queue IS bound to Main Thread.
- Main Thread IS NOT bound to Main Queue.
源码剖析
关键点
queue和线程的关系
JUST REMEMBER THIS PNG!!
slowpath vs fastpath
可能大家对__builtin_expect
比较熟悉,这是编译器可以用来优化执行速度的函数。
程序员在写if条件的时候,可能知道比较的值更可能是哪种情况,因此就可以使用fastpath
或者slowpath
来告诉编译器,让编译来帮忙优化。
fastpath
表示条件更可能成立slowpath
表示条件更不可能成立
所以简单的来说,当我们遇到这个东西的时候,直接忽略,并不会影响我们对代码的理解。
#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l))
#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))
function vs block
gcd支持function和block的执行,相应的其提供了两种方法来支持function和block的入队,简单的举个例子。所以当我们看到不带f和带f的同名函数,默认他们干的是同一回事。
block的执行底层调用的是function的执行。
// 执行block
void dispatch_async(dispatch_queue_t dq, void (^work)(void));
// 执行function
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func);
常用宏定义
libdispatch为了在保证性能的情况下,尽量增加代码的可读性,大量的使用了宏。
因此了解结构体之前,我们需要先知道几个宏定义,方便后续分析代码.
// dispatch结构体定义宏
#define DISPATCH_DECL(name) typedef struct name##_s *name##_t
// os object头部定义宏
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt)
isa // isa
ref_cnt // 引用计数,这是内部gcd内部使用的计数器
xref_cnt // 外部引用计数,这是gcd外部使用的计数器,两者都为0的时候才能dispose
// dispatch结构体头部
#define DISPATCH_STRUCT_HEADER(x)
_OS_OBJECT_HEADER
do_next // 下一个do
do_targetq // 目标queue
do_ctxt // do上下文
do_finalizer // do销毁时候调用函数
do_suspend_cnt // suspend计数,用作暂停标志
// dispatch continuation的头部
#define DISPATCH_CONTINUATION_HEADER(x)
_OS_OBJECT_HEADER
do_next // 下一个do
dc_func // dc封装的函数
dc_ctxt // dc封装的上下文
dc_data // dc封装的数据
dc_other // dc封装的其他信息
// dispatch queue头部
#define DISPATCH_QUEUE_HEADER
dq_running; // 队列正在运行的任务数
dq_width; // 队列可以并发的数目
dq_items_tail; // 队列末尾节点
dq_items_head; // 队列开头节点
dq_serialnum // 队列序列号,每个队列都有唯一的序列号
dq_specific_q //
// dispatch vtable头部
#define DISPATCH_VTABLE_HEADER(x)
do_type // do类型
do_kind // 种类,例如:semaphore/group/queue...
do_debug // debug回调
do_invoke // invoke回调,唤醒队列回调
do_probe // probe回调,important
do_dispose // dispose回调,销毁队列的方法
GCD结构体
因为本文只关心queue的实现,所以暂时省略了其他结构体,有兴趣的童鞋可以直接下源码看。
os_object_t
系统基类
typedef struct _os_object_s {
_OS_OBJECT_HEADER(
const _os_object_class_s *os_obj_isa,
os_obj_ref_cnt,
os_obj_xref_cnt);
} _os_object_s;
dispatch_object_t
该结构体可以看成gcd的基类。
typedef union {
struct _os_object_s *_os_obj;
struct dispatch_object_s *_do;
struct dispatch_continuation_s *_dc;
struct dispatch_queue_s *_dq;
struct dispatch_queue_attr_s *_dqa;
struct dispatch_group_s *_dg;
struct dispatch_source_s *_ds;
struct dispatch_source_attr_s *_dsa;
struct dispatch_semaphore_s *_dsema;
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
struct dispatch_operation_s *_doperation;
struct dispatch_disk_s *_ddisk;
} dispatch_object_t __attribute__((__transparent_union__));
通过上面的结构体定义可以发现,dispatch_object_t
可以是union结构体中任何一种类型。
dispatch_continuation_t
该结构体主要用来封装block和function
struct dispatch_continuation_s {
DISPATCH_CONTINUATION_HEADER(continuation);
};
dispatch_queue_t
队列结构体,可能是gcd最重要的结构体了。
struct dispatch_queue_s {
DISPATCH_STRUCT_HEADER(queue);
DISPATCH_QUEUE_HEADER;
char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last
char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
};
dispatch_queue_attr_t
队列属性结构体。
struct dispatch_queue_attr_s {
DISPATCH_STRUCT_HEADER(queue_attr);
};
常用API解析
gcd提供了非常多的功能来简化针对多核设备的代码编写,这里我们慢慢添加对常用API的源码剖析。
dispatch_get_global_queue
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
return NULL;
}
return _dispatch_get_root_queue(priority,
flags & DISPATCH_QUEUE_OVERCOMMIT);
}
当我们获取global queue来使用的时候,其实质上通过_dispatch_get_root_queue
来获取的非overcommit的预先生成的队列的。
_dispatch_get_root_queue
是从结构体_dispatch_root_queues
中获取相应的优先级的队列。_dispatch_root_queues
区分是否overcommit,定义了4中优先级的队列,他们分别是(参考前文的图)。最后1bit是1的代表overcommit。overcommit用来控制线程数能不能超越物理内核数,显然通过该接口获得的队列不会给系统创建过多的队列。
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY = 0,
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY,
最后提及一下,_dispatch_root_queues
对应的thread实现在_dispatch_root_queue_contexts
中,每一个context都是一个线程池,每个线程池的最大线程数限制是255。
dispatch_get_current_queue
dispatch_queue_t
dispatch_get_current_queue(void)
{
return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}
static inline dispatch_queue_t
_dispatch_queue_get_current(void)
{
return (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);
}
可以发现当我们通过dispatch_get_current_queue
来获取当前运行的队列的时候,我们是通过TSD(Thread Specific Data)来确定到底当前是运行在那个queue上的,每当我们切换queue的时候,都是通过_dispatch_thread_setspecific
来设置当前queue。
这里主要涉及到TSD,也有叫TLD的,是个比较有意思的技术。
dispatch_queue_create
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
dispatch_queue_t dq;
size_t label_len;
if (!label) {
label = "";
}
label_len = strlen(label);
if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
}
// XXX switch to malloc()
dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE -
DISPATCH_QUEUE_CACHELINE_PAD + label_len + 1);
_dispatch_queue_init(dq);
strcpy(dq->dq_label, label);
if (fastpath(!attr)) {
return dq;
}
if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
dq->dq_width = UINT32_MAX;
dq->do_targetq = _dispatch_get_root_queue(0, false);
} else {
dispatch_debug_assert(!attr, "Invalid attribute");
}
return dq;
}
static inline void
_dispatch_queue_init(dispatch_queue_t dq)
{
dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
// Default target queue is overcommit!
dq->do_targetq = _dispatch_get_root_queue(0, true);
dq->dq_running = 0;
dq->dq_width = 1;
dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}
当我们调用dispatch_queue_create
进行queue的创建的时候,其会首先调用_dispatch_queue_init
初始化一个queue,该queue默认是Default的优先级(还记得前文的图么?),并且dq_width是1,也就是串行队列,并且序列号加1。
如果是并发队列的话,会将dq_width改成UINT32_MAX,并且将目标queue设置成非overcommit的。overcommit如果被设置成true,那就意味着可以创建超过物理核数目的线程数。因此可以发现,自定义并发队列线程数目是不会超过物理内核数的,而串行队列一般是没有这个限制的。
前面说到序列号加1了,那序列号是干什么的呢?在源码中有这样一段注释。
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
可以发现,序列号是1的时候表示main queue,2的时候表示manager queue,3没有使用,4到11表示global queue,再往后就是用户自定义queue了。
那接下来我们看看序列1和序列2的queue是怎么定义的。
struct dispatch_queue_s _dispatch_main_q = {
.do_vtable = DISPATCH_VTABLE(queue),
#if !DISPATCH_USE_RESOLVERS
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
#endif
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.dq_label = "com.apple.main-thread",
.dq_running = 1,
.dq_width = 1,
.dq_serialnum = 1,
};
值得说明的是,main queue的目标queue也只是一个优先级为default的overcommit queue,其背后也是普通的线程池,nothing special。
struct dispatch_queue_s _dispatch_mgr_q = {
.do_vtable = DISPATCH_VTABLE(queue_mgr),
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.libdispatch-manager",
.dq_width = 1,
.dq_serialnum = 2,
};
该队列是用来管理GCD内部的任务的,比如对于各类Source的管理等。
dispatch_sync
void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
if (slowpath(dq == &_dispatch_main_q)) {
return _dispatch_sync_slow(dq, work);
}
#endif
struct Block_basic *bb = (void *)work;
dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
无论走哪个分支,深入查看之后,可以发现,其最终走的都是dispatch_sync_f
,通过_dispatch_Block_copy
或者Block_basic
来实现block到function的转换。
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
if (fastpath(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}
if (slowpath(!dq->do_targetq)) {
// the global root queues do not need strict ordering
(void)dispatch_atomic_add2o(dq, dq_running, 2);
return _dispatch_sync_f_invoke(dq, ctxt, func);
}
_dispatch_sync_f2(dq, ctxt, func);
}
如果dq_width是1的话,也就是dq是串行队列的话,必须要等待前面的任务执行完成之后才能执行该任务,因此会调用dispatch_barrier_sync_f
。barrier的实现是依靠信号量机制来保证的。
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
// 1) ensure that this thread hasn't enqueued anything ahead of this call
// 2) the queue is not suspended
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}
if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
// global queues and main queue bound to main thread always falls into
// the slow case
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}
if (slowpath(dq->do_targetq->do_targetq)) {
return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
}
_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}
如果当前队列中有对象或者当前队列处于暂停状态或者当前队列没有运行任何任务的时候,就走_dispatch_barrier_sync_f_slow
慢通道,否则的话就直接调用_dispatch_barrier_sync_f_invoke
。可以发现dispatch_sync
一般来说都是在当前线程执行的,不进行线程的切换,这一点还是要特别注意的。只有走slow
的时候,才会做线程切换。下面就看下走slow
路径是什么样的。
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
// It's preferred to execute synchronous blocks on the current thread
// due to thread-local side effects, garbage collection, etc. However,
// blocks submitted to the main thread MUST be run on the main thread
struct dispatch_barrier_sync_slow2_s dbss2 = {
.dbss2_dq = dq,
#if DISPATCH_COCOA_COMPAT || DISPATCH_LINUX_COMPAT
.dbss2_func = func,
.dbss2_ctxt = ctxt,
#endif
.dbss2_sema = _dispatch_get_thread_semaphore(),
};
struct dispatch_barrier_sync_slow_s dbss = {
.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
DISPATCH_OBJ_SYNC_SLOW_BIT),
.dc_func = _dispatch_barrier_sync_f_slow_invoke,
.dc_ctxt = &dbss2,
};
_dispatch_queue_push(dq, (void *)&dbss);
_dispatch_thread_semaphore_wait(dbss2.dbss2_sema);
_dispatch_put_thread_semaphore(dbss2.dbss2_sema);
......
代码比较长,截取开头一部分,我们可以发现,其通过信号量来同步任务的执行,需要切换线程。
简单看下_dispatch_function_invoke
,主要关注queue和thread的关系并非一一对应的。queue是基于thread的,我们在使用queue的时候,就不应该再操作thread,防止出现意外的情况。
static inline void
_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
_dispatch_thread_setspecific(dispatch_queue_key, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_workitem_inc();
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}
当我们切换线程的时候,我们会先更改TSD,执行block,然后再将之前的dq设置回去。
dispatch_async
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
dispatch_async_f(dq, _dispatch_Block_copy(work),
_dispatch_call_block_and_release);
}
这个就比较显而易见了,直接就把block转成function,然后掉xxx_f函数了。
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
dispatch_continuation_t dc;
// No fastpath/slowpath hint because we simply don't know
if (dq->dq_width == 1) {
return dispatch_barrier_async_f(dq, ctxt, func);
}
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_async_f_slow(dq, ctxt, func);
}
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
dc->dc_func = func;
dc->dc_ctxt = ctxt;
// No fastpath/slowpath hint because we simply don't know
if (dq->do_targetq) {
return _dispatch_async_f2(dq, dc);
}
_dispatch_queue_push(dq, dc);
}
看过前面几个例子之后就比较清晰了,如果dq是串行队列的话,就走dispatch_barrier_async_f
,否则的话就需要创建一个dispatch_continuation
对象来存放function,然后塞到队列后面。其实这两个路径都是干了一件事情,就是创建dc对象,然后塞到队列后面,唯一的区别是
// barrier
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
// not barrier
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
其实到这里我们可以知道,gcd队列的阻塞等待就是通过DISPATCH_OBJ_BARRIER_BIT
这个bit标识来实现的,全部的标识有如下几种:
#define DISPATCH_OBJ_ASYNC_BIT 0x1 // 异步
#define DISPATCH_OBJ_BARRIER_BIT 0x2 // 阻塞
#define DISPATCH_OBJ_GROUP_BIT 0x4 // 组
#define DISPATCH_OBJ_SYNC_SLOW_BIT 0x8 // 同步慢
理解这个对于阅读源码作用还是比较大的。
接下去是比较重要的部分,也是隐藏的比较深的部分,我们现在的确把dc给放进队列里面了,那什么时候改dc才会被执行呢?正常的实现应该是上一个任务完成之后主动触发下一个任务。我们看下几个函数
// 同步任务执行
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
_dispatch_function_invoke(dq, ctxt, func);
if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
_dispatch_wakeup(dq);
}
}
// 异步任务执行
static void
_dispatch_async_f_redirect_invoke(void *_ctxt)
{
struct dispatch_continuation_s *dc = _ctxt;
struct dispatch_continuation_s *other_dc = dc->dc_other;
dispatch_queue_t old_dq, dq = dc->dc_data, rq;
old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
_dispatch_thread_setspecific(dispatch_queue_key, dq);
_dispatch_continuation_pop(other_dc);
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
rq = dq->do_targetq;
while (slowpath(rq->do_targetq) && rq != old_dq) {
if (dispatch_atomic_sub2o(rq, dq_running, 2) == 0) {
_dispatch_wakeup(rq);
}
rq = rq->do_targetq;
}
if (dispatch_atomic_sub2o(dq, dq_running, 2) == 0) {
_dispatch_wakeup(dq);
}
_dispatch_release(dq);
}
可以发现无论是同步的任务执行,还是异步的任务执行,其最终都会调用_dispatch_wakeup
,该函数的作用就是触发下一个任务。
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
dispatch_queue_t tq;
if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
return NULL;
}
if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
return NULL;
}
// _dispatch_source_invoke() relies on this testing the whole suspend count
// word, not just the lock bit. In other words, no point taking the lock
// if the source is suspended or canceled.
if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT || DISPATCH_LINUX_COMPAT
if (dou._dq == &_dispatch_main_q) {
return _dispatch_queue_wakeup_main();
}
#endif
return NULL;
}
dispatch_atomic_acquire_barrier();
_dispatch_retain(dou._do);
tq = dou._do->do_targetq;
_dispatch_queue_push(tq, dou._do);
return tq; // libdispatch does not need this, but the Instrument DTrace
// probe does
}
一开始看半天没看出来咋整的,有个非常不起眼的宏dx_probe
,他的定义是(x)->do_vtable->do_probe(x)
,搜了一下do_probe
,可以发现,在init.c中有相关的定义,我们讨论到现在的基本都是root_queue,所以只看root_queue。
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_root, queue,
.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
.do_kind = "global-queue",
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_queue_probe_root,
);
bool
_dispatch_queue_probe_root(dispatch_queue_t dq)
{
_dispatch_queue_wakeup_global2(dq, 1);
return false;
}
static inline void
_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
{
struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
if (!dq->dq_items_tail) {
return;
}
#if HAVE_PTHREAD_WORKQUEUES
if (
#if DISPATCH_USE_PTHREAD_POOL
(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
!dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n)) {
_dispatch_debug("work thread request still pending on global queue: "
"%p", dq);
return;
}
#endif // HAVE_PTHREAD_WORKQUEUES
return _dispatch_queue_wakeup_global_slow(dq, n);
}
_dispatch_queue_wakeup_global_slow
的代码这里就不贴了,在这个函数中会发送生产者消息让下一个任务执行,同时也会在必要的时候创建线程。这里是比较隐晦的,但原理和我们猜的也差不多。
结论
看这个代码断断续续也看了几天,终于看出了一点端倪。感谢网友博客提供的关键信息说明,少走了很多弯路。特此记录,以备后用。