RocketMQ出了4的版本,而且本身这个mq有事务消息,在分布式的场景中有很好的启发性和作用,而且本身它也是阿里开源到apache的一个项目,从出身还是实力来说都很不错的。
1、新建项目sc-rocketmq,对应的pom.xml如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>spring-cloud</groupId>
<artifactId>sc-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sc-rocketmq</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
Producer单从分类producer的官网doc来看主要分成3种:
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
本文主要说的是DefaultMQProducer和TransactionMQProducer
默认的producer是DefaultMQProducer,从官方的文档来看,前四个都是对这个producer的运用只是set的值不同而已,而且是很细微的变化而已。
2、新建配置文件application.yml
server:
port: 8182
spring:
application:
name: sc-rocketmq
rocketmq:
consumer:
groupName: consumerGroup # 消费者的组名
consumeThreadMin: 2
consumeThreadMax: 5
consumeMessageBatchMaxSize: 10
topics: rocketTopic,rocketTag
producer:
groupName: producerGroup # 生产者的组名
maxMessageSize: 100
sendMsgTimeout: 1000
retryTimesWhenSendFailed: 3
namesrvAddr: 127.0.0.1:9876 # NameServer地址
3、新建消息生产者类
读取application.yml配置:
package sc.rocketmq.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
public class ProducerConfig {
private String namesrvAddr;
private String groupName;
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@Override
public String toString() {
return "ProducerConfig [namesrvAddr=" + namesrvAddr + ", groupName=" + groupName + "]";
}
}
消息生产者:
package sc.rocketmq.config;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProducerConfigure {
Logger log = LoggerFactory.getLogger(ProducerConfigure.class);
@Autowired
private ProducerConfig producerConfigure;
/**
* 创建普通消息发送者实例
*
* @return
* @throws MQClientException
*/
@Bean
// @ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info(producerConfigure.toString());
log.info("defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName());
producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
log.info("rocketmq producer server开启成功---------------------------------.");
return producer;
}
}
4、新建消息消费者类
读取application.yml配置:
package sc.rocketmq.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
public class ConsumerConfig {
private String groupName;
private String namesrvAddr;
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
@Override
public String toString() {
return "ConsumerConfig [groupName=" + groupName + ", namesrvAddr=" + namesrvAddr + "]";
}
}
消息消费者类(抽象类):
package sc.rocketmq.config;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@Configuration
public abstract class DefaultConsumerConfigure {
Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class);
@Autowired
private ConsumerConfig consumerConfig;
// 开启消费者监听服务
public void listener(String topic, String tag) throws MQClientException {
log.info("开启" + topic + ":" + tag + "消费者-------------------");
log.info(consumerConfig.toString());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
consumer.subscribe(topic, tag);
// 开启内部类实现监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return DefaultConsumerConfigure.this.dealBody(msgs);
}
});
consumer.start();
log.info("rocketmq启动成功---------------------------------------");
}
// 处理body的业务
public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);
}
具体消息消费者类:
package sc.rocketmq.service;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import sc.rocketmq.config.DefaultConsumerConfigure;
@Configuration
public class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent> {
Logger log = LoggerFactory.getLogger(CustomConsumer.class);
@Override
public void onApplicationEvent(ContextRefreshedEvent arg0) {
try {
super.listener("t_TopicTest", "Tag1");
} catch (MQClientException e) {
log.error("消费者监听器启动失败", e);
}
}
@Override
public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs) {
int num = 1;
log.info("进入");
for (MessageExt msg : msgs) {
log.info("第" + num + "次消息");
try {
String msgStr = new String(msg.getBody(), "utf-8");
log.info(msgStr);
} catch (UnsupportedEncodingException e) {
log.error("body转字符串解析失败");
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
这个CustomConsumer类实现了ApplicationListener,让他在启动的时候就开启执行DefaultConsumerConfigure的listener方法
5、新建springboot启动类RocketMqApplication.java
package sc.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqApplication.class, args);
}
}
6、新建一个Controller,引入消息生产者
package sc.rocketmq.controller;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import sc.rocketmq.service.CustomConsumer;
@RestController
public class ProducerController {
Logger log = LoggerFactory.getLogger(CustomConsumer.class);
@Autowired
private DefaultMQProducer producer;
// @Autowired
// private TransactionMQProducer producer;
// @Autowired
// private TestTransactionListener testTransactionListener;
@GetMapping("/msg/product")
public void test(String info) throws Exception {
Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq测试成功".getBytes());
// 这里用到了这个mq的异步处理,类似ajax,可以得到发送到mq的情况,并做相应的处理
// 不过要注意的是这个是异步的
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("传输成功");
log.info(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("传输失败", e);
}
});
}
}
7、验证是否成功
访问http://127.0.0.1:8080/msg/product
可以看到controller产生消息,然后CustomConsumer类的dealBody方法消息消息。
源码:
https://gitee.com/hjj520/spring-cloud-2.x/tree/master/sc-apache-rocketmq
本文作者: java乐园
本文来自云栖社区合作伙伴“JAVA乐园”,了解相关信息可以关注“JAVA乐园”