RocketMQ系列(三)消息的生产与消费

简介:

前面的章节,我们已经把RocketMQ的环境搭建起来了,是一个两主两从的异步集群。接下来,我们就看看怎么去使用RocketMQ,在使用之前,先要在NameServer中创建Topic,我们知道RocketMQ是基于Topic的消息队列,在生产者发送消息的时候,要指定消息的Topic,这个Topic的路由规则是怎样的,这些都要在NameServer中去创建。

Topic的创建

我们先看看Topic的命令是如何使用的,如下:

./bin/mqadmin updateTopic -h

usage: mqadmin updateTopic -b <arg> | -c <arg>  [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t
       <arg> [-u <arg>] [-w <arg>]
 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false)
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false)
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false)
 -w,--writeQueueNums <arg>   set write queue nums

其中有一段,-b <arg> | -c <arg>,说明这个Topic可以指定集群,也可以指定队列,我们先创建一个Topic指定集群,因为集群中有两个队列broker-abroker-b,看看我们的消息是否在两个队列中负载;然后再创建一个Topic指向broker-a,再看看这个Topic的消息是不是只在broker-a中。

创建两个Topic,

./bin/mqadmin updateTopic -c 'RocketMQ-Cluster' -t cluster-topic -n '192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876'

./bin/mqadmin updateTopic -b 192.168.73.130:10911 -t broker-a-topic

第一个命令创建了一个集群的Topic,叫做cluster-topic;第二个命令创建了一个只在broker-a中才有的Topic,我们指定了-b 192.168.73.130:10911,这个是broker-a的地址和端口。

生产者发送消息

我们新建SpringBoot项目,然后引入RocketMQ的jar包,

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

然后配置一下生产者的客户端,在这里使用@Configuration这个注解,具体如下:

@Configuration
public class RocketMQConfig {

    @Bean(initMethod = "start",destroyMethod = "shutdown")
    public DefaultMQProducer producer() {
        DefaultMQProducer producer = new
                DefaultMQProducer("DefaultMQProducer");
                                            producer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
        return producer;
    }
}
  • 首先创建一个生产者组,名字叫做DefaultMQProducer;
  • 然后指定NameServer,192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;
  • 最后在@Bean注解中指定初始化的方法,和销毁的方法;

这样,生产者的客户端就配置好了,然后再写个Test类,在Test类中向MQ中发送消息,如下,

@SpringBootTest
class RocketmqDemoApplicationTests {

    @Autowired
    public DefaultMQProducer defaultMQProducer;

    @Test
    public void producerTest() throws Exception {

        for (int i = 0;i<5;i++) {
            Message message = new Message();
            message.setTopic("cluster-topic");
            message.setKeys("key-"+i);
            message.setBody(("this is simpleMQ,my NO is "+i).getBytes());

            SendResult sendResult = defaultMQProducer.send(message);
            System.out.println("SendStatus:" + sendResult.getSendStatus());
            System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
        }
    }
}
  • 我们先自动注入前面配置DefaultMQProducer;
  • 然后在Test方法中,循环5次,发送5个消息,消息的Topic指定为cluster-topic,是集群的消息,然后再设置消息的key和内容,最后调用send方法发送消息,这个send方法是同步方法,程序运行到这里会阻塞,等待返回的结果;
  • 最后,我们打印出返回的结果和broker的名字;

运行一下,看看结果:

SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-a

5个消息发送都是成功的,而发送的队列有4个是broker-b,1个broker-a,说明两个broker之间还是有负载的,负载的规则我们猜测是随机。

我们再写个测试方法,看看broker-a-topic这个Topic的发送结果是什么样子的,如下:

@Test
public void brokerTopicTest() throws Exception {

    for (int i = 0;i<5;i++) {
        Message message = new Message();
        message.setTopic("broker-a-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is broker-a-topic's MQ,my NO is "+i).getBytes());

        defaultMQProducer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("SendStatus:" + sendResult.getSendStatus());
                System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

        System.out.println("异步发送 i="+i);

    }
}
  • 消息的Topic指定的是broker-a-topic,这个Topic我们只指定了broker-a这个队列;
  • 发送的时候我们使用的是异步发送,程序到这里不会阻塞,而是继续向下执行,发送的结果正常或者异常,会调用对应的onSuccess和onException方法;
  • 我们在onSuccess方法中,打印出发送的结果和队列的名称;

运行一下,看看结果:

异步发送 i=0
异步发送 i=1
异步发送 i=2
异步发送 i=3
异步发送 i=4
SendStatus:SEND_OK
SendStatus:SEND_OK
SendStatus:SEND_OK
SendStatus:SEND_OK
BrokerName:broker-a
SendStatus:SEND_OK
BrokerName:broker-a
BrokerName:broker-a
BrokerName:broker-a
BrokerName:broker-a

由于我们是异步发送,所以最后的日志先打印了出来,然后打印出返回的结果,都是发送成功的,并且队列都是broker-a,完全符合我们的预期。

消费者

生产的消息已经发送到了队列当中,再来看看消费者端如何消费这个消息,我们在这个配置类中配置消费者,如下:

@Bean(initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer pushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("cluster-topic","*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            if (msgs!=null&&msgs.size()>0) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                    System.out.println(context.getMessageQueue().getBrokerName());
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    } );
    return consumer;
}
  • 我们创建了一个消费者组,名字叫做DefaultMQPushConsumer;
  • 然后指定NameServer集群,192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;
  • 消费者订阅的Topic,这里我们订阅的是cluster-topic,后面的*号是对应的tag,代表我们订阅所有的tag;
  • 最后注册一个并发执行的消息监听器,实现里边的consumeMessage方法,在方法中,我们打印出消息体的内容,和消息所在的队列;
  • 如果消息消费成功,返回CONSUME_SUCCESS,如果出现异常等情况,我们要返回RECONSUME_LATER,说明这个消息还要再次消费;

好了,这个订阅了cluster-topic的消费者,配置完了,我们启动一下项目,看看消费的结果如何,

this is simpleMQ,my NO is 2
broker-b
this is simpleMQ,my NO is 3
broker-b
this is simpleMQ,my NO is 1
broker-b
this is simpleMQ,my NO is 0
broker-a
this is simpleMQ,my NO is 4
broker-b

结果符合预期,cluster-topic中的5个消息全部消费成功,而且队列是4个broker-b,1个broker-a,和发送时的结果是一致的。

大家有问题欢迎评论区讨论~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
767 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
5月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
65 0
|
消息中间件 Java Maven
Java整合RabbitMQ实现生产消费(7种通讯方式)
Java整合RabbitMQ实现生产消费(7种通讯方式)
272 0
|
6月前
|
消息中间件 Java Spring
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
304 0
|
消息中间件 中间件 Kafka
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
**RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。
188 1
|
消息中间件 存储 安全
Java整合RocketMQ实现生产消费
Java整合RocketMQ实现生产消费
428 0
|
存储 消息中间件 Cloud Native
RocketMQ 消息收发弹性--生产集群如何解决大促场景消息收发的弹性&降本诉求|学习笔记
快速学习 RocketMQ 消息收发弹性--生产集群如何解决大促场景消息收发的弹性&降本诉求
246 0
RocketMQ 消息收发弹性--生产集群如何解决大促场景消息收发的弹性&降本诉求|学习笔记
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
1102 1
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式