RocketMQ使用总结

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RocketMQ使用总结

 

Topic可以理解为在rocketMq体系当中作为一个逻辑消息组织形式,一般情况下一类业务消息会申请一个topic来实现业务之间隔离。

Topic是一个逻辑上的概念,实际上在每个broker上以queue的形式保存,也就是说每个topic在broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。

   Topic创建的时候可以用集群模式去创建(这样集群里面每个broker的queue的数量相同),也可以用单个broker模式去创建(这样每个broker的queue数量可以不一致)。

 

Queue是Topic在一个Broker上的分片等分为指定份数后的其中一份,是负载均衡过程中资源分配的基本单元。

概念:

Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息

Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费

Topic:消息主题,负责标记一类消息,生产者将消息发送到Topic,消费者从该Topic消费消息

Broker:消息中转角色,负责存储消息,转发消息,一般也称为 Server,在 JMS 规范中称为 Provider

NameServer:服务发现Server,用于生产者和消费者获取Broker的服务;

 

 

 

Rocketmq模块划分:

 

特性:

Producer端:

发送方式:

Sync:同步的发送方式,会等待发送结果后才返回

Async:异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的 callback. 这个 API 也可以指定 Timeout,不指定也是默认的 3000ms.

Oneway:比较简单,发出去后,什么都不管直接返回。Ps:日志

 

普通消息的发送:

  1. 准备工作 mesasge、网络相关、线程相关
  2. 从namesrv获取topic路由(缓存机制)
  3. 组装数据,broker需要的序列化数据(json)
  4. Netty发送(源码)

定时消息

 定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

顺序消息

事务消息

MQ应用场景

异步处理,应用解耦,流量削锋和日志处理,消息通讯5个场景

启动 name server(namesrv启动之后的默认端口号是9876):

nohup sh bin/mqnamesrv &

看日志:

tail -f ~/logs/rocketmqlogs/namesrv.log

关闭 name server:

sh bin/mqshutdown namesrv

调整启动内存等

vim /data/backup/rocketmq-all-4.4.0-bin-release/bin/runserver.sh

这里把Broker跟Namesrv装在一个服务器上面,使用的Broker配置文件是自带的2m-noslave/broker-a.properties。

注意这个配置文件里面没有属性brokerIP1,他默认取本机IP,如果你服务器的网卡设置过于复杂,他会取的IP报错,后续就连不上这个Broker,建议大家手动修改这个IP地址。

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=172.27.0.8

启动 broker:

sh mqbroker -n 172.27.0.8:9876 -c ../conf/2m-noslave/broker-a.properties &

-n后面代表的是namesrv的地址和端口

-c后面代表的是broker的配置文件地址

看日志:

tail -f ~/logs/rocketmqlogs/broker.log

关闭 broker:

sh bin/mqshutdown broker

调整启动内存等

vim /data/backup/rocketmq-all-4.4.0-bin-release/bin/runbroker.sh
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

rocketmq 默认会开启10909,10911

1. 目前这种写法Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909。若Rocket服务器未启动端口10909,则报connect to <> failed。

2. 解决方式:增加一行代码producer.setVipChannelEnabled(false);

package com.rokcetmq;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //声明并初始化一个producer
        //需要一个producer group名字作为构造方法的参数,这里为producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
        //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
        producer.setNamesrvAddr("132.232.85.11:9876");
      //  producer.setVipChannelEnabled(false);
        //调用start()方法启动一个producer实例
        producer.start();
        //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                //调用producer的send()方法发送消息
                //这里调用的是同步的方式,所以会有返回结果
                SendResult sendResult = producer.send(msg);
                //打印返回结果,可以看到消息发送的状态以及一些相关信息
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        //发送完消息之后,调用shutdown()方法关闭producer
        producer.shutdown();
    }
}

Consumer

package com.rokcetmq;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //声明并初始化一个consumer
        //需要一个consumer group名字作为构造方法的参数,这里为consumer1
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
        //同样也要设置NameServer地址
        consumer.setNamesrvAddr("132.232.85.11:9876");
        //这里设置的是一个consumer的消费策略
        //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //设置consumer所订阅的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "TagA");
        //设置一个Listener,主要进行消息的逻辑处理
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
              for(MessageExt e:msgs){
                  System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs  +":"+new String(e.getBody()));
              }
                //返回消费状态
                //CONSUME_SUCCESS 消费成功
                //RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //调用start()方法启动consumer
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

 

消息队列使用的四种场景介绍

RocketMQ概念模型

Kafka、RabbitMQ、RocketMQ等消息中间件的对比

RocketMQ nameserver、broker、生产者和消费者之间的关系

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 缓存
RocketMQ在项目中的使用总结
RocketMQ在项目中如何使用,什么场景下需要用到RocketMQ以及使用RocketMQ如何保证消息的准备性,都需要我们思考?
RocketMQ在项目中的使用总结
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
107 6
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
97 11
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
2月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
76 4
|
3月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
93 16
|
3月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
82 9