MySQL 深潜 - X-plugin的传输协议

本文涉及的产品
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
简介: 文章详细解析了X protocol的认证方式(如PLAIN、MYSQL41等)、协议格式及连接建立过程,包括服务端初始化、任务调度、请求处理等关键步骤,并结合代码示例说明认证流程。

作者:原宇

本文主要通过阅读MySQL8.0源码来介绍X plugin如何通过X protocol与客户端建立连接。

背景

MySQL5.7发布时自带了一个MySQL X插件,启用插件后,可以通过X protocol提供一个类似于MongoDB的服务。这个插件是默认加载的,show plugins可以看到。

mysql>SHOW PLUGINS;
+---------------------------------+----------+--------------------+---------+-------------+
| Name                            | Status   | Type               | Library | License     |
| mysqlx                          | ACTIVE   | DAEMON             | NULL    | GPL         |
| mysqlx_cache_cleaner            | ACTIVE   | AUDIT              | NULL    | GPL         |
+---------------------------------+----------+--------------------+---------+-------------+

X-plugin使用单独的协议(X protocol)来实现与客户端的交互,这个新协议利用了当前的行业标准protobuf(Google开发的一种语言无关、平台无关、可扩展的序列化结构数据的方法)来通信。protobuf可以将结构数据或对象转换成便于存储与传输的格式也就是序列化,同时可以保证这个序列化的结果可以被重建成原来的结构或对象。


X protocol

这个新协议主要从三个方面做了提升:可扩展性、性能以及安全性。


可扩展性

可扩展性主要来源于对protobuf的使用,其中定义的.proto格式的文件提供了X protocol的完整消息定义,protobuf可以基于.proto文件自动生成多种语言的代码。X protocol对.proto的使用可以使协议清晰明了,不再需要去分析消息格式。.proto文件中定义的信息使得在客户端连接器代码中实现X protocol变得更加容易,对协议进行编码只是所需工作的一小部分。


性能

通过X protocol可以将多个request打包成一个packet发送给X plugin的服务端,服务端会依次解析和处理每一个request,这是X-plugin的一个流水线功能(pipelining),这个功能的好处是客户端可以一次向服务器发送多个request,不再需要等待每个request的响应。


安全性

X protocol基于SASL(Simple Authentication and Security Layer,简单认证和安全层,是一种用于扩充C/S模式验证能力的机制)提供了多种认证方式。

通常包括三种方式:

  • PLAIN Authentication


  • EXTERNAL Authentication


  • MYSQL41 Authentication


PLAIN Authentication和EXTERNAL Authentication的认证过程比较简单,所以需要依赖SSL/TSL,如果服务端不支持SSL,就无法使用这两种认证方式。

MYSQL41 Authentication是一种挑战/应答的认证方式,客户端首先会发送一个认证请求,此时通常会携带需要认证的用户名,服务端查询到是合法用户后,会产生一个20字节的随机数作为“挑战”发送给客户端,客户端会将密码和随机数做Hash,生成一个字节串作为“应答”,服务端将应答串与自己的计算结果进行比较,如果相同则通过验证,反之则认证失败,将认证结果发送给客户端。


协议格式


length | type | payload

  • 4 byte length
  • 1 byte message type
  • length byte message payload

使用X protocol的客户端和服务端通过如上的数据格式来封装将要发送的数据,这称作是一个request。这里的payload就是通过protobuf序列化后的数据,也是.proto文件中定义的某一类Message,message type就是该Message的类型,例如认证时需要的Message AuthenticateStart和Message AuthenticateContinue。

message AuthenticateStart {
  required string mech_name = 1;
  optional bytes auth_data = 2;
  optional bytes initial_response = 3;
}

这个Message包括三个字段,mech_name表示认证的方法,auth_data表示认证所需的数据,initial_response表示初次响应的数据。其中required表示必不可少的数据,optional是可有可无的数据,比如PLAIN Authentication就不需要initial_responce字段,EXTERNAL Authentication不需要auth_data字段。

message AuthenticateContinue {
  required bytes auth_data = 1;
}

这个Message只有一个required字段auth_data,通常是密码和服务端指定的随机salt的运算结果,这个Message在MYSQL41 Authentication的认证过程会用到。


通过X protocol建立连接的过程


X-plugin的相关代码都在plugin/x目录下面,这个插件是默认开启的,插件的入口函数是plugin_main,这个函数首先会初始化一个X-plugin的Server对象,从如下代码可以看到,当前可以支持的认证方式有三种: PLAIN Authentication、MYSQL41 Authentication和SHA256_MEMORY Authentication。

这里没有提供EXTERNAL Authentication而且还扩展了与MYSQL41 Authentication类似的SHA256_MEMORY Authentication,两种的区别在于使用不同的Hash函数。

instance->server().add_authentication_mechanism(
        "PLAIN", Sasl_plain_auth::create, use_only_through_secure_connection);
    instance->server().add_authentication_mechanism(
        "MYSQL41", Sasl_mysql41_auth::create,
        use_only_in_non_secure_connection);
    instance->server().add_authentication_mechanism(
        "MYSQL41", Sasl_mysql41_auth::create,
        use_only_through_secure_connection);
    instance->server().add_authentication_mechanism(
        "SHA256_MEMORY", Sasl_sha256_memory_auth::create,
        use_only_in_non_secure_connection);
    instance->server().add_authentication_mechanism(
        "SHA256_MEMORY", Sasl_sha256_memory_auth::create,
        use_only_through_secure_connection);

最后,plugin_main函数会把net_thread函数下发到任务队列,交给worker线程去执行,如下所示:

instance->m_nscheduler->post(std::bind(&Server::net_thread, instance));

post函数定义如下,由于类成员函数都有一个默认的参数this作为第一个参数,这就导致了类成员函数不能直接赋值给std::function,所以这里结合std::bind将net_thread赋值给Task。

typedef std::function<void()> Task;
bool Scheduler_dynamic::post(const Task &task);

net_thread函数中会先调用ngs::Server::prepare函数将ngs::Server::on_accept函数作为回调函数绑定在Socket_events上,由于plugin_main函数中实例化Server对象时将一个Socket_acceptors_task放到了Server_task_vector中,所以net_thread函数中执行task->prepare(&context)会通过如下的流程实现回调函数的绑定。

Socket_acceptors_task::prepare
    |__Socket_acceptors_task::prepare_impl
        |__Listener_tcp::setup_listener
            |__Socket_events::listen

然后net_thread函数会调用ngs::Server::start函数将run_task函数下发给m_accept_scheduler去执行。

m_accept_scheduler->post([this, task]() { run_task(task); });

只要Server一直处于运行状态,run_task函数就会一直监听端口(不同于常规的MySQL协议,X protocol默认使用33060作为监听端口),当有连接请求到来,就会调用回调函数on_accept来处理请求。

run_task调用流程如下:

ngs::Server::run_task
    |__Socket_acceptors_task::loop
        |__Socket_events::loop
            |__event_base_loop
                |__ngs::Server::on_accept

on_accept函数会用Client_interface指针指向ngs::Client对象,并且将ngs::Client::run函数下发到m_worker_scheduler去执行。也就是每当有connect请求到来,就会让worker线程通过ngs::Client::run去处理。

run函数体如下所示,在on_accept中会设置当前状态为State::k_accepted,并且创建一个Session,后面通过一个while循环来实现批量处理request。通过read_one_message_and_dispatch函数来解析并且处理到来的request,如果当前没有request到来,那么会阻塞在ngs::Protocol_decoder::read_header函数上,否则可以获取到message_size和message_type,然后获取对应大小的payload,对这个payload做反序列化处理,在本地还原一个Message_request。

void Client::run(const bool skip_name_resolve) {
  try {
    on_client_addr(skip_name_resolve);
    on_accept();
    while (m_state != State::k_closing && m_session) {
      Error_code error = read_one_message_and_dispatch();
      // read could took some time, thus lets recheck the state
      if (m_state == State::k_closing) break;
      if (error) {
        // !message and !error = EOF
        m_encoder->send_result(Fatal(error));
        disconnect_and_trigger_close();
        break;
      }
    }
  } catch (std::exception &e) {
    log_error(ER_XPLUGIN_FORCE_STOP_CLIENT, client_id(), e.what());
  }
  {
    MUTEX_LOCK(lock, server().get_client_exit_mutex());
    m_state = State::k_closed;
    remove_client_from_server();
  }
}

由于Client_interface指针指向的是子类ngs::Client的对象,所以m_dispatcher->handle(&request)会真正执行ngs::Client::handle_message,然后通过Session来处理request。

在ngs::Session::handle_message中,会判断当前的connection有没有经过认证,如果没有,那么首先就要去做认证。

bool Session::handle_message(ngs::Message_request &command) {
  if (m_state == k_authenticating) {
    return handle_auth_message(command);
  } else if (m_state == k_ready) {
    // handle session commands
    return handle_ready_message(command);
  }
  // msg not handled
  return false;
}

这里以MYSQL41 Authentication为例,MySQL8.0的认证与官方介绍的认证过程几乎相同,不同的地方在于SESS_AUTHENTICATE_START Message中的auth_data没有携带用户名,用户名是在SESS_AUTHENTICATE_CONTINUE Message中与password一起发送的。

MySQL8.0代码中,首先会检查SESS_AUTHENTICATE_START Message中的mech_name是否是当前服务端支持的认证方式,如果支持就会返回一个随机数给客户端作为salt。如下所示的代码中,首先会把salt(data)赋值给AuthenticateContinue Message,然后把Message序列化成一个string,最后将以length|type|payload的格式发送给客户端。

void Protocol_encoder::send_auth_continue(const std::string &data) {
  std::string out_serialized_msg;
  Mysqlx::Session::AuthenticateContinue msg;
  msg.set_auth_data(data);
  msg.SerializeToString(&out_serialized_msg);
  DBUG_EXECUTE_IF("authentication_timeout", {
    int i = 0;
    int max_iterations = 1000;
    while ((*xpl::Server::get_instance())->server().is_running() &&
           i < max_iterations) {
      my_sleep(10000);
      ++i;
    }
  });
  m_xproto_encoder
      .encode_xmessage<Mysqlx::ServerMessages::SESS_AUTHENTICATE_CONTINUE>(
          out_serialized_msg);
  send_raw_buffer(Mysqlx::ServerMessages::SESS_AUTHENTICATE_CONTINUE);
}

客户端收到这个Message之后需要再发送一个SESS_AUTHENTICATE_CONTINUE Message,其中auth_data是由schema、username以及password和salt的运算结果三部分组成,服务端收到这个Message会验证username对应的password和salt的运算结果是否与客户端的一致,如果一致就将当前状态设置为State::k_ready。

自此,客户端和服务端的连接才建立成功。


参考资料

MySQL Document

WL#8639: X Protocol


欢迎访问阿里云云数据库 RDS MySQL 详情页了解更多信息:https://www.aliyun.com/product/rds/mysql

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
1
1
1
469
分享
相关文章
SQL 质量革命:利用 DAS 智能索引推荐修复慢查询全流程
在数据驱动时代,数据库性能直接影响系统稳定与响应速度。慢查询常因索引缺失、复杂逻辑或数据量过大引发,导致延迟、用户体验下降甚至业务受损。DAS(数据库管理服务)提供智能索引推荐功能,通过分析SQL语句与数据分布,自动生成高效索引方案,显著提升查询性能。本文结合实战案例,详解DAS智能索引推荐原理与使用流程,帮助用户快速定位问题并优化数据库表现,实现系统高效运行。
157 61
MySQL 常用函数
我们这次全面梳理 MySQL 中的常用函数,涵盖 聚合函数、字符串函数、日期时间函数、数学函数 和 控制流函数 等五大类。每类函数均配有语法说明与实用示例,帮助读者提升数据处理能力,如统计分析、文本处理、日期计算、条件判断等。文章结尾提供了丰富的实战练习,帮助读者巩固和应用函数技巧,是进阶 SQL 编程与数据分析的实用工具手册。
153 2
告别切屏|阿里云DMS MCP+通义灵码30分钟搞定电商秒杀开发
DMS MCP+通义灵码的梦幻组合,标志着研发流程从“工具堆砌”向“智能闭环”的跃迁。通过统一数据管理、自然语言交互与自动化代码生成,开发者可专注于业务创新,而无需被琐碎的数据库操作所束缚。
告别切屏|阿里云DMS MCP+通义灵码30分钟搞定电商秒杀开发
AI 时代的 MySQL 数据库运维解决方案
本方案将大模型与MySQL运维深度融合,构建智能诊断、SQL优化与知识更新的自动化系统。通过知识库建设、大模型调用策略、MCP Server开发及监控闭环设计,全面提升数据库运维效率与准确性,实现从人工经验到智能决策的跃迁。
272 26
PolarDB Supabase最佳实践-Web应用
PolarDB Supabase 是基于 PolarDB PostgreSQL 版的全托管服务,集成 Realtime 实时数据库、RESTful API、身份认证、文件存储等功能,提供高性能、灵活扩展的后端解决方案。用户可快速构建 Web 应用、SaaS 平台及 AI 集成应用,简化运维操作,提升开发效率。
解锁数仓内AI流水线,AnalyticDB Ray基于多模ETL+ML提效开发与运维
AnalyticDB Ray 是AnalyticDB MySQL 推出的全托管Ray服务,基于开源 Ray 的丰富生态,经过多模态处理、具身智能、搜索推荐、金融风控等场景的锤炼,对Ray内核和服务能力进行了全栈增强。
SQL玩转多模态AI,轻松搞定图片+文本混合搜索
本文介绍了一种通过原生SQL实现多模态智能检索的破局思路,基于PolarDB创新融合AI智能引擎,解决传统AI检索系统数据迁移冗余和工具链割裂的问题。方案优势包括低门槛AI集成、灵活适配多场景、全链路数据安全及按需付费免运维。文章详细描述了部署资源、应用配置及方案验证步骤,并提供清理资源指南以避免额外费用。适合希望快速构建智能搜索应用的开发者参考实践。
MySQL 5.6/5.7 DDL 失败残留文件清理指南
通过本文的指南,您可以更安全地处理 MySQL 5.6 和 5.7 版本中 DDL 失败后的残留文件,有效避免数据丢失和数据库不一致的问题。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问