RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer

image.pngRocketMQ使用教程相关系列 目录


DefaultMQProducer

类简介

public class DefaultMQProducer extends ClientConfig implements MQProducer


DefaultMQProducer类是应用用来投递消息的入口,开箱即用,可通过无参构造方法快速创建一个生产者。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。可以通过该类提供的getter/setter方法,调整发送者的参数。DefaultMQProducer提供了多个send方法,每个send方法略有不同,在使用前务必详细了解其意图。

DefaultMQProducer的用法,传送门:

第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者

public class Producer {
    public static void main(String[] args) throws MQClientException {
        // 创建指定分组名的生产者
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 启动生产者
        producer.start();
        for (int i = 0; i < 128; i++)
            try {
              // 构建消息
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 同步发送
                SendResult sendResult = producer.send(msg);
                // 打印发送结果
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

image.png

使用方法摘要image.png字段详细信息

producerGroup


private String producerGroup


生产者的分组名称。相同的分组名称表明生产者实例在概念上归属于同一分组。这对事务消息十分重要,如果原始生产者在事务之后崩溃,那么broker可以联系同一生产者分组的不同生产者实例来提交或回滚事务。


默认值:DEFAULT_PRODUCER


注意: 由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过255。


defaultMQProducerImpl


protected final transient DefaultMQProducerImpl defaultMQProducerImpl


生产者的内部默认实现,在构造生产者时内部自动初始化,提供了大部分方法的内部实现。


createTopicKey


private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC


在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。


默认值:TBW102


建议:测试或者demo使用,生产环境下不建议打开自动创建配置。


defaultTopicQueueNums


private volatile int defaultTopicQueueNums = 4


创建topic时默认的队列数量。


默认值:4


sendMsgTimeout


private int sendMsgTimeout = 3000


发送消息时的超时时间。


默认值:3000,单位:毫秒


建议:不建议修改该值,该值应该与broker配置中的sendTimeout一致,发送超时,可临时修改该值,建议解决超时问题,提高broker集群的Tps。


compressMsgBodyOverHowmuch


private int compressMsgBodyOverHowmuch = 1024 * 4


压缩消息体阈值。大于4K的消息体将默认进行压缩。


默认值:1024 * 4,单位:字节


建议:可通过DefaultMQProducerImpl.setZipCompressLevel方法设置压缩率(默认为5,可选范围[0,9]);可通过DefaultMQProducerImpl.tryToCompressMessage方法测试出compressLevel与compressMsgBodyOverHowmuch最佳值。


retryTimesWhenSendFailed


private int retryTimesWhenSendFailed = 2


同步模式下,在返回发送失败之前,内部尝试重新发送消息的最大次数。


默认值:2,即:默认情况下一条消息最多会被投递3次。


注意:在极端情况下,这可能会导致消息的重复。


retryTimesWhenSendAsyncFailed


private int retryTimesWhenSendAsyncFailed = 2


异步模式下,在发送失败之前,内部尝试重新发送消息的最大次数。


默认值:2,即:默认情况下一条消息最多会被投递3次。


注意:在极端情况下,这可能会导致消息的重复。


retryAnotherBrokerWhenNotStoreOK


private boolean retryAnotherBrokerWhenNotStoreOK = false


同步模式下,消息保存失败时是否重试其他broker。


默认值:false


注意:此配置关闭时,非投递时产生异常情况下,会忽略retryTimesWhenSendFailed配置。


maxMessageSize


private int maxMessageSize = 1024 * 1024 * 4


消息的最大大小。当消息题的字节数超过maxMessageSize就发送失败。


默认值:1024 * 1024 * 4,单位:字节


traceDispatcher


private TraceDispatcher traceDispatcher = null


在开启消息追踪后,该类通过hook的方式把消息生产者,消息存储的broker和消费者消费消息的信息像链路一样记录下来。在构造生产者时根据构造入参enableMsgTrace来决定是否创建该对象。


构造方法详细信息

DefaultMQProducer


public DefaultMQProducer()


创建一个新的生产者。


DefaultMQProducer


DefaultMQProducer(final String producerGroup)


使用指定的分组名创建一个生产者。

image.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
19天前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
39 0
|
19天前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
29 0
|
19天前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
33 0
|
17天前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
39 5
|
18天前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
|
18天前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
19天前
|
算法 Java 程序员
Map - TreeSet & TreeMap 源码解析
Map - TreeSet & TreeMap 源码解析
29 0
|
1天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
5天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
7天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
25 3

热门文章

最新文章

推荐镜像

更多