手写死锁检测组件

简介: 手写死锁检测组件

1、死锁的概念

死锁:多个进程因竞争资源而造成的一种僵局(互相等待),若无外力作用,这些进程都将无法推进。

死锁产生的必要条件

  • 互斥条件:一段时间内某资源仅为一个进程占有。
  • 不剥夺条件:资源只能由获取该资源的进程自己来释放。
  • 请求并保持:进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时进程资源阻塞,但自己已获得的资源保持不放。
  • 循环等待:存在一种进程资源的循环等待链,链中每个进程已获得的资源同时被链中下一个进程请求。

2、死锁检测的实现

2.1、资源分配图

根据死锁的条件可以得出:死锁必定有环,有环不一定死锁。所以检测死锁的关键是检测资源分配图有没有构成一回路,可以使用 dfs 判断。

在资源分配图中,图中的顶点,表示进程,每个进程拥有自己的资源,同时也可以申请其他进程的资源;图中的边,代表资源申请边,弧尾顶点表示申请该资源的进程,弧头顶点表示拥有该资源的进程。

我们来手动构建一个简单的资源分配图,采用邻接表实现。图中的每个顶点代表线程,线程拥有的锁既作为资源使用,又作为互斥量使用。如图所示,是一种死锁的情况。

image.png


资源信息的数据结构,线程既可以拥有资源,也可以申请资源,资源绑定线程。为了实现复用,增加了 type 字段,当 type = RESOURCE,该结构体作为资源使用,放入资源链表; type = PROCESS,该结构体作为线程使用。

enum Type {PROCESS, RESOURCE};
 // 资源信息
 struct source_type {
     uint64 id;          // 拥有该资源的线程 id
     enum Type type;     // 顶点类型:线程 or 资源
     uint64 lock_id;     // 资源(锁) id
     int degress;        // 资源的出度,该资源被多少顶点(线程)申请
 };

图的顶点,表示线程;图的资源申请边,表示线程间的资源申请关系。

struct vertex {
     struct source_type s;   // 资源信息
     struct vertex *next;    // 指向下一个顶点(邻接表)
 };

图的管理信息,资源 (type = RESOURCE) 存储在资源链表,线程(type = PROCESS) 作为图上的顶点。

struct task_graph {
     struct vertex list[MAX];            // 存储顶点
     int num;                           // 顶点的数量
     struct source_type locklist[MAX];    // 资源链表(锁)
     int lockidx;                       // 资源(锁)的数量
 };

图的基本操作:增删改查

// 创建顶点
 struct vertex *create_vertex(struct source_type type) {
     struct vertex *tex = (struct vertex *)malloc(sizeof(struct vertex ));
     tex->s = type;
     tex->next = NULL;
     return tex;
 }
 // 寻找顶点信息对应在图中顶点的下标
 int search_vertex(struct source_type type) {
     for (int i = 0; i < tg->num; ++i) {
         if (tg->list[i].s.type == type.type && tg->list[i].s.id == type.id) {
             return i;
         }
     }
     return -1;
 }
 // 添加顶点
 void add_vertex(struct source_type type) {
     // 当前顶点不存在,才能添加顶点
     if (search_vertex(type) == -1) {
         tg->list[tg->num].s = type;
         tg->list[tg->num].next = NULL;
         tg->num ++;
     }
 }
 // 添加边
 int add_edge(struct source_type from, struct source_type to) {
     add_vertex(from);
     add_vertex(to);
     struct vertex *v = &(tg->list[search_vertex(from)]);
     while (v->next != NULL) {
         v = v->next;
     }
     v->next = create_vertex(to);
 }
 // 判断两个顶点间是否存在边
 int verify_edge(struct source_type i, struct source_type j) {
     if (tg->num == 0) return 0;
     int i = search_vertex(i);
     if (i == -1) {
         return 0;
     }
     struct vertex *v = &(tg->list[idx]);
     while (v != NULL) {
         if (v->s.id == j.id) return 1;
         v = v->next;    
     }
     return 0;
 }
 // 删除边
 int remove_edge(struct source_type from, struct source_type to) {
     int idxi = search_vertex(from);
     int idxj = search_vertex(to);
     // 如果边上两个顶点存在,则删除边
     if (idxi != -1 && idxj != -1) {
         struct vertex *v = &tg->list[idxi];
         struct vertex *remove;
         while (v->next != NULL) {
             if (v->next->s.id == to.id) {
                 remove = v->next;
                 v->next = v->next->next;
                 free(remove);
                 break;
             }
             v = v->next;
         }
     }
 }

判断图中是否有回路,最简单的方法是使用 dfs

int DFS(int idx) {
     struct vertex *ver = &tg->list[idx];
     // 如果当前结点已经访问过,说明存在环
     if (visited[idx] == 1) {
         path[k++] = idx;
         print_loopwait();
         deadlock = 1;
         return 0;
     }
     visited[idx] = 1;
     path[k++] = idx;
     // 继续 dfs
     while (ver->next != NULL) {
         DFS(search_vertex(ver->next->s));
         --k;    
         ver = ver->next;
     }   
     return 1;
 }
 // 检测图中顶点否存在死锁资源获取环
 int search_for_cycle(int idx) {
     struct vertex *ver = &tg->list[idx];
     visited[idx] = 1;
     k = 0;
     path[k++] = idx;
     while (ver->next != NULL) {
         int i = 0;
         for (i = 0; i < tg->num; ++i) {
             if (i == idx) continue;
             visited[i] = 0;
         }
         // 初始化结点路径
         for (i = 1;i <= MAX;i ++) {
             path[i] = -1;
         }
         k = 1;
         // dfs 判断是否有环
         DFS(search_vertex(ver->next->s));
         ver = ver->next;
     }
 }

2.2、死锁检测

如何检测死锁,首先要做的是判断资源被哪个线程占用?

这里采用 hook 机制,改造系统的 pthread_mutex_lockpthread_mutex_unlock 函数。

#define _GNU_SOURCE
 #include <dlfcn.h>
 // 1、typedef 系统函数指针
 typedef int (*pthread_mutex_lock_t)(pthread_mutex_t *mutex);
 typedef int (*pthread_mutex_unlock_t)(pthread_mutex_t *mutex);
 // 2、定义函数指针
 pthread_mutex_lock_t pthread_mutex_lock_f;
 pthread_mutex_unlock_t pthread_mutex_unlock_f;
 // 3、改造原有的系统函数,判断资源被哪个线程占用
 int pthread_mutex_lock(pthread_mutex_t *mutex) {
     pthread_t selfid = pthread_self(); 
     lock_before(selfid, (uint64)mutex);
     pthread_mutex_lock_f(mutex);
     lock_after(selfid, (uint64)mutex);
 }
 int pthread_mutex_unlock(pthread_mutex_t *mutex) {
     pthread_t selfid = pthread_self();
     pthread_mutex_unlock_f(mutex);
     unlock_after(selfid, (uint64)mutex);
 }
 // hook 
 static int init_hook() {
     pthread_mutex_lock_f = dlsym(RTLD_NEXT, "pthread_mutex_lock");
     pthread_mutex_unlock_f = dlsym(RTLD_NEXT, "pthread_mutex_unlock");
 }

这里注意到在改造系统函数的时候,有三个函数lock_beforelock_afterlock_after。接下来,分别介绍它们的作用。

2.2.1、lock_before

获取资源前,检测该资源是否被其他线程占用。

  • 如果被占用,则创建一条资源申请边,表示当前进程正在向拥有资源的线程申请该资源。这一过程就是构建资源分配图的过程。
  • 如果没有被占用,则跳过。

之后,尝试对该资源进行加锁,获取资源,线程安全。

void lock_before(uint64 thread_id, uint64 lockaddr) {
     for(int i = 0; i < tg->lockidx; ++i) {
         if ((tg->locklist[i].lock_id == lockaddr)) {
             // 构建资源分配图的过程
              // 创建申请该资源的顶点(PROCESS类型)
             struct source_type from;
             from.id = thread_id;
             from.type = PROCESS; 
             add_vertex(from);
             // 创建拥有该资源的顶点(PROCESS类型)
             struct source_type to;
             to.id = tg->locklist[i].id;
             // 申请该资源的结点数量+1
             tg->locklist[i].degress++;
             to.type = PROCESS;
             add_vertex(to);
             // 如果两个顶点间不存在资源申请边,增加一条边
             if (!verify_edge(from, to)) {
                 add_edge(from, to); 
             }
         }
     }
 }

2.2.2、lock_after

线程获取资源后,检查该资源是否存在(资源链表中是否存在)

  • 若该资源之前不存在,创建该资源,添加资源到资源链表中
  • 若该资源已经存在,则移除自己对该资源的申请边,表示请求已经得到满足
void lock_after(uint64 thread_id, uint64 lockaddr) {
     int i = 0;
     // 检查该资源是否存在
     // 1、该资源不存在,添加资源(RESOURCE类型)到资源链表中
     if (-1 == (i = search_lock(lockaddr))) {  // lock list opera 
         // 寻找资源链表中空闲的位置并添加该资源
         int eidx = search_empty_lock(lockaddr);
         tg->locklist[eidx].id = thread_id;      
         tg->locklist[eidx].lock_id = lockaddr;
         tg->locklist[eidx].type = RESOURCE; 
         inc(&tg->lockidx, 1);   // 原子操作:资源的数量+1
     }
     //  2、该资源(锁)存在,需要移除自己的请求边
     else {
         // 申请该资源的顶点(PROCESS类型)
         struct source_type from;
         from.id = thread_id;
         from.type = PROCESS;
         // 拥有该资源的顶点(PROCESS类型)
         struct source_type to;
         to.id = tg->locklist[i].id;
         // 申请该资源的顶点数-1
         tg->locklist[i].degress--;
         to.type = PROCESS;
         // 如果存在该资源申请边,则删除
         if (verify_edge(from, to)) {
             remove_edge(from, to);
         }
         // 线程占用该资源(锁)   
         tg->locklist[i].id = thread_id;
     }
 }

2.2.3、unlock_after

线程释放该资源后,检查该资源是否还被线程申请,没有则将其从资源链表中移除。

void unlock_after(uint64 thread_id, uint64 lockaddr) {
     // 查找该资源(锁)
     int i = search_lock(lockaddr);
     // 若该资源没有线程申请,则将其从资源链表中移除
     if (tg->locklist[i].degress == 0) {
         tg->locklist[i].id = 0;
         tg->locklist[i].lock_id = 0;
     }
 }

2.3、测试方法

参考2.1中的资源分配图,创建4个线程来实现该图中的资源分配情况。

pthread_mutex_t mutex_1 = PTHREAD_MUTEX_INITIALIZER;
 pthread_mutex_t mutex_2 = PTHREAD_MUTEX_INITIALIZER;
 pthread_mutex_t mutex_3 = PTHREAD_MUTEX_INITIALIZER;
 pthread_mutex_t mutex_4 = PTHREAD_MUTEX_INITIALIZER;
 void *thread_rountine_1(void *args) {
     pthread_t selfid = pthread_self(); 
     printf("thread_routine 1 : %ld \n", selfid);
     pthread_mutex_lock(&mutex_1);
     sleep(1); // 休眠,防止线程提前结束
     pthread_mutex_lock(&mutex_2);
     pthread_mutex_unlock(&mutex_2);
     pthread_mutex_unlock(&mutex_1);
     return (void *)(0);
 }
 void *thread_rountine_2(void *args) {
     pthread_t selfid = pthread_self(); 
     printf("thread_routine 2 : %ld \n", selfid);
     pthread_mutex_lock(&mutex_2);
     sleep(1);
     pthread_mutex_lock(&mutex_3);
     pthread_mutex_unlock(&mutex_3);
     pthread_mutex_unlock(&mutex_2);
     return (void *)(0);
 }
 void *thread_rountine_3(void *args) {
     pthread_t selfid = pthread_self(); 
     printf("thread_routine 3 : %ld \n", selfid);
     pthread_mutex_lock(&mutex_3);
     sleep(1);
     pthread_mutex_lock(&mutex_4);
     pthread_mutex_unlock(&mutex_4);
     pthread_mutex_unlock(&mutex_3);
     return (void *)(0);
 }
 void *thread_rountine_4(void *args) {
     pthread_t selfid = pthread_self(); 
     printf("thread_routine 4 : %ld \n", selfid);
     pthread_mutex_lock(&mutex_4);
     sleep(1);
     pthread_mutex_lock(&mutex_1);
     pthread_mutex_unlock(&mutex_1);
     pthread_mutex_unlock(&mutex_4);
     return (void *)(0);
 }
 int main() {
     init_hook();    
     start_check();  
     printf("start_check\n");
     pthread_t tid1, tid2, tid3, tid4;
     pthread_create(&tid1, NULL, thread_rountine_1, NULL);
     pthread_create(&tid2, NULL, thread_rountine_2, NULL);
     pthread_create(&tid3, NULL, thread_rountine_3, NULL);
     pthread_create(&tid4, NULL, thread_rountine_4, NULL);
     // 四个线程结束后,检测线程随着主线程结束
     pthread_join(tid1, NULL);
     pthread_join(tid2, NULL);
     pthread_join(tid3, NULL);
     pthread_join(tid4, NULL);
     return 0;
 }

单独一个线程用于死锁检测

// 检测死锁的方法
 void check_dead_lock(void) {
     deadlock = 0;
     for (int i = 0; i < tg->num; ++i) {
         if (deadlock == 1) break;
         // 真正的死锁检测环
         search_for_cycle(i);
     }
     if (deadlock == 0) {
         printf("no deadlock\n");
     }
 }
 // 检测死锁的线程
 static void *thread_routine(void *args) {
     while (1) {
         sleep(5);
         check_dead_lock();
     }
 }
 // 开启死锁检测
 void start_check(void) {
     tg = (struct task_graph*)malloc(sizeof(struct task_graph));
     tg->num = 0;
     tg->lockidx = 0;
     pthread_t tid;
     pthread_create(&tid, NULL, thread_routine, NULL);
 }

3、结果分析

gcc -o deadlock deadlock.c -lpthread -ldl

死锁出现的情况如图所示,检测到一直在 loopwait,循环等待,无法退出。

1704877305748.jpg

出现死锁


修改资源分配图(四个线程中获取资源的过程),使之不出现死锁,如图所示,死锁检测线程随着主线的结束而结束。

1704877328530.jpg

没有死锁

相关文章
|
3月前
|
Java 编译器
手写JAVA线程池
该博客文章介绍了如何手写一个简单的Java线程池,包括创建固定线程、提交任务到并发队列、线程从队列中消费任务以及如何停止线程的实现过程。
|
存储 监控 算法
实现死锁检测组件
实现死锁检测组件
58 0
01 # 手写 new 的原理
01 # 手写 new 的原理
43 0
|
6月前
|
Java API 开发工具
13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件
13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件
|
6月前
|
存储 监控 程序员
线程死锁检测组件逻辑与源码
线程死锁检测组件逻辑与源码
84 2
|
6月前
|
存储
动手实现死锁检测组件(通过hook与有向图实现)
动手实现死锁检测组件(通过hook与有向图实现)
78 1
|
6月前
|
C++
手写C/C++死锁检测
手写C/C++死锁检测
118 0
|
6月前
|
数据处理 数据库
死锁检测组件实现
死锁检测组件实现
40 0
死锁检测组件原理及代码实现
死锁检测组件原理及代码实现
|
6月前
|
存储
死锁检测组件
死锁检测组件
34 0