Netty之协议设计

简介: Netty之协议设计

为什么需要协议

TCP/IP 中消息传输基于流的方式,没有边界。

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

例如:在网络上传输

下雨天留客天留我不留

是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性

一种解读

下雨天留客,天留,我不留

另一种解读

下雨天,留客天,留我不?留

如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用

定长字节表示内容长度 + 实际内容

例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了

0f下雨天留客06天留09我不留

redis协议示例

*3

$3

SET

$5

mykey

$7

myvalue

对于上面的内容发出的命令为SET mykey myvalue

*3表示要发三个数组

$3表示第一个数组的长度为3

后接具体的指令为SET

后面的$5 $7也是同样的道理

1. public class redisHttp {
2. public static void main(String[] args) {
3. NioEventLoopGroup worker = new NioEventLoopGroup();
4. byte[] LINE={13,10};
5. try {
6. Bootstrap bootstrap = new Bootstrap();
7.             bootstrap.channel(NioSocketChannel.class);
8.             bootstrap.group(worker);
9.             bootstrap.handler(new ChannelInitializer<SocketChannel>() {
10. protected void initChannel(SocketChannel ch) throws Exception {
11.                     ch.pipeline().addLast(new LoggingHandler());
12.                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
13. public void channelActive(ChannelHandlerContext ctx) throws Exception {
14.                             set(ctx);
15.                             get(ctx);
16.                         };
17. private void get(ChannelHandlerContext ctx){
18. ByteBuf buf = ctx.alloc().buffer();
19.                             buf.writeBytes("*2".getBytes());
20.                             buf.writeBytes(LINE);
21.                             buf.writeBytes("$3".getBytes());
22.                             buf.writeBytes(LINE);
23.                             buf.writeBytes("get".getBytes());
24.                             buf.writeBytes(LINE);
25.                             buf.writeBytes("aaa".getBytes());
26.                             buf.writeBytes(LINE);
27.                             ctx.writeAndFlush(buf);
28.                         }
29. 
30. private  void  set(ChannelHandlerContext ctx){
31. ByteBuf buf = ctx.alloc().buffer();
32.                             buf.writeBytes("*3".getBytes());
33.                             buf.writeBytes(LINE);
34.                             buf.writeBytes("$3".getBytes());
35.                             buf.writeBytes(LINE);
36.                             buf.writeBytes("set".getBytes());
37.                             buf.writeBytes(LINE);
38.                             buf.writeBytes("$3".getBytes());
39.                             buf.writeBytes(LINE);
40.                             buf.writeBytes("aaa".getBytes());
41.                             buf.writeBytes(LINE);
42.                             buf.writeBytes("xxx".getBytes());
43.                             buf.writeBytes(LINE);
44.                             ctx.writeAndFlush(buf);
45.                         }
46.                     });
47.                 }
48.             });
49. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6379);
50.             channelFuture.channel().closeFuture().sync();
51. 
52.         }catch (Exception e){
53.             e.printStackTrace();
54.         }finally {
55.             worker.shutdownGracefully();
56.         }
57.     }
58. }

http协议举例

1. public class httpSimple {
2. static final Logger log = LoggerFactory.getLogger(HelloWordServer.class);
3. public static void main(String[] args) {
4. 
5.         NioEventLoopGroup boss=new NioEventLoopGroup();
6.         NioEventLoopGroup worker=new NioEventLoopGroup();
7. try {
8. ServerBootstrap serverBootstrap = new ServerBootstrap();
9.             serverBootstrap.channel(NioServerSocketChannel.class);
10.             serverBootstrap.group(boss,worker);
11.             serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
12. protected void initChannel(SocketChannel ch) throws Exception {
13.                     ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
14.                     ch.pipeline().addLast(new HttpServerCodec());
15.                     ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
16. @Override
17. protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
18. //获取请求
19.                             log.debug(msg.uri());
20. // 返回响应
21. DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
22. byte[] bytes = "<h1>Hello,world!</h1>".getBytes();
23.                             response.headers().setInt(CONTENT_LENGTH,bytes.length);
24.                             response.content().writeBytes(bytes);
25. //写回响应
26.                             ctx.writeAndFlush(response);
27.                         }
28.                     });
29. 
30.                 }
31.             });
32. ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
33.             channelFuture.channel().closeFuture().sync();
34. 
35.         }catch (Exception e){
36.             e.printStackTrace();
37.         }finally {
38.             boss.shutdownGracefully();
39.             worker.shutdownGracefully();
40.         }
41. 
42.     }
43. }

启动服务端在浏览器中输入localhost:8080

自定义协议

要素

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊... 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

编解码器

1. @Slf4j
2. public class MessageCodec extends ByteToMessageCodec<Message> {
3. 
4. @Override
5. protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
6. 
7.     }
8. 
9. @Override
10. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
11. 
12.     }
13. }

重写编码方法

1. @Override
2. protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
3. // 1. 4 字节的魔数
4.         out.writeBytes(new byte[]{1, 2, 3, 4});
5. // 2. 1 字节的版本,
6.         out.writeByte(1);
7. // 3. 1 字节的序列化方式 jdk 0 , json 1
8.         out.writeByte(0);
9. // 4. 1 字节的指令类型
10.         out.writeByte(msg.getMessageType());
11. // 5. 4 个字节
12.         out.writeInt(msg.getSequenceId());
13. // 无意义,对齐填充
14.         out.writeByte(0xff);
15. // 6. 获取内容的字节数组
16. ByteArrayOutputStream bos = new ByteArrayOutputStream();
17. ObjectOutputStream oos = new ObjectOutputStream(bos);
18.         oos.writeObject(msg);
19. byte[] bytes = bos.toByteArray();
20. // 7. 长度
21.         out.writeInt(bytes.length);
22. // 8. 写入内容
23.         out.writeBytes(bytes);
24. 
25.     }

用于将自定义Message对象编码成二进制数据流发送给远程服务器。具体解释如下:

  1. 4字节的魔数:这个魔数是用来标志协议的,客户端和服务端都要保持一致,表示这是同一种协议。
  2. 1字节的版本:表示当前数据流的版本号。
  3. 1字节的序列化方式:表示使用哪种序列化方式将Message对象转为二进制数据流,其中0代表JDK序列化方式,1代表JSON序列化方式。
  4. 1字节的指令类型:表示Message对象中的指令类型,也就是表示这个消息是干什么用的。
  5. 4字节的序列号:表示该消息的序列号,用于检测是否有消息丢失或重复等问题。
  6. 无意义,8位填充:由于前面魔数、版本、序列化方式、指令类型、序列号已经使用了12个字节的长度,而长度字段需要占用4个字节的长度,为了对其,需要在这里填充一个字节,使得总长度为13个字节。
  7. 4字节的消息体长度:表示消息体的长度。
  8. 消息内容:将Message对象序列化为字节数组,再写到输出流中。

最终,这个编码器将Message对象转化为了一个二进制数据流,方便通过网络传输到远程服务器。

重写解码方法

1. @Override
2. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
3. int magicNum = in.readInt();
4. byte version = in.readByte();
5. byte serializerType = in.readByte();
6. byte messageType = in.readByte();
7. int sequenceId = in.readInt();
8.         in.readByte();
9. int length = in.readInt();
10. byte[] bytes = new byte[length];
11.         in.readBytes(bytes, 0, length);
12. ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
13. Message message = (Message) ois.readObject();
14.         log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
15.         log.debug("{}", message);
16.         out.add(message);
17.     }

用于将接收到的二进制数据流解码成自定义的Message对象。具体解释如下:

  1. 读取4字节的魔数。
  2. 读取1字节的版本。
  3. 读取1字节的序列化方式。
  4. 读取1字节的指令类型。
  5. 读取4字节的序列号。
  6. 读取1字节,这个字节被视为无意义填充。
  7. 读取4字节的消息体长度,也就是消息内容的字节长度。
  8. 根据消息体长度创建一个字节数组,并从输入流中读取相应的字节数据。
  9. 将字节数组反序列化成一个Message对象。
  10. 输出相应的日志信息,包括魔数、版本、序列化方式、指令类型、序列号、消息体长度以及反序列化后的Message对象。
  11. 把反序列化后的Message对象添加到out列表中。

最终,这个解码器将二进制数据流转化为了自定义的Message对象,方便在业务逻辑中使用。

测试

1. EmbeddedChannel channel = new EmbeddedChannel(
2. new LoggingHandler(),
3. new LengthFieldBasedFrameDecoder(
4. 1024, 12, 4, 0, 0),
5. new MessageCodec()
6. );
7. // encode
8. LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
9. //        channel.writeOutbound(message);
10. // decode
11. ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
12. new MessageCodec().encode(null, message, buf);
13. 
14. ByteBuf s1 = buf.slice(0, 100);
15. ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
16. s1.retain(); // 引用计数 2
17. channel.writeInbound(s1); // release 1
18. channel.writeInbound(s2);


相关文章
|
4月前
|
Java Spring
Springboot整合Netty,自定义协议实现
以上就是在Spring Boot中整合Netty并实现自定义协议的基本步骤。你需要根据你的自定义协议的具体需求,来实现你的编码器、解码器和处理器。
140 0
|
4月前
|
编解码 JSON 网络协议
Netty使用篇:Http协议编解码
Netty使用篇:Http协议编解码
|
JSON 网络协议 算法
由浅入深Netty协议设计与解析
由浅入深Netty协议设计与解析
55 0
|
Nacos
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
143 0
|
数据安全/隐私保护
Netty实战(十四)WebSocket协议(二)
我们之前说过为了将 ChannelHandler 安装到 ChannelPipeline 中,需要扩展了ChannelInitializer,并实现 initChannel()方法
167 0
|
Rust Dubbo 网络协议
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
12640 8
|
网络协议 前端开发 Linux
快来体验快速通道,netty中epoll传输协议详解
在前面的章节中,我们讲解了kqueue的使用和原理,接下来我们再看一下epoll的使用。两者都是更加高级的IO方式,都需要借助native的方法实现,不同的是Kqueue用在mac系统中,而epoll用在liunx系统中。
|
Java
Netty自定义协议
先写一个Messsage类,解码的时候将要把ByteBuf解码为Message
91 0
|
NoSQL Redis
使用netty按照Redis协议发消息完成set key value 命令
使用netty按照Redis协议发消息完成set key value 命令
82 0
|
前端开发 Java 网络性能优化
Netty网络编程(五):使用UDP协议
Netty网络编程(五):使用UDP协议
518 0