之前一直没有对semi sync做太多的关注,原因是线上的生产环境使用的非常少,最近需要提升semisync的生产环境优先级,以适应数据保护非常严格的场景,借此机会了解一下semisync,顺便过了一下大部分代码,以帮助后续的性能优化工作,以下分析基于代码MySQL5.6.13
如何配置
semisync的配置非常简单,采用MySQL Plugin的方式,在主库和备库上分别安装不同的插件,当然你也可以主库备库全部标准安装上,使用参数来控制sesmisync;
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so'; INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';
正常情况下,这两个PLUGIN都应该被标准安装,谁知道哪天备库会不会被切换成主库呢。
安装完PLUGIN后,我们可以根据拓扑结构来定义主库和备库的配置,主要包括以下几个配置项;
1.rpl_semi_sync_master_enabled—控制主库上是否开启semisync, 打开或关闭,立刻生效2.rpl_semi_sync_slave_enabled—控制备库是否开启semisync,当主库打开semisync时,则必须至少要有一个链接的备库是打开semisync的,否则主库线程每次都会去等待,直至超时;因此如果想关闭semisync必须要先关闭主库配置,再关闭备库配置
3.rpl_semi_sync_master_timeout
—控制主库上客户端的等待时间,当超过这么长时间等待后,客户端返回,同步复制退化成原生的异步复制
单位为毫秒,默认值为10000,即10秒
默认打开,表示当备库起来后,并跟上主库时,自动切换到同步模式,如果关闭,即使备库起来并跟上了,也不会启用半同步;
— 输出监控信息的级别,详细点击见文档,不同的级别,可能输出更详细的信息,用于DEBUG
运行状态变量也比较丰富,不细说了,网上介绍的很多,官方文档也很详细
root@(none) 02:58:27>show status like '%semi%'; +--------------------------------------------+-------+ | Variable_name | Value | +--------------------------------------------+-------+ | Rpl_semi_sync_master_clients | 0 | | Rpl_semi_sync_master_net_avg_wait_time | 0 | | Rpl_semi_sync_master_net_wait_time | 0 | | Rpl_semi_sync_master_net_waits | 0 | | Rpl_semi_sync_master_no_times | 0 | | Rpl_semi_sync_master_no_tx | 0 | | Rpl_semi_sync_master_status | OFF | | Rpl_semi_sync_master_timefunc_failures | 0 | | Rpl_semi_sync_master_tx_avg_wait_time | 0 | | Rpl_semi_sync_master_tx_wait_time | 0 | | Rpl_semi_sync_master_tx_waits | 0 | | Rpl_semi_sync_master_wait_pos_backtraverse | 0 | | Rpl_semi_sync_master_wait_sessions | 0 | | Rpl_semi_sync_master_yes_tx | 0 | | Rpl_semi_sync_slave_status | OFF | +--------------------------------------------+-------+ 15 rows in set (0.00 sec)
源代码实现
采用plugin +代码HOOK的方式实现,通过HOOK来回调在plugin中定义的函数
例如:
RUN_HOOK(transaction, after_commit, (head, all));
RUN_HOOK的定义在rpl_hander.h中:
#define RUN_HOOK(group, hook, args) \ (group ##_delegate->is_empty() ? \ 0 : group ##_delegate->hook args)
因此上例被转化成
transaction_delegate->after_commit(head, all);
更具体的回调接口函数在sql/rpl_hander.cc文件中定义
有四类_delegate对象
binlog_storage_delegate、transaction_delegate、binlog_transmit_delegate、binlog_relay_io_delegate
主库
在主库semisync加载或初始化时,调用函数semi_sync_master_plugin_init,为transaction_delegate,binlog_transmit_delegate和binlog_transmit_delegate增加了observer,分别对应该plugin的变量为trans_observer,storage_observer,transmit_observer,这三个obeserver定义了各自的函数接口,如下:
Trans_observer trans_observer = { sizeof(Trans_observer), // len repl_semi_report_commit, // after_commit repl_semi_report_rollback, // after_rollback }; Binlog_storage_observer storage_observer = { sizeof(Binlog_storage_observer), // len repl_semi_report_binlog_update, // report_update }; Binlog_transmit_observer transmit_observer = { sizeof(Binlog_transmit_observer), // len repl_semi_binlog_dump_start, // start repl_semi_binlog_dump_end, // stop repl_semi_reserve_header, // reserve_header repl_semi_before_send_event, // before_send_event repl_semi_after_send_event, // after_send_event repl_semi_reset_master, // reset };
所有从server层向plugin的函数调用,都是通过函数指针来完成的,因此我们只需要搞清楚上述几个函数的调用逻辑,基本就可以清楚semisync plugin在MySQL里的处理逻辑
a.备库连接到主库时
调用函数repl_semi_binlog_dump_start
HOOK注入点:
mysql_binlog_send: 912 if (log_warnings > 1) 913 sql_print_information("Start binlog_dump to master_thread_id(%lu) slave_server(%d), pos(%s, %lu)", 914 thd->thread_id, thd->server_id, log_ident, (ulong)pos); 915 if (RUN_HOOK(binlog_transmit, transmit_start, (thd, 0/*flags*/, log_ident, pos))) 916 { 917 errmsg= "Failed to run hook 'transmit_start'"; 918 my_errno= ER_UNKNOWN_ERROR; 919 GOTO_ERR; 920 }
mysql_binlog_send用于每一个binlog dump请求,在开始dump之前,调用repl_semi_binlog_dump_start,该函数主要做以下几件事情:
1.首先判断连接过来的备库是否是开启semisync的,通过备库用户是否设置了变量rpl_semi_sync_slave来决定(ReplSemiSyncMaster::is_semi_sync_slave())
2.如果备库开启了semisync,增加连接的备库计数(repl_semisync.add_slave()),计数器用变量rpl_semi_sync_master_clients来维持,LOCK_binlog_锁来保护
3.
repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos); 后面介绍
b.当备库从主库上断开时,会调用repl_semi_binlog_dump_end
将计数器rpl_semi_sync_master_clients减1
c.我们以执行一条DML为例:
执行一条简单的SQL:insert into t1 values (1);
1.当将binlog写入到文件中后(尚未sync),调用repl_semisync.writeTranxInBinlog
HOOK位置(sql/binlog.cc):
6699 /* 6700 If the flush finished successfully, we can call the after_flush 6701 hook. Being invoked here, we have the guarantee that the hook is 6702 executed before the before/after_send_hooks on the dump thread 6703 preventing race conditions among these plug-ins. 6704 */ 6705 if (flush_error == 0) 6706 { 6707 const char *file_name_ptr= log_file_name + dirname_length(log_file_name); 6708 DBUG_ASSERT(flush_end_pos != 0); 6709 if (RUN_HOOK(binlog_storage, after_flush, 6710 (thd, file_name_ptr, flush_end_pos))) 6711 { 6712 sql_print_error("Failed to run 'after_flush' hooks"); 6713 flush_error= ER_ERROR_ON_WRITE; 6714 } 6715 6716 signal_update(); 6717 DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); 6718 }
repl_semisync.writeTranxInBinlog会进一步的调用repl_semisync.writeTranxInBinlog(如果主库的semisync打开的话)来存储当前的binlog文件名和偏移量, 更新当前最大的事务binlog位置,存储在repl_semisync对象的commit_file_name_和commit_file_pos_中(commit_file_name_inited_被设置为TRUE);然后将该事务的位点信息存储到active_tranxs中(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)),这是一个链表,用来存储所有活跃的事务的位点信息,每个新加的节点都能保证位点在已有节点之后;另外还维持了一个key->value的数组,数组下标即为事务binlog坐标计算的hash,值为相同hash值的链表
这些操作都在锁LOCK_binlog_的保护下进行的, 即使semisync退化成同步状态,也会继续更新位点(但不写事务节点),主要是为了监控后续SLAVE时候能够跟上当前的事务Bilog状态;
事实上,有两个变量来控制SEMISYNC是否开启:
state_ 为true表示同步,为false表示异步状态,semi sync退化时会修改改变量(ReplSemiSyncMaster::switch_off)
另外一个变量是master_enabled_,这个是由参数rpl_semi_sync_master_enabled来控制的。
2.在事务commit之后,调用函数repl_semi_report_commit
HOOK位置 (sql/binlog.cc)
6388 void 6389 MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first) 6390 { 6391 Thread_excursion excursion(thd); 6392 for (THD *head= first; head; head= head->next_to_commit) 6393 { 6394 if (head->transaction.flags.run_hooks && 6395 head->commit_error == THD::CE_NONE) 6396 { 6397 if (excursion.attach_to(head)) 6398 { 6399 head->commit_error= THD::CE_COMMIT_ERROR; 6400 sql_print_error("Out of memory while attaching to session thread " 6401 "during the group commit phase."); 6402 } 6403 if (head->commit_error == THD::CE_NONE) 6404 { 6405 bool all= head->transaction.flags.real_commit; 6406 (void) RUN_HOOK(transaction, after_commit, (head, all)); 6407 /* 6408 When after_commit finished for the transaction, clear the run_hooks flag. 6409 This allow other parts of the system to check if after_commit was called. 6410 */ 6411 head->transaction.flags.run_hooks= false; 6412 } 6413 } 6414 } 6415 }
process_after_commit_stage_queue在GROUP COMMIT的第三个阶段,即COMMIT阶段,leader线程来调用,依次为其他线程调用repl_semi_report_commit,进一步的,调用repl_semisync.commitTrx(binlog_name, param->log_pos)
ReplSemiSyncMaster::commitTrx的参数是事务的binlog坐标, 该函数是实现客户端同步等待的主要部分,主要做以下事情:
1)用户线程进入新的状态:
”Waiting for semi-sync ACK from slave“
从SHOW PROCESSLIST的输出可以看到如上信息
2)检查当前的semisync状态:
/* This is the real check inside the mutex. */
if (!getMasterEnabled() || !is_on())
goto l_end;
这是在持有锁的状态下进行的检查;
3)根据wait_timeout_ 设置超时时间变量
随后进入while循环,
while (is_on())
{
……
}
只要semisync没有退化到异步状态,就会一直在while循环中等待,直到超时或者获得备库反馈;
while循环内的工作包括:
(1)判断:
当reply_file_name_inited_为true时,如果reply_file_name_及reply_file_pos_大于当前事务等待的位置,表示备库已经收到了比当前位置更后的事务,这时候无需等待,直接返回;
当wait_file_name_inited_为true时,比较当前事务位置和(wait_file_name_,wait_file_pos_)的大小,如果当期事务更小,则将wait_file_pos_和wait_file_name_设置为当前事务的值;
否则,若wait_file_name_inited_为false,将wait_file_name_inited_设置为TRUE,同样将上述两个变量设置为当前事务的值;
这么做的目的是为了维持当前需要等待的最小binlog位置
(2)增加等待线程计数rpl_semi_sync_master_wait_sessions++
wait_result = cond_timewait(&abstime); 线程进入condition wait
在唤醒或超时后rpl_semi_sync_master_wait_sessions–
如果是等待超时的,rpl_semi_sync_master_wait_timeouts++,并关闭semisync (switch_off(),将state_/wait_file_name_inited_/reply_file_name_inited_设置为false,rpl_semi_sync_master_off_times++,同时唤醒其他等待的线程(COND_binlog_send_))
如果是被唤醒的,则增加计数:rpl_semi_sync_master_trx_wait_num++、rpl_semi_sync_master_trx_wait_time += wait_time, 然后回到1),去检查相关变量;
4).退出循环后,更新计数
/* Update the status counter. */ if (is_on()) rpl_semi_sync_master_yes_transactions++; else rpl_semi_sync_master_no_transactions++;
然后返回;
可以看到,上述关键的一步是对(reply_file_name_,reply_file_pos_)的判断,以决定是否需要继续等待;该变量的更新由dump线程来触发
d.dump线程的处理逻辑
那么在执行一条事务后,dump线程会有哪些调用逻辑呢?
1.开始发送binlog之前需要重置packet(reset_transmit_packet)
调用函数repl_semi_reserve_header,用于在Packet的头部预留字节,以维护和备库的交互信息,目前共预留3个字节
这里在packet的头部拷贝两个字节,值为ReplSemiSyncBase::kSyncHeader,固定值,作为MAGIC NUMBER
const unsigned char ReplSemiSyncBase::kSyncHeader[2] = {ReplSemiSyncBase::kPacketMagicNum, 0};
只有备库开启了semisync的情况下,才会去保留额外的packet头部比特位,不管主库是否开启了semisync
2.在发送binlog事件之前调用repl_semi_before_send_event,确认是否需要备库反馈,通过设置之前预留的三个字节的第3个字节
HOOK位置(函数mysql_binlog_send)
1659 if (RUN_HOOK(binlog_transmit, before_send_event, 1660 (thd, 0/*flags*/, packet, log_file_name, pos))) 1661 { 1662 my_errno= ER_UNKNOWN_ERROR; 1663 errmsg= "run 'before_send_event' hook failed"; 1664 GOTO_ERR; 1665 }
repl_semi_before_send_event->
repl_semisync.updateSyncHeader(packet,
log_file,
log_pos,
param->server_id);
该函数执行以下步骤,目的是填充上一步保留的头部字节:
1)检查主库和备库是否同时打开了semisync,如果没有,直接返回
2)加锁LOCK_binlog_,再次检查主库是否开启semisync
设置sync为false;
3)如果当前semisync是同步状态(即state_为TRUE)
同样的先检查当前的binlog位点是否有其他备库已经接受到(reply_file_name_inited_为true,并且<reply_file_name_, reply_file_pos_>比当前dump线程的位点要大);则sync为false,goto l_end
如果当前正在等待的事务最小位点(wait_file_name_,wait_file_pos_)比当前dump线程的位点要小(或者wait_file_name_inited_为false,只有当前dump线程),再次检查当前dump线程的bin log位点是否是事务的最后一个事件(通过遍历由函数repl_semisync.writeTranxInBinlog产生的事务节点Hash链表来检查),如果是的话,则设置sync为true
4)如果当前semisync为异步状态(state_为FALSE)
当commit_file_name_inited_为TRUE时(事务提交位点信息被更新过,在函数repl_semisync.writeTranxInBinlog中),如果dump线程的位点大于等于上次事务提交的位点(commit_file_name_, commit_file_pos_),表示当前dump线程已经追赶上了主库位点,因此sync被设置为TRUE,
当commit_file_name_inited_为false时,表示还没有事务提交位点信息,同样设置sync为TRUE;
5)当sync为TRUE时,设置packet头部,告诉备库需要其提供反馈
if (sync) { (packet)[2] = kPacketFlagSync; }
plugin/semisync/semisync.cc:
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
3.在发送事件后,调用函数repl_semi_after_send_event来读取备库的反馈
HOOK位置(mysql_binlog_send)
1687 if (RUN_HOOK(binlog_transmit, after_send_event, (thd, 0/*flags*/, packet, 1688 log_file_name, skip_group ? pos : 0))) 1689 { 1690 my_errno= ER_UNKNOWN_ERROR; 1691 errmsg= "Failed to run hook 'after_send_event'"; 1692 GOTO_ERR; 1693 }
如果该事件需要skip,调用ReplSemiSyncMaster::skipSlaveReply,否则调用ReplSemiSyncMaster::readSlaveReply;前者只需要判断是否设置了事件的头部需要sync,如果是的,则调用reportReplyBinlog; 后者则先读取备库传递的数据包,从中读出备库传递的binlog坐标信息,函数ReplSemiSyncMaster::readSlaveReply主要有如下流程:
1)如果无需等待,直接返回,即没有设置sync标记;
此时可以保证这是事务的最后一个事件;
2)做一次net_flush将缓冲的数据刷走,然后net_clear,等待备库的反馈的数据包(my_net_read)
从my_net_read返回后,更新统计信息
rpl_semi_sync_master_net_wait_num++; rpl_semi_sync_master_net_wait_time += wait_time;
从数据包中读取备库传递过来的binlog位点信息,然后调用reportReplyBinlog:
result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
3)ReplSemiSyncMaster::reportReplyBinlog的流程如下:
<1>检查主库semisync是否打开,如果没有,goto l_end;
<2>如果当前semisync为异步状态,尝试将其切换为同步状态,(try_switch_on(server_id, log_file_name, log_file_pos);)
<3>如果reply_file_name_inited_为true(大多数情况)
比较当前dump线程接收到备库反馈的位点信息与(reply_file_name_, reply_file_pos_)做对比,如果小于后者,说明已经有别的备库读到更新的事务了,这时候无需更新(reply_file_name_, reply_file_pos_)
semisync只保证全局至少有一个备库读到更新的事务
<4>如果需要,更新reply_file_pos_和reply_file_name_
556 strcpy(reply_file_name_, log_file_name); 557 reply_file_pos_ = log_file_pos; 558 reply_file_name_inited_ = true;
然后清理当前位点之前的事务节点信息
562 active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
<5> 若当前等待的开启semisync的备库(rpl_semi_sync_master_wait_sessions > 0) ,且当前(reply_file_name_, reply_file_pos_) 大于(wait_file_name_, wait_file_pos_),即接收到的备库反馈位点信息大于等于当前等待的事务的最小位点,则设置
can_release_threads=true;
wait_file_name_inited_ = false; –>这意味着新的等待事务,需要重新设置等待位点信息
<6>释放锁,如果can_release_threads为true,进行一次broadcast,唤醒等待的用户线程
备库
当接受到主库发送的binlog后,开启了semisync的备库需要为其发送反馈
备库同样也是为Binlog_relay_IO_delegate增加一个observer,即relay_io_observer,通过HOOK的方式回调PLUGIN的函数,主要包括如下接口函数
Binlog_relay_IO_observer relay_io_observer = { sizeof(Binlog_relay_IO_observer), // len repl_semi_slave_io_start, // start repl_semi_slave_io_end, // stop repl_semi_slave_request_dump, // request_transmit repl_semi_slave_read_event, // after_read_event repl_semi_slave_queue_event, // after_queue_event repl_semi_reset_slave, // reset };
a.开启io线程时,在连接主库之前调用repl_semi_slave_io_start
HOOK位置 (handle_slave_io)
4081 if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) 4082 { 4083 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, 4084 ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook"); 4085 goto err; 4086 }
主要就是设置全局变量rpl_semi_sync_slave_status,如果开启了备库的semisync则设置该变量为TRUE;
b.连接完主库后,请求发起dump时,调用repl_semi_slave_request_dump
HOOK位置 (handle_slave_io->request_dump)
3026 if (RUN_HOOK(binlog_relay_io, 3027 before_request_transmit, 3028 (thd, mi, binlog_flags))) 3029 goto err;
该函数用户检查主库是否支持SEMISYNC(检查是否存在rpl_semi_sync_master_enabled变量),如果支持的话,设置用户变量为rpl_semi_sync_slave:
query= "SET @rpl_semi_sync_slave= 1"; if (mysql_real_query(mysql, query, strlen(query)))
在主库上就是通过@rpl_semi_sync_slave来鉴别一个dump请求的SLAVE是否是开启semisync的;
初始化成功后,我们给主库一些DML,继续debug
c.读取事件,调用函数repl_semi_slave_read_event
HOOK位置 (handle_slave_io)
4274 if (RUN_HOOK(binlog_relay_io, after_read_event, 4275 (thd, mi,(const char*)mysql->net.read_pos + 1, 4276 event_len, &event_buf, &event_len))) 4277 {
由于我们在主库上是对packet头部有附加了3个比特的,这里需要将其读出来,同时需要更新event_buf及event_len的值;
如果rpl_semi_sync_slave_status为false,表示开启io线程时未打开semisync,直接使用packet的长度即可,否则调用ReplSemiSyncSlave::slaveReadSyncHeader,去读取packet的头部信息,如果需要给主库一个ack,则设置semi_sync_need_reply为TRUE
d.当将binlog写入relaylog之后(即完成函数queue_event之后),调用repl_semi_slave_queue_event
HOOK位置 (handle_slave_io)
4295 if (RUN_HOOK(binlog_relay_io, after_queue_event, 4296 (thd, mi, event_buf, event_len, synced))) 4297 { 4298 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, 4299 ER(ER_SLAVE_FATAL_ERROR), 4300 "Failed to run 'after_queue_event' hook"); 4301 goto err; 4302 }
如果备库开启了semisync且需要ack时(pl_semi_sync_slave_status && semi_sync_need_reply),调用ReplSemiSyncSlave::slaveReply,向主库发送数据包,包括当前的binlog文件名及偏移量信息
e.停止IO线程时,调用函数repl_semi_slave_io_end,将rpl_semi_sync_slave_status设置为false,这里判断的mysql_reply实际上不会用到;
存在的问题主要集中在主库上:
1.锁的粒度太粗,看看能不能细化,或者使用lock-free的算法来优化
2.字符串比较的调用,大部分是对binlog文件名的比较,实际上只要比较后面的那一串数字就足够了;尝试下看看能否有性能优化