数据通信的场景:长连接 OR 短连接
在实际场景中,我们如何使用Netty进行通信呢?大致有3种方式:
第一种,使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启的状态。如果服务器性能足够好,并且我们的客户端数量也比较少的情况下,是适合使用长连接的通道。
第二种,采用短连接方式,一次性批量提交数据,也就是我们会把数据保存在本地临时缓冲区或者临时表里。当达到数量时,就进行批量提交;或者通过定时任务轮询提交。这种情况是有弊端的,就是无法做到实时传输。如果应用程序对实时性要求不高,可以考虑使用。
第三种,采用一种特殊的长连接。特殊在哪里呢?在指定的某一时间之内,服务器与某台客户端没有任何通信,则断开连接,如果断开连接后,客户端又需要向服务器发送请求,那么再次建立连接。这里有点CachedThreadPool的味道。
本篇博客将采用Netty来实现第三种方式的数据通信,接下来我们一起来看看吧~
Netty数据通信代码实例
请求消息对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package
day3;
import
java.io.Serializable;
public
class
Request
implements
Serializable{
private
static
final
long
SerialVersionUID = 1L;
private
String id ;
private
String name ;
private
String requestMessage ;
public
String getId() {
return
id;
}
public
void
setId(String id) {
this
.id = id;
}
public
String getName() {
return
name;
}
public
void
setName(String name) {
this
.name = name;
}
public
String getRequestMessage() {
return
requestMessage;
}
public
void
setRequestMessage(String requestMessage) {
this
.requestMessage = requestMessage;
}
}
|
响应消息对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package
day3;
import
java.io.Serializable;
public
class
Response
implements
Serializable{
private
static
final
long
serialVersionUID = 1L;
private
String id;
private
String name;
private
String responseMessage;
public
String getId() {
return
id;
}
public
void
setId(String id) {
this
.id = id;
}
public
String getName() {
return
name;
}
public
void
setName(String name) {
this
.name = name;
}
public
String getResponseMessage() {
return
responseMessage;
}
public
void
setResponseMessage(String responseMessage) {
this
.responseMessage = responseMessage;
}
}
|
编解码处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package
day3;
import
io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import
io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import
io.netty.handler.codec.marshalling.MarshallerProvider;
import
io.netty.handler.codec.marshalling.MarshallingDecoder;
import
io.netty.handler.codec.marshalling.MarshallingEncoder;
import
io.netty.handler.codec.marshalling.UnmarshallerProvider;
import
org.jboss.marshalling.MarshallerFactory;
import
org.jboss.marshalling.Marshalling;
import
org.jboss.marshalling.MarshallingConfiguration;
/**
* Marshalling工厂
*/
public
final
class
MarshallingCodeCFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public
static
MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(
"serial"
);
//创建了MarshallingConfiguration对象,配置了版本号为5
final
MarshallingConfiguration configuration =
new
MarshallingConfiguration();
configuration.setVersion(
5
);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider =
new
DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder =
new
MarshallingDecoder(provider,
1024
);
return
decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public
static
MarshallingEncoder buildMarshallingEncoder() {
final
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(
"serial"
);
final
MarshallingConfiguration configuration =
new
MarshallingConfiguration();
configuration.setVersion(
5
);
MarshallerProvider provider =
new
DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder =
new
MarshallingEncoder(provider);
return
encoder;
}
}
|
注意,在上一篇博客《Netty实践(二):TCP拆包、粘包问题》中,我们是自己继承ByteToMessageDecoder、MessageToByteEncoder来实现ByteBuff与消息对象的转化的,其实这是有点麻烦的。在实际中,我们完全可以利用相关序列化框架(JBoss Marshlling/Protobuf/Kryo/MessagePack)来帮助我们快速完成编解码,这里我使用的是JBoss Marshalling(jboss-marshalling-1.3.0.CR9.jar+jboss-marshalling-serial-1.3.0.CR9.jar)。具体来说,客户端和服务端交互的消息对象只需要实现JDK默认的序列化接口,同时利用JBoss Marshalling 生成编码器和解码器,用于后续Client/Server端即可。
Client Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package
day3;
import
io.netty.channel.ChannelHandlerAdapter;
import
io.netty.channel.ChannelHandlerContext;
import
io.netty.util.ReferenceCountUtil;
public
class
ClientHandler
extends
ChannelHandlerAdapter{
@Override
public
void
channelRead(ChannelHandlerContext ctx, Object msg)
throws
Exception {
try
{
Response resp = (Response)msg;
System.out.println(
"Client : "
+ resp.getId() +
", "
+ resp.getName() +
", "
+ resp.getResponseMessage());
}
finally
{
ReferenceCountUtil.release(msg);
}
}
@Override
public
void
exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws
Exception {
ctx.close();
}
}
|
在这里可以清楚的看到,我们直接将Object转化成了自定义消息响应对象,可见JBoss Marshalling与Netty结合后,编解码是如此简单。
Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
package
day3;
import
io.netty.bootstrap.Bootstrap;
import
io.netty.channel.ChannelFuture;
import
io.netty.channel.ChannelInitializer;
import
io.netty.channel.EventLoopGroup;
import
io.netty.channel.nio.NioEventLoopGroup;
import
io.netty.channel.socket.SocketChannel;
import
io.netty.channel.socket.nio.NioSocketChannel;
import
io.netty.handler.logging.LogLevel;
import
io.netty.handler.logging.LoggingHandler;
import
io.netty.handler.timeout.ReadTimeoutHandler;
import
java.util.concurrent.TimeUnit;
/**
*
*/
public
class
Client {
private
static
class
SingletonHolder {
static
final
Client instance =
new
Client();
}
public
static
Client getInstance(){
return
SingletonHolder.instance;
}
private
EventLoopGroup group;
private
Bootstrap b;
//通过ChannelFuture实现读写操作
private
ChannelFuture cf ;
private
Client(){
group =
new
NioEventLoopGroup();
b =
new
Bootstrap();
b.group(group)
.channel(NioSocketChannel.
class
)
.handler(
new
LoggingHandler(LogLevel.INFO))
.handler(
new
ChannelInitializer<SocketChannel>() {
@Override
protected
void
initChannel(SocketChannel sc)
throws
Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
//超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭相应的通道,主要为减小服务端资源占用)
sc.pipeline().addLast(
new
ReadTimeoutHandler(
3
));
sc.pipeline().addLast(
new
ClientHandler());
}
});
}
public
void
connect(){
try
{
this
.cf = b.connect(
"127.0.0.1"
,
8765
).sync();
System.out.println(
"远程服务器已经连接, 可以进行数据交换.."
);
}
catch
(Exception e) {
e.printStackTrace();
}
}
//这里是通道关闭,再次建立连接的核心代码
public
ChannelFuture getChannelFuture(){
if
(
this
.cf ==
null
){
this
.connect();
}
if
(!
this
.cf.channel().isActive()){
this
.connect();
}
return
this
.cf;
}
public
static
void
main(String[] args)
throws
Exception{
final
Client c = Client.getInstance();
//注意client好像没有调用connect()方法进行连接,但是实际上在下面的代码中做了
ChannelFuture cf = c.getChannelFuture();
for
(
int
i =
1
; i <=
3
; i++ ){
Request request =
new
Request();
request.setId(
""
+ i);
request.setName(
"pro"
+ i);
request.setRequestMessage(
"数据信息"
+ i);
cf.channel().writeAndFlush(request);
TimeUnit.SECONDS.sleep(
4
);
}
cf.channel().closeFuture().sync();
//通道关闭后,通过另一个线程模拟客户端再次建立连接发送请求
new
Thread(
new
Runnable() {
@Override
public
void
run() {
try
{
System.out.println(
"进入子线程..."
);
ChannelFuture cf = c.getChannelFuture();
System.out.println(cf.channel().isActive());
System.out.println(cf.channel().isOpen());
//再次发送数据
Request request =
new
Request();
request.setId(
""
+
4
);
request.setName(
"pro"
+
4
);
request.setRequestMessage(
"数据信息"
+
4
);
cf.channel().writeAndFlush(request);
cf.channel().closeFuture().sync();
System.out.println(
"子线程结束..."
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println(
"断开连接,主线程结束.."
);
}
}
|
这里对Client进行了初步的封装,采用静态内部类实现单例。
Client的Handler不仅仅有Marshalling的编解码器,还加入了Netty自带的ReadTimeoutHandler,这是客户端与服务端一段时间没有通信就断开连接的依据。从这里也看到Netty的强大之处了,通过提供一些预定义的Handler让你的代码变得简单,只需要专注于业务实现即可。客户端超时断开通道后,如何再次建立连接进行通信呢?通过getChannelFuture()你会知道。
客户端代码模拟了一个线程通信超时,关闭通道后,另一个线程与服务器端再次通信。
Server Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package
day3;
import
io.netty.channel.ChannelHandlerAdapter;
import
io.netty.channel.ChannelHandlerContext;
public
class
ServerHandler
extends
ChannelHandlerAdapter{
@Override
public
void
channelRead(ChannelHandlerContext ctx, Object msg)
throws
Exception {
Request request = (Request)msg;
System.out.println(
"Server : "
+ request.getId() +
", "
+ request.getName() +
", "
+ request.getRequestMessage());
Response response =
new
Response();
response.setId(request.getId());
response.setName(
"response"
+ request.getId());
response.setResponseMessage(
"响应内容"
+ request.getId());
ctx.writeAndFlush(response);
//.addListener(ChannelFutureListener.CLOSE);
}
@Override
public
void
exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws
Exception {
ctx.close();
}
}
|
Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package
day3;
import
io.netty.bootstrap.ServerBootstrap;
import
io.netty.channel.ChannelFuture;
import
io.netty.channel.ChannelInitializer;
import
io.netty.channel.ChannelOption;
import
io.netty.channel.EventLoopGroup;
import
io.netty.channel.nio.NioEventLoopGroup;
import
io.netty.channel.socket.SocketChannel;
import
io.netty.channel.socket.nio.NioServerSocketChannel;
import
io.netty.handler.logging.LogLevel;
import
io.netty.handler.logging.LoggingHandler;
import
io.netty.handler.timeout.ReadTimeoutHandler;
public
class
Server {
public
static
void
main(String[] args)
throws
Exception{
EventLoopGroup pGroup =
new
NioEventLoopGroup();
EventLoopGroup cGroup =
new
NioEventLoopGroup();
ServerBootstrap b =
new
ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.
class
)
.option(ChannelOption.SO_BACKLOG,
1024
)
//设置日志
.handler(
new
LoggingHandler(LogLevel.INFO))
.childHandler(
new
ChannelInitializer<SocketChannel>() {
protected
void
initChannel(SocketChannel sc)
throws
Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(
new
ReadTimeoutHandler(
3
));
sc.pipeline().addLast(
new
ServerHandler());
}
});
ChannelFuture cf = b.bind(
8765
).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
|
运行结果
说明:由于客户端一开始是发送3条消息给服务端,但是每条消息发送间隔4S,由于超时设置为3S,于是发送第一条消息后,通道便断开连接。接下来,客户端又启动了一个线程再次与服务端通信。
到这里,这篇博客就结束了,对你有用吗?
下周我们再来看Netty在心跳检测方面的应用,^_^