RabbitMq数据发送监听

简介: RabbitMq数据发送监听

一 背景

项目要集成个推能力,于是查阅了个推的文档,个推有提供sdk,集成后只需要调用sdk中的api接口就可以了,还是很方便的,有趣的是个推的sdk项目代码也是开源的,于是出于好奇,开始分析了下他的源码结构。

二 开扒

  1. 集成

    引入maven依赖

    <dependency>
    
        <groupId>com.getui.push</groupId>
    
        <artifactId>restful-sdk</artifactId>
    
        <version>1.0.0.7</version>
    
    </dependency>
    
    2. demo
    
    public class TestCreatApi {
    
       public void test() {
    
       // 设置httpClient最大连接数,当并发较大时建议调大此参数。或者启动参数加上 -Dhttp.maxConnections=200
    
       System.setProperty("http.maxConnections", "200");
       GtApiConfiguration apiConfiguration = new GtApiConfiguration(); 
       //填写应用配置 apiConfiguration.setAppId("xxx"); apiConfiguration.setAppKey("xxx");
       apiConfiguration.setMasterSecret("xxx"); 
       // 接口调用前缀,请查看文档: 接口调用规范 -> 接口前缀, 可不填写appId apiConfiguration.setDomain("https://restapi.getui.com/v2/");
       // 实例化ApiHelper对象,用于创建接口对象
       ApiHelper apiHelper = ApiHelper.build(apiConfiguration); 
       // 创建对象,建议复用。目前有PushApi、StatisticApi、UserApi
       PushApi pushApi = apiHelper.creatApi(PushApi.class); 
       } }
    

    3 解析

    实际上 sdk就是对api的封装,我们可以看他们别人的封装 思考自己封装接口的暴露

    1)对象

    • GtApiConfiguration 对象 保存个推配置
    • GtApiProxyFactory 管理创建代理工厂类
    • ApiHelper api助手类
    • DefaultApiClient

       * 1. 管理token
      
       * 2. 解析请求参数
      
       * 3. 发送HTTP请求
      
       * 4. 解析请求结果

    2) 调用流程

      首先,构造GtApiConfiguration类,传入密钥等
      其次,构建ApiHelper类,这个类相当一个核心类,里面包含了很多功能,代码其实比较简单
      
      
      ```
    
           private static final Map<String, ApiHelper> apiHelperCache = new HashMap<String, ApiHelper>(4);
    
           /**
           * @param configuration 配置信息类
           * 创建api助手
           * @return
           */
           public static ApiHelper build(GtApiConfiguration configuration) {
               return build(configuration, new DefaultJson());
           }
           /**
           * @param configuration 配置信息类
           * @return
           */
           public static ApiHelper build(GtApiConfiguration configuration, IJson json) {
           Assert.notNull(configuration, "configuration");
           // 校验配置
           configuration.check();
           // 生成缓存key
           String key = configuration.keyOfCache();
           // 从缓存中获取apiHelper 注意apiHelperCache是 不可变得常量 且 静态的
           ApiHelper apiHelper = apiHelperCache.get(key);
           if (apiHelper != null) {
               return apiHelper;
           }
           // 如果没有传入 就用个推自己的DefaultJson ,因为接口参数是IJson,所以给外部开发者留了口子
           if (json == null) {
               json = new DefaultJson();
           }
           // 锁在静态常量上,锁的是整个类对象
           synchronized (BUILD_LOCK) {
               // 所有缓存都是map中获取
               apiHelper = apiHelperCache.get(key);
               if (apiHelper != null) {
                   return apiHelper;
               }
               // 构建DefaultApiClient
               final DefaultApiClient defaultApiClient = DefaultApiClient.build(configuration, json);
               GtApiProxyFactory factory = GtApiProxyFactory.build(defaultApiClient);
               // 创建代理对象 代理AuthApi
               final AuthApi authApi = factory.createProxy(AuthApi.class);
               // 这里面执行了获取token的方法将token保存在了DefaultApiClient中
               defaultApiClient.setAuthApiAndAuth(authApi);
               // DefaultApiClient又在GtApiProxyFactory中,最终放到了ApiHelper中
               apiHelper = new ApiHelper(factory);
               apiHelperCache.put(key, apiHelper);
               return apiHelper;
           }
       }
       
       
       /**
       * 生成缓存的key
       *
       * @return 缓存key
       */
       public String keyOfCache() {
           check();
           return String.format("%s|%s|%s", this.getAppId(), this.getAppKey(), this.getMasterSecret());
       }
       
    
    
       private DefaultApiClient(GtApiConfiguration apiConfiguration, IJson json) {
           if (apiConfiguration == null) {
               throw new ApiException("apiConfiguration cannot be null.", true);
           }
           this.json = json;
           apiConfiguration.check();
           this.apiConfiguration = apiConfiguration;
           // 创建http连接
           this.httpManager = new HttpManager(apiConfiguration.getConnectTimeout(),
                   apiConfiguration.getSoTimeout(), apiConfiguration.getConnectionRequestTimeout(),
                   apiConfiguration.getMaxHttpTryTime(), apiConfiguration.getKeepAliveSeconds(),
                   apiConfiguration.getProxyConfig(), apiConfiguration.isTrustSSL());
    
           this.hostManager = new HostManager(apiConfiguration, this.httpManager);
           // 分析最稳定域名
           initAnalyseDomainThread();
           // 数据上报
           initReportDataThread();
           if (apiConfiguration.isOpenCheckHealthDataSwitch()
                   || apiConfiguration.isOpenAnalyseStableDomainSwitch()) {
               defaultGtInterceptor = new DefaultGtInterceptor(hostManager, reportDataQueue, apiConfiguration);
               // 创建拦截器 这里面其实也是开了一个口子 如果我们想增加拦截 可以在这里实现接口就行了
               this.interceptorList.add(defaultGtInterceptor);
           }
       }
      
    

3)分析

  • 校验配置
  • 从缓存中取配置
  • 处理具体的json实现
  • 赋值token url等配置
  • 创建代理类

这里面有比较有趣的是创建代理类,我们先看一下被代理的类,像不像feign的类?

 @GtApi
 public interface AuthApi {

     /**
      * 获取鉴权token接口
      *
      * @param authDTO
      * @return
      */
     @GtPost(uri = "/auth", needToken = false)
     ApiResult<TokenDTO> auth(@GtBodyParam AuthDTO authDTO);

     /**
      * 关闭鉴权token
      *
      * @param token
      * @return
      */
     @GtDelete(uri = "/auth")
     ApiResult<Void> close(@GtPathParam String token);

 }

创建jdk动态代理对象过程


/**
 * 创建代理对象
 *
 * @param apiService
 * @param <T>
 * @return
 */
public <T> T createProxy(Class<T> apiService) {
    return (T) Proxy.newProxyInstance(apiService.getClassLoader(), new Class[]{apiService}, new ApiProxyHandler());
}

class ApiProxyHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        try {
            if (Object.class.equals(method.getDeclaringClass())) {
                return method.invoke(this, args);
            }
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
        final BaseParam baseParam = gtApiRegistry.get(method);
        ApiParam apiParam = new ApiParam(baseParam);
        // 解析参数 -> HTTP参数
        handleApiParam(method, args, apiParam);
        return defaultApiClient.execute(apiParam);
    }
}

当执行authApi.auth()获取token时,实际上执行的是代理类方法,我们看下代理后执行的方法
1) 判断如果被代理类不是接口 那么久直接返回了,因为个推这边被代理类必须是接口,所以算是一种校验吧

  

    try {
        if (Object.class.equals(method.getDeclaringClass())) {
            return method.invoke(this, args);
        }
    } catch (Throwable t) {
        throw new RuntimeException(t);
    }

    

2)代理类方法中心


    private ApiResult<?> doExecute(GtApiProxyFactory.ApiParam apiParam, TokenDTO token) {
        Map<String, Object> header = new HashMap<String, Object>(4);
        if (apiParam.getNeedToken()) {
            if (token == null) {
                header.put("token", refreshTokenAndGet(token));
            } else {
                header.put("token", token.getToken());
            }
        }
        String body = null;
        if (apiParam.getBody() != null) {
            body = json.toJson(apiParam.getBody());
        }
        String result = null;
        String fullUrl = genFullUrl(apiParam.getUri(), apiParam.getPathParams(), apiParam.getQueryParams());
        try {
            // 处理header
            handleHeader(header);
            beforeExecute(apiParam, header, body);
            result = httpManager.syncHttps(fullUrl, apiParam.getMethod(), header, body, CONTENT_TYPE);
            postExecute(apiParam, header, body, result);
        } catch (ApiException e) {
            handleException(apiParam, header, body, e);
            return ApiResult.fail(e.getMessage(), e.getCode());
        } finally {
            afterDoExecute(apiParam, header, body, result);
        }
        if (result == null) {
            throw new ApiException(String.format("请求失败,返回值为空。url:%s, body: %s.", fullUrl, body));
        }
  
    }
    
    

逻辑上处理上比较简单就是获取被代理类的注解,参数,请求url等信息,然后通过http发送请求,其中再增加拦截器对请求时间进行记录等。

三 结语

个推的sdk使用非常典型的jdk动态代理,算是一次jdk动态代理的实战,对他的源码查看主要能够让我们以后封装接口时多一些思考,多一些选择,还有他对缓存的使用其实就是使用map,然后用静态变量修饰,也很简单,而且当我们要扩展时也可以自己写一个api类和个推api类格式保持一致就行了,非常容易拓展,总的来说,还是很有趣的。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 监控
|
2月前
|
机器学习/深度学习 开发工具
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
32 1
|
28天前
|
消息中间件 物联网 关系型数据库
MQTT常见问题之消息对列mqtt的历史数据查看失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 Web App开发 监控
mqtt数据问题之如何实现webRTC 协议的监控视频压测
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
63 0
|
4月前
|
消息中间件 Shell Docker
百度搜索:蓝易云【docker rabbitmq-清空queue队列数据】
通过以上步骤,您可以使用Docker清空RabbitMQ队列的数据。这将帮助您重置队列并清除旧数据,以进行新的测试或使用。
33 0
|
4月前
|
存储 JSON 数据库
从 MQTT、InfluxDB 将数据无缝接入 TDengine,接入功能与 Logstash 类似
利用 TDengine Enterprise 和 TDengine Cloud 的数据接入功能,我们现在能够将 MQTT、InfluxDB 中的数据通过规则无缝转换至 TDengine 中,由于该功能在实现及使用上与 Logstash 类似,本文将结合 Logstash 为大家进行解读。
83 1
|
5月前
|
消息中间件 算法 关系型数据库
RocketMQ中,对一个包含200万条数据的表进行新建索引时,通常会需要锁定该
RocketMQ中,对一个包含200万条数据的表进行新建索引时,通常会需要锁定该
29 2
|
5月前
|
消息中间件 Java Spring
Spring Boot使用RabbitMq消费数据较慢解决
Spring Boot使用RabbitMq消费数据较慢解决
112 0
|
6月前
|
数据采集 JSON 移动开发
【MQTT】Esp32数据上传采集:最新mqtt插件(支持掉线、真机调试错误等问题)
【MQTT】Esp32数据上传采集:最新mqtt插件(支持掉线、真机调试错误等问题)
|
7月前
|
NoSQL 关系型数据库 MySQL
同步 MySQL 数据至 ES/Redis/MQ 等的五种方式
同步 MySQL 数据至 ES/Redis/MQ 等的五种方式
419 0

热门文章

最新文章