1.1 初步剖析
不雅观察了下主线程堆栈,用到的锁是读写锁:
随后又去翻了下持有着锁的子线程,有各种各样的情形,且基本都处于正常的实行状态,例如有的处于打开文件状态,有的处于read状态,有的正在实行NSUserDefaults的方法···

通过不雅观察创造,出问题的线程都有QOS:BACKGROUND标记。整体看起来持有锁的子线程仍旧在实行,只是留给主线程的韶光不足了。为什么这些子线程在持有锁的情形下,须要实行这么久,直到主线程的 8s 卡去世?一种情形便是真的如此耗时,另一种则是涌现了优先级反转。
1.2 办理办法
这个案例里,持有读写锁且优先级低的线程迟迟得不到调度(又或者得到调度的时候又被抢占了,或者得到调度的时候韶光已然不足了)而具有高优先级的线程由于拿不到读写锁,一贯被壅塞,以是相互去世锁。iOS8之后引入了QualityOfService的观点,类似于线程的优先级,设置不同的QualityOfService的值后系统会分配不同的CPU韶光、网络资源和硬盘资源等,因此我们可以通过这个设置行列步队的优先级。
1.2.1方案一:去除对 NSOperationQueue 的优先级设置
在 Threading Programming Guide 文档中,苹果给出了提示:
Important: It is generally a good idea to leave the priorities of your threads at their default values. Increasing the priorities of some threads also increases the likelihood of starvation among lower-priority threads. If your application contains high-priority and low-priority threads that must interact with each other, the starvation of lower-priority threads may block other threads and create performance bottlenecks.
苹果的建议是不要随意修正线程的优先级,尤其是这些高低优先级线程之间存在临界资源竞争的情形。以是删除干系优先级设置代码即可办理问题。
1.2.2 方案二:临时修正线程优先级
在 pthread_rwlock_rdlock(3pthread) 创造了如下提示:
Realtime applications may encounter priority inversion when using read-write locks. The problem occurs when a high priority thread "locks" a read-write lock that is about to be "unlocked" by a low priority thread, but the low priority thread is preempted by a medium priority thread. This scenario leads to priority inversion; a high priority thread is blocked by lower priority threads for an unlimited period of time. During system design, realtime programmers must take into account the possibility of this kind of priority inversion. They can deal with it in a number of ways, such as by having critical sections that are guarded by read-write locks execute at a high priority, so that a thread cannot be preempted while executing in its critical section.
只管针对的是实时系统,但是还是有一些启迪和帮助。按照提示,对有问题的代码进行了修正:在线程通过 pthread_rwlock_wrlock 拿到 _rwlock 的时候,临时提升其优先级,在开释 _rwlock 之后,规复其原来的优先级。
- (id)remoteConfigWithAppID:(NSString )appID{ ....... pthread_rwlock_rdlock(&_rwlock); HMDHeimdallrConfig result = ....... // get existing config pthread_rwlock_unlock(&_rwlock); if(result == nil) { result = [[HMDHeimdallrConfig alloc] init]; // make a new config pthread_rwlock_wrlock(&_rwlock); qos_class_t oldQos = qos_class_self(); BOOL needRecover = NO; // 临时提升线程优先级 if (_enablePriorityInversionProtection && oldQos < QOS_CLASS_USER_INTERACTIVE) { int ret = pthread_set_qos_class_self_np(QOS_CLASS_USER_INTERACTIVE, 0); needRecover = (ret == 0); } ...... pthread_rwlock_unlock(&_rwlock); // 规复线程优先级 if (_enablePriorityInversionProtection && needRecover) { pthread_set_qos_class_self_np(oldQos, 0); } } return result;}
值得把稳的是,这里只能利用pthread的api,NSThread供应的API是不可行的
1.3 Demo 验证
为了验证上述的手动调度线程优先级是否有一定的效果,这里通过demo进行本地实验:定义了2000个operation(目的是为了CPU繁忙),优先级设置NSQualityOfServiceUserInitiated,且对个中可以被100整除的operation的优先级调度为NSQualityOfServiceBackground,在每个operation实行相同的耗时任务,然后对这当选中的10个operation进行耗时统计。
for (int j = 0; j < 2000; ++j) { NSOperationQueue operation = [[NSOperationQueue alloc] init]; operation.maxConcurrentOperationCount = 1; operation.qualityOfService = NSQualityOfServiceUserInitiated; // 模块1 // if (j % 100 == 0) { // operation.qualityOfService = NSQualityOfServiceBackground; // } // 模块1 [operation addOperationWithBlock:^{ // 模块2 // qos_class_t oldQos = qos_class_self(); // pthread_set_qos_class_self_np(QOS_CLASS_USER_INITIATED, 0); // 模块2 NSTimeInterval start = CFAbsoluteTimeGetCurrent(); double sum = 0; for (int i = 0; i < 100000; ++i) { sum += sin(i) + cos(i) + sin(i2) + cos(i2); } start = CFAbsoluteTimeGetCurrent() - start; if (j % 100 == 0) { printf("%.8f\n", start 1000); } // 模块2 // pthread_set_qos_class_self_np(oldQos, 0); // 模块2 }];}
统计信息如下表所示:
可以看到:
正常情形下,每个任务的均匀耗时为:11.8190561;当operation被设置为低优先级时,其耗时大幅度提升为:94.70210189;当operation被设置为低优先级时,又在Block中手动规复其原有的优先级,其耗时已经大幅度降落:15.04005137(耗时比正常情形高,大家可以思考下为什么)通过Demo可以创造,通过手动调度其优先级,低优先级任务的整体耗时得到大幅度的降落,这样在持有锁的情形下,可以减少对主线程的壅塞韶光。
1.4 上线效果
该问题的验证过程分为2个阶段:
第一个阶段如第1个红框所示,从3月6号开始在版本19.7上有较大幅度的低落,紧张缘故原由:堆栈中被等待的行列步队信息由QOS:BACKGROUND变为了com.apple.root.default-qos,行列步队的优先级从QOS_CLASS_BACKGROUND提升为QOS_CLASS_DEFAULT,相称于履行了方案一,利用了默认优先级。第二个阶段如第2个红框所示,从4月24号在版本20.3上开始验证。目前看起来效果暂时不明显,推测一个紧张缘故原由是:demo中是把优先级从QOS_CLASS_BACKGROUND提升为QOS_CLASS_USER_INITIATED,而线上相称于把行列步队的优先级从默认的优先级QOS_CLASS_DEFAULT提升为QOS_CLASS_USER_INITIATEDa. QOS_CLASS_BACKGROUND的Mach层级优先级数是4;
b. QOS_CLASS_DEFAULT的Mach层级优先级数是31;
c. QOS_CLASS_USER_INITIATED的Mach层级优先级数是37。
以是相对来说,线上的提升相对有限。
2. 深刻理解优先级反转那么是否所有锁都须要像上文一样,手动提升持有锁的线程优先级?系统是否会自动调度线程的优先级?如果有这样的机制,是否可以覆盖所有的锁?要理解这些问题,须要深刻认识优先级反转。
2.1 什么是优先级反转?
优先级反转,是指某同步资源被较低优先级的进程/线程所拥有,较高优先级的进程/线程竞争该同步资源未得到该资源,而使得较高优先级进程/线程反而推迟被调度实行的征象。根据壅塞类型的不同,优先级反转又被分为Bounded priority inversion和Unbounded priority inversion。
这里借助 Introduction to RTOS - Solution to Part 11 的图进行示意。
2.1.1 Bounded priority inversion
如图所示,高优先级任务(Task H)被持有锁的低优先级任务(Task L)壅塞,由于壅塞的韶光取决于低优先级任务在临界区的韶光(持有锁的韶光),以是被称为bounded priority inversion。只要Task L一贯持有锁,Task H就会一贯被壅塞,低优先级的任务运行在高优先级任务的前面,优先级被反转。
这里的任务也可以理解为线程
2.1.2 Unbounded priority inversion
在Task L持有锁的情形下,如果有一个中间优先级的任务(Task M)打断了Task L,前面的bounded就会变为unbounded,由于Task M只要抢占了Task L的CPU,就可能会壅塞Task H任意多的韶光(Task M可能不止1个)。
2.2 优先级反转常规办理思路
目前办理Unbounded priority inversion有2种方法:一种被称作优先权极限(priority ceiling protocol),另一种被称作优先级继续(priority inheritance)。
2.2.1 Priority ceiling protocol
在优先权极限方案中,系统把每一个临界资源与 1 个极限优先权干系联。当1个任务进入临界区时,系统便把这个极限优先权通报给这个任务,使得这个任务的优先权最高;当这个任务退出临界区后,系统立即把它的优先权规复正常,从而担保系统不会涌现优先权反转的情形。该极限优先权的值是由所有须要该临界资源的任务的最大优先级来决定的。
如图所示,锁的极限优先权是 3。当Task L持有锁的时候,它的优先级将会被提升到3,和Task H一样的优先级。这样就可以阻挡Task M(优先级是2)的运行,直到Task L和Task H不再须要该锁。
2.2.2 Priority inheritance
在优先级继续方案中,大致事理是:高优先级任务在考试测验获取锁的时候,如果该锁恰好被低优先级任务持有,此时会临时把高优先级线程的优先级转移给拥有锁的低优先级线程,使低优先级线程能更快的实行并开释同步资源,开释同步资源后再规复其原来的优先级。
priority ceiling protocol和priority inheritance都会在开释锁的时候,规复低优先级任务的优先级。同时要把稳,以上2种方法只能阻挡Unbounded priority inversion,而无法阻挡Bounded priority inversion(Task H必须等待Task L实行完毕才能实行,这个反转是无法避免的)。
可以通过以下几种发生来避免或者转移Bounded priority inversion:
减少临界区的实行韶光,减少Bounded priority inversion的反转耗时;避免利用会壅塞高优先级任务的临界区资源;专门利用一个行列步队来管理资源,避免利用锁。优先级继续必须是可通报的。举个栗子:当T1壅塞在被T2持有的资源上,而T2又壅塞在T3持有的一个资源上。如果T1的优先级高于T2和T3的优先级,T3必须通过T2继续T1的优先级。否则,如果其余一个优先级高于T2和T3,小于T1的线程T4,将抢占T3,引发相对付T1的优先级反转。因此,线程所继续的优先级必须是直接或者间接壅塞的线程的最高优先级。
3. 如何避免优先级反转?3.1 QoS 通报
iOS 系统紧张利用以下两种机制来在不同线程(或 queue)间通报 QoS:
机制 1:dispatch_async
dispatch_async() automatically propagates the QoS from the calling thread, though it will translate User Interactive to User Initiated to avoid assigning that priority to non-main threads.Capturedattimeofblocksubmission,translateuserinteractivetouserinitiated.UsedifdestinationqueuedoesnothaveaQoSanddoesnotlowertheQoS(exdispatch_asyncbacktothemainthread).机制 2:基于 XPC 的进程间通信(IPC)
系统的 QoS 通报规则比较繁芜,紧张参考以下信息:
当前哨程的 QoS如果是利用 dispatch_block_create() 方法天生的 dispatch_block,则考虑天生 block时所调用的参数dispatch_async 或 IPC 的目标 queue 或线程的 QoS调度程序会根据这些信息决定 block 以什么优先级运行。
如果没有其他线程同步地等待此 block,则 block就按上面所说的优先级来运行。如果涌现了线程间同步等待的情形,则调度程序会根据情形调度线程的运行优先级。3.2 如何触发优先级反转避免机制?
如果当前哨程因等待某线程(线程 1)上正在进行的操作(如 block1)而受阻,而系统知道 block1 所在的目标线程(owner),系统会通过提高干系线程的优先级来办理优先级反转的问题。反之如果系统不知道 block1 所在目标线程,则无法知道该当提高谁的优先级,也就无法办理反转问题;
记录了持有者信息(owner)的系统 API 如下:
pthread mutex、os_unfair_lock、以及基于这二者实现的上层APIa. dispatch_once 的实现是基于 os_unfair_lock的b. NSLock、NSRecursiveLock、@synchronized 等的实现是基于 pthreadmutexdispatch_sync、dispatch_waitxpc_connection_send_with_message_sync利用以上这些 API 能够在发生优先级反转时使系统启用优先级反转避免机制。
3.3 根本 API 验证
接下来对前文提到的各种「根本系统API」进行验证
测试验证环境:仿照器 iOS15.2
3.3.1 pthread mutex
pthread mutex的数据构造pthread_mutex_s个中有一个m_tid字段,专门来记录持有该锁的线程Id。
// types_internal.hstruct pthread_mutex_s { long sig; _pthread_lock lock; union { uint32_t value; struct pthread_mutex_options_s options; } mtxopts; int16_t prioceiling; int16_t priority;#if defined(__LP64__) uint32_t _pad;#endif union { struct { uint32_t m_tid[2]; // thread id of thread that has mutex locked uint32_t m_seq[2]; // mutex sequence id uint32_t m_mis[2]; // for misaligned locks m_tid/m_seq will span into here } psynch; struct _pthread_mutex_ulock_s ulock; };#if defined(__LP64__) uint32_t _reserved[4];#else uint32_t _reserved[1];#endif};
代码来验证一下:线程优先级是否会被提升?
// printThreadPriority用来打印线程的优先级信息void printThreadPriority() { thread_t cur_thread = mach_thread_self(); mach_port_deallocate(mach_task_self(), cur_thread); mach_msg_type_number_t thread_info_count = THREAD_INFO_MAX; thread_info_data_t thinfo; kern_return_t kr = thread_info(cur_thread, THREAD_EXTENDED_INFO, (thread_info_t)thinfo, &thread_info_count); if (kr != KERN_SUCCESS) { return; } thread_extended_info_t extend_info = (thread_extended_info_t)thinfo; printf("pth_priority: %d, pth_curpri: %d, pth_maxpriority: %d\n", extend_info->pth_priority, extend_info->pth_curpri, extend_info->pth_maxpriority);}
先在子线程上锁并休眠,然后主线程要求该锁。
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{ printf("begin : \n"); printThreadPriority(); printf("queue before lock \n"); pthread_mutex_lock(&_lock); //确保 backgroundQueue 先得到锁 printf("queue lock \n"); printThreadPriority(); dispatch_async(dispatch_get_main_queue(), ^{ printf("before main lock\n"); pthread_mutex_lock(&_lock); printf("in main lock\n"); pthread_mutex_unlock(&_lock); printf("after main unlock\n"); }); sleep(10); printThreadPriority(); printf("queue unlock\n"); pthread_mutex_unlock(&_lock); printf("queue after unlock\n");});
begin : pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63queue before lock queue lock pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63before main lockpth_priority: 47, pth_curpri: 47, pth_maxpriority: 63queue unlockin main lockafter main unlockqueue after unlock
可以看到,低优先级子线程先持有锁,当时的优先级为4,而该锁被主线程要求的时候,子线程的优先级被提升为47
3.3.2 os_unfair_lock
os_unfair_lock用来更换OSSpinLock,办理优先级反转问题。等待os_unfair_lock锁的线程会处于休眠状态,从用户态切换到内核态,而并非忙等。os_unfair_lock将线程ID保存到了锁的内部,锁的等待者会把自己的优先级让出来,从而避免优先级反转。验证一下:
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{ printf("begin : \n"); printThreadPriority(); printf("queue before lock \n"); os_unfair_lock_lock(&_unfair_lock); //确保 backgroundQueue 先得到锁 printf("queue lock \n"); printThreadPriority(); dispatch_async(dispatch_get_main_queue(), ^{ printf("before main lock\n"); os_unfair_lock_lock(&_unfair_lock); printf("in main lock\n"); os_unfair_lock_unlock(&_unfair_lock); printf("after main unlock\n"); }); sleep(10); printThreadPriority(); printf("queue unlock\n"); os_unfair_lock_unlock(&_unfair_lock); printf("queue after unlock\n"); });
begin : pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63queue before lock queue lock pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63before main lockpth_priority: 47, pth_curpri: 47, pth_maxpriority: 63queue unlockin main lockafter main unlockqueue after unlock
结果和pthread mutex同等。
3.3.3 pthread_rwlock_t
在 pthread_rwlock_init 有如下提示:
Caveats: Beware of priority inversion when using read-write locks. A high-priority thread may be blocked waiting on a read-write lock locked by a low-priority thread. The microkernel has no knowledge of read-write locks, and therefore can't boost the low-priority thread to prevent the priority inversion.
大意是内核不感知读写锁,无法提升低优先级线程的优先级,从而无法避免优先级反转。通过查询定义创造:pthread_rwlock_s包含了字段rw_tid,专门来记录持有写锁的线程,这不由令人好奇:为什么pthread_rwlock_s有owner信息却仍旧无法避免优先级反转?
struct pthread_rwlock_s { long sig; _pthread_lock lock; uint32_t unused:29, misalign:1, pshared:2; uint32_t rw_flags;#if defined(__LP64__) uint32_t _pad;#endif uint32_t rw_tid[2]; // thread id of thread that has exclusive (write) lock uint32_t rw_seq[4]; // rw sequence id (at 128-bit aligned boundary) uint32_t rw_mis[4]; // for misaligned locks rw_seq will span into here#if defined(__LP64__) uint32_t _reserved[34];#else uint32_t _reserved[18];#endif};
https://news.ycombinator.com/item?id=21751269 链接中提到:
xnu supports priority inheritance through "turnstiles", a kernel-internal mechanism which is used by default by a number of locking primitives (list at [1]), including normal pthread mutexes (though not read-write locks [2]), as well as the os_unfair_lock API (via the ulock syscalls). With pthread mutexes, you can actually explicitly request priority inheritance by calling pthread_mutexattr_setprotocol [3] with PTHREAD_PRIO_INHERIT; the Apple implementation supports it, but currently ignores the protocol setting and just gives all mutexes priority inheritance.
大意是:XNU利用 turnstiles 内核机制进行优先级继续,这种机制被运用在 pthread mutex 和 os_unfair_lock 上。
顺藤摸瓜,在ksyn_wait方法中找到了_kwq_use_turnstile的调用,个中的注释对读写锁阐明的比较委婉,添加了 at least sometimes
pthread mutexes and rwlocks both (at least sometimes) know their owner and can use turnstiles. Otherwise, we pass NULL as the tstore to the shims so they wait on the global waitq.
// libpthread/kern/kern_synch.cintksyn_wait(ksyn_wait_queue_t kwq, kwq_queue_type_t kqi, uint32_t lockseq, int fit, uint64_t abstime, uint16_t kwe_flags, thread_continue_t continuation, block_hint_t block_hint){ thread_t th = current_thread(); uthread_t uth = pthread_kern->get_bsdthread_info(th); struct turnstile tstore = NULL; int res; assert(continuation != THREAD_CONTINUE_NULL); ksyn_waitq_element_t kwe = pthread_kern->uthread_get_uukwe(uth); bzero(kwe, sizeof(kwe)); kwe->kwe_count = 1; kwe->kwe_lockseq = lockseq & PTHRW_COUNT_MASK; kwe->kwe_state = KWE_THREAD_INWAIT; kwe->kwe_uth = uth; kwe->kwe_thread = th; kwe->kwe_flags = kwe_flags; res = ksyn_queue_insert(kwq, kqi, kwe, lockseq, fit); if (res != 0) { //panic("psynch_rw_wrlock: failed to enqueue\n"); // XXX ksyn_wqunlock(kwq); return res; } PTHREAD_TRACE(psynch_mutex_kwqwait, kwq->kw_addr, kwq->kw_inqueue, kwq->kw_prepost.count, kwq->kw_intr.count); if (_kwq_use_turnstile(kwq)) { // pthread mutexes and rwlocks both (at least sometimes) know their // owner and can use turnstiles. Otherwise, we pass NULL as the // tstore to the shims so they wait on the global waitq. tstore = &kwq->kw_turnstile; } ......}
再去查看_kwq_use_turnstile的定义,代码还是很老实的,只有在KSYN_WQTYPE_MTX才会启用turnstile进行优先级反转保护,而读写锁的类型为KSYN_WQTYPE_RWLOCK,这解释读写锁不会利用_kwq_use_turnstile,以是无法避免优先级反转。
#define KSYN_WQTYPE_MTX 0x01#define KSYN_WQTYPE_CVAR 0x02#define KSYN_WQTYPE_RWLOCK 0x04#define KSYN_WQTYPE_SEMA 0x08static inline bool_kwq_use_turnstile(ksyn_wait_queue_t kwq){ // If we had writer-owner information from the // rwlock then we could use the turnstile to push on it. For now, only // plain mutexes use it. return (_kwq_type(kwq) == KSYN_WQTYPE_MTX);}
其余在_pthread_find_owner也可以看到,读写锁的owner是0
void_pthread_find_owner(thread_t thread, struct stackshot_thread_waitinfo waitinfo){ ksyn_wait_queue_t kwq = _pthread_get_thread_kwq(thread); switch (waitinfo->wait_type) { case kThreadWaitPThreadMutex: assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_MTX); waitinfo->owner = thread_tid(kwq->kw_owner); waitinfo->context = kwq->kw_addr; break; / Owner of rwlock not stored in kernel space due to races. Punt and hope that the userspace address is helpful enough. / case kThreadWaitPThreadRWLockRead: case kThreadWaitPThreadRWLockWrite: assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_RWLOCK); waitinfo->owner = 0; waitinfo->context = kwq->kw_addr; break; / Condvars don't have owners, so just give the userspace address. / case kThreadWaitPThreadCondVar: assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_CVAR); waitinfo->owner = 0; waitinfo->context = kwq->kw_addr; break; case kThreadWaitNone: default: waitinfo->owner = 0; waitinfo->context = 0; break; }}
把锁改换为读写锁,验证一下前面的理论是否精确:
pthread_rwlock_init(&_rwlock, NULL);dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{ printf("begin : \n"); printThreadPriority(); printf("queue before lock \n"); pthread_rwlock_rdlock(&_rwlock); //确保 backgroundQueue 先得到锁 printf("queue lock \n"); printThreadPriority(); dispatch_async(dispatch_get_main_queue(), ^{ printf("before main lock\n"); pthread_rwlock_wrlock(&_rwlock); printf("in main lock\n"); pthread_rwlock_unlock(&_rwlock); printf("after main unlock\n"); }); sleep(10); printThreadPriority(); printf("queue unlock\n"); pthread_rwlock_unlock(&_rwlock); printf("queue after unlock\n");});
begin : pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63queue before lock queue lock pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63before main lockpth_priority: 4, pth_curpri: 4, pth_maxpriority: 63queue unlockqueue after unlockin main lockafter main unlock
可以看到读写锁不会发生优先级提升。
3.3.4 dispatch_sync
这个API都比较熟习了,这里直接验证:
// 当前哨程为主线程dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);_queue = dispatch_queue_create("com.demo.test", qosAttribute);printThreadPriority();dispatch_async(_queue, ^{ printf("dispatch_async before dispatch_sync : \n"); printThreadPriority();});dispatch_sync(_queue, ^{ printf("dispatch_sync: \n"); printThreadPriority();});dispatch_async(_queue, ^{ printf("dispatch_async after dispatch_sync: \n"); printThreadPriority();});
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63 dispatch_async before dispatch_sync : pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63dispatch_sync: pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63dispatch_async after dispatch_sync: pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
_queue是一个低优先级行列步队(QOS_CLASS_BACKGROUND),可以看到dispatch_sync调用压入行列步队的任务,以及在这之前dispatch_async压入的任务,都被提升到较高的优先级47(和主线程同等),而末了一个dispatch_async的任务则以优先级4来实行。
3.3.5 dispatch_wait
// 当前哨程为主线程dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);_queue = dispatch_queue_create("com.demo.test", qosAttribute);printf("main thread\n");printThreadPriority();dispatch_block_t block = dispatch_block_create(DISPATCH_BLOCK_INHERIT_QOS_CLASS, ^{ printf("sub thread\n"); sleep(2); printThreadPriority();});dispatch_async(_queue, block);dispatch_wait(block, DISPATCH_TIME_FOREVER);
_queue是一个低优先级行列步队(QOS_CLASS_BACKGROUND),当在当前主线程利用dispatch_wait进行等待时,输出如下,低优先级的任务被提升到优先级47
main threadpth_priority: 47, pth_curpri: 47, pth_maxpriority: 63sub threadpth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
而如果将dispatch_wait(block, DISPATCH_TIME_FOREVER)注释掉之后,输出如下:
main threadpth_priority: 47, pth_curpri: 47, pth_maxpriority: 63sub threadpth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
值得把稳的是,dispatch_wait是一个宏(C11的泛型),或者是一个入口函数,它可以接管dispatch_block_t,dispatch_group_t,dispatch_semaphore_t 3种类型的参数,但是这里的详细含义该当是指dispatch_block_wait,只有dispatch_block_wait会调度优先级,避免优先级反转。
intptr_tdispatch_wait(void object, dispatch_time_t timeout);#if __has_extension(c_generic_selections)#define dispatch_wait(object, timeout) \ _Generic((object), \ dispatch_block_t:dispatch_block_wait, \ dispatch_group_t:dispatch_group_wait, \ dispatch_semaphore_t:dispatch_semaphore_wait \ )((object),(timeout))#endif
3.4 神秘的旗子暗记量
3.4.1 dispatch_semaphore
之前对dispatch_semaphore的认知非常浅薄,常常把二值旗子暗记量和互斥锁划等号。但是通过调研后创造:dispatch_semaphore 没有 QoS 的观点,没有记录当前持有旗子暗记量的线程(owner),以是有高优先级的线程在等待锁时,内核无法知道该提高哪个线程的调试优先级(QoS)。如果锁持有者优先级比其他线程低,高优先级的等待线程将一贯等待。Mutexvs Semaphore: What’s the Difference? 一文详细比对了Mutex和Semaphore之间的差异。
Semaphores are for signaling (sames a condition variables, events) while mutexes are for mutual exclusion. Technically, you can also use semaphores for mutual exclusion (a mutex can be thought as a binary semaphore) but you really shouldn't.
Right, but libdispatch doesn't have a mutex. It has semaphores and queues. So if you're trying to use libdispatch and you don't want the closure-based aspect of queues, you might be tempted to use a semaphore instead. Don't do that, use os_unfair_lock or pthread_mutex (or a higher-level construct like NSLock) instead.
这些是一些警示,可以看到dispatch_semaphore十分危险,利用须要特殊小心。
这里通过苹果官方供应的demo进行阐明:
__block NSString taskName = nil;dispatch_semaphore_t sema = dispatch_semaphore_create(0); [self.connection.remoteObjectProxy requestCurrentTaskName:^(NSString task) { taskName = task; dispatch_semaphore_signal(sema); }]; dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER); return taskName;
假设在主线程实行这段代码,那么当前哨程的优先级是QOS_CLASS_USER_INTERACTIVE;由于从主线程进行了异步,异步任务行列步队的QoS将会被提升为QOS_CLASS_USER_INITIATED;主线程被旗子暗记量sema壅塞,而卖力开释该旗子暗记量的异步任务的优先级QOS_CLASS_USER_INITIATED低于主线程的优先级QOS_CLASS_USER_INTERACTIVE,因此可能会发生优先级反转。
值得一提的是,Clang专门针对这种情形进行了静态检测:https://github.com/llvm-mirror/clang/blob/master/lib/StaticAnalyzer/Checkers/GCDAntipatternChecker.cpp
static auto findGCDAntiPatternWithSemaphore() -> decltype(compoundStmt()) { const char SemaphoreBinding = "semaphore_name"; auto SemaphoreCreateM = callExpr(allOf( callsName("dispatch_semaphore_create"), hasArgument(0, ignoringParenCasts(integerLiteral(equals(0)))))); auto SemaphoreBindingM = anyOf( forEachDescendant( varDecl(hasDescendant(SemaphoreCreateM)).bind(SemaphoreBinding)), forEachDescendant(binaryOperator(bindAssignmentToDecl(SemaphoreBinding), hasRHS(SemaphoreCreateM)))); auto HasBlockArgumentM = hasAnyArgument(hasType( hasCanonicalType(blockPointerType()) )); auto ArgCallsSignalM = hasAnyArgument(stmt(hasDescendant(callExpr( allOf( callsName("dispatch_semaphore_signal"), equalsBoundArgDecl(0, SemaphoreBinding) ))))); auto HasBlockAndCallsSignalM = allOf(HasBlockArgumentM, ArgCallsSignalM); auto HasBlockCallingSignalM = forEachDescendant( stmt(anyOf( callExpr(HasBlockAndCallsSignalM), objcMessageExpr(HasBlockAndCallsSignalM) ))); auto SemaphoreWaitM = forEachDescendant( callExpr( allOf( callsName("dispatch_semaphore_wait"), equalsBoundArgDecl(0, SemaphoreBinding) ) ).bind(WarnAtNode)); return compoundStmt( SemaphoreBindingM, HasBlockCallingSignalM, SemaphoreWaitM);}
如果想利用该功能,只须要打开xcode设置即可:
其余,dispatch_group 跟 semaphore 类似,在调用 enter() 方法时,无法预知谁会调用 leave(),以是系统也无法知道其 owner是谁,以是同样不会有优先级提升的问题。
3.4.2 旗子暗记量卡去世现身说法
dispatch_semaphore给笔者的印象非常深刻,之前写过一段这样的代码:利用旗子暗记量在主线程同步等待相机授权结果。
__block BOOL auth = NO;dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);[KTAuthorizeService requestAuthorizationWithType:KTPermissionsTypeCamera completionHandler:^(BOOL allow) { auth = allow; dispatch_semaphore_signal(semaphore);}];dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
上线后长期霸占卡去世top1,当时百思不得其解,在深入理解到旗子暗记量无法避免优先级反转后,终于豁然开朗,一扫之前心中的阴霾。
这类问题一样平常通过2种办法来办理:
1. 利用同步API
BOOL auth = [KTAuthorizeService authorizationWithType:KTPermissionsTypeCamera];// do something next
2. 异步回调,不要在当前哨程等待
[KTAuthorizeService requestAuthorizationWithType:KTPermissionsTypeCamera completionHandler:^(BOOL allow) { BOOL auth = allow; // do something next via callback}];
4. 几个观点
4.1 turnstile
前文提到XNU利用turnstile进行优先级继续,这里对turnstile机制进行大略的描述和理解。在XNU内核中,存在着大量的同步工具(例如lck_mtx_t),为理解决优先级反转的问题,每个同步工具都必须对应一个分离的数据构造来掩护大量的信息,例如壅塞在这个同步工具上的线程行列步队。可以想象一下,如果每个同步工具都要分配一个这样的数据构造,将造成极大的内存摧残浪费蹂躏。
为理解决这个问题,XNU采取了turnstile机制,一种空间利用率很高的办理方案。该方案的提出依据是同一个线程在同一时候不能同时壅塞于多个同步工具上。这一事实许可所有同步工具只须要保留一个指向turnstile的指针,且在须要的时候去分配一个turnstile即可,而turnstile则包含了操作一个同步工具须要的所有信息,例如壅塞线程的行列步队、拥有这个同步工具的线程指针。turnstile是从池中动态分配的,这个池的大小会随着系统中已分配的线程数目增加而增加,以是turnstile总数将始终低于或即是线程数,这也决定了turnstile的数目是可控的。turnstile由壅塞在该同步工具上的第一个线程卖力分配,当没有更多线程壅塞在该同步工具上,turnstile会被开释,回收到池中。
turnstile的数据构造如下:
struct turnstile { struct waitq ts_waitq; / waitq embedded in turnstile / turnstile_inheritor_t ts_inheritor; / thread/turnstile inheriting the priority (IL, WL) / union { struct turnstile_list ts_free_turnstiles; / turnstile free list (IL) / SLIST_ENTRY(turnstile) ts_free_elm; / turnstile free list element (IL) / }; struct priority_queue_sched_max ts_inheritor_queue; / Queue of turnstile with us as an inheritor (WL) / union { struct priority_queue_entry_sched ts_inheritor_links; / Inheritor queue links / struct mpsc_queue_chain ts_deallocate_link; / thread deallocate link / }; SLIST_ENTRY(turnstile) ts_htable_link; / linkage for turnstile in global hash table / uintptr_t ts_proprietor; / hash key lookup turnstile (IL) / os_refcnt_t ts_refcount; / reference count for turnstiles / _Atomic uint32_t ts_type_gencount; / gen count used for priority chaining (IL), type of turnstile (IL) / uint32_t ts_port_ref; / number of explicit refs from ports on send turnstile / turnstile_update_flags_t ts_inheritor_flags; / flags for turnstile inheritor (IL, WL) / uint8_t ts_priority; / priority of turnstile (WL) /#if DEVELOPMENT || DEBUG uint8_t ts_state; / current state of turnstile (IL) / queue_chain_t ts_global_elm; / global turnstile chain / thread_t ts_thread; / thread the turnstile is attached to / thread_t ts_prev_thread; / thread the turnstile was attached before donation /#endif};
4.2 优先级数值
在验证环节有一些优先级数值,这里借助「Mac OS® X and iOS Internals」阐明一下:实验中涉及到的优先级数值都是相对付Mach层而言的,且都是用户线程数值。
用户线程的优先级是0~63;a. NSQualityOfServiceBackground的Mach层级优先级数是4;b. NSQualityOfServiceUtility的Mach层级优先级数是20;c. NSQualityOfServiceDefault的Mach层级优先级数是31;d. NSQualityOfServiceUserInitiated的Mach层级优先级数是37;e. NSQualityOfServiceUserInteractive的Mach层级优先级是47。内核线程的优先级是80~95;实时系统线程的优先级是96~127;64~79被保留给系统利用。5. 总结本文紧张阐述了优先级反转的一些观点和解决思路,并结合iOS平台的几种锁进行了详细的调研。通过深入的理解,可以去规避一些不必要的优先级反转,从而进一步避免卡去世非常。字节跳动 APM团队也针对线程的优先级做了监控处理,进而达到创造和预防优先级反转的目的。
6. 参考文档WWDC18 What' s New in LLVM - actorsfithttps://developer.apple.com/videos/play/wwdc2015/718https://developer.apple.com/forums/thread/124155https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.htmlhttps://developer.apple.com/library/archive/documentation/Performance/Conceptual/EnergyGuide-iOS/PrioritizeWorkWithQoS.htmlhttps://github.com/llvm-mirror/clang/blob/google/stable/lib/StaticAnalyzer/Checkers/GCDAntipatternChecker.cppDon't use dispatch semaphores where mutexes (or dispatch queues) would sufficeConcurrency Problems Written by Scott Groschhttps://www.jianshu.com/p/af64e05de503https://pubs.opengroup.org/onlinepubs/7908799/xsh/pthread_rwlock_wrlock.htmliOS 中各种“锁”的理解及运用不再安全的 OSSpinLockhttps://blog.actorsfit.com/a?ID=00001-499b1c8e-8a7f-4960-a1c1-c8e2f42c08c6https://objccn.io/issue-2-1/#Priority-InversionIntroduction to RTOS - Solution to Part 11 (Priority Inversion)https://threadreaderapp.com/thread/1229999590482444288.html#深入理解 iOS 中的锁Threads can infect each other with their low priority7. 关于我们字节跳动 APM 中台致力于提升全体集团内全系产品的性能和稳定性表现,技能栈覆盖 iOS / Android / Server / Web / Hybrid / PC / 游戏 / 小程序等,事情内容包括但不限于性能稳定性监控,问题排查,深度优化,防劣化等。长期期望为业界输出更多更有培植性的问题创造和深度优化手段。