1、线程的概念
1.1、进程 vs 线程
进程是资源分配的基本单位。拥有独立的资源,当进程切换时,需要保存较多的上下文,系统开销大,因此引入了线程。
线程是 cpu 调度的基本单位。一个进程里面至少拥有一个线程(主线程),多个线程交替并发执行。同一进程里的多线程共享地址空间,线程只拥有少量的栈空间。线程执行开销少,占用 cpu 少,线程间的切换快,但不利于资源的管理和保护,而进程正好相反。
posix 线程是在 C 程序中处理线程的一个标准接口,在所有的 linux 系统上都可用,编译时添加-lpthread
# 查看线程相关信息的命令,显示的是内核级线程的 id,thid 是用户级线程的 id ps -elLf
1.2、线程的种类
- 用户级线程:在用户空间实现的线程,由用户态的线程库来完成线程的管理;
- 核心级线程:在内核实现的线程,由内核管理的线程
2、线程的控制
2.1、线程的创建
/* 返回值:成功返回0,失败返回错误码 参数: - tid:线程 id - attr:线程的属性,NULL - start_routine:线程的处理函数 - arg:线程处理函数的参数 */ int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); // 新线程获取自己的线程 ID pthread_t pthread_self(void);
2.2、线程的退出
- 顶层的线程实例返回,线程会隐式终止
- 调用
pthread_exit
,线程会显式终止。若主线程调用该函数,它会等待所有其他对等线程终止,然后再终止整个主线程和整个进程。
// retval 返回的值传递给 pthread_join void pthread_exit(void *retval);
2.3、线程的取消
线程被其它线程杀掉,被杀掉的线程可以选择处理信号的方式,默认运行至取消点后终止。阻塞系统调用一定是取消点,非阻塞函数可能是取消点。
尽量不使用线程取消函数,若没有取消点,线程无法结束;或提前取消,造成堆空间的内存泄漏。
/* 返回值:成功返回0, 失败返回错误码 参数 pid:要取消的线程 id */ int pthread_cancel(pthread_t thread)
2.4、线程的等待
回收已终止线程的资源,等待其他线程的终止。pthread_join
会阻塞,直到线程 tid 终止,回收已终止线程占用的所有内存资源。与 wait
函数不同,pthread_join
函数只能等待一个指定的线程终止,不能等待一个任意线程的终止。
/* 返回值:成功返回0, 失败返回错误码。 参数 - tid:等待退出的线程id, - retval:接收子线程的退出,可以接收指针类型的退出状态 */ int pthread_join(pthread_t thread, void **retval)
2.5、线程的分离
在任何一个时间点上,线程是可结合 joinble
或是分离的 detached
。
- 一个可结合的线程能够被其他线程回收和杀死。在被其他线程回收前,它的内存资源不释放
- 一个分离的线程不能被其他线程回收和杀死,其内存资源在终止时由系统自动释放
默认情况下,线程被创建成可结合的。为了避免内存泄漏,每个可结合的线程,要么被其他线程显示回收,要么调用 detach
函数被分离。
int pthread_detach(pthread_t thread);
例如:对于服务器,没有必要显示等待每个对等线程的终止。这样每个对等线程都在开始处理请求前通过 pthread_self
为参数的pthread_detach
调用分离自身,这样就能在终止后回收它的内存资源。
2.5、线程清理函数
防止线程访问临界资源时被取消或中断而导致该临界资源永远处于锁定状态得不到释放。函数对,栈结构管理,调用 push
清理函数入栈,调用 pop
清理函数出栈。且必须成对使用,必须在代码的同一层次使用({}
匹配)。push
放在紧邻需要清理的资源下面,pop
尽量往后放。
#define pthread_cleanup_push(routine,arg) \ { struct _pthread_cleanup_buffer _buffer; \ _pthread_cleanup_push (&_buffer, (routine), (arg)); #define pthread_cleanup_pop(execute) \ _pthread_cleanup_pop (&_buffer, (execute)); } /* 参数 - routine;清理函数 - arg:清理函数的参数 */ void pthread_cleanup_push(void (*routine) (void *), void *arg) /* 参数 execute:= 0,清理函数出栈;!0,清理函数不出栈 */ void pthread_cleanup_pop(int execute)
线程清理函数响应的时机
- 线程被取消
pthread_cancel
- 显式弹栈
pthread_cleanup_pop(1)
- 线程退出
pthread_exit
线程清理函数不响应的时机
- 不弹栈
pthread_cleanup_pop(0)
- 线程 return
例:线程清理函数
#include <pthread.h> #include <stdio.h> #include <string.h> #include <stdlib.h> void cleanFunc(void* p) { printf("cleanFunc\n"); free(p); p = NULL; printf("free success\n"); } void *threadFunc(void *p) { char *p1 = (char*)malloc(20); strcpy(p1, "hello"); // 线程清理函数入栈 pthread_cleanup_push(cleanFunc, p1); printf("p1 = %s\n", p1); // 线程退出,清理函数响应 // pthread_exit(NULL); // 线程清理函数不响应 /* return 0; pthread_cleanup_pop(0); */ // 弹栈,线程清理函数响应 pthread_cleanup_pop(1); } int main(int argc, char **argv) { int ret = 0; pthread_t thid; ret = pthread_create(&thid, NULL, threadFunc, NULL); THREAD_ERROR_CHECK(ret, "pthread_create"); // 线程取消,线程清理函数响应 // ret = pthread_cancel(thid); // THREAD_ERROR_CHECK(ret, "pthread_cancel"); printf("i am main thread\n"); pthread_join(thid, NULL); return 0; }
3、线程的同步和互斥
临界资源:一次仅允许一个进程使用的资源,访问临界资源的代码称为临界区。
- 同步:直接制约关系。多进程需要在某些位置上协调工作次序而等待,传递消息而产生的制约关系。
- 异步:间接制约关系。一个进程进入临界区访问临界资源,另一个进程必须等待。
临界区访问准则:空闲让进、忙则等待、有限等待、让权等待。
线程的互斥通过锁机制实现,线程的同步通过条件变量实现。
3.1、锁机制
当一个线程访问的时候,需要加上锁以防止另外的线程对它进行访问,实现资源的独占。在一个时刻只能有一个线程掌握某个互斥锁,拥有上锁状态的线程能够对共享资源进行操作。
3.1.1、锁的类型
最基本的锁
- 互斥锁
mutex
:竞争失败,线程挂起,让出 cpu,陷入休眠 - 自旋锁
spinlock
:竞争失败,线程轮询忙等,占用 cpu,直到获取锁。
读写锁rwlock
:写独占,读共享,大部分采用强写同步。
根据是否加锁,分为:
- 悲观锁:先加锁,后访问共享资源。互斥锁、自旋锁、读写锁都属于悲观锁
- 乐观锁:先修改共享资源,再验证这段段时间有无冲突,有冲突,则放弃本次操作。只有在冲突概率低,且加锁成本高,考虑无锁编程。
具体内容,参考我的博客:原子操作CAS与锁实现。接下来,仅以互斥锁为例介绍锁,其他锁类似。
3.1.2、锁的使用
锁的初始化
锁的创建有动态方式和静态方式
动态方式
/* 返回值:成功返回0,失败返回错误码 参数: - mutex:需要初始化的锁变量 - mutexattr:锁的属性 */ int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
静态方式
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
锁的属性设置
锁的属性
- 普通锁 | 快速锁
NULL,PTHREAD_MUTEX_TIMED_NP
:不允许连续加锁,解锁者可以是同进程内任意线程。 - 嵌套锁
PTHREAD_MUTEX_RECURSIVE
:可以连续加锁多次,文档必须由加锁者解锁 - 检错锁
PTHREAD_MUTEX_ERRORCHECK
int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type) /* 参数1:设置锁的属性变量 参数2:需要设置的类型type - 普通锁、快速锁:NULL,PTHREAD_MUTEX_TIMED_NP - 嵌套锁:PTHREAD_MUTEX_RECURSIVE - 检错锁:PTHREAD_MUTEX_ERRORCHECK */
加锁 & 解锁
加锁:阻塞性函数,连续加锁会阻塞。
int pthread_mutex_lock(pthread_mutex_t *mutex)
测试加锁:非阻塞性函数,若锁正在使用,那么使用 trylock 不会加锁成功,也不会阻塞,会返回错误码。
int pthread_mutex_trylock(pthread_mutex_t *mutex)
解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex)
锁的销毁
阻塞性函数。不能对正在使用的锁进行销毁操作。
加锁一次:锁的引用计数 + 1;解锁时,锁的引用计数-1。当锁的引用计数为 0,锁才能被销毁。所以,对一把锁加锁 1 次,连续解锁两次,destroy 不会成功。
int pthread_mutex_destroy(pthread_mutex_t *mutex)
3.1.3、死锁
死锁:多个进程因竞争资源而造成的互相等待。若无外力作用,这些进程都将无法推进。
死锁产生的原因
- 系统资源的竞争
- 进程的推进顺序不合理
死锁产生的必要条件
- 互斥条件
- 请求与保持
- 不可剥夺条件
- 循环等待
解决方案:
- 预防死锁:破坏四个必要条件之一
- 避免死锁:银行家算法,采用预分配策略检查分配完成时系统是否处于安全状态
- 检测死锁:利用死锁定理简化资源分配图来检测死锁的存在。参考我的博客:手写死锁检测组件
- 解除死锁:资源剥夺、撤销进程、进程回退
3.2、条件变量
条件变量是利用线程间共享的全局变量进行同步的一种机制,条件变量记录阻塞的原因,每个条件变量对应一个等待队列,记录因该条件变量而阻塞的所有进程。为了防止竞争,通常和互斥锁配合使用。
条件变量主要包括两个动作:
- 一个线程等待条件变量上的条件成立而挂起
- 另一个线程使条件成立(给出条件成立信号)。
条件变量的初始化
动态方式
/* 返回值:函数成功返回0;失败返回错误码。 参数: - cond:待初始化的条件变量, - attr:条件变量的属性,一般都填NULL(attr一般表示属性) */ int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
静态方式
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
条件变量的销毁
只有在条件变量没有被使用的时候,才能被销毁
int pthread_cond_destroy(pthread_cond_t *cond);
条件变量的等待
无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求竞争条件(Race Condition)。
条件等待函数的原理
- 上半部:条件变量上排队,解锁,阻塞
- 下半部:被唤醒,加锁,函数返回
/* @brief: 无条件等待 @return: 成功返回 0,失败返回 -1 @cond: 条件变量 @mutex: 互斥量 */ int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) /* @brief: 计时等待 @return: 成功返回 0,失败返回 -1 @abstime: 计时器 返回值:函数成功返回0;失败返回错误码。 int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
条件变量的唤醒
// 每次唤醒一个条件变量 int pthread_cond_signal(pthread_cond_t *cond) // 唤醒所有条件变量 int pthread_cond_broadcast(pthread_cond_t *cond)
* 虚假唤醒
虚假唤醒指的是在多线程环境下,多个线程等待在同一个条件上。当条件满足时,可能唤醒多个线程。如果这个资源只能由一个线程获得,剩余线程无法获得该资源。对于无法获得资源的线程来说,这种唤醒是无意义的,这种现象称为虚假唤醒。
虚假唤醒产生的原因
- 信号中断导致的问题(Linux2.6以后已解决)
pthread_cond_signal
,至少唤醒1个线程,即可能同时唤醒多个线程。
为避免虚假唤醒的发生,每个被唤醒的线程都需要再检查一次条件是否满足。如果不满足,应该继续睡眠;只有满足了才能往下执行。因此,需要把条件变量的判断从 if 判断换成 while 判断,此时,一次只能唤醒一个线程。
3.4、经典同步问题
3.4.1、生产者-消费者问题
问题描述:生产者和消费者共享一个大小为 n 的缓冲区。缓冲区不满,生产者生产消息,放入缓冲区,否则等待;缓冲区不空,消费者取出消息,否则等等待。
问题分析
- 同步:缓冲区不满,生产者生产;缓冲区不空,消费者消费。
- 互斥:缓冲区
信号量伪代码
// 互斥 semaphore mutex = 1; // 同步 semaphore empty = n; semaphore full = 0; producer() { while (1) { p(empty); p(mutex); 生产数据放入缓冲区 v(mutex) v(full) } } consumer() { while (1) { p(full); p(mutex); 从缓冲区读取数据 v(mutex) v(empty) } }
例:信号量实现
#include <sys/sem.h> #include <unistd.h> #include <string.h> #include <stdio.h> int main(int argc, char **argv) { int ret = 0; // 创建信号量集合 int semid = semget(3000, 2, IPC_CREAT|0666); // 创建 unsigned short 数组来 SETALL unsigned short arr[2] = {0, 5}; // 初始:商品数量 0,货架数量 5 semctl(semid, 0, SETALL, arr); // 0 表示对全部信号量操作 // 信号量的操作函数,数组一次实现两个原子操作 struct sembuf sop[2]; memset(sop, 0, sizeof(sop)); // 父进程:生产者 if(fork()){ sop[0].sem_num = 0; // 对信号量 0 操作,即商品 sop[0].sem_op = 1; // 生产商品 sop[0].sem_flg = 0; sop[1].sem_num = 1; // 对信号量 1 操作,即货架 sop[1].sem_op = -1; // 消耗货架 sop[1].sem_flg = 0; while(1){ // 信号量操作 ret = semop(semid, sop, 2); // GETVAL 直接返回信号代表的资源数量 printf("生产者:商品的数量 = %d, 货架的数量 = %d\n", semctl(semid, 0, GETVAL), semctl(semid, 1, GETVAL)); sleep(1); } } // 子进程:消费者 else{ sop[0].sem_num = 0; sop[0].sem_op = -1; sop[0].sem_flg = 0; sop[1].sem_num = 1; sop[1].sem_op = 1; sop[1].sem_flg = 0; while(1){ ret = semop(semid, sop, 2); printf("消费者:商品的数量 = %d, 货架的数量 = %d\n", semctl(semid, 0, GETVAL), semctl(semid, 1, GETVAL)); sleep(2); } } return 0; }
例:条件变量 + 互斥量
场景:12306 官方放票,官方和黄牛卖票
#include <pthread.h> #include <stdio.h> typedef struct { int num; pthread_cond_t cond_sell; pthread_cond_t cond_produce; pthread_mutex_t mutex; } ticket_t, *pticket_t; void* seller_1(void *p) { pticket_t pt = (pticket_t) p; int count = 0; while(1) { pthread_mutex_lock(&pt->mutex); if(pt->num) { pt->num--; printf("黄牛卖了%d张票\n", ++count); } else { // 没票,则唤醒生产者生产票 pthread_cond_signal(&pt->cond_produce); // 等待票的就绪 pthread_cond_wait(&pt->cond_sell, &pt->mutex); } pthread_mutex_unlock(&pt->mutex); } pthread_exit(NULL); } void* seller_2(void *p) { pticket_t pt = (pticket_t) p; int count = 0; while(1) { pthread_mutex_lock(&pt->mutex); if(pt->num) { pt->num--; printf("官方卖了%d张票\n", ++count); } else { pthread_cond_signal(&pt->cond_produce); pthread_cond_wait(&pt->cond_sell, &pt->mutex); } pthread_mutex_unlock(&pt->mutex); } pthread_exit(NULL); } void* setTickets(void *p) { pticket_t pt = (pticket_t) p; while(1) { pthread_mutex_lock(&pt->mutex); if(pt->num == 0) { // 没票,生产票,并唤醒消费者卖票 pt->num = 100; pthread_cond_signal(&pt->cond_sell); puts("selltickets now"); } else { // 有票,则等待消费者的卖完票 pthread_cond_wait(&pt->cond_produce, &pt->mutex); } pthread_mutex_unlock(&pt->mutex); } pthread_exit(NULL); } int main(int argc,char*argv[]) { ticket_t ticket; ticket.num = 10; pthread_mutex_init(&ticket.mutex, NULL); pthread_cond_init(&ticket.cond_sell, NULL); pthread_cond_init(&ticket.cond_produce, NULL); pthread_t thid1, thid2, thid3; int ret = 0; ret = pthread_create(&thid1, NULL, setTickets, &ticket); ret = pthread_create(&thid2, NULL, seller_1, &ticket); ret = pthread_create(&thid3, NULL, seller_2, &ticket); pthread_join(thid1, NULL); pthread_join(thid2, NULL); pthread_join(thid3, NULL); pthread_mutex_destroy(&ticket.mutex); pthread_cond_destroy(&ticket.cond_sell); pthread_cond_destroy(&ticket.cond_produce); return 0; }
3.4.2、读者和写者问题
问题描述:读者和写者两组进程,共享一个文件。为避免数据不一致,规定:读读允许、读写互斥、写写互斥
读者优先
semaphore rCountMutex = 1; // 控制 rCount 变量的更新 semaphore rw = 1; // 控制读者和写者互斥 int rCount = 0; // 记录读者数量 writer() { while (1) { p(rw); write(); v(rw) } } reader() { while (1) { p(rCountMutex); if (rCount == 0) { p(rw); // 有写者,则阻塞写进程 } ++rCount; v(rCountMutex); read(); p(rCountMutex); --rCount; if (rCount == 0) { v(rw); // 最后一个读者离开,唤醒写者 } v(rCountMutex); } }
写者优先
semaphore rCountMutex = 1; // 控制 rCount 变量的更新 semaphore rw = 1; // 控制读者和写者互斥 semaphore w = 1; // 多了一个:控制写者写操作间互斥 int rCount = 0; // 记录读者数量 writer() { while (1) { p(w); / p(rw); write(); v(rw); v(w); } } reader() { while (1) { p(w); // * 在无写者时,才能请求进入 p(rCountMutex); if (rCount == 0) { p(rw); } ++rCount; v(rCountMutex); v(w) // * read(); p(rCountMutex); --rCount; if (rCount == 0) { v(rw); } v(rCountMutex); } }
读写公平
读者优先只要后续有读者到达,读者就可以进入读者队列, 而写者必须等待,直到没有读者到达。使用信号量flag
,读写共享,当写者到来时,即时读者队列不为空,后续的读者阻塞在 flag
上,读者队列逐渐为空,直到写者执行。
同时写者也不能立即执行,阻塞在 rw
上,只有读者队列中所有的读者读完后,才能唤醒写者,执行写操作。
semaphore rCountMutex = 1; // 控制 rCount 变量的更新 semaphore rw = 1; // 控制读者和写者互斥 semaphore flag = 1; // 实现读写公平:去掉读者到来就可以进入队列的权限 int rCount = 0; // 记录读者数量 writer() { while (1) { p(flag); p(rw); write(); v(rw); v(flag); } } reader() { while (1) { p(falg); // * 在无写者时,才能请求进入 p(rCountMutex); if (rCount == 0) { p(rw); } ++rCount; v(rCountMutex); v(flag) // * read(); p(rCountMutex); --rCount; if (rCount == 0) { v(rw); } v(rCountMutex); } }
3.4.3、哲学家问题
问题描述:一张圆桌子上坐着五名哲学家,每两个哲学家间摆着一根筷子,两个筷子间是一碗饭。哲学家毕生都在思考和用餐。当哲学家饥饿时,才试图拿起两根筷子,只有拿到了两根筷子才能进餐,否则需要等待。就餐结束后,继续思考。
解决1:哲学家一要么次取两根筷子,要么不拿。
semaphore chopstick[5] = {1, 1, 1, 1, 1}; semaphore mutex = 1; pi() { while (1) { p(mutex); p(chopstick[i]); p(chopstick[(i + 1) % 5]); v(mutex); eat; v(chopstick[i]); v(chopstick[(i + 1) % 5]); think; } }
解决2:仅允许 4 名哲学家同时进餐
semaphore chopstick[5] = {1, 1, 1, 1, 1}; semaphore eating = 4; pi() { while (1) { p(eating); p(chopstick[i]); p(chopstick[(i + 1) % 5]); eat; v(chopstick[i]); v(chopstick[(i + 1) % 5]); v(eating); think; } }
解法3:奇数号哲学家争夺左边筷子,偶数号哲学家争夺右边筷子
semaphore chopstick[5] = {1, 1, 1, 1, 1}; pi() { while (1) { if (i & 1) { p(chopstick[i]); p(chopstick[(i + 1) % 5]); eat; v(chopstick[i]); v(chopstick[(i + 1) % 5]); } else { p(chopstick[(i + 1) % 5]); p(chopstick[i]); eat; v(chopstick[(i + 1) % 5]); v(chopstick[i]); } think; } }