二、RocketMQ安装与配置
1.前提是Linux中安装了JDK8及以上的版本。
#查看是否安装JDK echo $JAVA_HOME
2.官网下载安装包-
RocketMQ最新版本:5.1
下载地址:https://rocketmq.apache.org/zh/release-notes/
3.上传到linux下并安装(使用xftp工具、rz命令、或者直接拖拽)
4.将文件解压并修改名称
#将文件解压 unzip rocketmq-all-5.1.2-bin-release.zip #修改文件名称 mv rocketmq-all-5.1.2-bin-release rocketmq-5.1.2 #文件移动 mv rocketmq-5.1.2 /usr/local
5.修改bin/runserver.sh文件
6.修改bin/runbroker.sh文件
7.修改环境变量
#打开配置文件 vim /etc/profile #添加如下环境变量 export JAVA_HOME=/usr/local/jdk1.8 export PATH=$PATH:$JAVA_HOME/bin export ROCKETMQ_HOME=/usr/local/rocketmq-5.1.2 export PATH=$PATH:$ROCKETMQ_HOME/bin
8.让配置生效
source /etc/profile
9.进入rocketMQ启动NameServer服务和broker服务。
#启动NameServer nohup sh mqnamesrv & #启动broker nohup sh mqbroker -n localhost:9876 &
如果遇到如下问题属于正常现象,或者说一定会遇到:
[root@localhost bin]# nohup: ignoring input and appending output to ‘nohup.out’
先通过命令查看服务是否启动:
jps
只要出现以下的信息表示启动成功:
三、RocketMQ管理命令
- 启动namesrv和broker
#启动NameServer nohup sh mqnamesrv & #启动broker nohup sh mqbroker -n localhost:9876 &
- 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log #查看日志 tail -f ~/logs/rocketmqlogs/broker.log #查看日志
- 新增topic
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t topicWarning
- 查看某个topic的状态
mqadmin topicStatus -n localhost:9876 -t topicWarning
- 查看所有消费组group
mqadmin consumerProgress -n localhost:9876
- 查看所有topic
mqadmin topicList -n localhost:9876
- 删除topic
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t topicWarning
- 关闭namesrv和broker服务
mqshutdown namesrv mqshutdown broker
四、RocketMQ整合springboot
4.1 创建生产者工程 springboot-rocketmq-producer
1.在pom.xml文件中添加依赖
<!--rocketmq与springboot整合包--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
2.配置文件
# nameserver的地址 rocketmq.name-server=192.168.66.100:9876 #指定生产组名称 rocketmq.producer.group=my-group
3.测试类
@SpringBootTest class DemoApplicationTests { @Resource private RocketMQTemplate rocketMQTemplate; @Test void testSendMessage(){ //向broker发消息 /* * boot_topic:消息的主题 * hello mq:消息的内容 */ rocketMQTemplate.convertAndSend("boot_topic","hello mq"); } }
4.2创建消费者springboot-rocketmq-producer工程
1.在pom.xml文件中添加依赖
<!--rocketmq与springboot整合包--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependcy>
2.配置文件
# nameserver的地址 rocketmq.name-server=192.168.66.100:9876
3.创建消费者
@Component /** * consumerGroup:消费组名称随便写 */ @RocketMQMessageListener(topic = "boot_topic",consumerGroup = "boot_consumer_group") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println(message); } }
五、RocketMQ架构
5.1 技术架构
RocketMQ架构上主要分为四部分,如上图所示:
- Producer:消息发布的角色,支持分布式集群方式部署。生产者将消息发送给broker时,broker会返回消息的发送状态。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式消费消息。
- NameServer:管理Broker代理服务器。向broker发送心跳,监控broker的状态。如果有broker挂掉会实时更新broker列表。同时,也会存储维护broker中的主题列表。给生产者和消费者提供最新的路由信息。
- BrokerServer:RocketMQ的核心,负责消息的接收和转发。NameServer和BrokerServer共同构成了消息中间件。
5.2 部署架构
RocketMQ 网络部署特点:
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master(主)与Slave(从),一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。(之前我们在Linux上安装的是一个Master的服务,在broker配置文件broker.conf中brokerID为0的是Master主机。在一组主从broker中brokerName都是相同的。)
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息。
- 结合部署架构图,集群工作流程可作如下描述:
- 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic(4.4之后的版本会自动创建Topic),创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
六、RocketMQ高级特性
6.1 消息存储
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。
同步刷盘指的是生产者一生产消息就会将消息放在broker中等待消费者消费,这期间broker是不能进行其他工作的;异步刷盘指的是消息存储在内存中,而不是直接写入磁盘。当消息被发送时,消息会被存储到内存中,并异步刷盘到磁盘中,当小费者消费完磁盘(也就是broker中的消息后)同步刷盘的可靠性高于异步刷盘但是效率要低。
生产端发送同步消息
package com.zj.Producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; public class SyncProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { //实例化生产者,并指明生产者所属的生产组 DefaultMQProducer producer = new DefaultMQProducer("producerGroup01"); //设置nameserver地址 producer.setNamesrvAddr("192.168.66.100:9876"); //启动生产者 producer.start(); //创建消息体,并指定topic,tag(标识商品的类型) for (int i = 0; i < 10; i++) { Message message = new Message("boot_topic", "tagA", "hello MQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); //发送消息 SendResult send = producer.send(message); //打印消息返回结果 System.out.println(send); } //消息发送完毕关闭生产者 producer.shutdown(); } }
启动MQ服务并发送消息:
D:\java\jdk8\bin\java.exe "-javaagent:D:\Java\IntelliJ IDEA 2019.3.4\lib\idea_rt.jar=49502:D:\Java\IntelliJ IDEA 2019.3.4\bin" -Dfile.encoding=UTF-8 -classpath D:\Java\jdk8\jre\lib\charsets.jar;D:\Java\jdk8\jre\lib\deploy.jar;D:\Java\jdk8\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk8\jre\lib\ext\cldrdata.jar;D:\Java\jdk8\jre\lib\ext\dnsns.jar;D:\Java\jdk8\jre\lib\ext\jaccess.jar;D:\Java\jdk8\jre\lib\ext\jfxrt.jar;D:\Java\jdk8\jre\lib\ext\localedata.jar;D:\Java\jdk8\jre\lib\ext\nashorn.jar;D:\Java\jdk8\jre\lib\ext\sunec.jar;D:\Java\jdk8\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk8\jre\lib\ext\sunmscapi.jar;D:\Java\jdk8\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk8\jre\lib\ext\zipfs.jar;D:\Java\jdk8\jre\lib\javaws.jar;D:\Java\jdk8\jre\lib\jce.jar;D:\Java\jdk8\jre\lib\jfr.jar;D:\Java\jdk8\jre\lib\jfxswt.jar;D:\Java\jdk8\jre\lib\jsse.jar;D:\Java\jdk8\jre\lib\management-agent.jar;D:\Java\jdk8\jre\lib\plugin.jar;D:\Java\jdk8\jre\lib\resources.jar;D:\Java\jdk8\jre\lib\rt.jar;D:\Java\code\springbootcode\sb_rocketMQ_producer\target\classes;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter\2.7.0\spring-boot-starter-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot\2.7.0\spring-boot-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-context\5.3.20\spring-context-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-beans\5.3.20\spring-beans-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-expression\5.3.20\spring-expression-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-autoconfigure\2.7.0\spring-boot-autoconfigure-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter-logging\2.7.0\spring-boot-starter-logging-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\ch\qos\logback\logback-classic\1.2.11\logback-classic-1.2.11.jar;D:\Java\apache-maven-3.8.3\repositories\ch\qos\logback\logback-core\1.2.11\logback-core-1.2.11.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\logging\log4j\log4j-to-slf4j\2.17.2\log4j-to-slf4j-2.17.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\logging\log4j\log4j-api\2.17.2\log4j-api-2.17.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\slf4j\jul-to-slf4j\1.7.36\jul-to-slf4j-1.7.36.jar;D:\Java\apache-maven-3.8.3\repositories\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-core\5.3.20\spring-core-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-jcl\5.3.20\spring-jcl-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\yaml\snakeyaml\1.30\snakeyaml-1.30.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-spring-boot-starter\2.0.3\rocketmq-spring-boot-starter-2.0.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-spring-boot\2.0.3\rocketmq-spring-boot-2.0.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-client\4.5.1\rocketmq-client-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-common\4.5.1\rocketmq-common-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\commons\commons-lang3\3.12.0\commons-lang3-3.12.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-acl\4.5.1\rocketmq-acl-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-remoting\4.5.1\rocketmq-remoting-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\com\alibaba\fastjson\1.2.51\fastjson-1.2.51.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-all\4.1.77.Final\netty-all-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-buffer\4.1.77.Final\netty-buffer-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec\4.1.77.Final\netty-codec-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-dns\4.1.77.Final\netty-codec-dns-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-haproxy\4.1.77.Final\netty-codec-haproxy-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-http\4.1.77.Final\netty-codec-http-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-http2\4.1.77.Final\netty-codec-http2-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-memcache\4.1.77.Final\netty-codec-memcache-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-mqtt\4.1.77.Final\netty-codec-mqtt-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-redis\4.1.77.Final\netty-codec-redis-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-smtp\4.1.77.Final\netty-codec-smtp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-socks\4.1.77.Final\netty-codec-socks-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-stomp\4.1.77.Final\netty-codec-stomp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-xml\4.1.77.Final\netty-codec-xml-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-common\4.1.77.Final\netty-common-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-handler\4.1.77.Final\netty-handler-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-handler-proxy\4.1.77.Final\netty-handler-proxy-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver\4.1.77.Final\netty-resolver-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns\4.1.77.Final\netty-resolver-dns-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport\4.1.77.Final\netty-transport-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-rxtx\4.1.77.Final\netty-transport-rxtx-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-sctp\4.1.77.Final\netty-transport-sctp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-udt\4.1.77.Final\netty-transport-udt-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-classes-epoll\4.1.77.Final\netty-transport-classes-epoll-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-unix-common\4.1.77.Final\netty-transport-native-unix-common-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-classes-kqueue\4.1.77.Final\netty-transport-classes-kqueue-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-classes-macos\4.1.77.Final\netty-resolver-dns-classes-macos-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-epoll\4.1.77.Final\netty-transport-native-epoll-4.1.77.Final-linux-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-epoll\4.1.77.Final\netty-transport-native-epoll-4.1.77.Final-linux-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-kqueue\4.1.77.Final\netty-transport-native-kqueue-4.1.77.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-kqueue\4.1.77.Final\netty-transport-native-kqueue-4.1.77.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-native-macos\4.1.77.Final\netty-resolver-dns-native-macos-4.1.77.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-native-macos\4.1.77.Final\netty-resolver-dns-native-macos-4.1.77.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-classes\2.0.52.Final\netty-tcnative-classes-2.0.52.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-linux-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-linux-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-windows-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-logging\4.5.1\rocketmq-logging-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-srvutil\4.5.1\rocketmq-srvutil-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;D:\Java\apache-maven-3.8.3\repositories\com\google\guava\guava\19.0\guava-19.0.jar;D:\Java\apache-maven-3.8.3\repositories\commons-codec\commons-codec\1.15\commons-codec-1.15.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-messaging\5.3.20\spring-messaging-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-aop\5.3.20\spring-aop-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-databind\2.13.3\jackson-databind-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-annotations\2.13.3\jackson-annotations-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-core\2.13.3\jackson-core-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter-validation\2.7.0\spring-boot-starter-validation-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\tomcat\embed\tomcat-embed-el\9.0.63\tomcat-embed-el-9.0.63.jar;D:\Java\apache-maven-3.8.3\repositories\org\hibernate\validator\hibernate-validator\6.2.3.Final\hibernate-validator-6.2.3.Final.jar;D:\Java\apache-maven-3.8.3\repositories\jakarta\validation\jakarta.validation-api\2.0.2\jakarta.validation-api-2.0.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\jboss\logging\jboss-logging\3.4.3.Final\jboss-logging-3.4.3.Final.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\projectlombok\lombok\1.18.24\lombok-1.18.24.jar com.zj.Producer.SyncProducer SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712770000, offsetMsgId=C0A8426400002A9F0000000000000317, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960001, offsetMsgId=C0A8426400002A9F00000000000003FF, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960002, offsetMsgId=C0A8426400002A9F00000000000004E7, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=1], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960003, offsetMsgId=C0A8426400002A9F00000000000005CF, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=2], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960004, offsetMsgId=C0A8426400002A9F00000000000006B7, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960005, offsetMsgId=C0A8426400002A9F000000000000079F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=2] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960006, offsetMsgId=C0A8426400002A9F0000000000000887, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=1], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960007, offsetMsgId=C0A8426400002A9F000000000000096F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=2], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712A60008, offsetMsgId=C0A8426400002A9F0000000000000A57, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=2] SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712A60009, offsetMsgId=C0A8426400002A9F0000000000000B3F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=3] 20:04:10.038 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.66.100:10911] result: true 20:04:10.038 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.66.100:9876] result: true Process finished with exit code 0
生产端发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。主要应用于对性能要求较高但是对可靠性不高的情况下。
package com.zj.Producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.CountDownLatch2; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.concurrent.TimeUnit; public class AsyncProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException { //创建生产者实例 DefaultMQProducer producer = new DefaultMQProducer("producerGroup01"); //设置nameserver地址 producer.setNamesrvAddr("192.168.66.100:9876"); //启动生产者 producer.start(); //设置发送消息失败时重试次数 producer.setRetryTimesWhenSendAsyncFailed(0); //根据消息数量实例化倒计时器 CountDownLatch2 countDownLatch2 = new CountDownLatch2(10); for (int i = 0; i < 10; i++) { final int index = i; Message message = new Message("boot_topic", "tagA","orderId111", "hello MQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback接收异步返回结果的回调 producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch2.countDown(); System.out.println(sendResult.getMsgId()); } @Override public void onException(Throwable throwable) { countDownLatch2.countDown(); throwable.printStackTrace(); } }); } //关闭producer countDownLatch2.await(5, TimeUnit.SECONDS); } }