前景
由于业务需要,需要用Netty 为基础做一个高并发、高可用的服务/客户端(硬件)交互系统。基本业务是:客户端自动发送一些消息给服务器,通过服务器返回来确定客户端是否正常。在使用过程中,部分场景需要服务器下行命令给客户端操作,由于Netty 是NIO基础框架,一般情况下无法正常的同步返回,即等待客户端的返回,判定命令是否成功,进而判定如何操作。
本人在网上找了很久,没找到合适的方法,通过有效时间的赞言与查阅书籍,发现了一个有效的方法。分享给大家,请大家多多指教和交流。不喜勿喷...........
NettyClient.class -- 业务的操作方法。
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
- @Description: Netty客户端
- @Author:xxx
@Since:2022年7月6日下午8:54:59
*/
@Slf4j
@Component
public class NettyClient {public String sendSyncMsg(Channel channel, byte[] bytes) {
SyncFuture<String> syncFuture = new SyncFuture<>(); // 放入缓存中 futureCache.put(channel.id().asShortText(), syncFuture); // 发送同步消息 String result = sendSyncMsg(channel, bytes, syncFuture); return result;
}
/**
- @param channel 下发指令的客户端
- @param bytes 指令信息
- @Description: 发送同步消息
- @Author: xxxx
@Since: 2022年7月7日下午5:08:47
*/
private String sendSyncMsg(Channel channel, byte[] bytes, SyncFuture syncFuture) {String result = null;
log.info(" >>> 对 {} 设备下发指令:{}", channel.id(), HexUtil.bytesToHexString(bytes));try {
ChannelFuture future = channel.writeAndFlush(bytes); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { log.info(" >>> 指令下发成功"); } else { log.error(" >>> 指令下发失败"); } } }); // 等待 8 秒 result = syncFuture.get(8, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
SyncFuture.class 实现的重点
package com.sfjiayuan.carrent.modular.socket.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class SyncFuture implements Future {
// 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。
private CountDownLatch latch = new CountDownLatch(1);
// 需要响应线程设置的响应结果
private T response;
// Futrue的请求时间,用于计算Future是否超时
private long beginTime = System.currentTimeMillis();
public SyncFuture() {
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
if (response != null) {
return true;
}
return false;
}
// 获取响应结果,直到有结果才返回。
@Override
public T get() throws InterruptedException {
latch.await();
return this.response;
}
// 获取响应结果,直到有结果或者超过指定时间就返回。
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException {
if (latch.await(timeout, unit)) {
return this.response;
}
return null;
}
// 用于设置响应结果,并且做countDown操作,通知请求线程
public void setResponse(T response) {
this.response = response;
latch.countDown();
}
public long getBeginTime() {
return beginTime;
}
}
工具类:
import com.google.common.cache.*;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ChannelCache {
/**
* key :车辆的IMEI (IMEI唯一)
* value : channelId
*/
public static final Map<String, Channel> channelMap = new HashMap<>();
public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static String lookupIMSI(Channel channel) {
String ret = null;
for (Map.Entry<String, Channel> entry : channelMap.entrySet()) {
if (entry.getValue().id().equals(channel.id())) {
ret = entry.getKey();
break;
}
}
return ret;
}
/**
* key : channelId
* value: 阻塞 Future
* <p>
* 缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存
*/
public static final LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder()
//设置缓存容器的初始容量为10
.initialCapacity(100)
// maximumSize 设置缓存大小
.maximumSize(10000)
//设置并发级别为20,并发级别是指可以同时写缓存的线程数
.concurrencyLevel(20)
// expireAfterWrite设置写缓存后8秒钟过期
.expireAfterWrite(8, TimeUnit.SECONDS)
//设置缓存的移除通知
.removalListener(new RemovalListener<Object, Object>() {
@Override
public void onRemoval(RemovalNotification<Object, Object> notification) {
log.debug("LoadingCache: {} was removed, cause is {}", notification.getKey(), notification.getCause());
}
})
//build方法中可以指定CacheLoader,在缓存不存在时通过CacheLoader的实现自动加载缓存
.build(new CacheLoader<String, SyncFuture>() {
@Override
public SyncFuture load(String key) throws Exception {
// 当获取key的缓存不存在时,不需要自动添加
return null;
}
});
public static void ackSyncMsg(Channel channel, String message) {
// 从缓存中获取数据
String shortText = channel.id().asShortText();
SyncFuture<String> syncFuture = futureCache.getIfPresent(shortText);
// 如果不为null, 则通知返回
if (syncFuture != null) {
syncFuture.setResponse(message);
//主动释放
futureCache.invalidate(shortText);
}
}
}
Socket 服务端的引用部分代码:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 由于我们配置的是 String 编解码器,所以这里取到的用户发来的数据是 String
Channel channel = ctx.channel();
ChannelCache.ackSyncMsg(channel, message);
}