高性能消息中间件 RocketMQ(二)

简介: 高性能消息中间件 RocketMQ(二)

二、RocketMQ安装与配置

1.前提是Linux中安装了JDK8及以上的版本。

#查看是否安装JDK
echo $JAVA_HOME

2.官网下载安装包-

RocketMQ最新版本:5.1

下载地址:https://rocketmq.apache.org/zh/release-notes/

3.上传到linux下并安装(使用xftp工具、rz命令、或者直接拖拽)

4.将文件解压并修改名称

#将文件解压
unzip rocketmq-all-5.1.2-bin-release.zip
#修改文件名称
mv rocketmq-all-5.1.2-bin-release rocketmq-5.1.2
#文件移动
mv rocketmq-5.1.2 /usr/local

5.修改bin/runserver.sh文件

6.修改bin/runbroker.sh文件

7.修改环境变量

#打开配置文件
vim /etc/profile
#添加如下环境变量
export JAVA_HOME=/usr/local/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
export ROCKETMQ_HOME=/usr/local/rocketmq-5.1.2
export PATH=$PATH:$ROCKETMQ_HOME/bin

8.让配置生效

source /etc/profile

9.进入rocketMQ启动NameServer服务和broker服务。

#启动NameServer
nohup sh mqnamesrv &
#启动broker
nohup sh mqbroker -n localhost:9876 &

如果遇到如下问题属于正常现象,或者说一定会遇到:

[root@localhost bin]# nohup: ignoring input and appending output to ‘nohup.out’

先通过命令查看服务是否启动:

jps

只要出现以下的信息表示启动成功:

三、RocketMQ管理命令

  • 启动namesrv和broker
#启动NameServer
nohup sh mqnamesrv &
#启动broker
nohup sh mqbroker -n localhost:9876 &
  • 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log    #查看日志
 tail -f ~/logs/rocketmqlogs/broker.log     #查看日志
  • 新增topic
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t topicWarning
  • 查看某个topic的状态
mqadmin topicStatus -n localhost:9876 -t topicWarning
  • 查看所有消费组group
mqadmin consumerProgress -n localhost:9876
  • 查看所有topic
mqadmin topicList -n localhost:9876
  • 删除topic
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t topicWarning
  • 关闭namesrv和broker服务
mqshutdown namesrv
mqshutdown broker

四、RocketMQ整合springboot

4.1 创建生产者工程 springboot-rocketmq-producer

1.在pom.xml文件中添加依赖

<!--rocketmq与springboot整合包-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

2.配置文件

# nameserver的地址
rocketmq.name-server=192.168.66.100:9876
#指定生产组名称
rocketmq.producer.group=my-group

3.测试类

@SpringBootTest
class DemoApplicationTests {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Test
    void testSendMessage(){
        //向broker发消息
        /*
         * boot_topic:消息的主题
         * hello mq:消息的内容
         */
        rocketMQTemplate.convertAndSend("boot_topic","hello mq");
    }
}

4.2创建消费者springboot-rocketmq-producer工程

1.在pom.xml文件中添加依赖

<!--rocketmq与springboot整合包-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependcy>

2.配置文件

# nameserver的地址
rocketmq.name-server=192.168.66.100:9876

3.创建消费者

@Component
/**
 * consumerGroup:消费组名称随便写
 */
@RocketMQMessageListener(topic = "boot_topic",consumerGroup = "boot_consumer_group")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

五、RocketMQ架构

5.1 技术架构

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。生产者将消息发送给broker时,broker会返回消息的发送状态。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式消费消息。
  • NameServer:管理Broker代理服务器。向broker发送心跳,监控broker的状态。如果有broker挂掉会实时更新broker列表。同时,也会存储维护broker中的主题列表。给生产者和消费者提供最新的路由信息。
  • BrokerServer:RocketMQ的核心,负责消息的接收和转发。NameServer和BrokerServer共同构成了消息中间件。

5.2 部署架构

RocketMQ 网络部署特点:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master(主)与Slave(从),一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。(之前我们在Linux上安装的是一个Master的服务,在broker配置文件broker.conf中brokerID为0的是Master主机。在一组主从broker中brokerName都是相同的。)
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息。
  • 结合部署架构图,集群工作流程可作如下描述:
  • 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic(4.4之后的版本会自动创建Topic),创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

六、RocketMQ高级特性

6.1 消息存储

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。

同步刷盘指的是生产者一生产消息就会将消息放在broker中等待消费者消费,这期间broker是不能进行其他工作的;异步刷盘指的是消息存储在内存中,而不是直接写入磁盘。当消息被发送时,消息会被存储到内存中,并异步刷盘到磁盘中,当小费者消费完磁盘(也就是broker中的消息后)同步刷盘的可靠性高于异步刷盘但是效率要低。

生产端发送同步消息

package com.zj.Producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        //实例化生产者,并指明生产者所属的生产组
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup01");
        //设置nameserver地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        //启动生产者
        producer.start();
        //创建消息体,并指定topic,tag(标识商品的类型)
        for (int i = 0; i < 10; i++) {
            Message message = new Message("boot_topic", "tagA", "hello MQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息
            SendResult send = producer.send(message);
            //打印消息返回结果
            System.out.println(send);
        }
        //消息发送完毕关闭生产者
        producer.shutdown();
    }
}

启动MQ服务并发送消息:

D:\java\jdk8\bin\java.exe "-javaagent:D:\Java\IntelliJ IDEA 2019.3.4\lib\idea_rt.jar=49502:D:\Java\IntelliJ IDEA 2019.3.4\bin" -Dfile.encoding=UTF-8 -classpath D:\Java\jdk8\jre\lib\charsets.jar;D:\Java\jdk8\jre\lib\deploy.jar;D:\Java\jdk8\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk8\jre\lib\ext\cldrdata.jar;D:\Java\jdk8\jre\lib\ext\dnsns.jar;D:\Java\jdk8\jre\lib\ext\jaccess.jar;D:\Java\jdk8\jre\lib\ext\jfxrt.jar;D:\Java\jdk8\jre\lib\ext\localedata.jar;D:\Java\jdk8\jre\lib\ext\nashorn.jar;D:\Java\jdk8\jre\lib\ext\sunec.jar;D:\Java\jdk8\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk8\jre\lib\ext\sunmscapi.jar;D:\Java\jdk8\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk8\jre\lib\ext\zipfs.jar;D:\Java\jdk8\jre\lib\javaws.jar;D:\Java\jdk8\jre\lib\jce.jar;D:\Java\jdk8\jre\lib\jfr.jar;D:\Java\jdk8\jre\lib\jfxswt.jar;D:\Java\jdk8\jre\lib\jsse.jar;D:\Java\jdk8\jre\lib\management-agent.jar;D:\Java\jdk8\jre\lib\plugin.jar;D:\Java\jdk8\jre\lib\resources.jar;D:\Java\jdk8\jre\lib\rt.jar;D:\Java\code\springbootcode\sb_rocketMQ_producer\target\classes;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter\2.7.0\spring-boot-starter-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot\2.7.0\spring-boot-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-context\5.3.20\spring-context-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-beans\5.3.20\spring-beans-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-expression\5.3.20\spring-expression-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-autoconfigure\2.7.0\spring-boot-autoconfigure-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter-logging\2.7.0\spring-boot-starter-logging-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\ch\qos\logback\logback-classic\1.2.11\logback-classic-1.2.11.jar;D:\Java\apache-maven-3.8.3\repositories\ch\qos\logback\logback-core\1.2.11\logback-core-1.2.11.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\logging\log4j\log4j-to-slf4j\2.17.2\log4j-to-slf4j-2.17.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\logging\log4j\log4j-api\2.17.2\log4j-api-2.17.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\slf4j\jul-to-slf4j\1.7.36\jul-to-slf4j-1.7.36.jar;D:\Java\apache-maven-3.8.3\repositories\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-core\5.3.20\spring-core-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-jcl\5.3.20\spring-jcl-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\yaml\snakeyaml\1.30\snakeyaml-1.30.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-spring-boot-starter\2.0.3\rocketmq-spring-boot-starter-2.0.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-spring-boot\2.0.3\rocketmq-spring-boot-2.0.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-client\4.5.1\rocketmq-client-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-common\4.5.1\rocketmq-common-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\commons\commons-lang3\3.12.0\commons-lang3-3.12.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-acl\4.5.1\rocketmq-acl-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-remoting\4.5.1\rocketmq-remoting-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\com\alibaba\fastjson\1.2.51\fastjson-1.2.51.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-all\4.1.77.Final\netty-all-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-buffer\4.1.77.Final\netty-buffer-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec\4.1.77.Final\netty-codec-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-dns\4.1.77.Final\netty-codec-dns-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-haproxy\4.1.77.Final\netty-codec-haproxy-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-http\4.1.77.Final\netty-codec-http-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-http2\4.1.77.Final\netty-codec-http2-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-memcache\4.1.77.Final\netty-codec-memcache-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-mqtt\4.1.77.Final\netty-codec-mqtt-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-redis\4.1.77.Final\netty-codec-redis-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-smtp\4.1.77.Final\netty-codec-smtp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-socks\4.1.77.Final\netty-codec-socks-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-stomp\4.1.77.Final\netty-codec-stomp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-xml\4.1.77.Final\netty-codec-xml-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-common\4.1.77.Final\netty-common-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-handler\4.1.77.Final\netty-handler-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-handler-proxy\4.1.77.Final\netty-handler-proxy-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver\4.1.77.Final\netty-resolver-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns\4.1.77.Final\netty-resolver-dns-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport\4.1.77.Final\netty-transport-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-rxtx\4.1.77.Final\netty-transport-rxtx-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-sctp\4.1.77.Final\netty-transport-sctp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-udt\4.1.77.Final\netty-transport-udt-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-classes-epoll\4.1.77.Final\netty-transport-classes-epoll-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-unix-common\4.1.77.Final\netty-transport-native-unix-common-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-classes-kqueue\4.1.77.Final\netty-transport-classes-kqueue-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-classes-macos\4.1.77.Final\netty-resolver-dns-classes-macos-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-epoll\4.1.77.Final\netty-transport-native-epoll-4.1.77.Final-linux-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-epoll\4.1.77.Final\netty-transport-native-epoll-4.1.77.Final-linux-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-kqueue\4.1.77.Final\netty-transport-native-kqueue-4.1.77.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-kqueue\4.1.77.Final\netty-transport-native-kqueue-4.1.77.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-native-macos\4.1.77.Final\netty-resolver-dns-native-macos-4.1.77.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-native-macos\4.1.77.Final\netty-resolver-dns-native-macos-4.1.77.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-classes\2.0.52.Final\netty-tcnative-classes-2.0.52.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-linux-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-linux-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-windows-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-logging\4.5.1\rocketmq-logging-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-srvutil\4.5.1\rocketmq-srvutil-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;D:\Java\apache-maven-3.8.3\repositories\com\google\guava\guava\19.0\guava-19.0.jar;D:\Java\apache-maven-3.8.3\repositories\commons-codec\commons-codec\1.15\commons-codec-1.15.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-messaging\5.3.20\spring-messaging-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-aop\5.3.20\spring-aop-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-databind\2.13.3\jackson-databind-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-annotations\2.13.3\jackson-annotations-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-core\2.13.3\jackson-core-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter-validation\2.7.0\spring-boot-starter-validation-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\tomcat\embed\tomcat-embed-el\9.0.63\tomcat-embed-el-9.0.63.jar;D:\Java\apache-maven-3.8.3\repositories\org\hibernate\validator\hibernate-validator\6.2.3.Final\hibernate-validator-6.2.3.Final.jar;D:\Java\apache-maven-3.8.3\repositories\jakarta\validation\jakarta.validation-api\2.0.2\jakarta.validation-api-2.0.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\jboss\logging\jboss-logging\3.4.3.Final\jboss-logging-3.4.3.Final.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\projectlombok\lombok\1.18.24\lombok-1.18.24.jar com.zj.Producer.SyncProducer
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712770000, offsetMsgId=C0A8426400002A9F0000000000000317, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960001, offsetMsgId=C0A8426400002A9F00000000000003FF, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960002, offsetMsgId=C0A8426400002A9F00000000000004E7, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960003, offsetMsgId=C0A8426400002A9F00000000000005CF, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960004, offsetMsgId=C0A8426400002A9F00000000000006B7, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960005, offsetMsgId=C0A8426400002A9F000000000000079F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960006, offsetMsgId=C0A8426400002A9F0000000000000887, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960007, offsetMsgId=C0A8426400002A9F000000000000096F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712A60008, offsetMsgId=C0A8426400002A9F0000000000000A57, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712A60009, offsetMsgId=C0A8426400002A9F0000000000000B3F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=3]
20:04:10.038 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.66.100:10911] result: true
20:04:10.038 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.66.100:9876] result: true
Process finished with exit code 0

生产端发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。主要应用于对性能要求较高但是对可靠性不高的情况下。

package com.zj.Producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        //创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup01");
        //设置nameserver地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        //启动生产者
        producer.start();
        //设置发送消息失败时重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //根据消息数量实例化倒计时器
        CountDownLatch2 countDownLatch2 = new CountDownLatch2(10);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            Message message = new Message("boot_topic", "tagA","orderId111", "hello MQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch2.countDown();
                    System.out.println(sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable throwable) {
                    countDownLatch2.countDown();
                    throwable.printStackTrace();
                }
            });
        }
        //关闭producer
        countDownLatch2.await(5, TimeUnit.SECONDS);
    }
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
52 0
|
3月前
|
消息中间件 中间件 Kafka
原来RocketMQ中间件可以这么玩
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
35 0
|
13天前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
|
6月前
|
消息中间件 数据库
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
51 0
|
6月前
|
消息中间件
消息中间件系列教程(17) -RabbitMQ-死信队列
消息中间件系列教程(17) -RabbitMQ-死信队列
130 0
|
6月前
|
消息中间件
消息中间件系列教程(16) -RabbitMQ-应答模式
消息中间件系列教程(16) -RabbitMQ-应答模式
40 0
|
6月前
|
消息中间件
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
50 0
|
6月前
|
消息中间件 缓存 API
消息中间件系列教程(14) -RabbitMQ-自动补偿机制
消息中间件系列教程(14) -RabbitMQ-自动补偿机制
101 0
|
2月前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:

热门文章

最新文章