RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: RocketMQ源码解析:手把手教老婆看懂DefaultMQProducer

image.pngRocketMQ使用教程相关系列 目录


DefaultMQProducer

类简介

public class DefaultMQProducer extends ClientConfig implements MQProducer


DefaultMQProducer类是应用用来投递消息的入口,开箱即用,可通过无参构造方法快速创建一个生产者。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。可以通过该类提供的getter/setter方法,调整发送者的参数。DefaultMQProducer提供了多个send方法,每个send方法略有不同,在使用前务必详细了解其意图。

DefaultMQProducer的用法,传送门:

第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者

public class Producer {
    public static void main(String[] args) throws MQClientException {
        // 创建指定分组名的生产者
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 启动生产者
        producer.start();
        for (int i = 0; i < 128; i++)
            try {
              // 构建消息
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 同步发送
                SendResult sendResult = producer.send(msg);
                // 打印发送结果
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

image.png

使用方法摘要image.png字段详细信息

producerGroup


private String producerGroup


生产者的分组名称。相同的分组名称表明生产者实例在概念上归属于同一分组。这对事务消息十分重要,如果原始生产者在事务之后崩溃,那么broker可以联系同一生产者分组的不同生产者实例来提交或回滚事务。


默认值:DEFAULT_PRODUCER


注意: 由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过255。


defaultMQProducerImpl


protected final transient DefaultMQProducerImpl defaultMQProducerImpl


生产者的内部默认实现,在构造生产者时内部自动初始化,提供了大部分方法的内部实现。


createTopicKey


private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC


在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。


默认值:TBW102


建议:测试或者demo使用,生产环境下不建议打开自动创建配置。


defaultTopicQueueNums


private volatile int defaultTopicQueueNums = 4


创建topic时默认的队列数量。


默认值:4


sendMsgTimeout


private int sendMsgTimeout = 3000


发送消息时的超时时间。


默认值:3000,单位:毫秒


建议:不建议修改该值,该值应该与broker配置中的sendTimeout一致,发送超时,可临时修改该值,建议解决超时问题,提高broker集群的Tps。


compressMsgBodyOverHowmuch


private int compressMsgBodyOverHowmuch = 1024 * 4


压缩消息体阈值。大于4K的消息体将默认进行压缩。


默认值:1024 * 4,单位:字节


建议:可通过DefaultMQProducerImpl.setZipCompressLevel方法设置压缩率(默认为5,可选范围[0,9]);可通过DefaultMQProducerImpl.tryToCompressMessage方法测试出compressLevel与compressMsgBodyOverHowmuch最佳值。


retryTimesWhenSendFailed


private int retryTimesWhenSendFailed = 2


同步模式下,在返回发送失败之前,内部尝试重新发送消息的最大次数。


默认值:2,即:默认情况下一条消息最多会被投递3次。


注意:在极端情况下,这可能会导致消息的重复。


retryTimesWhenSendAsyncFailed


private int retryTimesWhenSendAsyncFailed = 2


异步模式下,在发送失败之前,内部尝试重新发送消息的最大次数。


默认值:2,即:默认情况下一条消息最多会被投递3次。


注意:在极端情况下,这可能会导致消息的重复。


retryAnotherBrokerWhenNotStoreOK


private boolean retryAnotherBrokerWhenNotStoreOK = false


同步模式下,消息保存失败时是否重试其他broker。


默认值:false


注意:此配置关闭时,非投递时产生异常情况下,会忽略retryTimesWhenSendFailed配置。


maxMessageSize


private int maxMessageSize = 1024 * 1024 * 4


消息的最大大小。当消息题的字节数超过maxMessageSize就发送失败。


默认值:1024 * 1024 * 4,单位:字节


traceDispatcher


private TraceDispatcher traceDispatcher = null


在开启消息追踪后,该类通过hook的方式把消息生产者,消息存储的broker和消费者消费消息的信息像链路一样记录下来。在构造生产者时根据构造入参enableMsgTrace来决定是否创建该对象。


构造方法详细信息

DefaultMQProducer


public DefaultMQProducer()


创建一个新的生产者。


DefaultMQProducer


DefaultMQProducer(final String producerGroup)


使用指定的分组名创建一个生产者。

image.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.pngimage.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
EMQ
|
4月前
|
Linux 网络性能优化
MQTT 5.0 报文解析 03:SUBSCRIBE 与 UNSUBSCRIBE
在 MQTT 中,SUBSCRIBE 报文用于发起订阅请求,SUBACK 报文用于返回订阅结果。而 UNSUBSCRIBE 和 UNSUBACK 报文则在取消订阅时使用。相比于取消订阅,订阅操作更加常用。不过在本文中,我们仍然会一并介绍订阅与取消订阅报文的结构与组成。
EMQ
304 7
MQTT 5.0 报文解析 03:SUBSCRIBE 与 UNSUBSCRIBE
EMQ
|
4月前
|
运维 Linux 网络性能优化
MQTT 5.0 报文解析 05:DISCONNECT
在 MQTT 中,客户端和服务端可以在断开网络连接前向对端发送一个 DISCONNECT 报文,来指示连接关闭的原因。客户端发送的 DISCONNECT 报文还可以影响服务端在连接断开后的行为,例如是否发送遗嘱消息,是否更新会话过期间隔。
EMQ
73 0
MQTT 5.0 报文解析 05:DISCONNECT
|
19天前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
36 2
|
14天前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
31 0
EMQ
|
3月前
|
安全 开发工具 数据安全/隐私保护
MQTT 5.0 报文解析 06:AUTH
MQTT 5.0 引入了增强认证特性,它使 MQTT 除了简单密码认证和 Token 认证以外,还能够支持质询/响应风格的认证。为了实现这一点,它在原先 CONNECT 和 CONNACK 报文的基础上,又引入了 AUTH 报文来实现任意多次的认证数据交换,以支持各种不同类型的认证机制,例如 SCRAM、Kerberos 认证等等。
EMQ
255 5
MQTT 5.0 报文解析 06:AUTH
EMQ
|
4月前
|
JSON Linux 网络性能优化
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK
本文将介绍在 MQTT 中用于传递应用消息的 PUBLISH 报文以及它的响应报文。不管是客户端向服务端发布消息,还是服务端向订阅端转发消息,都需要使用 PUBLISH 报文。决定消息流向的主题、消息的实际内容和 QoS 等级,都包含在 PUBLISH 报文中。
EMQ
181 6
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK
|
3月前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
EMQ
|
4月前
|
Linux 网络性能优化 数据安全/隐私保护
MQTT 5.0 报文解析 01:CONNECT 与 CONNACK
如果我们想要使用 MQTT 进行通信,第一步必然是建立一个 MQTT 连接,而建立 MQTT 连接需要用到两个控制报文,它们分别是 CONNECT 报文与 CONNACK 报文。CONNECT 报文是客户端与服务端建立网络连接后,向服务端发送的第一个控制报文,用来发起连接请求。服务端将返回 CONNACK 报文告知客户端连接结果。
EMQ
710 4
MQTT 5.0 报文解析 01:CONNECT 与 CONNACK
|
3月前
|
消息中间件 自然语言处理 负载均衡
RabbitMQ揭秘:轻量级消息队列的优缺点全解析
**RabbitMQ简介** RabbitMQ是源自电信行业的消息中间件,支持AMQP协议,提供轻量、快速且易于部署的解决方案。它拥有灵活的路由配置,广泛的语言支持,适用于异步处理、负载均衡、日志收集和微服务通信等场景。然而,当面临大量消息堆积或高吞吐量需求时,性能可能会下降,并且扩展和开发成本相对较高。
170 0
EMQ
|
4月前
|
开发工具
MQTT 5.0 报文解析 04:PINGREQ 与 PINGRESP
除了用于连接、发布和订阅的控制报文,MQTT 还有一类报文用于在客户端和服务端之间模拟心跳,以达到保持连接的目的,它们分别是 PINGREQ 报文和 PINGRESP 报文,我们通常也会称它们为心跳报文。
EMQ
191 0
MQTT 5.0 报文解析 04:PINGREQ 与 PINGRESP

热门文章

最新文章

推荐镜像

更多