开发者学堂课程【全面讲解开源数据库中间件MyCat使用及原理(三):MyCat-架构剖析-MyCat 实现 MySQL 协议】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/757/detail/13296
MyCat-架构剖析-MyCat 实现 MySQL 协议
MyCat 实现 MySQL 协议
1. 握手认证实现
1).握手包源码实现
在 MyCat 中同时实现了 NIO 和 AIO,通过配置可以选择 NIO 和 AIO。MyCatServer 在启动阶段已经选择好采用 NIO 还是 AIO,因此建
立 I/0 通道后,MyCat 服务端一直等待客户端端的连接,当有连接到来的时候,MyCat 首先发送握手包。
下面分析一下 Mycat 是如何通过 Java 代码来完成握手包的发送的。
要想解析这一问题,需要跟踪源码。
所有源码的入口都是 Mycatstartup。在 Mycatstartup 中调用了 MycatServer 的用法,所以在这要跟到 startup 当中来。
在这去读取的配置 UsingAIO 这个选项:
// startup manager
ManagerConnectionFactorymf=newManagerConnectionFactory();
ServerConnectionFactory sf=newServerConnectionFactory();
SocketAcceptor manager=null;
SocketAcceptor server=null;
aio = (system.getUsingAIC()==1);
下面就会去判定如果是 aio 怎么办,再往下看,else 就是 nio 走哪个分支。构建好了这两个 accepter 之后,一个是 manager 管理,一个是 server 具体提供服务的,调用 start 方法。然后点到 start 方法当中来:
选择 aio——>pendingaccpet——>accept
先对源码跟踪到对应的位置:
@Override
Public void completed(AsynchronousSocketchannel result, Long id) {
accept(result, id);
// next pending waiting
pendingAccept();
//调用 accept 方法:
FrontendConnection c = factory.make(channel);
c.setAccepted(true);
c.setid(id);
NIOprocessor processor=MycatServer.getInstance().nextProcessor();
c.setProcessor(processor);
c.register();
}catch (Exception e){
LOGGER.error("AioAcceptorError",e);
closeChannel(channel);
在 accept 方法当中,这段源码大家是见过的。首先构造了一个前端的一个链接FrontendConnection,接下来调用了 NIOprocessor 当中的一个方法叫做 register 注册。
MyCat 中的源码中 io.mycat.net.FrontendConnection
类的实现如下:
@Override
publia void register() throws l0Exception {
if(!isClosed.get()){
//生成认证数据
byte[] rand1=RandomUtil.randomBytes(size:8);
byte[] rand2=RandomUtil.randomBvtes(size:12);
这个注册方法用来发送握手包。FrontendConnection 的 register 方法来组装认证数据,然后保存认证数据。
//保存认证数据
byte[] seed =new byte[rand1.length+rand2.length];
System.arraycopy(rand1,srcPos:0,seed,destPos:0,rand1.length);
System.arraycopy(rand2,srcPos:0,seed,rand1.length,rand2.length);
this.seed=seed;
可以看到,它把握手包的数据组装到了 HandshakeV10 packet 这个包中。
public byte protocolVersion;
public byte[] serverversion; public long threadid;
public byte[] seed;// auth-plugin-data-part-1 public int serverCapabilities;
public byte serverCharsetIndex;
public int serverstatus;
public byte[] restofScrambleBuff; // auth-plugin-data-part-2
public byte[] authPluginName = DEFAULT AUTH PLUGIN NAME;
就是通过以上数据组装了握手包,这个握手包会记录当前协议的版本,服务的版本,现成的 ID。
//发送握手数据包booleanuseHandshakeV10=MycatServer.getInstance().qetConfig().qetSystem().getUseHandshakev10() == 1
if(useHandshakeV10){
Handshakev10Packet hs =new Handshakev10Packet();
hs.packetid=0;
hs.protocolVersion =VersionsPROTOCOL VERSION;
hs.serverVersion=VersionsSERVER VERSION;
hs.threadd=id;
hs.seed=rand1;
hs.serverCapabilities=getServerCapabilities();
hs.serverCharsetIndex=(byte)(charsetIndex&0xff);
hs.serverstatus=2;
hs.restofscrambleBuff=rand2;
hs.write(C:this);
} else
HandshakePacket hs = new HandshakePacket();
hs.packetid =0;
hs.protocolVersion = Versions.PROTOCOL VERSION;
hs.serverVersion = Versions.SERVER VERSION;
hs.threadId = id;
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverstatus =2;
hs.restofscrambleBuff = rand2;
hs.write(c:this)
以上这些信息都是抓取过的,接下来再看他这组装好了一个握手包之后,在下面这一块都是通过 java 代码组装的。组装好之后,用 hs.write 调用这个方法,实际上就是将握手包写给客户端了,所以在这一块客户端就可以拿到这个握手包当中的内容,在这里发送握手包,就是通过这一段 Java 代码来实现的。
这是第一阶段:握手包的源码实现是在 FrontendConnection 当中实现的。
接下来看第二阶段,服务端将握手包发送给客户端,客户端输入用户名和密码来进行认证登录。所以第二个部分就是认证包的源码实现。
2)认证包源码实现
客户端接收到握手包后,紧接看向服务端发起一个认证包,MyCat 封装为类AuthPacket:
//认证包的源码实现会涉及到认证包的封装类,叫做 AuthPacket。
public class AuthPacket extends MySQLPacket {
private static final byte[] FILLER = new byte[23];
public long clientflags;
public long maxPacketsize;
public int charsetindex;
public byte[] extra;// from FILLER(23)
public string user;
public byte[] password;
public string database;
客户端发送的认证包转由 FrontendAuthenticator 的 Handler 来处理,主要操作就是拆包,检查用户名,密码合法性,检查连接数是够超出限制。源码实现如下:
@Override
publia void handle(byte[] data){
//check quit packet;
if (data.length == QuitPacket.QUIT.length && data[4]== MySQLPacket.COM_QUIT)(
source.close(reason:"quit packet");
return;
下面找一下 AuthPacket 的源码:
public long clientflags;
public long maxPacketsize;
public int charsetindex;
public byte[] extra;// from FILLER(23)
public string user;
public byte[] password;
public string database;
在 AuthPacket 当中会记录用户名是什么,密码是什么,以及数据库和其他的一些标识等等。所有的认证包最终都会走动到 AuthPacket 当中来,以上数据最终是通过 Java 代码如何组装的呢?
在发送了初始化的握手数据包之,接下来下面还有一句话叫做:
// asynread response
this.asynread();
那么 asynread 这个方法如何操作呢?往下看:
@Override
public void asynRead() throws IOException {
ByteBuffer theBuffer=con.readBuffer;
if (theBuffer == null) {
theBuffer=con.processor.getBufferPool().allocate(conprocessor.getBufferPool().getChunksize());
con.readBuffer = theBuffer;}
int got = channel.read(theBuffer);
// 读取这个缓冲区当中的信息
con.onReadData(got);
//读取缓冲区当中的信息之后,又调用了 con 里面的 readdate 这个方法, readdate 这个方法又有什么作用呢?
//循环处理字节信息
int offset = readBufferOffset, length =0,position =readBuffer.position();
for (;;) { I
length=getPacketLength(readBuffer,offset);
if (length==-1) {
if (offset!= 0) {
this.readBuffer =compactReadBuffer(readBuffer,offset);} else if (readBuffer!= null &&!readBufferhasRemaining()){
throw new RuntimeException( "invalid readbuffer capacity too little buffer size "
+readBuffer.capacity());
}
break;
}
if (position >= offset + length && readBuffer!=null) {
readdate 循环处理字节信息,最终组装出 base。base 当中所接收到的就是客户端传递过来的用户名密码等认证信息。
接下来要进行处理,调用 handler 方法,这就是拿到的那个认证包。
点击进入 handler:
public void handle (byte[] data) {
// check quit packet
if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM QUIT) {
source.close( reason: "quit packet");
return;
}
AuthPacket auth=newAuthPacket();
auth.read(data);
//huangyiming add
int nopasswordLogin = MycatServer.getInstance().getConfig().getsystem().getNonePasswordLogin();
//如果无密码登陆则跳过密码验证这个步骤
boolean skipPassWord = false; String defaultUser = "";
if(nopassWordLogin == 1){
skipPassWord = true;
Map<string,UserConfig> userMaps = MycatServer.getInstance().getConfig().getUsers(); if(!userMaps.isEmpty()){
setDefaultAccount (auth, userMaps);
这一步是判断用户名密码的有效性。
//在这校验用户名和密码失败的时候,它会调用 failure 写入这个 ErrPacket 客户端。
如果有其中任何一个地方出错,不管用户名还是密码错误,都要直接调用方法failure,调取 WriteErrMessage 。WriteErrMessage 实际上就是去写这个错误的消息,就是 Errpacket。Errpacket 就和上面的:
是一致的。
public void writeErrMessage(byte id, int errno,String msg){
ErrorPacket err=newErrorPacket();
err.packetid=id;
err.errno=errno;
err.message=encodeString(msg,charset);
err.write( C: this);
Errpacket 当中的数据直接写入到客户端。
如果上面这些校验都通过了就会调用 success,success 实际上它所做的就是重点就是:
ByteBuffer buffer=source.allocate();
source.write (source.writeToBuffer(AUTH OK, buffer));
//AUTH OK 相当于 OK Packet。
如果服务端校验用户名密码通过,就会给客户端返回一个 OK 的标识,代表认证成功。
2. 命令认证实现
命令执行阶段就是 SOL 命令和 SOL 语句执行阶段,在该阶段 Mvcat 主要需要做的事情,就是对客户端发来的数据包进行拆包,并判断命令的类型,并解析 SQL 语句,执行响应的 SQL 语句,最后把执行结果封装在结果集包中,返回给客户端。
从客户端发来的命令交给 FrontendCommandHandler 中的 handle 方法处理:
@Override
public void handle(byte[] data)
{
if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
{
MySQLMessage mm=newMyQLMesage(data);
Int packetLength=mm.readUB3(); if(packetLength+4==data.length)
{
source.loadDataInfileData(data);
}
return;
}
点击 nio handler 方法。如果是执行命令的数据会调用:
Frontendcommandhandler 这个类的 handler 方法要进行数据的处理,那么在这一块要去判定:
switch (data[4])
{
case MySLPacket.COM INIT DB:
commands.doInitDB();
source.initDB(data);
break;
case MySQLPacket.COM QUERY:
commands.doquery();
source.query(data);
break;
case MySQLPacket.COM PING:
commands.doPing(); source.ping(); break;
case MySQLPacket.COM QUIT:
commands.doQuit();
执行的操作是数据化的操作?还是查询的操作?还是退出的操作等等不同的操作类型,在这一块它有不同的处理。假如执行查询操作,那么它会调用 command 的doquery 方法。
那么点到这个里面你会发现它就做了一个计数的一个操作,主要来看下面这一个方法:Frontconnection 的 query方法。
//取得语句
String sql =null;
try {
MySQLMessagemm=newMySQLMessage(data); mm.position(i:5);
sgh = mm.readstring(charset);
} catch (UnsupportedEncodingException e){
writeErrMessage(ErrorCode.ER UNKNOWN CHARACTER SET, msg: "Unknown charset '" + charset +"");
return;
}
this.query(sq1 );
点击 query 方法,先拿到客户端传递过来的 SQL 语句,读取到这个 SQL 语句,拿到 SQL 语句之后,调用 query 方法执行 SQL 语句。在执行在口语句的时候首先要对 sql 语句进行见状性的判定,再去优化 SQL 语句。SQL 语句末文的分号在这个位置是需要去除掉的,还要记录这个 SQL 语句,再去判定 SQL 语句当中的防火墙的设置。还要校验 dml 权限:
// DML 权限检查
try {
boolean isPassed = privileges.checkDmlPrivilege(user, schema, sql);
if( !isPassed ) {
writeErrMessage(ErrorCode.ERR_WRONG_USED,
msg:"The statement DML privilege check is not passed, reject for user i" + user + "‘");
return;
}
}catch( com.alibaba.druid.sgl.parser.ParserException e1) {
writeErrMessage(ErrorCode.ERR_WRONG_USED,e1.getMessage());
LOGGER.error("parse exception", e1 );
Return;
可以在 Mycat 的配置文件当中可以配置 dml 的权限信息:
<!-- 表级 DML 权限设置
<privileges check="true">
<schema name="ITCAST" dml="1111" >
<table name="TB TEST" dm1="1110"></table>
</schema>
</privileges>
--></user>
1111 什么含义?1110 什么含义?针对于某一个逻辑库当中的某一个逻辑表,它是否具有对应的权限。这些权限配置了,实际上在这一块就要来进行校验。
如果校验权限校验未通过,直接写出对应的错误信息。如果权限校验通过,接下来就要执行 SQL 语句,调用 queryhandler 这个方法。
在 query 方法当中拿到 SQL 语句,接下来对 SQL 语句进行解析,解析之后下来要开始进行执行。用 handler 的方法进行执行,handler 方法把 circle 语句执行之后,例如点击 response,执行 SQL 语句之后:
public static void response(ServerConnection c) {
ByteBuffer buffer=c.allocate();
"buffer=headerwrite(buffer,cwriteSocketifFull: true);
for (Fieldpacket field :fields){
buffer=fieldwrite(buffercwriteSocketlfFulltrue);
}
buffer=eof.write(buffer,c,writeSocketlfFulltrue);
byte packetId=eof.packetid;
RowDataPacket row=new RowDataPacket(FIELD COUNT);
row.add(stringUtil.encode(c.getschema(),c.getcharset()));
row.packetid=++packetId;
buffer=row.write(buffer,cwriteSocketifFull:true);
EOFPacket lastEof=newEOFPacket();
lastEof.packetId=++packetid;
buffer =lastEof.write(buffer,cwriteSocketlfFull:true);
c.write(buffer);
在这里组装返回的数据包,fieldpacket 首先会写出这么一段数据,然后写出fieldpacket。
再来抓取一下,先执行 databases,这是响应回来的数据:
这个是 192.157 响应给 192.1 的这个客户端的。
这里又有很多的 packet:
每一个 packet 它实际上它就是一行数据,比如 Information Schema,对应相应的数据:
每一行数据都会写入相应的数据包,写入每一个域对应的数据包,往下再去写入其他的数据包,一次性将数据包写出来,这个数据包实际上就是给客户端返回给客户端返回对应的数据。
以上是简单跟踪了一下在命令执行阶段在 Mycat 当中是如何实现 MySQL 协议的。
再去跟踪源码的时候可以重点跟踪笔记中的源码。具体细化的源码等熟悉了源码之后再去选择,因为这个源码相对来说是比较深的,建议大家如果去跟踪源码可以和MySQL 协议的数据包对应起来,这样更加便于大家理解。
在这一小节所讲解的 MySQL 协议在 Mycat 当中是如何实现的,主要是给大家通过跟踪源码的形式,简单跟踪了两个阶段。一个是握手认证阶段,一个是命令执行阶段,这两个阶段都是通过 Java 或者模拟 MySQL 协议完成跟踪的。