MGR插件初始化过程分析

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: [TOC] # MySQL Group Replication插件初始化过程 一、INSTALL PLUGIN group_replication SONAME 'group_replication.

一、INSTALL PLUGIN group_replication SONAME 'group_replication.so'

其实在MySQL中,不同的插件初始化过程都很类似,server 层的调用栈如下

(lldb) bt
* thread #20, stop reason = step over
  * frame #0: 0x000000011d4a6282 group_replication.so`plugin_group_replication_init(plugin_info=0x00007fb910815d38) at plugin.cc:1027
    frame #1: 0x00000001015e2ee4 mysqld`plugin_initialize(plugin=0x00007fb910815d38) at sql_plugin.cc:1244
    frame #2: 0x00000001015e7803 mysqld`mysql_install_plugin(thd=0x00007fb913859a00, name=0x00007fb91009fe18, dl=0x00007fb91009fe28) at sql_plugin.cc:2224
    frame #3: 0x00000001015e73e3 mysqld`Sql_cmd_install_plugin::execute(this=0x00007fb91009fe10, thd=0x00007fb913859a00) at sql_plugin.cc:4435
    frame #4: 0x00000001015abcc4 mysqld`mysql_execute_command(thd=0x00007fb913859a00, first_level=true) at sql_parse.cc:4835
    frame #5: 0x00000001015a10f5 mysqld`mysql_parse(thd=0x00007fb913859a00, parser_state=0x0000700001a95250) at sql_parse.cc:5570
    frame #6: 0x000000010159ded8 mysqld`dispatch_command(thd=0x00007fb913859a00, com_data=0x0000700001a95db8, command=COM_QUERY) at sql_parse.cc:1484
    frame #7: 0x00000001015a0216 mysqld`do_command(thd=0x00007fb913859a00) at sql_parse.cc:1025
    frame #8: 0x0000000101738c40 mysqld`::handle_connection(arg=0x00007fb91109f890) at connection_handler_per_thread.cc:300
    frame #9: 0x0000000101f35ffc mysqld`::pfs_spawn_thread(arg=0x00007fb911004ae0) at pfs.cc:2190
    frame #10: 0x00007fff784c2661 libsystem_pthread.dylib`_pthread_body + 340
    frame #11: 0x00007fff784c250d libsystem_pthread.dylib`_pthread_start + 377
    frame #12: 0x00007fff784c1bf9 libsystem_pthread.dylib`thread_start + 13

所有的插件初始化都会定义如下的初始化函数,对应到group_replication插件中如下

int plugin_group_replication_init(MYSQL_PLUGIN plugin_info)

初始化的过程如下:

二、 如果开启了performance_schema,注册相关监控项

  // Register all PSI keys at the time plugin init
#ifdef HAVE_PSI_INTERFACE
  register_all_group_replication_psi_keys();
#endif /* HAVE_PSI_INTERFACE */

register_all_group_replication_psi_keys();

void register_all_group_replication_psi_keys()
{
  register_group_replication_mutex_psi_keys(all_group_replication_psi_mutex_keys,
                                            array_elements(all_group_replication_psi_mutex_keys));
  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));
  register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));
  register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));
}

2.1 注册mutex 监控项,在performance_schema开启的情况下,就可以在对应的表中查看这些mutex的占用情况。

static PSI_mutex_info all_group_replication_psi_mutex_keys[]=
{
  {&key_GR_LOCK_applier_module_run, "LOCK_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_applier_module_suspend, "LOCK_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_run, "LOCK_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_dispatcher_run, "LOCK_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_certification_info, "LOCK_certification_info", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_members, "LOCK_certification_members", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_channel_observation_list, "LOCK_channel_observation_list", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_run, "LOCK_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_server_ready, "LOCK_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "key_GR_LOCK_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "key_GR_LOCK_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_view_modification_wait, "LOCK_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_info_manager, "LOCK_group_info_manager", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_continuation, "LOCK_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_synchronized_queue, "LOCK_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_count_down_latch, "LOCK_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_wait_ticket, "LOCK_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_module_run, "LOCK_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery, "LOCK_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_donor_selection, "LOCK_recovery_donor_selection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_method_exec, "LOCK_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_run, "LOCK_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_plugin_running, "LOCK_plugin_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_force_members_running, "LOCK_force_members_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_write_lock_protection, "LOCK_write_lock_protection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_flow_control, "LOCK_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_transactions_waiting_apply, "LOCK_pipeline_stats_transactions_waiting_apply", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_trx_unlocking, "LOCK_transaction_unblocking", PSI_FLAG_GLOBAL}
};

2.2 注册条件变量监控

  register_group_replication_cond_psi_keys(all_group_replication_psi_condition_keys,
                                           array_elements(all_group_replication_psi_condition_keys));

group_replication涉及到的条件变量有如下

static PSI_cond_info all_group_replication_psi_condition_keys[]=
{
  {&key_GR_COND_applier_module_run, "COND_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_suspend,  "COND_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_wait, "COND_applier_module_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_run, "COND_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_dispatcher_run, "COND_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_run,  "COND_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_server_ready, "COND_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "COND_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "COND_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_COND_view_modification_wait, "COND_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_continuation, "COND_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_COND_synchronized_queue, "COND_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_COND_count_down_latch, "COND_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_COND_wait_ticket, "COND_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery_module_run, "COND_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery, "COND_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_method_exec, "COND_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_run, "COND_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_stats_flow_control, "COND_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
};

2.3 注册group_replication相关的线程

register_group_replication_thread_psi_keys(all_group_replication_psi_thread_keys,
                                             array_elements(all_group_replication_psi_thread_keys));

group_replication中涉及到如下线程

static PSI_thread_info all_group_replication_psi_thread_keys[]=
{
  {&key_GR_THD_applier_module_receiver, "THD_applier_module_receiver", PSI_FLAG_GLOBAL},
  {&key_GR_THD_cert_broadcast, "THD_certifier_broadcast", PSI_FLAG_GLOBAL},
  {&key_GR_THD_delayed_init, "THD_delayed_initialization", PSI_FLAG_GLOBAL},
  {&key_GR_THD_plugin_session, "THD_plugin_server_session", PSI_FLAG_GLOBAL},
  {&key_GR_THD_group_partition_handler, "THD_group_partition_handler", PSI_FLAG_GLOBAL},
  {&key_GR_THD_recovery, "THD_recovery", PSI_FLAG_GLOBAL}
};

2.4 注册读写锁监控

register_group_replication_rwlock_psi_keys(all_group_replication_psi_rwlock_keys,
                                             array_elements(all_group_replication_psi_rwlock_keys));

涉及到的读写锁有如下

static PSI_rwlock_info all_group_replication_psi_rwlock_keys[]=
{
  {&key_GR_RWLOCK_cert_stable_gtid_set, "RWLOCK_certifier_stable_gtid_set", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_io_cache_unused_list , "RWLOCK_io_cache_unused_list", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_plugin_stop, "RWLOCK_plugin_stop", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations, "RWLOCK_gcs_operations", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations_finalize_ongoing, "RWLOCK_gcs_operations_finalize_ongoing", PSI_FLAG_GLOBAL}
};

三、初始化会使用到的互斥锁,读写锁等

  mysql_mutex_init(key_GR_LOCK_plugin_running, &plugin_running_mutex,
                   MY_MUTEX_INIT_FAST);
  mysql_mutex_init(key_GR_LOCK_force_members_running,
                   &force_members_running_mutex,
                   MY_MUTEX_INIT_FAST);

  plugin_stop_lock= new Checkable_rwlock(
#ifdef HAVE_PSI_INTERFACE
                                         key_GR_RWLOCK_plugin_stop
#endif /* HAVE_PSI_INTERFACE */
                                        );

  shared_plugin_stop_lock= new Shared_writelock(plugin_stop_lock);
  • plugin_running_mutex

负责保护变量group_replication_running

  • force_members_running_mutex

负责保护force_members_running

四、作为观察者向server注册回调函数

注册的目的是为了在某些过程中的回调。

4.1 注册server_state_observer

server state observer主要用于实现,在数据库启动停止时的回调动作。如下:

Server_state_observer server_state_observer = {
  sizeof(Server_state_observer),

  group_replication_before_handle_connection,  //before the client connects to the server
  group_replication_before_recovery,           //before recovery
  group_replication_after_engine_recovery,     //after engine recovery
  group_replication_after_recovery,            //after recovery
  group_replication_before_server_shutdown,    //before shutdown
  group_replication_after_server_shutdown,     //after shutdown
};

但其实group_replication并不关心这些操作,所以回调函数基本直接返回。


/*
  DBMS lifecycle events observers.
*/
int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

int group_replication_before_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_engine_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_after_recovery(Server_state_param *param)
{
  return 0;
}

int group_replication_before_server_shutdown(Server_state_param *param)
{
  return 0;
}

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

在用户链接时,需要做如下操作, 暂时没有细究,应该是在切换,或者初始化时等待状态的切换完成。

int group_replication_before_handle_connection(Server_state_param *param)
{
  if (wait_on_engine_initialization)
  {
    delayed_initialization_thread->signal_thread_ready();
    delayed_initialization_thread->wait_for_read_mode();
  }
  return 0;
}

在数据库shutdown时,需要关闭MGR插件,所以执行函数plugin_group_replication_stop().

int group_replication_after_server_shutdown(Server_state_param *param)
{
  server_shutdown_status= true;
  plugin_group_replication_stop();

  return 0;
}

4.2 Register a transaction observer

注册事务操作回调

  if (register_trans_observer(&trans_observer, (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the transactions state observers");
    return 1;
    /* purecov: end */
  }

事务操作的前后需要进行的回调操作,如下,主要的功能点也在这里实现。

Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,
  group_replication_trans_before_commit,
  group_replication_trans_before_rollback,
  group_replication_trans_after_commit,
  group_replication_trans_after_rollback,
};
  • group_replication_trans_before_dml
    在进行dml操作前,MGR需要提前进行一些检查操作,如下

image

  • group_replication_trans_before_commit
    更多的一致性实现过程在group_replication_trans_before_commit中。

4.3 Register a binlog transmit observer

这部分注册,用于在binlog的发送过程中进行回调

  if (register_binlog_transmit_observer(&binlog_transmit_observer,
                                        (void *)plugin_info_ptr))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failure when registering the binlog state observers");
    return 1;
    /* purecov: end */
  }

这部分分为如下几种情况,在semisync插件中这部分应用的很多,但是在group_replication插件中,这部分基本没有使用,也就是回调函数不做操作。只有group_replication_reset_master_logs,设置了一个内部的变量known_server_reset;

Binlog_transmit_observer binlog_transmit_observer = {
  sizeof(Binlog_transmit_observer),

  group_replication_transmit_start,     // transmit_start,
  group_replication_transmit_stop,      // transmit_stop,
  group_replication_reserve_header,     // reserve_header,
  group_replication_before_send_event,  // before_send_event,
  group_replication_after_send_event,   // after_send_event,
  group_replication_reset_master_logs   // reset_master
};

五、其它的初始化操作

  //Initialize the recovery SSL option map
  initialize_ssl_option_map();

  //Initialize channel observation and auto increment handlers before start
  auto_increment_handler= new Plugin_group_replication_auto_increment();
  channel_observation_manager= new Channel_observation_manager(plugin_info);
  view_change_notifier= new Plugin_gcs_view_modification_notifier();
  gcs_module= new Gcs_operations();

  //Initialize the compatibility module before starting
  init_compatibility_manager();

另外如果开启了初始化时启动mgr,则会调用启动函数

 plugin_is_auto_starting= start_group_replication_at_boot_var;
  if (start_group_replication_at_boot_var && plugin_group_replication_start())
  {
    log_message(MY_ERROR_LEVEL,
                "Unable to start Group Replication on boot");
  }
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
Java
SpringBoot关闭过程中是如何销毁一个DisposableBean的?
SpringBoot关闭过程中是如何销毁一个DisposableBean的?
31 0
|
缓存 数据库
项目启动时执行指定任务如何实现?
项目启动时执行指定任务如何实现?
项目启动时执行指定任务如何实现?
|
Java Android开发
JobService源码探究之 onStartJob()里执行耗时逻辑导致Job可能被强制销毁
JobService源码探究之 onStartJob()里执行耗时逻辑导致Job可能被强制销毁
|
Dart IDE 开发工具
Flutter框架对热重载在项目里的深度运用,状态热重新加载以及静态字段被延迟初始化【Flutter】
Flutter框架对热重载在项目里的深度运用,状态热重新加载以及静态字段被延迟初始化【Flutter】
|
SQL 存储 缓存
如何在SpringBoot启动时执行初始化操作,两个简单接口就可以实现
最近遇到一个功能点,数据库中一张很简单的表有一千多条数据,这里的数据主要做到了值域映射的作用,简单来讲就是我可以通过中文名拿到数据库中对应的code值。原本的实现方式是每次用到之后去查一次sql,虽然不会有什么问题,但是只要是走了网络io,都会消耗时间。所以这个方案需要想办法优化。 优化的方式其实很简单,数据量不多,一千多条数据放在内存里也占不了多少空间。因此完全可以把一次性把数据加载到内存中,后面只需要每次去内存里调用就可以了。
|
数据安全/隐私保护 容器
在 Flutter 失败时重新运行你的启动逻辑
本文主要介绍在 Flutter 失败时重新运行你的启动逻辑 有时,应用程序必须在启动之前运行异步函数。像加密交易工具这样的东西必须在线,所以他们会在开始时提出登录请求,在线游戏也是如此,或者在我的情况下,一个应用程序在启动时从磁盘(或网络,如果这是第一次)。将它构建到 HomeView 中会很容易,但是一些应用程序,比如我的,根据配置
173 0
|
移动开发
Flutter 无状态小部件中启动时调用函数
本文主要介绍如何在 Flutter 无状态小部件中启动时调用函数 有没有想过如何从无状态小部件在 Flutter 启动时调用异步函数? 移动开发中最常见的场景之一是在显示新视图时调用异步函数。在 Flutter 中,这可以使用有状态的小部件并在initState函数中调用您的代码来完成。
186 0
|
Java
JVM类加载、验证、准备、解析、初始化、卸载过程详解(上)
JVM类加载、验证、准备、解析、初始化、卸载过程详解(上)
136 0
JVM类加载、验证、准备、解析、初始化、卸载过程详解(上)
|
存储 安全 Java
JVM类加载、验证、准备、解析、初始化、卸载过程详解(中)
JVM类加载、验证、准备、解析、初始化、卸载过程详解(中)
185 0
|
IDE Java 编译器
JVM类加载、验证、准备、解析、初始化、卸载过程详解(下)
JVM类加载、验证、准备、解析、初始化、卸载过程详解(下)
229 0
JVM类加载、验证、准备、解析、初始化、卸载过程详解(下)