基于Netty与RabbitMQ的消息服务

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介:

Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架。

前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下:

将业务代码和实际协议解析部分的代码抽离,得到以上一个简单的设计图,代码开源在GitHub上,简单介绍下NettyMQServer采集服务涉及到的几个关键技术点:

1、设备TCP消息解析:

NettyMQServer和采集设备Device之间采用TCP通信,TCP消息的解析可以使用LengthFieldBasedFrameDecoder(消息头和消息体),可以有效的解决TCP消息“粘包”问题。

消息包解析图如下:bytes length field at offset 0, do not strip header, the length field represents the length of the whole message

 lengthFieldOffset   =  0
 lengthFieldLength   =  2
 lengthAdjustment    = -2 (= the length of the Length field)
 initialBytesToStrip =  0

 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 +--------+----------------+      +--------+----------------+
 | Length | Actual Content |----->| Length | Actual Content |
 | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 +--------+----------------+      +--------+----------------+

代码中消息长度的存储采用了4个字节,采用LengthFieldBasedFrameDecoder(65535,0,4,-4,0)解码,Netty会从接收的数据中头4个字节中得到消息的长度,进而得到一个TCP消息包。

2、给设备发消息:

首先在连接创建时,要保留TCP的连接:

复制代码
static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // A closed channel will be removed from ChannelGroup automatically
        channels.add(ctx.channel());
    }
复制代码

在每次一个Channel Active(连接创建)的时候用ChannelGroup保存这个Channel连接,当需要给某个设备发消息的时候,可以遍历该ChannelGroup,找到对应的Channel,给该Channel发送消息:

for (io.netty.channel.Channel c : EchoServerHandler.channels) {
    ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
    c.writeAndFlush(msg);
}

这里是给所有的连接的设备都发。当连接断开的时候,ChannelGroup会自动remove掉这个连接,不需要我们手动管理。

3、心跳检测

当某个设备Device由于断电或是其他原因导致设备不正常无法采集数据,Netty服务端需要知道该设备是否在正常工作,可以使用Netty的IdleStateHandler,示例代码如下:

复制代码
// 3 minutes for read idle
ch.pipeline().addLast(new IdleStateHandler(3*60,0,0));
ch.pipeline().addLast(new HeartBeatHandler());

/**
 * Handler implementation for heart beating.
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Read timeout
                System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress());
                //ctx.disconnect(); //Channel disconnect
            }
        }
    }
}
复制代码

上面设置3分钟没有读到数据,则触发一个READER_IDLE事件。

4、RabbitMQ消息接收与发送

NettyMQServer消息发送采用了Spring AMQP,只需要在配置文件中简单配置一下,就可以方便使用。

NettyMQServer消息接收同样可以采用Spring AMQP,但由于对Spring相关的配置不是很熟悉,为了更灵活的使用MQ,这里使用了RabbitMQ Client Java API来实现:

复制代码
                    Connection connection = connnectionFactory.newConnection();
                    Channel channel = connection.createChannel();
                    channel.exchangeDeclare(exchangeName, "direct", true, false, null);
                    channel.queueDeclare(queueName, true, false, false, null);
                    channel.queueBind(queueName, exchangeName, routeKey);

                    // process the message one by one
                    channel.basicQos(1);

                    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                    // auto-ack is false
                    channel.basicConsume(queueName, false, queueingConsumer);
                    while (true) {
                        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                        String message = new String(delivery.getBody());

                        log.debug("Mq Receiver get message");
                        // Send the message to all connected clients
                        // If you want to send to a specified client, just add
                        // your own logic and ack manually
                        // Be aware that ChannelGroup is thread safe
                        log.info(String.format("Conneted client number: %d",EchoServerHandler.channels.size()));
                        for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
                            c.writeAndFlush(msg);
                        }
                        // manually ack to MQ server the message is consumed.
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
复制代码

以上代码从一个Queue中读取数据,为了有效处理数据,防止异常数据丢失,使用了手动Ack。

RabbitMQ的使用方式:http://www.cnblogs.com/luxiaoxun/p/3918054.html

 

代码托管在GitHub上:https://github.com/luxiaoxun/NettyMqServer

 

参考:

http://netty.io/

http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html

 

作者: 阿凡卢
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

相关文章
|
6月前
Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案
Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案
166 1
|
10月前
|
Java
Netty实现HTTP服务
Netty实现HTTP服务
77 0
|
前端开发 网络协议 Java
Netty服务开发及性能优化
造成假死的原因可能是公网丢包、客户端或服务端网络故障等,Netty为我们提供了IdleStateHandler 来解决超时假死问题,示例代码如下
147 0
|
安全 API
Gateway集成Netty服务
Netty是一个异步的,事件驱动的网络应用框架,用以快速开发高可靠、高性能的网络应用程序,提供网络传输能力的管理,支持常见的数据传输协议;
366 0
Gateway集成Netty服务
|
前端开发 Java
Netty之服务启动且注册成功之后
Netty之服务启动且注册成功之后
151 0
|
前端开发
Netty流程学习一-netty启动服务
问题:我们的线程:openSelector在什么时候创建的。 在创建NioEventLoop的时候,创建openSelector。 什么时候创建severSocketChannel、初始化serverSocketChannel,同时给serverSocketChannel从bossGroup中选择一个NioEventLoop 创建serverSocketChannel是在initAndRegister的时候,通过泛型+放射+工厂的方式创建serverSocketChannel。 而初始化则是设置channelOptions的相关参数信息、设置属性信息,同时通过channel的pipeline方
160 0
Netty流程学习一-netty启动服务
|
存储 负载均衡 API
Netty + ZooKeeper 实现简单的服务注册与发现
Netty + ZooKeeper 实现简单的服务注册与发现
377 0
Netty + ZooKeeper 实现简单的服务注册与发现
|
网络协议 Java Spring
netty案例,netty4.1基础入门篇十二《简单实现一个基于Netty搭建的Http服务》
Netty不仅可以搭建Socket服务,也可以搭建Http、Https服务。本章节我们通过一个简单的入门案例,来了解Netty搭建的Http服务,在我们后续的Netty网关服务中会使用到这样的功能点。 超文本传输协议(HTTP,HyperText Transfer Protocol)是互联网上应用最为广泛的一种网络协议。
315 0
netty案例,netty4.1基础入门篇十二《简单实现一个基于Netty搭建的Http服务》
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》
在我们使用netty中,需要监测服务是否稳定以及在网络异常链接断开时候可以自动重连。需要实现监听;f.addListener(new MyChannelFutureListener()
223 0
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》