一、如何理解死锁
比如线程A需要 线程B的资源才能解锁,线程B需要线程C的资源才能解锁,线程C需要线程A的资源才能解锁。现在A、B、C都不能获得想要的资源,于是都没法解锁,陷入了死锁。
如何进行死锁检测?
可以把死锁检测的问题转化为有向图的环路检测。
为了能够实现死锁检测,首先要模拟一个死锁出来。
二、模拟实现一个死锁
1、如何去模拟实现一个死锁?
可以创建2个线程a和b,使它们分别有 资源(锁)mtx1和mtx2。
在线程a的回调函数中,保证拥有资源mtx1,然后去申请mtx2
在线程b的回调函数中,保证拥有资源mtx2,然后去申请mtx1
这样子,两边都没办法获取对方的资源,从而造成死锁
这样子虽然可能出现死锁,但并不是百分百,比如出现线程b还未创建完,线程a的回调已经执行完了,这样子线程a和线程b的回调都能顺利执行,不会出现死锁现象。因此,可以在线程a获得mtx1后,sleep 1秒,使得线程b获得资源mtx2,同理,在线程b获得mtx2后,也可以执行sleep(1),再继续运行,会进入死锁状态。
并且可以添加printf信息,可知,线程a与线程b分别在获取mtx1和mtx2后进入了死锁的状态
2、创建4个线程模拟实现死锁
既然已经知道2个线程如何实现死锁,那用4个线程也同样比较容易
void *thread_rountine_1(void *args) { pthread_t selfid = pthread_self(); // printf("thread_routine 1 : %ld \n", selfid); pthread_mutex_lock(&mutex_1); sleep(1); pthread_mutex_lock(&mutex_2); pthread_mutex_unlock(&mutex_2); pthread_mutex_unlock(&mutex_1); return (void *)(0); } void *thread_rountine_2(void *args) { pthread_t selfid = pthread_self(); // printf("thread_routine 2 : %ld \n", selfid); pthread_mutex_lock(&mutex_2); sleep(1); pthread_mutex_lock(&mutex_3); pthread_mutex_unlock(&mutex_3); pthread_mutex_unlock(&mutex_2); return (void *)(0); } void *thread_rountine_3(void *args) { pthread_t selfid = pthread_self(); // printf("thread_routine 3 : %ld \n", selfid); pthread_mutex_lock(&mutex_3); sleep(1); pthread_mutex_lock(&mutex_4); pthread_mutex_unlock(&mutex_4); pthread_mutex_unlock(&mutex_3); return (void *)(0); } void *thread_rountine_4(void *args) { pthread_t selfid = pthread_self(); // printf("thread_routine 4 : %ld \n", selfid); pthread_mutex_lock(&mutex_4); sleep(1); pthread_mutex_lock(&mutex_1); pthread_mutex_unlock(&mutex_1); pthread_mutex_unlock(&mutex_4); return (void *)(0);
三、死锁检测
可以把线程A向线程B申请资源(线程B占有着线程A想要的锁)看作有向图的一条边,A指向B。如果有向图构成一条回路,就说明存在死锁。
那么如何构建一张图呢?
首先当前线程A,需要知道哪个线程占有着 线程A想要申请的锁。可以定义一个结构体,存储当前锁所属的线程id。这样线程A就能找到想要申请的锁,所在的线程(比如是线程C),那么就会构建一条A指向C的边。
那么如何去做这些操作?
如果直接往原来的代码中去加入这些内容,首先破坏了原先的代码结构,其次如果代码复杂可能存在漏加的情况,为了解决这些问题,因此要使用hook,在原来代码结构不变的情况下,通过hook,可以构建出有向图的信息。
1、实现hook
如何知道资源被哪个线程 占有了呢?可以使用hook,可以使得在原来代码不变的情况下,用自己实现的pthread_mutex_lock
代替系统调用的pthread_mutex_lock
(系统允许用户重新定义标准库函数,但此时该函数将失去原有含义)
下面的就是用了函数指针pthread_mutex_unlock_t
去指向系统中的pthread_mutex_lock
,用其他函数名去表示系统调用的pthread_mutex_lock
typedef int (*pthread_mutex_lock_t)(pthread_mutex_t *mutex); pthread_mutex_lock_t pthread_mutex_lock_f; typedef int (*pthread_mutex_unlock_t)(pthread_mutex_t *mutex); pthread_mutex_unlock_t pthread_mutex_unlock_f; static int init_hook() { pthread_mutex_lock_f = dlsym(RTLD_NEXT, "pthread_mutex_lock"); pthread_mutex_unlock_f = dlsym(RTLD_NEXT, "pthread_mutex_unlock"); }
然后用自己构建的pthread_mutex_lock
去取代系统调用的pthread_mutex_lock
,这样做的好处是不需要修改原来的代码,就可以实现自己想要的功能。
int pthread_mutex_lock(pthread_mutex_t *mutex) { pthread_t selfid = pthread_self(); lock_before(selfid, (uint64)mutex); pthread_mutex_lock_f(mutex);//系统调用的`pthread_mutex_lock` lock_after(selfid, (uint64)mutex); } int pthread_mutex_unlock(pthread_mutex_t *mutex) { pthread_t selfid = pthread_self(); pthread_mutex_unlock_f(mutex); unlock_after(selfid, (uint64)mutex); }
其中lock_before
和lock_after
中就包含了,有向图构建,增加(删除)顶点或边等一系列操作。
2、构建有向图
增加记录当前线程id(pthread_self()
)和锁的id(mutex
)的功能,就可以知道哪个线程占用了哪个锁,以此来构建有向图。
占用锁前,如果当前线程(A)申请的锁,被其他线程(B)占用,就构造一条(A指向B)有向图的边。
void lock_before(uint64 thread_id, uint64 lockaddr) { int idx = 0; // list<threadid, toThreadid> for(idx = 0;idx < tg->lockidx;idx ++) { if ((tg->locklist[idx].lock_id == lockaddr)) {//如果当前的锁已经被占用 //创建边的起始顶点 struct source_type from; from.id = thread_id; from.type = PROCESS; add_vertex(from); //创建边的目的顶点 struct source_type to; to.id = tg->locklist[idx].id; tg->locklist[idx].degress++; to.type = PROCESS; add_vertex(to); //添加一条有向边(当前线程 指向 占用锁的线程) if (!verify_edge(from, to)) { add_edge(from, to); // } } } }
占用锁后,如果该锁之前没被占用过,就进行占用,记录当前线程的信息(其他线程申请该锁失败就能构造一条有向图的边)。
如果该锁之前被占用过了,比如当前线程A申请线程B所占有的锁(现在已经释放),A已经拥有该锁,那么就将A指向B的边给删掉,并且更新所属信息。
void lock_after(uint64 thread_id, uint64 lockaddr) { int idx = 0; if (-1 == (idx = search_lock(lockaddr))) { // lock list opera 如果这个锁从来没有被使用过 int eidx = search_empty_lock(lockaddr);//在locklist中找到一个空的位置 用来存放该锁 tg->locklist[eidx].id = thread_id; tg->locklist[eidx].lock_id = lockaddr; inc(&tg->lockidx, 1); } else { //如果这个 锁之前被使用过,并且已经被释放了。但是它依然会存在locklist里面,只要修改一下就好 //由于该线程获取了这个锁,因此需要把该线程,指向之前持有锁的线程 的边给删掉。 struct source_type from; from.id = thread_id; from.type = PROCESS; struct source_type to; to.id = tg->locklist[idx].id; tg->locklist[idx].degress --; to.type = PROCESS; if (verify_edge(from, to)) remove_edge(from, to); tg->locklist[idx].id = thread_id;//锁对应的线程要更新,锁的id不需要变,还是同一把锁 } }
3、数据组织方式
添加一条边,采用邻接表的方式,
int add_edge(struct source_type from, struct source_type to) { add_vertex(from); add_vertex(to); struct vertex *v = &(tg->list[search_vertex(from)]); while (v->next != NULL) { v = v->next; } v->next = create_vertex(to); }
构建邻接表
每个顶点,将相邻的顶点用链表的方式组织起来。
4、代码逻辑
beforelock、afterlock、afterunlock是核心,对有向图进行构建。
四、完整代码
https://github.com/xuheding/deadlock_detector