为什么需要协议
TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
例如:在网络上传输
下雨天留客天留我不留
是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性
一种解读
下雨天留客,天留,我不留
另一种解读
下雨天,留客天,留我不?留
如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用
定长字节表示内容长度 + 实际内容
例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了
0f下雨天留客06天留09我不留
redis协议示例
*3
$3
SET
$5
mykey
$7
myvalue
对于上面的内容发出的命令为SET mykey myvalue
*3表示要发三个数组
$3表示第一个数组的长度为3
后接具体的指令为SET
后面的$5 $7也是同样的道理
1. public class redisHttp { 2. public static void main(String[] args) { 3. NioEventLoopGroup worker = new NioEventLoopGroup(); 4. byte[] LINE={13,10}; 5. try { 6. Bootstrap bootstrap = new Bootstrap(); 7. bootstrap.channel(NioSocketChannel.class); 8. bootstrap.group(worker); 9. bootstrap.handler(new ChannelInitializer<SocketChannel>() { 10. protected void initChannel(SocketChannel ch) throws Exception { 11. ch.pipeline().addLast(new LoggingHandler()); 12. ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ 13. public void channelActive(ChannelHandlerContext ctx) throws Exception { 14. set(ctx); 15. get(ctx); 16. }; 17. private void get(ChannelHandlerContext ctx){ 18. ByteBuf buf = ctx.alloc().buffer(); 19. buf.writeBytes("*2".getBytes()); 20. buf.writeBytes(LINE); 21. buf.writeBytes("$3".getBytes()); 22. buf.writeBytes(LINE); 23. buf.writeBytes("get".getBytes()); 24. buf.writeBytes(LINE); 25. buf.writeBytes("aaa".getBytes()); 26. buf.writeBytes(LINE); 27. ctx.writeAndFlush(buf); 28. } 29. 30. private void set(ChannelHandlerContext ctx){ 31. ByteBuf buf = ctx.alloc().buffer(); 32. buf.writeBytes("*3".getBytes()); 33. buf.writeBytes(LINE); 34. buf.writeBytes("$3".getBytes()); 35. buf.writeBytes(LINE); 36. buf.writeBytes("set".getBytes()); 37. buf.writeBytes(LINE); 38. buf.writeBytes("$3".getBytes()); 39. buf.writeBytes(LINE); 40. buf.writeBytes("aaa".getBytes()); 41. buf.writeBytes(LINE); 42. buf.writeBytes("xxx".getBytes()); 43. buf.writeBytes(LINE); 44. ctx.writeAndFlush(buf); 45. } 46. }); 47. } 48. }); 49. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6379); 50. channelFuture.channel().closeFuture().sync(); 51. 52. }catch (Exception e){ 53. e.printStackTrace(); 54. }finally { 55. worker.shutdownGracefully(); 56. } 57. } 58. }
http协议举例
1. public class httpSimple { 2. static final Logger log = LoggerFactory.getLogger(HelloWordServer.class); 3. public static void main(String[] args) { 4. 5. NioEventLoopGroup boss=new NioEventLoopGroup(); 6. NioEventLoopGroup worker=new NioEventLoopGroup(); 7. try { 8. ServerBootstrap serverBootstrap = new ServerBootstrap(); 9. serverBootstrap.channel(NioServerSocketChannel.class); 10. serverBootstrap.group(boss,worker); 11. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { 12. protected void initChannel(SocketChannel ch) throws Exception { 13. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); 14. ch.pipeline().addLast(new HttpServerCodec()); 15. ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() { 16. @Override 17. protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { 18. //获取请求 19. log.debug(msg.uri()); 20. // 返回响应 21. DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); 22. byte[] bytes = "<h1>Hello,world!</h1>".getBytes(); 23. response.headers().setInt(CONTENT_LENGTH,bytes.length); 24. response.content().writeBytes(bytes); 25. //写回响应 26. ctx.writeAndFlush(response); 27. } 28. }); 29. 30. } 31. }); 32. ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); 33. channelFuture.channel().closeFuture().sync(); 34. 35. }catch (Exception e){ 36. e.printStackTrace(); 37. }finally { 38. boss.shutdownGracefully(); 39. worker.shutdownGracefully(); 40. } 41. 42. } 43. }
启动服务端在浏览器中输入localhost:8080
自定义协议
要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊... 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
编解码器
1. @Slf4j 2. public class MessageCodec extends ByteToMessageCodec<Message> { 3. 4. @Override 5. protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { 6. 7. } 8. 9. @Override 10. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 11. 12. } 13. }
重写编码方法
1. @Override 2. protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { 3. // 1. 4 字节的魔数 4. out.writeBytes(new byte[]{1, 2, 3, 4}); 5. // 2. 1 字节的版本, 6. out.writeByte(1); 7. // 3. 1 字节的序列化方式 jdk 0 , json 1 8. out.writeByte(0); 9. // 4. 1 字节的指令类型 10. out.writeByte(msg.getMessageType()); 11. // 5. 4 个字节 12. out.writeInt(msg.getSequenceId()); 13. // 无意义,对齐填充 14. out.writeByte(0xff); 15. // 6. 获取内容的字节数组 16. ByteArrayOutputStream bos = new ByteArrayOutputStream(); 17. ObjectOutputStream oos = new ObjectOutputStream(bos); 18. oos.writeObject(msg); 19. byte[] bytes = bos.toByteArray(); 20. // 7. 长度 21. out.writeInt(bytes.length); 22. // 8. 写入内容 23. out.writeBytes(bytes); 24. 25. }
用于将自定义Message对象编码成二进制数据流发送给远程服务器。具体解释如下:
- 4字节的魔数:这个魔数是用来标志协议的,客户端和服务端都要保持一致,表示这是同一种协议。
- 1字节的版本:表示当前数据流的版本号。
- 1字节的序列化方式:表示使用哪种序列化方式将Message对象转为二进制数据流,其中0代表JDK序列化方式,1代表JSON序列化方式。
- 1字节的指令类型:表示Message对象中的指令类型,也就是表示这个消息是干什么用的。
- 4字节的序列号:表示该消息的序列号,用于检测是否有消息丢失或重复等问题。
- 无意义,8位填充:由于前面魔数、版本、序列化方式、指令类型、序列号已经使用了12个字节的长度,而长度字段需要占用4个字节的长度,为了对其,需要在这里填充一个字节,使得总长度为13个字节。
- 4字节的消息体长度:表示消息体的长度。
- 消息内容:将Message对象序列化为字节数组,再写到输出流中。
最终,这个编码器将Message对象转化为了一个二进制数据流,方便通过网络传输到远程服务器。
重写解码方法
1. @Override 2. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 3. int magicNum = in.readInt(); 4. byte version = in.readByte(); 5. byte serializerType = in.readByte(); 6. byte messageType = in.readByte(); 7. int sequenceId = in.readInt(); 8. in.readByte(); 9. int length = in.readInt(); 10. byte[] bytes = new byte[length]; 11. in.readBytes(bytes, 0, length); 12. ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); 13. Message message = (Message) ois.readObject(); 14. log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length); 15. log.debug("{}", message); 16. out.add(message); 17. }
用于将接收到的二进制数据流解码成自定义的Message对象。具体解释如下:
- 读取4字节的魔数。
- 读取1字节的版本。
- 读取1字节的序列化方式。
- 读取1字节的指令类型。
- 读取4字节的序列号。
- 读取1字节,这个字节被视为无意义填充。
- 读取4字节的消息体长度,也就是消息内容的字节长度。
- 根据消息体长度创建一个字节数组,并从输入流中读取相应的字节数据。
- 将字节数组反序列化成一个Message对象。
- 输出相应的日志信息,包括魔数、版本、序列化方式、指令类型、序列号、消息体长度以及反序列化后的Message对象。
- 把反序列化后的Message对象添加到out列表中。
最终,这个解码器将二进制数据流转化为了自定义的Message对象,方便在业务逻辑中使用。
测试
1. EmbeddedChannel channel = new EmbeddedChannel( 2. new LoggingHandler(), 3. new LengthFieldBasedFrameDecoder( 4. 1024, 12, 4, 0, 0), 5. new MessageCodec() 6. ); 7. // encode 8. LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三"); 9. // channel.writeOutbound(message); 10. // decode 11. ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); 12. new MessageCodec().encode(null, message, buf); 13. 14. ByteBuf s1 = buf.slice(0, 100); 15. ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100); 16. s1.retain(); // 引用计数 2 17. channel.writeInbound(s1); // release 1 18. channel.writeInbound(s2);