【从入门到放弃-MySQL】数据库连接过程分析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前言上周出现了几次连接超时、连接池满还有dbc连接事务模板失败的问题。所以有必要深入了解下MySQL的连接过程。正好,上周研究了怎么用Clion调试MySQL源码,接下来通过调试来研究一下吧。服务端启动sql/main.

前言

上周出现了几次连接超时、连接池满还有dbc连接事务模板失败的问题。所以有必要深入了解下MySQL的连接过程。

正好,上周研究了怎么用Clion调试MySQL源码,接下来通过调试来研究一下吧。

服务端

启动

sql/main.cc

extern int mysqld_main(int argc, char **argv);

int main(int argc, char **argv) { return mysqld_main(argc, argv); }
  • main:入口文件,仅调用了mysqld_main函数

sql/mysqld.cc

int mysqld_main(int argc, char **argv)
#endif
{
if (my_init())  // init my_sys library & pthreads
  {
    ...
  }
  ...
  if (load_defaults(MYSQL_CONFIG_NAME, load_default_groups, &argc, &argv,
                    &argv_alloc)) {
    ...
  }
  mysqld_socket_acceptor->connection_event_loop();

  mysqld_exit(signal_hand_thr_exit_code);
}
  • mysql_main:MySQL服务端启动逻辑的主要处理函数
  • my_init:系统库和线程初始化
  • load_defaults:加载my.cnf各参数
  • connection_event_loop:循环监听套接字。

sql/conn_handler/connection_acceptor.h

  /**
    Connection acceptor loop to accept connections from clients.
  */
  void connection_event_loop() {
    Connection_handler_manager *mgr =
        Connection_handler_manager::get_instance();
    while (!connection_events_loop_aborted()) {
      Channel_info *channel_info = m_listener->listen_for_connection_event();
      if (channel_info != NULL) mgr->process_new_connection(channel_info);
    }
  }
  • connection_event_loop:通过socket_connection.cc::listen_for_connection_event循环监听,直到有新的连接,开始connection_handler_manager.cc::process_new_connection新连接的处理过程。

新连接

服务端一直处于监听状态,当有新连接请求时,调用process_new_connection处理新连接。

sql/conn_handler/connection_handler_manager.cc

void Connection_handler_manager::process_new_connection(
    Channel_info *channel_info) {
  if (connection_events_loop_aborted() ||
      !check_and_incr_conn_count(channel_info->is_admin_connection())) {
    channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
    delete channel_info;
    return;
  }

  if (m_connection_handler->add_connection(channel_info)) {
    inc_aborted_connects();
    delete channel_info;
  }
}
  • connection_events_loop_aborted:先判断是否已取消监听
  • check_and_incr_conn_count:再判断(会加锁)是否现有连接数是否大于连接最大值(连接池满),未满,则将线程数加一,满了则拒绝连接。(注意,这里的判断逻辑使MySQL的实际最大连接数是max_connections + 1)
  • add_connection:调用add_connection添加连接

sql/conn_handler/connection_handler_pre_thread.cc

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  DBUG_ENTER("Per_thread_connection_handler::add_connection");

  // Simulate thread creation for test case before we check thread cache
  DBUG_EXECUTE_IF("fail_thread_create", error = 1; goto handle_error;);

  if (!check_idle_thread_and_enqueue_connection(channel_info))
    DBUG_RETURN(false);

  /*
    There are no idle threads avaliable to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
#ifndef DBUG_OFF
handle_error:
#endif  // !DBUG_OFF

  if (error) {
    ...
    //错误处理,略
  }

  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  DBUG_RETURN(false);
}
  • 调用check_idle_thread_and_enqueue_connection查看是否有空闲的线程,有则将本次连接信息加入等待队列,并给空闲线程发送唤醒信号;否则新建线程处理本次连接
  • 在新线程中,调用handle_connection函数开始进行逻辑处理。
static void *handle_connection(void *arg) {
  Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
  Connection_handler_manager *handler_manager =
      Connection_handler_manager::get_instance();
  Channel_info *channel_info = static_cast<Channel_info *>(arg);
  bool pthread_reused MY_ATTRIBUTE((unused)) = false;

  if (my_thread_init()) {
    ...
    //错误处理,略
  }

  for (;;) {
    THD *thd = init_new_thd(channel_info);
    if (thd == NULL) {
      ...
      //错误处理,略
    }

#ifdef HAVE_PSI_THREAD_INTERFACE
    if (pthread_reused) {
      ...
      //错误处理,略
    }
#endif

#ifdef HAVE_PSI_THREAD_INTERFACE
    /* Find the instrumented thread */
    PSI_thread *psi = PSI_THREAD_CALL(get_thread)();
    /* Save it within THD, so it can be inspected */
    thd->set_psi(psi);
#endif /* HAVE_PSI_THREAD_INTERFACE */
    mysql_thread_set_psi_id(thd->thread_id());
    mysql_thread_set_psi_THD(thd);
    mysql_socket_set_thread_owner(
        thd->get_protocol_classic()->get_vio()->mysql_socket);

    thd_manager->add_thd(thd);

    if (thd_prepare_connection(thd))
      handler_manager->inc_aborted_connects();
    else {
      while (thd_connection_alive(thd)) {
        if (do_command(thd)) break;
      }
      end_connection(thd);
    }
    close_connection(thd, 0, false, false);

    thd->get_stmt_da()->reset_diagnostics_area();
    thd->release_resources();

    // Clean up errors now, before possibly waiting for a new connection.
#ifndef HAVE_WOLFSSL
#if OPENSSL_VERSION_NUMBER < 0x10100000L
    ERR_remove_thread_state(0);
#endif /* OPENSSL_VERSION_NUMBER < 0x10100000L */
#endif
    thd_manager->remove_thd(thd);
    Connection_handler_manager::dec_connection_count();

#ifdef HAVE_PSI_THREAD_INTERFACE
    /*
      Delete the instrumentation for the job that just completed.
    */
    thd->set_psi(NULL);
    PSI_THREAD_CALL(delete_current_thread)();
#endif /* HAVE_PSI_THREAD_INTERFACE */

    delete thd;

    // Server is shutting down so end the pthread.
    if (connection_events_loop_aborted()) break;

    channel_info = Per_thread_connection_handler::block_until_new_connection();
    if (channel_info == NULL) break;
    pthread_reused = true;
    if (connection_events_loop_aborted()) {
      ...
      //错误处理,略
    }
  }

  my_thread_end();
  my_thread_exit(0);
  return NULL;
}
  • 会对连接进行thd_prepare_connection预处理操作,没问题后继续下面的逻辑。
  • 当连接未被关闭,就会一直do_command处理请求。
  • 当连接关闭,则走下面关闭逻辑

执行

sql/sql_parse.cc

bool do_command(THD *thd) {
  bool return_value;
  int rc;
  NET *net = NULL;
  enum enum_server_command command;
  COM_DATA com_data;
  DBUG_ENTER("do_command");
  DBUG_ASSERT(thd->is_classic_protocol());

  /*
    indicator of uninitialized lex => normal flow of errors handling
    (see my_message_sql)
  */
  thd->lex->set_current_select(0);

  /*
    XXX: this code is here only to clear possible errors of init_connect.
    Consider moving to prepare_new_connection_state() instead.
    That requires making sure the DA is cleared before non-parsing statements
    such as COM_QUIT.
  */
  thd->clear_error();  // Clear error message
  thd->get_stmt_da()->reset_diagnostics_area();

  /*
    This thread will do a blocking read from the client which
    will be interrupted when the next command is received from
    the client, the connection is closed or "net_wait_timeout"
    number of seconds has passed.
  */
  net = thd->get_protocol_classic()->get_net();
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
  net_new_transaction(net);

  /*
    Synchronization point for testing of KILL_CONNECTION.
    This sync point can wait here, to simulate slow code execution
    between the last test of thd->killed and blocking in read().

    The goal of this test is to verify that a connection does not
    hang, if it is killed at this point of execution.
    (Bug#37780 - main.kill fails randomly)

    Note that the sync point wait itself will be terminated by a
    kill. In this case it consumes a condition broadcast, but does
    not change anything else. The consumed broadcast should not
    matter here, because the read/recv() below doesn't use it.
  */
  DEBUG_SYNC(thd, "before_do_command_net_read");

  /*
    Because of networking layer callbacks in place,
    this call will maintain the following instrumentation:
    - IDLE events
    - SOCKET events
    - STATEMENT events
    - STAGE events
    when reading a new network packet.
    In particular, a new instrumented statement is started.
    See init_net_server_extension()
  */
  thd->m_server_idle = true;
  rc = thd->get_protocol()->get_command(&com_data, &command);
  thd->m_server_idle = false;

  if (rc) {
    ...
    //错误处理,略
  }

  char desc[VIO_DESCRIPTION_SIZE];
  vio_description(net->vio, desc);
  DBUG_PRINT("info", ("Command on %s = %d (%s)", desc, command,
                      command_name[command].str));

  DBUG_PRINT("info", ("packet: '%*.s'; command: %d",
                      thd->get_protocol_classic()->get_packet_length(),
                      thd->get_protocol_classic()->get_raw_packet(), command));
  if (thd->get_protocol_classic()->bad_packet)
    DBUG_ASSERT(0);  // Should be caught earlier

  // Reclaim some memory
  thd->get_protocol_classic()->get_output_packet()->shrink(
      thd->variables.net_buffer_length);
  /* Restore read timeout value */
  my_net_set_read_timeout(net, thd->variables.net_read_timeout);

  return_value = dispatch_command(thd, &com_data, command);
  thd->get_protocol_classic()->get_output_packet()->shrink(
      thd->variables.net_buffer_length);

out:
  /* The statement instrumentation must be closed in all cases. */
  DBUG_ASSERT(thd->m_digest == NULL);
  DBUG_ASSERT(thd->m_statement_psi == NULL);
  DBUG_RETURN(return_value);
}
  • 主要的处理逻辑为dispatch_command,根据不同的command类型进行分发。
/**
  Perform one connection-level (COM_XXXX) command.

  @param thd             connection handle
  @param command         type of command to perform
  @param com_data        com_data union to store the generated command

  @todo
    set thd->lex->sql_command to SQLCOM_END here.
  @todo
    The following has to be changed to an 8 byte integer

  @retval
    0   ok
  @retval
    1   request of thread shutdown, i. e. if command is
        COM_QUIT
*/
bool dispatch_command(THD *thd, const COM_DATA *com_data,
                      enum enum_server_command command) {
  ... //太长不看
  
  switch (command) {
    case ... //太长不看
    case COM_QUERY: {
      ... //太长不看
      mysql_parse(thd, &parser_state);

      ... //太长不看

      DBUG_PRINT("info", ("query ready"));
      break;
    }
    case ... //太长不看
    default:
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
      break;
  }
}
  • 主要看COM_QUERY这个逻辑,我们要用到的DDL、DML都会走这个流程,这个流程中主要是调用mysql_parse方法
/**
  Parse a query.

  @param thd          Current session.
  @param parser_state Parser state.
*/

void mysql_parse(THD *thd, Parser_state *parser_state) {
  ... //太长不看

  mysql_reset_thd_for_next_command(thd);
  
  if (!err) {
    err = parse_sql(thd, parser_state, NULL);
    ... //太长不看
  }

  if (!err) {
    
    mysql_rewrite_query(thd);

    ... //太长不看
  }

  if (!err) {
    ...
    error = mysql_execute_command(thd, true);
    ...
  }
}
  • 主要是SQL语法解析和执行
  • mysql_reset_thd_for_next_command是对下一次执行做准备,重置线程各变量
  • mysql_rewrite_query看着像是SQL优化?待定 还没追进去,记个TODO
  • 词法解析前不应该有缓存吗?没有找到缓存的逻辑,记个TODO

关闭连接

sql/conn_handler/connection_handler_pre_thread.cc

Channel_info *Per_thread_connection_handler::block_until_new_connection() {
  Channel_info *new_conn = NULL;
  mysql_mutex_lock(&LOCK_thread_cache);
  if (blocked_pthread_count < max_blocked_pthreads && !shrink_cache) {
    /* Don't kill the pthread, just block it for reuse */
    DBUG_PRINT("info", ("Blocking pthread for reuse"));

    /*
      mysys_var is bound to the physical thread,
      so make sure mysys_var->dbug is reset to a clean state
      before picking another session in the thread cache.
    */
    DBUG_POP();
    DBUG_ASSERT(!_db_is_pushed_());

    // Block pthread
    blocked_pthread_count++;
    while (!connection_events_loop_aborted() && !wake_pthread && !shrink_cache)
      mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);
    blocked_pthread_count--;

    if (shrink_cache && blocked_pthread_count <= max_blocked_pthreads) {
      mysql_cond_signal(&COND_flush_thread_cache);
    }

    if (wake_pthread) {
      wake_pthread--;
      if (!waiting_channel_info_list->empty()) {
        new_conn = waiting_channel_info_list->front();
        waiting_channel_info_list->pop_front();
        DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
      } else {
        DBUG_ASSERT(0);  // We should not get here.
      }
    }
  }
  mysql_mutex_unlock(&LOCK_thread_cache);
  return new_conn;
}
  • 如果阻塞的线程数小于最大阻塞线程数,则此线程不回收,而是进入阻塞状态(等待),等待新连接来的时候重复使用。
  • 否则关闭线程。

参考文献:
https://www.cnblogs.com/FateTHarlaown/p/8676166.html

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7天前
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
MySQL事务日志-Undo Log工作原理分析
|
4天前
|
SQL 关系型数据库 MySQL
网安入门之MySQL后端基础
《网安入门之MySQL后端基础》简介: 本文介绍了数据库及MySQL的基础知识,涵盖数据库的概念、结构与操作。数据库是组织化存储数据的集合,通过表、列、行等结构实现高效管理。MySQL作为开源的关系型数据库管理系统,广泛应用于Web开发。文中详细讲解了MySQL的基本操作,如增(INSERT)、删(DELETE)、改(UPDATE)、查(SELECT)等语句的使用方法,并介绍了数据库事务的ACID特性。此外,还探讨了SQL注入攻击的风险及防范措施,强调了预处理语句的重要性。最后,简述了PHP中mysqli扩展的使用方法,包括连接数据库、执行查询和关闭连接等步骤。
|
24天前
|
SQL 关系型数据库 MySQL
MySQL 窗口函数详解:分析性查询的强大工具
MySQL 窗口函数从 8.0 版本开始支持,提供了一种灵活的方式处理 SQL 查询中的数据。无需分组即可对行集进行分析,常用于计算排名、累计和、移动平均值等。基本语法包括 `function_name([arguments]) OVER ([PARTITION BY columns] [ORDER BY columns] [frame_clause])`,常见函数有 `ROW_NUMBER()`, `RANK()`, `DENSE_RANK()`, `SUM()`, `AVG()` 等。窗口框架定义了计算聚合值时应包含的行。适用于复杂数据操作和分析报告。
66 11
|
18天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
44 3
|
18天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
47 3
|
18天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
62 2
|
1月前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
210 15
|
25天前
|
SQL 关系型数据库 MySQL
数据库数据恢复—Mysql数据库表记录丢失的数据恢复方案
Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、Mysql数据库表中无任何数据或只有部分数据。 2、客户端无法查询到完整的信息。
|
1月前
|
关系型数据库 MySQL 数据库
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。
|
1月前
|
SQL 关系型数据库 MySQL
MySQL导入.sql文件后数据库乱码问题
本文分析了导入.sql文件后数据库备注出现乱码的原因,包括字符集不匹配、备注内容编码问题及MySQL版本或配置问题,并提供了详细的解决步骤,如检查和统一字符集设置、修改客户端连接方式、检查MySQL配置等,确保导入过程顺利。