消息消费基本流程|学习笔记

简介: 快速学习消息消费基本流程

开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段)消息消费基本流程】学习笔记,与课程紧密联系,让用户快速学习知识

课程地址:https://developer.aliyun.com/learning/course/702/detail/12379


消息消费基本流程


如何使用 Java 代码实现消息消费

消费消息的基本流程:

1. 创建消费者 Consumerl,制定消费者组名

2. 指定 Nameserver 地址

3. 订阅主题 Topic 和 Tag

4. 设置回调函数,处理消息

5. 启动消费者 consumer

接下来用代码实现:

public static void main(String[]args) throws Exception{

//1.创建消费者 consumer,制定消费者组名;

DefaultMQPushConsumer  consumer=new DefaultMQPushConsumer ( consumerGroup ."group

//2.指定 Name server 地址consumer. setNamesrvAddr ("192.168.P5.135:9876;192.168.25.138:9876");

//3.订阅主题 Topic 和 Tag

consumer, subscribe(topic,"base", sub  Expression ""Tag 1");

//4.设置回调函数,处理消息consumer. registerMessageListener (newMessageListenerConcurrently ()(

//接受消息内容

public consumer  concurrentlystatus  consumer research(List(Wests) massiage Ext) mass,  consumercientlycontext context)System. out. println(msg); return ConsumeConcurrentlyStatus .CONSUME SUCCESS;

}

});

//5.启动消费者 consumer.

consumer. start();

}

现在消息以及被成功消费,是由同步的消息发送者发送的,也有消息的内容,已经看到消息的消费了。

把消息遍历一下,遍历 msg,每一个 msg 可以获得一个数组,结果就会正常接收到。

负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处

理的消息不同

public static void main(string[]args) throws Exception{

//实例化消息生产者,指定组名

DefaultMQPushConsumer  consumer=new  DefaultMQPushConsumer ("group 1");

//指定 Namesrv 地址信息.

consumer. setNamesrvAddr ("local host:9876");

//订阅 Topic

consumer, subscribe("Test","*");

//负载均衡模式消费

consumer. setMessageModel ( MessageModel , CLUSTERING );

//注册回调函数,处理消息

consumer. registerMessageListener (new MessageListenerConcurrently (){

@override

public  Consumecon   currentlyStatus  consume Message(List< MessageExt >msgs,

ConsumeConcurrentlycontext  context){

system, out, printf("%s Receive New Messages:%s%n",

Thread, currentThread ().getName(), msgs);

return  ConsumeConcurrentlystatus .CONSUME SUCCESS;

}

//启动消息者

consumer, start();

system, out, printf("consumer started,%n");

}

相关文章
|
3月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
73 0
|
5月前
|
消息中间件 Java
【消息队列开发】 实现消费者订阅消息
【消息队列开发】 实现消费者订阅消息
|
5月前
|
消息中间件 网络协议 物联网
消息队列 MQ产品使用合集之如何让消费者不从最开始进行消费,而是从最后一条消息开始消费
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
5月前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之POP消费模式是否可以保证消息顺序性
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
466 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2736 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java RocketMQ
消息消费要注意的细节|学习笔记
快速学习消息消费要注意的细节
消息消费要注意的细节|学习笔记
|
消息中间件 RocketMQ 开发者
消息消费方准备工作|学习笔记
快速学习消息消费方准备工作
消息消费方准备工作|学习笔记