心跳检测的概念
在分布式架构中,比如Hadoop集群,Storm集群等,或多或少都涉及到Master/Slave的概念,往往是一个或者多个Master和N个Slave之间进行通信。那么通常Master应该需要知道Slave的状态,Slave会定时的向Master进行发送消息,相当于告知Master:“我还活着,我现在在做什么,什么进度,我的CPU/内存情况如何”等,这就是所谓的心跳。Master根据Slave的心跳,进行协调,比如Slave的CPU/内存消耗很大,那么Master可以将任务分配给其他负载小的Slave进行处理;比如Slave一段时间没有发送心跳过来,那么Master可能会将可服务列表中暂时删除该Slave,并可能发出报警,告知运维/开发人员进行处理.如下图所示。
Netty实现心跳检测代码实例
心跳信息对象
主要储存Slave的IP,通信PORT,时间,内存,CPU信息等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package
day4;
import
java.io.Serializable;
import
java.util.Date;
import
java.util.HashMap;
import
java.util.Map;
/**
* Created by zhangfengzhe on 2017/2/4.
*/
public
class
HeartInfo
implements
Serializable{
private
String ip;
private
int
port;
private
Date lasttime;
private
Map<String , String> cpuInfo =
new
HashMap<String,String>();
private
Map<String , String> memInfo =
new
HashMap<String, String>();
public
String getIp() {
return
ip;
}
public
void
setIp(String ip) {
this
.ip = ip;
}
public
int
getPort() {
return
port;
}
public
void
setPort(
int
port) {
this
.port = port;
}
public
Date getLasttime() {
return
lasttime;
}
public
void
setLasttime(Date lasttime) {
this
.lasttime = lasttime;
}
public
Map<String, String> getCpuInfo() {
return
cpuInfo;
}
public
void
setCpuInfo(Map<String, String> cpuInfo) {
this
.cpuInfo = cpuInfo;
}
public
Map<String, String> getMemInfo() {
return
memInfo;
}
public
void
setMemInfo(Map<String, String> memInfo) {
this
.memInfo = memInfo;
}
@Override
public
String toString() {
return
"HeartInfo{"
+
"ip='"
+ ip + '\
''
+
", port="
+ port +
", lasttime="
+ lasttime +
", cpuInfo="
+ cpuInfo +
", memInfo="
+ memInfo +
'}'
;
}
}
|
JBoss Marshalling编解码处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package
day3;
import
io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import
io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import
io.netty.handler.codec.marshalling.MarshallerProvider;
import
io.netty.handler.codec.marshalling.MarshallingDecoder;
import
io.netty.handler.codec.marshalling.MarshallingEncoder;
import
io.netty.handler.codec.marshalling.UnmarshallerProvider;
import
org.jboss.marshalling.MarshallerFactory;
import
org.jboss.marshalling.Marshalling;
import
org.jboss.marshalling.MarshallingConfiguration;
/**
* Marshalling工厂
*/
public
final
class
MarshallingCodeCFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public
static
MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(
"serial"
);
//创建了MarshallingConfiguration对象,配置了版本号为5
final
MarshallingConfiguration configuration =
new
MarshallingConfiguration();
configuration.setVersion(
5
);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider =
new
DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder =
new
MarshallingDecoder(provider,
1024
);
return
decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public
static
MarshallingEncoder buildMarshallingEncoder() {
final
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(
"serial"
);
final
MarshallingConfiguration configuration =
new
MarshallingConfiguration();
configuration.setVersion(
5
);
MarshallerProvider provider =
new
DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder =
new
MarshallingEncoder(provider);
return
encoder;
}
}
|
Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package
day4;
import
io.netty.bootstrap.Bootstrap;
import
io.netty.channel.ChannelFuture;
import
io.netty.channel.ChannelInitializer;
import
io.netty.channel.EventLoopGroup;
import
io.netty.channel.nio.NioEventLoopGroup;
import
io.netty.channel.socket.SocketChannel;
import
io.netty.channel.socket.nio.NioSocketChannel;
public
class
Client {
public
static
void
main(String[] args)
throws
Exception{
EventLoopGroup group =
new
NioEventLoopGroup();
Bootstrap b =
new
Bootstrap();
final
int
port =
8765
;
final
String serverIP =
"127.0.0.1"
;
b.group(group)
.channel(NioSocketChannel.
class
)
.handler(
new
ChannelInitializer<SocketChannel>() {
@Override
protected
void
initChannel(SocketChannel sc)
throws
Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(
new
ClientHandler(port));
}
});
ChannelFuture cf = b.connect(serverIP, port).sync();
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
|
Client Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
package
day4;
import
io.netty.channel.ChannelHandlerAdapter;
import
io.netty.channel.ChannelHandlerContext;
import
io.netty.util.ReferenceCountUtil;
import
java.net.InetAddress;
import
java.net.UnknownHostException;
import
java.util.concurrent.Executors;
import
java.util.concurrent.ScheduledExecutorService;
import
java.util.concurrent.ScheduledFuture;
import
java.util.concurrent.TimeUnit;
/**
* Created by zhangfengzhe on 2017/2/4.
*/
public
class
ClientHandler
extends
ChannelHandlerAdapter {
private
String ip;
private
int
port;
private
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
1
);
private
ScheduledFuture<?> scheduledFuture;
private
static
final
String SUCCESS =
"OK"
;
public
ClientHandler(){}
public
ClientHandler(
int
port) {
this
.port = port;
//获取本机IP
try
{
this
.ip = InetAddress.getLocalHost().getHostAddress();
}
catch
(UnknownHostException e) {
e.printStackTrace();
}
}
//通道建立初始化时 发送信息 准备握手验证
@Override
public
void
channelActive(ChannelHandlerContext ctx)
throws
Exception {
String authInfo =
this
.ip +
":"
+
this
.port;
ctx.writeAndFlush(authInfo);
}
//当服务器发送认证信息后,开始启动心跳发送
@Override
public
void
channelRead(ChannelHandlerContext ctx, Object msg)
throws
Exception {
if
(msg
instanceof
String){
//认证成功
if
(SUCCESS.equals((String)msg)){
this
.scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(
new
HeartTask(ctx,ip,port),
2
,
3
, TimeUnit.SECONDS);
}
else
{
System.out.println(
"服务器发来消息:"
+ msg);
}
}
ReferenceCountUtil.release(msg);
}
//如果出现异常 取消定时
@Override
public
void
exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws
Exception {
cause.printStackTrace();
if
(
this
.scheduledFuture !=
null
){
this
.scheduledFuture.cancel(
true
);
this
.scheduledFuture =
null
;
}
}
}
|
Client和Server建立通道初始化的时候,Client会向服务器发送信息用于认证。在实际开发中,Client在发送心跳前,需要和Server端进行握手验证,会涉及到加解密,这里为了简单起见,省去了这些过程。从上面的代码也可以看到,如果服务端认证成功,那么Client会开始启动定时线程去执行任务,那么接下来,我们看看这个心跳任务。
心跳任务HeartTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|