MGR插件初始化过程分析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: [TOC] # MySQL Group Replication插件初始化过程 一、INSTALL PLUGIN group_replication SONAME 'group_replication.

一、INSTALL PLUGIN group_replication SONAME 'group_replication.so'

其实在MySQL中,不同的插件初始化过程都很类似,server 层的调用栈如下

(lldb) bt
* thread #20, stop reason = step over
  * frame #0: 0x000000011d4a6282 group_replication.so`plugin_group_replication_init(plugin_info=0x00007fb910815d38) at plugin.cc:1027
    frame #1: 0x00000001015e2ee4 mysqld`plugin_initialize(plugin=0x00007fb910815d38) at sql_plugin.cc:1244
    frame #2: 0x00000001015e7803 mysqld`mysql_install_plugin(thd=0x00007fb913859a00, name=0x00007fb91009fe18, dl=0x00007fb91009fe28) at sql_plugin.cc:2224
    frame #3: 0x00000001015e73e3 mysqld`Sql_cmd_install_plugin::execute(this=0x00007fb91009fe10, thd=0x00007fb913859a00) at sql_plugin.cc:4435
    frame #4: 0x00000001015abcc4 mysqld`mysql_execute_command(thd=0x00007fb913859a00, first_level=true) at sql_parse.cc:4835
    frame #5: 0x00000001015a10f5 mysqld`mysql_parse(thd=0x00007fb913859a00, parser_state=0x0000700001a95250) at sql_parse.cc:5570
    frame #6: 0x000000010159ded8 mysqld`dispatch_command(thd=0x00007fb913859a00, com_data=0x0000700001a95db8, command=COM_QUERY) at sql_parse.cc:1484
    frame #7: 0x00000001015a0216 mysqld`do_command(thd=0x00007fb913859a00) at sql_parse.cc:1025
    frame #8: 0x0000000101738c40 mysqld`::handle_connection(arg=0x00007fb91109f890) at connection_handler_per_thread.cc:300
    frame #9: 0x0000000101f35ffc mysqld`::pfs_spawn_thread(arg=0x00007fb911004ae0) at pfs.cc:2190
    frame #10: 0x00007fff784c2661 libsystem_pthread.dylib`_pthread_body + 340
    frame #11: 0x00007fff784c250d libsystem_pthread.dylib`_pthread_start + 377
    frame #12: 0x00007fff784c1bf9 libsystem_pthread.dylib`thread_start + 13

所有的插件初始化都会定义如下的初始化函数,对应到group_replication插件中如下

int plugin_group_replication_init(MYSQL_PLUGIN plugin_info)

初始化的过程如下:

二、 如果开启了performance_schema,注册相关监控项

  // Register all PSI keys at the time plugin init
#ifdef HAVE_PSI_INTERFACE
  register_all_group_replication_psi_keys();
#endif /* HAVE_PSI_INTERFACE */

register_all_group_replication_psi_keys();

void register_all_group_replication_psi_keys()
{
  register_group_replication_mutex_psi_keys(all_group_replication_psi_mutex_keys,
                                            array_elements(all_group_replication_psi_mutex_keys));
  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));
  register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));
  register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));
}

2.1 注册mutex 监控项,在performance_schema开启的情况下,就可以在对应的表中查看这些mutex的占用情况。

static PSI_mutex_info all_group_replication_psi_mutex_keys[]=
{
  {&key_GR_LOCK_applier_module_run, "LOCK_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_applier_module_suspend, "LOCK_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_run, "LOCK_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_dispatcher_run, "LOCK_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_certification_info, "LOCK_certification_info", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_members, "LOCK_certification_members", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_channel_observation_list, "LOCK_channel_observation_list", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_run, "LOCK_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_server_ready, "LOCK_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "key_GR_LOCK_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "key_GR_LOCK_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_view_modification_wait, "LOCK_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_info_manager, "LOCK_group_info_manager", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_continuation, "LOCK_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_synchronized_queue, "LOCK_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_count_down_latch, "LOCK_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_wait_ticket, "LOCK_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_module_run, "LOCK_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery, "LOCK_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_donor_selection, "LOCK_recovery_donor_selection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_method_exec, "LOCK_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_run, "LOCK_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_plugin_running, "LOCK_plugin_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_force_members_running, "LOCK_force_members_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_write_lock_protection, "LOCK_write_lock_protection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_flow_control, "LOCK_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_transactions_waiting_apply, "LOCK_pipeline_stats_transactions_waiting_apply", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_trx_unlocking, "LOCK_transaction_unblocking", PSI_FLAG_GLOBAL}
};

2.2 注册条件变量监控

  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));

group_replication涉及到的条件变量有如下

static PSI_cond_info all_group_replication_psi_condition_keys[]=
{
  {&key_GR_COND_applier_module_run, "COND_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_suspend,  "COND_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_wait, "COND_applier_module_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_run, "COND_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_dispatcher_run, "COND_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_run,  "COND_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_server_ready, "COND_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "COND_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "COND_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_COND_view_modification_wait, "COND_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_continuation, "COND_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_COND_synchronized_queue, "COND_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_COND_count_down_latch, "COND_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_COND_wait_ticket, "COND_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery_module_run, "COND_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery, "COND_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_method_exec, "COND_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_run, "COND_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_stats_flow_control, "COND_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
};

2.3 注册group_replication相关的线程

register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));

group_replication中涉及到如下线程

static PSI_thread_info all_group_replication_psi_thread_keys[]=
{
  {&key_GR_THD_applier_module_receiver, "THD_applier_module_receiver", PSI_FLAG_GLOBAL},
  {&key_GR_THD_cert_broadcast, "THD_certifier_broadcast", PSI_FLAG_GLOBAL},
  {&key_GR_THD_delayed_init, "THD_delayed_initialization", PSI_FLAG_GLOBAL},
  {&key_GR_THD_plugin_session, "THD_plugin_server_session", PSI_FLAG_GLOBAL},
  {&key_GR_THD_group_partition_handler, "THD_group_partition_handler", PSI_FLAG_GLOBAL},
  {&key_GR_THD_recovery, "THD_recovery", PSI_FLAG_GLOBAL}
};

2.4 注册读写锁监控

register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));

涉及到的读写锁有如下

static PSI_rwlock_info all_group_replication_psi_rwlock_keys[]=
{
  {&key_GR_RWLOCK_cert_stable_gtid_set, "RWLOCK_certifier_stable_gtid_set", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_io_cache_unused_list , "RWLOCK_io_cache_unused_list", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_plugin_stop, "RWLOCK_plugin_stop", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations, "RWLOCK_gcs_operations", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations_finalize_ongoing, "RWLOCK_gcs_operations_finalize_ongoing", PSI_FLAG_GLOBAL}
};

三、初始化会使用到的互斥锁,读写锁等

  mysql_mutex_init(key_GR_LOCK_plugin_running, &plugin_running_mutex,
                   MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_GR_LOCK_force_members_running,
                   &force_members_running_mutex,
                   MY_MUTEX_INIT_FAST);

  plugin_stop_lock= new Checkable_rwlock(
#ifdef HAVE_PSI_INTERFACE
                                         key_GR_RWLOCK_plugin_stop
#endif /* HAVE_PSI_INTERFACE */
                                        );

  shared_plugin_stop_lock= new Shared_writelock(plugin_stop_lock);
  • plugin_running_mutex

负责保护变量group_replication_running

  • force_members_running_mutex

负责保护force_members_running

四、作为观察者向server注册回调函数

注册的目的是为了在某些过程中的回调。

4.1 注册server_state_observer

server state observer主要用于实现,在数据库启动停止时的回调动作。如下:

Server_state_observer server_state_observer = {
  sizeof(Server_state_observer),

  group_replication_before_handle_connection,  //before the client connects to the server
  group_replication_before_recovery,           //before recovery
  group_replication_after_engine_recovery,     //after engine recovery
  group_replication_after_recovery,            //after recovery
  group_replication_before_server_shutdown,    //before shutdown
  group_replication_after_server_shutdown,     //after shutdown
};

但其实group_replication并不关心这些操作,所以回调函数基本直接返回。


/*
  DBMS lifecycle events observers.
*/
int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

int group_replication_before_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_engine_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_before_server_shutdown(Server_state_param *param)
{
  return 0;
}

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

在用户链接时,需要做如下操作, 暂时没有细究,应该是在切换,或者初始化时等待状态的切换完成。

int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

在数据库shutdown时,需要关闭MGR插件,所以执行函数plugin_group_replication_stop().

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

4.2 Register a transaction observer

注册事务操作回调

  if (register_trans_observer(&trans_observer, (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the transactions state observers");
    return 1;
    /* purecov: end */
  }

事务操作的前后需要进行的回调操作,如下,主要的功能点也在这里实现。

Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,
  group_replication_trans_before_commit,
  group_replication_trans_before_rollback,
  group_replication_trans_after_commit,
  group_replication_trans_after_rollback,
};
  • group_replication_trans_before_dml
    在进行dml操作前,MGR需要提前进行一些检查操作,如下

image

  • group_replication_trans_before_commit
    更多的一致性实现过程在group_replication_trans_before_commit中。

4.3 Register a binlog transmit observer

这部分注册,用于在binlog的发送过程中进行回调

  if (register_binlog_transmit_observer(&binlog_transmit_observer,
                                        (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the binlog state observers");
    return 1;
    /* purecov: end */
  }

这部分分为如下几种情况,在semisync插件中这部分应用的很多,但是在group_replication插件中,这部分基本没有使用,也就是回调函数不做操作。只有group_replication_reset_master_logs,设置了一个内部的变量known_server_reset;

Binlog_transmit_observer binlog_transmit_observer = {
  sizeof(Binlog_transmit_observer),

  group_replication_transmit_start,     // transmit_start,
  group_replication_transmit_stop,      // transmit_stop,
  group_replication_reserve_header,     // reserve_header,
  group_replication_before_send_event,  // before_send_event,
  group_replication_after_send_event,   // after_send_event,
  group_replication_reset_master_logs   // reset_master
};

五、其它的初始化操作

  //Initialize the recovery SSL option map
  initialize_ssl_option_map();

  //Initialize channel observation and auto increment handlers before start
  auto_increment_handler= new Plugin_group_replication_auto_increment();
  channel_observation_manager= new Channel_observation_manager(plugin_info);
  view_change_notifier= new Plugin_gcs_view_modification_notifier();
  gcs_module= new Gcs_operations();

  //Initialize the compatibility module before starting
  init_compatibility_manager();

另外如果开启了初始化时启动mgr,则会调用启动函数

 plugin_is_auto_starting= start_group_replication_at_boot_var;
  if (start_group_replication_at_boot_var && plugin_group_replication_start())
  {
    log_message(MY_ERROR_LEVEL,
                "Unable to start Group Replication on boot");
  }
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
Python
FastAPI(35)- 依赖项中使用 yield + Context Manager 上下文管理器
FastAPI(35)- 依赖项中使用 yield + Context Manager 上下文管理器
334 0
|
缓存 数据库
项目启动时执行指定任务如何实现?
项目启动时执行指定任务如何实现?
项目启动时执行指定任务如何实现?
|
缓存 JavaScript 开发者
require 函数加载模块原理(被加载的模块会先执行一次)|学习笔记
快速学习 require 函数加载模块原理(被加载的模块会先执行一次)
require 函数加载模块原理(被加载的模块会先执行一次)|学习笔记
|
SQL 存储 缓存
如何在SpringBoot启动时执行初始化操作,两个简单接口就可以实现
最近遇到一个功能点,数据库中一张很简单的表有一千多条数据,这里的数据主要做到了值域映射的作用,简单来讲就是我可以通过中文名拿到数据库中对应的code值。原本的实现方式是每次用到之后去查一次sql,虽然不会有什么问题,但是只要是走了网络io,都会消耗时间。所以这个方案需要想办法优化。 优化的方式其实很简单,数据量不多,一千多条数据放在内存里也占不了多少空间。因此完全可以把一次性把数据加载到内存中,后面只需要每次去内存里调用就可以了。
|
Arthas Java 测试技术
JAR冲突问题的解决以及运行状态下如何查看加载的类
JAR冲突问题的解决以及运行状态下如何查看加载的类
251 0
|
前端开发 Java 应用服务中间件
Tomcat源码分析----初始化与启动
在阅读tomcat源码前,我们一般都会有如下几个疑问: - web容器和servlet容器的区别是什么; - 在springMVC中的web.xml是什么时候加载到tomcat中的; - tomcat是怎么加载我们的web服务的; - tomcat是怎么实现的热部署; - 一个http请求
19267 0
|
XML 应用服务中间件 数据格式
Tomcat源码分析----Container初始化与加载
在本章节中,以下几个问题会被回答:web容器和servlet容器的区别是什么;在springMVC中的web.xml是什么时候加载到tomcat中的;tomcat是怎么加载我们的web服务的;tomcat是怎么实现的热部署;
8435 0
|
Java Spring 缓存
Java项目启动时执行指定方法的几种方式
版权声明:本文为博主原创文章,未经博主允许不得转载。博客源地址为zhixiang.org.cn https://blog.csdn.net/myFirstCN/article/details/81750854 很多时候我们都会碰到需要在程序启动时去执行的方法,比如说去读取某个配置,预加载缓存,定时任务的初始化等。
2725 0