Linux下各种锁地理解和使用以及总结解决一下epoll惊群问题(面试常考)

简介: Linux下各种锁地理解和使用以及总结解决一下epoll惊群问题(面试常考)

一.锁

  • 锁出现地原因

临界资源是什么:  多线程执行流所共享地资源  

锁地作用是什么, 可以做原子操作,  在多线程中针对临界资源地互斥访问...   保证一个时刻只有一个线程可以持有锁对于临界资源做修改操作...

任何一个线程如果需要修改,向临界资源做写入操作都必须持有锁,没有持有锁就不能对于临界资源做写入操作.

锁 : 保证同一时刻只能有一个线程对于临界资源做写入操作  (锁地功能)

  • 再一个直观地代码引出问题,再从指令集地角度去看问题
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
void* Routine(void* arg) {
  int *pcount = (int*)arg;
  for (int i = 0; i < 20000000; ++i) {
    (*pcount)++;
  }
  return (void*)0;  
}
int main() {
  int count = 0;
  pthread_t tid1, tid2, tid3;
  pthread_create(&tid1, NULL, Routine, (void*)&count);
  pthread_create(&tid2, NULL, Routine, (void*)&count);
  pthread_create(&tid3, NULL, Routine, (void*)&count);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);
  pthread_join(tid3, NULL);
  //看一看结果
  printf("count: %d\n", count);
  return 0;
}

  • 上述一个及其奇怪的结果,这个结果每一次运行都可能是不一样的,Why  ?  按照我们本来的想法是每一个线程 + 20000000   结果肯定应该是60000000呀,可以就是达不到这个值

为何?   (深入汇编指令来看)  一定将过程放置到汇编指令上去看就可以理解这个过程了.      

a++;  或者  a += 1; 这些操作的汇编操作是几个步骤?

其实是三个步骤:

将数据从内存读取到寄存器中

在寄存器中进行对应的运算

将数据运算结果从寄存器写回内存

正常情况下,数据少,操作的线程少,问题倒是不大,想一想要是这样的情况下,操作次数大,对齐操作的线程多,有些线程从中间切入进来了,在运算之后还没写回内存就另外一个线程切入进来同时对于之前的数据进行++  再写回内存, 啥效果,多次++ 操作之后结果确实一次加加操作后的结果。  这样的操作  (术语叫做函数的重入)  我觉得其实就是重入到了汇编指令中间了,还没将上一次运算的结果写回内存就从新对这个内存读取再运算写入,结果肯定和正常的逻辑后的结果不一样呀


来一幅图篇解释一下

咋办? 其实问题很清楚,我们只需要处理的是多条汇编指令不能让它中间被插入其他的线程运算.     (要想自己在执行汇编指令的时候别人不插入进来)  将多条汇编指令绑定成为一条指令不就OK了嘛。  

(也就是原子操作,将汇编指令改造成原子操作的方法我是不会的,感兴趣的可以去查阅一下咋办)

咱自己是汇编没学到精华不知道咋通过汇编来将多条指令搞成原子操作

但是操作系统给咱提供了线程的   绑定方式工具呀:mutex 互斥锁(互斥量), 自旋锁(spinlock),   读写锁(readers-writer lock)  他们也称作悲观锁. 作用都是一个样,将多个汇编指令锁成为一条原子操作     (此处的汇编指令也相当于如下的临界资源)

悲观锁:锁如其名,每次都悲观地认为其他线程也会来修改数据,进行写入操作,所以会在取数据前先加锁保护,当其他线程想要访问数据时,被阻塞挂起

乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前, 会判断其他数据在更新前有没有对数据进行修改。(个人了解不足,想要了解地可以查阅资料)

  • 互斥锁

最为常见使用地锁就是互斥锁, 也称互斥量.  mutex

特征,当其他线程持有互斥锁对临界资源做写入操作地时候,当前线程只能挂起等待,让出CPU,存在线程间切换工作  

解释一下存在线程间切换工作 :  当线程试图去获取锁对临界资源做写入操作时候,如果锁被别的线程正在持有,该线程会保存上下文直接挂起,让出CPU,等到锁被释放出来再进行线程间切换,从新持有CPU执行写入操作

互斥锁需要进行线程间切换,相比自旋锁而言性能会差上许多,因为自旋锁不会让出CPU, 也就不需要进行线程间切换地步骤,具体原理下一点详述

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
pthread_mutex_t mtx;
void* Routine(void* arg) {
  int *pcount = (int*)arg;
  for (int i = 0; i < 20000000; ++i) {
    pthread_mutex_lock(&mtx);
    (*pcount)++;
    pthread_mutex_unlock(&mtx);
  }
  return (void*)0;  
}
int main() {
  pthread_mutex_init(&mtx, NULL);
  int count = 0;
  pthread_t tid1, tid2, tid3;
  pthread_create(&tid1, NULL, Routine, (void*)&count);
  pthread_create(&tid2, NULL, Routine, (void*)&count);
  pthread_create(&tid3, NULL, Routine, (void*)&count);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);
  pthread_join(tid3, NULL);
  //看一看结果
  printf("count: %d\n", count);
  pthread_mutex_destroy(&mtx);//销毁锁
  return 0;
}

加互斥量(互斥锁)确实可以达到要求,但是会发现运行时间非常的长,因为线程间不断地切换也需要时间,  线程间切换的代价比较大.

  • 自旋锁

    spinlock.自旋锁.

对比互斥量(互斥锁)而言,获取自旋锁不需要进行线程间切换,如果自旋锁正在被别的线程占用,该线程也不会放弃CPU进行挂起休眠,而是恰如其名的在哪里不断地循环地查看自旋锁保持者(持有者)是否将自旋锁资源释放出来...    (自旋地原来就是如此)

口语解释自旋:持有自旋锁地线程不释放自旋锁,那也没有关系呀,我就在这里不断地一遍又一遍地查询自旋锁是否释放出来,一旦释放出来我立马就可以直接使用  (因为我并没有挂起等待,不需要像互斥锁还需要进行线程间切换,重新获取CPU,保存恢复上下文等等操作)

哪正是因为上述这些特点,线程尝试获取自旋锁,获取不到不会采取休眠挂起地方式,而是原地自旋(一遍又一遍查询自旋锁是否可以获取)效率是远高于互斥锁了.  哪我们是不是所有情况都使用自旋锁就行了呢,互斥锁就可以放弃使用了吗????

解释自旋锁地弊端:如果每一个线程都仅仅只是需要短时间获取这个锁,那我自旋占据CPU等待是没啥问题地。要是线程需要长时间地使用占据(锁)。。。  会造成过多地无端占据CPU资源,俗称站着茅坑不拉屎... 但是要是仅仅是短时间地自旋,平衡CPU利用率  +  程序运行效率  (自旋锁确实是在有些时候更加合适)

自旋锁需要场景:内核可抢占或者SMP(多处理器)情况下才真正需求  (避免死锁陷入死循环,疯狂地自旋,比如递归获取自旋锁.  你获取了还要获取,但是又没法释放)  

  • 自旋锁地使用函数其实和互斥锁几乎是一摸一样地,仅仅只是需要将所有的mutex换成spin即可
  • 仅仅只是在init存在些许不同  
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
pthread_spinlock_t mtx;
void* Routine(void* arg) {
  int *pcount = (int*)arg;
  for (int i = 0; i < 20000000; ++i) {
    pthread_spin_lock(&mtx);
    (*pcount)++;
    pthread_spin_unlock(&mtx);
  }
  return (void*)0;  
}
int main() {
  pthread_spin_init(&mtx, PTHREAD_PROCESS_SHARED);
  int count = 0;
  pthread_t tid1, tid2, tid3;
  pthread_create(&tid1, NULL, Routine, (void*)&count);
  pthread_create(&tid2, NULL, Routine, (void*)&count);
  pthread_create(&tid3, NULL, Routine, (void*)&count);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);
  pthread_join(tid3, NULL);
  //看一看结果
  printf("count: %d\n", count);
  pthread_spin_destroy(&mtx);//销毁锁
  return 0;
}

  • 解决上述地问题地方式二, 不是使用std=c99 而是直接将 int i 放置到for循环外面
  • 读写锁  +  读者写者模式:
  • 主要是处理读多写少地情况,和本文后序关联不大,需要的可自行查阅了解

二.epoll惊群问题地理解

何为惊群,池塘一堆🐟,  我瞄准一条插过去,但是好似所有的🐟都像是觉着自己正在被插一样的四处逃窜。 这个就是惊群的生活一点的理解

惊群现象其实一点也不少,比如说  accept   pthread_cond_broadcast 还有多个线程共享epoll监视一个listenfd  然后此刻  listenfd  说来 SYN了,放在了SYN队列中,然后完成了三次握手放在了 accept队列中了, 现在问题是这个connect我应该交付给哪一个线程处理呢.      

多个epoll监视准备工作的线程 就是这群 (🐟),然后connet就是鱼叉,这一叉下去肯定是所有的   epoll线程都会被惊醒    (多线程共享listenfd引发的epoll惊群)

同样如果将上述的多个线程换成多个进程共享监视   同一个 listenfd    就是(多进程的epoll惊群现象)  咱再画一个草图再来理解一下这个惊群:

  • 如果是多进程道理是一样滴,仅仅只是将所有的线程换成进程就OK了

三. epoll惊群问题地解决


    终是来到了今天地正题了:  epoll惊群问题地解决上面了...

首先  先说说accept地惊群问题,没想到吧accept 平时大家写它的多线程地时候,多个线程同时accept同一个listensock地时候也是会存在惊群问题地,但是accept地惊群问题已经被Linux内核处理了:    当有新的连接进入到accept队列的时候,内核唤醒且仅唤醒一个进程来处理

但是对于epoll地惊群问题,内核却没有直接进行处理。哪既然内核没有直接帮我们处理,我们应该如何针对这种现象做出一定地措施呢?  

惊群效应带来地弊端:     惊群现象会造成epoll地伪唤醒,本来epoll是阻塞挂起等待着地,这个时候因为挂起等待是不会占用CPU地。。。   但是一旦唤醒就会占用CPU去处理发生地IO事件, 但是其实是一个伪唤醒,这个就是对于线程或者进程地无效调度。然而进程或者线程地调取是需要花费代价地,需要上下文切换。需要进行进程(线程)间地不断切换...                 本来多核CPU是用来支持高并发地,但是现在却被用来无效地唤醒,对于多核CPU简直就是一种浪费  (浪费系统资源)  还会影响系统地性能.

解决方式(一般是两种)

Nginx地解决方式:

  • 加锁:惊群问题发生的前提是多个进程(线程)监听同一个套接字(listensock)上的事件,所以我们只让一个进程(线程)去处理监听套接字就可以了。
// 是否开启 accept 锁,
// 开启则需要抢锁,以防惊群,默认是关闭的。
if (ngx_use_accept_mutex) {
    if (ngx_accept_disabled > 0) {
        // ngx_accept_disabled 的值是经过算法计算出来的,
        // 当值大于 0 时,说明此进程负载过高,不再接收新连接。
        ngx_accept_disabled--;
    } else {
        // 尝试抢 accept 锁,发生错误直接返回
        if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
            return;
        }
        if (ngx_accept_mutex_held) {
            // 抢到锁,设置事件处理标识,后续事件先暂存队列中。
            flags |= NGX_POST_EVENTS;
        } else {
            // 未抢到锁,修改阻塞等待时间,使得下一次抢锁不会等待太久
            if (timer == NGX_TIMER_INFINITE
                || timer > ngx_accept_mutex_delay)
            {
                timer = ngx_accept_mutex_delay;
            }
        }
    }
}
  • 方式2: 是小杰可以理解地方式方法:  使用 设置SO_REUSEPORT:使得端口号可以复用, 如此多个进程或者线程便可以绑定同一个端口号了    这样相当于是每一个进程或线程都监视一个listensock

画两张图来理解一下:

四.  代码演示:

#include <stdio.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <sys/types.h>
#include <fcntl.h>
typedef struct sockaddr SA;
#define CLIENTSIZE 1000
#define BUFFSIZE 256
#define SERVE_PORT 8080
#define ERR_EXIT(m)\
  do { perror(m); close(EXIT_FAILURE); } while(0)
int CreateSocket() {
  int listensock = socket(AF_INET, SOCK_STREAM, 0);
  int reuseport = 1;
  if (-1 == setsockopt(listensock, SOL_SOCKET, SO_REUSEPORT, &reuseport, sizeof(reuseport))) {
    ERR_EXIT("setsocketopt");
  }
  struct sockaddr_in serveAdd;  
  //确定服务端协议地址簇
  memset(&serveAdd, 0, sizeof(serveAdd));//清空
  serveAdd.sin_family = AF_INET;
  serveAdd.sin_addr.s_addr = htonl(INADDR_ANY);//其实就是0.0.0.0 通配地址
  serveAdd.sin_port = htons(SERVE_PORT);
  if (-1 == bind(listensock, (SA*)&serveAdd, sizeof(serveAdd))) {
    ERR_EXIT("bind");
  } 
  if (-1 == listen(listensock, 5)) {
    ERR_EXIT("listen");
  }
  return listensock;
} 
void setnoblock(int fd) {
  int oldflag;
  oldflag = fcntl(fd, F_GETFL); //获取flag
  if (-1 == fcntl(fd, F_SETFL, oldflag | O_NONBLOCK)) {
    ERR_EXIT("fcnl");
  }
}
//像epfd中增加监视事件,将监视事件挂在到红黑树上
void addfd(int epfd, int fd) {
  struct epoll_event ev;
  ev.data.fd = fd;
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  if (-1 == epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev)) {
    ERR_EXIT("epoll_ctl");
  }
  setnoblock(fd);
  //设置非阻塞IO,因为是ET
}
void delfd(int epfd, int fd) {
  struct epoll_event ev;
  if (-1 == epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev)) {
    ERR_EXIT("epoll_ctl");
  }
}
//使用多线程去演示
void* Routine(void* arg) {
  struct epoll_event* evs = (struct epoll_event*)calloc(CLIENTSIZE, sizeof(struct epoll_event));
  char buff[BUFFSIZE];
  //每一个线程都创建一个新地监视窗口,但是其实监视在一个port上
  //将问题抛给内核处理,
  int listensock = (int)arg;
  int epfd = epoll_create(CLIENTSIZE);
  int i;
  addfd(epfd, listensock);
  int count = 1;  //记录监视IO事件地数目
  while (1) {//循环监视
    int nready = epoll_wait(epfd, evs, count,-1);
    printf("tid: %d  线程被唤醒处理IO事件\n", pthread_self());
    sleep(2000);
    for (i = 0; i < nready; ++i) {
      if (evs[i].events & EPOLLERR) {
        //处理错误断开连接等等操作
      } else if ((evs[i].events & EPOLLIN) && evs[i].data.fd == listensock) {
        socklen_t clientLen;
        struct sockaddr_in clientAdd;
        //处理accept操作
        int connectsock = accept(listensock, (SA*)&clientAdd, &clientLen);
        if (connectsock == -1) {
          ERR_EXIT("accept");
        }
        printf("accept sucess and fd is %d\n", connectsock);
        //增加监视事件
        addfd(epfd, connectsock);
      } else if (evs[i].events & EPOLLIN) {
        //read
        //decode
        //compute
        //encode
        //修改成监视写事件
      } else if (evs[i].events & EPOLLOUT) {
        //write
        //改成读事件
      }
    }
  }
  free(evs);  //释放资源
}
int main() {
  pthread_t tid;
  int i;
//此处显示多个线程共享一个listensock 看看效果
  int listensock = CreateSocket();
  for (i = 0; i < 10; ++i) {  //简单地开十个线程
    //int listensock = CreateSocket();
    pthread_create(&tid, NULL, Routine, (void*)listensock);
    pthread_detach(tid);//分离线程
  }
    while (1);  //主线程等待子线程结束
  return 0;
}
  • 上述还没有进行一个每一个进程都对应一个listensock 而是多线程共享一个listensock 运行结果如下

所有的线程同时被唤醒了,但是实际上会处理连接的仅仅只是一个线程,

int main() {
  pthread_t tid;
  int i;
  //int listensock = CreateSocket();
  for (i = 0; i < 10; ++i) {  //简单地开十个线程
    int listensock = CreateSocket();
    pthread_create(&tid, NULL, Routine, (void*)listensock);
    pthread_detach(tid);//分离线程
  }
    while (1);  //主线程等待子线程结束
  return 0;
}
  • 咱仅仅只是将主线程做如上这样一个简单的修改,每一个线程对应一个listensock;每一个线程一个独有的监视窗口,将问题抛给内核去处理,让内核去负载均衡 : 结果如下  

仅仅唤醒一个线程来进行处理连接,解决了惊群问题

五. 总结本章:


    本文通过介绍两种锁入手,以及为什么需要锁,锁本质就是为了保护,持有锁你就有权力有能力操作写入一定的临界保护资源,没有锁你就不行需要等待,本质其实是将多条汇编指令绑定成原子操作

然后介绍了惊群现象,通过一个巧妙地例子,扔一颗石子,只是瞄准一条鱼扔过去了,但是整池鱼都被惊醒了,

对应我们地实际问题就是, 多个线程或者进程共同监视同一个listensock。。。。然后IO连接事件到来地时候本来仅仅只是需要一个线程醒过来处理即可,但是却会使得所有地线程(进程)全部醒过来,造成不必要地进程线程间切换,多核CPU被浪费喔,系统资源被浪费

处理方式   一。 Nginx  源码加互斥锁处理。。  二。设置SO_REUSEPORT,  使得多个进程线程可以同时连接同一个port ,  为每一个进程线程搞一个listensock...  将问题抛给内核去处理,让他去负载均衡地仅仅将IO连接事件分配给一个进程或线程


相关文章
|
12天前
|
Linux 数据库
Linux内核中的锁机制:保障并发操作的数据一致性####
【10月更文挑战第29天】 在多线程编程中,确保数据一致性和防止竞争条件是至关重要的。本文将深入探讨Linux操作系统中实现的几种关键锁机制,包括自旋锁、互斥锁和读写锁等。通过分析这些锁的设计原理和使用场景,帮助读者理解如何在实际应用中选择合适的锁机制以优化系统性能和稳定性。 ####
29 6
|
20天前
|
存储 JSON Java
细谈 Linux 中的多路复用epoll
大家好,我是 V 哥。`epoll` 是 Linux 中的一种高效多路复用机制,用于处理大量文件描述符(FD)事件。相比 `select` 和 `poll`,`epoll` 具有更高的性能和可扩展性,特别适用于高并发服务器。`epoll` 通过红黑树管理和就绪队列分离事件,实现高效的事件处理。本文介绍了 `epoll` 的核心数据结构、操作接口、触发模式以及优缺点,并通过 Java NIO 的 `Selector` 类展示了如何在高并发场景中使用多路复用。希望对大家有所帮助,欢迎关注威哥爱编程,一起学习进步。
|
1月前
|
NoSQL Java API
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
在40岁老架构师尼恩的读者交流群中,近期有小伙伴在面试一线互联网企业时遇到了关于Redis分布式锁过期及自动续期的问题。尼恩对此进行了系统化的梳理,介绍了两种核心解决方案:一是通过增加版本号实现乐观锁,二是利用watch dog自动续期机制。后者通过后台线程定期检查锁的状态并在必要时延长锁的过期时间,确保锁不会因超时而意外释放。尼恩还分享了详细的代码实现和原理分析,帮助读者深入理解并掌握这些技术点,以便在面试中自信应对相关问题。更多技术细节和面试准备资料可在尼恩的技术文章和《尼恩Java面试宝典》中获取。
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
|
1月前
|
Linux C++
Linux C/C++之IO多路复用(poll,epoll)
这篇文章详细介绍了Linux下C/C++编程中IO多路复用的两种机制:poll和epoll,包括它们的比较、编程模型、函数原型以及如何使用这些机制实现服务器端和客户端之间的多个连接。
24 0
Linux C/C++之IO多路复用(poll,epoll)
|
2月前
|
消息中间件 安全 前端开发
面试官:单核服务器可以不加锁吗?
面试官:单核服务器可以不加锁吗?
50 4
面试官:单核服务器可以不加锁吗?
|
2月前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
1月前
|
存储 Kubernetes 架构师
阿里面试:JVM 锁内存 是怎么变化的? JVM 锁的膨胀过程 ?
尼恩,一位经验丰富的40岁老架构师,通过其读者交流群分享了一系列关于JVM锁的深度解析,包括偏向锁、轻量级锁、自旋锁和重量级锁的概念、内存结构变化及锁膨胀流程。这些内容不仅帮助群内的小伙伴们顺利通过了多家一线互联网企业的面试,还整理成了《尼恩Java面试宝典》等技术资料,助力更多开发者提升技术水平,实现职业逆袭。尼恩强调,掌握这些核心知识点不仅能提高面试成功率,还能在实际工作中更好地应对高并发场景下的性能优化问题。
|
1月前
|
安全 Linux
Linux线程(十一)线程互斥锁-条件变量详解
Linux线程(十一)线程互斥锁-条件变量详解
|
3月前
|
监控 关系型数据库 MySQL
在Linux中,mysql的innodb如何定位锁问题?
在Linux中,mysql的innodb如何定位锁问题?
|
3月前
|
存储 安全 容器
【多线程面试题二十一】、 分段锁是怎么实现的?
这篇文章解释了分段锁的概念和实现方式,通过将数据分成多个段并在每段数据上使用独立锁,从而降低锁竞争,提高并发访问效率,举例说明了`ConcurrentHashMap`如何使用分段锁技术来实现高并发和线程安全。
【多线程面试题二十一】、 分段锁是怎么实现的?