MGR插件初始化过程分析

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: [TOC] # MySQL Group Replication插件初始化过程 一、INSTALL PLUGIN group_replication SONAME 'group_replication.

一、INSTALL PLUGIN group_replication SONAME 'group_replication.so'

其实在MySQL中,不同的插件初始化过程都很类似,server 层的调用栈如下

(lldb) bt
* thread #20, stop reason = step over
  * frame #0: 0x000000011d4a6282 group_replication.so`plugin_group_replication_init(plugin_info=0x00007fb910815d38) at plugin.cc:1027
    frame #1: 0x00000001015e2ee4 mysqld`plugin_initialize(plugin=0x00007fb910815d38) at sql_plugin.cc:1244
    frame #2: 0x00000001015e7803 mysqld`mysql_install_plugin(thd=0x00007fb913859a00, name=0x00007fb91009fe18, dl=0x00007fb91009fe28) at sql_plugin.cc:2224
    frame #3: 0x00000001015e73e3 mysqld`Sql_cmd_install_plugin::execute(this=0x00007fb91009fe10, thd=0x00007fb913859a00) at sql_plugin.cc:4435
    frame #4: 0x00000001015abcc4 mysqld`mysql_execute_command(thd=0x00007fb913859a00, first_level=true) at sql_parse.cc:4835
    frame #5: 0x00000001015a10f5 mysqld`mysql_parse(thd=0x00007fb913859a00, parser_state=0x0000700001a95250) at sql_parse.cc:5570
    frame #6: 0x000000010159ded8 mysqld`dispatch_command(thd=0x00007fb913859a00, com_data=0x0000700001a95db8, command=COM_QUERY) at sql_parse.cc:1484
    frame #7: 0x00000001015a0216 mysqld`do_command(thd=0x00007fb913859a00) at sql_parse.cc:1025
    frame #8: 0x0000000101738c40 mysqld`::handle_connection(arg=0x00007fb91109f890) at connection_handler_per_thread.cc:300
    frame #9: 0x0000000101f35ffc mysqld`::pfs_spawn_thread(arg=0x00007fb911004ae0) at pfs.cc:2190
    frame #10: 0x00007fff784c2661 libsystem_pthread.dylib`_pthread_body + 340
    frame #11: 0x00007fff784c250d libsystem_pthread.dylib`_pthread_start + 377
    frame #12: 0x00007fff784c1bf9 libsystem_pthread.dylib`thread_start + 13

所有的插件初始化都会定义如下的初始化函数,对应到group_replication插件中如下

int plugin_group_replication_init(MYSQL_PLUGIN plugin_info)

初始化的过程如下:

二、 如果开启了performance_schema,注册相关监控项

  // Register all PSI keys at the time plugin init
#ifdef HAVE_PSI_INTERFACE
  register_all_group_replication_psi_keys();
#endif /* HAVE_PSI_INTERFACE */

register_all_group_replication_psi_keys();

void register_all_group_replication_psi_keys()
{
  register_group_replication_mutex_psi_keys(all_group_replication_psi_mutex_keys,
                                            array_elements(all_group_replication_psi_mutex_keys));
  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));
  register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));
  register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));
}

2.1 注册mutex 监控项,在performance_schema开启的情况下,就可以在对应的表中查看这些mutex的占用情况。

static PSI_mutex_info all_group_replication_psi_mutex_keys[]=
{
  {&key_GR_LOCK_applier_module_run, "LOCK_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_applier_module_suspend, "LOCK_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_run, "LOCK_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_dispatcher_run, "LOCK_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_certification_info, "LOCK_certification_info", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_members, "LOCK_certification_members", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_channel_observation_list, "LOCK_channel_observation_list", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_run, "LOCK_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_server_ready, "LOCK_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "key_GR_LOCK_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "key_GR_LOCK_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_view_modification_wait, "LOCK_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_info_manager, "LOCK_group_info_manager", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_continuation, "LOCK_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_synchronized_queue, "LOCK_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_count_down_latch, "LOCK_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_wait_ticket, "LOCK_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_module_run, "LOCK_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery, "LOCK_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_donor_selection, "LOCK_recovery_donor_selection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_method_exec, "LOCK_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_run, "LOCK_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_plugin_running, "LOCK_plugin_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_force_members_running, "LOCK_force_members_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_write_lock_protection, "LOCK_write_lock_protection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_flow_control, "LOCK_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_transactions_waiting_apply, "LOCK_pipeline_stats_transactions_waiting_apply", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_trx_unlocking, "LOCK_transaction_unblocking", PSI_FLAG_GLOBAL}
};

2.2 注册条件变量监控

  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));

group_replication涉及到的条件变量有如下

static PSI_cond_info all_group_replication_psi_condition_keys[]=
{
  {&key_GR_COND_applier_module_run, "COND_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_suspend,  "COND_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_wait, "COND_applier_module_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_run, "COND_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_dispatcher_run, "COND_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_run,  "COND_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_server_ready, "COND_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "COND_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "COND_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_COND_view_modification_wait, "COND_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_continuation, "COND_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_COND_synchronized_queue, "COND_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_COND_count_down_latch, "COND_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_COND_wait_ticket, "COND_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery_module_run, "COND_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery, "COND_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_method_exec, "COND_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_run, "COND_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_stats_flow_control, "COND_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
};

2.3 注册group_replication相关的线程

register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));

group_replication中涉及到如下线程

static PSI_thread_info all_group_replication_psi_thread_keys[]=
{
  {&key_GR_THD_applier_module_receiver, "THD_applier_module_receiver", PSI_FLAG_GLOBAL},
  {&key_GR_THD_cert_broadcast, "THD_certifier_broadcast", PSI_FLAG_GLOBAL},
  {&key_GR_THD_delayed_init, "THD_delayed_initialization", PSI_FLAG_GLOBAL},
  {&key_GR_THD_plugin_session, "THD_plugin_server_session", PSI_FLAG_GLOBAL},
  {&key_GR_THD_group_partition_handler, "THD_group_partition_handler", PSI_FLAG_GLOBAL},
  {&key_GR_THD_recovery, "THD_recovery", PSI_FLAG_GLOBAL}
};

2.4 注册读写锁监控

register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));

涉及到的读写锁有如下

static PSI_rwlock_info all_group_replication_psi_rwlock_keys[]=
{
  {&key_GR_RWLOCK_cert_stable_gtid_set, "RWLOCK_certifier_stable_gtid_set", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_io_cache_unused_list , "RWLOCK_io_cache_unused_list", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_plugin_stop, "RWLOCK_plugin_stop", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations, "RWLOCK_gcs_operations", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations_finalize_ongoing, "RWLOCK_gcs_operations_finalize_ongoing", PSI_FLAG_GLOBAL}
};

三、初始化会使用到的互斥锁,读写锁等

  mysql_mutex_init(key_GR_LOCK_plugin_running, &plugin_running_mutex,
                   MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_GR_LOCK_force_members_running,
                   &force_members_running_mutex,
                   MY_MUTEX_INIT_FAST);

  plugin_stop_lock= new Checkable_rwlock(
#ifdef HAVE_PSI_INTERFACE
                                         key_GR_RWLOCK_plugin_stop
#endif /* HAVE_PSI_INTERFACE */
                                        );

  shared_plugin_stop_lock= new Shared_writelock(plugin_stop_lock);
  • plugin_running_mutex

负责保护变量group_replication_running

  • force_members_running_mutex

负责保护force_members_running

四、作为观察者向server注册回调函数

注册的目的是为了在某些过程中的回调。

4.1 注册server_state_observer

server state observer主要用于实现,在数据库启动停止时的回调动作。如下:

Server_state_observer server_state_observer = {
  sizeof(Server_state_observer),

  group_replication_before_handle_connection,  //before the client connects to the server
  group_replication_before_recovery,           //before recovery
  group_replication_after_engine_recovery,     //after engine recovery
  group_replication_after_recovery,            //after recovery
  group_replication_before_server_shutdown,    //before shutdown
  group_replication_after_server_shutdown,     //after shutdown
};

但其实group_replication并不关心这些操作,所以回调函数基本直接返回。


/*
  DBMS lifecycle events observers.
*/
int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

int group_replication_before_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_engine_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_before_server_shutdown(Server_state_param *param)
{
  return 0;
}

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

在用户链接时,需要做如下操作, 暂时没有细究,应该是在切换,或者初始化时等待状态的切换完成。

int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

在数据库shutdown时,需要关闭MGR插件,所以执行函数plugin_group_replication_stop().

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

4.2 Register a transaction observer

注册事务操作回调

  if (register_trans_observer(&trans_observer, (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the transactions state observers");
    return 1;
    /* purecov: end */
  }

事务操作的前后需要进行的回调操作,如下,主要的功能点也在这里实现。

Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,
  group_replication_trans_before_commit,
  group_replication_trans_before_rollback,
  group_replication_trans_after_commit,
  group_replication_trans_after_rollback,
};
  • group_replication_trans_before_dml
    在进行dml操作前,MGR需要提前进行一些检查操作,如下

image

  • group_replication_trans_before_commit
    更多的一致性实现过程在group_replication_trans_before_commit中。

4.3 Register a binlog transmit observer

这部分注册,用于在binlog的发送过程中进行回调

  if (register_binlog_transmit_observer(&binlog_transmit_observer,
                                        (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the binlog state observers");
    return 1;
    /* purecov: end */
  }

这部分分为如下几种情况,在semisync插件中这部分应用的很多,但是在group_replication插件中,这部分基本没有使用,也就是回调函数不做操作。只有group_replication_reset_master_logs,设置了一个内部的变量known_server_reset;

Binlog_transmit_observer binlog_transmit_observer = {
  sizeof(Binlog_transmit_observer),

  group_replication_transmit_start,     // transmit_start,
  group_replication_transmit_stop,      // transmit_stop,
  group_replication_reserve_header,     // reserve_header,
  group_replication_before_send_event,  // before_send_event,
  group_replication_after_send_event,   // after_send_event,
  group_replication_reset_master_logs   // reset_master
};

五、其它的初始化操作

  //Initialize the recovery SSL option map
  initialize_ssl_option_map();

  //Initialize channel observation and auto increment handlers before start
  auto_increment_handler= new Plugin_group_replication_auto_increment();
  channel_observation_manager= new Channel_observation_manager(plugin_info);
  view_change_notifier= new Plugin_gcs_view_modification_notifier();
  gcs_module= new Gcs_operations();

  //Initialize the compatibility module before starting
  init_compatibility_manager();

另外如果开启了初始化时启动mgr,则会调用启动函数

 plugin_is_auto_starting= start_group_replication_at_boot_var;
  if (start_group_replication_at_boot_var && plugin_group_replication_start())
  {
    log_message(MY_ERROR_LEVEL,
                "Unable to start Group Replication on boot");
  }
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6月前
|
Java
SpringBoot关闭过程中是如何销毁一个DisposableBean的?
SpringBoot关闭过程中是如何销毁一个DisposableBean的?
59 0
|
12月前
|
小程序 前端开发 JavaScript
小程序框架及小程序生命周期----超详细
小程序框架及小程序生命周期----超详细
107 0
quartz(二)动态增删改查停止启用job
quartz(二)动态增删改查停止启用job
60 0
|
缓存 数据库
项目启动时执行指定任务如何实现?
项目启动时执行指定任务如何实现?
项目启动时执行指定任务如何实现?
|
Java Android开发
JobService源码探究之 onStartJob()里执行耗时逻辑导致Job可能被强制销毁
JobService源码探究之 onStartJob()里执行耗时逻辑导致Job可能被强制销毁
为什么APF框架初始化时有两个一模一样的analyticConfiguration请求
为什么APF框架初始化时有两个一模一样的analyticConfiguration请求
108 0
为什么APF框架初始化时有两个一模一样的analyticConfiguration请求
|
jvm-sandbox Shell Java
Jvm-Sandbox源码分析--启动时加载模块
在上一篇Jvm-Sandbox源码分析--启动简析 简单介绍了一下jvm-sandbox启动流程,在这篇文章中我们来分析一下系统模块和用户的自定义模块在启动时,是怎么加载的。
4428 0
|
前端开发 Java 应用服务中间件
Tomcat源码分析----初始化与启动
在阅读tomcat源码前,我们一般都会有如下几个疑问: - web容器和servlet容器的区别是什么; - 在springMVC中的web.xml是什么时候加载到tomcat中的; - tomcat是怎么加载我们的web服务的; - tomcat是怎么实现的热部署; - 一个http请求
19264 0
|
XML 应用服务中间件 数据格式
Tomcat源码分析----Container初始化与加载
在本章节中,以下几个问题会被回答:web容器和servlet容器的区别是什么;在springMVC中的web.xml是什么时候加载到tomcat中的;tomcat是怎么加载我们的web服务的;tomcat是怎么实现的热部署;
8432 0
|
Java Spring 缓存
Java项目启动时执行指定方法的几种方式
版权声明:本文为博主原创文章,未经博主允许不得转载。博客源地址为zhixiang.org.cn https://blog.csdn.net/myFirstCN/article/details/81750854 很多时候我们都会碰到需要在程序启动时去执行的方法,比如说去读取某个配置,预加载缓存,定时任务的初始化等。
2718 0