【RocketMQ系列四】消息示例-简单消息的实现

简介: 【RocketMQ系列四】消息示例-简单消息的实现

1. 前言

上一篇文章我们介绍了RocketMQ集群的搭建,这篇文章将主要使用RocketMQ测试下简单消息。

2. 同步消息(生产者)

同步消息的话,消费者发布消息之后必须等集群返回成功之后才会发布下一条消息,消息的发布是同步进行的。

2.1. 测试代码
  1. 创建生产者
// 1.创建生产者对象
  DefaultMQProducer defaultMQProducer = new DefaultMQProducer("feige-producer-group");
  1. 指定nameserver
// 2.指定nameServer
defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
  1. 因为每个nameserver都有所有broker的路由信息,所以只需要指定一个nameserver。
  2. 启动生产者发布消息
// 3.启动生产者
    defaultMQProducer.start();
    //4.创建消息
    for (int i = 0; i < 100; i++) {
      // 创建消息,指定topic,以及消息体
      Message message = new Message("base_topic", ("飞哥测试消息" + i).getBytes());
      //5.发送消息
      SendResult send = defaultMQProducer.send(message);
      System.out.println(send);
    }
    // 6.关闭生产者
    defaultMQProducer.shutdown();

创建一个名为 base_topic的topic,虽然集群中还没有这个topic,但是由于前面我们搭建集群的时候指定的可以自动创建topic autoCreateTopicEnable=true 。 然后消息体是:飞哥测试消息xxx。这里打印了集群的响应结果SendResult。

运行结果(部分结果):

SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2250000, offsetMsgId=AC1FB85900002A9F00000000001F70B6, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=0], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2410001, offsetMsgId=AC1FB85900002A9F00000000001F71A1, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=1], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E24D0002, offsetMsgId=AC1FB85900002A9F00000000001F728C, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=2], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2570003, offsetMsgId=AC1FB85900002A9F00000000001F7377, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=3], queueOffset=125]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2600004, offsetMsgId=AC1FB85900002A9F00000000001F7462, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=0], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2830005, offsetMsgId=AC1FB85900002A9F00000000001F754D, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=1], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E28C0006, offsetMsgId=AC1FB85900002A9F00000000001F7638, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=2], queueOffset=126]
SendResult [sendStatus=SEND_OK, msgId=7F0000018D8014DAD5DC0C03E2970007, offsetMsgId=AC1FB85900002A9F00000000001F7723, messageQueue=MessageQueue [topic=base_topic, brokerName=broker-b, queueId=3], queueOffset=126]
  

这里SendResult 返回结果有几个属性需要说明下:

  1. sendStatus: 发送状态
  2. msgId:消息ID,每个消息都是唯一的
  3. offsetMsgId:偏移消息ID,在队列里的消息唯一ID
  4. messageQueue:用于指定当前这条消息落到哪个队列中,在搭建集群的时候指定一个broker有4个messageQueue。
  5. topic:当前队列所属的主题
  6. brokerName:当前队列所属的broker
  7. queueId:当前队列在broker中序号
  8. queueOffset:当前消息在队列里的偏移量。

从打印的结果可以看出,目前这100条消息是轮流的发送到broker-b中的4个队列中的。关系如下图所示:

3. 消费者

  1. 创建消费者&指定nameserver
// 1.创建消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    // 2.指定连接nameServer
    consumer.setNamesrvAddr("172.31.184.89:9876");
  1. 订阅一个或者多个topic,这里指定消费base_topic,不做过滤。
// 3.订阅一个或者多个topic,这里指定消费base_topic,不做过滤
consumer.subscribe("base_topic", "*");
  1. 创建一个回调函数&处理消息
// 4.创建一个回调函数
  consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      // 5.处理消息
      for (MessageExt msg : msgs) {
        System.out.println(msg);
        System.out.println("收到的消息内容:" + new String(msg.getBody()));
      }
      // 返回消费成功的对象
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
  1. 创建一个回调监听函数,它是一个长轮询,当有消息产生时,它会监听到并进行消费(ps: broker会把消息推送给消费者)。
  2. 启动消费者
// 6.启动消费者
    consumer.start();
    System.out.println("消费者已经启动");
  1. 运行结果(部分截图):

4. 异步消息

异步消息与同步消息的区别就是异步消息不需要等待集群返回发送成功的标识,即可发送下一条消息。主要是发送消息阶段有区别。其他的与同步消息相同。

// 异步消息发送失败重试次数
    defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
    CountDownLatch2 countDownLatch2 = new CountDownLatch2(100);
    // 4.创建消息
    for (int i = 0; i < 100; i++) {
      // 创建消息,指定topic,以及消息体
      Message message = new Message("base_topic", "TagA", "feige", ("飞哥异步消息测试" + i).getBytes());
      // 5.发送消息
      int index = i;
      defaultMQProducer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
          countDownLatch2.countDown();
          System.out.printf("%-10d ok,%s,%n", index,sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
          countDownLatch2.countDown();
          System.out.printf("%-10d fail,%s,%n", index, e);
          e.printStackTrace();
        }
      });
    }
    System.out.println("=====================");
    countDownLatch2.await(10, TimeUnit.SECONDS);

异步消息在调用send方法的时候,需要实现SendCallback 接口。此函数有 onSuccess 方法和onException  方法。onSuccess 方法在消息发送成功的时候会被集群调用,而onException方法则是在消息发送失败的时候被调用。

5. 单向消息

单向消息只管发送不管接收。

//4.创建消息
    for (int i = 0; i < 100; i++) {
      // 创建消息,指定topic,以及消息体
      Message message = new Message("base_topic", ("飞哥同步消息测试:" + i).getBytes());
      //5.发送消息
        defaultMQProducer.sendOneway(message);
    }

6. 总结

本文详细介绍了简单消息里的同步消息,异步消息和单向消息。他们的区别主要是生产者发布消息时的区别。另外,简单消息的消费是没有顺序的。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 人工智能 移动开发
阿里云Rocket MQ PHP Http SDK发送消息示例Demo
消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的PHP Http SDK为范例介绍RocketMQ消息的发送。
2947 0
阿里云Rocket MQ PHP Http SDK发送消息示例Demo
|
5月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
91 1
|
7月前
|
消息中间件 Java API
RabbitMQ入门指南(三):Java入门示例
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。
105 0
RabbitMQ入门指南(三):Java入门示例
|
消息中间件 Java 数据库
RabbitMq消息防丢失(含springboot代码示例)
1.概述 1.1.数据丢失的原因 在消息中有三种可能性造成数据丢失: 消费者消费消息失败 生产者生产消息失败 MQ数据丢失 消费者消费消息失败:
445 1
|
算法 Linux 网络安全
设备端mqtt sign(签名)原理以及实践示例
设备端mqtt sign(签名)原理以及实践示例
设备端mqtt sign(签名)原理以及实践示例
|
消息中间件 JSON Java
在开发环境下,基于Springboot的RocketMQ示例(含安装步骤、错误分析)
在看这文章之前建议先看看先前架构原理介绍文章
574 0
在开发环境下,基于Springboot的RocketMQ示例(含安装步骤、错误分析)
|
移动开发 JavaScript 前端开发
亚马逊AWS Kinesis Video Streams with IOT mqtt的demo示例
AWS IoT Device SDK for Embedded C通常面向需要优化的 C 语言运行时的资源受限设备。您可以在任何操作系统上使用此软件开发工具包,并将其托管在任何类型的处理器(例如 MCU 和 MPU)上。如果您有更多的可用内存和处理资源,我们建议您使用更高级的 AWS IoT 设备和移动开发工具包之一(例如,C++、Java、JavaScript 和 Python)。
225 0
亚马逊AWS Kinesis Video Streams with IOT mqtt的demo示例
|
消息中间件 Java 开发工具
阿里云Rocket MQ Java Http SDK发送消费消息示例Demo
消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的Java Http SDK为范例介绍RocketMQ消息的发送和接收。
4203 0
阿里云Rocket MQ Java Http SDK发送消费消息示例Demo
|
消息中间件 监控 Go
阿里云Rabbitmq Go SDK使用示例
本文主要演示如何使用开源Go SDK连接阿里云Rabbitmq。
788 0
阿里云Rabbitmq Go SDK使用示例
|
消息中间件 网络协议 RocketMQ
RocketMQ阿里云跨账户授权访问示例
使用企业A的阿里云账号(主账号)创建RAM角色并为该角色授权,并将该角色赋予企业B,即可实现使用企业B的RAM用户(子账号)访问企业A的阿里云资源的目的。本文主要演示相关策略的配置以及Code实现。
1589 0
RocketMQ阿里云跨账户授权访问示例
下一篇
DataWorks