8 Spring Boot整合RocketMQ
8.1 Maven依赖
<!-- RocketMQ Spring Boot starter dependency -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
8.2 配置文件
# HTTP server settings
server:
  port: 0
# RocketMQ client settings
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my_mq_one # producer group name; required parameter
8.3 生产者代码
/** * @desc: 消息生产者 * @author: YanMingXin * @create: 2021/9/15-12:20 **/ @Service public class MessageProvider { /** * 注入RocketMQTemplate */ @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送普通消息 * * @param message * @return */ public boolean sendMessage(String message) { rocketMQTemplate.convertAndSend("Topic1:TagA", message); return true; } /** * 发送Spring的消息 * * @param message * @return */ public boolean sendSpringMessage(String message) { rocketMQTemplate.send("Topic1:TagA", MessageBuilder.withPayload(message).build()); return true; } /** * 发送异步消息 * * @param message * @return */ public boolean sendAsyncMessage(String message) { //发送异步消息 rocketMQTemplate.asyncSend("Topic1:TagA", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { //发送成功 return; } @Override public void onException(Throwable throwable) { //发送失败 return; } }); return true; } /** * 发送顺序消息 * 注:需要加上synchronized,消费者多线程下会不保证顺序 * * @param list * @return */ public synchronized boolean sendAscMessage(List<String> list) { for (String str : list) { //发送顺序消息 rocketMQTemplate.syncSendOrderly("Topic1", str, str + "hash"); } return true; } }
8.4 消费者代码
/**
 * @desc: Message consumer that listens on {@code Topic1} in consumer group
 *        {@code my_mq_consumer_one} and prints every received payload.
 *        <p>
 *        The class is declared as a Spring bean ({@code @Service}) because
 *        {@code @RocketMQMessageListener} alone does not register it with the
 *        application context; without a stereotype annotation no listener
 *        container is created and no message is ever consumed.
 * @author: YanMingXin
 * @create: 2021/9/15-12:20
 **/
@Service
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "my_mq_consumer_one")
public class MessageConsumer implements RocketMQListener<String> {

    /**
     * Prints the received message payload to standard output.
     *
     * @param s the message payload
     */
    @Override
    public void onMessage(String s) {
        System.out.println("Get message is " + s);
    }
}
8.5 测试
/**
 * Smoke test that exercises every send method of {@code MessageProvider}
 * once: plain, Spring message, asynchronous and ordered.
 */
@SpringBootTest
class RocketmqSpringBootApplicationTests {

    /** Producer under test, injected from the Spring context. */
    @Autowired
    private MessageProvider provider;

    /**
     * Sends one message of each kind; the ordered batch goes last.
     */
    @Test
    void contextLoads() {
        final String plainPayload = "Hello World";
        final String springPayload = "Hello World By Spring";
        final String asyncPayload = "Hello World Async Message";

        provider.sendMessage(plainPayload);
        provider.sendSpringMessage(springPayload);
        provider.sendAsyncMessage(asyncPayload);

        // Ordered batch: expected to be delivered as A, B, C.
        provider.sendAscMessage(Arrays.asList("A", "B", "C"));
    }
}
测试结果: