一文带大家快速掌握RabbitMQ!(一)https://developer.aliyun.com/article/1624433
入门使用
引入RabbitMQ依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
创建一个生产者
public class Producer { private static final String QUEUE_NAME = "test01"; public static void main(String[] args) throws IOException, TimeoutException { // 1. 创建连接工厂并配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.58.129"); connectionFactory.setPort(5672); // 设置虚拟机 connectionFactory.setVirtualHost("/test"); // 2. 通过连接工厂建立连接 Connection connection = connectionFactory.newConnection(); // 3. 通过connection创建Channel Channel channel = connection.createChannel(); // 4. 通过Channel发送数据 (exchange, routingKey, props, body) // 不指定Exchange时, 交换机默认是AMQP default, 此时就看RoutingKey // RoutingKey要等于队列名才能被路由, 否则消息会被删除 for (int i = 0; i < 5; i++) { String msg = "Learn For RabbitMQ-" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Send message : " + msg); } // 5.关闭连接 channel.close(); connection.close(); } }
创建一个消费者
public class Consumer { private static final String QUEUE_NAME = "test01"; public static void main(String[] args) throws IOException, TimeoutException { // 1. 创建连接工厂并配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.58.129"); connectionFactory.setPort(5672); // 设置虚拟机 connectionFactory.setVirtualHost("/test"); // 2. 通过连接工厂建立连接 Connection connection = connectionFactory.newConnection(); // 3. 通过connection创建Channel Channel channel = connection.createChannel(); // 4. 声明队列 (queue, durable, exclusive, autoDelete, args) channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 5. 创建消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { /** * 获取消息 (监听到有消息时调用) * @param consumerTag 消费者标签, 在监听队列时可以设置autoAck为false,即手动确认(避免消息的丢失), 消息唯一性处理 * @param envelope 信封 * @param properties 消息的属性 * @param body 消息的内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Received message : " + msg); } }; // 6. 设置Channel, 监听队列(String queue, boolean autoAck,Consumer callback) channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
参数:
queue
:队列名称durable
:持久化,true 即使服务重启也不会被删除exclusive
:独占,true 队列只能使用一个连接,连接断开队列删除autoDelete
:自动删除,true 脱离了Exchange(连接断开),即队列没有Exchange关联时,自动删除arguments
:扩展参数autoAck
:是否自动签收(回执)
不指定Exchange时,交换机默认是AMQP default,此时就看RoutingKey
,RoutingKey要等于队列名才能被路由,否则消息会被删除
交换机属性
Name
:交换机名称
Type
:交换机类型—— direct、topic、fanout、header
Durability
:是否需要持久化,true为持久化
Auto Delete
:当最后一个绑定到Exchange上的队列删除后,即Exchange上没有队列绑定,自动删除该Exhcange
Internal
:当前Exchange是否用于RabbitMQ内部使用,大多数使用默认False
Arguments
:扩展参数,用于扩展AMQP协议定制化使用
Direct Exchange:
// Consumer // 声明交换机: // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments) channel.exchangeDeclare("exchangeName", BuiltinExchangeType.DIRECT, true, false, false, null); // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args) channel.queueDeclare("queueName", true, false, false, null); // 建立绑定关系: channel.queueBind("queueName", "exchangeName", "routingKey"); // =================================================================== // Producer // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body) channel.basicPublish("exchangeName", "routingKey", null, "msg".getBytes());
Topic Exchange:
// Consumer // 声明交换机: // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments) channel.exchangeDeclare("exchangeName", BuiltinExchangeType.TOPIC, true, false, false, null); // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args) channel.queueDeclare("queueName", true, false, false, null); // 建立绑定关系: channel.queueBind("queueName", "exchangeName", "routingKey.#"); // =================================================================== // Producer // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body) channel.basicPublish("exchangeName", "routingKey.hi", null, "msg".getBytes()); channel.basicPublish("exchangeName", "routingKey.save", null, "msg".getBytes()); channel.basicPublish("exchangeName", "routingKey.save.hi", null, "msg".getBytes());
因为使用了模糊匹配的"#
",可以匹配到发送的三条消息。因此可以收到三条消息
Fanout Exchange:
// Consumer // 声明交换机: // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments) channel.exchangeDeclare("exchangeName", BuiltinExchangeType.FANOUT, true, false, false, null); // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args) channel.queueDeclare("queueName", true, false, false, null); // 建立绑定关系: //(不设置routingKey, 这里为空) channel.queueBind("queueName", "exchangeName", ""); // =================================================================== // Producer // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body) // 同样routingKey为空 (也可以是任意字符串, 因为fanout并不依据routingKey) channel.basicPublish("exchangeName", "", null, "msg".getBytes());
高级特性
可靠性投递
什么是生产端的可靠性投递
- 保障消息的成功发出
- 保障MQ节点成功接收
- 发送端收到MQ节点(Broker)确认应答(已收到)
- 完善消息进行补偿机制
可靠性投递的方案一
消息落库(持久化至数据库),对消息状态进行打标,如若消息未响应,进行轮询操作
1.把业务消息落库,再生成一条消息落库到消息DB用来记录(譬如消息刚创建,正在发送中 status: 0)
缺点:对数据库进行两次持久化
2.生产端发送消息。
3.Broker端收到后,应答至生产端。Confirm Listener
异步监听Broker的应答。
4.应答表明消息投递成功后,去消息DB中抓取到指定的消息记录,更新状态,如status: 1
5.如在3中出现网络不稳定等情况,导致Listener未收到消息成功确认的应答。
那么消息数据库中的status就还是0,而Broker可能是接收到消息的状态。
因此设定一个规则(定时任务),例如消息在落库5分钟后(超时)还是0的状态,就把该条记录抽取出来。
6.重新投递
7.限制一个重试的次数,譬如3次,如果大于3次,即为投递失败,更新status的值
用人工补偿机制去查询消息失败的原因
高并发场景消息的延迟投递,做二次确认,回调检查
Upstream service:生产端
Downstream service:消费端
1:业务消息落库后,发送消息至Broker。
2:紧接着发送第二条延迟(设置延迟时间)检查的消息。
3:消费端监听指定的队列接收到消息进行处理
4:处理完后,生成一条响应消息发送到Broker。
5:由Callback服务去监听该响应消息,收到该响应消息后持久化至消息DB(记录成功状态)。
6:到了延迟时间,延迟发送的消息也被Callback服务的监听器监听到后,去检查消息DB。如果未查询到成功的状态,Callback服务需要做补偿,发起RPC通讯,让生产端重新发送。生产端通过接收到的命令中所带的id去数据库查询该业务消息,再重新发送,即跳转到1。
该方案减少了对数据库的存储,保证了性能
消费端幂等性
避免消息的重复消费
消费端实现幂等性,接收到多条相同的消息,但不会重复消费,即收到多条一样的消息。
方案:
1.唯一ID + 指纹码机制
- 唯一ID + 指纹码(业务规则、时间戳等拼接)机制,利用数据库主键去重
SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码
未查询到就insert
,如有说明已处理过该消息,返回失败- 优点:实现简单
- 缺点:高并发下有数据库写入的性能瓶颈
- 解决方案:根据ID进行分库分表、算法路由
2.利用Redis的原子性
需要考虑的问题:
- 是否要落库数据库,如落库,数据库和缓存如何做到数据的一致性
- 不落库,数据存储在缓存中,如何设置定时同步的策略(可靠性保障)
Confirm确认消息
生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
生产者进行接收应答,用来确认这条消息是否正常发送到Broker是消息可靠性投递的核心保障。
确认机制的流程图
发送消息与监听应答的消息是异步操作。
确认消息的实现
- 在channel开启确认模式:
channel.confirmSelect();
- 在channel添加监听:
channel.addConfirmListener(ConfirmListener listener);
返回监听成功和失败的结果,对具体结果进行相应的处理(重新发送、记录日志等待后续处理等)
具体代码:
public class ConfirmProducer { private static final String EXCHANGE_NAME = "confirm_exchange"; private static final String ROUTING_KEY = "confirm.key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.58.129"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 指定消息的投递模式: 确认模式 channel.confirmSelect(); // 发送消息 String msg = "Send message of confirm demo"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes()); // 添加确认监听 channel.addConfirmListener(new ConfirmListener() { // 成功 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("========= Ack ========"); } // 失败 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("========= Nack ========"); } }); } }
一文带大家快速掌握RabbitMQ!(三)https://developer.aliyun.com/article/1624435