二十:从库MTS多线程并行回放(二)(笔记)

简介: 一、工作线程执行Event外部循环slave_worker_exec_job_group ->pop_jobs_item 出队 获取event 但是不删除 如果队列为空则等待 stage_slave_waiting_event_from_coordinator Wa...

一、工作线程执行Event

外部循环
slave_worker_exec_job_group
  ->pop_jobs_item 出队 获取event 但是不删除
    如果队列为空则等待
    stage_slave_waiting_event_from_coordinator
    Waiting for an event from Coordinator
  ->进入循环
     
    如果是GTID EVENT 标记事物开始
    记录读取event时间
    Slave_worker::slave_worker_exec_event 执行event
    -> 如果是XID
        Xid_apply_log_event::do_apply_event_worker    
        
        -> Slave_worker::commit_positions
           -> 设置各种变量 将会出现在worker info里面 这里也可以看到这些信息是group的checkpoint信息
                  
                  当前执行的位置:
                  strmake(group_relay_log_name, ptr_g->group_relay_log_name,sizeof(group_relay_log_name) - 1);
                  group_relay_log_pos= ev->future_event_relay_log_pos;
                  set_group_master_log_pos(ev->common_header->log_pos);
                  set_group_master_log_name(c_rli->get_group_master_log_name());
                  
                  检查信息:
                  strmake(checkpoint_relay_log_name, ptr_g->checkpoint_relay_log_name,sizeof(checkpoint_relay_log_name) - 1);
                  checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos;
                  strmake(checkpoint_master_log_name, ptr_g->checkpoint_log_name,sizeof(checkpoint_master_log_name) - 1);
                  checkpoint_master_log_pos= ptr_g->checkpoint_log_pos;
                  
                  
                  checkpoint_seqno= ptr_g->checkpoint_seqno;
                      for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++) //重新设置位图 因为checkpoint已经 
                     {                                                                     //ptr_g->shifted是GAQ中出队的事务个数
                        if (bitmap_is_set(&group_shifted, pos))                            //这里就需要偏移掉出队的事务,恢复已经不需要了
                        bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
                      }
                  bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);//在本次事务相应的位置设置为1
                  
             -> 进行flush_info(force)操作刷盘 将信息写入到worker info 表中
  
        -> error= do_commit(thd); //做提交操作 

  ->remove_item_from_jobs 在队列中删除这个event 
    
    如果是XID event 则退出循环 代表整个 事物执行完成
    
    Slave_worker::slave_worker_ends_group
     -> get_commit_order_manager()->report_commit(this);
         
     ->更新Slave_job_group信息
            ptr_g->group_master_log_pos= get_group_master_log_pos(); //更新 job group信息
            ptr_g->group_relay_log_pos= group_relay_log_pos;
            my_atomic_store32(&ptr_g->done, 1);
            last_group_done_index= gaq_index;
            last_groups_assigned_index= ptr_g->total_seqno;
            reset_gaq_index();
            groups_done++; //更新GROUP信息 这里也可以看到GROUP的信息来自work的类      
     ->
相关文章
|
6月前
|
消息中间件 存储 前端开发
[笔记]C++并发编程实战 《四》同步并发操作(三)
[笔记]C++并发编程实战 《四》同步并发操作(三)
|
存储 Java
2021年了,生产环境的问题你怎么解决呢?快学习下线程Dump分析(上)
2021年了,生产环境的问题你怎么解决呢?快学习下线程Dump分析
321 0
2021年了,生产环境的问题你怎么解决呢?快学习下线程Dump分析(上)
|
9月前
|
算法 Docker Python
二十七 | 案例篇:为什么我的磁盘I/O延迟很高?
二十七 | 案例篇:为什么我的磁盘I/O延迟很高?
223 0
|
6月前
|
存储 前端开发 Unix
[笔记]C++并发编程实战 《四》同步并发操作(二)
[笔记]C++并发编程实战 《四》同步并发操作(二)
|
6月前
|
存储 前端开发 安全
[笔记]C++并发编程实战 《四》同步并发操作(一)
[笔记]C++并发编程实战 《四》同步并发操作
|
6月前
|
存储 前端开发 调度
[笔记]C++并发编程实战 《四》同步并发操作(四)
[笔记]C++并发编程实战 《四》同步并发操作(四)
|
9月前
|
设计模式 算法 安全
并发 并行 同步 异步 你分清了吗
并发 并行 同步 异步 你分清了吗
|
IDE 物联网 测试技术
项目实战:Qt并发服务器通讯,受同一时刻最大线程数限制(笔者本本同一时刻600多)
项目实战:Qt并发服务器通讯,受同一时刻最大线程数限制(笔者本本同一时刻600多)
项目实战:Qt并发服务器通讯,受同一时刻最大线程数限制(笔者本本同一时刻600多)
Java性能优化实践:异步多线程+同步执行(下)
Java性能优化实践:异步多线程+同步执行(下)
|
存储 前端开发 Java
Java性能优化实践:异步多线程+同步执行(上)
Java性能优化实践:异步多线程+同步执行(上)