MetaQ 消息中间件介绍及使用
简介
MetaQ是阿里云中间件团队设计和研发的一款分布式、队列模型的消息中间件。有如下几个特点:
- 有push、pull两种消费模式
- 支持严格的消息顺序
- 亿级别的堆积能力
- 支持消息回溯
- 多维度消息的查询
MetaQ的发展历史可以分成如下三个阶段:
初期:2011年基于Kafka的设计,重写并推出了MetaQ 1.0
中期:2012年对MetaQ重构升级,推出MetaQ 2.0
后期:基于RocketMQ3.0, 使用拉模型解决顺序消息和海量堆积问题,推出MetaQ 3.0
MetaQ与RocketMQ的关系
阿里内部称为MetaQ,外部称RocketMQ
基本用法
1.客户端接入
<dependency>
<groupId>com.taobao.metaq.finalgroupId>
<artifactId>metaq-clientartifactId>
<version>4.2.6.Finalversion>
dependency>
2.订阅普通消息
publicclassPushConsumer {
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从MetaQ服务器推到了应用客户端。
* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法
*/
publicstaticvoidmain(String[] args) throwsInterruptedException, MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ConsumerGroupName需要由应用来保证唯一
* ConsumerGroupName在生产环境需要申请,非生产环境不需要
*/
MetaPushConsumerconsumer=newMetaPushConsumer("RebalanceTest_Consumer_Group");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
consumer.setConsumeMessageBatchMaxSize(3);
/**
* 订阅指定topic下所有消息
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest2", "*");
consumer.registerMessageListener(newMessageListenerConcurrently() {
/**
* 1、默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 2、如果设置为批量消费方式,要么都成功,要么都失败。
* 3、此方法由MetaQ客户端多个线程回调,需要应用来处理并发安全问题
* 4、抛异常与返回ConsumeConcurrentlyStatus.RECONSUME_LATER等价
* 5、每条消息失败后,会尝试重试,重试16次都失败,则丢弃
*/
@Override
publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,
ConsumeConcurrentlyContextcontext) {
System.out.println(Thread.currentThread().getName() +" Receive New Messages: "+msgs);
// for (MessageExt msg : msgs) {
// if (msg.getTags().equals("TagA")) {
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
// }
// }
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
3.发送普通消息
publicclassProducer {
publicstaticvoidmain(String[] args) throwsMQClientException, InterruptedException {
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ProducerGroupName需要由应用来保证唯一
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
MetaProducerproducer=newMetaProducer("manhongTestPubGroup");
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
try {
for (inti=0; i<20; i++) {
{
Messagemsg=newMessage("Jodie_topic_1023",// topic
"TagA",// tag
"OrderID001",// key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。
("Hello MetaQ").getBytes());// body
SendResultsendResult=producer.send(msg);
System.out.println(sendResult);
}
{
Messagemsg=newMessage("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQ").getBytes());// body
SendResultsendResult=producer.send(msg);
System.out.println(sendResult);
}
{
Messagemsg=newMessage("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQ").getBytes());// body
SendResultsendResult=producer.send(msg);
System.out.println(sendResult);
}
}
} catch (Exceptione) {
e.printStackTrace();
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
producer.shutdown();
}
}
4.主动pull
publicclassPullConsumer {
privatestaticfinalMap<MessageQueue, Long>offseTable=newHashMap<MessageQueue, Long>();
publicstaticvoidmain(String[] args) throwsMQClientException {
MetaPullConsumerconsumer=newMetaPullConsumer("please_rename_unique_group_name_5");
consumer.start();
Set<MessageQueue>mqs=consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueuemq : mqs) {
System.out.println("Consume from the queue: "+mq);
PullResultpullResult;
try {
pullResult=consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
caseFOUND:
// TODO
break;
caseNO_MATCHED_MSG:
break;
caseNO_NEW_MSG:
break;
caseOFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exceptione) {
e.printStackTrace();
}
}
consumer.shutdown();
}
privatestaticlonggetMessageQueueOffset(MessageQueuemq) {
Longoffset=offseTable.get(mq);
if (offset!=null)
returnoffset;
return0;
}
privatestaticvoidputMessageQueueOffset(MessageQueuemq, longoffset) {
offseTable.put(mq, offset);
}
}
5.除以上功能外,MetaQ还支持发送顺序消息、订阅顺序消息、订阅广播消息、单元化消息订阅与发送等
常用功能及说明
1. MetaQ的控制台操作地址
2. 查询环境地址:curl http://jmenv.tbsite.net:8080/env
3. 关于消息消费的两种方式
一种是pull,即消费者主动去broke拉取;一种是push,主动推送给消费者。
关于两者的详细区别可以参考:原文地址
总结
MetaQ是一款功能强大的消息中间件,基于消费者订阅及发布模式,支持Topic管理、Message查询、消息的消费和生产、监控警报等功能。
笔者在实际使用过程中,发现MetaQ有着易于上手使用便捷的特点,对开发中较为友好,但同时发现其在使用场景上也有需要注意的点,譬如MetaQ不保证消息不重复、消息进入死信队列就不会继续投递应用等。
如果浩鲸的同僚想进一步深入了解MetaQ,可以去查看官方文档,结合实际场景使用它,实践出真知!