MyCat-架构剖析-MyCat 实现 MySQL 协议 | 学习笔记

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 快速学习 MyCat-架构剖析-MyCat 实现 MySQL 协议

开发者学堂课程【全面讲解开源数据库中间件MyCat使用及原理(三):MyCat-架构剖析-MyCat 实现 MySQL 协议】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/757/detail/13296


MyCat-架构剖析-MyCat 实现 MySQL 协议

MyCat 实现 MySQL 协议

1. 握手认证实现

1).握手包源码实现

在 MyCat 中同时实现了 NIO 和 AIO,通过配置可以选择 NIO 和 AIO。MyCatServer 在启动阶段已经选择好采用 NIO 还是 AIO,因此建

立 I/0 通道后,MyCat 服务端一直等待客户端端的连接,当有连接到来的时候,MyCat 首先发送握手包。

下面分析一下 Mycat 是如何通过 Java 代码来完成握手包的发送的。

要想解析这一问题,需要跟踪源码。

所有源码的入口都是 Mycatstartup。在 Mycatstartup 中调用了 MycatServer 的用法,所以在这要跟到 startup 当中来。

在这去读取的配置 UsingAIO 这个选项:

// startup manager

ManagerConnectionFactorymf=newManagerConnectionFactory();

ServerConnectionFactory sf=newServerConnectionFactory();

SocketAcceptor manager=null;

SocketAcceptor server=null;

aio = (system.getUsingAIC()==1);

下面就会去判定如果是 aio 怎么办,再往下看,else 就是 nio 走哪个分支。构建好了这两个 accepter 之后,一个是 manager 管理,一个是 server 具体提供服务的,调用 start 方法。然后点到 start 方法当中来:

选择 aio——>pendingaccpet——>accept

先对源码跟踪到对应的位置:

@Override

Public void completed(AsynchronousSocketchannel result, Long id) {

accept(result, id);

// next pending waiting

pendingAccept();

//调用 accept 方法:

FrontendConnection c = factory.make(channel);

c.setAccepted(true);

c.setid(id);

NIOprocessor processor=MycatServer.getInstance().nextProcessor();

c.setProcessor(processor);

c.register();

}catch (Exception e){

LOGGER.error("AioAcceptorError",e);

closeChannel(channel);

在 accept 方法当中,这段源码大家是见过的。首先构造了一个前端的一个链接FrontendConnection,接下来调用了 NIOprocessor 当中的一个方法叫做 register 注册。

MyCat 中的源码中 io.mycat.net.FrontendConnection 类的实现如下:

@Override

publia void register() throws l0Exception {

if(!isClosed.get()){

//生成认证数据

byte[] rand1=RandomUtil.randomBytes(size:8);

byte[] rand2=RandomUtil.randomBvtes(size:12);

这个注册方法用来发送握手包。FrontendConnection 的 register 方法来组装认证数据,然后保存认证数据。

//保存认证数据

byte[] seed =new byte[rand1.length+rand2.length];

System.arraycopy(rand1,srcPos:0,seed,destPos:0,rand1.length);

System.arraycopy(rand2,srcPos:0,seed,rand1.length,rand2.length);

this.seed=seed;

可以看到,它把握手包的数据组装到了 HandshakeV10 packet 这个包中。

public byte protocolVersion;

public byte[] serverversion; public long threadid;

public byte[] seed;// auth-plugin-data-part-1 public int serverCapabilities;

public byte serverCharsetIndex;

public int serverstatus;

public byte[] restofScrambleBuff; // auth-plugin-data-part-2

public byte[] authPluginName = DEFAULT AUTH PLUGIN NAME;

就是通过以上数据组装了握手包,这个握手包会记录当前协议的版本,服务的版本,现成的 ID。

//发送握手数据包booleanuseHandshakeV10=MycatServer.getInstance().qetConfig().qetSystem().getUseHandshakev10() == 1

if(useHandshakeV10){

Handshakev10Packet hs =new Handshakev10Packet();

hs.packetid=0;

hs.protocolVersion =VersionsPROTOCOL VERSION;

hs.serverVersion=VersionsSERVER VERSION;

hs.threadd=id;

hs.seed=rand1;

hs.serverCapabilities=getServerCapabilities();

hs.serverCharsetIndex=(byte)(charsetIndex&0xff);

hs.serverstatus=2;

hs.restofscrambleBuff=rand2;

hs.write(C:this);

} else

HandshakePacket hs = new HandshakePacket();

hs.packetid =0;

hs.protocolVersion = Versions.PROTOCOL VERSION;

hs.serverVersion = Versions.SERVER VERSION;

hs.threadId = id;

hs.seed = rand1;

hs.serverCapabilities = getServerCapabilities();

hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);

hs.serverstatus =2;

hs.restofscrambleBuff = rand2;

hs.write(c:this)

以上这些信息都是抓取过的,接下来再看他这组装好了一个握手包之后,在下面这一块都是通过 java 代码组装的。组装好之后,用 hs.write 调用这个方法,实际上就是将握手包写给客户端了,所以在这一块客户端就可以拿到这个握手包当中的内容,在这里发送握手包,就是通过这一段 Java 代码来实现的。

这是第一阶段:握手包的源码实现是在 FrontendConnection 当中实现的。

接下来看第二阶段,服务端将握手包发送给客户端,客户端输入用户名和密码来进行认证登录。所以第二个部分就是认证包的源码实现。

2)认证包源码实现

客户端接收到握手包后,紧接看向服务端发起一个认证包,MyCat 封装为类AuthPacket:

//认证包的源码实现会涉及到认证包的封装类,叫做 AuthPacket。

public class AuthPacket extends MySQLPacket {

private static final byte[] FILLER = new byte[23];

public long clientflags;

public long maxPacketsize;

public int charsetindex;

public byte[] extra;// from FILLER(23)

public string user;

public byte[] password;

public string database;

客户端发送的认证包转由 FrontendAuthenticator 的 Handler 来处理,主要操作就是拆包,检查用户名,密码合法性,检查连接数是够超出限制。源码实现如下:

@Override

publia void handle(byte[] data){

//check quit packet;

if (data.length == QuitPacket.QUIT.length && data[4]== MySQLPacket.COM_QUIT)(

source.close(reason:"quit packet");

return;

下面找一下 AuthPacket 的源码:

public long clientflags;

public long maxPacketsize;

public int charsetindex;

public byte[] extra;// from FILLER(23)

public string user;

public byte[] password;

public string database;

在 AuthPacket 当中会记录用户名是什么,密码是什么,以及数据库和其他的一些标识等等。所有的认证包最终都会走动到 AuthPacket 当中来,以上数据最终是通过 Java 代码如何组装的呢?

在发送了初始化的握手数据包之,接下来下面还有一句话叫做:

// asynread response

this.asynread();

那么 asynread 这个方法如何操作呢?往下看:

@Override

public void asynRead() throws IOException {

ByteBuffer theBuffer=con.readBuffer;

if (theBuffer == null) {theBuffer=con.processor.getBufferPool().allocate(conprocessor.getBufferPool().getChunksize());

con.readBuffer = theBuffer;}

int got = channel.read(theBuffer);

// 读取这个缓冲区当中的信息

con.onReadData(got);

//读取缓冲区当中的信息之后,又调用了 con 里面的 readdate 这个方法, readdate 这个方法又有什么作用呢?

//循环处理字节信息

int offset = readBufferOffset, length =0,position =readBuffer.position();

for (;;) { I

length=getPacketLength(readBuffer,offset);

if (length==-1) {

if (offset!= 0) {

this.readBuffer =compactReadBuffer(readBuffer,offset);} else if (readBuffer!= null &&!readBufferhasRemaining()){

throw new RuntimeException( "invalid readbuffer capacity too little buffer size "

+readBuffer.capacity());

}

break;

}

if (position >= offset + length && readBuffer!=null) {

readdate 循环处理字节信息,最终组装出 base。base 当中所接收到的就是客户端传递过来的用户名密码等认证信息。

接下来要进行处理,调用 handler 方法,这就是拿到的那个认证包。

点击进入 handler:

image.png

public void handle (byte[] data) {

// check quit packet

if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM QUIT) {

source.close( reason: "quit packet");

return;

}

AuthPacket auth=newAuthPacket();

auth.read(data);

//huangyiming add

int nopasswordLogin = MycatServer.getInstance().getConfig().getsystem().getNonePasswordLogin();

//如果无密码登陆则跳过密码验证这个步骤

boolean skipPassWord = false; String defaultUser = "";

if(nopassWordLogin == 1){

skipPassWord = true;

Map<string,UserConfig> userMaps = MycatServer.getInstance().getConfig().getUsers(); if(!userMaps.isEmpty()){

setDefaultAccount (auth, userMaps);

这一步是判断用户名密码的有效性。

//在这校验用户名和密码失败的时候,它会调用 failure 写入这个 ErrPacket 客户端。

如果有其中任何一个地方出错,不管用户名还是密码错误,都要直接调用方法failure,调取 WriteErrMessage 。WriteErrMessage 实际上就是去写这个错误的消息,就是 Errpacket。Errpacket 就和上面的:

是一致的。

public void writeErrMessage(byte id, int errno,String msg){

ErrorPacket err=newErrorPacket();

err.packetid=id;

err.errno=errno;

err.message=encodeString(msg,charset);

err.write( C: this);

Errpacket 当中的数据直接写入到客户端。

image.png

如果上面这些校验都通过了就会调用 success,success 实际上它所做的就是重点就是:

ByteBuffer buffer=source.allocate();

source.write (source.writeToBuffer(AUTH OK, buffer));

//AUTH OK 相当于 OK Packet。

如果服务端校验用户名密码通过,就会给客户端返回一个 OK 的标识,代表认证成功。

2. 命令认证实现

命令执行阶段就是 SOL 命令和 SOL 语句执行阶段,在该阶段 Mvcat 主要需要做的事情,就是对客户端发来的数据包进行拆包,并判断命令的类型,并解析 SQL 语句,执行响应的 SQL 语句,最后把执行结果封装在结果集包中,返回给客户端。

从客户端发来的命令交给 FrontendCommandHandler 中的 handle 方法处理:

@Override

public void handle(byte[] data)

{if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())

{

MySQLMessage mm=newMyQLMesage(data);

Int packetLength=mm.readUB3(); if(packetLength+4==data.length)

{

source.loadDataInfileData(data);

}

return;

}

点击 nio handler 方法。如果是执行命令的数据会调用:

image.png

Frontendcommandhandler 这个类的 handler 方法要进行数据的处理,那么在这一块要去判定:

switch (data[4])

{

case MySLPacket.COM INIT DB:

commands.doInitDB();

source.initDB(data);

break;

case MySQLPacket.COM QUERY:

commands.doquery();

source.query(data);

break;

case MySQLPacket.COM PING:

commands.doPing(); source.ping(); break;

case MySQLPacket.COM QUIT:

commands.doQuit();

执行的操作是数据化的操作?还是查询的操作?还是退出的操作等等不同的操作类型,在这一块它有不同的处理。假如执行查询操作,那么它会调用 command 的doquery 方法。

那么点到这个里面你会发现它就做了一个计数的一个操作,主要来看下面这一个方法:Frontconnection 的 query方法。

//取得语句

String sql =null;

try {

MySQLMessagemm=newMySQLMessage(data); mm.position(i:5);

sgh = mm.readstring(charset);

} catch (UnsupportedEncodingException e){

writeErrMessage(ErrorCode.ER UNKNOWN CHARACTER SET, msg: "Unknown charset '" + charset +"");

return;

}

this.query(sq1 );

点击 query 方法,先拿到客户端传递过来的 SQL 语句,读取到这个 SQL 语句,拿到 SQL 语句之后,调用 query 方法执行 SQL 语句。在执行在口语句的时候首先要对 sql 语句进行见状性的判定,再去优化 SQL 语句。SQL 语句末文的分号在这个位置是需要去除掉的,还要记录这个 SQL 语句,再去判定 SQL 语句当中的防火墙的设置。还要校验 dml 权限:

// DML 权限检查

try {

boolean isPassed = privileges.checkDmlPrivilege(user, schema, sql);

if( !isPassed ) {

writeErrMessage(ErrorCode.ERR_WRONG_USED,

msg:"The statement DML privilege check is not passed, reject for user i" + user + "‘");

return;

}

}catch( com.alibaba.druid.sgl.parser.ParserException e1) {

writeErrMessage(ErrorCode.ERR_WRONG_USED,e1.getMessage());

LOGGER.error("parse exception", e1 );

Return;

可以在 Mycat 的配置文件当中可以配置 dml 的权限信息:

<!-- 表级 DML 权限设置

<privileges check="true">

<schema name="ITCAST" dml="1111" >

<table name="TB TEST" dm1="1110"></table>

</schema>

</privileges>

--></user>

1111 什么含义?1110 什么含义?针对于某一个逻辑库当中的某一个逻辑表,它是否具有对应的权限。这些权限配置了,实际上在这一块就要来进行校验。

如果校验权限校验未通过,直接写出对应的错误信息。如果权限校验通过,接下来就要执行 SQL 语句,调用 queryhandler 这个方法。

在 query 方法当中拿到 SQL 语句,接下来对 SQL 语句进行解析,解析之后下来要开始进行执行。用 handler 的方法进行执行,handler 方法把 circle 语句执行之后,例如点击 response,执行 SQL 语句之后:

public static void response(ServerConnection c) {

ByteBuffer buffer=c.allocate();

"buffer=headerwrite(buffer,cwriteSocketifFull: true);

for (Fieldpacket field :fields){

buffer=fieldwrite(buffercwriteSocketlfFulltrue);

}

buffer=eof.write(buffer,c,writeSocketlfFulltrue);

byte packetId=eof.packetid;

RowDataPacket row=new RowDataPacket(FIELD COUNT);

row.add(stringUtil.encode(c.getschema(),c.getcharset()));

row.packetid=++packetId;

buffer=row.write(buffer,cwriteSocketifFull:true);

EOFPacket lastEof=newEOFPacket();

lastEof.packetId=++packetid;

buffer =lastEof.write(buffer,cwriteSocketlfFull:true);

c.write(buffer);

在这里组装返回的数据包,fieldpacket 首先会写出这么一段数据,然后写出fieldpacket。

再来抓取一下,先执行 databases,这是响应回来的数据:

image.png

这个是 192.157 响应给 192.1 的这个客户端的。

这里又有很多的 packet:

每一个 packet 它实际上它就是一行数据,比如 Information Schema,对应相应的数据:

每一行数据都会写入相应的数据包,写入每一个域对应的数据包,往下再去写入其他的数据包,一次性将数据包写出来,这个数据包实际上就是给客户端返回给客户端返回对应的数据。

以上是简单跟踪了一下在命令执行阶段在 Mycat 当中是如何实现 MySQL 协议的。

再去跟踪源码的时候可以重点跟踪笔记中的源码。具体细化的源码等熟悉了源码之后再去选择,因为这个源码相对来说是比较深的,建议大家如果去跟踪源码可以和MySQL 协议的数据包对应起来,这样更加便于大家理解。

在这一小节所讲解的 MySQL 协议在 Mycat 当中是如何实现的,主要是给大家通过跟踪源码的形式,简单跟踪了两个阶段。一个是握手认证阶段,一个是命令执行阶段,这两个阶段都是通过 Java 或者模拟 MySQL 协议完成跟踪的。

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
存储 SQL 关系型数据库
Mysql学习笔记(二):数据库命令行代码总结
这篇文章是关于MySQL数据库命令行操作的总结,包括登录、退出、查看时间与版本、数据库和数据表的基本操作(如创建、删除、查看)、数据的增删改查等。它还涉及了如何通过SQL语句进行条件查询、模糊查询、范围查询和限制查询,以及如何进行表结构的修改。这些内容对于初学者来说非常实用,是学习MySQL数据库管理的基础。
139 6
|
2月前
|
SQL 关系型数据库 MySQL
Mysql学习笔记(三):fetchone(), fetchmany(), fetchall()详细总结
MySQL中用于数据检索的`fetchone()`, `fetchmany()`, `fetchall()`函数的功能、SQL语句示例和应用场景。
85 3
Mysql学习笔记(三):fetchone(), fetchmany(), fetchall()详细总结
|
2月前
|
SQL Ubuntu 关系型数据库
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
本文为MySQL学习笔记,介绍了数据库的基本概念,包括行、列、主键等,并解释了C/S和B/S架构以及SQL语言的分类。接着,指导如何在Windows和Ubuntu系统上安装MySQL,并提供了启动、停止和重启服务的命令。文章还涵盖了Navicat的使用,包括安装、登录和新建表格等步骤。最后,介绍了MySQL中的数据类型和字段约束,如主键、外键、非空和唯一等。
78 3
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
|
2月前
|
关系型数据库 MySQL 数据库
Mysql学习笔记(四):Python与Mysql交互--实现增删改查
如何使用Python与MySQL数据库进行交互,实现增删改查等基本操作的教程。
73 1
用户态协议栈05—架构优化
用户态协议栈05—架构优化
|
4月前
|
SQL 关系型数据库 MySQL
MySQL学习笔记
这篇文章是一份关于MySQL数据库操作的学习笔记,涵盖了数据库的终端操作、数据类型、建表约束、事务处理以及SQL的连接查询等基础知识点。
|
4月前
|
安全 网络安全 数据安全/隐私保护
|
20天前
|
弹性计算 API 持续交付
后端服务架构的微服务化转型
本文旨在探讨后端服务从单体架构向微服务架构转型的过程,分析微服务架构的优势和面临的挑战。文章首先介绍单体架构的局限性,然后详细阐述微服务架构的核心概念及其在现代软件开发中的应用。通过对比两种架构,指出微服务化转型的必要性和实施策略。最后,讨论了微服务架构实施过程中可能遇到的问题及解决方案。
|
1月前
|
Cloud Native Devops 云计算
云计算的未来:云原生架构与微服务的革命####
【10月更文挑战第21天】 随着企业数字化转型的加速,云原生技术正迅速成为IT行业的新宠。本文深入探讨了云原生架构的核心理念、关键技术如容器化和微服务的优势,以及如何通过这些技术实现高效、灵活且可扩展的现代应用开发。我们将揭示云原生如何重塑软件开发流程,提升业务敏捷性,并探索其对企业IT架构的深远影响。 ####
43 3
|
1月前
|
Cloud Native 安全 数据安全/隐私保护
云原生架构下的微服务治理与挑战####
随着云计算技术的飞速发展,云原生架构以其高效、灵活、可扩展的特性成为现代企业IT架构的首选。本文聚焦于云原生环境下的微服务治理问题,探讨其在促进业务敏捷性的同时所面临的挑战及应对策略。通过分析微服务拆分、服务间通信、故障隔离与恢复等关键环节,本文旨在为读者提供一个关于如何在云原生环境中有效实施微服务治理的全面视角,助力企业在数字化转型的道路上稳健前行。 ####