前言
RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消息,批量发送,消息过滤等等。本篇文章来探讨一下 普通消息的发送
文章目录
普通消息发送
普通消息这里介绍三种发送方式,同步发送,异步发送,单向发送。我们先导入需要的依赖,版本尽量和RocketMQ的安装版本一致。
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency></dependencies>
【注意】请保持 RocketMQ和Name Server 是启动状态,见
《RocketMQ安装》`
同步发送
同步消息是发送者发送消息,需要等待结果的返回,才能继续发送第二条消息,这是一种阻塞式模型,虽然消息可靠性高,但是阻塞导致性能低下。API : SendResult result = producer.send(message);
发送者代码示例:
japublicclassProducer { //演示消息同步发送publicstaticvoidmain(String[] args) throwsInterruptedException, RemotingException, MQClientException, MQBrokerException { //生产者DefaultMQProducerproducer=newDefaultMQProducer("syn-producerGroup"); //设置name server地址producer.setNamesrvAddr("127.0.0.1:9876"); //设置队列数量为2,默认为4,根据情况设置producer.setDefaultTopicQueueNums(2); //启动producer.start(); //发16个消息for (inti=0 ; i<16 ; i++){ Messagemessage=newMessage(); //消息主题message.setTopic("syn-topic"); //消息标签message.setTags("sms"); //添加内容message.setBody((i+"我是消息").getBytes(CharsetUtil.UTF_8)); //执行发送SendResultresult=producer.send(message); //打印结果System.out.println(result); } producer.shutdown(); } }
解释一下其中的相关类
- DefaultMQProducer :生产者组,需要指定一个组名
- producer.setNamesrvAddr(“127.0.0.1:9876”) :Name Server 的地址,生产者通过它来找到Bocker中的Topic(Topic默认可以自动创建的)
- producer.setDefaultTopicQueueNums(2) :设置Topic中的队列的数量,默认是4个
- producer.start() :启动 ,如果不启动程序不起作用
- new Message :消息的对象封装,通过它设置 topic 主题,tags 标签,body 消息内容
- SendResult result = producer.send(message) : 同步发送消息,可以立马得到返回结果,成功或者失败
- producer.shutdown() : 关闭生产者
下面是 SendResult的结构
SendResult [ sendStatus=SEND_OK, msgId=C0A8006516B018B4AAC270EF9D940000, offsetMsgId=C0A8006500002A9F0000000000008E1C, messageQueue=MessageQueue [ topic=syn-topic, brokerName=LAPTOP-20VLGCRC, queueId=3 ], queueOffset=0]
- SendStatus : 状态OK
- msgId: 发送者生成的ID
- OffsetMsgId : 由Broker生成的消息ID
- MessageQueue :队列信息
自行观察MQ的可视化插件界面,应该可以看到发送者发送过去的消息了。
异步发送
异步消息是发送者发送消息,无需等待发送结果就可以再发送第二条消息,它是通过回调的方式来获取到消息的发送结果,消息可靠性高,性能也高。API : producer.send(message,SendCallback)
示例代码:
//. . .上面案例一样,部分代码省略. . .producer.send( //创建消息对象newMessage("asyn-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8)), //添加发送回调newSendCallback() { //发送成功结果处理publicvoidonSuccess(SendResultsendResult) { System.out.println(sendResult); } //发送异常结果处理publicvoidonException(Throwablethrowable) { System.out.println("发送异常:"+throwable.getMessage()); } } );
对于异步发送而言,发送完消息不会马上返回结果,也无需等待结果返回就能继续发送第二条消息。它通过 producer.send(message,SendCallback) -> SendCallback 回调来接收发送的结果,回调中包括了:onSuccess 和 onException两个回调方法来表示成功和失败。
单向发送
这种方式指的是发送者发送消息后无需等待Broker的结果返回,Broker也不会返回结果,消息是单向的,该方式性能最高,但是消息可靠性低。API : producer.sendOneway(message)
示例代码:
//. . .上面案例一样,部分代码省略. . .Messagemessage=newMessage("oneway-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8)); producer.sendOneway(message);
通过 :producer.sendOneway(message)
来发送消息是没有返回结果的,也无需任何等待,性能是最高的,但是数据的安全性最低,所以对于一些可被丢失的消息,比如:操作日志等就可以使用这种模式了。
消费者案例
下面是消费者端的代码
publicclassConsumer { publicstaticvoidmain(String[] args) throwsMQClientException { //创建消费者DefaultMQPushConsumerdefaultMQPushConsumer=newDefaultMQPushConsumer("syn-consumerGroup"); //设置name server 地址defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876"); //从开始位置消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //广播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING); //订阅defaultMQPushConsumer.subscribe("syn-topic","sms"); //注册消息监听器defaultMQPushConsumer.registerMessageListener(newMessageListenerConcurrently() { publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>list, ConsumeConcurrentlyContextconsumeConcurrentlyContext) { list.forEach(message->{ System.out.println(newString(message.getBody(), CharsetUtil.UTF_8)); }); returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); defaultMQPushConsumer.start(); } }
解释一下相关的类
- DefaultMQPushConsumer : 消费者组,基于push模式
- defaultMQPushConsumer.setNamesrvAddr(“127.0.0.1:9876”) : name server地址
- defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET):从哪个位置开始消费,FIRST代表最前面
- defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING) : 消息的消费模式广播模式,默认是 MessageModel.CLUSTERING 集群 模式。
- defaultMQPushConsumer.subscribe(“syn-topic”,“sms”) :订阅哪个Topic中的哪个Tags中的消息
- defaultMQPushConsumer.registerMessageListener :注册消息监听并处理消息,通常支持
MessageListenerConcurrently
并发和MessageListenerOrderly
顺序 两种监听器。当监听器监听到消息,通过回调监听器中的 consumeMessage 方法来传递和处理消息。
List<MessageExt> list
: 消息列表 MessageExt中包含了消息的Body,消息的storeSzie,queueId等信息- ConsumeConcurrentlyContext : 消费者上下文
- ConsumeConcurrentlyStatus :消息应答(签收),包括 CONSUME_SUCCESS 消费成功和 RECONSUME_LATER 消费失败两种结果。
- defaultMQPushConsumer.start() :启动消费者,这个代码要写在注册了监听器的后面。
总结
下面对三种发送方式做一个对比
- 可靠性最高: 同步发送 > 异步发送 > 单向发送
- 性能最高:单向发送 > 异步发送 > 同步发送
使用场景建议如下
- 如果是比较重要的不可丢失的消息,且对时效性要去不高建议使用同步发送,如转账消息
- 如果是不重要的可失败的消息,比如日志消息,建议使用单向发送
- 如果对时效性要求比较高,且消息不能丢失,可以尝试使用异步发送