一.同步发送
1. @RequestMapping("/send") 2. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 3. for (int i = 0; i < 100; i++) { 4. final int index = i; 5. String msg = "demo msg test"; 6. logger.info("开始发送消息:" + msg); 7. Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes()); 8. //默认3秒超时 9. SendResult sendResult = defaultMQProducer.send(sendMsg); 10. logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次"); 11. } 12. }
二.异步发送
如果对响应时间有要求(要求调用线程不被阻塞),可以使用异步发送,发送结果通过回调通知。
配置类代码不变,只需在调用时改用异步发送方法即可。
package cn.baocl.rocketmq.controllor;

import cn.baocl.rocketmq.entity.TestVo;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * Demo REST controller that sends RocketMQ messages asynchronously.
 *
 * <p>Mapped under {@code /test}; the {@code /send} endpoint submits 100 demo messages and
 * reports the outcome of each one through a {@link SendCallback}.
 */
@RestController
@RequestMapping("/test")
public class TestControllor {
    private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);

    /**
     * RocketMQ producer used to send the demo messages.
     */
    @Autowired
    private DefaultMQProducer defaultMQProducer;

    /**
     * Submits 100 demo messages using the callback-based (asynchronous) send method.
     *
     * <p>Each message targets topic {@code DemoTopic} with tag {@code DemoTag}. The result of
     * every send is delivered to the {@link SendCallback} passed alongside the message rather
     * than returned from the send call, and is logged from there.
     *
     * @throws MQClientException propagated from the producer's send call
     * @throws RemotingException propagated from the producer's send call
     * @throws MQBrokerException declared by the original signature; retained for compatibility
     * @throws InterruptedException propagated from the producer's send call
     */
    @RequestMapping("/send")
    public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        for (int i = 0; i < 100; i++) {
            // Effectively-final copy of the loop counter so the anonymous callback can capture it.
            final int index = i;
            String msg = "demo msg test";
            logger.info("开始发送消息:{}", msg);
            // Encode explicitly as UTF-8 so the payload bytes do not depend on the JVM default charset.
            Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes(StandardCharsets.UTF_8));
            // Asynchronous send: the outcome arrives via the callback below.
            defaultMQProducer.send(sendMsg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info("{} OK {}", index, sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    // Passing the throwable as the last argument makes SLF4J log its stack trace.
                    logger.error("{} Exception", index, e);
                }
            });
        }
    }
}
三.单向传输
单向传输适用于只需要中等可靠性的场景(只负责发送,不等待服务器响应,也不触发回调,方法直接返回),例如日志收集。
1. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 2. for (int i = 0; i < 100; i++) { 3. final int index = i; 4. String msg = "demo msg test"; 5. logger.info("开始发送消息:" + msg); 6. //单项调用 7. Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes()); 8. //默认3秒超时 9. defaultMQProducer.sendOneway(sendMsg); 10. } 11. }