生产者核心类介绍|学习笔记

简介: 快速学习生产者核心类介绍

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)生产者核心类介绍】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12469


生产者核心类介绍

 

Producer

相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。Producer将消息发送到broker,broker是消息的服务端,customer再去连接broker消费消息,故producer和customer都是RocketMQ的客户端。

image.png打开原码目录(如上图),里面有一个client,消息生产者的代码都在client模块中。我们主要研究如下图的几个类:MQAdmin、MQProducer、ClientConfig和DefautMQProducer

image.png

MQAdmin是接口,MQProducer是MQAdmin的子接口,DefautMQProducer实现了MQProduce和MQAdmin里面的一些方法,配置是在ClientConfig里。

1.方法和属性

(1)主要方法介绍

①MQAdmin

//创建主题

void createTopic(final string key,final string newTopic,final int queueNum) throwsMQclientException;

//根据时间戳从队列中查找消息偏移量

1ong searchoffset(final MessageQueue mq,final long timestamp)

//查找消息队列中最大的偏移量

long maxoffset (final MessageQueue mq) throws MQclientException;

//查找消息队列中最小的偏移量

1ong minoffset(final MessageQueue mq)

//根据偏移量查找消息

MessageExt viewMessage(final string offsetMsgId)throws RemotingException,MQBrokerException,

InterruptedException,MQclientException;

//根据条件查找消息

QueryResult queryMessage(final string topic,final string key,final int  maxNum,final long begin,

final long end) throws MQclientException,InterruptedException;

查找消息给的条件比较丰富,所以认为是根据条件查找消息,可以传入主题topic,业务key,maxNum,当前的偏移量begin起始偏移量end终止偏移量等。

//根据消息ID和主题查找消息

MessageExt viewMessage(string topic,string msgId) throws RemotingException,MQBrokerException,InterruptedException,MQclientException;

②MQProducer

MQProducer大部分是以send开头,指得都是发送消息的意思。

//启动

void start( throws MQclientException;

//关闭

void shutdown();

//查找该主题下所有消息

List<MessageQueue> fetchpublishMessageQueues(final string topic) throws MoclientException;

//同步发送消息

sendResult send(final Message msg) throws MQclientException,RemotingException,MQB rokerException,

InterruptedException;

//同步超时发送消息

sendResult send(final Message msg,fina7 long timeout)throws Moc7ientException,

RemotingException,MQBrokerException,InterruptedException;

以上两个send的区别是:同时发送消息是一个参数,如果当前一直未得到broker的响应就会一直阻塞;另一个是两个参数,只会阻塞设置的timeout时间。

//异步发送消息(send没有返回值是异步发送)

void send(final Message msg,fina1 sendcallback sendcallback) throws MaclientException,

RemotingException, InterruptedException;

//异步超时发送消息

void send(final Message msg,final sendcallback sendcallback,final long timeout)

throws MQclientException,RemotingException,InterruptedException;

对于异步发送消息会传递一个回调函数,这个回调函数是用来接收消息发送结果。异步发送最大特点是调用send方法之后立即返回,通过回调函数接收发送结果。

//发送单向消息

void sendoneway(final Message msg) throws MQclientException,RemotingException,

InterruptedException;

//选择指定队列同步发送消息

sendResult send(final Message msg,final MessageQueue mq) throws MQclientException,

RemotingException,MQBrokerException,InterruptedException;

Message msg,MessageQueue mq意思为在发送消息时可以给指定队列发送,send方法有返回值,所以就是同步的给指定队列发送消息。

//选择指定队列异步发送消息

void send(final Message msg,fina1 MessageQueue mq,final sendcallback sendcallback)

throws MaclientException,RemotingException,InterruptedException;

//选择指定队列单项发送消息

void sendoneway(final Message msg,final MessageQueue mq) throws MoclientException,

RemotingException,MQBrokerException,InterruptedException;

//选择指定队列异步发送消息

void send(final Message msg,final MessageQueue mq,final sendcallback sendcallback)

throws MQclientException,RemotingException,InterruptedException;

//选择指定队列单项发送消息

void sendoneway(final Message msg,final MessageQueue mq) throws MQclientException,

RemotingException,InterruptedException;

//批量发送消息

sendResult send(final co1lection<Message> msgs) throws MQclientException,

RemotingException,MQBrokerException, InterruptedException;

传递的不是一个msgs,而是msgs的集合co1lection,而是一次发送多个消息,也就是发送批量消息。

以上方法在MQProducer接口已经被定义出,DefautMQProducer对MQProducer进行了实现。

(2)属性介绍

DefautMQProduce的位置如下图:

image.png

DefautMQProducer属性介绍:producerGroup:生产者所属组

createTopicKey:默认Topic

defaultTopicQueueNums:默认主题在每一个Broker队列数量

sendMsgTimeout:发送消息默认超时时间,默认3s

compressMsgBodyoverHowmuch:消息体超过该值则启用压缩,默认4k

retryTimeswhensendFailed:同步方式发送消息重试次数,默认为2,总共执行3次

retryTimeswhensendAsyncFailed:异步方法发送消息重试次数,默认为2

retryAnotherBrokerwhenNotstoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false

maxMessagesize:允许发送的最大消息长度,默认为4M。在发送批量消息时,消息长度也不能超过4M。

相关文章
|
消息中间件 Java Kafka
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
17339 1
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
|
4月前
|
消息中间件 存储 安全
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
|
4月前
|
消息中间件 网络协议 Java
【消息队列开发】 实现BrokerServer类——本体服务器
【消息队列开发】 实现BrokerServer类——本体服务器
|
3月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(二):生产者
深入理解Kafka核心设计及原理(二):生产者
68 8
|
3月前
|
消息中间件 存储 负载均衡
深入理解Kafka核心设计及原理(三):消费者
深入理解Kafka核心设计及原理(三):消费者
73 8
|
5月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
57 0
|
4月前
|
存储 监控 Java
详尽分享统一对象消息编程(4)—对象消息编程框架1(基本接口)
详尽分享统一对象消息编程(4)—对象消息编程框架1(基本接口)
26 0
|
4月前
|
消息中间件 存储 前端开发
【消息队列开发】 创建核心类
【消息队列开发】 创建核心类
|
4月前
|
消息中间件 Java 数据库
【消息队列开发】 实现DataBaseManager类
【消息队列开发】 实现DataBaseManager类
|
5月前
|
Java
用java实现生产者和消费者模式
用java实现生产者和消费者模式
52 1
下一篇
无影云桌面