线程同步——条件变量
什么是条件变量?
线程间的同步有这样一种情况:线程A需要等待某个条件成立才会继续往下执行,如果条件不成立则阻塞等待,而线程B在执行过程中使得这个条件成立,然后唤醒线程A继续往下执行。那么这个条件就是条件变量。
pthread库中通过使用pthread_cond_t类型来表示条件变量类型。
对这种类型的操作包括以下:
pthread_cond_init(3)
#include <pthread.h> //静态初始化一个条件变量 pthread_cond_t cond=PTHREAD_COND_INITIALIZER; int pthread_cond_init(pthread_cond_t *cond,\ pthread_condattr_t *cond_attr); 功能:初始化一个条件变量 参数: cond:指定要初始化的条件变量 cond_attr:属性 NULL 默认值 返回值: 0 成功 非0 错误码
int pthread_cond_signal(pthread_cond_t *cond);
功能:启动在条件变量上等待的所有线程中的一个。 参数: cond:指定了条件变量,在这个条件变量上等待的线程 返回值: 0 成功 非0 错误码 int pthread_cond_broadcast(pthread_cond_t *cond); 功能:启动所有的在cond条件变量上等待的线程 参数: cond:指定了条件变量。 返回值: 0 成功 非0 错误码
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
功能:在条件变量上等待一个条件变为真。 参数: cond:线程等待的条件 mutex:指定使用的mutex锁 返回值: 0 成功 非0 错误码 补充: (1)先解mutex锁 (2)等待signaled的到来 (3)重新加mutex锁
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
功能:在条件变量上等待signaled的到来,如果指定的时间内,没有等到,返回ETIMEDOUT错误码。 参数: cond:同上 mutex:同上 abstime:指定等待的最长时间 返回值: 0 成功 非0 错误码
int pthread_cond_destroy(pthread_cond_t *cond);
功能:销毁一个条件变量 参数: cond:指定要销毁的条件变量 返回值: 0 成功 非0 错误码
代码示例
使用条件变量完成一个生产者和消费者的问题。
- p_c.c
#include <stdio.h> #include <pthread.h> #include <stdlib.h> typedef struct node{ int data; struct node *next; }node_t; typedef node_t *node_p; node_p head;//用于保存链表的头部 pthread_cond_t cond=PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; //用于生产者 void *p(void *arg){ node_p new; while(1){ //创建新节点 new=(node_p)malloc(sizeof(node_t)); //初始化新节点的内容 new->next=NULL; //使用随机数初始化data的值 new->data=rand()%1000+1; //为了防止生产者和消费者两个的异步访问需要加mutex锁 pthread_mutex_lock(&mutex); //将新节点添加到链表的头部 new->next=head; head=new; printf("p %d\n",new->data); //操作完成的时候需要解除mutex锁 pthread_mutex_unlock(&mutex); //需要告诉消费者 pthread_cond_signal(&cond); sleep(rand()%5); } } //用于消费者 void *c(void *arg){ node_p tmp; while(1){ //加锁 pthread_mutex_lock(&mutex); //链表为空的时候 while(head==NULL){ pthread_cond_wait(&cond,&mutex); } //链表不为空 tmp=head; head=head->next; //解锁 pthread_mutex_unlock(&mutex); //消费tmp printf("c %d\n",tmp->data); free(tmp); tmp=NULL; sleep(rand()%5); } } int main(void){ pthread_t pid,cid; //配置随机数的种子 srand(time(NULL)); //创建两个线程,分别用于生产者和消费者 pthread_create(&pid,NULL,p,NULL); pthread_create(&cid,NULL,c,NULL); //等待线程汇合 pthread_join(cid,NULL); pthread_join(pid,NULL); //销毁锁和条件变量 pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); return 0; }
- 执行结果
线程同步——信号量
对于多个资源的共享使用信号量。
想使用其中的一个资源的时候,首先判断是否有可用资源。如果有,是资源的可用数量减1.如果没有可用资源,线程等待其他线程释放资源。当线程释放资源的时候,资源可用数量加1.
信号量是一个类型 sem_t
关于信号量的操作,系统提供了以下函数
sem_init(3)
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); 功能:初始化一个匿名的信号量 参数: sem:指定了要初始化的信号量变量的地址空间 pshared: 0 应用多线程 非0 应用于多进程 value:指定了信号量变量的初值。 返回值: 0 成功 -1 错误 errno被设置
sem_destroy(3)
#include <semaphore.h> int sem_destroy(sem_t *sem); 功能:销毁一个信号量 参数: sem:指定要销毁的信号量变量的地址 返回值: 0 成功 -1 错误 errno被设置
sem_post(3)
#include <semaphore.h> int sem_post(sem_t *sem); 功能:使信号量变量的值加1,实质是释放资源 参数: sem:指定要操作的信号量类型的变量 返回值: 0 成功 -1 失败 errno被设置
sem_wait(3)
#include <semaphore.h> int sem_wait(sem_t *sem); 功能:使信号量变量的值减1。实质是获取一个资源。如果可用资源的数量是0,那么阻塞等待可用资源的释放。 参数: sem:指定要操作的信号量类型的变量 返回值: 0 成功 -1 失败 errno被设置
int sem_trywait(sem_t *sem);
功能: 参数: sem:指定要操作的信号量类型的变量 返回值: 0 成功 -1 失败 errno被设置 int sem_timedwait(sem_t *sem,const struct timespec *abs_timeout); 功能: 参数: sem:指定要操作的信号量类型的变量 返回值: 0 成功 -1 失败 errno被设置
代码示例
使用信号量完成生产者和消费者模型,不再使用链表,使用循环队列完成。
- p_cq.c
#include <stdio.h> #include <pthread.h> #include <time.h> #include <semaphore.h> int queue[5]; //需要使用到两个信号量,一个用于控制生产,一个用于控制消费 sem_t c_num,p_num; //生产者线程的代码 void *p(void *arg){ int j=0; while(1){ //将可生产的信号量减一 sem_wait(&p_num); queue[j]=rand()%1000+1; printf("p %d\n",queue[j]); //将可消费的信号量加1 sem_post(&c_num); j=(j+1)%5; sleep(rand()%5); } } //消费者线程的代码 void *c(void *arg){ int i=0; while(1){ //将可消费的信号量减1 sem_wait(&c_num); printf("c %d\n",queue[i]); queue[i]=0; //将可生产的信号量加1 sem_post(&p_num); i=(i+1)%5; sleep(rand()%5); } } int main(void){ pthread_t pid,cid; //设置随机数的种子 srand(time(NULL)); //初始化信号量的值 sem_init(&c_num,0,0); sem_init(&p_num,0,5); //创建生产者和消费者两个线程 pthread_create(&pid,NULL,p,NULL); pthread_create(&cid,NULL,c,NULL); //等待线程的汇合 pthread_join(pid,NULL); pthread_join(cid,NULL); //将信号量销毁 sem_destroy(&c_num); sem_destroy(&p_num); return 0; }
- 执行结果
进程间通讯(IPC)信号量集
信号量集就是有一个或多个信号量组成的集合。
如何使用信号量集实现进程间的通讯?
- 获取一个键值。
- 通过键值获取信号量集semid。
semget(2)
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semget(key_t key, int nsems, int semflg); 功能:获取一个信号量集的id 参数: key:ftok(3)的返回值 nsems:指定了信号量集中的信号量的个数 semflg: IPC_CREAT:指定创建信号量集。 IPC_EXCL:如果IPC_CREAT和IPC_EXCL一起指定,信号量集存在的话,报错。 mode:指定信号量集的被访问权限。 返回值: -1 错误 errno被设置 返回一个非负整数,是信号量集的id。
对信号量集中的某一个信号量进行pv操作
使用函数semop(2)
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semop(int semid, struct sembuf *sops, unsigned nsops); 功能:信号量的操作 参数: semid:指定了信号量集的id sops:指定了对信号量的操作 nsops:指定了要操作的信号量的个数 返回值: 0 成功 -1 失败 errno被设置 补充: unsigned short semval; /* semaphore value */ unsigned short semzcnt; /* # waiting for zero */ unsigned short semncnt; /* # waiting for increase */ pid_t sempid; /* process that did last op */ struct sembuf{ unsigned short sem_num; /* semaphore number */ short sem_op; /* semaphore operation */ short sem_flg; /* operation flags */ }; sem_num 指定了要操作的信号量的下标 sem_op 指定了对信号量的pv操作 >0 semval+sem_op v操作 =0 等待到0处理 <0 如果sem_op的绝对值小于semval,立即处理, semval-sem_op p操作 如果sem_op的绝对值大于semval,IPC_NOWAIT被指定,semop(2)立即返回错误。 sem_flg IPC_NOWAIT :非阻塞 SEM_UNDO 0 阻塞
为信号量集中的某一个信号量设置初值。或者获取信号量的值。
控制信号量集中某一个信号量,需要使用到semctl(2)
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semctl(int semid, int semnum, int cmd, ...); 功能:对信号量的控制操作(不是对信号量的pv操作) 参数: semid:指定了要操作的信号量集的id semnum:指定了信号量在信号量集中的下标(下标是0开始) cmd:指定了对这个信号量的操作 GETVAL:获取信号量集中地几个信号量的semval值 SETVAL:设置第semnum个信号量的值。需要第四个参数。 ...:可变参数,这个参数的个数有cmd参数决定。 返回值: -1 失败 errno被设置 如果cmd是GETVAL,返回信号量的semval。 其他 0 成功 补充: 第四个参数: union semun { int val; /* Value for SETVAL */ struct semid_ds *buf;/* Buffer for IPC_STAT, IPC_SET */ unsigned short *array;/* Array for GETALL, SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO (Linuxspecific) */ };
代码示例
使用进程间通讯模仿TCP通讯结果。
- server.c
#include <stdio.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> /*user programm*/ typedef union semun { int val; struct semid_ds *buf; unsigned short *array; struct seminfo *__buf; }semun_t; int main(void){ key_t key; semun_t arg; struct sembuf sb={0,-1,0}; //struct sembuf sb={0,-1,IPC_NOWAIT}; //获取一个键值 key=ftok(".",51); if(key==-1){ perror("ftok"); return 1; } //根据键值获取semid int semid=semget(key,1,IPC_CREAT|0664); if(semid==-1){ perror("semget"); return 2; } //设置第一个信号量的初值为5 arg.val=5; int r=semctl(semid,0,SETVAL,arg); if(r==-1){ perror("semctl"); return 3; } //循环,每间隔3秒信号量的值减1 while(1){ if(semop(semid,&sb,1)==-1){ perror("semop"); return 4; } sleep(3); } return 0; }
- client.c
#include <stdio.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int main(void){ key_t key; int ret; //获取键值 key=ftok(".",51); if(key==-1){ perror("ftok"); return 1; } //获取和键值相关的semid int semid=semget(key,1,IPC_CREAT|0664); if(semid==-1){ perror("semget"); return 2; } //获取信号量的semval while(1){ ret=semctl(semid,0,GETVAL); if(ret>0){ printf(" %d resources...\n",\ ret); }else{ printf("no resources...\n"); } sleep(3); } return 0; }