RocketMQ学习(五):Pull和Push

简介:

源代码版本是3.2.6。在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。

区别是:

push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

文字描述可能不是很清楚,前面的文章都是push方式的,所以这里只上pull方式的,贴代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<logback.version>1.0.13</logback.version>

<rocketmq.version>3.2.6</rocketmq.version>

</properties>

<dependencies>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>${rocketmq.version}</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

Producer:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

package com.zoo.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

/**

* Producer,发送消息

*

*/

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.0.104:9876");

producer.start();

for (int i = 0; i < 5; i++) {

try {

Message msg = new Message("TopicTest",// topic

"TagA",// tag

("Hello RocketMQ " + i).getBytes()// body

);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

Thread.sleep(6000);

}

catch (Exception e) {

e.printStackTrace();

Thread.sleep(3000);

}

}

producer.shutdown();

}

}

Consumer:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

package com.zoo.quickstart.pull;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;

import com.alibaba.rocketmq.client.consumer.PullResult;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageQueue;

/**

* PullConsumer,订阅消息

*/

public class PullConsumer {

private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

consumer.setNamesrvAddr("192.168.0.104:9876");

consumer.start();

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");

for (MessageQueue mq : mqs) {

System.out.println("Consume from the queue: " + mq);

SINGLE_MQ: while (true) {

try {

PullResult pullResult =

consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

System.out.println(pullResult);

putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

switch (pullResult.getPullStatus()) {

case FOUND:

// TODO

break;

case NO_MATCHED_MSG:

break;

case NO_NEW_MSG:

break SINGLE_MQ;

case OFFSET_ILLEGAL:

break;

default:

break;

}

}

catch (Exception e) {

e.printStackTrace();

}

}

}

consumer.shutdown();

}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {

offseTable.put(mq, offset);

}

private static long getMessageQueueOffset(MessageQueue mq) {

Long offset = offseTable.get(mq);

if (offset != null)

return offset;

return 0;

}

}

还有一种定时的Consumer:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

package com.zoo.quickstart.pull;

import com.alibaba.rocketmq.client.consumer.MQPullConsumer;

import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService;

import com.alibaba.rocketmq.client.consumer.PullResult;

import com.alibaba.rocketmq.client.consumer.PullTaskCallback;

import com.alibaba.rocketmq.client.consumer.PullTaskContext;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageQueue;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class PullScheduleService {

public static void main(String[] args) throws MQClientException {

final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("192.168.0.104:9876");

scheduleService.setMessageModel(MessageModel.CLUSTERING);

scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {

@Override

public void doPullTask(MessageQueue mq, PullTaskContext context) {

MQPullConsumer consumer = context.getPullConsumer();

try {

// 获取从哪里拉取

long offset = consumer.fetchConsumeOffset(mq, false);

if (offset < 0)

offset = 0;

PullResult pullResult = consumer.pull(mq, "*", offset, 32);

System.out.println(offset + "\t" + mq + "\t" + pullResult);

switch (pullResult.getPullStatus()) {

case FOUND:

break;

case NO_MATCHED_MSG:

break;

case NO_NEW_MSG:

case OFFSET_ILLEGAL:

break;

default:

break;

}

// 存储Offset,客户端每隔5s会定时刷新到Broker

consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

// 设置再过100ms后重新拉取

context.setPullNextDelayTimeMillis(100);

}

catch (Exception e) {

e.printStackTrace();

}

}

});

scheduleService.start();

}

}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
7月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
183 0
|
4月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
消息中间件 中间件 Kafka
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
134 1
|
7月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
80 0
|
6月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
7月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
63 0
|
7月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
55 1
|
7月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
65 1
|
7月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
1076 0