三.RocketMQ极简入门-RocketMQ普通消息发送

简介: RocketMQ极简入门-RocketMQ普通消息发送

前言

RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消息,批量发送,消息过滤等等。本篇文章来探讨一下 普通消息的发送

普通消息发送

普通消息这里介绍三种发送方式,同步发送,异步发送,单向发送。我们先导入需要的依赖,版本尽量和RocketMQ的安装版本一致。

<dependencies>
     <dependency>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-client</artifactId>
         <version>4.8.0</version>
     </dependency>
 </dependencies>

【注意】请保持 RocketMQ和Name Server 是启动状态,见RocketMQ安装》`

同步发送

同步消息是发送者发送消息,需要等待结果的返回,才能继续发送第二条消息,这是一种阻塞式模型,虽然消息可靠性高,但是阻塞导致性能低下。API : SendResult result = producer.send(message); 发送者代码示例:

public class Producer {
   

    //演示消息同步发送
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
   
        //生产者
        DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup");
        //设置name server地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //设置队列数量为2,默认为4,根据情况设置
        producer.setDefaultTopicQueueNums(2); 
        //启动
        producer.start();
        //发16个消息
        for (int i = 0 ; i < 16 ; i++){
   

            Message message = new Message();
            //消息主题
            message.setTopic("syn-topic");
            //消息标签
            message.setTags("sms");
            //添加内容
            message.setBody((i+"我是消息").getBytes(CharsetUtil.UTF_8));
            //执行发送
            SendResult result = producer.send(message);
            //打印结果
            System.out.println(result);
        }

        producer.shutdown();
    } 
 }

解释一下其中的相关类

  • DefaultMQProducer :生产者组,需要指定一个组名
  • producer.setNamesrvAddr("127.0.0.1:9876") :Name Server 的地址,生产者通过它来找到Bocker中的Topic(Topic默认可以自动创建的)
  • producer.setDefaultTopicQueueNums(2) :设置Topic中的队列的数量,默认是4个
  • producer.start() :启动 ,如果不启动程序不起作用
  • new Message :消息的对象封装,通过它设置 topic 主题,tags 标签,body 消息内容
  • SendResult result = producer.send(message) : 同步发送消息,可以立马得到返回结果,成功或者失败
  • producer.shutdown() : 关闭生产者

下面是 SendResult的结构

SendResult [
    sendStatus=SEND_OK,         
    msgId=C0A8006516B018B4AAC270EF9D940000,
    offsetMsgId=C0A8006500002A9F0000000000008E1C, 
    messageQueue=MessageQueue [
        topic=syn-topic, 
        brokerName=LAPTOP-20VLGCRC, 
        queueId=3
    ], 
    queueOffset=0
]
  • SendStatus : 状态OK
  • msgId: 发送者生成的ID
  • OffsetMsgId : 由Broker生成的消息ID
  • MessageQueue :队列信息

自行观察MQ的可视化插件界面,应该可以看到发送者发送过去的消息了。

异步发送

异步消息是发送者发送消息,无需等待发送结果就可以再发送第二条消息,它是通过回调的方式来获取到消息的发送结果,消息可靠性高,性能也高。API : producer.send(message,SendCallback) 示例代码:


//. . .上面案例一样,部分代码省略. . .

producer.send(
        //创建消息对象
        new Message("asyn-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8)),
        //添加发送回调
        new SendCallback() {
   

            //发送成功结果处理
            @Override
            public void onSuccess(SendResult sendResult) {
   
                System.out.println(sendResult);
            }
            //发送异常结果处理
            @Override
            public void onException(Throwable throwable) {
   
                System.out.println("发送异常:"+throwable.getMessage());
            }
        }
);

对于异步发送而言,发送完消息不会马上返回结果,也无需等待结果返回就能继续发送第二条消息。它通过 producer.send(message,SendCallback) -> SendCallback 回调来接收发送的结果,回调中包括了:onSuccess 和 onException两个回调方法来表示成功和失败。

单向发送

这种方式指的是发送者发送消息后无需等待Broker的结果返回,Broker也不会返回结果,消息是单向的,该方式性能最高,但是消息可靠性低。API : producer.sendOneway(message) 示例代码:

//. . .上面案例一样,部分代码省略. . .
Message message = new Message("oneway-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8));
producer.sendOneway(message);

通过 :producer.sendOneway(message)来发送消息是没有返回结果的,也无需任何等待,性能是最高的,但是数据的安全性最低,所以对于一些可被丢失的消息,比如:操作日志等就可以使用这种模式了。

消费者案例

下面是消费者端的代码

public class Consumer {
   
    public static void main(String[] args) throws MQClientException {
   
        //创建消费者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syn-consumerGroup");
        //设置name server 地址
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
        //从开始位置消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //广播模式
        defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
        //订阅
        defaultMQPushConsumer.subscribe("syn-topic","sms");
        //注册消息监听器
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
   

                list.forEach(message->{
   
                    System.out.println(new String(message.getBody(), CharsetUtil.UTF_8));
                });

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        defaultMQPushConsumer.start();
    }
}

解释一下相关的类

  • DefaultMQPushConsumer : 消费者组,基于push模式
  • defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876") : name server地址
  • defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET):从哪个位置开始消费,FIRST代表最前面
  • defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING) : 消息的消费模式广播模式,默认是 MessageModel.CLUSTERING 集群 模式。
  • defaultMQPushConsumer.subscribe("syn-topic","sms") :订阅哪个Topic中的哪个Tags中的消息
  • defaultMQPushConsumer.registerMessageListener :注册消息监听并处理消息,通常支持 MessageListenerConcurrently 并发和 MessageListenerOrderly 顺序 两种监听器。当监听器监听到消息,通过回调监听器中的 consumeMessage 方法来传递和处理消息。

    1. List<MessageExt> list : 消息列表 MessageExt中包含了消息的Body,消息的storeSzie,queueId等信息
    2. ConsumeConcurrentlyContext : 消费者上下文
    3. ConsumeConcurrentlyStatus :消息应答(签收),包括 CONSUME_SUCCESS 消费成功和 RECONSUME_LATER 消费失败两种结果。
  • defaultMQPushConsumer.start() :启动消费者,这个代码要写在注册了监听器的后面。

总结

下面对三种发送方式做一个对比

  • 可靠性最高: 同步发送 > 异步发送 > 单向发送
  • 性能最高:单向发送 > 异步发送 > 同步发送

使用场景建议如下

  • 如果是比较重要的不可丢失的消息,且对时效性要去不高建议使用同步发送,如转账消息
  • 如果是不重要的可失败的消息,比如日志消息,建议使用单向发送
  • 如果对时效性要求比较高,且消息不能丢失,可以尝试使用异步发送
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
4383 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
295 93
EMQ
|
安全 网络性能优化
MQTT 5.0 报文(Packets)入门指南
MQTT 控制报文是 MQTT 数据传输的最小单元。MQTT 客户端和服务端通过交换控制报文来完成它们的工作,比如订阅主题和发布消息。
EMQ
1199 102
MQTT 5.0 报文(Packets)入门指南
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
239 0
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
305 0
分享一下rocketmq入门小知识
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
970 2
|
消息中间件 监控 Linux
RabbitMQ轻松入门:从零开始的部署与安装指南
RabbitMQ轻松入门:从零开始的部署与安装指南
350 0
RabbitMQ轻松入门:从零开始的部署与安装指南
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
2410 0
|
消息中间件 存储 Kafka
01.RabbitMQ入门
01.RabbitMQ入门
166 0
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
496 0