并行编程之条件变量(posix condition variables)
在整理Java LockSupport.park()的东东,看到了个"Spurious wakeup",重新梳理下。
#include <pthread.h> struct msg { struct msg *m_next; /* ... more stuff here ... */ }; struct msg *workq; pthread_cond_t qready = PTHREAD_COND_INITIALIZER; pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; void process_msg(void) { struct msg *mp; for (;;) { pthread_mutex_lock(&qlock); while (workq == NULL) pthread_cond_wait(&qready, &qlock); mp = workq; workq = mp->m_next; pthread_mutex_unlock(&qlock); /* now process the message mp */ } } void enqueue_msg(struct msg *mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready); } 复制代码
- 一个简单的消息生产者和消费者的代码,它们之间用condition同步。
- 这个代码最容易让人搞混的是process_msg函数里的pthread_mutex_lock 和 pthread_mutex_unlock 是一对函数调用,前面加锁,后面解锁。的确,是加锁解锁,但是它们两不是一对的。它们的另一半在pthread_cond_wait函数里。
pthread_cond_wait函数可以认为它做了三件事:
- 把自身线程放到condition的等待队列里,把mutex解锁;
- 等待被唤醒(当其它线程调用pthread_cond_signal或者pthread_cond_broadcast时);
- 被唤醒之后,对mutex加锁,再返回。
- mutex和condition实际上是绑定在一起的,一个condition只能对应一个mutex。
在Java的代码里,Condition对象只能通过lock.newCondition()的函数来获取。
Spurious wakeup
所谓的spurious wakeup,指的是一个线程调用pthread_cond_signal(),却有可能不止一个线程被唤醒。
假定有三个线程,线程A正在执行pthread_cond_wait,线程B正在执行pthread_cond_signal,线程C正准备执行pthread_cond_wait函数。
pthread_cond_wait(mutex, cond): value = cond->value; /* 1 */ pthread_mutex_unlock(mutex); /* 2 */ pthread_mutex_lock(cond->mutex); /* 10 */ if (value == cond->value) { /* 11 */ me->next_cond = cond->waiter; cond->waiter = me; pthread_mutex_unlock(cond->mutex); unable_to_run(me); } else pthread_mutex_unlock(cond->mutex); /* 12 */ pthread_mutex_lock(mutex); /* 13 */ pthread_cond_signal(cond): pthread_mutex_lock(cond->mutex); /* 3 */ cond->value++; /* 4 */ if (cond->waiter) { /* 5 */ sleeper = cond->waiter; /* 6 */ cond->waiter = sleeper->next_cond; /* 7 */ able_to_run(sleeper); /* 8 */ } pthread_mutex_unlock(cond->mutex); /* 9 */ 复制代码
- 线程A执行了第1,2步,这时它释放了mutex,然后线程B拿到了这个mutext,并且pthread_cond_signal函数时执行并返回了。
于是线程B就是一个所谓的“spurious wakeup”。
/build/buildd/eglibc-2.19/nptl/pthread_cond_wait.c /build/buildd/eglibc-2.19/nptl/pthread_cond_signal.c
wait morphing优化
从而会有一个叫“wait morphing”优化,就是如果线程被唤醒但是不能获取到mutex,则线程被转移(morphing)到mutex的等待队列里。
The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().
是先调用pthread_mutex_unlock,再调用pthread_cond_signal。
void enqueue_msg(struct msg *mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready); } 复制代码
有的地方给出的是先调用pthread_cond_signal,再调用pthread_mutex_unlock:
void enqueue_msg(struct msg *mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; pthread_cond_signal(&qready); pthread_mutex_unlock(&qlock); } 复制代码
- 先unlock再signal,这有个好处,就是调用enqueue_msg的线程可以再次参与mutex的竞争中,这样意味着可以连续放入多个消息,这个可能会提高效率。类似Java里ReentrantLock的非公平模式。
- 先signal再unlock,有可能会出现一种情况是被signal唤醒的线程会因为不能马上拿到mutex(还没被释放),从而会再次休眠,这样影响了效率。
可见在调用signal之前,可以不持有mutex,除非是“predictable scheduling”,可预测的调度行为。这种可能是实时系统才有这种严格的要求。
为什么要用while循环来判断条件是否成立?
while (workq == NULL) pthread_cond_wait(&qready, &qlock); 复制代码
而不用if来判断?
if (workq == NULL) pthread_cond_wait(&qready, &qlock); 复制代码
一个原因是spurious wakeup,但即使没有spurious wakeup,也是要用While来判断的。
- 线程A,线程B在pthread_cond_wait函数中等待,然后线程C把消息放到队列里,再调用pthread_cond_broadcast,然后线程A先获取到mutex,处理完消息完后,这时workq就变成NULL了。
- 线程B才获取到mutex,那么这时实际上是没有资源供线程B使用的。所以从pthread_cond_wait函数返回之后,还是要判断条件是否成功,如果成立,再进行处理。
pthread_cond_signal和pthread_cond_broadcast
- 认为调用pthread_cond_broadcast来唤醒所有的线程是比较好的写法。
- 但是我认为pthread_cond_signal和pthread_cond_broadcast是两个不同东东,不能简单合并在同一个函数调用。
- 只唤醒一个效率和唤醒全部等待线程的效率显然不能等同。典型的condition是用CLH或者MCS来实现的,要通知所有的线程,则要历遍链表,显然效率降低。
mutex,condition是不是公平(fair)的?
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; volatile int mutexCount = 0; void mutexFairTest(){ int localCount = 0; while(1){ pthread_mutex_lock(&lock); __sync_fetch_and_add(&mutexCount, 1); localCount += 1; if(mutexCount > 100000000){ break; } pthread_mutex_unlock(&lock); } pthread_mutex_unlock(&lock); printf("localCount:%d\n", localCount); } int main() { pthread_mutex_lock(&lock); pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL); pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL); pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL); pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL); pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL); pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL); pthread_mutex_unlock(&lock); sleep(100); } 复制代码
输出结果是:
localCount:16930422 localCount:16525616 localCount:16850294 localCount:16129844 localCount:17329693 localCount:16234137 复制代码
连续调用pthread_cond_signal,会唤醒多少次/多少个线程? 比如线程a,b 在调用pthread_cond_wait之后等待,然后线程c, d同时调用pthread_cond_signal,那么a, b线程是否都能被唤醒?
会不会出现c, d, a 这种调用顺序,然后b一直在等待,然后死锁了?
The pthread_cond_signal() function shall unblock at least one of the threads that are blocked on the specified condition variable cond (if any threads are blocked on cond).
因此,如果有线程已经在调用pthread_cond_wait等待的情况下,pthread_cond_signal调用至少会唤醒等待中的一个线程。
所以不会出现上面的线程b一直等待的情况。