一、INSTALL PLUGIN group_replication SONAME 'group_replication.so'
其实在 MySQL 中，不同插件的初始化过程都很类似，server 层的调用栈如下：
(lldb) bt
* thread #20, stop reason = step over
* frame #0: 0x000000011d4a6282 group_replication.so`plugin_group_replication_init(plugin_info=0x00007fb910815d38) at plugin.cc:1027
frame #1: 0x00000001015e2ee4 mysqld`plugin_initialize(plugin=0x00007fb910815d38) at sql_plugin.cc:1244
frame #2: 0x00000001015e7803 mysqld`mysql_install_plugin(thd=0x00007fb913859a00, name=0x00007fb91009fe18, dl=0x00007fb91009fe28) at sql_plugin.cc:2224
frame #3: 0x00000001015e73e3 mysqld`Sql_cmd_install_plugin::execute(this=0x00007fb91009fe10, thd=0x00007fb913859a00) at sql_plugin.cc:4435
frame #4: 0x00000001015abcc4 mysqld`mysql_execute_command(thd=0x00007fb913859a00, first_level=true) at sql_parse.cc:4835
frame #5: 0x00000001015a10f5 mysqld`mysql_parse(thd=0x00007fb913859a00, parser_state=0x0000700001a95250) at sql_parse.cc:5570
frame #6: 0x000000010159ded8 mysqld`dispatch_command(thd=0x00007fb913859a00, com_data=0x0000700001a95db8, command=COM_QUERY) at sql_parse.cc:1484
frame #7: 0x00000001015a0216 mysqld`do_command(thd=0x00007fb913859a00) at sql_parse.cc:1025
frame #8: 0x0000000101738c40 mysqld`::handle_connection(arg=0x00007fb91109f890) at connection_handler_per_thread.cc:300
frame #9: 0x0000000101f35ffc mysqld`::pfs_spawn_thread(arg=0x00007fb911004ae0) at pfs.cc:2190
frame #10: 0x00007fff784c2661 libsystem_pthread.dylib`_pthread_body + 340
frame #11: 0x00007fff784c250d libsystem_pthread.dylib`_pthread_start + 377
frame #12: 0x00007fff784c1bf9 libsystem_pthread.dylib`thread_start + 13
每个插件都会定义一个初始化函数；在 group_replication 插件中，对应的函数如下：
// Init entry point of the group_replication plugin. During INSTALL PLUGIN it is
// reached from mysql_install_plugin() -> plugin_initialize() (see the backtrace).
int plugin_group_replication_init(MYSQL_PLUGIN plugin_info)
初始化的过程如下:
二、注册 performance_schema 监控项（仅当编译时定义了 HAVE_PSI_INTERFACE 时才会执行）
/*
  Plugin init step: make every Group Replication synchronization object and
  thread known to performance_schema. Compiled in only for builds that have
  the PSI instrumentation interface.
*/
#if defined(HAVE_PSI_INTERFACE)
register_all_group_replication_psi_keys();
#endif
// Call that registers every Group Replication PSI key; its definition follows.
register_all_group_replication_psi_keys();
/*
  Register all Group Replication PSI keys with performance_schema, one
  instrument category per call: mutexes, condition variables, threads and
  read/write locks. Presumably each call fills in the key variables its table
  points to, so keep the call order unchanged.
*/
void register_all_group_replication_psi_keys()
{
  /* Mutex instruments. */
  register_group_replication_mutex_psi_keys(
      all_group_replication_psi_mutex_keys,
      array_elements(all_group_replication_psi_mutex_keys));

  /* Condition-variable instruments. */
  register_group_replication_cond_psi_keys(
      all_group_replication_psi_condition_keys,
      array_elements(all_group_replication_psi_condition_keys));

  /* Thread instruments. */
  register_group_replication_thread_psi_keys(
      all_group_replication_psi_thread_keys,
      array_elements(all_group_replication_psi_thread_keys));

  /* Read/write lock instruments. */
  register_group_replication_rwlock_psi_keys(
      all_group_replication_psi_rwlock_keys,
      array_elements(all_group_replication_psi_rwlock_keys));
}
2.1 注册mutex 监控项,在performance_schema开启的情况下,就可以在对应的表中查看这些mutex的占用情况。
/*
  Mutex instruments exposed to performance_schema by Group Replication.
  Each entry pairs the key variable to be filled in with the instrument
  name shown in performance_schema, e.g. "LOCK_applier_module_run".
  Instrument names follow the "LOCK_<object>" pattern. The two
  group_part_handler entries used to leak the C variable name
  ("key_GR_LOCK_group_part_handler_run/abort") and are now named
  consistently with the rest of the table.
*/
static PSI_mutex_info all_group_replication_psi_mutex_keys[]=
{
  {&key_GR_LOCK_applier_module_run, "LOCK_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_applier_module_suspend, "LOCK_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_run, "LOCK_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_broadcast_dispatcher_run, "LOCK_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_certification_info, "LOCK_certification_info", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_cert_members, "LOCK_certification_members", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_channel_observation_list, "LOCK_channel_observation_list", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_run, "LOCK_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_delayed_init_server_ready, "LOCK_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_run, "LOCK_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_part_handler_abort, "LOCK_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_view_modification_wait, "LOCK_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_group_info_manager, "LOCK_group_info_manager", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_continuation, "LOCK_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_synchronized_queue, "LOCK_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_count_down_latch, "LOCK_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_wait_ticket, "LOCK_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_module_run, "LOCK_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery, "LOCK_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_recovery_donor_selection, "LOCK_recovery_donor_selection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_method_exec, "LOCK_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_session_thread_run, "LOCK_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_plugin_running, "LOCK_plugin_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_force_members_running, "LOCK_force_members_running", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_write_lock_protection, "LOCK_write_lock_protection", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_flow_control, "LOCK_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_pipeline_stats_transactions_waiting_apply, "LOCK_pipeline_stats_transactions_waiting_apply", PSI_FLAG_GLOBAL},
  {&key_GR_LOCK_trx_unlocking, "LOCK_transaction_unblocking", PSI_FLAG_GLOBAL}
};
2.2 注册条件变量监控
// Register the Group Replication condition-variable PSI keys.
register_group_replication_cond_psi_keys(
    all_group_replication_psi_condition_keys,
    array_elements(all_group_replication_psi_condition_keys));
group_replication 涉及的条件变量如下：
/*
  Condition-variable instruments exposed to performance_schema by Group
  Replication. Each entry pairs the key variable to be filled in with the
  instrument name.

  The two group_part_handler entries must point at the COND keys. They used
  to reference key_GR_LOCK_group_part_handler_run/abort, so registering this
  table overwrote those mutex keys with condition keys. Meanwhile
  key_GR_COND_group_part_handler_run/abort were never registered.
*/
static PSI_cond_info all_group_replication_psi_condition_keys[]=
{
  {&key_GR_COND_applier_module_run, "COND_applier_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_suspend, "COND_applier_module_suspend", PSI_FLAG_GLOBAL},
  {&key_GR_COND_applier_module_wait, "COND_applier_module_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_run, "COND_certifier_broadcast_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_cert_broadcast_dispatcher_run, "COND_certifier_broadcast_dispatcher_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_run, "COND_delayed_init_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_delayed_init_server_ready, "COND_delayed_init_server_ready", PSI_FLAG_GLOBAL},
  {&key_GR_COND_group_part_handler_run, "COND_group_part_handler_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_group_part_handler_abort, "COND_group_part_handler_abort", PSI_FLAG_GLOBAL},
  {&key_GR_COND_view_modification_wait, "COND_view_modification_wait", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_continuation, "COND_pipeline_continuation", PSI_FLAG_GLOBAL},
  {&key_GR_COND_synchronized_queue, "COND_synchronized_queue", PSI_FLAG_GLOBAL},
  {&key_GR_COND_count_down_latch, "COND_count_down_latch", PSI_FLAG_GLOBAL},
  {&key_GR_COND_wait_ticket, "COND_wait_ticket", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery_module_run, "COND_recovery_module_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_recovery, "COND_recovery", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_method_exec, "COND_session_thread_method_exec", PSI_FLAG_GLOBAL},
  {&key_GR_COND_session_thread_run, "COND_session_thread_run", PSI_FLAG_GLOBAL},
  {&key_GR_COND_pipeline_stats_flow_control, "COND_pipeline_stats_flow_control", PSI_FLAG_GLOBAL},
};
2.3 注册group_replication相关的线程
// Register the Group Replication thread PSI keys.
register_group_replication_thread_psi_keys(
    all_group_replication_psi_thread_keys,
    array_elements(all_group_replication_psi_thread_keys));
group_replication 中涉及的线程如下：
/*
  Thread instruments exposed to performance_schema by Group Replication:
  { key variable to fill in, instrument name, flags }.
*/
static PSI_thread_info all_group_replication_psi_thread_keys[]=
{
  {&key_GR_THD_applier_module_receiver,  "THD_applier_module_receiver", PSI_FLAG_GLOBAL},
  {&key_GR_THD_cert_broadcast,           "THD_certifier_broadcast",     PSI_FLAG_GLOBAL},
  {&key_GR_THD_delayed_init,             "THD_delayed_initialization",  PSI_FLAG_GLOBAL},
  {&key_GR_THD_plugin_session,           "THD_plugin_server_session",   PSI_FLAG_GLOBAL},
  {&key_GR_THD_group_partition_handler,  "THD_group_partition_handler", PSI_FLAG_GLOBAL},
  {&key_GR_THD_recovery,                 "THD_recovery",                PSI_FLAG_GLOBAL},
};
2.4 注册读写锁监控
// Register the Group Replication read/write lock PSI keys.
register_group_replication_rwlock_psi_keys(
    all_group_replication_psi_rwlock_keys,
    array_elements(all_group_replication_psi_rwlock_keys));
涉及到的读写锁有如下
/*
  Read/write lock instruments exposed to performance_schema by Group
  Replication: { key variable to fill in, instrument name, flags }.
*/
static PSI_rwlock_info all_group_replication_psi_rwlock_keys[]=
{
  {&key_GR_RWLOCK_cert_stable_gtid_set,
   "RWLOCK_certifier_stable_gtid_set", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_io_cache_unused_list,
   "RWLOCK_io_cache_unused_list", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_plugin_stop,
   "RWLOCK_plugin_stop", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations,
   "RWLOCK_gcs_operations", PSI_FLAG_GLOBAL},
  {&key_GR_RWLOCK_gcs_operations_finalize_ongoing,
   "RWLOCK_gcs_operations_finalize_ongoing", PSI_FLAG_GLOBAL},
};
三、初始化用到的互斥锁、读写锁等
/* Fast mutexes for the plugin-running and force-members-running state. */
mysql_mutex_init(key_GR_LOCK_plugin_running,
                 &plugin_running_mutex, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_GR_LOCK_force_members_running,
                 &force_members_running_mutex, MY_MUTEX_INIT_FAST);

/*
  Rwlock instrumented as RWLOCK_plugin_stop (only when PSI is compiled in),
  plus a Shared_writelock wrapper built on top of it.
*/
#ifdef HAVE_PSI_INTERFACE
plugin_stop_lock= new Checkable_rwlock(key_GR_RWLOCK_plugin_stop);
#else
plugin_stop_lock= new Checkable_rwlock();
#endif /* HAVE_PSI_INTERFACE */
shared_plugin_stop_lock= new Shared_writelock(plugin_stop_lock);
- plugin_running_mutex
负责保护变量group_replication_running
- force_members_running_mutex
负责保护force_members_running
四、作为观察者向server注册回调函数
注册的目的是让 server 在特定事件发生时回调插件提供的函数。
4.1 注册server_state_observer
Server_state_observer 用于在数据库启动、停止等生命周期事件发生时回调插件，定义如下：
/*
  Server lifecycle callbacks registered by Group Replication. The
  initializer is positional: after the struct size, the entries must follow
  the field order of Server_state_observer.
*/
Server_state_observer server_state_observer = {
  sizeof(Server_state_observer),

  /* before the client connects to the server */
  group_replication_before_handle_connection,
  /* before recovery */
  group_replication_before_recovery,
  /* after engine recovery */
  group_replication_after_engine_recovery,
  /* after recovery */
  group_replication_after_recovery,
  /* before shutdown */
  group_replication_before_server_shutdown,
  /* after shutdown */
  group_replication_after_server_shutdown,
};
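不过 group_replication 并不关心其中大部分事件，所以多数回调函数直接返回 0；只有 group_replication_before_handle_connection 和 group_replication_after_server_shutdown 做了实际工作。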
/*
  DBMS lifecycle events observers.
*/

/*
  Hook run before the server handles a client connection.
  When wait_on_engine_initialization is set, signal the delayed
  initialization thread that it may proceed, then block in
  wait_for_read_mode(). Always reports success (0).
*/
int group_replication_before_handle_connection(Server_state_param *param)
{
  if (!wait_on_engine_initialization)
    return 0;

  delayed_initialization_thread->signal_thread_ready();
  delayed_initialization_thread->wait_for_read_mode();
  return 0;
}
/* Hook: before recovery. Group Replication has nothing to do here. */
int group_replication_before_recovery(Server_state_param *param) { return 0; }

/* Hook: after engine recovery. Nothing to do; report success. */
int group_replication_after_engine_recovery(Server_state_param *param) { return 0; }

/* Hook: after recovery. Nothing to do; report success. */
int group_replication_after_recovery(Server_state_param *param) { return 0; }

/* Hook: before server shutdown. Nothing to do; report success. */
int group_replication_before_server_shutdown(Server_state_param *param) { return 0; }
/*
  Hook: after the server has shut down. Records the shutdown in
  server_shutdown_status, then stops the plugin. The result of
  plugin_group_replication_stop() is discarded; the hook always returns 0.
*/
int group_replication_after_server_shutdown(Server_state_param *param)
{
  /* Flag first; presumably the stop path reads it (keep this order). */
  server_shutdown_status= true;
  (void) plugin_group_replication_stop();
  return 0;
}
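在处理用户连接之前，如果 wait_on_engine_initialization 为真，会先通知延迟初始化线程（delayed_initialization_thread）可以继续执行，再调用 wait_for_read_mode() 阻塞等待。具体细节暂未深究；从函数名看，应该是在初始化阶段等待只读模式设置完成。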
/*
  Before a client connection is handled: if the plugin is waiting on engine
  initialization, wake the delayed initialization thread and then wait in
  wait_for_read_mode(). Returns 0 in every case.
*/
int group_replication_before_handle_connection(Server_state_param *param)
{
  if (!wait_on_engine_initialization)
    return 0;

  delayed_initialization_thread->signal_thread_ready();
  delayed_initialization_thread->wait_for_read_mode();
  return 0;
}
在数据库 shutdown 之后，需要关闭 MGR 插件：先将 server_shutdown_status 置为 true，再调用 plugin_group_replication_stop()。
/*
  After server shutdown: mark the shutdown, then stop Group Replication.
  The stop function's result is ignored and 0 is always returned.
*/
int group_replication_after_server_shutdown(Server_state_param *param)
{
  /* Set before stopping; presumably consulted by the stop path. */
  server_shutdown_status= true;
  (void) plugin_group_replication_stop();
  return 0;
}
4.2 注册事务观察者（transaction observer）
注册事务操作回调
/* Hook Group Replication into the server's transaction life cycle. */
if (register_trans_observer(&trans_observer,
                            (void *)plugin_info_ptr) != 0)
{
  /* purecov: begin inspected */
  log_message(MY_ERROR_LEVEL,
              "Failure when registering the transactions state observers");
  return 1;
  /* purecov: end */
}
Trans_observer 定义了事务各阶段前后的回调，Group Replication 的主要功能也在这些回调中实现：
/*
  Transaction callbacks registered by Group Replication. Positional
  initializer: after the struct size, the entries follow the field order of
  Trans_observer.
*/
Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,      /* before a DML statement */
  group_replication_trans_before_commit,   /* before commit */
  group_replication_trans_before_rollback, /* before rollback */
  group_replication_trans_after_commit,    /* after commit */
  group_replication_trans_after_rollback,  /* after rollback */
};
- group_replication_trans_before_dml
在执行 DML 操作之前，MGR 需要先进行一些检查。
- group_replication_trans_before_commit
一致性相关的大部分逻辑在 group_replication_trans_before_commit 中实现。
4.3 Register a binlog transmit observer
这部分注册,用于在binlog的发送过程中进行回调
/* Hook Group Replication into the binlog transmit (dump) path. */
if (register_binlog_transmit_observer(&binlog_transmit_observer,
                                      (void *)plugin_info_ptr) != 0)
{
  /* purecov: begin inspected */
  log_message(MY_ERROR_LEVEL,
              "Failure when registering the binlog state observers");
  return 1;
  /* purecov: end */
}
这部分回调分为以下几种情况。semisync 插件大量使用了这些回调，但在 group_replication 插件中基本没有用到，回调函数几乎什么都不做；只有 group_replication_reset_master_logs 设置了一个内部变量 known_server_reset。
/*
  Binlog transmit callbacks registered by Group Replication. Positional
  initializer: after the struct size, the entries follow the field order of
  Binlog_transmit_observer.
*/
Binlog_transmit_observer binlog_transmit_observer = {
  sizeof(Binlog_transmit_observer),

  /* transmit_start */
  group_replication_transmit_start,
  /* transmit_stop */
  group_replication_transmit_stop,
  /* reserve_header */
  group_replication_reserve_header,
  /* before_send_event */
  group_replication_before_send_event,
  /* after_send_event */
  group_replication_after_send_event,
  /* reset_master */
  group_replication_reset_master_logs
};
五、其它的初始化操作
// Remaining plugin-init steps, executed in this order. Whether the
// constructors below depend on each other is not visible here, so keep the
// sequence as-is.

//Initialize the recovery SSL option map
initialize_ssl_option_map();
//Initialize channel observation and auto increment handlers before start
// Each handler/manager is heap-allocated with `new` and stored in a
// global pointer.
auto_increment_handler= new Plugin_group_replication_auto_increment();
// NOTE(review): uses plugin_info, while observer registration uses
// plugin_info_ptr; presumably the same plugin handle. Confirm.
channel_observation_manager= new Channel_observation_manager(plugin_info);
view_change_notifier= new Plugin_gcs_view_modification_notifier();
gcs_module= new Gcs_operations();
//Initialize the compatibility module before starting
init_compatibility_manager();
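另外，如果开启了启动时自动启动 MGR（系统变量 group_replication_start_on_boot，即代码中的 start_group_replication_at_boot_var 为真），则会调用启动函数 plugin_group_replication_start()；启动失败时这里只记录一条错误日志。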
/* Remember whether this start attempt comes from the start-on-boot option. */
plugin_is_auto_starting= start_group_replication_at_boot_var;

if (start_group_replication_at_boot_var)
{
  /* A failed boot-time start is only logged; it is not fatal here. */
  if (plugin_group_replication_start())
  {
    log_message(MY_ERROR_LEVEL,
                "Unable to start Group Replication on boot");
  }
}