3. Console监控平台说明
这里不做过多介绍,可以参考以下文章
官网地址:https://github.com/apache/roc...
其他博客地址:https://guozh.net/rocketmqzhi...
3. 案例测试
案例整合环境:SpringBoot环境案例来源于网络
3.1 pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.coderprogramming.rocketmq</groupId> <artifactId>rocketmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rocketmq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
3.2 Producer生产者
** * @Description: 生产者 * @author Coder编程 * @date 2019/5/8 17:08 */ @Component public class Producer { /** * 生产者的组名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; public void orderedProducer() throws MQClientException, InterruptedException { /** * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例 * 注意:ProducerGroupName需要由应用来保证唯一 * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, * 因为服务器会回查这个Group下的任意一个Producer */ DefaultMQProducer producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); /** * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态 * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高, * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 */ try { for (int i = 0; i < 10; i++) { Message msg = new Message("Topic1",// topic "TagA",// tag "001",// key ("Send Msg:Hello MetaQ1").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); Message msg2 = new Message("Topic2",// topic "TagB",// tag "002",// key ("Send Msg:Hello MetaQ2").getBytes());// body SendResult sendResult2 = producer.send(msg2); System.out.println(sendResult2); Message msg3 = new Message("Topic3",// topic "TagC",// tag "003",// key ("Send Msg:Hello MetaQ3").getBytes());// body SendResult sendResult3 = producer.send(msg3); System.out.println(sendResult3); } } catch (Exception e) { e.printStackTrace(); } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */ producer.shutdown(); } }
3.3 Consumer消费者
/** * @Description: 消费者 * @author Coder编程 * @date 2019/5/8 17:08 */ @Component public class Consumer { /** * 生产者的组名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; /** * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。 * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法 */ public void orderedConsumer() throws InterruptedException,MQClientException { /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例 * 注意:ConsumerGroupName需要由应用来保证唯一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup); // consumer.setNamesrvAddr("10.10.0.102:9876"); consumer.setNamesrvAddr(namesrvAddr); /** * 订阅指定topic下tags分别等于TagA或TagC或TagD */ consumer.subscribe("Topic1", "TagA || TagC || TagD"); /** * 订阅指定topic下所有消息<br> * 注意:一个consumer对象可以订阅多个topic */ consumer.subscribe("Topic2", "*"); consumer.subscribe("Topic3", "*"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("Topic1")) { if (null != msg.getTags()) { // 执行Topic1的消费逻辑 if (msg.getTags().equals("TagA")) { // 执行TagA的消费 System.out.println("TagA开始。"); } else if (msg.getTags().equals("TagC")) { System.out.println("TagC开始。"); // 执行TagC的消费 } else if (msg.getTags().equals("TagD")) { // 执行TagD的消费 System.out.println("TagD开始。"); } } } else if (msg.getTopic().equals("Topic2")) { // 执行Topic2的消费逻辑 System.out.println("Topic2"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可 */ consumer.start(); System.out.println("Consumer Started."); } }
3.3 properties配置文件
# 消费者的组名 apache.rocketmq.consumer.PushConsumer=PushConsumer # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=192.168.220.72:9876 # 设置应用端口 server.port=8089
3.4 测试代码
/** * @author Coder编程 * @Title: HelloWord * @ProjectName rocketmq * @Description: Hello World * @date 2019/5/814:14 */ @RestController public class Test { @Autowired private Producer producer; @Autowired private Consumer consumer; @RequestMapping("/test") public String testMQ2() { try { System.out.println("-----------------开始生产-----------------"); producer.orderedProducer(); System.out.println("-----------------开始消费-----------------"); consumer.orderedConsumer(); } catch (Exception e) { e.printStackTrace(); } return "success"; } }
4.奉上源码
以上安装jar包和案例测试源码已经上传至GitHub/Gitee
源码地址: