【Apache Mina2.0开发之二】自定义实现Server/Client端的编解码工厂(自定义编码与解码器)!

简介:

在上一篇博文中已经简单介绍过“过滤器”的概念,那么在Mina 中的协议编解码器通过过滤器 ProtocolCodecFilter 构造,这个过滤器的构造方法需 要一个 ProtocolCodecFactory,这从前面注册 TextLineCodecFactory 的代码就可以看出来。 ProtocolCodecFactory 中有如下两个方法:

public interface ProtocolCodecFactory {

ProtocolEncoder getEncoder(IoSession session) throws Exception;

ProtocolDecoder getDecoder(IoSession session) throws Exception;

}

因此,构建一个 ProtocolCodecFactory 需要 ProtocolEncoder、ProtocolDecoder 两个实例。你可能要问 JAVA 对象和二进制数据之间如何转换呢?这个要依据具体的通信协议,也就是 Server 端要和 Client 端约定网络传输的数据是什么样的格式,譬如:第一个字节表示数据 长度,第二个字节是数据类型,后面的就是真正的数据(有可能是文字、有可能是图片等等), 然后你可以依据长度从第三个字节向后读,直到读取到指定第一个字节指定长度的数据。

简单的说,HTTP 协议就是一种浏览器与 Web 服务器之间约定好的通信协议,双方按照指定 的协议编解码数据。我们再直观一点儿说,前面一直使用的 TextLine 编解码器就是在读取 网络上传递过来的数据时,只要发现哪个字节里存放的是 ASCII 的 10、13 字符(\r、\n), 就认为之前的字节就是一个字符串(默认使用 UTF-8 编码)。

以上所说的就是各种协议实际上就是网络七层结构中的应用层协议,它位于网络层(IP)、 传输层(TCP)之上,Mina 的协议编解码器就是让你实现一套自己的应用层协议栈。

首先我们创建一个传递的对象类:

 


  
  
  1. package com.entity; 
  2. import javax.persistence.Column; 
  3. import javax.persistence.Entity; 
  4. import javax.persistence.GeneratedValue; 
  5. import javax.persistence.GenerationType; 
  6. import javax.persistence.Id; 
  7. import javax.persistence.Table; 
  8.  
  9. import org.hibernate.annotations.Index; 
  10.  
  11. /** 
  12.  * @author Himi 
  13.  */ 
  14. @Entity 
  15. @Table(name = "playerAccount"
  16. public class PlayerAccount_Entity { 
  17.  
  18.     private int id; 
  19.     private String name; 
  20.     private String emailAdress; 
  21.     private int sex;// 0=man 1=woman 
  22.  
  23.     @Id 
  24.     @Column(name = "playerAccountID"
  25.     @GeneratedValue(strategy = GenerationType.AUTO) 
  26.     public int getId() { 
  27.         return id; 
  28.     } 
  29.  
  30.     public void setId(int id) { 
  31.         this.id = id; 
  32.     } 
  33.  
  34.     @Index(name="nameIndex"
  35.     public String getName() { 
  36.         return name; 
  37.     } 
  38.  
  39.     public void setName(String name) { 
  40.         this.name = name; 
  41.     } 
  42.  
  43.     public String getEmailAdress() { 
  44.         return emailAdress; 
  45.     } 
  46.  
  47.     public void setEmailAdress(String emailAdress) { 
  48.         this.emailAdress = emailAdress; 
  49.     } 
  50.  
  51.     public int getSex() { 
  52.         return sex; 
  53.     } 
  54.  
  55.     public void setSex(int sex) { 
  56.         this.sex = sex; 
  57.     } 
  58.  

2. 创建一个编码类:

 


  
  
  1. package com.protocol; 
  2. /** 
  3.  * @author Himi 
  4.  */ 
  5.  
  6. import java.nio.charset.Charset; 
  7. import java.nio.charset.CharsetEncoder; 
  8.  
  9. import org.apache.mina.core.buffer.IoBuffer; 
  10. import org.apache.mina.core.session.IoSession; 
  11. import org.apache.mina.filter.codec.ProtocolEncoderAdapter; 
  12. import org.apache.mina.filter.codec.ProtocolEncoderOutput; 
  13.  
  14. import com.entity.PlayerAccount_Entity; 
  15.  
  16. public class HEncoder extends ProtocolEncoderAdapter { 
  17.  
  18.     private final Charset charset; 
  19.  
  20.     public HEncoder(Charset charset) { 
  21.         this.charset = charset; 
  22.  
  23.     } 
  24.  
  25.     @Override 
  26.     public void encode(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2) 
  27.             throws Exception { 
  28.  
  29.         CharsetEncoder ce = charset.newEncoder(); 
  30.  
  31.         PlayerAccount_Entity paEntity = (PlayerAccount_Entity) arg1; 
  32.         String name = paEntity.getName();  
  33.  
  34.         IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); 
  35.         buffer.putString(name, ce);  
  36.         buffer.flip(); 
  37.         arg2.write(buffer); 
  38.  
  39.     } 
  40.  

在 Mina 中编写编码器可以实现 ProtocolEncoder,其中有 encode()、dispose()两个方法需 要实现。这里的 dispose()方法用于在销毁编码器时释放关联的资源,由于这个方法一般我 们并不关心,所以通常我们直接继承适配器 ProtocolEncoderAdapter。

 

3.创建一个解码类:


  
  
  1. package com.protocol; 
  2. /** 
  3.  * @author Himi 
  4.  */ 
  5. import java.nio.charset.Charset; 
  6. import java.nio.charset.CharsetDecoder; 
  7.  
  8. import org.apache.mina.core.buffer.IoBuffer; 
  9. import org.apache.mina.core.session.IoSession; 
  10. import org.apache.mina.filter.codec.CumulativeProtocolDecoder; 
  11. import org.apache.mina.filter.codec.ProtocolDecoderOutput; 
  12.  
  13. import com.entity.PlayerAccount_Entity; 
  14.  
  15. public class HDecoder extends CumulativeProtocolDecoder { 
  16.  
  17.     private final Charset charset; 
  18.  
  19.     public HDecoder(Charset charset) { 
  20.         this.charset = charset; 
  21.  
  22.     } 
  23.  
  24.     @Override 
  25.     protected boolean doDecode(IoSession arg0, IoBuffer arg1, 
  26.             ProtocolDecoderOutput arg2) throws Exception { 
  27.         CharsetDecoder cd = charset.newDecoder(); 
  28.  
  29.         String name = arg1.getString(cd);  
  30.  
  31.         PlayerAccount_Entity paEntity = new PlayerAccount_Entity(); 
  32.         paEntity.setName(name);  
  33.  
  34.         arg2.write(paEntity); 
  35.         return true
  36.     } 
  37.  

在 Mina 中编写解码器,可以实现 ProtocolDecoder 接口,其中有 decode()、finishDecode()、 dispose()三个方法。这里的 finishDecode()方法可以用于处理在 IoSession 关闭时剩余的 读取数据,一般这个方法并不会被使用到,除非协议中未定义任何标识数据什么时候截止 的约定,譬如:Http 响应的 Content-Length 未设定,那么在你认为读取完数据后,关闭 TCP 连接(IoSession 的关闭)后,就可以调用这个方法处理剩余的数据,当然你也可以忽略调 剩余的数据。同样的,一般情况下,我们只需要继承适配器 ProtocolDecoderAdapter,关 注 decode()方法即可。

但前面说过解码器相对编码器来说,最麻烦的是数据发送过来的规模,以聊天室为例,一个 TCP 连接建立之后,那么隔一段时间就会有聊天内容发送过来,也就是 decode()方法会被往 复调用,这样处理起来就会非常麻烦。那么 Mina 中幸好提供了 CumulativeProtocolDecoder 类,从名字上可以看出累积性的协议解码器,也就是说只要有数据发送过来,这个类就会去 读取数据,然后累积到内部的 IoBuffer 缓冲区,但是具体的拆包(把累积到缓冲区的数据 解码为 JAVA 对象)交由子类的 doDecode()方法完成,实际上 CumulativeProtocolDecoder 就是在 decode()反复的调用暴漏给子类实现的 doDecode()方法。

具体执行过程如下所示:

A. 你的 doDecode()方法返回 true 时,CumulativeProtocolDecoder 的 decode()方法会首先判断你是否在 doDecode()方法中从内部的 IoBuffer 缓冲区读取了数据,如果没有,ce); buffer.putString(smsContent, ce);buffer.flip();则会抛出非法的状态异常,也就是你的 doDecode()方法返回 true 就表示你已经消费了 本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你 必须已经消费过内部的 IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果 验证过通过,那么 CumulativeProtocolDecoder 会检查缓冲区内是否还有数据未读取, 如果有就继续调用 doDecode()方法,没有就停止对 doDecode()方法的调用,直到有新 的数据被缓冲。

B. 当你的 doDecode()方法返回 false 时,CumulativeProtocolDecoder 会停止对 doDecode() 方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的 IoBuffer 缓 冲区保存到 IoSession 中,以便下一次数据到来时可以从 IoSession 中提取合并。如果 发现本次数据全都读取完毕,则清空 IoBuffer 缓冲区。简而言之,当你认为读取到的数据已经够解码了,那么就返回 true,否则就返回 false。这 个 CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工 作是很烦琐的。

4.创建一个编解码工厂类:

 


  
  
  1. package com.protocol; 
  2.  
  3. import java.nio.charset.Charset; 
  4.  
  5. import org.apache.mina.core.session.IoSession; 
  6. import org.apache.mina.filter.codec.ProtocolCodecFactory; 
  7. import org.apache.mina.filter.codec.ProtocolDecoder; 
  8. import org.apache.mina.filter.codec.ProtocolEncoder; 
  9. /** 
  10.  *  
  11.  * @author Himi 
  12.  * 
  13.  */ 
  14. public class HCoderFactory implements ProtocolCodecFactory { 
  15.  
  16.     private final HEncoder encoder; 
  17.     private final HDecoder decoder; 
  18.  
  19.     public HCoderFactory() { 
  20.         this(Charset.defaultCharset()); 
  21.     } 
  22.  
  23.     public HCoderFactory(Charset charSet) { 
  24.         this.encoder = new HEncoder(charSet); 
  25.         this.decoder = new HDecoder(charSet); 
  26.     } 
  27.  
  28.     @Override 
  29.     public ProtocolDecoder getDecoder(IoSession arg0) throws Exception { 
  30.         // TODO Auto-generated method stub 
  31.         return decoder; 
  32.     } 
  33.  
  34.     @Override 
  35.     public ProtocolEncoder getEncoder(IoSession arg0) throws Exception { 
  36.         // TODO Auto-generated method stub 
  37.         return encoder; 
  38.     } 
  39.  

这个工厂类就是包装了编码器、解码器,通过接口中的 getEncoder()、getDecoder() 方法向 ProtocolCodecFilter 过滤器返回编解码器实例,以便在过滤器中对数据进行编解码 处理。

    5. 以上3个编解码有关的类在Server与Client读需要有,那么同时我们创建好了自定义的编解码有关的类后,我们设置Server和Client的编码工厂为我们自定义的编码工厂类:

 


  
  
  1. DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();      chain.addLast("mycoder"new ProtocolCodecFilter(new HCoderFactory(             Charset.forName("UTF-8")))); 

 

6.书写测试的消息处理器类Client和Server端;

Client端消息处理器: 

 


  
  
  1. /** 
  2.  * @author Himi 
  3.  */ 
  4.  
  5. import org.apache.mina.core.service.IoHandlerAdapter; 
  6. import org.apache.mina.core.session.IoSession; 
  7.  
  8. import com.protocol.PlayerAccount_Entity; 
  9.  
  10. public class ClientMainHanlder extends IoHandlerAdapter { 
  11.     // 当一个客端端连结到服务器后 
  12.     @Override 
  13.     public void sessionOpened(IoSession session) throws Exception { 
  14.         PlayerAccount_Entity ho = new PlayerAccount_Entity(); 
  15.         ho.setName("李华明 xiaominghimi@gmail.com"); 
  16.         session.write(ho); 
  17.     } 
  18.  
  19.     // 当一个客户端关闭时 
  20.     @Override 
  21.     public void sessionClosed(IoSession session) { 
  22.         System.out.println("I'm Client &&  I closed!"); 
  23.     } 
  24.  
  25.     // 当服务器端发送的消息到达时: 
  26.     @Override 
  27.     public void messageReceived(IoSession session, Object message) 
  28.             throws Exception { 
  29.         PlayerAccount_Entity ho = (PlayerAccount_Entity) message; 
  30.         System.out.println("Server Say:name:" + ho.getName()); 
  31.     } 

Server端消息处理器:

 


  
  
  1. /** 
  2.  * @author Himi 
  3.  */ 
  4.  
  5. import org.apache.mina.core.service.IoHandlerAdapter; 
  6. import org.apache.mina.core.session.IdleStatus; 
  7. import org.apache.mina.core.session.IoSession; 
  8.  
  9. import com.entity.PlayerAccount_Entity; 
  10. import com.sessionUtilities.HibernateUtil; 
  11.  
  12. public class MainHanlder extends IoHandlerAdapter { 
  13.  
  14.     private int count = 0
  15.  
  16.     // 当一个新客户端连接后触发此方法. 
  17.     /* 
  18.      * 这个方法当一个 Session 对象被创建的时候被调用。对于 TCP 连接来说,连接被接受的时候 调用,但要注意此时 TCP 
  19.      * 连接并未建立,此方法仅代表字面含义,也就是连接的对象 IoSession 被创建完毕的时候,回调这个方法。 对于 UDP 
  20.      * 来说,当有数据包收到的时候回调这个方法,因为 UDP 是无连接的。 
  21.      */ 
  22.     public void sessionCreated(IoSession session) { 
  23.         System.out.println("新客户端连接"); 
  24.     } 
  25.  
  26.     // 当一个客端端连结进入时 @Override 
  27.     /* 
  28.      * 这个方法在连接被打开时调用,它总是在 sessionCreated()方法之后被调用。对于 TCP 来 
  29.      * 说,它是在连接被建立之后调用,你可以在这里执行一些认证操作、发送数据等。 对于 UDP 来说,这个方法与 
  30.      * sessionCreated()没什么区别,但是紧跟其后执行。如果你每 隔一段时间,发送一些数据,那么 
  31.      * sessionCreated()方法只会在第一次调用,但是 sessionOpened()方法每次都会调用。 
  32.      */ 
  33.     public void sessionOpened(IoSession session) throws Exception { 
  34.         count++; 
  35.         System.out.println("第 " + count + " 个 client 登陆!address: : " 
  36.                 + session.getRemoteAddress()); 
  37.          
  38.  
  39.     } 
  40.  
  41.     // 当客户端发送的消息到达时: 
  42.     /* 
  43.      * 对于 TCP 来说,连接被关闭时,调用这个方法。 对于 UDP 来说,IoSession 的 close()方法被调用时才会毁掉这个方法。 
  44.      */ 
  45.     @Override 
  46.     public void messageReceived(IoSession session, Object message) 
  47.             throws Exception { 
  48.         // // 我们己设定了服务器解析消息的规则是一行一行读取,这里就可转为String: 
  49.         // String s = (String) message; 
  50.         // // Write the received data back to remote peer 
  51.         // System.out.println("收到客户机发来的消息: " + s); 
  52.         // // 测试将消息回送给客户端 session.write(s+count); count++; 
  53.  
  54.         PlayerAccount_Entity ho = (PlayerAccount_Entity) message; 
  55.         System.out.println("Client Say:" + ho.getName()); 
  56.          
  57.          
  58.         ho.setName("Himi  317426208@qq.com"); 
  59.         session.write(ho); 
  60.          
  61.     } 
  62.  
  63.     // 当信息已经传送给客户端后触发此方法. 
  64.     /* 
  65.      * 当发送消息成功时调用这个方法,注意这里的措辞,发送成功之后,也就是说发送消息是不 能用这个方法的。 
  66.      */ 
  67.     @Override 
  68.     public void messageSent(IoSession session, Object message) { 
  69.         System.out.println("信息已经传送给客户端"); 
  70.  
  71.     } 
  72.  
  73.     // 当一个客户端关闭时 
  74.     /* 
  75.      * 对于 TCP 来说,连接被关闭时,调用这个方法。 对于 UDP 来说,IoSession 的 close()方法被调用时才会毁掉这个方法。 
  76.      */ 
  77.     @Override 
  78.     public void sessionClosed(IoSession session) { 
  79.         System.out.println("one Clinet Disconnect !"); 
  80.     } 
  81.  
  82.     // 当连接空闲时触发此方法. 
  83.     /* 
  84.      * 这个方法在 IoSession 的通道进入空闲状态时调用,对于 UDP 协议来说,这个方法始终不会 被调用。 
  85.      */ 
  86.     @Override 
  87.     public void sessionIdle(IoSession session, IdleStatus status) { 
  88. //      System.out.println("连接空闲"); 
  89.     } 
  90.  
  91.     // 当接口中其他方法抛出异常未被捕获时触发此方法 
  92.     /* 
  93.      * 这个方法在你的程序、Mina 自身出现异常时回调,一般这里是关闭 IoSession。 
  94.      */ 
  95.     @Override 
  96.     public void exceptionCaught(IoSession session, Throwable cause) { 
  97.         System.out.println("其他方法抛出异常"); 
  98.     } 
  99.  

OK,首先启动Server端,然后运行Client端,观察控制台:










本文转自 xiaominghimi 51CTO博客,原文链接:http://blog.51cto.com/xiaominghimi/969791,如需转载请自行联系原作者
目录
相关文章
|
4月前
|
Apache 数据库
杨校老师课堂之基于Apache的数据库连接池DBCP的工具类开发
杨校老师课堂之基于Apache的数据库连接池DBCP的工具类开发
25 0
|
2月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
当今社会,物联网技术的发展带来了许多繁琐的挑战,尤其是在数据库管理系统领域,比如实时整合海量数据、处理流中的事件以及处理数据的安全性。例如,应用于智能城市的基于物联网的交通传感器可以实时生成大量的交通数据。据估计,未来5年,物联网设备的数量将达数万亿。物联网产生大量的数据,包括流数据、时间序列数据、RFID数据、传感数据等。要有效地管理这些数据,就需要使用数据库。数据库在充分处理物联网数据方面扮演着非常重要的角色。因此,适当的数据库与适当的平台同等重要。由于物联网在世界上不同的环境中运行,选择合适的数据库变得非常重要。 原创文字,IoTDB 社区可进行使用与传播 一、什么是IoTDB 我
145 9
Apache IoTDB进行IoT相关开发实践
|
2月前
|
Java 持续交付 项目管理
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。它采用项目对象模型(POM)来描述项目,简化构建流程。Maven提供依赖管理、标准构建生命周期、插件扩展等功能,支持多模块项目及版本控制。在Java Web开发中,Maven能够自动生成项目结构、管理依赖、自动化构建流程并运行多种插件任务,如代码质量检查和单元测试。遵循Maven的最佳实践,结合持续集成工具,可以显著提升开发效率和项目质量。
46 1
|
2月前
|
Apache 开发者 Java
Apache Wicket揭秘:如何巧妙利用模型与表单机制,实现Web应用高效开发?
【8月更文挑战第31天】本文深入探讨了Apache Wicket的模型与表单处理机制。Wicket作为一个组件化的Java Web框架,提供了多种模型实现,如CompoundPropertyModel等,充当组件与数据间的桥梁。文章通过示例介绍了模型创建及使用方法,并详细讲解了表单组件、提交处理及验证机制,帮助开发者更好地理解如何利用Wicket构建高效、易维护的Web应用程序。
40 0
|
3月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
IoTDB是专为物联网(IoT)设计的开源时间序列数据库,提供数据收集、存储、管理和分析。它支持高效的数据写入、查询,适用于处理大规模物联网数据,包括流数据、时间序列等。IoTDB采用轻量级架构,可与Hadoop和Spark集成,支持多种存储策略,确保数据安全和高可用性。此外,它还具有InfluxDB协议适配器,允许无缝迁移和兼容InfluxDB的API和查询语法,简化物联网项目的数据管理。随着物联网设备数量的快速增长,选择适合的数据库如IoTDB对于数据管理和分析至关重要。
225 12
|
3月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
The article introduces IoTDB, an open-source time-series database designed for efficient management of IoT-generated data. It addresses challenges like real-time integration of massive datasets and security. IoTDB supports high-performance storage,
132 0
Apache IoTDB进行IoT相关开发实践
|
4月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
IoTDB是面向物联网的时序数据库,专注于时间序列数据管理,提供高效的数据处理、集成Hadoop和Spark生态、支持多目录存储策略。它还具有InfluxDB协议适配器,允许无缝迁移原本使用InfluxDB的业务。文章讨论了IoTDB的体系结构,包括数据文件、系统文件和预写日志文件的存储策略,并介绍了如何配置数据存储目录。此外,还提及了InfluxDB版本和查询语法的支持情况。IoTDB在物联网数据管理和分析中扮演关键角色,尤其适合处理大规模实时数据。
89 5
|
4月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
物联网技术带来数据库管理挑战,特别是实时数据整合与安全性。IoTDB是一个专为时间序列数据设计的数据库,提供数据收集、存储和分析服务,适用于海量物联网数据。其架构包括数据文件、系统文件和预写日志文件的管理,并支持多目录存储策略。此外,IoTDB还开发了InfluxDB协议适配器,使得用户能无缝迁移原有InfluxDB业务。此适配器基于IoTDB的Java服务接口,转换InfluxDB的元数据格式,实现与IoTDB的数据交互。目前,适配器支持InfluxDB 1.x版本及部分查询语法。
179 5
|
3月前
|
安全 Java Apache
如何安装与使用Spring Boot 2.2.x、Spring Framework 5.2.x与Apache Shiro 1.7进行高效开发
【7月更文第1天】在现代Java Web开发领域,Spring Boot以其简化配置、快速开发的特点备受青睐。结合Spring Framework的成熟与Apache Shiro的强大权限控制能力,我们可以轻松构建安全且高效的Web应用。本篇文章将指导你如何安装并使用Spring Boot 2.2.x、Spring Framework 5.2.x以及Apache Shiro 1.7来构建一个具备基础权限管理功能的项目。
71 0
|
3月前
|
Java 应用服务中间件 API
如何安装与使用Java EE 8、Servlet 3.0及Apache Maven进行高效开发
【7月更文第1天】搭建高效Java EE 8开发环境,包括安装JDK、选择WildFly或Payara Server作为应用服务器,以及安装Apache Maven。使用Maven创建Servlet 3.0 Web项目,编写 HelloWorldServlet,打包部署到服务器,通过访问特定URL测试应用。这一流程助力开发者实现快速原型和大型项目开发。
89 0

推荐镜像

更多