/* include globals */
#include "unpipc.h"
#define MAXNITEMS 1000000
#define MAXNTHREADS 100
/* globals shared by threads */
int nitems; /* read-only by producer and consumer */
int buff[MAXNITEMS];
struct {
pthread_mutex_t mutex;
int nput; /* next index to store */
int nval; /* next value to store */
} put = { PTHREAD_MUTEX_INITIALIZER };
struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int nready; /* number ready for consumer */
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
/* end globals */
void *produce(void *), *consume(void *);
/* include main */
int
main(int argc, char **argv)
{
int i, nthreads, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons6 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);
Set_concurrency(nthreads + 1);
/* 4create all producers and one consumer */
for (i = 0; i < nthreads; i++) {
count[i] = 0;
Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
Pthread_create(&tid_consume, NULL, consume, NULL);
/* wait for all producers and the consumer */
for (i = 0; i < nthreads; i++) {
Pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
Pthread_join(tid_consume, NULL);
exit(0);
}
/* end main */
/* include prodcons */
void *
produce(void *arg)
{
for ( ; ; ) {
Pthread_mutex_lock(&put.mutex);
if (put.nput >= nitems) {
Pthread_mutex_unlock(&put.mutex);
return(NULL); /* array is full, we're done */
}
buff[put.nput] = put.nval;
put.nput++;
put.nval++;
Pthread_mutex_unlock(&put.mutex);
Pthread_mutex_lock(&nready.mutex);
if (nready.nready == 0)
Pthread_cond_signal(&nready.cond);//发出信号
nready.nready++;//置为1
Pthread_mutex_unlock(&nready.mutex);
*((int *) arg) += 1;
}
}
void *
consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
Pthread_mutex_lock(&nready.mutex);
while (nready.nready == 0)
Pthread_cond_wait(&nready.cond, &nready.mutex);//wait条件变量
nready.nready--;//置为0
Pthread_mutex_unlock(&nready.mutex);
if (buff[i] != i)
printf("buff[%d] = %d\n", i, buff[i]);
}
return(NULL);
}
/* end prodcons */
这里在生产者的代码中,当它获取到互斥锁时,若发出信号唤醒消费者,则此时可能系统立即调度唤醒消费者,但互斥锁任然在生产者之手,则消费者获取互斥锁必然失败,为了避免此种低效的情况出现,我们可以直到生产者释放互斥锁后才给与之关联的条件变量发送信号,这在Posix里是可以这么做的,但Posix又接着说:若要可预见的调度行为,则调用pthead_cond_signal的线程必须锁住该互斥锁。
当在进程间共享互斥锁时,持有该互斥锁的进程可能在持有期间终止,但无法让系统在终止时自动释放掉所持有的锁。一个线程也可以在持有互斥锁期间终止,可能是自己调用pthread_exit或被另一个线程取消,若是前者,则它应该知道自己还有一个互斥锁,若是后者,则该线程可以先安装一个将在被取消时调用的清理处理程序。但最致命的情况是由于此线程的终止导致整个进程的终止。即使一个进程终止时系统自动释放其持有的锁,但也会导致临界区内数据处于不一致状态,
读写锁的规则:
1,只要没有线程持有某个给定的读写锁用于写,则任意数目的线程可以持有
该读写锁用于读
2,仅当没有线程持有某个给定的读写锁用于读或用于写时,才能分配该读写锁用于写
这种锁在那些读数据比写数据频繁的应用中使用比较有用,允许多个读者提供了更高的并发度,同时在写者修改数据时保护数据,避免任何其他读者或写者的干扰。
这种对某个给定资源的共享访问也叫共享—独占上锁,获取一个读写锁用于读称为共享锁,获取一个读写锁用于写称为独占锁。在操作系统中就介绍过这种,经典的问题就是读者—写者问题,有多种类型:多读者,单写者或多读者,多写者。,此外还有要考虑的就是读者和写者谁优先,也就产生了1类和2类读写问题。
读写锁类型为pthread_rwlock_t。pthread_rwlock_rdlock获取一个读出锁,若对应的读写锁已经被某个写者持有,则阻塞调用线程,pthread_rwlock_wrlock获取一个写出锁,若对应的读写锁已经被另一个写者持有或被一个或多个读者持有,则阻塞调用线程,pthread_rwlock_unlock释放一个读出锁或写入锁。
使用互斥锁和条件变量实现读写锁(写者优先)
typedef struct {
pthread_mutex_t rw_mutex; /* basic lock on this struct *///访问此读写锁使用的互斥锁
pthread_cond_t rw_condreaders; /* for reader threads waiting 读者线程使用*/
pthread_cond_t rw_condwriters; /* for writer threads waiting 写者线程使用*/
int rw_magic; /* for error checking 初始化成功后, 被设置为RW_MAGIC,所有函数测试此成员,检查调用者是否作为参数传递了指向某个已经初始化的读写锁的指针,读写锁摧毁时,被设置为0*/
int rw_nwaitreaders;/* the number waiting 读者计数器*/
int rw_nwaitwriters;/* the number waiting 写者计数器*/
int rw_refcount;//本读写锁的当前状态,-1表示是写入锁(任意时刻只有一个),0表示可用,大于0表示当前容纳着的读出锁数目
/* 4-1 if writer has the lock, else # readers holding the lock */
} pthread_rwlock_t;
int pthread_rwlock_init(pthread_rwlock_t *rw, pthread_rwlockattr_t *attr)
{
int result;
if (attr != NULL)
return(EINVAL); /* not supported */
if ( (result = pthread_mutex_init(&rw->rw_mutex, NULL)) != 0)
goto err1;
if ( (result = pthread_cond_init(&rw->rw_condreaders, NULL)) != 0)
goto err2;
if ( (result = pthread_cond_init(&rw->rw_condwriters, NULL)) != 0)
goto err3;
rw->rw_nwaitreaders = 0;
rw->rw_nwaitwriters = 0;
rw->rw_refcount = 0;
rw->rw_magic = RW_MAGIC;
return(0);
err3:
pthread_cond_destroy(&rw->rw_condreaders);
err2:
pthread_mutex_destroy(&rw->rw_mutex);
err1:
return(result); /* an errno value */
}
int pthread_rwlock_destroy(pthread_rwlock_t *rw)
{
//检查参数是否有效
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if (rw->rw_refcount != 0 ||
rw->rw_nwaitreaders != 0 || rw->rw_nwaitwriters != 0)
return(EBUSY);
pthread_mutex_destroy(&rw->rw_mutex);
pthread_cond_destroy(&rw->rw_condreaders);
pthread_cond_destroy(&rw->rw_condwriters);
rw->rw_magic = 0;
return(0);
}
int pthread_rwlock_rdlock(pthread_rwlock_t *rw)
{
int result;
//检查参数是否有效
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
//操作读写锁前,先给其互斥锁上锁
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
/* 4give preference to waiting writers */
while (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0)
{// rw_refcount 小于0(表示有写者持有读写锁),rw_nwaitwriters大于0表示有线程正等着获取读写锁的一个写入锁,则无法获取该读写锁的一个读出锁
rw->rw_nwaitreaders++;
result = pthread_cond_wait(&rw->rw_condreaders, &rw->rw_mutex);
rw->rw_nwaitreaders--;
if (result != 0)
break;
}
if (result == 0)
rw->rw_refcount++; /* another reader has a read lock */
pthread_mutex_unlock(&rw->rw_mutex);
return (result);
}
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
if (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0)
result = EBUSY; /* held by a writer or waiting writers */
else
rw->rw_refcount++; /* increment count of reader locks */
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
int pthread_rwlock_wrlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
while (rw->rw_refcount != 0)
{//只要有读者持有读出锁或有一个写者持有唯一的写入锁,调用线程阻塞
rw->rw_nwaitwriters++;
result = pthread_cond_wait(&rw->rw_condwriters, &rw->rw_mutex);
rw->rw_nwaitwriters--;
if (result != 0)
break;
}
if (result == 0)
rw->rw_refcount = -1;
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
int pthread_rwlock_trywrlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
if (rw->rw_refcount != 0)
result = EBUSY; /* held by either writer or reader(s) */
else
rw->rw_refcount = -1; /* available, indicate a writer has it */
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
int pthread_rwlock_unlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
if (rw->rw_refcount > 0)
rw->rw_refcount--; /* releasing a reader */
else if (rw->rw_refcount == -1)
rw->rw_refcount = 0; /* releasing a reader */
else
err_dump("rw_refcount = %d", rw->rw_refcount);
/* 4give preference to waiting writers over waiting readers */
if (rw->rw_nwaitwriters > 0) {
if (rw->rw_refcount == 0)
result = pthread_cond_signal(&rw->rw_condwriters);
} else if (rw->rw_nwaitreaders > 0)
result = pthread_cond_broadcast(&rw->rw_condreaders);
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
int pthread_rwlock_unlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
if (rw->rw_refcount > 0)
rw->rw_refcount--; /* releasing a reader */
else if (rw->rw_refcount == -1)
rw->rw_refcount = 0; /* releasing a writer */
else
err_dump("rw_refcount = %d", rw->rw_refcount);
/* 4give preference to waiting writers over waiting readers */
//写者优先,而只有一个写者
if (rw->rw_nwaitwriters > 0) {
if (rw->rw_refcount == 0)
result = pthread_cond_signal(&rw->rw_condwriters);//通知等待的那个写者
} else if (rw->rw_nwaitreaders > 0)
result = pthread_cond_broadcast(&rw->rw_condreaders);//通知所有等待的读者
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
这里的读出锁和写入锁函数都有一个问题,若调用线程阻塞在pthread_cond_wati调用上,并且随后此线程被取消了,则它会在还持有互斥锁的情况下终止,于是rw_nwaitreaders计数器的值会出错
本文转自Phinecos(洞庭散人)博客园博客,原文链接:http://www.cnblogs.com/phinecos/archive/2008/05/28/1209230.html,如需转载请自行联系原作者