前景
由于业务需要,需要用Netty 为基础做一个高并发、高可用的服务/客户端(硬件)交互系统。基本业务是:客户端自动发送一些消息给服务器,通过服务器返回来确定客户端是否正常。在使用过程中,部分场景需要服务器下行命令给客户端操作,由于Netty 是NIO基础框架,一般情况下无法正常的同步返回,即等待客户端的返回,判定命令是否成功,进而判定如何操作。
本人在网上找了很久,没找到合适的方法,通过有效时间的赞言与查阅书籍,发现了一个有效的方法。分享给大家,请大家多多指教和交流。不喜勿喷...........
NettyClient.class -- 业务的操作方法。
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
- @Description: Netty客户端
- @Author:xxx
@Since:2022年7月6日下午8:54:59
*/
@Slf4j
@Component
public class NettyClient {public String sendSyncMsg(Channel channel, byte[] bytes) {
SyncFuture<String> syncFuture = new SyncFuture<>(); // 放入缓存中 futureCache.put(channel.id().asShortText(), syncFuture); // 发送同步消息 String result = sendSyncMsg(channel, bytes, syncFuture); return result;
AI 代码解读}
/**
- @param channel 下发指令的客户端
- @param bytes 指令信息
- @Description: 发送同步消息
- @Author: xxxx
@Since: 2022年7月7日下午5:08:47
*/
private String sendSyncMsg(Channel channel, byte[] bytes, SyncFuture syncFuture) {String result = null;
log.info(" >>> 对 {} 设备下发指令:{}", channel.id(), HexUtil.bytesToHexString(bytes));try {
ChannelFuture future = channel.writeAndFlush(bytes); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { log.info(" >>> 指令下发成功"); } else { log.error(" >>> 指令下发失败"); } } }); // 等待 8 秒 result = syncFuture.get(8, TimeUnit.SECONDS);
AI 代码解读} catch (InterruptedException e) {
e.printStackTrace();
AI 代码解读}
return result;
}
}
SyncFuture.class 实现的重点
package com.sfjiayuan.carrent.modular.socket.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class SyncFuture implements Future {
// 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。 private CountDownLatch latch = new CountDownLatch(1); // 需要响应线程设置的响应结果 private T response; // Futrue的请求时间,用于计算Future是否超时 private long beginTime = System.currentTimeMillis(); public SyncFuture() { } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { if (response != null) { return true; } return false; } // 获取响应结果,直到有结果才返回。 @Override public T get() throws InterruptedException { latch.await(); return this.response; } // 获取响应结果,直到有结果或者超过指定时间就返回。 @Override public T get(long timeout, TimeUnit unit) throws InterruptedException { if (latch.await(timeout, unit)) { return this.response; } return null; } // 用于设置响应结果,并且做countDown操作,通知请求线程 public void setResponse(T response) { this.response = response; latch.countDown(); } public long getBeginTime() { return beginTime; }
AI 代码解读
}
工具类:
import com.google.common.cache.*;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ChannelCache {
/**
* key :车辆的IMEI (IMEI唯一) * value : channelId */ public static final Map<String, Channel> channelMap = new HashMap<>(); public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static String lookupIMSI(Channel channel) { String ret = null; for (Map.Entry<String, Channel> entry : channelMap.entrySet()) { if (entry.getValue().id().equals(channel.id())) { ret = entry.getKey(); break; } } return ret; } /** * key : channelId * value: 阻塞 Future * <p> * 缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存 */ public static final LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder() //设置缓存容器的初始容量为10 .initialCapacity(100) // maximumSize 设置缓存大小 .maximumSize(10000) //设置并发级别为20,并发级别是指可以同时写缓存的线程数 .concurrencyLevel(20) // expireAfterWrite设置写缓存后8秒钟过期 .expireAfterWrite(8, TimeUnit.SECONDS) //设置缓存的移除通知 .removalListener(new RemovalListener<Object, Object>() { @Override public void onRemoval(RemovalNotification<Object, Object> notification) { log.debug("LoadingCache: {} was removed, cause is {}", notification.getKey(), notification.getCause()); } }) //build方法中可以指定CacheLoader,在缓存不存在时通过CacheLoader的实现自动加载缓存 .build(new CacheLoader<String, SyncFuture>() { @Override public SyncFuture load(String key) throws Exception { // 当获取key的缓存不存在时,不需要自动添加 return null; } }); public static void ackSyncMsg(Channel channel, String message) { // 从缓存中获取数据 String shortText = channel.id().asShortText(); SyncFuture<String> syncFuture = futureCache.getIfPresent(shortText); // 如果不为null, 则通知返回 if (syncFuture != null) { syncFuture.setResponse(message); //主动释放 futureCache.invalidate(shortText); } }
AI 代码解读
}
Socket 服务端的引用部分代码:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 由于我们配置的是 String 编解码器,所以这里取到的用户发来的数据是 String Channel channel = ctx.channel(); ChannelCache.ackSyncMsg(channel, message);
AI 代码解读
}