Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案

简介: Netty 与硬件设备交互,下行命令时(服务对设备),如何等待设备响应,再进行业务操作解决方案

前景
由于业务需要,需要用Netty 为基础做一个高并发、高可用的服务/客户端(硬件)交互系统。基本业务是:客户端自动发送一些消息给服务器,通过服务器返回来确定客户端是否正常。在使用过程中,部分场景需要服务器下行命令给客户端操作,由于Netty 是NIO基础框架,一般情况下无法正常的同步返回,即等待客户端的返回,判定命令是否成功,进而判定如何操作。

本人在网上找了很久,没找到合适的方法,通过有效时间的赞言与查阅书籍,发现了一个有效的方法。分享给大家,请大家多多指教和交流。不喜勿喷...........

NettyClient.class -- 业务的操作方法。

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**

  • @Description: Netty客户端
  • @Author:xxx
  • @Since:2022年7月6日下午8:54:59
    */
    @Slf4j
    @Component
    public class NettyClient {

    public String sendSyncMsg(Channel channel, byte[] bytes) {

     SyncFuture<String> syncFuture = new SyncFuture<>();
     // 放入缓存中
     futureCache.put(channel.id().asShortText(), syncFuture);
    
     // 发送同步消息
     String result = sendSyncMsg(channel, bytes, syncFuture);
    
     return result;
    

    }

    /**

    • @param channel 下发指令的客户端
    • @param bytes 指令信息
    • @Description: 发送同步消息
    • @Author: xxxx
    • @Since: 2022年7月7日下午5:08:47
      */
      private String sendSyncMsg(Channel channel, byte[] bytes, SyncFuture syncFuture) {

      String result = null;
      log.info(" >>> 对 {} 设备下发指令:{}", channel.id(), HexUtil.bytesToHexString(bytes));

      try {

       ChannelFuture future = channel.writeAndFlush(bytes);
       future.addListener(new ChannelFutureListener() {
      
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
      
               if (future.isSuccess()) {
                   log.info(" >>> 指令下发成功");
               } else {
                   log.error(" >>> 指令下发失败");
               }
           }
       });
      
       // 等待 8 秒
       result = syncFuture.get(8, TimeUnit.SECONDS);
      

      } catch (InterruptedException e) {

       e.printStackTrace();
      

      }

      return result;
      }
      }

SyncFuture.class 实现的重点

package com.sfjiayuan.carrent.modular.socket.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SyncFuture implements Future {

// 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。
private CountDownLatch latch = new CountDownLatch(1);

// 需要响应线程设置的响应结果
private T response;

// Futrue的请求时间,用于计算Future是否超时
private long beginTime = System.currentTimeMillis();

public SyncFuture() {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    return false;
}

@Override
public boolean isCancelled() {
    return false;
}

@Override
public boolean isDone() {
    if (response != null) {
        return true;
    }
    return false;
}

// 获取响应结果,直到有结果才返回。
@Override
public T get() throws InterruptedException {
    latch.await();
    return this.response;
}

// 获取响应结果,直到有结果或者超过指定时间就返回。
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException {
    if (latch.await(timeout, unit)) {
        return this.response;
    }
    return null;
}

// 用于设置响应结果,并且做countDown操作,通知请求线程
public void setResponse(T response) {
    this.response = response;
    latch.countDown();
}

public long getBeginTime() {
    return beginTime;
}

}

工具类:

import com.google.common.cache.*;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ChannelCache {
/**

 * key :车辆的IMEI (IMEI唯一)
 * value : channelId
 */
public static final Map<String, Channel> channelMap = new HashMap<>();

public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


public static String lookupIMSI(Channel channel) {
    String ret = null;
    for (Map.Entry<String, Channel> entry : channelMap.entrySet()) {
        if (entry.getValue().id().equals(channel.id())) {
            ret = entry.getKey();
            break;
        }
    }
    return ret;
}


/**
 * key : channelId
 * value: 阻塞 Future
 * <p>
 * 缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存
 */
public static final LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder()
        //设置缓存容器的初始容量为10
        .initialCapacity(100)
        // maximumSize 设置缓存大小
        .maximumSize(10000)
        //设置并发级别为20,并发级别是指可以同时写缓存的线程数
        .concurrencyLevel(20)
        // expireAfterWrite设置写缓存后8秒钟过期
        .expireAfterWrite(8, TimeUnit.SECONDS)
        //设置缓存的移除通知
        .removalListener(new RemovalListener<Object, Object>() {
            @Override
            public void onRemoval(RemovalNotification<Object, Object> notification) {
                log.debug("LoadingCache: {} was removed, cause is {}", notification.getKey(), notification.getCause());
            }
        })
        //build方法中可以指定CacheLoader,在缓存不存在时通过CacheLoader的实现自动加载缓存
        .build(new CacheLoader<String, SyncFuture>() {
            @Override
            public SyncFuture load(String key) throws Exception {
                // 当获取key的缓存不存在时,不需要自动添加
                return null;
            }
        });

public static void ackSyncMsg(Channel channel, String message) {
    // 从缓存中获取数据
    String shortText = channel.id().asShortText();

    SyncFuture<String> syncFuture = futureCache.getIfPresent(shortText);

    // 如果不为null, 则通知返回
    if (syncFuture != null) {
        syncFuture.setResponse(message);
        //主动释放
        futureCache.invalidate(shortText);
    }
}

}

Socket 服务端的引用部分代码:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 由于我们配置的是 String 编解码器,所以这里取到的用户发来的数据是 String
    Channel channel = ctx.channel();
    ChannelCache.ackSyncMsg(channel, message);

}

相关文章
|
NoSQL Redis
使用netty按照Redis协议发消息完成set key value 命令
使用netty按照Redis协议发消息完成set key value 命令
82 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13440 1
|
4月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
110 1
|
9月前
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
143 1
|
4月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
98 0
|
4月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
167 0
|
11月前
|
分布式计算 网络协议 前端开发
【Netty底层数据交互源码】
【Netty底层数据交互源码】
|
11月前
|
Java 容器
【深入研究NIO与Netty线程模型的源码】
【深入研究NIO与Netty线程模型的源码】
|
编解码 弹性计算 缓存
Netty源码和Reactor模型
Netty源码和Reactor模型
95 0
|
设计模式 监控 前端开发
第 10 章 Netty 核心源码剖析
第 10 章 Netty 核心源码剖析
124 0