RabbitMq数据发送监听

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: RabbitMq数据发送监听

一 背景

项目要集成个推能力,于是查阅了个推的文档,个推有提供sdk,集成后只需要调用sdk中的api接口就可以了,还是很方便的,有趣的是个推的sdk项目代码也是开源的,于是出于好奇,开始分析了下他的源码结构。

二 开扒

  1. 集成

    引入maven依赖

    <dependency>
    
        <groupId>com.getui.push</groupId>
    
        <artifactId>restful-sdk</artifactId>
    
        <version>1.0.0.7</version>
    
    </dependency>
    
    2. demo
    
    public class TestCreatApi {
    
       public void test() {
    
       // 设置httpClient最大连接数,当并发较大时建议调大此参数。或者启动参数加上 -Dhttp.maxConnections=200
    
       System.setProperty("http.maxConnections", "200");
       GtApiConfiguration apiConfiguration = new GtApiConfiguration(); 
       //填写应用配置 apiConfiguration.setAppId("xxx"); apiConfiguration.setAppKey("xxx");
       apiConfiguration.setMasterSecret("xxx"); 
       // 接口调用前缀,请查看文档: 接口调用规范 -> 接口前缀, 可不填写appId apiConfiguration.setDomain("https://restapi.getui.com/v2/");
       // 实例化ApiHelper对象,用于创建接口对象
       ApiHelper apiHelper = ApiHelper.build(apiConfiguration); 
       // 创建对象,建议复用。目前有PushApi、StatisticApi、UserApi
       PushApi pushApi = apiHelper.creatApi(PushApi.class); 
       } }
    

    3 解析

    实际上 sdk就是对api的封装,我们可以看他们别人的封装 思考自己封装接口的暴露

    1)对象

    • GtApiConfiguration 对象 保存个推配置
    • GtApiProxyFactory 管理创建代理工厂类
    • ApiHelper api助手类
    • DefaultApiClient

       * 1. 管理token
      
       * 2. 解析请求参数
      
       * 3. 发送HTTP请求
      
       * 4. 解析请求结果

    2) 调用流程

      首先,构造GtApiConfiguration类,传入密钥等
      其次,构建ApiHelper类,这个类相当一个核心类,里面包含了很多功能,代码其实比较简单
      
      
      ```
    
           private static final Map<String, ApiHelper> apiHelperCache = new HashMap<String, ApiHelper>(4);
    
           /**
           * @param configuration 配置信息类
           * 创建api助手
           * @return
           */
           public static ApiHelper build(GtApiConfiguration configuration) {
               return build(configuration, new DefaultJson());
           }
           /**
           * @param configuration 配置信息类
           * @return
           */
           public static ApiHelper build(GtApiConfiguration configuration, IJson json) {
           Assert.notNull(configuration, "configuration");
           // 校验配置
           configuration.check();
           // 生成缓存key
           String key = configuration.keyOfCache();
           // 从缓存中获取apiHelper 注意apiHelperCache是 不可变得常量 且 静态的
           ApiHelper apiHelper = apiHelperCache.get(key);
           if (apiHelper != null) {
               return apiHelper;
           }
           // 如果没有传入 就用个推自己的DefaultJson ,因为接口参数是IJson,所以给外部开发者留了口子
           if (json == null) {
               json = new DefaultJson();
           }
           // 锁在静态常量上,锁的是整个类对象
           synchronized (BUILD_LOCK) {
               // 所有缓存都是map中获取
               apiHelper = apiHelperCache.get(key);
               if (apiHelper != null) {
                   return apiHelper;
               }
               // 构建DefaultApiClient
               final DefaultApiClient defaultApiClient = DefaultApiClient.build(configuration, json);
               GtApiProxyFactory factory = GtApiProxyFactory.build(defaultApiClient);
               // 创建代理对象 代理AuthApi
               final AuthApi authApi = factory.createProxy(AuthApi.class);
               // 这里面执行了获取token的方法将token保存在了DefaultApiClient中
               defaultApiClient.setAuthApiAndAuth(authApi);
               // DefaultApiClient又在GtApiProxyFactory中,最终放到了ApiHelper中
               apiHelper = new ApiHelper(factory);
               apiHelperCache.put(key, apiHelper);
               return apiHelper;
           }
       }
       
       
       /**
       * 生成缓存的key
       *
       * @return 缓存key
       */
       public String keyOfCache() {
           check();
           return String.format("%s|%s|%s", this.getAppId(), this.getAppKey(), this.getMasterSecret());
       }
       
    
    
       private DefaultApiClient(GtApiConfiguration apiConfiguration, IJson json) {
           if (apiConfiguration == null) {
               throw new ApiException("apiConfiguration cannot be null.", true);
           }
           this.json = json;
           apiConfiguration.check();
           this.apiConfiguration = apiConfiguration;
           // 创建http连接
           this.httpManager = new HttpManager(apiConfiguration.getConnectTimeout(),
                   apiConfiguration.getSoTimeout(), apiConfiguration.getConnectionRequestTimeout(),
                   apiConfiguration.getMaxHttpTryTime(), apiConfiguration.getKeepAliveSeconds(),
                   apiConfiguration.getProxyConfig(), apiConfiguration.isTrustSSL());
    
           this.hostManager = new HostManager(apiConfiguration, this.httpManager);
           // 分析最稳定域名
           initAnalyseDomainThread();
           // 数据上报
           initReportDataThread();
           if (apiConfiguration.isOpenCheckHealthDataSwitch()
                   || apiConfiguration.isOpenAnalyseStableDomainSwitch()) {
               defaultGtInterceptor = new DefaultGtInterceptor(hostManager, reportDataQueue, apiConfiguration);
               // 创建拦截器 这里面其实也是开了一个口子 如果我们想增加拦截 可以在这里实现接口就行了
               this.interceptorList.add(defaultGtInterceptor);
           }
       }
      
    

3)分析

  • 校验配置
  • 从缓存中取配置
  • 处理具体的json实现
  • 赋值token url等配置
  • 创建代理类

这里面有比较有趣的是创建代理类,我们先看一下被代理的类,像不像feign的类?

 @GtApi
 public interface AuthApi {

     /**
      * 获取鉴权token接口
      *
      * @param authDTO
      * @return
      */
     @GtPost(uri = "/auth", needToken = false)
     ApiResult<TokenDTO> auth(@GtBodyParam AuthDTO authDTO);

     /**
      * 关闭鉴权token
      *
      * @param token
      * @return
      */
     @GtDelete(uri = "/auth")
     ApiResult<Void> close(@GtPathParam String token);

 }

创建jdk动态代理对象过程


/**
 * 创建代理对象
 *
 * @param apiService
 * @param <T>
 * @return
 */
public <T> T createProxy(Class<T> apiService) {
    return (T) Proxy.newProxyInstance(apiService.getClassLoader(), new Class[]{apiService}, new ApiProxyHandler());
}

class ApiProxyHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        try {
            if (Object.class.equals(method.getDeclaringClass())) {
                return method.invoke(this, args);
            }
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
        final BaseParam baseParam = gtApiRegistry.get(method);
        ApiParam apiParam = new ApiParam(baseParam);
        // 解析参数 -> HTTP参数
        handleApiParam(method, args, apiParam);
        return defaultApiClient.execute(apiParam);
    }
}

当执行authApi.auth()获取token时,实际上执行的是代理类方法,我们看下代理后执行的方法
1) 判断如果被代理类不是接口 那么久直接返回了,因为个推这边被代理类必须是接口,所以算是一种校验吧

  

    try {
        if (Object.class.equals(method.getDeclaringClass())) {
            return method.invoke(this, args);
        }
    } catch (Throwable t) {
        throw new RuntimeException(t);
    }

    

2)代理类方法中心


    private ApiResult<?> doExecute(GtApiProxyFactory.ApiParam apiParam, TokenDTO token) {
        Map<String, Object> header = new HashMap<String, Object>(4);
        if (apiParam.getNeedToken()) {
            if (token == null) {
                header.put("token", refreshTokenAndGet(token));
            } else {
                header.put("token", token.getToken());
            }
        }
        String body = null;
        if (apiParam.getBody() != null) {
            body = json.toJson(apiParam.getBody());
        }
        String result = null;
        String fullUrl = genFullUrl(apiParam.getUri(), apiParam.getPathParams(), apiParam.getQueryParams());
        try {
            // 处理header
            handleHeader(header);
            beforeExecute(apiParam, header, body);
            result = httpManager.syncHttps(fullUrl, apiParam.getMethod(), header, body, CONTENT_TYPE);
            postExecute(apiParam, header, body, result);
        } catch (ApiException e) {
            handleException(apiParam, header, body, e);
            return ApiResult.fail(e.getMessage(), e.getCode());
        } finally {
            afterDoExecute(apiParam, header, body, result);
        }
        if (result == null) {
            throw new ApiException(String.format("请求失败,返回值为空。url:%s, body: %s.", fullUrl, body));
        }
  
    }
    
    

逻辑上处理上比较简单就是获取被代理类的注解,参数,请求url等信息,然后通过http发送请求,其中再增加拦截器对请求时间进行记录等。

三 结语

个推的sdk使用非常典型的jdk动态代理,算是一次jdk动态代理的实战,对他的源码查看主要能够让我们以后封装接口时多一些思考,多一些选择,还有他对缓存的使用其实就是使用map,然后用静态变量修饰,也很简单,而且当我们要扩展时也可以自己写一个api类和个推api类格式保持一致就行了,非常容易拓展,总的来说,还是很有趣的。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 安全 物联网
MQTT常见问题之新增自定义主题后平台侧收不到发布的数据如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
物联网 网络性能优化 API
MQTT常见问题之单个消息发送数据不能超过64k如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
消息中间件 存储 监控
|
3月前
|
机器学习/深度学习 开发工具
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
57 1
|
21天前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
EMQ
|
1月前
|
传感器 人工智能 安全
EMQX 与 MQTT: AI 大模型时代的分布式数据中枢
在以数据为核心的 AI 时代,基于 MQTT 协议的消息服务器 EMQX 能帮助企业更好的利用人工智能和机器学习模型,是智能化系统中核心的数据基础软件。
EMQ
164 3
|
21天前
|
消息中间件 监控 物联网
消息队列 MQ使用问题之如何获取和处理消息堆积数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
21天前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之如何设置nameserver监听的IP
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Shell 数据处理
rocket mq 查看消费进度,消息堆积,清除堆积数据命令
该内容是关于RocketMQ的消费进度管理和堆积数据处理的指导。首先,需进入RocketMQ的bin目录,然后使用`mqadmin consumerProgress`命令查看消费者或生产者的消费进度。`broker offset`和`consumer offset`的差值表示未消费消息。通过`resetOffsetByTime`命令可重置消费位点来清除堆积数据,未消费消息默认3天后会被丢弃。此外,`CONSUME_FROM WHERE`枚举类定义了消费起点选项,包括从最后、最开始或指定时间点消费。
689 3
|
3月前
|
消息中间件 JavaScript Java
MQ产品使用合集之视觉智能平台人脸搜索1:N怎么更新人脸数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。