线程同步
线程同步是指在多线程的情况下,如果多个线程去访问共享资源,需要按照一定规则顺序依次去访问,保证共享资源的数据一致性。
1. 互斥锁
互斥相关函数
//互斥量
pthread_mutex_t mutex;
//pthread_mutex_init()
//互斥量初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
参数
mutex 传出参数
attr 互斥量属性,通常设为NULL,线程间共享
//pthread_mutex_lock()
//互斥量加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
//pthread_mutex_unlock()
//互斥量解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//pthread_mutex_trylock
//尝试加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex);
//pthread_mutex_destroy()
//销毁互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
示例:
多线程访问全局变量num,并实现同步。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
int num = 0;
pthread_mutex_t mutex;
void *threadMethod1(void *arg)
{
int i, cur;
while (num<5000)
{
pthread_mutex_lock(&mutex);
if (num == 5000)
{
break;
}
cur = num;
cur++;
num = cur;
printf("[%ld] num [%d]\n", pthread_self(), num);
pthread_mutex_unlock(&mutex);
}
}
void *threadMethod2(void *arg)
{
int i, cur;
while (num<5000)
{
pthread_mutex_lock(&mutex);
if (num == 5000)
{
break;
}
cur = num;
cur++;
num = cur;
printf("[%ld] num [%d]\n", pthread_self(), num);
pthread_mutex_unlock(&mutex);
}
// for (i = 0; i < 5000; i++)
// {
// pthread_mutex_lock(&mutex);
// cur = num;
// cur++;
// num = cur;
// printf("[%ld] num [%d]\n", pthread_self(), num);
// pthread_mutex_unlock(&mutex);
// }
}
int main()
{
pthread_mutex_init(&mutex, NULL);
pthread_t thread1, thread2;
int ret = pthread_create(&thread1, NULL, threadMethod1, NULL);
if (ret != 0)
{
printf("pthread_create error, [%s]\n", strerror(ret));
return -1;
}
ret = pthread_create(&thread2, NULL, threadMethod1, NULL);
if (ret != 0)
{
printf("pthread_create error, [%s]\n", strerror(ret));
return -1;
}
void *exit_status;
ret = pthread_join(thread1, &exit_status);
if (ret)
{
printf("Thread join error: %d\n", ret);
return -1;
}
ret = pthread_join(thread2, &exit_status);
if (ret)
{
printf("Thread join error: %d\n", ret);
return -1;
}
printf("main thread, pid is [%d], tid is [%ld]\n", getpid(), pthread_self());
pthread_mutex_destroy(&mutex);
return 0;
}
2. 读写锁
读共享,写独占,适用于读多余写的情况。读写同时等待时,写的优先级更高。
读写锁相关函数:
//1. 读写锁变量
pthread_rwlock_t rwlock;
//2. 读写锁初始化,参数attr为读写锁属性,设置为NULL表示设置为默认属性
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
//3. 加读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
//4.尝试加读锁
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
//5. 加写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
//6. 尝试加写锁
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
//7.解锁
int pthread_rwlock_unlock(&pthread_rwlock_t *rwlock);
//8. 销毁锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
示例:
5个读线程,3个写线程来访问共享资源num
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#define FINALNUM 100
int num = 0;
pthread_rwlock_t rdlock;
// 读线程
void *threadMethod1(void *arg)
{
int cur;
while (num<FINALNUM)
{
pthread_rwlock_rdlock(&rdlock);
if (num > FINALNUM)
{
pthread_rwlock_unlock(&rdlock);
break;
}
cur = num;
printf("read [%d] : [%ld] num [%d]\n",*(int *)arg, pthread_self(), cur);
pthread_rwlock_unlock(&rdlock);
sleep(rand()%3);
}
}
// 写线程
void *threadMethod2(void *arg)
{
int cur;
while (num < FINALNUM)
{
pthread_rwlock_wrlock(&rdlock);
if (num == FINALNUM)
{
pthread_rwlock_unlock(&rdlock);
break;
}
cur = num;
cur++;
num = cur;
printf("write [%d] : [%ld] num [%d]\n",*(int *)arg, pthread_self(), cur);
pthread_rwlock_unlock(&rdlock);
sleep(rand()%3);
}
}
int main()
{
pthread_rwlock_init(&rdlock, NULL);
pthread_t threadr[5], threadw[3];
int i = 0;
int ret;
int arr[8];
// 5个读线程
for (i = 0; i < 5; i++)
{
arr[i] = i + 1;
ret = pthread_create(&threadr[i], NULL, threadMethod1, &arr[i]);
if (ret != 0)
{
printf("pthread_create read [%d] error, [%s]\n", i+1, strerror(ret));
return -1;
}
}
// 3个写线程
for (i = 0; i < 3; i++)
{
arr[i + 5] = i + 1;
ret = pthread_create(&threadw[i], NULL, threadMethod2, &arr[i + 5]);
if (ret != 0)
{
printf("pthread_create write [%d] error, [%s]\n", i+1, strerror(ret));
return -1;
}
}
void *exit_status;
for (i = 0; i < 5; i++)
{
ret = pthread_join(threadr[i], &exit_status);
if (ret)
{
printf("Thread join read [%d] error: [%s]\n", i+1, strerror(ret));
return -1;
}
}
for (i = 0; i < 3; i++)
{
ret = pthread_join(threadw[i], &exit_status);
if (ret)
{
printf("Thread join write [%d] error: [%s]\n", i+1, strerror(ret));
return -1;
}
}
printf("main thread, pid is [%d], tid is [%ld]\n", getpid(), pthread_self());
pthread_rwlock_destroy(&rdlock);
printf("here is main ,num final is [%d]\n",num);
return 0;
}
//输出
...
read [5] : [140482278684416] num [96]
read [5] : [140482278684416] num [96]
read [3] : [140482295469824] num [96]
write [3] : [140482253506304] num [97]
write [3] : [140482253506304] num [98]
write [3] : [140482253506304] num [99]
read [1] : [140482312255232] num [99]
read [2] : [140482303862528] num [99]
read [4] : [140482287077120] num [99]
write [2] : [140482261899008] num [100]
main thread, pid is [3151], tid is [140482320643840]
here is main ,num final is [100]
3. 条件变量
条件变量不是锁,但是不满足,线程阻塞,释放互斥锁。条件满足,通知所有阻塞的线程去争夺互斥锁。
条件变量相关函数
//1.定义一个条件变量
pthread_cond_t cond;
//2.条件变量初始化,成功返回0, 失败返回错误编号
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
cond: 条件变量
attr: 条件变量属性, 传NULL表示默认属性
//2.条件不满足, 引起线程阻塞并解锁;条件满足, 解除线程阻塞, 并加锁。成功返回0, 失败返回错误号
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond: 条件变量
mutex: 互斥锁变量
//3.唤醒至少一个阻塞在该条件变量上的线程,成功返回0, 失败返回错误号。
int pthread_cond_signal(pthread_cond_t *cond);
//4.销毁条件变量,成功返回0, 失败返回错误编号
int pthread_cond_destroy(pthread_cond_t *cond);
示例:
多线程模拟生产者消费者模型,并且设置条件变量进行线程同步
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
typedef struct linklist
{
int num;
struct linklist *next;
} LINKLIST;
LINKLIST *head = NULL;
pthread_mutex_t mutex;
pthread_cond_t cond;
int len;
void *producter(void *arg)
{
LINKLIST *node = NULL;
while (1)
{
pthread_mutex_lock(&mutex);
node = (LINKLIST *)malloc(sizeof(LINKLIST));
if (node == NULL)
{
perror("malloc error");
exit(-1);
}
head->num = head->num + 1;
len += 1;
node->num = len;
node->next = head->next;
head->next = node;
printf("linklist len is [%d]\n", len);
printf("producer [%d] , cur monunt is [%d],get data num [%d]\n", *(int *)arg, head->num, node->num);
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond);
sleep(rand() % 3);
}
}
void *consumer(void *arg)
{
LINKLIST *temp = NULL;
while (1)
{
pthread_mutex_lock(&mutex);
printf("consumer [%d] mutex lock\n", *(int *)arg);
if (head->next == NULL)
{
pthread_cond_wait(&cond, &mutex);
printf("first consumer [%d] cond wait\n", *(int *)arg);
if (head->next == NULL)
{
printf("linklist len is [%d]\n", len);
pthread_mutex_unlock(&mutex);
printf("first consumer [%d] mutex unlock\n", *(int *)arg);
continue;
}
}
temp = head->next;
printf("consumer [%d] , cur monunt is [%d],get data num [%d]\n", *(int *)arg, head->num, temp->num);
head->next = temp->next;
temp->next =NULL;
free(temp);
temp = NULL;
head->num = head->num - 1;
len -= 1;
pthread_mutex_unlock(&mutex);
printf("consumer [%d] mutex unlock\n", *(int *)arg);
sleep(rand() % 3);
}
}
int main()
{
head = (LINKLIST *)malloc(sizeof(LINKLIST));
head->num = 0;
head->next = NULL;
len = 0;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
pthread_t products[3], consumers[3];
int arr[6];
int ret;
int i;
for (i = 0; i < 3; i++)
{
arr[i] = i + 1;
ret = pthread_create(&products[i], NULL, producter, &arr[i]);
if (ret != 0)
{
printf("pthread_create products [%d] error, [%s]\n", i + 1, strerror(ret));
return -1;
}
}
for (i = 0; i < 3; i++)
{
arr[i + 3] = i + 1;
ret = pthread_create(&consumers[i], NULL, consumer, &arr[i + 3]);
if (ret != 0)
{
printf("pthread_create consumers [%d] error, [%s]\n", i + 1, strerror(ret));
return -1;
}
}
void *exit_status;
for (i = 0; i < 3; i++)
{
ret = pthread_join(products[i], &exit_status);
if (ret)
{
printf("Thread join products [%d] error: %d\n", i + 1, ret);
return -1;
}
}
for (i = 0; i < 3; i++)
{
ret = pthread_join(consumers[i], &exit_status);
if (ret)
{
printf("Thread join consumers [%d] error: %d\n", i + 1, ret);
return -1;
}
}
printf("main thread, pid is [%d], tid is [%ld]\n", getpid(), pthread_self());
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
return 0;
}
4. 信号量
信号量可以看成是当某个互斥锁有多把的时候,多个线程可以同时访问一个共享资源,当锁不够的时候,就会阻塞。如果锁只有一把,就等同于一个互斥量的情况。
#include <semaphore.h>
//1. 定义信号量
sem_t sem;
//2. 初始化信号量,成功返回0, 失败返回-1, 并设置errno值
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem: 信号量变量
pshared: 0表示线程同步, 1表示进程同步
value: 最多有几个线程操作共享数据
//2. 调用该函数一次, 相当于sem--, 当sem为0的时候, 引起阻塞
//成功返回0, 失败返回-1, 并设置errno值
int sem_wait(sem_t *sem);
//3. 调用一次, 相当于sem++,成功返回0, 失败返回-1, 并设置errno值
int sem_post(sem_t *sem);
//4. 尝试加锁, 若失败直接返回, 不阻塞
//成功返回0, 失败返回-1, 并设置errno值
int sem_trywait(sem_t *sem);
//5. 销毁信号量,成功返回0, 失败返回-1, 并设置errno值
int sem_destroy(sem_t *sem);
示例:
信号量只有一个的时候等同于互斥锁
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
typedef struct linklist
{
int num;
struct linklist *next;
} LINKLIST;
LINKLIST *head = NULL;
// 定义信号量
sem_t sem_producer;
sem_t sem_consumer;
int len;
int tim = 0;
void *producter(void *arg)
{
LINKLIST *node = NULL;
while (1)
{
sem_wait(&sem_producer);
node = (LINKLIST *)malloc(sizeof(LINKLIST));
if (node == NULL)
{
perror("malloc error");
exit(-1);
}
head->num = head->num + 1;
len += 1;
node->num = len;
node->next = head->next;
head->next = node;
tim++;
printf("[%d] producer [%d] , cur monunt is [%d],set data num [%d]\n", tim, *(int *)arg, head->num, node->num);
printf("[%d] linklist len is [%d]\n", tim, len);
sem_post(&sem_consumer);
sleep(rand() % 3);
}
}
void *consumer(void *arg)
{
LINKLIST *temp = NULL;
while (1)
{
sem_wait(&sem_consumer);
tim++;
temp = head->next;
printf("[%d] consumer [%d] , cur monunt is [%d],get data num [%d]\n", tim, *(int *)arg, head->num, temp->num);
head->next = temp->next;
temp->next = NULL;
free(temp);
temp = NULL;
head->num = head->num - 1;
len -= 1;
sem_post(&sem_producer);
sleep(rand() % 3);
}
}
int main()
{
head = (LINKLIST *)malloc(sizeof(LINKLIST));
head->num = 0;
head->next = NULL;
len = 0;
sem_init(&sem_consumer, 0, 0);
sem_init(&sem_producer, 0, 1);
pthread_t products[3], consumers[3];
int arr[6];
int ret;
int i;
for (i = 0; i < 3; i++)
{
arr[i] = i + 1;
ret = pthread_create(&products[i], NULL, producter, &arr[i]);
if (ret != 0)
{
printf("pthread_create products [%d] error, [%s]\n", i + 1, strerror(ret));
return -1;
}
}
for (i = 0; i < 3; i++)
{
arr[i + 3] = i + 1;
ret = pthread_create(&consumers[i], NULL, consumer, &arr[i + 3]);
if (ret != 0)
{
printf("pthread_create consumers [%d] error, [%s]\n", i + 1, strerror(ret));
return -1;
}
}
void *exit_status;
for (i = 0; i < 3; i++)
{
ret = pthread_join(products[i], &exit_status);
if (ret)
{
printf("Thread join products [%d] error: %d\n", i + 1, ret);
return -1;
}
}
for (i = 0; i < 3; i++)
{
ret = pthread_join(consumers[i], &exit_status);
if (ret)
{
printf("Thread join consumers [%d] error: %d\n", i + 1, ret);
return -1;
}
}
printf("main thread, pid is [%d], tid is [%ld]\n", getpid(), pthread_self());
sem_destroy(&sem_consumer);
sem_destroy(&sem_producer);
return 0;
}