MetaQ 消息中间件介绍及使用-阿里云开发者社区

开发者社区> 武汉-蔡昊> 正文

MetaQ 消息中间件介绍及使用

简介: MetaQ是阿里云中间件团队设计和研发的一款分布式、队列模型的消息中间件。有如下几个特点:1. 有push、pull两种消费模式 2. 支持严格的消息顺序...
+关注继续查看

MetaQ 消息中间件介绍及使用

简介

MetaQ是阿里云中间件团队设计和研发的一款分布式、队列模型的消息中间件。有如下几个特点:
  1. 有push、pull两种消费模式
  2. 支持严格的消息顺序
  3. 亿级别的堆积能力
  4. 支持消息回溯
  5. 多维度消息的查询
MetaQ的发展历史可以分成如下三个阶段:

初期:2011年基于Kafka的设计,重写并推出了MetaQ 1.0

中期:2012年对MetaQ重构升级,推出MetaQ 2.0

后期:基于RocketMQ3.0, 使用拉模型解决顺序消息和海量堆积问题,推出MetaQ 3.0

MetaQ与RocketMQ的关系

阿里内部称为MetaQ,外部称RocketMQ

基本用法

1.客户端接入

<dependency>

   <groupId>com.taobao.metaq.finalgroupId>

   <artifactId>metaq-clientartifactId>

   <version>4.2.6.Finalversion>

dependency>

2.订阅普通消息

public class PushConsumer {

   /**

    * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从MetaQ服务器推到了应用客户端。

    * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法

    */

   public static void main(String[] args) throws InterruptedException, MQClientException {

       /**

        * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例

        * 注意:ConsumerGroupName需要由应用来保证唯一

        * ConsumerGroupName在生产环境需要申请,非生产环境不需要

        */

       MetaPushConsumer consumer = new MetaPushConsumer("RebalanceTest_Consumer_Group");

       /**

        * 订阅指定topic下tags分别等于TagA或TagC或TagD

        */

       consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

       consumer.setConsumeMessageBatchMaxSize(3);

       /**

        * 订阅指定topic下所有消息

        * 注意:一个consumer对象可以订阅多个topic

        */

       consumer.subscribe("TopicTest2", "*");

       consumer.registerMessageListener(new MessageListenerConcurrently() {

           /**

            * 1、默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

            * 2、如果设置为批量消费方式,要么都成功,要么都失败。

            * 3、此方法由MetaQ客户端多个线程回调,需要应用来处理并发安全问题

            * 4、抛异常与返回ConsumeConcurrentlyStatus.RECONSUME_LATER等价

            * 5、每条消息失败后,会尝试重试,重试16次都失败,则丢弃

            */

           @Override

           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

                                                           ConsumeConcurrentlyContext context) {

               System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

               // for (MessageExt msg : msgs) {

               // if (msg.getTags().equals("TagA")) {

               // return ConsumeConcurrentlyStatus.RECONSUME_LATER;

               // }

               // }

               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

           }

       });

       /**

        * Consumer对象在使用之前必须要调用start初始化,初始化一次即可

        */

       consumer.start();

       System.out.println("Consumer Started.");

   }

}

3.发送普通消息

public class Producer {

   public static void main(String[] args) throws MQClientException, InterruptedException {

       /**

        * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例

        * 注意:ProducerGroupName需要由应用来保证唯一

        * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,

        * 因为服务器会回查这个Group下的任意一个Producer

        */

       MetaProducer producer = new MetaProducer("manhongTestPubGroup");

       /**

        * Producer对象在使用之前必须要调用start初始化,初始化一次即可

        * 注意:切记不可以在每次发送消息时,都调用start方法

        */

       producer.start();

       /**

        * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。

        * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,

        * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,

        * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。

        */

       try {

           for (int i = 0; i < 20; i++) {

               {

                   Message msg = new Message("Jodie_topic_1023",// topic

                           "TagA",// tag

                           "OrderID001",// key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。

                           ("Hello MetaQ").getBytes());// body

                   SendResult sendResult = producer.send(msg);

                   System.out.println(sendResult);

               }

               {

                   Message msg = new Message("TopicTest2",// topic

                           "TagB",// tag

                           "OrderID0034",// key

                           ("Hello MetaQ").getBytes());// body

                   SendResult sendResult = producer.send(msg);

                   System.out.println(sendResult);

               }

               {

                   Message msg = new Message("TopicTest3",// topic

                           "TagC",// tag

                           "OrderID061",// key

                           ("Hello MetaQ").getBytes());// body

                   SendResult sendResult = producer.send(msg);

                   System.out.println(sendResult);

               }

           }

       } catch (Exception e) {

           e.printStackTrace();

       }

       /**

        * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己

        * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法

        */

       producer.shutdown();

   }

}

4.主动pull

public class PullConsumer {

   private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

   public static void main(String[] args) throws MQClientException {

       MetaPullConsumer consumer = new MetaPullConsumer("please_rename_unique_group_name_5");

       consumer.start();

       Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");

       for (MessageQueue mq : mqs) {

           System.out.println("Consume from the queue: " + mq);

           PullResult pullResult;

           try {

               pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

               System.out.println(pullResult);

               putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

               switch (pullResult.getPullStatus()) {

                   case FOUND:

                       // TODO

                       break;

                   case NO_MATCHED_MSG:

                       break;

                   case NO_NEW_MSG:

                       break;

                   case OFFSET_ILLEGAL:

                       break;

                   default:

                       break;

               }

           } catch (Exception e) {

               e.printStackTrace();

           }

       }

       consumer.shutdown();

   }

   private static long getMessageQueueOffset(MessageQueue mq) {

       Long offset = offseTable.get(mq);

       if (offset != null)

           return offset;

       return 0;

   }

   private static void putMessageQueueOffset(MessageQueue mq, long offset) {

       offseTable.put(mq, offset);

   }

}

5.除以上功能外,MetaQ还支持发送顺序消息、订阅顺序消息、订阅广播消息、单元化消息订阅与发送等

常用功能及说明

1. MetaQ的控制台操作地址
2. 查询环境地址:curl http://jmenv.tbsite.net:8080/env

image-20211129162548306.png

3. 关于消息消费的两种方式

一种是pull,即消费者主动去broke拉取;一种是push,主动推送给消费者。

image-20211129173825182.png

关于两者的详细区别可以参考:原文地址

总结

MetaQ是一款功能强大的消息中间件,基于消费者订阅及发布模式,支持Topic管理、Message查询、消息的消费和生产、监控警报等功能。

笔者在实际使用过程中,发现MetaQ有着易于上手使用便捷的特点,对开发中较为友好,但同时发现其在使用场景上也有需要注意的点,譬如MetaQ不保证消息不重复、消息进入死信队列就不会继续投递应用等。

如果浩鲸的同僚想进一步深入了解MetaQ,可以去查看官方文档,结合实际场景使用它,实践出真知!

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
微服务消息队列(MQTT For IoT)Android Demo使用介绍
目前阿里云官方对于微消息队列 MQTT提供了很多语言的参考示例,但是在实际的使用中发现很多用户在使用Android Sample的时候总是会遇到问题,无法正常调试使用。本文主要介绍Android Sample的使用。
2559 0
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
10080 0
jQuery日期和时间插件(jquery-ui-timepicker-addon.js)中文破解版使用
简介 jQuery UI Datepicker日期选择插件很好用了,只不过只能精确到日,不能选择时间(小时分钟秒)很遗憾,而jquery-ui-timepicker-addon.js正是基于jQuery UI Datepicker的一款可选时间的插件。
1091 0
APNS IOS 消息推送JSON格式介绍
在开发向苹果Apns推送消息服务功能,我们需要根据Apns接受的数据格式进行推送。下面积累了我在进行apns推送时候总结的 apns服务接受的Json数据格式 示例 1: 以下负载包含哦一个简单的 aps 字典。
1920 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
13885 0
EMQ百万级MQTT消息服务(介绍和搭建)
先上节了解完MQTT之后我们需要选择一个MQTT服务端,在MQTT官方推荐下找了找最后选择了使用EMQ来进行服务端实现,EMQ有什么优势可以在官方推荐的那么多的服务器实现中脱颖而出,本章就来和大家一起慢慢了解EMQ相关的特性
1880 0
2
文章
0
问答
来源圈子
更多
阿里云GTS能力中心(浩鲸智能),从交付的视角探讨数字化转型过程中大型软件开发实践、以及阿里云产品在各行业被集成的案例分享、技术沉淀等内容。敬请关注!
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载