1,导入依赖
1. <dependency> 2. <groupId>org.apache.rocketmq</groupId> 3. <artifactId>rocketmq-spring-boot-starter</artifactId> 4. <version>2.0.3</version> 5. </dependency>
2,yml配置文件
3,新建Bean对象User,此处使用lombook简化开发,因为需要网络传输,所以需要对象实现Serializable,完成序列化
4,新建生产者SendController 并且发送同步消息 访问http://localhost:8088/demo/send
出现success!说明发送成功
1. @RestController 2. @RequestMapping("/demo") 3. public class SendController { 4. 5. @Autowired 6. RocketMQTemplate rocketMQTemplate;//模板类:建连接及断连结 7. 8. @GetMapping("/send") 9. public String send(){ 10. 11. User smoky = new User("smoky", 18); 12. 13. //同步消息 14. SendResult syncSend = rocketMQTemplate.syncSend("topic10", smoky, 3); 15. 16. return "success!"; 17. }
5,发送异步消息
1. //异步消息 2. rocketMQTemplate.asyncSend("topic10", smoky, new SendCallback() { 3. @Override 4. public void onSuccess(SendResult sendResult) { 5. //成功回调函数 6. System.out.println("sendResult = " + sendResult); 7. } 8. 9. @Override 10. public void onException(Throwable throwable) { 11. //发送失败回调函数 12. System.out.println(throwable); 13. } 14. },1000);
//单向消息
rocketMQTemplate.sendOneWay("topic10",smoky);
//延时消息
1. //延时消息 2. rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(msg).build(),2000,2);
新建消费者DemoConsumer 添加@RocketMQMessageListener注解
1. @Service 2. //根据sql过滤 3. @RocketMQMessageListener(topic = "topic10",consumerGroup = "group1", 4. selectorType = SelectorType.SQL92, 5. selectorExpression = "age>92", 6. messageModel = MessageModel.BROADCASTING //广播模式 7. ) 8. 9. //sql过滤 10. public class DemoConsumer implements RocketMQListener<User> { 11. 12. @Override 13. public void onMessage(User user) { 14. //业务逻辑 15. System.out.println("user = " + user); 16. } 17. }
启动消费者出现如下字样则说明搭建成功,正常一次就可以了,这里为了测试多发送了几次