RocketMQ使用教程相关系列 目录
DefaultMQProducer
类简介
public class DefaultMQProducer extends ClientConfig implements MQProducer
DefaultMQProducer类是应用用来投递消息的入口,开箱即用,可通过无参构造方法快速创建一个生产者。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。可以通过该类提供的getter/setter方法,调整发送者的参数。DefaultMQProducer提供了多个send方法,每个send方法略有不同,在使用前务必详细了解其意图。
DefaultMQProducer的用法,传送门:
第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者
public class Producer { public static void main(String[] args) throws MQClientException { // 创建指定分组名的生产者 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 启动生产者 producer.start(); for (int i = 0; i < 128; i++) try { // 构建消息 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 同步发送 SendResult sendResult = producer.send(msg); // 打印发送结果 System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
使用方法摘要字段详细信息
producerGroup
private String producerGroup
生产者的分组名称。相同的分组名称表明生产者实例在概念上归属于同一分组。这对事务消息十分重要,如果原始生产者在事务之后崩溃,那么broker可以联系同一生产者分组的不同生产者实例来提交或回滚事务。
默认值:DEFAULT_PRODUCER
注意: 由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过255。
defaultMQProducerImpl
protected final transient DefaultMQProducerImpl defaultMQProducerImpl
生产者的内部默认实现,在构造生产者时内部自动初始化,提供了大部分方法的内部实现。
createTopicKey
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
默认值:TBW102
建议:测试或者demo使用,生产环境下不建议打开自动创建配置。
defaultTopicQueueNums
private volatile int defaultTopicQueueNums = 4
创建topic时默认的队列数量。
默认值:4
sendMsgTimeout
private int sendMsgTimeout = 3000
发送消息时的超时时间。
默认值:3000,单位:毫秒
建议:不建议修改该值,该值应该与broker配置中的sendTimeout一致,发送超时,可临时修改该值,建议解决超时问题,提高broker集群的Tps。
compressMsgBodyOverHowmuch
private int compressMsgBodyOverHowmuch = 1024 * 4
压缩消息体阈值。大于4K的消息体将默认进行压缩。
默认值:1024 * 4,单位:字节
建议:可通过DefaultMQProducerImpl.setZipCompressLevel方法设置压缩率(默认为5,可选范围[0,9]);可通过DefaultMQProducerImpl.tryToCompressMessage方法测试出compressLevel与compressMsgBodyOverHowmuch最佳值。
retryTimesWhenSendFailed
private int retryTimesWhenSendFailed = 2
同步模式下,在返回发送失败之前,内部尝试重新发送消息的最大次数。
默认值:2,即:默认情况下一条消息最多会被投递3次。
注意:在极端情况下,这可能会导致消息的重复。
retryTimesWhenSendAsyncFailed
private int retryTimesWhenSendAsyncFailed = 2
异步模式下,在发送失败之前,内部尝试重新发送消息的最大次数。
默认值:2,即:默认情况下一条消息最多会被投递3次。
注意:在极端情况下,这可能会导致消息的重复。
retryAnotherBrokerWhenNotStoreOK
private boolean retryAnotherBrokerWhenNotStoreOK = false
同步模式下,消息保存失败时是否重试其他broker。
默认值:false
注意:此配置关闭时,非投递时产生异常情况下,会忽略retryTimesWhenSendFailed配置。
maxMessageSize
private int maxMessageSize = 1024 * 1024 * 4
消息的最大大小。当消息题的字节数超过maxMessageSize就发送失败。
默认值:1024 * 1024 * 4,单位:字节
traceDispatcher
private TraceDispatcher traceDispatcher = null
在开启消息追踪后,该类通过hook的方式把消息生产者,消息存储的broker和消费者消费消息的信息像链路一样记录下来。在构造生产者时根据构造入参enableMsgTrace来决定是否创建该对象。
构造方法详细信息
DefaultMQProducer
public DefaultMQProducer()
创建一个新的生产者。
DefaultMQProducer
DefaultMQProducer(final String producerGroup)
使用指定的分组名创建一个生产者。