RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer

简介: RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer

image.pngRocketMQ使用教程相关系列 目录


DefaultMQProducer

类简介

public class DefaultMQProducer extends ClientConfig implements MQProducer


DefaultMQProducer类是应用用来投递消息的入口,开箱即用,可通过无参构造方法快速创建一个生产者。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。可以通过该类提供的getter/setter方法,调整发送者的参数。DefaultMQProducer提供了多个send方法,每个send方法略有不同,在使用前务必详细了解其意图。

DefaultMQProducer的用法,传送门:

第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者

public class Producer {
    public static void main(String[] args) throws MQClientException {
        // 创建指定分组名的生产者
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 启动生产者
        producer.start();
        for (int i = 0; i < 128; i++)
            try {
              // 构建消息
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 同步发送
                SendResult sendResult = producer.send(msg);
                // 打印发送结果
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

image.png

使用方法摘要image.png字段详细信息

producerGroup


private String producerGroup


生产者的分组名称。相同的分组名称表明生产者实例在概念上归属于同一分组。这对事务消息十分重要,如果原始生产者在事务之后崩溃,那么broker可以联系同一生产者分组的不同生产者实例来提交或回滚事务。


默认值:DEFAULT_PRODUCER


注意: 由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过255。


defaultMQProducerImpl


protected final transient DefaultMQProducerImpl defaultMQProducerImpl


生产者的内部默认实现,在构造生产者时内部自动初始化,提供了大部分方法的内部实现。


createTopicKey


private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC


在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。


默认值:TBW102


建议:测试或者demo使用,生产环境下不建议打开自动创建配置。


defaultTopicQueueNums


private volatile int defaultTopicQueueNums = 4


创建topic时默认的队列数量。


默认值:4


sendMsgTimeout


private int sendMsgTimeout = 3000


发送消息时的超时时间。


默认值:3000,单位:毫秒


建议:不建议修改该值,该值应该与broker配置中的sendTimeout一致,发送超时,可临时修改该值,建议解决超时问题,提高broker集群的Tps。


compressMsgBodyOverHowmuch


private int compressMsgBodyOverHowmuch = 1024 * 4


压缩消息体阈值。大于4K的消息体将默认进行压缩。


默认值:1024 * 4,单位:字节


建议:可通过DefaultMQProducerImpl.setZipCompressLevel方法设置压缩率(默认为5,可选范围[0,9]);可通过DefaultMQProducerImpl.tryToCompressMessage方法测试出compressLevel与compressMsgBodyOverHowmuch最佳值。


retryTimesWhenSendFailed


private int retryTimesWhenSendFailed = 2


同步模式下,在返回发送失败之前,内部尝试重新发送消息的最大次数。


默认值:2,即:默认情况下一条消息最多会被投递3次。


注意:在极端情况下,这可能会导致消息的重复。


retryTimesWhenSendAsyncFailed


private int retryTimesWhenSendAsyncFailed = 2


异步模式下,在发送失败之前,内部尝试重新发送消息的最大次数。


默认值:2,即:默认情况下一条消息最多会被投递3次。


注意:在极端情况下,这可能会导致消息的重复。


retryAnotherBrokerWhenNotStoreOK


private boolean retryAnotherBrokerWhenNotStoreOK = false


同步模式下,消息保存失败时是否重试其他broker。


默认值:false


注意:此配置关闭时,非投递时产生异常情况下,会忽略retryTimesWhenSendFailed配置。


maxMessageSize


private int maxMessageSize = 1024 * 1024 * 4


消息的最大大小。当消息题的字节数超过maxMessageSize就发送失败。


默认值:1024 * 1024 * 4,单位:字节


traceDispatcher


private TraceDispatcher traceDispatcher = null


在开启消息追踪后,该类通过hook的方式把消息生产者,消息存储的broker和消费者消费消息的信息像链路一样记录下来。在构造生产者时根据构造入参enableMsgTrace来决定是否创建该对象。


构造方法详细信息

DefaultMQProducer


public DefaultMQProducer()


创建一个新的生产者。


DefaultMQProducer


DefaultMQProducer(final String producerGroup)


使用指定的分组名创建一个生产者。

image.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
363 4
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
544 12
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
1298 1
RocketMQ消息重试机制解析!
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
385 3
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
284 2
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
466 2
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
296 0
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
439 0
|
12月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1124 29
|
12月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
470 4

推荐镜像

更多
  • DNS