开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):生产者核心类介绍】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12469
生产者核心类介绍
Producer
相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。Producer将消息发送到broker,broker是消息的服务端,customer再去连接broker消费消息,故producer和customer都是RocketMQ的客户端。
打开原码目录(如上图),里面有一个client,消息生产者的代码都在client模块中。我们主要研究如下图的几个类:MQAdmin、MQProducer、ClientConfig和DefautMQProducer
MQAdmin是接口,MQProducer是MQAdmin的子接口,DefautMQProducer实现了MQProduce和MQAdmin里面的一些方法,配置是在ClientConfig里。
1.方法和属性
(1)主要方法介绍
①MQAdmin
//创建主题
void createTopic(final string key,final string newTopic,final int queueNum) throwsMQclientException;
//根据时间戳从队列中查找消息偏移量
1ong searchoffset(final MessageQueue mq,final long timestamp)
//查找消息队列中最大的偏移量
long maxoffset (final MessageQueue mq) throws MQclientException;
//查找消息队列中最小的偏移量
1ong minoffset(final MessageQueue mq)
//根据偏移量查找消息
MessageExt viewMessage(final string offsetMsgId)throws RemotingException,MQBrokerException,
InterruptedException,MQclientException;
//根据条件查找消息
QueryResult queryMessage(final string topic,final string key,final int maxNum,final long begin,
final long end) throws MQclientException,InterruptedException;
查找消息给的条件比较丰富,所以认为是根据条件查找消息,可以传入主题topic,业务key,maxNum,当前的偏移量begin起始偏移量end终止偏移量等。
//根据消息ID和主题查找消息
MessageExt viewMessage(string topic,string msgId) throws RemotingException,MQBrokerException,InterruptedException,MQclientException;
②MQProducer
MQProducer大部分是以send开头,指得都是发送消息的意思。
//启动
void start( throws MQclientException;
//关闭
void shutdown();
//查找该主题下所有消息
List<MessageQueue> fetchpublishMessageQueues(final string topic) throws MoclientException;
//同步发送消息
sendResult send(final Message msg) throws MQclientException,RemotingException,MQB rokerException,
InterruptedException;
//同步超时发送消息
sendResult send(final Message msg,fina7 long timeout)throws Moc7ientException,
RemotingException,MQBrokerException,InterruptedException;
以上两个send的区别是:同时发送消息是一个参数,如果当前一直未得到broker的响应就会一直阻塞;另一个是两个参数,只会阻塞设置的timeout时间。
//异步发送消息(send没有返回值是异步发送)
void send(final Message msg,fina1 sendcallback sendcallback) throws MaclientException,
RemotingException, InterruptedException;
//异步超时发送消息
void send(final Message msg,final sendcallback sendcallback,final long timeout)
throws MQclientException,RemotingException,InterruptedException;
对于异步发送消息会传递一个回调函数,这个回调函数是用来接收消息发送结果。异步发送最大特点是调用send方法之后立即返回,通过回调函数接收发送结果。
//发送单向消息
void sendoneway(final Message msg) throws MQclientException,RemotingException,
InterruptedException;
//选择指定队列同步发送消息
sendResult send(final Message msg,final MessageQueue mq) throws MQclientException,
RemotingException,MQBrokerException,InterruptedException;
Message msg,MessageQueue mq意思为在发送消息时可以给指定队列发送,send方法有返回值,所以就是同步的给指定队列发送消息。
//选择指定队列异步发送消息
void send(final Message msg,fina1 MessageQueue mq,final sendcallback sendcallback)
throws MaclientException,RemotingException,InterruptedException;
//选择指定队列单项发送消息
void sendoneway(final Message msg,final MessageQueue mq) throws MoclientException,
RemotingException,MQBrokerException,InterruptedException;
//选择指定队列异步发送消息
void send(final Message msg,final MessageQueue mq,final sendcallback sendcallback)
throws MQclientException,RemotingException,InterruptedException;
//选择指定队列单项发送消息
void sendoneway(final Message msg,final MessageQueue mq) throws MQclientException,
RemotingException,InterruptedException;
//批量发送消息
sendResult send(final co1lection<Message> msgs) throws MQclientException,
RemotingException,MQBrokerException, InterruptedException;
传递的不是一个msgs,而是msgs的集合co1lection,而是一次发送多个消息,也就是发送批量消息。
以上方法在MQProducer接口已经被定义出,DefautMQProducer对MQProducer进行了实现。
(2)属性介绍
DefautMQProduce的位置如下图:
DefautMQProducer属性介绍:producerGroup:生产者所属组
createTopicKey:默认Topic
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyoverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimeswhensendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimeswhensendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerwhenNotstoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessagesize:允许发送的最大消息长度,默认为4M。在发送批量消息时,消息长度也不能超过4M。