转载自【黑米GameDev街区】 原文链接: http://www.himigame.com/apache-mina/839.html
在上一篇博文中已经简单介绍过“过滤器”的概念,那么在Mina 中的协议编解码器通过过滤器 ProtocolCodecFilter 构造,这个过滤器的构造方法需 要一个 ProtocolCodecFactory,这从前面注册 TextLineCodecFactory 的代码就可以看出来。 ProtocolCodecFactory 中有如下两个方法:
public interface ProtocolCodecFactory {
ProtocolEncoder getEncoder(IoSession session) throws Exception;
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
因此,构建一个 ProtocolCodecFactory 需要 ProtocolEncoder、ProtocolDecoder 两个实例。你可能要问 JAVA 对象和二进制数据之间如何转换呢?这个要依据具体的通信协议,也就是 Server 端要和 Client 端约定网络传输的数据是什么样的格式,譬如:第一个字节表示数据 长度,第二个字节是数据类型,后面的就是真正的数据(有可能是文字、有可能是图片等等), 然后你可以依据长度从第三个字节向后读,直到读取到指定第一个字节指定长度的数据。
简单的说,HTTP 协议就是一种浏览器与 Web 服务器之间约定好的通信协议,双方按照指定 的协议编解码数据。我们再直观一点儿说,前面一直使用的 TextLine 编解码器就是在读取 网络上传递过来的数据时,只要发现哪个字节里存放的是 ASCII 的 10、13 字符(\r、\n), 就认为之前的字节就是一个字符串(默认使用 UTF-8 编码)。
以上所说的就是各种协议实际上就是网络七层结构中的应用层协议,它位于网络层(IP)、 传输层(TCP)之上,Mina 的协议编解码器就是让你实现一套自己的应用层协议栈。
首先我们创建一个传递的对象类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
package
com
.
entity
;
import
javax
.
persistence
.
Column
;
import
javax
.
persistence
.
Entity
;
import
javax
.
persistence
.
GeneratedValue
;
import
javax
.
persistence
.
GenerationType
;
import
javax
.
persistence
.
Id
;
import
javax
.
persistence
.
Table
;
import
org
.
hibernate
.
annotations
.
Index
;
/**
* @author Himi
*/
@
Entity
@
Table
(
name
=
"playerAccount"
)
public
class
PlayerAccount_Entity
{
private
int
id
;
private
String
name
;
private
String
emailAdress
;
private
int
sex
;
// 0=man 1=woman
@
Id
@
Column
(
name
=
"playerAccountID"
)
@
GeneratedValue
(
strategy
=
GenerationType
.
AUTO
)
public
int
getId
(
)
{
return
id
;
}
public
void
setId
(
int
id
)
{
this
.
id
=
id
;
}
@
Index
(
name
=
"nameIndex"
)
public
String
getName
(
)
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
String
getEmailAdress
(
)
{
return
emailAdress
;
}
public
void
setEmailAdress
(
String
emailAdress
)
{
this
.
emailAdress
=
emailAdress
;
}
public
int
getSex
(
)
{
return
sex
;
}
public
void
setSex
(
int
sex
)
{
this
.
sex
=
sex
;
}
}
|
2. 创建一个编码类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package
com
.
protocol
;
/**
* @author Himi
*/
import
java
.
nio
.
charset
.
Charset
;
import
java
.
nio
.
charset
.
CharsetEncoder
;
import
org
.
apache
.
mina
.
core
.
buffer
.
IoBuffer
;
import
org
.
apache
.
mina
.
core
.
session
.
IoSession
;
import
org
.
apache
.
mina
.
filter
.
codec
.
ProtocolEncoderAdapter
;
import
org
.
apache
.
mina
.
filter
.
codec
.
ProtocolEncoderOutput
;
import
com
.
entity
.
PlayerAccount_Entity
;
public
class
HEncoder
extends
ProtocolEncoderAdapter
{
private
final
Charset
charset
;
public
HEncoder
(
Charset
charset
)
{
this
.
charset
=
charset
;
}
@
Override
public
void
encode
(
IoSession
arg0
,
Object
arg1
,
ProtocolEncoderOutput
arg2
)
throws
Exception
{
CharsetEncoder
ce
=
charset
.
newEncoder
(
)
;
PlayerAccount_Entity
paEntity
=
(
PlayerAccount_Entity
)
arg1
;
String
name
=
paEntity
.
getName
(
)
;
IoBuffer
buffer
=
IoBuffer
.
allocate
(
100
)
.
setAutoExpand
(
true
)
;
buffer
.
putString
(
name
,
ce
)
;
buffer
.
flip
(
)
;
arg2
.
write
(
buffer
)
;
}
}
|
在 Mina 中编写编码器可以实现 ProtocolEncoder,其中有 encode()、dispose()两个方法需 要实现。这里的 dispose()方法用于在销毁编码器时释放关联的资源,由于这个方法一般我 们并不关心,所以通常我们直接继承适配器 ProtocolEncoderAdapter。
3.创建一个解码类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package
com
.
protocol
;
/**
* @author Himi
*/
import
java
.
nio
.
charset
.
Charset
;
import
java
.
nio
.
charset
.
CharsetDecoder
;
import
org
.
apache
.
mina
.
core
.
buffer
.
IoBuffer
;
import
org
.
apache
.
mina
.
core
.
session
.
IoSession
;
import
org
.
apache
.
mina
.
filter
.
codec
.
CumulativeProtocolDecoder
;
import
org
.
apache
.
mina
.
filter
.
codec
.
ProtocolDecoderOutput
;
import
com
.
entity
.
PlayerAccount_Entity
;
public
class
HDecoder
extends
CumulativeProtocolDecoder
{
private
final
Charset
charset
;
public
HDecoder
(
Charset
charset
)
{
this
.
charset
=
charset
;
}
@
Override
protected
boolean
doDecode
(
IoSession
arg0
,
IoBuffer
arg1
,
ProtocolDecoderOutput
arg2
)
throws
Exception
{
CharsetDecoder
cd
=
charset
.
newDecoder
(
)
;
String
name
=
arg1
.
getString
(
cd
)
;
PlayerAccount_Entity
paEntity
=
new
PlayerAccount_Entity
(
)
;
paEntity
.
setName
(
name
)
;
arg2
.
write
(
paEntity
)
;
return
true
;
}
}
|
在 Mina 中编写解码器,可以实现 ProtocolDecoder 接口,其中有 decode()、finishDecode()、 dispose()三个方法。这里的 finishDecode()方法可以用于处理在 IoSession 关闭时剩余的 读取数据,一般这个方法并不会被使用到,除非协议中未定义任何标识数据什么时候截止 的约定,譬如:Http 响应的 Content-Length 未设定,那么在你认为读取完数据后,关闭 TCP 连接(IoSession 的关闭)后,就可以调用这个方法处理剩余的数据,当然你也可以忽略调 剩余的数据。同样的,一般情况下,我们只需要继承适配器 ProtocolDecoderAdapter,关 注 decode()方法即可。
但前面说过解码器相对编码器来说,最麻烦的是数据发送过来的规模,以聊天室为例,一个 TCP 连接建立之后,那么隔一段时间就会有聊天内容发送过来,也就是 decode()方法会被往 复调用,这样处理起来就会非常麻烦。那么 Mina 中幸好提供了 CumulativeProtocolDecoder 类,从名字上可以看出累积性的协议解码器,也就是说只要有数据发送过来,这个类就会去 读取数据,然后累积到内部的 IoBuffer 缓冲区,但是具体的拆包(把累积到缓冲区的数据 解码为 JAVA 对象)交由子类的 doDecode()方法完成,实际上 CumulativeProtocolDecoder 就是在 decode()反复的调用暴漏给子类实现的 doDecode()方法。
具体执行过程如下所示:
A. 你的 doDecode()方法返回 true 时,CumulativeProtocolDecoder 的 decode()方法会首先判断你是否在 doDecode()方法中从内部的 IoBuffer 缓冲区读取了数据,如果没有,ce); buffer.putString(smsContent, ce);buffer.flip();则会抛出非法的状态异常,也就是你的 doDecode()方法返回 true 就表示你已经消费了 本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你 必须已经消费过内部的 IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果 验证过通过,那么 CumulativeProtocolDecoder 会检查缓冲区内是否还有数据未读取, 如果有就继续调用 doDecode()方法,没有就停止对 doDecode()方法的调用,直到有新 的数据被缓冲。
B. 当你的 doDecode()方法返回 false 时,CumulativeProtocolDecoder 会停止对 doDecode() 方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的 IoBuffer 缓 冲区保存到 IoSession 中,以便下一次数据到来时可以从 IoSession 中提取合并。如果 发现本次数据全都读取完毕,则清空 IoBuffer 缓冲区。简而言之,当你认为读取到的数据已经够解码了,那么就返回 true,否则就返回 false。这 个 CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工 作是很烦琐的。
4.创建一个编解码工厂类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package
com
.
protocol
;
import
java
.
nio
.
charset
.
Charset
;
import
org
.
apache
.
mina
.
core
.
session
.
IoSession
;
import
org
.
apache
.
mina
.
filter
.
codec
.
ProtocolCodecFactory
;
import
org
.
apache
.
mina
.
filter
.
codec
.
ProtocolDecoder
;
import
org
.
apache
.
mina
.
filter
.
codec
.
ProtocolEncoder
;
/**
*
* @author Himi
*
*/
public
class
HCoderFactory
implements
ProtocolCodecFactory
{
private
final
HEncoder
encoder
;
private
final
HDecoder
decoder
;
public
HCoderFactory
(
)
{
this
(
Charset
.
defaultCharset
(
)
)
;
}
public
HCoderFactory
(
Charset
charSet
)
{
this
.
encoder
=
new
HEncoder
(
charSet
)
;
this
.
decoder
=
new
HDecoder
(
charSet
)
;
}
@
Override
public
ProtocolDecoder
getDecoder
(
IoSession
arg0
)
throws
Exception
{
// TODO Auto-generated method stub
return
decoder
;
}
@
Override
public
ProtocolEncoder
getEncoder
(
IoSession
arg0
)
throws
Exception
{
// TODO Auto-generated method stub
return
encoder
;
}
}
|
这个工厂类就是包装了编码器、解码器,通过接口中的 getEncoder()、getDecoder() 方法向 ProtocolCodecFilter 过滤器返回编解码器实例,以便在过滤器中对数据进行编解码 处理。
5. 以上3个编解码有关的类在Server与Client读需要有,那么同时我们创建好了自定义的编解码有关的类后,我们设置Server和Client的编码工厂为我们自定义的编码工厂类:
1
|
DefaultIoFilterChainBuilder
chain
=
acceptor
.
getFilterChain
(
)
;
chain
.
addLast
(
"mycoder"
,
new
ProtocolCodecFilter
(
new
HCoderFactory
(
Charset
.
forName
(
"UTF-8"
)
)
)
)
;
|
6.书写测试的消息处理器类Client和Server端;
Client端消息处理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
/**
* @author Himi
*/
import
org
.
apache
.
mina
.
core
.
service
.
IoHandlerAdapter
;
import
org
.
apache
.
mina
.
core
.
session
.
IoSession
;
import
com
.
protocol
.
PlayerAccount_Entity
;
public
class
ClientMainHanlder
extends
IoHandlerAdapter
{
// 当一个客端端连结到服务器后
@
Override
public
void
sessionOpened
(
IoSession
session
)
throws
Exception
{
PlayerAccount_Entity
ho
=
new
PlayerAccount_Entity
(
)
;
ho
.
setName
(
"李华明 xiaominghimi@gmail.com"
)
;
session
.
write
(
ho
)
;
}
// 当一个客户端关闭时
@
Override
public
void
sessionClosed
(
IoSession
session
)
{
System
.
out
.
println
(
"I'm Client && I closed!"
)
;
}
// 当服务器端发送的消息到达时:
@
Override
public
void
messageReceived
(
IoSession
session
,
Object
message
)
throws
Exception
{
PlayerAccount_Entity
ho
=
(
PlayerAccount_Entity
)
message
;
System
.
out
.
println
(
"Server Say:name:"
+
ho
.
getName
(
)
)
;
}
}
|
Server端消息处理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
/**
* @author Himi
*/
import
org
.
apache
.
mina
.
core
.
service
.
IoHandlerAdapter
;
import
org
.
apache
.
mina
.
core
.
session
.
IdleStatus
;
import
org
.
apache
.
mina
.
core
.
session
.
IoSession
;
import
com
.
entity
.
PlayerAccount_Entity
;
import
com
.
sessionUtilities
.
HibernateUtil
;
public
class
MainHanlder
extends
IoHandlerAdapter
{
private
int
count
=
0
;
// 当一个新客户端连接后触发此方法.
/*
* 这个方法当一个 Session 对象被创建的时候被调用。对于 TCP 连接来说,连接被接受的时候 调用,但要注意此时 TCP
* 连接并未建立,此方法仅代表字面含义,也就是连接的对象 IoSession 被创建完毕的时候,回调这个方法。 对于 UDP
* 来说,当有数据包收到的时候回调这个方法,因为 UDP 是无连接的。
*/
public
void
sessionCreated
(
IoSession
session
)
{
System
.
out
.
println
(
"新客户端连接"
)
;
}
// 当一个客端端连结进入时 @Override
/*
* 这个方法在连接被打开时调用,它总是在 sessionCreated()方法之后被调用。对于 TCP 来
* 说,它是在连接被建立之后调用,你可以在这里执行一些认证操作、发送数据等。 对于 UDP 来说,这个方法与
* sessionCreated()没什么区别,但是紧跟其后执行。如果你每 隔一段时间,发送一些数据,那么
* sessionCreated()方法只会在第一次调用,但是 sessionOpened()方法每次都会调用。
*/
public
void
sessionOpened
(
IoSession
session
)
throws
Exception
{
count
++
;
System
.
out
.
println
(
"第 "
+
count
+
" 个 client 登陆!address: : "
+
session
.
getRemoteAddress
(
)
)
;
}
// 当客户端发送的消息到达时:
/*
* 对于 TCP 来说,连接被关闭时,调用这个方法。 对于 UDP 来说,IoSession 的 close()方法被调用时才会毁掉这个方法。
*/
@
Override
public
void
messageReceived
(
IoSession
session
,
Object
message
)
throws
Exception
{
// // 我们己设定了服务器解析消息的规则是一行一行读取,这里就可转为String:
// String s = (String) message;
// // Write the received data back to remote peer
// System.out.println("收到客户机发来的消息: " + s);
// // 测试将消息回送给客户端 session.write(s+count); count++;
PlayerAccount_Entity
ho
=
(
PlayerAccount_Entity
)
message
;
System
.
out
.
println
(
"Client Say:"
+
ho
.
getName
(
)
)
;
ho
.
setName
(
"Himi 317426208@qq.com"
)
;
session
.
write
(
ho
)
;
}
// 当信息已经传送给客户端后触发此方法.
/*
* 当发送消息成功时调用这个方法,注意这里的措辞,发送成功之后,也就是说发送消息是不 能用这个方法的。
*/
@
Override
public
void
messageSent
(
IoSession
session
,
Object
message
)
{
System
.
out
.
println
(
"信息已经传送给客户端"
)
;
}
// 当一个客户端关闭时
/*
* 对于 TCP 来说,连接被关闭时,调用这个方法。 对于 UDP 来说,IoSession 的 close()方法被调用时才会毁掉这个方法。
*/
@
Override
public
void
sessionClosed
(
IoSession
session
)
{
System
.
out
.
println
(
"one Clinet Disconnect !"
)
;
}
// 当连接空闲时触发此方法.
/*
* 这个方法在 IoSession 的通道进入空闲状态时调用,对于 UDP 协议来说,这个方法始终不会 被调用。
*/
@
Override
public
void
sessionIdle
(
IoSession
session
,
IdleStatus
status
)
{
// System.out.println("连接空闲");
}
// 当接口中其他方法抛出异常未被捕获时触发此方法
/*
* 这个方法在你的程序、Mina 自身出现异常时回调,一般这里是关闭 IoSession。
*/
@
Override
public
void
exceptionCaught
(
IoSession
session
,
Throwable
cause
)
{
System
.
out
.
println
(
"其他方法抛出异常"
)
;
}
}
|
OK,首先启动Server端,然后运行Client端,观察控制台: