基于zbus的MySQL透明代理(<100行)

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: zbus.NET网络通讯模块(二),以实现MySQL服务器透明代理讲述zbus.NET网络通讯模块的小巧与精干的编程能力。

我们上次讲到zbus网络通讯的核心API:

Dispatcher -- 负责-NIO网络事件Selector引擎的管理,对Selector引擎负载均衡

IoAdaptor -- 网络事件的处理,服务器与客户端共用,负责读写,消息分包组包等

Session -- 代表网络链接,可以读写消息

实际的应用,我们几乎只需要做IoAdaptor的个性化实现就能完成高效的网络通讯服务,今天我们将举例说明如何个性化这个IoAdaptor。

我们今天要完成的目标是:实现MySQL服务器的透明代理。效果是,你访问代理服务器跟访问目标MySQL无差异。

我们在测试环境10.17.2.30:3306 这台机器上提供了MySql,在我们本地机器上跑起来我们今天基于zbus.NET实现的一个代理程序,就能达到下面的效果。

image
image

完成大概不到100 行的代码, Cool?Let’s roll!

首先,我们思考透明TCP代理到底在干啥,透明的TCP代理的业务逻辑其实非常简单,可以描述为,将来自代理上游(发起请求到代理)的数据转发到目标TCP服务器,把目标服务器回来的数据原路返回代理上游客户端。 注意这个原路,如何做到原路返回成为关键点。这个示例其实跟MySQL没有任何关系,原则上任何TCP层面的服务都应该适配。

基于zbus.NET怎么来将上面的逻辑在体现出来,也就是如何个性化IoAdaptor?直观的讲,我们要处理的几个事件应该包括:1)从上游客户端发起的链接请求--代理服务器的Accept事件,2)代理服务器连接目标服务器的Connect事件,3)上下游的数据事件onMessage。

zbus.NET的IoAdaptor提供的个性化事件如下

image

基本包括一个链接(客户端或者服务端)的生命周期,与消息的编解码。

我们的代理IoAdaptor就是逐一个性化处理。

第一步,编解码: 透明代理对消息内容不做理解,所以不需要编解码。

// 透传不需要编解码,简单返回ByteBuffer数据
    public IoBuffer encode(Object msg) {
        if (msg instanceof IoBuffer) {
            IoBuffer buff = (IoBuffer) msg;
            return buff;
        } else {
            throw new RuntimeException("Message Not Support");
        }
    }

    // 透传不需要编解码,简单返回ByteBuffer数据
    public Object decode(IoBuffer buff) {
        if (buff.remaining() > 0) {
            byte[] data = new byte[buff.remaining()];
            buff.readBytes(data);
            return IoBuffer.wrap(data);
        } else {
            return null;
        }
    }

第二步,代理服务接入:

@Override
    protected void onSessionAccepted(Session sess) throws IOException {
        Session target = null;
        Dispatcher dispatcher = sess.getDispatcher();
        try {
            target = dispatcher.createClientSession(targetAddress, this);
        } catch (Exception e) {
            sess.asyncClose();
            return;
        }
        sess.chain = target;
        target.chain = sess;
        dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
    }

这里的逻辑思路是,代理服务器每接受到一个请求--通过onSessionAccepted表达,我们将同时创建一个到目标服务器的链接,今天的例子是目标MySQL服务器,注意上面的处理中把创建目标服务器Session过程与真正链接到目标服务分开(Dispatcher也提供合并二者的工具方法),是为了能在没有发生链接之前绑定上好上下游关系,通过Session的chain变量来表达,也就是当前Session的关联Session,关联好之后启动感兴趣Connect事件,逻辑处理完毕。

第三步,链接成功事件(第二步中需要链接到目标服务器)

@Override
    public void onSessionConnected(Session sess) throws IOException {  
        Session chain = sess.chain;
        if(chain == null){ 
            sess.asyncClose();
            return; 
        }   
        if(sess.isActive() && chain.isActive()){ 
            sess.register(SelectionKey.OP_READ);
            chain.register(SelectionKey.OP_READ);
        }
    }

这里的一个核心是当上下游都处于链接正常态,上下游Session都启动感兴趣消息读事件(写事件是在读取处理中自动触发),为什么在这里做的原因是一定要等上下游都正常态后才启动双方消息处理,不然会出现字节丢失。

第四步,处理上下游数据事件

@Override
    protected void onMessage(Object msg, Session sess) throws IOException {  
        Session chain = sess.chain;
        if(chain == null){
            sess.asyncClose(); 
            return;
        } 
        chain.write(msg); 
    }

是不是非常简单,类似pipeline,从一端的数据写到另外一端。

原则上面4步结束,整个透明代理就完成了,但是为了处理链接异常清理,我们增加了Session清理处理,如下

@Override
    public void onSessionToDestroy(Session sess) throws IOException {   
        try {
            sess.close();
        } catch (IOException e) { //ignore
        } 
        if (sess.chain == null) return; 
        try {    
            sess.chain.close();    
            sess.chain.chain = null;
            sess.chain = null;
        } catch (IOException e) { 
        }
    }

工作就是解决上下游链接清理链接。

至此为止我们的IoAdaptor个性化就完成了,是不是非常简单,现在我们要跑起来测试了,下面的代码就是上一次讲到重复的设置,没有新意。

public static void main(String[] args) throws Exception {   
        Dispatcher dispatcher = new Dispatcher(); 
        IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); 
        final Server server = new Server(dispatcher, ioAdaptor, 3306); 
        server.start();
    }

骚年,包括渣渣import和少许注释加起来折腾了不到100行,该跑一跑了,还是那句话,不是HelloWorld,你可以规模压力测。看看你是否在本地代理出来了你的目标服务MySQL,gl,hf, gogogo.

完整代码可运行代码如下,也可直接到zbus示例代码库中找到

package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;  
public class TcpProxyAdaptor extends IoAdaptor {
    private String targetAddress;

    public TcpProxyAdaptor(String targetAddress) {
        this.targetAddress = targetAddress;
    }

    // 透传不需要编解码,简单返回ByteBuffer数据
    public IoBuffer encode(Object msg) {
        if (msg instanceof IoBuffer) {
            IoBuffer buff = (IoBuffer) msg;
            return buff;
        } else {
            throw new RuntimeException("Message Not Support");
        }
    }

    // 透传不需要编解码,简单返回ByteBuffer数据
    public Object decode(IoBuffer buff) {
        if (buff.remaining() > 0) {
            byte[] data = new byte[buff.remaining()];
            buff.readBytes(data);
            return IoBuffer.wrap(data);
        } else {
            return null;
        }
    }

    @Override
    protected void onSessionAccepted(Session sess) throws IOException {
        Session target = null;
        Dispatcher dispatcher = sess.getDispatcher();
        try {
            target = dispatcher.createClientSession(targetAddress, this);
        } catch (Exception e) {
            sess.asyncClose();
            return;
        }
        sess.chain = target;
        target.chain = sess;
        dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
    }

    @Override
    public void onSessionConnected(Session sess) throws IOException {  
        Session chain = sess.chain;
        if(chain == null){ 
            sess.asyncClose();
            return; 
        }   
        if(sess.isActive() && chain.isActive()){ 
            sess.register(SelectionKey.OP_READ);
            chain.register(SelectionKey.OP_READ);
        }
    }

    @Override
    protected void onMessage(Object msg, Session sess) throws IOException {  
        Session chain = sess.chain;
        if(chain == null){
            sess.asyncClose(); 
            return;
        } 
        chain.write(msg); 
    }

    @Override
    public void onSessionToDestroy(Session sess) throws IOException {   
        try {
            sess.close();
        } catch (IOException e) { //ignore
        } 
        if (sess.chain == null) return; 
        try {    
            sess.chain.close();    
            sess.chain.chain = null;
            sess.chain = null;
        } catch (IOException e) { 
        }
    }

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {   
        Dispatcher dispatcher = new Dispatcher(); 
        IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); 
        final Server server = new Server(dispatcher, ioAdaptor, 3306);
        server.setServerName("TcpProxyServer");
        server.start();
    }
}

文章转载自 开源中国社区[https://www.oschina.net]

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
安全 关系型数据库 数据管理
DMS产品常见问题之香港地区RDS开启安全访问代理失败如何解决
DMS(数据管理服务,Data Management Service)是阿里云提供的一种数据库管理和维护工具,它支持数据的查询、编辑、分析及安全管控;本汇总集中了DMS产品在实际使用中用户常遇到的问题及其相应的解答,目的是为使用者提供快速参考,帮助他们有效地解决在数据管理过程中所面临的挑战。
|
DataWorks 安全 关系型数据库
DMS产品常见问题之香港RDS走代理失败如何解决
DMS(数据管理服务,Data Management Service)是阿里云提供的一种数据库管理和维护工具,它支持数据的查询、编辑、分析及安全管控;本汇总集中了DMS产品在实际使用中用户常遇到的问题及其相应的解答,目的是为使用者提供快速参考,帮助他们有效地解决在数据管理过程中所面临的挑战。
|
SQL 算法 关系型数据库
Sharding-Proxy代理Mysql服务
Apache shardingSphere Sharding-proxy落地实战
467 80
|
NoSQL 关系型数据库 MySQL
Docker安装详细步骤及相关环境安装配置(mysql、jdk、redis、自己的私有仓库Gitlab 、C和C++环境以及Nginx服务代理)
Docker安装详细步骤及相关环境安装配置(mysql、jdk、redis、自己的私有仓库Gitlab 、C和C++环境以及Nginx服务代理)
1215 0
|
关系型数据库 MySQL 中间件
|
运维 负载均衡 Cloud Native
「读写分离」RDS PostgreSQL数据库代理发布,助力降本增效
基于MaxScale的RDS数据库代理服务能够帮助客户实现数据库的读写分离架构,以低成本实现应用横向扩展能力。
1274 0
「读写分离」RDS PostgreSQL数据库代理发布,助力降本增效
|
SQL 监控 关系型数据库
|
分布式计算 Java 关系型数据库
linux 设置代理 安装jdk mysql tomcat redis hadoop
 1.修改linux用户名和密码 前提进入root用户  原用户名 XX,改成用户名 YY         执行 usermod -l YY XX  修改用户 YY 的密码,如果你在root权限下 执行 passwd YY  useradd -d /home/usr/baoyou -m...
1459 0
|
5月前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
430 158
|
5月前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。

推荐镜像

更多