Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案

简介: Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案

前景
由于业务需要,需要用Netty 为基础做一个高并发、高可用的服务/客户端(硬件)交互系统。基本业务是:客户端自动发送一些消息给服务器,通过服务器返回来确定客户端是否正常。在使用过程中,部分场景需要服务器下行命令给客户端操作,由于Netty 是NIO基础框架,一般情况下无法正常的同步返回,即等待客户端的返回,判定命令是否成功,进而判定如何操作。

本人在网上找了很久,没找到合适的方法,通过有效时间的赞言与查阅书籍,发现了一个有效的方法。分享给大家,请大家多多指教和交流。不喜勿喷...........

NettyClient.class -- 业务的操作方法。

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**

  • @Description: Netty客户端
  • @Author:xxx
  • @Since:2022年7月6日下午8:54:59
    */
    @Slf4j
    @Component
    public class NettyClient {

    public String sendSyncMsg(Channel channel, byte[] bytes) {

     SyncFuture<String> syncFuture = new SyncFuture<>();
     // 放入缓存中
     futureCache.put(channel.id().asShortText(), syncFuture);
    
     // 发送同步消息
     String result = sendSyncMsg(channel, bytes, syncFuture);
    
     return result;
    
    AI 代码解读

    }

    /**

    • @param channel 下发指令的客户端
    • @param bytes 指令信息
    • @Description: 发送同步消息
    • @Author: xxxx
    • @Since: 2022年7月7日下午5:08:47
      */
      private String sendSyncMsg(Channel channel, byte[] bytes, SyncFuture syncFuture) {

      String result = null;
      log.info(" >>> 对 {} 设备下发指令:{}", channel.id(), HexUtil.bytesToHexString(bytes));

      try {

       ChannelFuture future = channel.writeAndFlush(bytes);
       future.addListener(new ChannelFutureListener() {
      
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
      
               if (future.isSuccess()) {
                   log.info(" >>> 指令下发成功");
               } else {
                   log.error(" >>> 指令下发失败");
               }
           }
       });
      
       // 等待 8 秒
       result = syncFuture.get(8, TimeUnit.SECONDS);
      
      AI 代码解读

      } catch (InterruptedException e) {

       e.printStackTrace();
      
      AI 代码解读

      }

      return result;
      }
      }

SyncFuture.class 实现的重点

package com.sfjiayuan.carrent.modular.socket.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SyncFuture implements Future {

// 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。
private CountDownLatch latch = new CountDownLatch(1);

// 需要响应线程设置的响应结果
private T response;

// Futrue的请求时间,用于计算Future是否超时
private long beginTime = System.currentTimeMillis();

public SyncFuture() {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    return false;
}

@Override
public boolean isCancelled() {
    return false;
}

@Override
public boolean isDone() {
    if (response != null) {
        return true;
    }
    return false;
}

// 获取响应结果,直到有结果才返回。
@Override
public T get() throws InterruptedException {
    latch.await();
    return this.response;
}

// 获取响应结果,直到有结果或者超过指定时间就返回。
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException {
    if (latch.await(timeout, unit)) {
        return this.response;
    }
    return null;
}

// 用于设置响应结果,并且做countDown操作,通知请求线程
public void setResponse(T response) {
    this.response = response;
    latch.countDown();
}

public long getBeginTime() {
    return beginTime;
}
AI 代码解读

}

工具类:

import com.google.common.cache.*;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ChannelCache {
/**

 * key :车辆的IMEI (IMEI唯一)
 * value : channelId
 */
public static final Map<String, Channel> channelMap = new HashMap<>();

public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


public static String lookupIMSI(Channel channel) {
    String ret = null;
    for (Map.Entry<String, Channel> entry : channelMap.entrySet()) {
        if (entry.getValue().id().equals(channel.id())) {
            ret = entry.getKey();
            break;
        }
    }
    return ret;
}


/**
 * key : channelId
 * value: 阻塞 Future
 * <p>
 * 缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存
 */
public static final LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder()
        //设置缓存容器的初始容量为10
        .initialCapacity(100)
        // maximumSize 设置缓存大小
        .maximumSize(10000)
        //设置并发级别为20,并发级别是指可以同时写缓存的线程数
        .concurrencyLevel(20)
        // expireAfterWrite设置写缓存后8秒钟过期
        .expireAfterWrite(8, TimeUnit.SECONDS)
        //设置缓存的移除通知
        .removalListener(new RemovalListener<Object, Object>() {
            @Override
            public void onRemoval(RemovalNotification<Object, Object> notification) {
                log.debug("LoadingCache: {} was removed, cause is {}", notification.getKey(), notification.getCause());
            }
        })
        //build方法中可以指定CacheLoader,在缓存不存在时通过CacheLoader的实现自动加载缓存
        .build(new CacheLoader<String, SyncFuture>() {
            @Override
            public SyncFuture load(String key) throws Exception {
                // 当获取key的缓存不存在时,不需要自动添加
                return null;
            }
        });

public static void ackSyncMsg(Channel channel, String message) {
    // 从缓存中获取数据
    String shortText = channel.id().asShortText();

    SyncFuture<String> syncFuture = futureCache.getIfPresent(shortText);

    // 如果不为null, 则通知返回
    if (syncFuture != null) {
        syncFuture.setResponse(message);
        //主动释放
        futureCache.invalidate(shortText);
    }
}
AI 代码解读

}

Socket 服务端的引用部分代码:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 由于我们配置的是 String 编解码器,所以这里取到的用户发来的数据是 String
    Channel channel = ctx.channel();
    ChannelCache.ackSyncMsg(channel, message);
AI 代码解读

}

相关文章
|
移动开发 网络协议 算法
由浅入深Netty粘包与半包解决方案
由浅入深Netty粘包与半包解决方案
92 0
|
8月前
|
编解码 缓存 移动开发
TCP粘包/拆包与Netty解决方案
TCP粘包/拆包与Netty解决方案
113 0
|
8月前
|
安全 Java Go
springboot+netty化身Udp服务端,go化身客户端模拟设备实现指令联动
springboot+netty化身Udp服务端,go化身客户端模拟设备实现指令联动
183 0
|
8月前
|
Java
Netty实现HTTP服务
Netty实现HTTP服务
66 0
|
分布式计算 网络协议 前端开发
【Netty底层数据交互源码】
【Netty底层数据交互源码】
|
NoSQL Redis
使用netty按照Redis协议发消息完成set key value 命令
使用netty按照Redis协议发消息完成set key value 命令
94 0
编译netty报错缺少 io.netty.collection 包 解决方案
编译netty报错缺少 io.netty.collection 包 解决方案
88 0
|
安全 API
Gateway集成Netty服务
Netty是一个异步的,事件驱动的网络应用框架,用以快速开发高可靠、高性能的网络应用程序,提供网络传输能力的管理,支持常见的数据传输协议;
352 0
Gateway集成Netty服务
|
前端开发 网络协议 Java
Netty服务开发及性能优化
造成假死的原因可能是公网丢包、客户端或服务端网络故障等,Netty为我们提供了IdleStateHandler 来解决超时假死问题,示例代码如下
134 0