使用多线程时,要注意防止不同线程意外修改数据的问题,下面总结一下常用的同步方式。
原子操作
原子操作是同步的一个简单的形式,它处理简单的数据类型。其优势是不妨碍竞争的线程。对于简单操作,比如递增一个计数器,原子操作比使用锁具有更高的性能优势。
void retain() {
// retainCount 为实例变量,相当于 retainCount++
OSAtomicIncrement64(&retainCount);
}
void release() {
unit32_t originalValue;
// 相当于 retainCount--
orginalValue = OSAtomicDecrement64(&retainCount);
if (originalValue == 1) {
this->free();
}
}
查看支持原子操作的列表,参阅/user/include/libkern/OSAtomic.h头文件和参见 atomic主页。
顺带提一下 iOS 中常用的属性关键字 atomic,它是默认值,能够保证线程安全,同时我们为了提高效率,在单线程下都会给变量属性声明为 nonatomic。
id objc_getProperty(id self, SEL _cmd, ptrdiff_t offset, BOOL atomic) {
if (offset == 0) {
return object_getClass(self);
}
// Retain release world
id *slot = (id*) ((char*)self + offset);
if (!atomic) return *slot;
// Atomic retain release world
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock();
id value = objc_retain(*slot);
slotlock.unlock();
// for performance, we (safely) issue the autorelease OUTSIDE of the spinlock.
return objc_autoreleaseReturnValue(value);
}
void objc_setProperty_nonatomic(id self, SEL _cmd, id newValue, ptrdiff_t offset)
{
reallySetProperty(self, _cmd, newValue, offset, false, false, false);
}
static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy)
{
if (offset == 0) {
object_setClass(self, newValue);
return;
}
id oldValue;
id *slot = (id*) ((char*)self + offset);
if (copy) {
newValue = [newValue copyWithZone:nil];
} else if (mutableCopy) {
newValue = [newValue mutableCopyWithZone:nil];
} else {
if (*slot == newValue) return;
newValue = objc_retain(newValue);
}
if (!atomic) {
oldValue = *slot;
*slot = newValue;
} else {
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock();
oldValue = *slot;
*slot = newValue;
slotlock.unlock();
}
objc_release(oldValue);
}
Runtime 源码如上,如果一个线程调用 getter 方法获取到 atomic 属性的变量后,另一个线程调用 setter 方法,atomic 能保证 getter 方法的引用计数增加,即使另一个线程调用 setter 方法 release oldValue,也就能保证 getter 方法获取到的 oldValue 变量依然没有被释放。
内存屏障(Memory Barrier)
内存屏障(Memory Barrier)是用来确保内存操作按照正确顺序工作的非阻塞同步工具。内存屏障的作用就像一个栅栏,迫使处理器来完成位于障碍前面的任何加载和存储操作,才允许它执行位于屏障之后的加载和存储操作。
Volatile 变量适用于独立变量的另一个内存限制类型。编译器优化代码通过加载这些变量的值进入寄存器。对于本地变量,这通常不会有什么问题。但是如果一个变量对另外一个线程可见,那么这种优化可能会阻止其他线程发现变量的任何变化。在变量之前加上关键字 volatile 可以强制编译器每次使用变量的时候都从内存里面加载。如果一个变量的值随时可能给编译器无法检测的外部源更改,那么你可以把该变量声明为 volatile 变量。
volatile 使用场景有:
并行设备的硬件寄存器(如:状态寄存器);一个中断服务子程序中会访问到的非自动变量(Non-automatic variables);多线程应用中被几个任务共享的变量
因为内存屏障和 volatile 变量降低了编译器可执行的优化,因此应该谨慎使用它们,只在有需要的地方时候,以确保正确性。关于更多使用内存屏障的信息,参阅 OSMemoryBarrier 主页。
// ...
OSMemoryBarrier();
// ...
锁 - 互斥锁
被这个锁保护的临界区只允许一个线程进入,其他线程如果没有获取得到锁权限,只能等待
POSIX 互斥锁在很多程序里面很容易使用
pthread_mutex_t mutex;
void MyInitFunction() {
pthread_mutex_init(&mutex, NULL);
}
void MyLockingFunction() {
pthread_mutex_lock(&mutex);
// do something ...
pthread_mutex_unlock(&mutex);
}
NSLock 中实现了一个简单的互斥锁
NSLock *theLock = [[NSLock alloc] init];
if ([theLock lock]) {
// do something ...
[theLock unlock];
}
除了标准的锁行为,NSLock 类还增加了 tryLock 和 lockBeforeDate: 方法。方法 tryLock 试图获取一个锁,但是如果锁不可用的时候,它不会阻塞线程。相反,它只是返回 NO。而 lockBeforeDate: 方法试图获取一个锁,但是如果锁没有在规定的时间内被获得,它会让线程从阻塞状态变为非阻塞状态(或者返回 NO )。
BOOL moreToDo = YES;
NSLock *theLock = [[NSLock alloc] init];
// ...
while (moreToDo) {
// do another increment of calculation, until there’s no more to do.
if ([theLock tryLock]) {
// do something ...
[theLock unlock];
}
}
@synchronized 指令
@synchronized 是也创建一个互斥锁非常方便的方法。作为一种预防措施,@synchronized 块隐式的添加一个异常处理例程来保护代码。该处理例程会在异常抛出的时候自动的释放互斥锁。这意味着为了使用@synchronized指令,你必须在你的代码中启用异常处理。了如果你不想让隐式的异常处理例程带来额外的开销,你应该考虑使用锁的类。
- (void)myMethod:(id)anObj {
@synchronized(anObj) {
// Everything between the braces is protected by the @synchronized directive.
}
}
官方文档 告诉我们 @synchronized 在被保护的代码上暗暗的添加了一个异常处理,为的是同步对象抛出异常后能把锁释放掉,可想而知,这个锁的效率是不高的。其实现方式如下
@try {
objc_sync_enter(obj);
// do work
} @finally {
objc_sync_exit(obj);
}
WWDC 早期的一次大会中推荐我们在实现单例的时候使用 GCD 的 dispatch_once 代替 @synchronized,我们从源码分析上来看一下二者的区别
先把 @synchronized 往祖坟里刨一刨
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, ACQUIRE);
data->mutex.lock();
}
return result;
}
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
return result;
}
SyncData 中的定义如下
typedef struct SyncData {
struct SyncData* nextData;
DisguisedPtr<objc_object> object;
int32_t threadCount; // number of THREADS using this block
recursive_mutex_t mutex;
} SyncData;
struct SyncList {
SyncData *data;
spinlock_t lock;
};
#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;
static SyncData* id2data(id object, enum usage why)
{
spinlock_t *lockp = &LOCK_FOR_OBJ(object);
SyncData **listp = &LIST_FOR_OBJ(object);
SyncData* result = NULL;
lockp->lock();
{
SyncData* p;
SyncData* firstUnused = NULL;
for (p = *listp; p != NULL; p = p->nextData) {
if ( p->object == object ) {
result = p;
OSAtomicIncrement32Barrier(&result->threadCount);
goto done;
}
if ( (firstUnused == NULL) && (p->threadCount == 0) )
firstUnused = p;
}
if ( firstUnused != NULL ) {
result = firstUnused;
result->object = (objc_object *)object;
result->threadCount = 1;
goto done;
}
}
result = (SyncData*)calloc(sizeof(SyncData), 1);
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t();
result->nextData = *listp;
*listp = result;
done:
lockp->unlock();
return result;
}
调用 objc_sync_enter(obj) 后,obj 会在哈希表对应的链表中查询对应的对象,查询不到则创建一个新对象加入到链表,然后新对象上加锁;调用 objc_sync_exit 后试图解锁,来保证线程安全。
再看下 dispatch_once 源码
void dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
struct Block_basic *bb = (void *)block;
dispatch_once_f(val, block, (void *)bb->Block_invoke);
}
void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
struct _dispatch_once_waiter_s * volatile *vval = (struct _dispatch_once_waiter_s**)val;
struct _dispatch_once_waiter_s dow = { NULL, 0 };
struct _dispatch_once_waiter_s *tail, *tmp;
_dispatch_thread_semaphore_t sema;
if (__sync_bool_compare_and_swap(vval, NULL, &dow)) {
dispatch_atomic_acquire_barrier();
_dispatch_client_callout(ctxt, func);
dispatch_atomic_maximally_synchronizing_barrier();
tmp = __sync_swap(vval, DISPATCH_ONCE_DONE);
tail = &dow;
while (tail != tmp) {
while (!tmp->dow_next) {
_dispatch_hardware_pause();
}
sema = tmp->dow_sema;
tmp = (struct _dispatch_once_waiter_s*)tmp->dow_next;
_dispatch_thread_semaphore_signal(sema);
}
}
else
{
dow.dow_sema = _dispatch_get_thread_semaphore();
for (;;) {
tmp = *vval;
if (tmp == DISPATCH_ONCE_DONE) {
break;
}
dispatch_atomic_store_barrier();
if (__sync_bool_compare_and_swap(vval, tmp, &dow)) {
dow.dow_next = tmp;
_dispatch_thread_semaphore_wait(dow.dow_sema);
}
}
_dispatch_put_thread_semaphore(dow.dow_sema);
}
}
struct _dispatch_once_waiter_s {
volatile struct _dispatch_once_waiter_s *volatile dow_next;
_dispatch_thread_semaphore_t dow_sema;
};
#define DISPATCH_ONCE_DONE ((struct _dispatch_once_waiter_s *)~0l)
当第一个线程调用 dispatch_once 时,vval 的值是 NULL,完成后设置其为 DISPATCH_ONCE_DONE,这个过程中间其他线程再调用该方法则会等待,等待第一个线程执行完成后唤醒各线程。dispatch_once 使用内存屏障和信号量结合实现线程同步。
NSConditionLock 对象
它定义了一个互斥锁,可以使用特定值来锁住和解锁。不要把该类型的锁和条件(参见“条件”部分)混淆了。它的行为和条件有点类似,但是它们的实现非常不同。
通常,当多线程需要以特定的顺序来执行任务的时候,你可以使用一个 NSConditionLock 对象,比如当一个线程生产数据,而另外一个线程消费数据。生产者执行时,消费者使用由你程序指定的条件来获取锁(条件本身是一个你定义的整形值)。当生产者完成时,它会解锁该锁并设置锁的条件为合适的整形值来唤醒消费者线程,之后消费线程继续处理数据。
NSConditionLock 的锁住和解锁方法可以任意组合使用。比如,你可以使用 unlockWithCondition: 和 lock 消息,或使用 lockWhenCondition: 和 unlock 消息。当然,后面的组合可以解锁一个锁但是可能没有释放任何等待某特定条件值的线程。
// Common
id condLock = [[NSConditionLock alloc] initWithCondition:NO_DATA];
// Thread A,生产者
while(true) {
[condLock lockWhenCondition:NO_DATA];
// 生产数据
[condLock unlockWithCondition:HAS_DATA];
}
// Thread B,消费者
while (true) {
[condLock lockWhenCondition:HAS_DATA];
//消费
[condLock unlockWithCondition:NO_DATA];
}
锁 - 递归锁
NSRecursiveLock 类定义的锁可以在同一线程多次获得,而不会造成死锁,即多次调用不会阻塞已获取该锁的线程。一个递归锁会跟踪它被多少次成功获得了。每次成功的获得该锁都必须平衡调用锁住和解锁的操作。只有所有的锁住和解锁操作都平衡的时候,锁才真正被释放给其他线程获得。
NSRecursiveLock *theLock = [[NSRecursiveLock alloc] init];
void MyRecursiveFunction(int value) {
[theLock lock];
if (value != 0) {
--value;
MyRecursiveFunction(value);
}
[theLock unlock];
}
MyRecursiveFunction(5);
如果把 NSRecursiveLock 换成 NSLock 就会出现死锁,由于第一次调用 MyRecursiveFunction(5) 时,加锁,该函数内部递归调用自己,第二次调用自己的时候,由于之前的锁还没有解锁,它会等待锁解除从而导致死锁。
正如它名字所言,这种类型的锁通常被用在一个递归函数里面来防止递归造成阻塞线程。你可以类似的在非递归的情况下使用他来调用函数,这些函数的语义要求它们使用锁。以下是一个简单递归函数,它在递归中获取锁。如果你不在该代码里使用 NSRecursiveLock 对象,当函数被再次调用的时候线程将会出现死锁。
锁 - 读写锁
// TODO: 待完善…
锁 - 分布锁
NSDistributedLock 分布锁,文件方式实现,可以跨进程用 tryLock 方法获取锁。用 unlock 方法释放锁。
NSDistributedLock 类可以被多台主机上的多个应用程序使用来限制对某些共享资源的访问,比如一个文件。锁本身是一个高效的互斥锁,它使用文件系统项目来实现,比如一个文件或目录。对于一个可用的NSDistributedLock对象,锁必须由所有使用它的程序写入。这通常意味着把它放在文件系统,该文件系统可以被所有运行在计算机上面的应用程序访问。
不像其他类型的锁,NSDistributedLock 并没有实现 NSLocking 协议,所有它没有lock 方法。一个 lock 方法将会阻塞线程的执行,并要求系统以预定的速度轮询锁。以其在你的代码中实现这种约束,NSDistributedLock 提供了一个 tryLock 方法,并让你决定是否轮询。
因为它使用文件系统来实现,一个 NSDistributedLock 对象不会被释放除非它的拥有者显式的释放它。如果你的程序在用户一个分布锁的时候崩溃了,其他客户端简无法访问该受保护的资源。在这种情况下,你可以使用 breadLock 方法来打破现存的锁以便你可以获取它。但是通常应该避免打破锁,除非你确定拥有进程已经死亡并不可能再释放该锁。
和其他类型的锁一样,当你使用 NSDistributedLock 对象时,你可以通过调用 unlock 方法来释放它。
锁 - 自旋锁
自旋锁:当上一个线程的任务没有执行完毕的时候(被锁住),那么下一个线程会一直等待(不会睡眠),当上一个线程的任务执行完毕,下一个线程会立即执行。
互斥锁:当上一个线程的任务没有执行完毕的时候(被锁住),那么下一个线程会进入睡眠状态等待任务执行完毕,当上一个线程的任务执行完毕,下一个线程会自动唤醒然后执行任务。
自旋锁在被锁后,下一个线程会一直等待上一个线程执行完毕,互斥锁则会进入睡眠状态等待。因为自旋不会引起调用者睡眠,所以效率高于互斥锁,它适合不耗时的操作,否则他一直占用 CPU 使 CPU 效率降低。
OSSpinLock spinlock = OS_SPINLOCK_INIT;
OSSpinLockLock(&spinlock);
// do something ...
OSSpinLockUnlock(&spinlock);
OSSpinLock 在iOS 10 上已被废弃,建议用 os_unfair_lock 来替代
锁 - 双重检查锁
// TODO: 待完善…
信号量
GCD提供一种信号的机制,使用它我们可以创建“锁”,这个地方加引号是因为它不是锁。
我们把信号量当作是一个计数器,dispatch_semaphore_wait 等待信号,当信号总量为 0 时则一直等待,否则就可以正常的执行,并让信号总量 -1;dispatch_semaphore_signal 是发送一个信号,让信号总量加 1。
// 实例类 person
Person *person = [[Person alloc] init];
// 创建并设置信号量
dispatch_semaphore_t semaphore = dispatch_semaphore_create(1);
// 线程A
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
[person personA];
[NSThread sleepForTimeInterval:5];
dispatch_semaphore_signal(semaphore);
});
// 线程B
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
[person personB];
dispatch_semaphore_signal(semaphore);
});
条件
条件是信号量的另外一个形式,它允许在条件为真的时候线程间互相发送信号。
事件队列可能使用条件变量来给等待线程发送信号,此时它们在事件队列中的时候。如果一个事件到达时,队列将给条件发送合适信号。如果一个线程已经处于等待,它会被唤醒,届时它将会取出事件并处理它。如果两个事件到达队列的时间大致相同,队列将会发送两次信号唤醒两个线程。
条件 - 使用 NSCondition 类
// Thread A,为等待一个 NSCondition 对象的事件序列。
// cocaoCondition 变量包含了一个 NSCondition 对象
[cocoaCondition lock];
// timeToDoWork 变量是一个整形,它在其他线程里面发送条件信号时立即递增。
while (timeToDoWork <= 0) {
[cocoaCondition wait];
}
timeToDoWork--;
// do some real work here ...
[cocoaCondition unlock];
// Thread B,条件发送信号代码,并递增他断言变量。你应该在给它发送信号前锁住条件。
[cocoaCondition lock];
timeToDoWork++;
[cocoaCondition signal];
[cocoaCondition unlock];
条件 - 使用 POSIX 条件
pthread_mutex_t mutex;
pthread_cond_t condition;
Boolean ready_to_go = true;
void MyCondInitFunction() {
pthread_mutex_init(&mutex);
pthread_cond_init(&condition, NULL);
}
// Thread A
void MyWaitOnConditionFunction() {
// Lock the mutex.
pthread_mutex_lock(&mutex);
// If the predicate is already set, then the while loop is bypassed;
// otherwise, the thread sleeps until the predicate is set.
while(ready_to_go == false) {
pthread_cond_wait(&condition, &mutex);
}
// Do work. (The mutex should stay locked.)
// Reset the predicate and release the mutex.
ready_to_go = false;
pthread_mutex_unlock(&mutex);
}
// Thread B
void SignalThreadUsingCondition() {
// At this point, there should be work for the other thread to do.
pthread_mutex_lock(&mutex);
ready_to_go = true;
// Signal the other thread to begin work.
pthread_cond_signal(&condition);
pthread_mutex_unlock(&mutex);
}
执行 Selector
NSObject 类声明了方法可以在应用的一个活动线程上面执行 selector 的方法。这些方法允许你的线程以异步的方式来传递消息,以确保它们在同一个线程上面执行是同步的。
在《Effective Objective-C 2.0》第 42 条指出多用 GCD,少用 performSelector 系列方法。
[self performSelectorOnMainThread:@selector(doSomething) withObject:nil waitUntialDone:NO];
dispatch_async(dispatch_get_main_queue(), ^{
[self doSomething];
});
创建线程的成本
创建线程也是有开销的,iOS 下主要成本包括构造内核数据结构(大约 1KB),栈空间(主线程 1M,子线程 512K,不过可以使用 -setStackSize 自己设置,但是必须是 4K 的倍数,最小是 16K ),创建线程大约需要 90 毫秒
同步的性能
同步可以确保你的程序正确执行,但是可能会牺牲掉部分性能。如果发生锁的争夺,你的线程有可能进入阻塞,在体验上会产生更大的迟延。
参考下 Benchmark Demo,对比几种锁在加锁和解锁 33554432 次 (即 1024102432),耗时时间分别是
NSLock: 3.5175 sec
NSLock+IMP Cache: 3.1165 sec
pthread_mutex: 1.5870 sec
OSSpinLock: 1.0893
@synchronized: 9.9488 sec
- NSLock 和 NSLock+IMP 时间比较接近,他们都是 pthread mutexes 封装的,创建对象时需要额外开销
- pthread_mutex C 语言中的互斥锁,性能较高,且较安全
- OSSpinLock 自旋锁占用时间最少
- @synchronized 内部会创建异常捕获的 handler 和其他内部使用的锁,所以消耗时间最长
也可以参考下 ibireme (郭曜源) 的 Benchmark Demo。同时 ibireme 指出 OSSPinLock 不再安全,所以看他的源码基本上是用 pthread_mutex 的情况比较多。另外 iOS 10 已经推出了 OSSpinLock 的替代品 os_unfair_lock。
另外,在《Effective Objective-C 2.0》第 41 条指出应当多用派发队列,少用同步锁。即使用”串行同步队列” 或者 GCD 的 dispatch_barrier_async / dispatch_barrier_sync 来避开同步锁的使用。
参考内容
创建线程官方文档
iOS 多线程之线程安全
iOS 多线程编程指南之线程同步
Synchronized NSLock pthread OSSpinLock 的正确使用
Synchronized 的实现
不再安全的 OSSpinLock