Apache Mina2.x网络通信框架使用入门

简介:
  1. 开发服务器和客户端网络应用

    使用Mina开发,客户端连接器和服务端接收器有更多的相似之处,Mina在API设计的时候使用了更高的抽象如:IoService,对于创建服务器端接收器我们将关注IoAccepor,而客户端连接器则是IoConnector.




    下面图1是关于Mina的基本使用的描述:

     wKioL1SLyv-w_cAPAAPtIT5H-1E092.jpg

                                  图1

  

    这里对图1做简单的说明:

    图1由大的三部分组成,通过颜色就很容易区分和理解器表示的意思。对于服务器(Server)和客户端(Client)而言它们都需要中间最大一块的组成部分,其中包含了配置(Configure),会话数据工厂(SessionDataFactory),过滤器链(FilterChina,由多个过滤器组成),监听器组(有多个Listener组成),处理器(IoHandler);第三部分则可以看出服务器对应的是绑定(bind),客户端对应的是连接(connect)由此区分了服务器和客户端。


    说了这么多,就中间部分而言,Mina框架最大程度的解放了开发过程要进行的会话管理,会话数据管理,服务监听处理,过滤器,服务配置等的实现,其都提供了默认实现,当然可以根据使用情况实现对应的接口来自行处理。


2.过滤器

  Mina中的过滤器的顶层接口是IoFilter,其自身提供了多种过滤器,比如:LoggingFilter,ExecutorFilter,BlacklistFilter(请求地址黑名单过滤),SSLFilter,编码和解密过滤器等等(更多参考API http://mina.apache.org/mina-project/apidocs/index.html)。

 

  过滤器是Mina框架中极其重要和有价值的部分,其提供了日志,安全,权限控制,统计等过滤器,并且其是可插拔的,可以通过不同的过滤器以不同的顺序组成过滤器链将实现不同的功能。另外可以通过实现IoFilter接口或者继承IoFilterAdapter来创建更多具体业务中需要的IoFilter,当然这么做之前,可以仔细看看Mina的org.apache.mina.filter包下是否已经提供了实现。


3.编码和解码

  编码和解码作为网络应用开发必须面对的问题,而Mina作为全功能的网络通讯框架,实现对数据报文的编码和解码自然是其分内之事,具体使用者可更多关注IoHandler,即具体处理接收和发送报文的相关业务。

  在org.apache.mina.filter.codec包下有更多的关于编码和解码的实现。


  关于编码其方式有很多种,比如Protobuf,serialization(对象序列化),JSON,BASE64等,而解码则涉及到字节流分割的问题,下图2是三种常用的字节流分割的方式:

 

  wKiom1SL1AjT0vS-AADuFoFyICU175.jpg图2



   上面三种方式中2和3在Mina中都有对应的实现,比如3特殊字符结尾标记对应的实现有TextLineEncoder和TextLineDecoder,两者组成了TextLineCodecFactory; 2固定字节的head表示数据字节数有PrefixedStringEncoder和PrefixedStringDecoder,两者组成了PrefixedStringCodecFactory。

第一种固定长度字节数这种主要应用在传输命令的场景中,其传输的字节数是固定,应用中可以自己根据具体情况来实现对应的编码和解码类。


4.一个具体案例来贯穿全文

  本案例通过客户端发送短信信息到服务器,然后服务器将其短信信息的发送者和接受者对调,短信内容设置"OK",发回给客户端。

  4.1定义短信格式(protobuf):

  

1
2
3
4
5
6
7
8
9
10
11
package secondriver.mina.bo.protobuf;
  
option java_package = "secondriver.mina.bo.protobuf";
option java_outer_classname = "SmsDataProtocal";
  
message Sms {
   required string protocol         = 1;
   required string sender       = 2;
   required string receiver         = 3;
   required string content      = 4;
}

  使用protoc命令将定义个消息生成Java类(使用方式可以参考:)。


  4.2编写Sms对象的编码和解密类,这里我们直接编写编码解密工程类,其由编码和解密类组合而成。

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package  secondriver.mina.bo.protobuf;
 
import  java.nio.charset.Charset;
import  java.nio.charset.StandardCharsets;
 
import  org.apache.mina.core.buffer.IoBuffer;
import  org.apache.mina.core.session.IoSession;
import  org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import  org.apache.mina.filter.codec.ProtocolCodecFactory;
import  org.apache.mina.filter.codec.ProtocolDecoder;
import  org.apache.mina.filter.codec.ProtocolDecoderOutput;
import  org.apache.mina.filter.codec.ProtocolEncoder;
import  org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import  org.apache.mina.filter.codec.ProtocolEncoderOutput;
 
import  com.google.protobuf.ByteString;
 
import  secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
 
public  class  SmsDataCodecFactory  implements  ProtocolCodecFactory {
 
     private  final  Charset charset = StandardCharsets.UTF_8;
 
     private  int  prefixLength =  4 ;
 
     private  int  maxDataLength =  1024 ;
 
     @Override
     public  ProtocolEncoder getEncoder(IoSession session)  throws  Exception {
 
         return  new  ProtocolEncoderAdapter() {
 
             @Override
             public  void  encode(IoSession session, Object message,
                     ProtocolEncoderOutput out)  throws  Exception {
                 Sms sms = (Sms) message;
 
                 String content = sms.toByteString().toStringUtf8();
 
                 IoBuffer buffer = IoBuffer.allocate(content.length())
                         .setAutoExpand( true );
 
                 buffer.putPrefixedString(content, prefixLength,
                         charset.newEncoder());
 
                 if  (buffer.position() > maxDataLength) {
                     throw  new  IllegalArgumentException( "Data length: "
                             + buffer.position());
                 }
 
                 buffer.flip();
                 out.write(buffer);
             }
         };
     }
 
     @Override
     public  ProtocolDecoder getDecoder(IoSession session)  throws  Exception {
         return  new  CumulativeProtocolDecoder() {
 
             @Override
             protected  boolean  doDecode(IoSession session, IoBuffer in,
                     ProtocolDecoderOutput out)  throws  Exception {
 
                 if  (in.prefixedDataAvailable(prefixLength, maxDataLength)) {
 
                     String msg = in.getPrefixedString(prefixLength,
                             charset.newDecoder());
 
                     Sms sms = Sms.parseFrom(ByteString.copyFrom(msg,
                             charset.name()));
 
                     out.write(sms);
                     return  true ;
                 }
                 return  false ;
             }
         };
     }
}

  

 4.3参见文中1端来写服务端

    创建IoAccptor对象->设置过滤器->设置IoHandler->配置->绑定到指定IP和端口

    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package  secondriver.mina.server;
 
import  java.io.IOException;
import  java.net.InetSocketAddress;
import  java.util.concurrent.Executors;
 
import  org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import  org.apache.mina.core.service.IoAcceptor;
import  org.apache.mina.core.service.IoHandlerAdapter;
import  org.apache.mina.core.session.IdleStatus;
import  org.apache.mina.core.session.IoSession;
import  org.apache.mina.filter.codec.ProtocolCodecFilter;
import  org.apache.mina.filter.executor.ExecutorFilter;
import  org.apache.mina.filter.logging.LogLevel;
import  org.apache.mina.filter.logging.LoggingFilter;
import  org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 
import  secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import  secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
 
public  class  SmsServer {
 
     public  static  final  int  PORT =  9001 ;
 
     public  static  void  main(String[] args)  throws  IOException {
 
         // 接收器
         IoAcceptor acceptor =  new  NioSocketAcceptor();
 
         // 过滤器链
         DefaultIoFilterChainBuilder builder =  new  DefaultIoFilterChainBuilder();
 
         LoggingFilter loggingFilter =  new  LoggingFilter();
         loggingFilter.setExceptionCaughtLogLevel(LogLevel.DEBUG);
 
         builder.addLast( "logging" , loggingFilter);
         builder.addLast( "codec" new  ProtocolCodecFilter(
                 new  SmsDataCodecFactory()));
         builder.addLast( "threadPool" ,
                 new  ExecutorFilter(Executors.newCachedThreadPool()));
         acceptor.setFilterChainBuilder(builder);
 
         // 设置处理器IoHandler
         acceptor.setHandler( new  IoHandlerAdapter() {
 
             @Override
             public  void  messageReceived(IoSession session, Object message)
                     throws  Exception {
                 Sms sms = (Sms) message;
                 System.out.println( "客户端发来:" );
                 System.out.println(sms.toString());
 
                 // 服务器发送
                 Sms serverSms = Sms.newBuilder().setProtocol(sms.getProtocol())
                         .setContent( "OK" ).setReceiver(sms.getSender())
                         .setSender(sms.getSender()).build();
                 session.write(serverSms);
             }
         });
 
         // 配置服务器(IoAccptor)
         acceptor.getSessionConfig().setReadBufferSize( 2048 );
         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,  10 );
         // 绑定到指定IP和端口
         acceptor.bind( new  InetSocketAddress(PORT));
     }
}


 4.4 参见文中1端编写客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package  secondriver.mina.client;
 
import  java.net.InetSocketAddress;
import  java.util.Scanner;
import  java.util.concurrent.Executors;
import  java.util.concurrent.TimeUnit;
 
import  org.apache.mina.core.RuntimeIoException;
import  org.apache.mina.core.future.ConnectFuture;
import  org.apache.mina.core.service.IoConnector;
import  org.apache.mina.core.service.IoHandlerAdapter;
import  org.apache.mina.core.session.IoSession;
import  org.apache.mina.filter.codec.ProtocolCodecFilter;
import  org.apache.mina.filter.executor.ExecutorFilter;
import  org.apache.mina.transport.socket.nio.NioSocketConnector;
 
import  secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import  secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;
 
public  class  SmsClient {
     private  static  InetSocketAddress server =  new  InetSocketAddress(
             "127.0.0.1" 9001 );
 
     public  static  void  main(String[] args)  throws  InterruptedException {
 
         // 客户端连接器
         IoConnector connector =  new  NioSocketConnector();
 
         // 过滤器
         connector.getFilterChain().addLast( "codec" ,
                 new  ProtocolCodecFilter( new  SmsDataCodecFactory()));
         connector.getFilterChain().addLast( "threadPool" ,
                 new  ExecutorFilter(Executors.newCachedThreadPool()));
 
         // 处理器
         connector.setHandler( new  IoHandlerAdapter() {
 
             @Override
             public  void  sessionCreated(IoSession session)  throws  Exception {
             }
 
             @Override
             public  void  messageReceived(IoSession session, Object message)
                     throws  Exception {
                 System.out.println( "服务器响应:" );
                 System.out.println(((Sms) message).toString());
             }
 
         });
 
         // 建立会话Session
         IoSession session =  null ;
         while  ( true ) {
             try  {
                 ConnectFuture future = connector.connect(server);
                 future.awaitUninterruptibly( 100 , TimeUnit.SECONDS);
                 session = future.getSession();
                 if  ( null  != session) {
                     break ;
                 }
             catch  (RuntimeIoException e) {
                 System.err.println( "Failed to connect with "
                         + server.toString());
                 e.printStackTrace();
                 try  {
                     Thread.sleep( 5000 );
                 catch  (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
         }
 
         // 客户端输入
         try  (Scanner scanner =  new  Scanner(System.in);) {
             while  ( true ) {
                 String sender =  "1814453211" ;
                 System.out.println( "请输入收信息手机号:" );
                 String receiver = scanner.nextLine();
                 System.out.println( "请输入信息内容:" );
                 String content = scanner.nextLine();
 
                 Sms sms = Sms.newBuilder()
                         .setProtocol( "ip.weixin.com TC-C/2.0" )
                         .setSender(sender).setReceiver(receiver)
                         .setContent(content).build();
 
                 session.write(sms);
 
                 Thread.sleep( 10000 );
                 System.out.println( "是否继续,回车继续 , q or quit 退出:" );
                 String line = scanner.nextLine();
                 if  (line.trim().equalsIgnoreCase( "q" )
                         || line.trim().equalsIgnoreCase( "quit" )) {
                     break ;
                 }
             }
         }
         session.close( false );
         connector.dispose();
     }
}


  4.5 启动服务,启动客户端

      图3是运行的结果:


客户端信息:wKiom1SL28eAd-5kAAC_jlM5AoE362.jpg



服务器信息:wKioL1SL3GHQWZp0AABxyEjAnvM745.jpg

  

 图3


   说明:上面客户端和服务器端的关于IoHandler直接使用了匿名类的方式对数据的接收做了相应的简单处理。Sms对象转换成UTF-8编码的字符串,采用了3端中编码和解密的第2中方式,并且传输的数据最大长度为1024byte(1k)。另外,Potobuf-java和Mina集成,mina3.x提供了对protobuf定义的消息的编码和解码提供了实现支持。


   为了需要更多关注Mina3.x,另外Netty的发展势头正旺,netty有种子承父业的感觉,也值得拥有!



本文转自 secondriver 51CTO博客,原文链接:http://blog.51cto.com/aiilive/1589561,如需转载请自行联系原作者

相关文章
|
15天前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
59 6
|
23天前
|
机器学习/深度学习 人工智能
类人神经网络再进一步!DeepMind最新50页论文提出AligNet框架:用层次化视觉概念对齐人类
【10月更文挑战第18天】这篇论文提出了一种名为AligNet的框架,旨在通过将人类知识注入神经网络来解决其与人类认知的不匹配问题。AligNet通过训练教师模型模仿人类判断,并将人类化的结构和知识转移至预训练的视觉模型中,从而提高模型在多种任务上的泛化能力和稳健性。实验结果表明,人类对齐的模型在相似性任务和出分布情况下表现更佳。
54 3
|
5天前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
24 0
|
8天前
|
存储 安全 网络安全
网络安全法律框架:全球视角下的合规性分析
网络安全法律框架:全球视角下的合规性分析
20 1
|
11天前
|
机器学习/深度学习 自然语言处理 前端开发
前端神经网络入门:Brain.js - 详细介绍和对比不同的实现 - CNN、RNN、DNN、FFNN -无需准备环境打开浏览器即可测试运行-支持WebGPU加速
本文介绍了如何使用 JavaScript 神经网络库 **Brain.js** 实现不同类型的神经网络,包括前馈神经网络(FFNN)、深度神经网络(DNN)和循环神经网络(RNN)。通过简单的示例和代码,帮助前端开发者快速入门并理解神经网络的基本概念。文章还对比了各类神经网络的特点和适用场景,并简要介绍了卷积神经网络(CNN)的替代方案。
|
16天前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
39 4
|
16天前
|
网络协议 物联网 API
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第26天】Python 是一门功能强大且易于学习的编程语言,Twisted 框架以其事件驱动和异步IO处理能力,在网络编程领域独树一帜。本文深入探讨 Twisted 的异步IO机制,并通过实战示例展示其强大功能。示例包括创建简单HTTP服务器,展示如何高效处理大量并发连接。
39 1
|
23天前
|
Java
[Java]Socket套接字(网络编程入门)
本文介绍了基于Java Socket实现的一对一和多对多聊天模式。一对一模式通过Server和Client类实现简单的消息收发;多对多模式则通过Server类维护客户端集合,并使用多线程实现实时消息广播。文章旨在帮助读者理解Socket的基本原理和应用。
19 1
|
4天前
|
网络协议 Unix Linux
精选2款C#/.NET开源且功能强大的网络通信框架
精选2款C#/.NET开源且功能强大的网络通信框架
|
4天前
|
网络协议 网络安全 Apache
一个整合性、功能丰富的.NET网络通信框架
一个整合性、功能丰富的.NET网络通信框架

推荐镜像

更多