发送方确认机制 publisher confirm
publisher-confirms: true #确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端
ackpublisher-returns: true #确认消息是否正确到达queue,如果没有则触发,如果有则不触发
ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { CorrelationDataEx c = (CorrelationDataEx)correlationData; System.out.println("发送消息: " + c.getMsg()); System.out.println("HelloSender 消息发送成功 :" + correlationData.toString() ); /** * 通过设置correlationData.id为业务主键,消息发送成功后去继续做候选业务。 */ } else { System.out.println("HelloSender消息发送失败" + cause); } });
ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { //Users users1 = (Users)message.getBody().toString(); //String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("Message : " + new String(message.getBody())); //System.out.println("Message : " + new String(message.getBody())); System.out.println("replyCode : " + replyCode); System.out.println("replyText : " + replyText); //错误原因 System.out.println("exchange : " + exchange); System.out.println("routingKey : " + routingKey);//queue名称 });
/** * CorrelationDataEx继承CorrelationData, 把需要发送消息的关键字段加入 * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录 */ CorrelationDataEx c = new CorrelationDataEx(); c.setId(users.getId().toString()); c.setMsg(users.toString()); /** * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数 */ rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.convertAndSend(EXCHANGE, QUEUE_TWO_ROUTING, users, c);
消息消费
1.配置
listener: simple: prefetch: 1 #设置一次处理一个消息 acknowledge-mode: manual #设置消费端手动 ack concurrency: 3 #设置同时有3个消费者消费,需要3个消费者实例
2.代码
@RabbitHandler @RabbitListener(queues = QUEUE_ONE_ROUTING) //containerFactory = "rabbitListenerContainerFactory", concurrency = "2") public void process(Users users, Channel channel, Message message) throws IOException { System.out.println("HelloReceiver收到 : " + users.toString() + "收到时间" + new Date()); try { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 // 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("receiver success"); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息,则不会重新发送了 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("receiver fail"); } }
验证
创建消息生产者和消费者
生产者
集群配置:
spring: application: name: rabbitmq-producer-demo rabbitmq: # 单点配置 #host: localhost #port: 5672 # 集群的配置 addresses: 10.156.13.92:5672,10.156.13.93:5672,10.156.13.94:5672 username: rabbitmq #guest是缺省,只能localhost网络访问,要访问远程网络,需要创建用户 password: 123456 # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。 # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器, # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。 # Virtual Name一般以/开头 virtual-host: / # 确认消息是否正确到达queue,如果没有则触发,如果有则不触发 publisher-returns: on # 确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可, # 只要正确的到达exchange中,broker即可确认该消息返回给客户端ack # 如果是simple就不会回调 publisher-confirm-type: correlated template: #设置为 on 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除 mandatory: on
队列设置: 设置了queue_sleb_accept队列
@Configuration public class RabbitConfig { /** * 投保消息交换机的名字 */ public static final String EXCHANGE_SLEB_ACCEPT = "exchange_sleb_accept"; /** * 投保消息队列 */ public static final String QUEUE_SLEB_ACCEPT = "queue_sleb_accept"; /** * 投保消息路由键 */ public static final String ROUTING_KEY_ACCEPT = "routing_key_accept"; /** * 投保消息死信交换机 */ public static final String DLX_EXCHANGE_SLEB_ACCEPT = "exchange_dlx_sleb_accept"; /** * 投保消息死信队列 */ public static final String DLX_QUEUE_SLEB_ACCEPT = "queue_dlx_sleb_accept"; /** * 常用交换器类型如下: * Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". * 即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。 * Topic(TopicExchange):按规则转发消息(最灵活)。 * Headers(HeadersExchange):设置header attribute参数类型的交换机。 * Fanout(FanoutExchange):转发消息到所有绑定队列。 * * 下面都是采用direct, 必须严格匹配exchange和queue * 投保消息交换机 */ @Bean("slebAcceptExchange") DirectExchange slebAcceptExchange() { return ExchangeBuilder.directExchange(EXCHANGE_SLEB_ACCEPT).durable(true).build(); } /** * 第二个参数 durable: 是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上, * 当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。 * 第三个参数 execulusive: 表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * 第四个参数 autoDelete: 当没有生成者/消费者使用此队列时,此队列会被自动删除。(即当最后一个消费者退订后即被删除) * * 这儿是(queue)队列持久化(durable=true),exchange也需要持久化 * ********************死信队列********************************************************** * x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 * x-dead-letter-routing-key 这里声明当前队列的死信路由key * 采用死信队列,才会用到下面的参数 * Map<String, Object> args = new HashMap<>(2); * args.put("x-dead-letter-exchange", DLX_EXCHANGE_SLEB_ACCEPT); * args.put("x-dead-letter-routing-key", ROUTING_KEY_ACCEPT); * return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).withArguments(args).build(); * ********************死信队列********************************************************** * 投保消息队列 */ @Bean("slebAcceptQueue") public Queue slebAcceptQueue() { return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).build(); } /** * 交换机、队列、绑定 */ @Bean("bindingSlebAcceptExchange") Binding bindingSlebAcceptExchange(@Qualifier("slebAcceptQueue") Queue queue, @Qualifier("slebAcceptExchange") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT); } /** * 投保死信交换机 */ @Bean("slebDlxAcceptExchange") DirectExchange slebDlxAcceptExchange() { return ExchangeBuilder.directExchange(DLX_EXCHANGE_SLEB_ACCEPT).durable(true).build(); } /** * 投保死信队列 */ @Bean("slebDlxAcceptQueue") public Queue slebDlxAcceptQueue() { return QueueBuilder.durable(DLX_QUEUE_SLEB_ACCEPT).build(); } /** * 死信交换机、队列、绑定 */ @Bean("bindingDlxSlebAcceptExchange") Binding bindingDlxSlebAcceptExchange(@Qualifier("slebDlxAcceptQueue") Queue queue, @Qualifier("slebDlxAcceptExchange") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT); }
生产消息
@Service public class AcceptProducerServiceImpl implements AcceptProducerService { private final Logger logger = LoggerFactory.getLogger(AcceptProducerServiceImpl.class); private final RabbitTemplate rabbitTemplate; public AcceptProducerServiceImpl(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @Override public void sendMessage(PolicyModal policyModal) { logger.info("开始发送时间: " + DateUtils.localDateTimeToString(LocalDateTime.now()) + ",保单号: " + policyModal.getPolicyNo() + ",发送内容: " + policyModal.toString()); /* * policyDataEx继承CorrelationData, 把需要发送消息的关键字段加入 * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录 * policyno为唯一的值 */ PolicyDataEx policyDataEx = new PolicyDataEx(); policyDataEx.setId(policyModal.getPolicyNo()); policyDataEx.setMessage(policyModal.toString()); /* * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数 */ //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。 rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SLEB_ACCEPT, RabbitConfig.ROUTING_KEY_ACCEPT, policyModal, policyDataEx); }
运行验证
http://localhost:9020/sendsing
查看3台服务器控制台:看到已经创建了镜像队列,并且有一个消息在队列里面:
消费者
配置
spring: application: name: rabbitmq-consumer-demo rabbitmq: # 单点配置 #host: localhost #port: 5672 # 集群的配置 addresses: 10.156.13.92:5672,10.156.13.93:5672,10.156.13.94:5672 username: rabbitmq password: 123456 # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。 # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器, # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。 # Virtual Name一般以/开头 virtual-host: / listener: simple: prefetch: 1 #设置一次处理一个消息 acknowledge-mode: manual #设置消费端手动 ack concurrency: 3 #设置同时有3个消费者消费 #消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)
配置队列名称,主要名称和生产者里面的名称一样
public class RabbitMQConfigInfo { /** * 投保消息队列 */ public static final String QUEUE_SLEB_ACCEPT = "queue_sleb_accept"; /** * 投保消息交换机的名字 */ public static final String EXCHANGE_SLEB_ACCEPT = "exchange_sleb_accept"; /** * 投保消息路由键 */ public static final String ROUTING_KEY_ACCEPT = "routing_key_accept"; }
消费
@Service public class RabbitConsumerServiceImpl implements RabbitConsumerService { private final Logger logger = LoggerFactory.getLogger(RabbitConsumerServiceImpl.class); @RabbitHandler @RabbitListener(bindings = @QueueBinding( value = @Queue(name = QUEUE_SLEB_ACCEPT, durable = "true"), exchange = @Exchange(name = EXCHANGE_SLEB_ACCEPT, ignoreDeclarationExceptions = "true"), key = {ROUTING_KEY_ACCEPT} )) @Override public void process(Channel channel, Message message) throws IOException { String jsonStr = new String(message.getBody()); logger.info("接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now()) + "\n,消息:" + jsonStr); //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。 PolicyModal policyModal = JsonUtils.JSON2Object(jsonStr, PolicyModal.class); assert policyModal != null; try { //将message中的body获取出来, 转换为PolicyModal,再获取policyno //更根据policyno新数据库里面的标志, // todo //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 // 否则消息服务器以为这条消息没处理掉 后续还会在发 //throw new IOException("myself"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); /*logger.info("接收处理成功:\n" + "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now()) + ",保单号: " + policyModal.getPolicyNo() + "\n,消息:" + new String(message.getBody())); */ } catch (IOException e) { e.printStackTrace(); //丢弃这条消息,则不会重新发送了 //一般不丢弃,超时后mq自动会转到死信队列(如果设置了超时时间和死信交换机和队列后) //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); logger.info("接收处理失败:\n" + "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now()) + ",保单号: " + policyModal.getPolicyNo() + "\n,消息:" + new String(message.getBody())); } } }
启动验证
在看各个服务器控制台:消息已经被消费,队列里面消息为0
结束
技术文章难写,这个花了前后一个礼拜的时间,希望对大家有帮助。有要验证代码的,可以发邮件:lazasha@163.com联系我,我给你发。懒,没空上github,回来再说。
END