Apache Mina2.x网络通信框架使用入门

简介:
  1. 开发服务器和客户端网络应用

    使用Mina开发,客户端连接器和服务端接收器有更多的相似之处,Mina在API设计的时候使用了更高的抽象如:IoService,对于创建服务器端接收器我们将关注IoAccepor,而客户端连接器则是IoConnector.




    下面图1是关于Mina的基本使用的描述:

     wKioL1SLyv-w_cAPAAPtIT5H-1E092.jpg

                                  图1

  

    这里对图1做简单的说明:

    图1由大的三部分组成,通过颜色就很容易区分和理解器表示的意思。对于服务器(Server)和客户端(Client)而言它们都需要中间最大一块的组成部分,其中包含了配置(Configure),会话数据工厂(SessionDataFactory),过滤器链(FilterChina,由多个过滤器组成),监听器组(有多个Listener组成),处理器(IoHandler);第三部分则可以看出服务器对应的是绑定(bind),客户端对应的是连接(connect)由此区分了服务器和客户端。


    说了这么多,就中间部分而言,Mina框架最大程度的解放了开发过程要进行的会话管理,会话数据管理,服务监听处理,过滤器,服务配置等的实现,其都提供了默认实现,当然可以根据使用情况实现对应的接口来自行处理。


2.过滤器

  Mina中的过滤器的顶层接口是IoFilter,其自身提供了多种过滤器,比如:LoggingFilter,ExecutorFilter,BlacklistFilter(请求地址黑名单过滤),SSLFilter,编码和解密过滤器等等(更多参考API http://mina.apache.org/mina-project/apidocs/index.html)。

 

  过滤器是Mina框架中极其重要和有价值的部分,其提供了日志,安全,权限控制,统计等过滤器,并且其是可插拔的,可以通过不同的过滤器以不同的顺序组成过滤器链将实现不同的功能。另外可以通过实现IoFilter接口或者继承IoFilterAdapter来创建更多具体业务中需要的IoFilter,当然这么做之前,可以仔细看看Mina的org.apache.mina.filter包下是否已经提供了实现。


3.编码和解码

  编码和解码作为网络应用开发必须面对的问题,而Mina作为全功能的网络通讯框架,实现对数据报文的编码和解码自然是其分内之事,具体使用者可更多关注IoHandler,即具体处理接收和发送报文的相关业务。

  在org.apache.mina.filter.codec包下有更多的关于编码和解码的实现。


  关于编码其方式有很多种,比如Protobuf,serialization(对象序列化),JSON,BASE64等,而解码则涉及到字节流分割的问题,下图2是三种常用的字节流分割的方式:

 

  wKiom1SL1AjT0vS-AADuFoFyICU175.jpg图2



   上面三种方式中2和3在Mina中都有对应的实现,比如3特殊字符结尾标记对应的实现有TextLineEncoder和TextLineDecoder,两者组成了TextLineCodecFactory; 2固定字节的head表示数据字节数有PrefixedStringEncoder和PrefixedStringDecoder,两者组成了PrefixedStringCodecFactory。

第一种固定长度字节数这种主要应用在传输命令的场景中,其传输的字节数是固定,应用中可以自己根据具体情况来实现对应的编码和解码类。


4.一个具体案例来贯穿全文

  本案例通过客户端发送短信信息到服务器,然后服务器将其短信信息的发送者和接受者对调,短信内容设置"OK",发回给客户端。

  4.1定义短信格式(protobuf):

  

1
2
3
4
5
6
7
8
9
10
11
package secondriver.mina.bo.protobuf;
  
option java_package = "secondriver.mina.bo.protobuf";
option java_outer_classname = "SmsDataProtocal";
  
message Sms {
   required string protocol         = 1;
   required string sender       = 2;
   required string receiver         = 3;
   required string content      = 4;
}

  使用protoc命令将定义个消息生成Java类(使用方式可以参考:)。


  4.2编写Sms对象的编码和解密类,这里我们直接编写编码解密工程类,其由编码和解密类组合而成。

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package  secondriver.mina.bo.protobuf;
 
import  java.nio.charset.Charset;
import  java.nio.charset.StandardCharsets;
 
import  org.apache.mina.core.buffer.IoBuffer;
import  org.apache.mina.core.session.IoSession;
import  org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import  org.apache.mina.filter.codec.ProtocolCodecFactory;
import  org.apache.mina.filter.codec.ProtocolDecoder;
import  org.apache.mina.filter.codec.ProtocolDecoderOutput;
import  org.apache.mina.filter.codec.ProtocolEncoder;
import  org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import  org.apache.mina.filter.codec.ProtocolEncoderOutput;
 
import  com.google.protobuf.ByteString;
 
import  secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
 
public  class  SmsDataCodecFactory  implements  ProtocolCodecFactory {
 
     private  final  Charset charset = StandardCharsets.UTF_8;
 
     private  int  prefixLength =  4 ;
 
     private  int  maxDataLength =  1024 ;
 
     @Override
     public  ProtocolEncoder getEncoder(IoSession session)  throws  Exception {
 
         return  new  ProtocolEncoderAdapter() {
 
             @Override
             public  void  encode(IoSession session, Object message,
                     ProtocolEncoderOutput out)  throws  Exception {
                 Sms sms = (Sms) message;
 
                 String content = sms.toByteString().toStringUtf8();
 
                 IoBuffer buffer = IoBuffer.allocate(content.length())
                         .setAutoExpand( true );
 
                 buffer.putPrefixedString(content, prefixLength,
                         charset.newEncoder());
 
                 if  (buffer.position() > maxDataLength) {
                     throw  new  IllegalArgumentException( "Data length: "
                             + buffer.position());
                 }
 
                 buffer.flip();
                 out.write(buffer);
             }
         };
     }
 
     @Override
     public  ProtocolDecoder getDecoder(IoSession session)  throws  Exception {
         return  new  CumulativeProtocolDecoder() {
 
             @Override
             protected  boolean  doDecode(IoSession session, IoBuffer in,
                     ProtocolDecoderOutput out)  throws  Exception {
 
                 if  (in.prefixedDataAvailable(prefixLength, maxDataLength)) {
 
                     String msg = in.getPrefixedString(prefixLength,
                             charset.newDecoder());
 
                     Sms sms = Sms.parseFrom(ByteString.copyFrom(msg,
                             charset.name()));
 
                     out.write(sms);
                     return  true ;
                 }
                 return  false ;
             }
         };
     }
}

  

 4.3参见文中1端来写服务端

    创建IoAccptor对象->设置过滤器->设置IoHandler->配置->绑定到指定IP和端口

    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package  secondriver.mina.server;
 
import  java.io.IOException;
import  java.net.InetSocketAddress;
import  java.util.concurrent.Executors;
 
import  org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import  org.apache.mina.core.service.IoAcceptor;
import  org.apache.mina.core.service.IoHandlerAdapter;
import  org.apache.mina.core.session.IdleStatus;
import  org.apache.mina.core.session.IoSession;
import  org.apache.mina.filter.codec.ProtocolCodecFilter;
import  org.apache.mina.filter.executor.ExecutorFilter;
import  org.apache.mina.filter.logging.LogLevel;
import  org.apache.mina.filter.logging.LoggingFilter;
import  org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 
import  secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import  secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
 
public  class  SmsServer {
 
     public  static  final  int  PORT =  9001 ;
 
     public  static  void  main(String[] args)  throws  IOException {
 
         // 接收器
         IoAcceptor acceptor =  new  NioSocketAcceptor();
 
         // 过滤器链
         DefaultIoFilterChainBuilder builder =  new  DefaultIoFilterChainBuilder();
 
         LoggingFilter loggingFilter =  new  LoggingFilter();
         loggingFilter.setExceptionCaughtLogLevel(LogLevel.DEBUG);
 
         builder.addLast( "logging" , loggingFilter);
         builder.addLast( "codec" new  ProtocolCodecFilter(
                 new  SmsDataCodecFactory()));
         builder.addLast( "threadPool" ,
                 new  ExecutorFilter(Executors.newCachedThreadPool()));
         acceptor.setFilterChainBuilder(builder);
 
         // 设置处理器IoHandler
         acceptor.setHandler( new  IoHandlerAdapter() {
 
             @Override
             public  void  messageReceived(IoSession session, Object message)
                     throws  Exception {
                 Sms sms = (Sms) message;
                 System.out.println( "客户端发来:" );
                 System.out.println(sms.toString());
 
                 // 服务器发送
                 Sms serverSms = Sms.newBuilder().setProtocol(sms.getProtocol())
                         .setContent( "OK" ).setReceiver(sms.getSender())
                         .setSender(sms.getSender()).build();
                 session.write(serverSms);
             }
         });
 
         // 配置服务器(IoAccptor)
         acceptor.getSessionConfig().setReadBufferSize( 2048 );
         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,  10 );
         // 绑定到指定IP和端口
         acceptor.bind( new  InetSocketAddress(PORT));
     }
}


 4.4 参见文中1端编写客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package  secondriver.mina.client;
 
import  java.net.InetSocketAddress;
import  java.util.Scanner;
import  java.util.concurrent.Executors;
import  java.util.concurrent.TimeUnit;
 
import  org.apache.mina.core.RuntimeIoException;
import  org.apache.mina.core.future.ConnectFuture;
import  org.apache.mina.core.service.IoConnector;
import  org.apache.mina.core.service.IoHandlerAdapter;
import  org.apache.mina.core.session.IoSession;
import  org.apache.mina.filter.codec.ProtocolCodecFilter;
import  org.apache.mina.filter.executor.ExecutorFilter;
import  org.apache.mina.transport.socket.nio.NioSocketConnector;
 
import  secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import  secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
 
public  class  SmsClient {
     private  static  InetSocketAddress server =  new  InetSocketAddress(
             "127.0.0.1" 9001 );
 
     public  static  void  main(String[] args)  throws  InterruptedException {
 
         // 客户端连接器
         IoConnector connector =  new  NioSocketConnector();
 
         // 过滤器
         connector.getFilterChain().addLast( "codec" ,
                 new  ProtocolCodecFilter( new  SmsDataCodecFactory()));
         connector.getFilterChain().addLast( "threadPool" ,
                 new  ExecutorFilter(Executors.newCachedThreadPool()));
 
         // 处理器
         connector.setHandler( new  IoHandlerAdapter() {
 
             @Override
             public  void  sessionCreated(IoSession session)  throws  Exception {
             }
 
             @Override
             public  void  messageReceived(IoSession session, Object message)
                     throws  Exception {
                 System.out.println( "服务器响应:" );
                 System.out.println(((Sms) message).toString());
             }
 
         });
 
         // 建立会话Session
         IoSession session =  null ;
         while  ( true ) {
             try  {
                 ConnectFuture future = connector.connect(server);
                 future.awaitUninterruptibly( 100 , TimeUnit.SECONDS);
                 session = future.getSession();
                 if  ( null  != session) {
                     break ;
                 }
             catch  (RuntimeIoException e) {
                 System.err.println( "Failed to connect with "
                         + server.toString());
                 e.printStackTrace();
                 try  {
                     Thread.sleep( 5000 );
                 catch  (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
         }
 
         // 客户端输入
         try  (Scanner scanner =  new  Scanner(System.in);) {
             while  ( true ) {
                 String sender =  "1814453211" ;
                 System.out.println( "请输入收信息手机号:" );
                 String receiver = scanner.nextLine();
                 System.out.println( "请输入信息内容:" );
                 String content = scanner.nextLine();
 
                 Sms sms = Sms.newBuilder()
                         .setProtocol( "ip.weixin.com TC-C/2.0" )
                         .setSender(sender).setReceiver(receiver)
                         .setContent(content).build();
 
                 session.write(sms);
 
                 Thread.sleep( 10000 );
                 System.out.println( "是否继续,回车继续 , q or quit 退出:" );
                 String line = scanner.nextLine();
                 if  (line.trim().equalsIgnoreCase( "q" )
                         || line.trim().equalsIgnoreCase( "quit" )) {
                     break ;
                 }
             }
         }
         session.close( false );
         connector.dispose();
     }
}


  4.5 启动服务,启动客户端

      图3是运行的结果:


客户端信息:wKiom1SL28eAd-5kAAC_jlM5AoE362.jpg



服务器信息:wKioL1SL3GHQWZp0AABxyEjAnvM745.jpg

  

 图3


   说明:上面客户端和服务器端的关于IoHandler直接使用了匿名类的方式对数据的接收做了相应的简单处理。Sms对象转换成UTF-8编码的字符串,采用了3端中编码和解密的第2中方式,并且传输的数据最大长度为1024byte(1k)。另外,Potobuf-java和Mina集成,mina3.x提供了对protobuf定义的消息的编码和解码提供了实现支持。


   为了需要更多关注Mina3.x,另外Netty的发展势头正旺,netty有种子承父业的感觉,也值得拥有!



本文转自 secondriver 51CTO博客,原文链接:http://blog.51cto.com/aiilive/1589561,如需转载请自行联系原作者

相关文章
|
1月前
|
监控 网络协议 Java
Linux 网络编程从入门到进阶 学习指南
在上一篇文章中,我们探讨了 Linux 系统编程的诸多基础构件,包括文件操作、进程管理和线程同步等,接下来,我们将视野扩展到网络世界。在这个新篇章里,我们要让应用跳出单机限制,学会在网络上跨机器交流信息。
Linux 网络编程从入门到进阶 学习指南
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5
|
3月前
|
数据采集 存储 数据处理
Scrapy:Python网络爬虫框架的利器
在当今信息时代,网络数据已成为企业和个人获取信息的重要途径。而Python网络爬虫框架Scrapy则成为了网络爬虫工程师的必备工具。本文将介绍Scrapy的概念与实践,以及其在数据采集和处理过程中的应用。
23 1
|
12天前
|
网络协议 Java API
Python网络编程基础(Socket编程)Twisted框架简介
【4月更文挑战第12天】在网络编程的实践中,除了使用基本的Socket API之外,还有许多高级的网络编程库可以帮助我们更高效地构建复杂和健壮的网络应用。这些库通常提供了异步IO、事件驱动、协议实现等高级功能,使得开发者能够专注于业务逻辑的实现,而不用过多关注底层的网络细节。
|
存储 设计模式 网络协议
Netty网络框架(一)
Netty网络框架
31 1
|
1月前
|
数据采集 前端开发 Java
利用Scala与Apache HttpClient实现网络音频流的抓取
利用Scala与Apache HttpClient实现网络音频流的抓取
|
1月前
|
监控 测试技术 Linux
性能工具之 Apache Bench 入门使用
ab 全称为:apache bench,ab 为小型压力工具,对于在 Linux 中简单压测 HTTP 接口轻巧灵活。
23 1
|
1月前
|
SQL 分布式计算 HIVE
Apache Hudi入门指南(含代码示例)
Apache Hudi入门指南(含代码示例)
58 0
|
1月前
|
安全 网络安全 数据安全/隐私保护
网络拓扑结构入门快速介绍
网络拓扑结构入门快速介绍
|
1月前
|
网络协议 安全 网络安全
网络基础与通信原理:构建数字世界的框架
网络基础与通信原理:构建数字世界的框架
46 1

推荐镜像

更多