MySQL:MGR 学习(2):Write set(写集合)的写入过程

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介:

水平有限,有误请谅解。
源码版本5.7.22


一、前文总结

前文
MySQL:MGR 学习(1):写集合(Write set) 中
已经说明了Write set的生成过程,但是Write set是需要封装如下Transaction_context_log_event中进行广播到其他节点进行认证的。本文就描述Write set的写入和广播的过程。
如前文所描述,整个事物的Write set在函数binlog_log_row中生成,对于5.7来讲每一行的每个唯一键都会生成一个Write set(但是咨询宋利兵老师得知8.0唯一键不会再记录Write set了),每个Write set实际上是一个8字节的uint64类型其通过hash函数生成,并且在Rpl_transaction_write_set_ctx存储了一个vector数组和一个set集合来分别存储,如果修改的行比较多那么可能需要一个更多内存来存储这些hash值,虽然8字节比较小,但是如果是大事物上千万的表在一个事物里面做修改那么内存可能消耗会上百兆。如下图是事物执行期间(commit之前)最终形成的Write set内存空间示意图。
image.png

二、Transaction_context_log_event生成的时机

在事物执行期间会生成map event/query event/dml event等并且会源源不断的写入到binlog cache中,同时会将Write set 不断的写入到Rpl_transaction_write_set_ctx保存在内存中,这些逻辑都在binlog_log_row中。但是Transaction_context_log_event的生成却是在commit的时候,具体的位置是在MYSQL_BIN_LOG::prepare之后但是在MYSQL_BIN_LOG::ordered_commit之前,显而易见这个时候的binlog event还在bing cache中,还没有写入binlog file中。所以MGR的事物全局认证的动作是发生在binlog event落地之前。下面是这个栈帧:

#0  group_replication_trans_before_commit (param=0x7ffff0e7b8d0) at /root/softm/percona-server-5.7.22-22/rapid/plugin/group_replication/src/observer_trans.cc:511
#1  0x00000000014e4814 in Trans_delegate::before_commit (this=0x2e44800, thd=0x7fffd8000df0, all=false, trx_cache_log=0x7fffd8907a10, stmt_cache_log=0x7fffd8907858, 
    cache_log_max_size=18446744073709547520) at /root/softm/percona-server-5.7.22-22/sql/rpl_handler.cc:325
#2  0x000000000188a386 in MYSQL_BIN_LOG::commit (this=0x2e7b440, thd=0x7fffd8000df0, all=false) at /root/softm/percona-server-5.7.22-22/sql/binlog.cc:8974
#3  0x0000000000f80623 in ha_commit_trans (thd=0x7fffd8000df0, all=false, ignore_global_read_lock=false) at /root/softm/percona-server-5.7.22-22/sql/handler.cc:1830
#4  0x00000000016ddab9 in trans_commit_stmt (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/transaction.cc:458
#5  0x00000000015d1a8d in mysql_execute_command (thd=0x7fffd8000df0, first_level=true) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5293
#6  0x00000000015d3182 in mysql_parse (thd=0x7fffd8000df0, parser_state=0x7ffff0e7e600) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5901
#7  0x00000000015c6d16 in dispatch_command (thd=0x7fffd8000df0, com_data=0x7ffff0e7ed70, command=COM_QUERY)
    at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1490
#8  0x00000000015c5aa3 in do_command (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1021
#9  0x000000000170ebb0 in handle_connection (arg=0x3cd32d0) at /root/softm/percona-server-5.7.22-22/sql/conn_handler/connection_handler_per_thread.cc:312
#10 0x0000000001946140 in pfs_spawn_thread (arg=0x3c71630) at /root/softm/percona-server-5.7.22-22/storage/perfschema/pfs.cc:2190
#11 0x00007ffff7bc7851 in start_thread () from /lib64/libpthread.so.0
#12 0x00007ffff651290d in clone () from /lib64/libc.so.6
AI 代码解读

三、MGR全局认证发送内容的生成过程

下面是我通过对源码浅显的理解得出过程:

  • 1、获取当前的binlog cache内容记录为cache_log,这些就是已经在执行阶段生成map/query/dml event等。
  • 2、生成一个新的IO_CACHE作为临时存储为cache,目的在于存储。Transaction_context_log_event 和Gtid_log_event。
  • 3、将cache_log类型转换为READ类型同时初始化各种辅助类容如偏移量。
  • 4、初始化Transaction_context_log_event 。
  • 5、扫描Rpl_transaction_write_set_ctx中的write_set_unique 集合的内容,并且将其存储到Transaction_write_set 定义的内存空间中write_set中,注意这里只是用到了集合没用到数组。这里也就是进行Write set的一个拷贝而已其考到write_set临时变量中。
  • 6、将write_set内容填充到Transaction_context_log_event中,整个过程还会做base64的转换,最终填充到event的是base64格式的Write set类容。完成后析构write_set来临时变量
  • 7、 将Transaction_context_log_event写入到第二步定义的cache中。
  • 8、生成Gtid_log_event,只是做一些初始化动作,Gtid并没有生成。
  • 9、将Gtid_log_event写入到第二步定义的cache中。
  • 10、通过cache+cache_log的总和来对比group_replication_transaction_size_limit设置的值,也就是判断整个事物的binlog event是否操作了参数设置。
  • 11、将cache类型转换为READ类型同时初始化各种辅助类容如偏移量。
  • 12、将cache和cache_log分别写入到到transaction_msg中。
  • 13、流控相关,没仔细看,如果有机会学习流控机制在仔细学习。
  • 14、gcs_module负责发送transaction_msg到各个节点
  • 15、挂起等待事物认证的结果。

那么整个过程大概就是:

  • 经过hash的Write set (集合)->拷贝到write_set变量(类数组)->通过base64算法写入到Transaction_context_log_event ->合并其他binlog event到transaction_msg->gcs_module广播transaction_msg到其他节点->等待认证结果

四、相关源码

1、group_replication_trans_before_commit 函数相关内容
 if (trx_cache_log_position > 0 && stmt_cache_log_position == 0) //如果存在事物cache
  {
    cache_log= param->trx_cache_log; //设置到IO_cache
    cache_log_position= trx_cache_log_position;
  }
  else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0)//如果存在语句cache
  {
    cache_log= param->stmt_cache_log;
    cache_log_position= stmt_cache_log_position;
    is_dml= false;
    may_have_sbr_stmts= true;
  }
  else
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
                                "time on session %u", param->thread_id);
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  applier_module->get_pipeline_stats_member_collector()
      ->increment_transactions_local();

  DBUG_ASSERT(cache_log->type == WRITE_CACHE);
  DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
                           " stmt_cache_log_position: %llu",
                           param->thread_id, trx_cache_log_position,
                           stmt_cache_log_position));

  /*
    Open group replication cache.
    Reuse the same cache on each session for improved performance.
  */
  cache= observer_trans_get_io_cache(param->thread_id,
                                     param->cache_log_max_size); //获取一个新的IO_CACHE系统
  if (cache == NULL) //错误处理
  {
    /* purecov: begin inspected */
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Reinit binlog cache to read.
  if (reinit_cache(cache_log, READ_CACHE, 0)) ////将IO_CACHE类型进行转换 并且位置还原
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
                                "on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }
  
  /*
    After this, cache_log should be reinit to old saved value when we
    are going out of the function scope.
  */
  reinit_cache_log_required= true;

  // Create transaction context.
  tcle= new Transaction_context_log_event(param->server_uuid, Rpl_transaction_write_set_ctx
                                          is_dml,
                                          param->thread_id,
                                          is_gtid_specified); //初始化 Transaction_context_log_event
  if (!tcle->is_valid())
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failed to create the context of the current "
                "transaction on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  if (is_dml)
  {
    Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);// 获取前期得到write set 并且放回到一个临时内存空间 write_set中 
    /*
      When GTID is specified we may have empty transactions, that is,
      a transaction may have not write set at all because it didn't
      change any data, it will just persist that GTID as applied.
    */
    if ((write_set == NULL) && (!is_gtid_specified)) 
    {
      log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written "
                                  "during the execution of the current "
                                  "transaction on session %u", param->thread_id);
      error= pre_wait_error;
      goto err;
    }

    if (write_set != NULL)
    {
      if (add_write_set(tcle, write_set))//将整个wirte_set内容复制到event Transaction_context_log_event中 此时就进入了event了
      {
        /* purecov: begin inspected */
        cleanup_transaction_write_set(write_set); //write set已经完成了它的功能需要析构
        log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written "
                                    "during the execution of the current "
                                    "transaction on session %u", param->thread_id);
        error= pre_wait_error;
        goto err;
        /* purecov: end */
      } 
      cleanup_transaction_write_set(write_set); //如果add_write_set函数调用出现 有问题 也需要析构掉
      DBUG_ASSERT(is_gtid_specified || (tcle->get_write_set()->size() > 0));
    }
    else
    {
      /*
        For empty transactions we should set the GTID may_have_sbr_stmts. See
        comment at binlog_cache_data::may_have_sbr_stmts().
      */
      may_have_sbr_stmts= true; 
    } Log_event::write
  }

  // Write transaction context to group replication cache.
  tcle->write(cache); //写入到MGR CACHE 写入 TCLE的header(virtual)  body(virtual)   footer

  // Write Gtid log event to group replication cache.
  gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
                          may_have_sbr_stmts,
                          gtid_specification);
  gle->write(cache); //写入GTID event到MGR CACHE 占位

  transaction_size= cache_log_position + my_b_tell(cache);
  if (is_dml && transaction_size_limit &&
     transaction_size > transaction_size_limit)
  {
    log_message(MY_ERROR_LEVEL, "Error on session %u. "
                "Transaction of size %llu exceeds specified limit %lu. "
                "To increase the limit please adjust group_replication_transaction_size_limit option.",
                param->thread_id, transaction_size,
                transaction_size_limit); //group_replication_transaction_size_limit 事物大小参数
    error= pre_wait_error;
    goto err;
  }

  // Reinit group replication cache to read.
  if (reinit_cache(cache, READ_CACHE, 0))//将IO_CACHE类型进行转换 并且位置还原
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for read operations, on session %u",
                                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy group replication cache to buffer.
  if (transaction_msg.append_cache(cache)) //加入到transaction_msg
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
                                "cache on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy binlog cache content to buffer.
  if (transaction_msg.append_cache(cache_log))//加入到transaction_msg
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }



  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->registerTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications "
                                "regarding the outcome of the transaction on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

#ifndef DBUG_OFF
  DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface",
                  {
                    DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface");
                    DBUG_ASSERT(!sql_command_check());
                  };);

  DBUG_EXECUTE_IF("group_replication_before_message_broadcast",
                  {
                    const char act[]= "now wait_for waiting";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif

  /*
    Check if member needs to throttle its transactions to avoid
    cause starvation on the group.
  */
  applier_module->get_flow_control_module()->do_wait(); //流控相关

  //Broadcast the Transaction Message
  send_error= gcs_module->send_message(transaction_msg); //gcs广播
  if (send_error == GCS_MESSAGE_TOO_BIG)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group "
                                "on session %u. Message is too big.",
                                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }
  else if (send_error == GCS_NOK)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to "
                                "the group on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  shared_plugin_stop_lock->release_read_lock();

  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->waitTicket(param->thread_id)) //等待认证结果
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection "
                                "procedure to finish on session %u",
                                param->thread_id);
    error= post_wait_error;
    goto err;
    /* purecov: end */
  }
AI 代码解读
2、add_write_set函数
int add_write_set(Transaction_context_log_event *tcle,
                  Transaction_write_set *set)
{
  DBUG_ENTER("add_write_set");
  int iterator= set->write_set_size; //将循环次数设置为 set的长度 也就是有多少个write sets
  for (int i = 0; i < iterator; i++)
  {
    uchar buff[BUFFER_READ_PKE];
    int8store(buff, set->write_set[i]); //逐字节复制到buffer中
    uint64 const tmp_str_sz= base64_needed_encoded_length((uint64) BUFFER_READ_PKE);
    char *write_set_value= (char *) my_malloc(PSI_NOT_INSTRUMENTED,
                                              static_cast<size_t>(tmp_str_sz), MYF(MY_WME)); //13bytes (gdb) p tmp_str_sz $2 = 13
    if (!write_set_value)//分配内存错误
    { 
      /* purecov: begin inspected */
      log_message(MY_ERROR_LEVEL, "No memory to generate write identification hash");
      DBUG_RETURN(1);
      /* purecov: end */
    }

    if (base64_encode(buff, (size_t) BUFFER_READ_PKE, write_set_value)) //做base64算法 
    {
      /* purecov: begin inspected */
      log_message(MY_ERROR_LEVEL,
                  "Base 64 encoding of the write identification hash failed");
      DBUG_RETURN(1);
      /* purecov: end */
    }

    tcle->add_write_set(write_set_value); //最终将base64格式的write set写入到event中
  }
  DBUG_RETURN(0);
}
AI 代码解读
3、get_transaction_write_set函数
Transaction_write_set* get_transaction_write_set(unsigned long m_thread_id)
{
  DBUG_ENTER("get_transaction_write_set");
  THD *thd= NULL;
  Transaction_write_set *result_set= NULL;
  Find_thd_with_id find_thd_with_id(m_thread_id, false);

  thd= Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
  if (thd)
  {
    std::set<uint64> *write_set= thd->get_transaction()
        ->get_transaction_write_set_ctx()->get_write_set(); //Rpl_transaction_write_set_ctx   std::set<uint64> *get_write_set();
    unsigned long write_set_size= write_set->size(); //返回集合大小
    if (write_set_size == 0)
    {
      mysql_mutex_unlock(&thd->LOCK_thd_data);
      DBUG_RETURN(NULL);
    }

    result_set= (Transaction_write_set*)my_malloc(key_memory_write_set_extraction,
                                                  sizeof(Transaction_write_set),
                                                  MYF(0));//这里为其Transaction_write_set分配内存空间
    result_set->write_set_size= write_set_size; //获取size
    result_set->write_set=
        (unsigned long long*)my_malloc(key_memory_write_set_extraction,
                                       write_set_size *
                                       sizeof(unsigned long long),
                                       MYF(0));//分配内存
    int result_set_index= 0;
    for (std::set<uint64>::iterator it= write_set->begin();//完成复制注意是从set中复制到简单的内存中
         it != write_set->end();
         ++it)
    {
      uint64 temp= *it;
      result_set->write_set[result_set_index++]=temp;
    }
    mysql_mutex_unlock(&thd->LOCK_thd_data);
  }
  DBUG_RETURN(result_set);
}
AI 代码解读

作者微信:
微信.jpg

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
Redis 集合(Set)
10月更文挑战第17天
42 5
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。HashSet基于哈希表实现,提供高效的元素操作;TreeSet则通过红黑树实现元素的自然排序,适合需要有序访问的场景。本文通过示例代码详细介绍了两者的特性和应用场景。
47 6
Java Set接口凭借其独特的“不重复”特性,在集合框架中占据重要地位
【10月更文挑战第16天】Java Set接口凭借其独特的“不重复”特性,在集合框架中占据重要地位。本文通过快速去重和高效查找两个案例,展示了Set如何简化数据处理流程,提升代码效率。使用HashSet可轻松实现数据去重,而contains方法则提供了快速查找的功能,彰显了Set在处理大量数据时的优势。
34 2
Java Set因其“无重复”特性在集合框架中独树一帜
【10月更文挑战第14天】Java Set因其“无重复”特性在集合框架中独树一帜。本文深入解析Set接口及其主要实现类(如HashSet、TreeSet)如何通过特定的数据结构(哈希表、红黑树)确保元素唯一性,并提供最佳实践建议,包括选择合适的Set实现类和正确实现自定义对象的`hashCode()`与`equals()`方法。
35 3
|
28天前
set集合
HashSet(无序,唯一): 基于 HashMap 实现的,底层采用 HashMap 来保存元素。 LinkedHashSet: LinkedHashSet 是 HashSet 的子类,并且其内部是通过 LinkedHashMap 来实现的。 TreeSet(有序,唯一): 红黑树(自平衡的排序二叉树)。
判断一个元素是否在 Java 中的 Set 集合中
【10月更文挑战第30天】使用`contains()`方法可以方便快捷地判断一个元素是否在Java中的`Set`集合中,但对于自定义对象,需要注意重写`equals()`方法以确保正确的判断结果,同时根据具体的性能需求选择合适的`Set`实现类。
在 Java 中,如何遍历一个 Set 集合?
【10月更文挑战第30天】开发者可以根据具体的需求和代码风格选择合适的遍历方式。增强for循环简洁直观,适用于大多数简单的遍历场景;迭代器则更加灵活,可在遍历过程中进行更多复杂的操作;而Lambda表达式和`forEach`方法则提供了一种更简洁的函数式编程风格的遍历方式。
Set 是 Java 集合框架中的一个接口,不包含重复元素且不保证元素顺序。
【10月更文挑战第16天】Java Set:无序之美,不重复之魅!Set 是 Java 集合框架中的一个接口,不包含重复元素且不保证元素顺序。通过 hashCode() 和 equals() 方法实现唯一性,适用于需要唯一性约束的数据处理。示例代码展示了如何使用 HashSet 添加和遍历元素,体现了 Set 的高效性和简洁性。
40 4
Set 是 Java 集合框架中的一个接口,不包含重复元素且不保证元素顺序。
Java Set:无序之美,不重复之魅!Set 是 Java 集合框架中的一个接口,不包含重复元素且不保证元素顺序。它通过 hashCode() 和 equals() 方法确保元素唯一性,适用于需要唯一性约束的数据处理。示例代码展示了如何使用 HashSet 实现这一特性。
32 5