MetaQ 消息中间件介绍及使用

本文涉及的产品
性能测试 PTS,5000VUM额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: MetaQ是阿里云中间件团队设计和研发的一款分布式、队列模型的消息中间件。有如下几个特点:1. 有push、pull两种消费模式2. 支持严格的消息顺序...

MetaQ 消息中间件介绍及使用

简介

MetaQ是阿里云中间件团队设计和研发的一款分布式、队列模型的消息中间件。有如下几个特点:
  1. 有push、pull两种消费模式
  2. 支持严格的消息顺序
  3. 亿级别的堆积能力
  4. 支持消息回溯
  5. 多维度消息的查询
MetaQ的发展历史可以分成如下三个阶段:

初期:2011年基于Kafka的设计,重写并推出了MetaQ 1.0

中期:2012年对MetaQ重构升级,推出MetaQ 2.0

后期:基于RocketMQ3.0, 使用拉模型解决顺序消息和海量堆积问题,推出MetaQ 3.0

MetaQ与RocketMQ的关系

阿里内部称为MetaQ,外部称RocketMQ

基本用法

1.客户端接入

<dependency>

   <groupId>com.taobao.metaq.finalgroupId>

   <artifactId>metaq-clientartifactId>

   <version>4.2.6.Finalversion>

dependency>

2.订阅普通消息

publicclassPushConsumer {

   /**

    * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从MetaQ服务器推到了应用客户端。

    * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法

    */

   publicstaticvoidmain(String[] args) throwsInterruptedException, MQClientException {

       /**

        * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例

        * 注意:ConsumerGroupName需要由应用来保证唯一

        * ConsumerGroupName在生产环境需要申请,非生产环境不需要

        */

       MetaPushConsumerconsumer=newMetaPushConsumer("RebalanceTest_Consumer_Group");

       /**

        * 订阅指定topic下tags分别等于TagA或TagC或TagD

        */

       consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

       consumer.setConsumeMessageBatchMaxSize(3);

       /**

        * 订阅指定topic下所有消息

        * 注意:一个consumer对象可以订阅多个topic

        */

       consumer.subscribe("TopicTest2", "*");

       consumer.registerMessageListener(newMessageListenerConcurrently() {

           /**

            * 1、默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

            * 2、如果设置为批量消费方式,要么都成功,要么都失败。

            * 3、此方法由MetaQ客户端多个线程回调,需要应用来处理并发安全问题

            * 4、抛异常与返回ConsumeConcurrentlyStatus.RECONSUME_LATER等价

            * 5、每条消息失败后,会尝试重试,重试16次都失败,则丢弃

            */

           @Override

           publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,

                                                           ConsumeConcurrentlyContextcontext) {

               System.out.println(Thread.currentThread().getName() +" Receive New Messages: "+msgs);

               // for (MessageExt msg : msgs) {

               // if (msg.getTags().equals("TagA")) {

               // return ConsumeConcurrentlyStatus.RECONSUME_LATER;

               // }

               // }

               returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;

           }

       });

       /**

        * Consumer对象在使用之前必须要调用start初始化,初始化一次即可

        */

       consumer.start();

       System.out.println("Consumer Started.");

   }

}

3.发送普通消息

publicclassProducer {

   publicstaticvoidmain(String[] args) throwsMQClientException, InterruptedException {

       /**

        * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例

        * 注意:ProducerGroupName需要由应用来保证唯一

        * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,

        * 因为服务器会回查这个Group下的任意一个Producer

        */

       MetaProducerproducer=newMetaProducer("manhongTestPubGroup");

       /**

        * Producer对象在使用之前必须要调用start初始化,初始化一次即可

        * 注意:切记不可以在每次发送消息时,都调用start方法

        */

       producer.start();

       /**

        * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。

        * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,

        * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,

        * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。

        */

       try {

           for (inti=0; i<20; i++) {

               {

                   Messagemsg=newMessage("Jodie_topic_1023",// topic

                           "TagA",// tag

                           "OrderID001",// key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。

                           ("Hello MetaQ").getBytes());// body

                   SendResultsendResult=producer.send(msg);

                   System.out.println(sendResult);

               }

               {

                   Messagemsg=newMessage("TopicTest2",// topic

                           "TagB",// tag

                           "OrderID0034",// key

                           ("Hello MetaQ").getBytes());// body

                   SendResultsendResult=producer.send(msg);

                   System.out.println(sendResult);

               }

               {

                   Messagemsg=newMessage("TopicTest3",// topic

                           "TagC",// tag

                           "OrderID061",// key

                           ("Hello MetaQ").getBytes());// body

                   SendResultsendResult=producer.send(msg);

                   System.out.println(sendResult);

               }

           }

       } catch (Exceptione) {

           e.printStackTrace();

       }

       /**

        * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己

        * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法

        */

       producer.shutdown();

   }

}

4.主动pull

publicclassPullConsumer {

   privatestaticfinalMap<MessageQueue, Long>offseTable=newHashMap<MessageQueue, Long>();

   publicstaticvoidmain(String[] args) throwsMQClientException {

       MetaPullConsumerconsumer=newMetaPullConsumer("please_rename_unique_group_name_5");

       consumer.start();

       Set<MessageQueue>mqs=consumer.fetchSubscribeMessageQueues("TopicTest");

       for (MessageQueuemq : mqs) {

           System.out.println("Consume from the queue: "+mq);

           PullResultpullResult;

           try {

               pullResult=consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

               System.out.println(pullResult);

               putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

               switch (pullResult.getPullStatus()) {

                   caseFOUND:

                       // TODO

                       break;

                   caseNO_MATCHED_MSG:

                       break;

                   caseNO_NEW_MSG:

                       break;

                   caseOFFSET_ILLEGAL:

                       break;

                   default:

                       break;

               }

           } catch (Exceptione) {

               e.printStackTrace();

           }

       }

       consumer.shutdown();

   }

   privatestaticlonggetMessageQueueOffset(MessageQueuemq) {

       Longoffset=offseTable.get(mq);

       if (offset!=null)

           returnoffset;

       return0;

   }

   privatestaticvoidputMessageQueueOffset(MessageQueuemq, longoffset) {

       offseTable.put(mq, offset);

   }

}

5.除以上功能外,MetaQ还支持发送顺序消息、订阅顺序消息、订阅广播消息、单元化消息订阅与发送等

常用功能及说明

1. MetaQ的控制台操作地址
2. 查询环境地址:curl http://jmenv.tbsite.net:8080/env

image-20211129162548306.png

3. 关于消息消费的两种方式

一种是pull,即消费者主动去broke拉取;一种是push,主动推送给消费者。

image-20211129173825182.png

关于两者的详细区别可以参考:原文地址

总结

MetaQ是一款功能强大的消息中间件,基于消费者订阅及发布模式,支持Topic管理、Message查询、消息的消费和生产、监控警报等功能。

笔者在实际使用过程中,发现MetaQ有着易于上手使用便捷的特点,对开发中较为友好,但同时发现其在使用场景上也有需要注意的点,譬如MetaQ不保证消息不重复、消息进入死信队列就不会继续投递应用等。

如果浩鲸的同僚想进一步深入了解MetaQ,可以去查看官方文档,结合实际场景使用它,实践出真知!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
5月前
|
消息中间件 存储 Kafka
消息中间件MetaQ中的ConsumerGroup是指什么
消息中间件MetaQ中的ConsumerGroup是指什么
|
5月前
|
消息中间件 微服务
微服务订阅问题之Consumer的订阅规则如何解决
微服务订阅问题之Consumer的订阅规则如何解决
|
5月前
|
消息中间件 Cloud Native 中间件
云原生中间件问题之消息中间件MetaQ中的NameServer如何解决
云原生中间件问题之消息中间件MetaQ中的NameServer如何解决
|
消息中间件 存储 中间件
|
消息中间件
淘宝metaq开源消息中间件
http://metaq.taobao.org
1326 6
|
消息中间件 API Windows
|
存储 消息中间件 关系型数据库
消息中间件MetaQ高性能原因分【对外公布版本】
## 简介 前面写了关于文件系统的三篇文章,[深入浅出文件系统][文件系统之读写基础篇],[文件系统之读写高级篇],此篇算是对文件系统相关文章的一个总结。 `MetaQ`是一款高性能的消息中间件,经过几年的发展,已经非常成熟稳定,历经多年双11的零点峰值压测,表现堪称完美。 `MetaQ`当前最新最稳定的稳本是`3.x`系统,`MetaQ 3.x`重新设计和实现,比之前的版本更优秀。
1866 4
|
消息中间件 存储 MySQL
消息中间件MetaQ高性能原因分析
## 备注 该篇是由原作者 傅冲 提供 ## 简介 `MetaQ`是一款高性能的消息中间件,经过几年的发展,已经非常成熟稳定,历经多年双11的零点峰值压测,表现堪称完美。 `MetaQ`当前最新最稳定的稳本是`3.x`系统,`MetaQ 3.x`重新设计和实现,比之前的版本更优秀。
7288 4
|
消息中间件 测试技术