一、队列
① 队列实现源码分析
- 在源码中搜索 dispatch_queue_create 关键字,可以在 queue.c 中发现:
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_lane_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
- 进入 _dispatch_lane_create_with_target 中:
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
// 创建 dqai
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
// 规范化参数,例如qos, overcommit, tq 和 Initialize the queue
...
// 拼接队列名称
const void *vtable;
dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
if (dqai.dqai_concurrent) { // vtable表示类的类型
// OS_dispatch_queue_concurrent
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
....
// 创建队列,并初始化
dispatch_lane_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_lane_s)); // alloc
// 根据dqai.dqai_concurrent的值,就能判断队列 是 串行 还是并发
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); // init
// 设置队列label标识符
dq->dq_label = label;// label赋值
dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos, dqai.dqai_relpri);// 优先级处理
...
// 类似于类与元类的绑定,不是直接的继承关系,而是类似于模型与模板的关系
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_trace_queue_create(dq)._dq; // 研究dq
}
- 通过 _dispatch_queue_attr_to_info 方法传入 dqa(即队列类型,串行、并发等)创建 dispatch_queue_attr_info_t 类型的对象 dqai,用于存储队列的相关属性信息:
dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
dispatch_queue_attr_info_t dqai = { };
if (!dqa) return dqai;
#if DISPATCH_VARIANT_STATIC
if (dqa == &_dispatch_queue_attr_concurrent) {
dqai.dqai_concurrent = true;
return dqai;
}
#endif
if (dqa < _dispatch_queue_attrs ||
dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;
dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;
dqai.dqai_relpri = -(int)(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;
dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;
dqai.dqai_autorelease_frequency =
idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
return dqai;
}
- 设置队列相关联的属性,qos, overcommit, tq 和 Initialize the queue 的实现如下:
//
// Step 1: Normalize arguments (qos, overcommit, tq)
//
dispatch_qos_t qos = dqai.dqai_qos;
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
dqai.dqai_qos = qos = DISPATCH_QOS_USER_INITIATED;
}
if (qos == DISPATCH_QOS_MAINTENANCE) {
dqai.dqai_qos = qos = DISPATCH_QOS_BACKGROUND;
}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS
_dispatch_queue_attr_overcommit_t overcommit = dqai.dqai_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
"a non-global target queue");
}
}
if (tq && dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
// Handle discrepancies between attr and target queue, attributes win
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
overcommit = _dispatch_queue_attr_overcommit_enabled;
} else {
overcommit = _dispatch_queue_attr_overcommit_disabled;
}
}
if (qos == DISPATCH_QOS_UNSPECIFIED) {
qos = _dispatch_priority_qos(tq->dq_priority);
}
tq = NULL;
} else if (tq && !tq->do_targetq) {
// target is a pthread or runloop root queue, setting QoS or overcommit
// is disallowed
if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
"and use this kind of target queue");
}
} else {
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
// Serial queues default to overcommit!
overcommit = dqai.dqai_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
}
}
if (!tq) {
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;
if (unlikely(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
}
//
// Step 2: Initialize the queue
//
if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
legacy = false;
}
}
- 通过 DISPATCH_VTABLE 拼接队列名称,即 vtable,其中 DISPATCH_VTABLE 是宏定义,队列的类型是通过 OS_dispatch + 队列类型 queue_concurrent 拼接而成的;
-
- 创建以下队列DISPATCH_QUEUE_SERIA、DISPATCH_QUEUE_CONCURRENT、dispatch_get_main_queue、dispatch_get_global_queue:
// OS_dispatch_queue_serial
dispatch_queue_t serial = dispatch_queue_create("YDW", DISPATCH_QUEUE_SERIAL);
// OS_dispatch_queue_concurrent
// OS_dispatch_queue_concurrent
dispatch_queue_t conque = dispatch_queue_create("YDW", DISPATCH_QUEUE_CONCURRENT);
// DISPATCH_QUEUE_SERIAL max && 1
// queue 对象 alloc init class
dispatch_queue_t mainQueue = dispatch_get_main_queue();
// 多个集合
dispatch_queue_t globQueue = dispatch_get_global_queue(0, 0);
NSLog(@"%@-%@-%@-%@", serial, conque, mainQueue, globQueue);
-
- 串行队列类型:OS_dispatch_queue_serial,调试结果如下:
(lldb) po object_getClass(serial)
OS_dispatch_queue_serial
-
- 并发队列类型:OS_dispatch_queue_concurrent,调试结果如下:
(lldb) po object_getClass(conque)
OS_dispatch_queue_concurrent
-
- DISPATCH 的定义如下:
#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
#define DISPATCH_OBJC_CLASS(name) (&DISPATCH_CLASS_SYMBOL(name))
#define DISPATCH_CLASS(name) OS_dispatch_##name
- 通过 alloc+init 初始化队列,即 dq,其中在 _dispatch_queue_init 传参中根据dqai.dqai_concurrent 的布尔值,就能判断队列是串行还是并发,我们知道 vtable 表示的是队列类型,因此也可以说明队列也是对象;
-
- 进入 _dispatch_object_alloc -> _os_object_alloc_realized 方法中,可以看到设置了 isa 的指向,这里也可以验证队列也是对象:
inline _os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
_os_object_t obj;
dispatch_assert(size >= sizeof(struct _os_object_s));
while (unlikely(!(obj = calloc(1u, size)))) {
_dispatch_temporary_resource_shortage();
}
// 设置 isa 指向
obj->os_obj_isa = cls;
return obj;
}
- 进入 _dispatch_queue_init 方法,队列类型是 dispatch_queue_t,并设置队列的相关属性:
// Note to later developers: ensure that any initialization changes are
// made for statically allocated queues (i.e. _dispatch_main_q).
static inline dispatch_queue_class_t
_dispatch_queue_init(dispatch_queue_class_t dqu, dispatch_queue_flags_t dqf,
uint16_t width, uint64_t initial_state_bits)
{
uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
dispatch_queue_t dq = dqu._dq;
dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
DISPATCH_QUEUE_INACTIVE)) == 0);
if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
dq->do_ref_cnt += 2; // rdar://8181908 see _dispatch_lane_resume
if (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE) {
dq->do_ref_cnt++; // released when DSF_DELETED is set
}
}
dq_state |= initial_state_bits;
dq->do_next = DISPATCH_OBJECT_LISTLESS;
// 串行还是并发
dqf |= DQF_WIDTH(width);
os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
dq->dq_state = dq_state;
dq->dq_serialnum =
os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
return dqu;
}
- 通过 _dispatch_trace_queue_create 对创建的队列进行处理,其中_dispatch_trace_queue_create 是 _dispatch_introspection_queue_create 封装的宏定义,最后会返回处理过的_dq;
dispatch_queue_class_t
_dispatch_introspection_queue_create(dispatch_queue_t dq)
{
dispatch_queue_introspection_context_t dqic;
size_t sz = sizeof(struct dispatch_queue_introspection_context_s);
if (!_dispatch_introspection.debug_queue_inversions) {
sz = offsetof(struct dispatch_queue_introspection_context_s,
__dqic_no_queue_inversion);
}
dqic = _dispatch_calloc(1, sz);
dqic->dqic_queue._dq = dq;
if (_dispatch_introspection.debug_queue_inversions) {
LIST_INIT(&dqic->dqic_order_top_head);
LIST_INIT(&dqic->dqic_order_bottom_head);
}
dq->do_finalizer = dqic;
_dispatch_unfair_lock_lock(&_dispatch_introspection.queues_lock);
LIST_INSERT_HEAD(&_dispatch_introspection.queues, dqic, dqic_list);
_dispatch_unfair_lock_unlock(&_dispatch_introspection.queues_lock);
DISPATCH_INTROSPECTION_INTERPOSABLE_HOOK_CALLOUT(queue_create, dq);
if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
_dispatch_introspection_queue_create_hook(dq);
}
return upcast(dq)._dqu;
}
- 进入 _dispatch_introspection_queue_create_hook -> dispatch_introspection_queue_get_info -> _dispatch_introspection_lane_get_info 中可以看出,与我们自定义的类还是有区别的,创建队列在底层的实现是通过模板创建的:
DISPATCH_ALWAYS_INLINE
static inline dispatch_introspection_queue_s
_dispatch_introspection_lane_get_info(dispatch_lane_class_t dqu)
{
dispatch_lane_t dq = dqu._dl;
bool global = _dispatch_object_is_global(dq);
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
dispatch_introspection_queue_s diq = {
.queue = dq->_as_dq,
.target_queue = dq->do_targetq,
.label = dq->dq_label,
.serialnum = dq->dq_serialnum,
.width = dq->dq_width,
.suspend_count = _dq_state_suspend_cnt(dq_state) + dq->dq_side_suspend_cnt,
.enqueued = _dq_state_is_enqueued(dq_state) && !global,
.barrier = _dq_state_is_in_barrier(dq_state) && !global,
.draining = (dq->dq_items_head == (void*)~0ul) ||
(!dq->dq_items_head && dq->dq_items_tail),
.global = global,
.main = dx_type(dq) == DISPATCH_QUEUE_MAIN_TYPE,
};
return diq;
}
② 队列实现流程总结
- 队列创建方法 dispatch_queue_create 中的参数二(即队列类型),决定了下层中 max & 1(用于区分是串行还是并发),其中1表示串行;
- queue 也是一个对象,也需要底层通过 alloc + init 创建,并且在 alloc 中也有一个class,这个 class 是通过宏定义拼接而成,并且同时会指定 isa 的指向;
- 创建队列在底层的处理是通过模板创建的,其类型是 dispatch_introspection_queue_s 结构体。
③ 队列实现流程示意图
二、函数
① 异步函数
- 进入 dispatch_async 的源码实现,主要有两个函数:_dispatch_continuation_init 任务包装函数,_dispatch_continuation_async 并发处理函数,如下所示:
#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
#endif
- 进入 _dispatch_continuation_init 源码实现,可以看到,主要业务是包装任务,并设置线程的回程函数,相当于初始化:
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, dispatch_block_t work,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
// 拷贝任务
void *ctxt = _dispatch_Block_copy(work);
dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
if (unlikely(_dispatch_block_has_private_data(work))) {
dc->dc_flags = dc_flags;
// 赋值
dc->dc_ctxt = ctxt;
// will initialize all fields but requires dc_flags & dc_ctxt to be set
return _dispatch_continuation_init_slow(dc, dqu, flags);
}
// 封装work - 异步回调
dispatch_function_t func = _dispatch_Block_invoke(work);
if (dc_flags & DC_FLAG_CONSUME) {
// 回调函数赋值 - 同步回调
func = _dispatch_call_block_and_release;
}
return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
- 分析说明:
-
- 通过 _dispatch_Block_copy 拷贝任务;
-
- 通过 _dispatch_Block_invoke 封装任务,其中 _dispatch_Block_invoke 是个宏定义,根据以上分析得知是异步回调:
#define _dispatch_Block_invoke(bb) \
((dispatch_function_t)((struct Block_layout *)bb)->invoke)
-
- 如果是同步的,则回调函数赋值为 _dispatch_call_block_and_release;
-
- 通过 _dispatch_continuation_init_f 方法将回调函数赋值,即 f 就是 func,将其保存在属性中:
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
dispatch_block_flags_t flags, uintptr_t dc_flags)
{
pthread_priority_t pp = 0;
dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
dc->dc_func = f;
dc->dc_ctxt = ctxt;
// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
// should not be propagated, only taken from the handler if it has one
if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
pp = _dispatch_priority_propagate();
}
_dispatch_continuation_voucher_set(dc, flags);
return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}
- 进入 _dispatch_continuation_async 的源码实现,如下所示:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc); // 跟踪日志
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos); // 与dx_invoke一样,都是宏
}
-
- 其中,最主要的是 dx_push(dqu._dq, dc, qos),dx_push 是宏定义,如下所示:
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
-
- dq_push 需要根据队列的类型,执行不同的函数:
DISPATCH_VTABLE_INSTANCE(workloop,
.do_type = DISPATCH_WORKLOOP_TYPE,
.do_dispose = _dispatch_workloop_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_workloop_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_workloop_wakeup,
.dq_push = _dispatch_workloop_push,
);
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, lane,
.do_type = DISPATCH_QUEUE_SERIAL_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_push,
);
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
.do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_pthread_root, lane,
.do_type = DISPATCH_QUEUE_PTHREAD_ROOT_TYPE,
.do_dispose = _dispatch_pthread_root_queue_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_object_no_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_root_queue_wakeup,
.dq_push = _dispatch_root_queue_push,
);
#endif // DISPATCH_USE_PTHREAD_ROOT_QUEUES
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_mgr, lane,
.do_type = DISPATCH_QUEUE_MGR_TYPE,
.do_dispose = _dispatch_object_no_dispose,
.do_debug = _dispatch_queue_debug,
#if DISPATCH_USE_MGR_THREAD
.do_invoke = _dispatch_mgr_thread,
#else
.do_invoke = _dispatch_object_no_invoke,
#endif
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_mgr_queue_wakeup,
.dq_push = _dispatch_mgr_queue_push,
);
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_main, lane,
.do_type = DISPATCH_QUEUE_MAIN_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_main_queue_wakeup,
.dq_push = _dispatch_main_queue_push,
);
#if DISPATCH_COCOA_COMPAT
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_runloop, lane,
.do_type = DISPATCH_QUEUE_RUNLOOP_TYPE,
.do_dispose = _dispatch_runloop_queue_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_queue_no_activate,
.dq_wakeup = _dispatch_runloop_queue_wakeup,
.dq_push = _dispatch_lane_push,
);
#endif
DISPATCH_VTABLE_INSTANCE(source,
.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
.do_dispose = _dispatch_source_dispose,
.do_debug = _dispatch_source_debug,
.do_invoke = _dispatch_source_invoke,
.dq_activate = _dispatch_source_activate,
.dq_wakeup = _dispatch_source_wakeup,
.dq_push = _dispatch_lane_push,
);
DISPATCH_VTABLE_INSTANCE(channel,
.do_type = DISPATCH_CHANNEL_TYPE,
.do_dispose = _dispatch_channel_dispose,
.do_debug = _dispatch_channel_debug,
.do_invoke = _dispatch_channel_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_channel_wakeup,
.dq_push = _dispatch_lane_push,
);
#if HAVE_MACH
DISPATCH_VTABLE_INSTANCE(mach,
.do_type = DISPATCH_MACH_CHANNEL_TYPE,
.do_dispose = _dispatch_mach_dispose,
.do_debug = _dispatch_mach_debug,
.do_invoke = _dispatch_mach_invoke,
.dq_activate = _dispatch_mach_activate,
.dq_wakeup = _dispatch_mach_wakeup,
.dq_push = _dispatch_lane_push,
);
#endif // HAVE_MACH
- 调试执行函数
-
- 由于是并发队列,可以通过增加 _dispatch_lane_concurrent_push 符号断点,看看是否会执行:
dispatch_queue_t conque = dispatch_queue_create("com.YDW.Queue", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(conque, ^{
NSLog(@"异步函数");
});
- 运行可以看到,确实执行了 _dispatch_lane_concurrent_push;
- 我们进入 _dispatch_lane_concurrent_push 源码,可以发现有两步,继续通过符号断点 _dispatch_continuation_redirect_push 和 _dispatch_lane_push 调试,可以看到 执行了 _dispatch_continuation_redirect_push;
- 继续进入 _dispatch_continuation_redirect_push 源码,继续执行了 dx_push,即产生了“递归”,综合前面队列创建时可知,队列也是一个对象,有父类、根类,所以会递归执行到根类的方法:
DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
dispatch_queue_t dq = dl->do_targetq;
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
// 递归
dx_push(dq, dou, qos);
}
- 接下来,通过根类的 _dispatch_root_queue_push 符号断点,来验证猜想是否正确,从运行结果看出,猜想完全正确。
- 继续进入 _dispatch_root_queue_push -> _dispatch_root_queue_push_inline ->_dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow 源码,符号断点继续可以验证,查看该方法的源码实现,主要有两步:
-
- 通过 _dispatch_root_queues_init 方法注册回调;
-
- 通过 do-while 循环创建线程,使用 pthread_create 方法;
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;
_dispatch_root_queues_init();
...
// do-while循环创建线程
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
...
}
- 进入 _dispatch_root_queues_init 源码实现,发现它是一个 dispatch_once_f 单例,其中传入的 func 是 _dispatch_root_queues_init_once:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once);
}
- 进入 _dispatch_root_queues_init_once 的源码,其内部不同事务的调用句柄都是_dispatch_worker_thread2:
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#if DISPATCH_USE_KEVENT_WORKLOOP
} else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
(pthread_workqueue_function_workloop_t)
_dispatch_workloop_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif // DISPATCH_USE_KEVENT_WORKLOOP
#if DISPATCH_USE_KEVENT_WORKQUEUE
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif
} else {
DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
}
#pragma clang diagnostic pop
if (r != 0) {
DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
"Root queue initialization failed");
}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
}
- 其block回调执行的流程为:_dispatch_root_queues_init_once ->_dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline -> _dispatch_client_callout -> dispatch_call_block_and_release,通过 bt 打印堆栈信息,如下:
(lldb) bt
* thread #3, queue = 'com.YDW.Queue', stop reason = breakpoint 1.1
* frame #0: 0x00000001073d6367 函数与队列`__29-[ViewController viewDidLoad]_block_invoke(.block_descriptor=0x00000001073d9108) at ViewController.m:23:9
frame #1: 0x00000001076477ec libdispatch.dylib`_dispatch_call_block_and_release + 12
frame #2: 0x00000001076489c8 libdispatch.dylib`_dispatch_client_callout + 8
frame #3: 0x000000010764b316 libdispatch.dylib`_dispatch_continuation_pop + 557
frame #4: 0x000000010764a71c libdispatch.dylib`_dispatch_async_redirect_invoke + 779
frame #5: 0x000000010765a508 libdispatch.dylib`_dispatch_root_queue_drain + 351
frame #6: 0x000000010765ae6d libdispatch.dylib`_dispatch_worker_thread2 + 135
frame #7: 0x00007fff611639f7 libsystem_pthread.dylib`_pthread_wqthread + 220
frame #8: 0x00007fff61162b77 libsystem_pthread.dylib`start_wqthread + 15
(lldb)
- 特别说明:单例的 block 回调和异步函数的 block 回调是不同的:
- 单例中,block 回调中的 func 是 _dispatch_Block_invoke(block);
- 而异步函数中,block 回调中的 func 是 dispatch_call_block_and_release。
- 综上所述,异步函数的底层实现:将异步任务拷贝并封装,并设置回调函数 func,再通过 dx_push 递归,会重定向到根队列,然后通过 pthread_creat 创建线程,最后通过 dx_invoke 执行 block 回调(注意 dx_push 和 dx_invoke 是成对的)。
② 同步函数
- 进入 dispatch_sync 源码实现,其底层的实现是通过栅栏函数实现的:
DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
- 进入 _dispatch_sync_f 源码:
DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
- 查看 _dispatch_sync_f_inline 源码,其中 width = 1 表示是串行队列;
- 栅栏:_dispatch_barrier_sync_f,可以得出同步函数的底层实现其实是同步栅栏函数;
- 死锁:_dispatch_sync_f_slow,如果存在相互等待的情况,就会造成死锁。
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {// 表示是串行队列
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);//栅栏
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);//死锁
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl); // 处理当前信息
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags))); // block执行并释放
}
- 进入 _dispatch_sync_f_slow,当前的主队列是挂起阻塞:
DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
- 往一个队列中加入任务,会 push 加入主队列,进入 _dispatch_trace_item_push:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_trace_item_push(dispatch_queue_class_t dqu, dispatch_object_t _tail)
{
if (unlikely(DISPATCH_QUEUE_PUSH_ENABLED())) {
_dispatch_trace_continuation(dqu._dq, _tail._do, DISPATCH_QUEUE_PUSH);
}
_dispatch_trace_item_push_inline(dqu._dq, _tail._do);
_dispatch_introspection_queue_push(dqu, _tail);
}
- 进入__DISPATCH_WAIT_FOR_QUEUE__,判断dq是否为正在等待的队列,然后给出一个状态state,然后将dq的状态和当前任务依赖的队列进行匹配:
DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
// 判断dq是否为正在等待的队列
uint64_t dq_state = _dispatch_wait_prepare(dq);
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// Blocks submitted to the main thread MUST run on the main thread, and
// dispatch_async_and_wait also executes on the remote context rather than
// the current thread.
//
// For both these cases we need to save the frame linkage for the sake of
// _dispatch_async_and_wait_invoke
_dispatch_thread_frame_save_state(&dsc->dsc_dtf);
if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}
if (dsc->dc_data == DISPATCH_WLH_ANON) {
dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
(uint8_t)_dispatch_get_basepri_override_qos_floor();
_dispatch_thread_event_init(&dsc->dsc_event);
}
dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
_dispatch_trace_runtime_event(sync_wait, dq, 0);
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_wait(&dsc->dsc_event); // acquire
} else {
_dispatch_event_loop_wait_for_ownership(dsc);
}
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_destroy(&dsc->dsc_event);
// If _dispatch_sync_waiter_wake() gave this thread an override,
// ensure that the root queue sees it.
if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
}
}
}
- 继续进入 _dq_state_drain_locked_by -> _dispatch_lock_is_locked_by 源码,如果当前等待的和正在执行的是同一个队列,即判断线程ID是否相等,如果相等,则会造成死锁,如下:
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
// 异或操作:相同为0,不同为1,如果相同,则为0,0 &任何数都为0
// 即判断 当前要等待的任务 和 正在执行的任务是否一样,通俗的意思就是执行和等待的是否在同一队列
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
- 综上所述,同步函数的底层实现实际是同步栅栏函数;同步函数中如果当前正在执行的队列和等待的是同一个队列,形成相互等待的局面,则会造成死锁。
- 同步函数的底层实现流程如下:
三、单例
① 单例的使用
- 在日常开发中,我们一般使用 GCD 的 dispatch_once 来创建单例,如下所示:
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
NSLog(@"这是一个单例!!");
});
- 那么,单例的流程只执行一次,底层是如何控制的,为什么只能执行一次?单例的 block 是在什么时候进行调用的?
② 源码分析
- 进入 dispatch_once 源码,可以看到一个 dispatch_once_f 方法实现:
#ifdef __BLOCKS__
void
// 参数1:onceToken,它是一个静态变量,由于不同位置定义的静态变量是不同的,所以静态变量具有唯一性
// 参数2:block回调
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
#endif
- 继续进入 dispatch_once_f 源码,其中的 val 是外界传入的 onceToken 静态变量,而 func 是 _dispatch_Block_invoke(block),其底层实现如下所示:
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);//load
if (likely(v == DLOCK_ONCE_DONE)) { // 已经执行过了,直接返回
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) { // 尝试进入
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l); // 无限次等待
}
- dispatch_once_f 实现源码,分析如下:
- 将val,也即为静态变量转换为 dispatch_once_gate_t 类型的变量 l;
- 通过 os_atomic_load 获取此时的任务的标识符 v;
- 如果 v 等于 DLOCK_ONCE_DONE,表示任务已经被执行,直接 return;
- 如果任务执行,加锁失败,则执行 _dispatch_once_mark_done_if_quiesced 函数,再次进行存储,将标识符置为 DLOCK_ONCE_DONE;
- 否则,则通过 _dispatch_once_gate_tryenter 尝试进入任务,即解锁,然后执行 _dispatch_once_callout 执行 block 回调;
- 如果此时有任务正在执行,再次进来一个任务2,则通过 _dispatch_once_wait 函数让任务2进入无限次等待。
- 进入 _dispatch_once_gate_tryenter 解锁方法实现,可以看到它是通过 os_atomic_cmpxchg 方法进行对比,如果比较没有问题,则进行加锁,即任务的标识符置为 DLOCK_ONCE_UNLOCKED:
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
// 首先对比,然后进行改变
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
- 进入 _dispatch_once_callout 回调,它主要通过 _dispatch_client_callout:block 回调执行,以及 _dispatch_once_gate_broadcast:进行广播:
DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
// block调用执行
_dispatch_client_callout(ctxt, func);
// 进行广播:告诉别人已有归属,不要再找寻
_dispatch_once_gate_broadcast(l);
}
- 进入 _dispatch_client_callout 源码,主要就是执行 block 回调,其中的 f 等于 _dispatch_Block_invoke(block),即异步回调:
#undef _dispatch_client_callout
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
@try {
return f(ctxt);
}
@catch (...) {
objc_terminate();
}
}
- 进入 _dispatch_once_gate_broadcast -> _dispatch_once_mark_done 源码,它主要就是给 dgo->dgo_once 赋一个值,然后将任务的标识符为DLOCK_ONCE_DONE,即解锁;
DISPATCH_ALWAYS_INLINE
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
// 如果不相同,直接改为相同,然后上锁 -- DLOCK_ONCE_DONE
return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
- 看完单例的底层源码实现,我们就可以解释上面的问题:
-
- 单例只执行一次的原理:GCD 单例中,有两个重要参数,onceToken 和 block,其中 onceToken 是静态变量,具有唯一性,在底层被封装成了 dispatch_once_gate_t 类型的变量 l,l 主要是用来获取底层原子封装性的关联,即变量 v,通过 v 来查询任务的状态,如果此时 v 等于 DLOCK_ONCE_DONE,说明任务已经处理过一次了,则不会再继续执行,直接 return;
-
- block 调用时机:如果此时任务没有执行过,则会在底层通过 C++ 函数的比较,将任务进行加锁,即任务状态置为 DLOCK_ONCE_UNLOCK(目的是为了保证当前任务执行的唯一性,防止在其它地方有多次定义),加锁之后进行block回调函数的执行,执行完成后,将当前任务解锁,将当前的任务状态置为 DLOCK_ONCE_DONE,在下次进来时,就不会在执行,会直接返回;
-
- 多线程影响:如果在当前任务执行期间,有其它任务进来,会进入无限次等待,原因是当前任务已经获取了锁,进行了加锁,那么其它任务是无法获取锁的。
- 单例的底层分析流程如下:
四、栅栏函数
① 栅栏函数说明
- GCD 中常用的栅栏函数,主要有两种:同步栅栏函数dispatch_barrier_sync(在主线程中执行):前面的任务执行完毕才会执行栅栏函数之后的任务,但是同步栅栏函数会堵塞线程,影响后面的任务执行;异步栅栏函数 dispatch_barrier_async:前面的任务执行完毕才会继续执行栅栏函数的之后的任务。
- 栅栏函数最直接的作用就是控制任务执行顺序,使同步执行。
- 栅栏函数只能控制同一并发队列;
- 同步栅栏添加进入队列的时候,当前线程会被锁死,直到同步栅栏之前的任务和同步栅栏任务本身执行完毕时,当前线程才会打开然后继续执行下一句代码。
- 在使用栅栏函数时,使用自定义队列才有意义,如果用的是串行队列或者系统提供的全局并发队列,这个栅栏函数的作用等同于一个同步函数的作用,没有任何意义。
② 栅栏函数使用
- 现在总共有4个任务,其中前2个任务有依赖关系,即任务1执行完,执行任务2,此时可以使用栅栏函数;
- 异步栅栏函数:
-
- 使用异步栅栏函数如下所示:
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.YDW.queue", DISPATCH_QUEUE_CONCURRENT);
// 异步函数
dispatch_async(concurrentQueue, ^{
NSLog(@"任务1");
});
// 栅栏函数
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"任务2");
});
// 异步函数
dispatch_async(concurrentQueue, ^{
NSLog(@"任务3");
});
NSLog(@"任务4");
-
- 执行结果如下所示:
2021-03-31 19:52:08.355734+0800 GCD[98653:3157789] 任务4
2021-03-31 19:52:08.355882+0800 GCD[98653:3157142] 任务1
2021-03-31 19:52:08.356001+0800 GCD[98653:3157142] 任务2
2021-03-31 19:52:08.356011+0800 GCD[98653:3157789] 任务3
-
- 由此,异步栅栏函数不会阻塞主线程 ,异步堵塞的是队列;
- 同步栅栏函数:
-
- 使用同步栅栏函数如下所示:
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.YDW.queue", DISPATCH_QUEUE_CONCURRENT);
// 异步函数
dispatch_async(concurrentQueue, ^{
NSLog(@"任务1");
});
// 栅栏函数
dispatch_barrier_sync(concurrentQueue, ^{
NSLog(@"任务2");
});
// 异步函数
dispatch_async(concurrentQueue, ^{
NSLog(@"任务3");
});
NSLog(@"任务4");
-
- 执行结果如下:
2021-03-31 19:52:08.355734+0800 GCD[98653:3157789] 任务1
2021-03-31 19:52:08.355882+0800 GCD[98653:3157142] 任务2
2021-03-31 19:52:08.356001+0800 GCD[98653:3157142] 任务4
2021-03-31 19:52:08.356011+0800 GCD[98653:3157789] 任务3
-
- 因此,同步栅栏函数会堵塞主线程,同步堵塞是当前的线程。
- 栅栏函数除了用于任务有依赖关系时,同时还可以用于数据安全。
-
- 现有如下的代码,执行会出现什么问题?
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.YDW.queue", DISPATCH_QUEUE_CONCURRENT);
NSMutableArray *array = [NSMutableArray array];
for (int i = 0; i < 10000; i++) {
dispatch_async(concurrentQueue, ^{
[array addObject:[NSString stringWithFormat:@"%d",i]];
});
}
- 可以看到,程序出现了崩溃,崩溃原因如下:
malloc: *** error for object 0x7fe0ae809400: pointer being freed was not allocated
malloc: *** set a breakpoint in malloc_error_break to debug
- 这是因为数据在不断的 retain 和 release,在数据还没有 retain 完毕时,然后又已经开始了 release,相当于加了一个空数据,进行 release,从而闪退。那么怎么解决呢?
-
- 使用栅栏函数就可以解决问题,如下所示:
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.YDW.queue", DISPATCH_QUEUE_CONCURRENT);
NSMutableArray *array = [NSMutableArray array];
for (int i = 0; i < 10000; i++) {
dispatch_async(concurrentQueue, ^{
dispatch_barrier_async(concurrentQueue, ^{
[array addObject:[NSString stringWithFormat:@"%d",i]];
});
});
}
-
- 使用互斥锁 synchronized 也可以解决,如下:
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.YDW.queue", DISPATCH_QUEUE_CONCURRENT);
NSMutableArray *array = [NSMutableArray array];
for (int i = 0; i < 10000; i++) {
dispatch_async(concurrentQueue, ^{
@synchronized (self) {
[array addObject:[NSString stringWithFormat:@"%d", i]];
};
});
}
-
- 这里需要注意:
- 如果栅栏函数中使用全局队列, 运行会崩溃,原因是系统也在用全局并发队列,使用栅栏同时会拦截系统的,所以会出现崩溃;
- 如果将自定义并发队列改为串行队列,即 serial ,串行队列本身就是有序同步,此时加栅栏会浪费性能;
- 栅栏函数只会阻塞一次。
- 这里需要注意:
③ 底层分析
- 异步栅栏函数:进入 dispatch_barrier_async 源码,其底层的实现与 dispatch_async 类似:
#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc_flags);
}
#endif
- 同步栅栏函数:
-
- 进入 dispatch_barrier_sync 源码,如下:
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
-
- 进入 _dispatch_barrier_sync_f -> _dispatch_barrier_sync_f_inline 源码:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();// 获取线程的id,即线程的唯一标识
...
// 判断线程状态,需不需要等待,是否回收
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) { // 栅栏函数死锁
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl, // 没有回收
DC_FLAG_BARRIER | dc_flags);
}
// 验证target是否存在,如果存在,加入栅栏函数的递归查找 是否等待
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER))); // 执行
}
-
- 其中,_dispatch_queue_try_acquire_barrier_sync 实现如下:
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}
- 进入 _dispatch_queue_try_acquire_barrier_sync_and_suspend:
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid) |
(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
uint64_t old_state, new_state;
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
if (old_state != (init | role)) {
os_atomic_rmw_loop_give_up(break);
}
new_state = value | role;
});
}
-
- 通过 _dispatch_introspection_sync_begin 对向前信息进行处理:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_introspection_sync_begin(dispatch_queue_class_t dq)
{
if (!_dispatch_introspection.debug_queue_inversions) return;
_dispatch_introspection_order_record(dq._dq);
}
-
- 通过 _dispatch_lane_barrier_sync_invoke_and_complete 执行 block 并释放:
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
return _dispatch_lane_barrier_complete(dq, 0, 0);
}
// Presence of any of these bits requires more work that only
// _dispatch_*_barrier_complete() handles properly
//
// Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
// checking the role is sloppy, but is a super fast check, and neither of
// these bits should be set if the lock was never contended/discovered.
const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
uint64_t old_state, new_state;
// similar to _dispatch_queue_drain_try_unlock
// 对下层状态的释放
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
if (unlikely(old_state & fail_unlock_mask)) {
os_atomic_rmw_loop_give_up({
// 通知 barrier 执行完毕
return _dispatch_lane_barrier_complete(dq, 0, 0);
});
}
});
if (_dq_state_is_base_wlh(old_state)) {
_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
}
}
-
- 同步栅栏函数底层流程总结:
- 通过 _dispatch_tid_self 获取线程ID;
- 通过 _dispatch_queue_try_acquire_barrier_sync 判断线程状态;
- 通过 _dispatch_queue_try_acquire_barrier_sync_and_suspend 进行释放;
- 通过 _dispatch_sync_recurse 递归查找栅栏函数的 target;
- 通过 _dispatch_introspection_sync_begin 对向前信息进行处理;
- 通过 _dispatch_lane_barrier_sync_invoke_and_complete 执行 block 并释放。
- 同步栅栏函数底层流程总结:
五、信号量
① dispatch_semaphore_create 创建
- 信号量的作用一般是用来使任务同步执行,类似于互斥锁,用户可以根据需要控制GCD最大并发数,一般使用如下:
// 信号量
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
dispatch_semaphore_signal(sem);
- 创建的底层实现,主要是初始化信号量,并设置GCD的最大并发数,其最大并发数必须大于0,如下所示:
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
② dispatch_semaphore_wait 加锁
- 该函数的实现,其主要作用是对信号量 dsema 通过 os_atomic_dec2o 进行了–操作,其内部是执行的 C++ 的 atomic_fetch_sub_explicit 方法:
- 如果value 大于等于0,表示操作无效,即执行成功;
- 如果value 等于LONG_MIN,系统会抛出一个 crash;
- 如果value 小于0,则进入长等待。
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
// dsema_value 进行 -- 操作
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) { // 表示执行操作无效,即执行成功
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout); // 长等待
}
- 其中,os_atomic_dec2o 的宏定义转换如下:
os_atomic_inc2o(p, f, m)
os_atomic_sub2o(p, f, 1, m)
_os_atomic_c11_op((p), (v), m, sub, -)
_os_atomic_c11_op((p), (v), m, add, +)
({ _os_atomic_basetypeof(p) _v = (v), _r = \
atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
memory_order_##m); (__typeof__(_r))(_r op _v); })
// 传值代入
os_atomic_dec2o(dsema, dsema_value, acquire);
os_atomic_sub2o(dsema, dsema_value, 1, m)
os_atomic_sub(dsema->dsema_value, 1, m)
_os_atomic_c11_op(dsema->dsema_value, 1, m, sub, -)
_r = atomic_fetch_sub_explicit(dsema->dsema_value, 1),
等价于 dsema->dsema_value - 1
- 进入 _dispatch_semaphore_wait_slow 的源码实现,当 value 小于 0 时,根据等待事件 timeout 做出不同操作:
DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
③ dispatch_semaphore_signal 解锁
- dispatch_semaphore_signal 实现原理,核心是通过 os_atomic_inc2o 函数对 value 进行 ++ 操作,os_atomic_inc2o 内部是通过 C++ 的 atomic_fetch_add_explicit,如果value 大于 0,表示操作无效,即执行成功;如果value 等于0,则进入长等待:
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
// signal 对 value 是 ++
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) { // 返回0,表示当前的执行操作无效,相当于执行成功
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema); // 进入长等待
}
- os_atomic_dec2o 的宏定义转换如下:
os_atomic_inc2o(p, f, m)
os_atomic_add2o(p, f, 1, m)
os_atomic_add(&(p)->f, (v), m)
_os_atomic_c11_op((p), (v), m, add, +)
({ _os_atomic_basetypeof(p) _v = (v), _r = \
atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
memory_order_##m); (__typeof__(_r))(_r op _v); })
// 传值如下:
os_atomic_inc2o(dsema, dsema_value, release);
os_atomic_add2o(dsema, dsema_value, 1, m)
os_atomic_add(&(dsema)->dsema_value, (1), m)
_os_atomic_c11_op((dsema->dsema_value), (1), m, add, +)
_r = atomic_fetch_add_explicit(dsema->dsema_value, 1), // 等价于 dsema->dsema_value + 1
④ dispatch_semaphore 底层实现流程
- dispatch_semaphore_create 主要初始化信号量;
- dispatch_semaphore_wait 是对信号量的 value 进行–,即加锁操作;
- dispatch_semaphore_signal 是对信号量的 value 进行++,即解锁操作。
六、调度组
① dispatch_group_create 创建组
- 进入 dispatch_group_create 源码:
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
- 进入 _dispatch_group_create_with_count 源码,主要是对 group 对象属性赋值,并返回 group 对象,n 等于 0;
DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
// 创建group对象,类型为OS_dispatch_group
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
// group对象赋值
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed);
}
return dg;
}
② dispatch_group_enter 进组
- 进入 dispatch_group_enter 源码,通过 os_atomic_sub_orig2o 对 dg->dg.bits 作 – 操作,对数值进行处理:
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits, // 原子递减 0 -> -1
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) { // 如果old_value
_dispatch_retain(dg);
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) { // 到达临界值,会报crash
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
- 其中当 old_value == DISPATCH_GROUP_VALUE_MAX,到达临界值,会报crash, DISPATCH_GROUP_VALUE_MAX 的定义如下:
#define DISPATCH_GROUP_GEN_MASK 0xffffffff00000000ULL
#define DISPATCH_GROUP_VALUE_MASK 0x00000000fffffffcULL
#define DISPATCH_GROUP_VALUE_INTERVAL 0x0000000000000004ULL
#define DISPATCH_GROUP_VALUE_1 DISPATCH_GROUP_VALUE_MASK
#define DISPATCH_GROUP_VALUE_MAX DISPATCH_GROUP_VALUE_INTERVAL
#define DISPATCH_GROUP_HAS_NOTIFS 0x0000000000000002ULL
#define DISPATCH_GROUP_HAS_WAITERS 0x0000000000000001ULL
③ dispatch_group_leave 出组
- 进入 dispatch_group_leave 源码:
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,// 原子递增 ++
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
// 根据状态,唤醒
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);//唤醒
}
//-1 -> 0, 0+1 -> 1,即多次leave,会报crash,简单来说就是enter-leave不平衡
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
- 分析说明:
-
- 1 到 0,即++操作;
-
- 根据状态,do-while 循环,唤醒执行 block 任务;
-
- 如果0 + 1 = 1,enter-leave 不平衡,即 leave 多次调用,会 crash。
- 进入 _dispatch_group_wake 源码,do-while 循环进行异步命中,调用 _dispatch_continuation_async 执行:
DISPATCH_NOINLINE
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0;
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags); // block任务执行
_dispatch_release(dsn_queue);
} while ((dc = next_dc)); // do-while循环,进行异步任务的命中
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen); // 地址释放
}
if (refs) _dispatch_release_n(dg, refs); // 引用释放
}
- 进入 _dispatch_continuation_async 源码:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc); // 跟踪日志
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos); // 与dx_invoke一样,都是宏
}
④ dispatch_group_notify 通知
- 进入 dispatch_group_notify 源码,如果 old_state 等于 0,就可以进行释放:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
// 获取dg底层的状态标识码,通过os_atomic_store2o获取的值,即从dg的状态码 转成了 os底层的state
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) { // 如果等于0,则可以进行释放了
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false); // 唤醒
});
}
});
}
}
- 其中 os_mpsc_push_update_tail 定义如下,用于获取 dg 的状态码:
#define os_mpsc_push_update_tail(Q, tail, _o_next) ({ \
os_mpsc_node_type(Q) _tl = (tail); \
os_atomic_store2o(_tl, _o_next, NULL, relaxed); \
os_atomic_xchg(_os_mpsc_tail Q, _tl, release); \
})
⑤ dispatch_group_async
- 进入 dispatch_group_async 源码,主要是包装任务和异步处理任务:
#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
// 任务包装器
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
// 处理任务
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
#endif
- 进入 _dispatch_continuation_group_async 源码,主要集成 dispatch_group_enter 进组操作:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
dispatch_group_enter(dg); // 进组
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags); // 异步操作
}
- 进入 _dispatch_continuation_async 源码,执行常规的异步函数底层操作,既然有 enter,肯定有 leave,可以猜测 block 执行之后隐性的执行 leave。现有如下代码:
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_enter(group);
dispatch_async(queue, ^{
sleep(1);
NSLog(@"任务1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"任务2");
dispatch_group_leave(group);
});
dispatch_group_async(group, queue, ^{
NSLog(@"任务5");
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"任务3");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"任务4");
dispatch_group_leave(group);
});
- 打印堆栈信息:
(lldb) bt
* thread #6, queue = 'com.apple.root.default-qos', stop reason = breakpoint 1.1
* frame #0: 0x00000001053c7517 函数与队列`__29-[ViewController viewDidLoad]_block_invoke_2(.block_descriptor=0x00000001053ca128) at ViewController.m:35:9
frame #1: 0x00000001056387ec libdispatch.dylib`_dispatch_call_block_and_release + 12
frame #2: 0x00000001056399c8 libdispatch.dylib`_dispatch_client_callout + 8
frame #3: 0x000000010563bfe2 libdispatch.dylib`_dispatch_queue_override_invoke + 1444
frame #4: 0x000000010564b508 libdispatch.dylib`_dispatch_root_queue_drain + 351
frame #5: 0x000000010564be6d libdispatch.dylib`_dispatch_worker_thread2 + 135
frame #6: 0x00007fff611639f7 libsystem_pthread.dylib`_pthread_wqthread + 220
frame #7: 0x00007fff61162b77 libsystem_pthread.dylib`start_wqthread + 15
(lldb)
- 可以看到 libdispatch.dylib`_dispatch_client_callout + 8,我们查看 _dispatch_client_callout 的调用,可以看到在 _dispatch_continuation_with_group_invoke 中,因此可以验证 dispatch_group_async 底层是 enter-leave;
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) { // 如果是调度组类型
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func); // block回调
_dispatch_trace_item_complete(dc);
dispatch_group_leave((dispatch_group_t)dou); // 出组
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
⑥ dispatch_group 底层实现流程
- enter-leave 需要成对出现;
- dispatch_group_enter 在底层是通过 C++ 函数,对 group 的 value 进行 – 操作(即0 -> -1);
- dispatch_group_leave 在底层是通过 C++ 函数,对 group 的 value 进行 ++ 操作(即-1 -> 0);
- dispatch_group_notify 在底层主要是判断 group 的 state 是否等于0,当等于0时发出通知;
- block 任务的唤醒,可以通过 dispatch_group_leave,也可以通过 dispatch_group_notify;
- dispatch_group_async 等同于 enter - leave,其底层的实现就是 enter-leave。
七、源码下载
本文是在源码 libdispatch.dylib 中探索分析的,源码的下载地址:macOS 10.15 - Source。