关于springboot和rocketmq相信大家都有了解,这里直接介绍整合的方法。
这里提前下载好rocketmq的源码并运行,个人比较推荐去github直接clone代码,然后idea导入并运行,好处就是可以学习源码,并且支持debug调试,github地址如下,因为官方有教程,这里就直接跳过安装。
https://github.com/apache/rocketmq.git
打开idea,自行创建两个模块的springboot项目,一个生产者,一个消费者。
生产者的配置:
rocketmq: name-server: localhost:9876 producer: group: my-group server: port: 8081
这里填写自己真实的name-server地址和端口。
导入maven dependency,目前版本为2.0.1。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version> </dependency>
启动类代码:
@SpringBootApplication public class SpringBootRocketmqProducerApplication implements CommandLineRunner { @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args) { SpringApplication.run(SpringBootRocketmqProducerApplication.class, args); } @Override public void run(String... args) throws Exception { rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!"); rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("orderId-0001", 88)); } } @Data @AllArgsConstructor class OrderPaidEvent implements Serializable { private String orderId; private Integer paidMoney; }
这里还是挺简单的,直接实现CommandLineRunner这个接口,复写run方法即可,然后注册RocketMQTemplate,就可以生产消息了。
消费者的配置
rocketmq: name-server: localhost:9876 server: port: 8082
name-server地址同生产者
导入maven dependency,目前版本为2.0.1。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version> </dependency>
启动类代码
@SpringBootApplication public class SpringBootRocketmqConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRocketmqConsumerApplication.class, args); } } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") class MyConsumer1 implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("received message: {}", message); } } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2") class MyConsumer2 implements RocketMQListener<OrderPaidEvent> { @Override public void onMessage(OrderPaidEvent orderPaidEvent) { log.info("received orderPaidEvent: {}", orderPaidEvent); } } @Data @AllArgsConstructor class OrderPaidEvent implements Serializable { private String orderId; private Integer paidMoney; }
最后依次启动消费者和生产者,就可以看见消费者控制台打印出如下日志表示消息消费成功。
2019-01-22 15:57:34.516 INFO 6368 --- [MessageThread_1] c.w.s.MyConsumer1 : received message: Hello, World! 2019-01-22 15:57:34.667 INFO 6368 --- [MessageThread_1] c.w.s.MyConsumer2 : received orderPaidEvent: OrderPaidEvent(orderId=orderId-0001, paidMoney=88)
参考和github链接:
https://github.com/apache/rocketmq-spring