Mysql源码学习——Thread Manager

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 一、前言    上篇的Connection Manager中,曾提及对于一个新到来的Connection,服务器会创建一个新的线程来处理这个连接。其实没那么简单,为了提高系统效率,减少频繁创建线程和中止线程的系统消耗,Mysql使用了线程缓冲区的概念,即如果一个连接断开,则并不销毁承载其的线程,而是将此线程放入线程缓冲区,并处于挂起状态,当下一个新的Connection到来时,首先去线程缓冲区去查找是否有空闲的线程,如果有,则使用之,如果没有则新建线程。

一、前言

    上篇的Connection Manager中,曾提及对于一个新到来的Connection,服务器会创建一个新的线程来处理这个连接。

其实没那么简单,为了提高系统效率,减少频繁创建线程和中止线程的系统消耗,Mysql使用了线程缓冲区的概念,即如果

一个连接断开,则并不销毁承载其的线程,而是将此线程放入线程缓冲区,并处于挂起状态,当下一个新的Connection到来

时,首先去线程缓冲区去查找是否有空闲的线程,如果有,则使用之,如果没有则新建线程。本问主要介绍这个线程缓冲区,

首先介绍下基本的概念。

 

二、基本概念

    1.线程创建函数

    大家知道,Mysql现在是插件式的存储引擎,只要实现规定的接口,就可实现自己的存储引擎。故Mysql的线程创建除了

出现在主服务器框架外,存储引擎也可能会进行线程的创建。通过设置断点,在我调试的版本中,发现了两个创建线程的函数。

pthread_create:Mysql自用的创建线程函数
os_thread_create:存储引擎innobase的创建线程的函数

    os_thread_create是存储引擎innobase的线程函数,先搁浅不研究了,重点看下pthread_create,首先看下其源码。

 

int  pthread_create(pthread_t *thread_id, pthread_attr_t *attr,
            pthread_handler func, void  *param)
{
   HANDLE  hThread;
   struct  pthread_map *map;
   DBUG_ENTER( "pthread_create" );
 
   if  (!(map= malloc ( sizeof (*map))))
     DBUG_RETURN(-1);

map->func=func; map->param=param;

   pthread_mutex_lock(&THR_LOCK_thread);
#ifdef __BORLANDC__
   hThread=( HANDLE )_beginthread(( void (_USERENTRY *)( void  *)) pthread_start,
                    attr->dwStackSize ? attr->dwStackSize :
                    65535, ( void *) map);
#else

hThread=(HANDLE)_beginthread((void( __cdecl *)(void *)) pthread_start, attr->dwStackSize ? attr->dwStackSize : 65535, (void*) map);

#endif
   DBUG_PRINT( "info" , ( "hThread=%lu" ,( long ) hThread));
   *thread_id=map->pthreadself=hThread;
   pthread_mutex_unlock(&THR_LOCK_thread);
 
   if  (hThread == ( HANDLE ) -1)
   {
     int  error= errno ;
     DBUG_PRINT( "error" ,
            ( "Can't create thread to handle request (error %d)" ,error));
     DBUG_RETURN(error ? error : -1);
   }
   VOID (SetThreadPriority(hThread, attr->priority)) ;
   DBUG_RETURN(0);
}

上面代码首先构造了一个map结构体,成员分别是函数地址和传入参数。然后调用操作系统的接口,_beginthread,但是执行函数并不是传入的函数——func,而是pthread_start,参数为map。继续跟踪pthread_start。

pthread_handler_t pthread_start( void  *param)
{
   pthread_handler

func=((struct pthread_map *) param)->func

;
   void  *func_param=(( struct  pthread_map *) param)->param;
   my_thread_init();         /* Will always succeed in windows */
   pthread_mutex_lock(&THR_LOCK_thread);   /* Wait for beginthread to return */
   win_pthread_self=(( struct  pthread_map *) param)->pthreadself;
   pthread_mutex_unlock(&THR_LOCK_thread);
   free (( char *) param);            /* Free param from create */
   pthread_exit(( void *) (*func)(func_param));
   return  0;               /* Safety */
}

   可以看出,pthread_start中调用了map的func元素,作为真正执行的函数体。OK,创建线程的函数跟踪到此!

   2.服务器启动时创建了哪些函数?

   通过在两个创建线程的地方设置断点,总结了下,在服务器启动时,创建了如下的线程。

pthread_create创建的线程

创建线程函数 线程执行函数

create_shutdown_thread

handle_shutdown

start_handle_manager

handle_manager

handle_connections_methods

handle_connections_sockets

 

innobase的os_thread_create创建的线程:

 

创建线程函数 线程执行函数

innobase_start_or_create_for_mysql

io_handler_thread(4个)

recv_recovery_from_checkpoint_finish

trx_rollback_or_clean_all_without_sess

innobase_start_or_create_for_mysql

srv_lock_timeout_thread

 

srv_error_monitor_thread

 

srv_monitor_thread

 

srv_master_thread

 

    还可以在调试过程中,通过暂停来看此时服务器中的线程,如下图:

1

   

三、线程缓冲池

        Mysql支持线程缓存,在多线程连接模式下,如果连接断开后,将这个线程放入空闲线程缓冲区,在下次有连接到来时,

先去缓冲池中查找是否有空闲线程,有则用之,无则创建。启动时可以设置线程缓冲池的数目:

Mysqld.exe --thread_cache_size=10

      在一个连接断开时,会调用cache_thread函数,将空闲的线程加入到cache中,以备后用。源码如下:

static  bool  cache_thread()
{
   safe_mutex_assert_owner(&LOCK_thread_count);
   if  (

cached_thread_count < thread_cache_size

&&
      ! abort_loop && !kill_cached_threads)
  {
    /* Don't kill the thread, just put it in cache for reuse */
    DBUG_PRINT( "info" , ( "Adding thread to cache" ));
    cached_thread_count++;
    while  (!abort_loop && ! wake_thread && ! kill_cached_threads)
      ( void ) pthread_cond_wait(&COND_thread_cache, &LOCK_thread_count);
    cached_thread_count--;
    if  (kill_cached_threads)

pthread_cond_signal(&COND_flush_thread_cache);

   if  (wake_thread)
   {
     THD *thd;
     wake_thread--;
     thd= thread_cache.get();
     thd->thread_stack= ( char *) &thd;          // For store_globals
     ( void ) thd->store_globals();
     /*
       THD::mysys_var::abort is associated with physical thread rather
       than with THD object. So we need to reset this flag before using
       this thread for handling of new THD object/connection.
     */
     thd->mysys_var-> abort = 0;
     thd->thr_create_utime= my_micro_time();
     threads.append(thd);
     return (1);
   }
}
return (0);
}

    上面我们的启动参数设置线程缓冲区为10,此时对应代码里面的thread_cache_size = 10,cached_thread_count记录

了此刻cache中的空闲线程数目,只有在cache未满的情况下,才会将新的空闲线程加入缓冲池中。加入到缓冲区其实就是将线

程挂起,pthread_cond_wait函数便是线程等待函数,在此函数中,会调用WaitForMultipleObjects进行事件等待。具体源码

如下:

int  pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                            struct  timespec *abstime)
{
   int  result;
   long  timeout;
   union  ft64 now;
 
   if ( abstime != NULL )
   {
     GetSystemTimeAsFileTime(&now.ft);
 
     /*
       Calculate time left to abstime
       - subtract start time from current time(values are in 100ns units)
       - convert to millisec by dividing with 10000
     */
     timeout= ( long )((abstime->tv.i64 - now.i64) / 10000);
     
     /* Don't allow the timeout to be negative */
     if  (timeout < 0)
       timeout= 0L;
 
     /*
       Make sure the calucated timeout does not exceed original timeout
       value which could cause "wait for ever" if system time changes
     */
     if  (timeout > abstime->max_timeout_msec)
       timeout= abstime->max_timeout_msec;
 
   }
   else
   {
     /* No time specified; don't expire */
     timeout= INFINITE;
   }
 
   /*
     Block access if previous broadcast hasn't finished.
     This is just for safety and should normally not
     affect the total time spent in this function.
   */
   WaitForSingleObject(cond->broadcast_block_event, INFINITE);
 
   EnterCriticalSection(&cond->lock_waiting);
   cond->waiting++;
   LeaveCriticalSection(&cond->lock_waiting);
 
   LeaveCriticalSection(mutex);

result= WaitForMultipleObjects(2, cond->events, FALSE, timeout);

   EnterCriticalSection(&cond->lock_waiting);
   cond->waiting--;
   
   if  (cond->waiting == 0)
   {
     /*
       We're the last waiter to be notified or to stop waiting, so
       reset the manual event.
     */
     /* Close broadcast gate */
     ResetEvent(cond->events[BROADCAST]);
     /* Open block gate */
     SetEvent(cond->broadcast_block_event);
   }
   LeaveCriticalSection(&cond->lock_waiting);
   
   EnterCriticalSection(mutex);
 
   return  result == WAIT_TIMEOUT ? ETIMEDOUT : 0;
}

    此处是等待时间,何处进行事件通知呢?我们再次来到上篇所提及的为新的连接创建线程的代码中:

void  create_thread_to_handle_connection(THD *thd)
{
   if  (cached_thread_count > wake_thread)
   {
     /* Get thread from cache */
     thread_cache.append(thd);
     wake_thread++;

pthread_cond_signal(&COND_thread_cache);

   }
   Else
...
}

    上篇文章我们其实只讲了ELSE分支,而忽略了IF分支。wake_thread代表了唤醒的线程数,即在线程缓冲区中被再次使用的

线程,如果cache中的总数>被重新使用的数目,说明还有空闲的线程,此时进入if分支,调用phtread_cond_signal唤醒上面挂起

的空闲线程。

 

线程管理就到此为止了,这里只是介绍了下线程缓冲区的工作原理,并没有具体去介绍如何利用EVENT进行线程的挂起和唤醒,这些都是借助了操作系统的特性,有兴趣的可以自己研究下。这篇就到此为止,下节会介绍Mysql的用户身份认证原理和实现。

 

PS. 男怕入错行,夜半三更忙,一行又一行

踏着落叶,追寻着我的梦想。转载请注明出处
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
6月前
|
运维 监控 安全
云HIS医疗管理系统源码——技术栈【SpringBoot+Angular+MySQL+MyBatis】
云HIS系统采用主流成熟技术,软件结构简洁、代码规范易阅读,SaaS应用,全浏览器访问前后端分离,多服务协同,服务可拆分,功能易扩展;支持多样化灵活配置,提取大量公共参数,无需修改代码即可满足不同客户需求;服务组织合理,功能高内聚,服务间通信简练。
209 4
|
16天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,并与使用 RPM 包安装进行了对比
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,并与使用 RPM 包安装进行了对比。通过具体案例,读者可以了解如何准备环境、下载源码、编译安装、配置服务及登录 MySQL。编译源码安装虽然复杂,但提供了更高的定制性和灵活性,适用于需要高度定制的场景。
49 3
|
19天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
61 2
|
1月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置服务等,并与使用 RPM 包安装进行了对比,帮助读者根据需求选择合适的方法。编译源码安装虽然复杂,但提供了更高的定制性和灵活性。
226 2
|
1月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤
【10月更文挑战第7天】本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据自身需求选择合适的方法。
58 3
|
5月前
|
存储 安全 Java
基于Java+MySQL停车场车位管理系统详细设计和实现(源码+LW+调试文档+讲解等)
基于Java+MySQL停车场车位管理系统详细设计和实现(源码+LW+调试文档+讲解等)
|
1月前
|
前端开发 Java 数据库连接
表白墙/留言墙 —— 中级SpringBoot项目,MyBatis技术栈MySQL数据库开发,练手项目前后端开发(带完整源码) 全方位全步骤手把手教学
本文是一份全面的表白墙/留言墙项目教程,使用SpringBoot + MyBatis技术栈和MySQL数据库开发,涵盖了项目前后端开发、数据库配置、代码实现和运行的详细步骤。
43 0
表白墙/留言墙 —— 中级SpringBoot项目,MyBatis技术栈MySQL数据库开发,练手项目前后端开发(带完整源码) 全方位全步骤手把手教学
|
3月前
|
存储 自然语言处理 关系型数据库
MySQL全文索引源码剖析之Insert语句执行过程
【8月更文挑战第17天】在MySQL中,处理含全文索引的`INSERT`语句涉及多步骤。首先进行语法解析确认语句结构无误;接着语义分析检查数据是否符合表结构及约束。随后存储引擎执行插入操作,若涉及全文索引则进行分词处理,并更新倒排索引结构。此外,事务管理确保了操作的完整性和一致性。通过示例创建含全文索引的表并插入数据,可见MySQL如何高效地处理此类操作,有助于优化数据库性能和提升全文搜索效果。
|
3月前
|
NoSQL 关系型数据库 MySQL
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
143 2
|
3月前
|
关系型数据库 MySQL Linux
【一键解锁神秘力量!】CentOS 7 通过编译源码方式安装 MySQL 数据库 —— 从零到英雄的数据库安装实战秘籍!
【8月更文挑战第9天】随着业务增长,对数据库的需求日益提高。在 CentOS 7 中,通过编译源码安装 MySQL 可提供更高定制性和灵活性。本文详细介绍从准备环境、下载源码、配置编译参数到安装 MySQL 的全过程,并对比 RPM 包安装方法,帮助读者根据需求选择合适方案。实践时需注意备份数据、选择合适版本、确保安全性和调优性能等要点。
217 1