一文带大家快速掌握RabbitMQ!(二)

简介: 一文带大家快速掌握RabbitMQ!

一文带大家快速掌握RabbitMQ!(一)https://developer.aliyun.com/article/1624433


入门使用

引入RabbitMQ依赖:

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>3.6.5</version>
</dependency>

创建一个生产者

public class Producer {
    private static final String QUEUE_NAME = "test01";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂并配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        // 设置虚拟机
        connectionFactory.setVirtualHost("/test");
        // 2. 通过连接工厂建立连接
        Connection connection = connectionFactory.newConnection();
        // 3. 通过connection创建Channel
        Channel channel = connection.createChannel();
        // 4. 通过Channel发送数据 (exchange, routingKey, props, body)
        // 不指定Exchange时, 交换机默认是AMQP default, 此时就看RoutingKey
        // RoutingKey要等于队列名才能被路由, 否则消息会被删除
        for (int i = 0; i < 5; i++) {
            String msg = "Learn For RabbitMQ-" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("Send message : " + msg);
        }
        // 5.关闭连接
        channel.close();
        connection.close();
    }
}

创建一个消费者

public class Consumer {
    private static final String QUEUE_NAME = "test01";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂并配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        // 设置虚拟机
        connectionFactory.setVirtualHost("/test");
        // 2. 通过连接工厂建立连接
        Connection connection = connectionFactory.newConnection();
        // 3. 通过connection创建Channel
        Channel channel = connection.createChannel();
        // 4. 声明队列 (queue, durable, exclusive, autoDelete, args)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 5. 创建消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             * 获取消息 (监听到有消息时调用)
             * @param consumerTag 消费者标签, 在监听队列时可以设置autoAck为false,即手动确认(避免消息的丢失), 消息唯一性处理
             * @param envelope 信封
             * @param properties 消息的属性
             * @param body 消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Received message : " + msg);
            }
        };
        // 6. 设置Channel, 监听队列(String queue, boolean autoAck,Consumer callback)
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

参数:

  • queue:队列名称
  • durable:持久化,true 即使服务重启也不会被删除
  • exclusive:独占,true 队列只能使用一个连接,连接断开队列删除
  • autoDelete:自动删除,true 脱离了Exchange(连接断开),即队列没有Exchange关联时,自动删除
  • arguments:扩展参数
  • autoAck:是否自动签收(回执)

不指定Exchange时,交换机默认是AMQP default,此时就看RoutingKey,RoutingKey要等于队列名才能被路由,否则消息会被删除

交换机属性

Name:交换机名称

Type:交换机类型—— direct、topic、fanout、header

Durability:是否需要持久化,true为持久化

Auto Delete:当最后一个绑定到Exchange上的队列删除后,即Exchange上没有队列绑定,自动删除该Exhcange

Internal:当前Exchange是否用于RabbitMQ内部使用,大多数使用默认False

Arguments:扩展参数,用于扩展AMQP协议定制化使用

Direct Exchange


// Consumer
 // 声明交换机: 
 // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments)
 channel.exchangeDeclare("exchangeName", BuiltinExchangeType.DIRECT, true, false, false, null);
 // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
 channel.queueDeclare("queueName", true, false, false, null);
 
 // 建立绑定关系:
 channel.queueBind("queueName", "exchangeName", "routingKey");
 // ===================================================================
 // Producer
 // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
 channel.basicPublish("exchangeName", "routingKey", null, "msg".getBytes());

Topic Exchange


// Consumer
 // 声明交换机: 
 // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments)
 channel.exchangeDeclare("exchangeName", BuiltinExchangeType.TOPIC, true, false, false, null);
 // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
 channel.queueDeclare("queueName", true, false, false, null);
 
 // 建立绑定关系:
 channel.queueBind("queueName", "exchangeName", "routingKey.#");
 // ===================================================================
 // Producer
 // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
 channel.basicPublish("exchangeName", "routingKey.hi", null, "msg".getBytes());
 channel.basicPublish("exchangeName", "routingKey.save", null, "msg".getBytes());
 channel.basicPublish("exchangeName", "routingKey.save.hi", null, "msg".getBytes());

因为使用了模糊匹配的"#",可以匹配到发送的三条消息。因此可以收到三条消息

Fanout Exchange


// Consumer
 // 声明交换机: 
 // (String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object) arguments)
 channel.exchangeDeclare("exchangeName", BuiltinExchangeType.FANOUT, true, false, false, null);
 // 声明队列 (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object) args)
 channel.queueDeclare("queueName", true, false, false, null);
 
 // 建立绑定关系: 
 //(不设置routingKey, 这里为空)
 channel.queueBind("queueName", "exchangeName", "");
 // ===================================================================
 // Producer
 // 发送消息 (String exchange, String routingKey, BasicProperties props, Bytes[] body)
 // 同样routingKey为空 (也可以是任意字符串, 因为fanout并不依据routingKey)
 channel.basicPublish("exchangeName", "", null, "msg".getBytes());

高级特性

可靠性投递

什么是生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点成功接收
  • 发送端收到MQ节点(Broker)确认应答(已收到)
  • 完善消息进行补偿机制

可靠性投递的方案一

消息落库(持久化至数据库),对消息状态进行打标,如若消息未响应,进行轮询操作

1.把业务消息落库,再生成一条消息落库到消息DB用来记录(譬如消息刚创建,正在发送中 status: 0)

缺点:对数据库进行两次持久化

2.生产端发送消息。

3.Broker端收到后,应答至生产端。Confirm Listener异步监听Broker的应答。

4.应答表明消息投递成功后,去消息DB中抓取到指定的消息记录,更新状态,如status: 1

5.如在3中出现网络不稳定等情况,导致Listener未收到消息成功确认的应答。

那么消息数据库中的status就还是0,而Broker可能是接收到消息的状态。

因此设定一个规则(定时任务),例如消息在落库5分钟后(超时)还是0的状态,就把该条记录抽取出来。

6.重新投递

7.限制一个重试的次数,譬如3次,如果大于3次,即为投递失败,更新status的值

用人工补偿机制去查询消息失败的原因

高并发场景消息的延迟投递,做二次确认,回调检查

Upstream service:生产端

Downstream service:消费端

1:业务消息落库后,发送消息至Broker。

2:紧接着发送第二条延迟(设置延迟时间)检查的消息。

3:消费端监听指定的队列接收到消息进行处理

4:处理完后,生成一条响应消息发送到Broker。

5:由Callback服务去监听该响应消息,收到该响应消息后持久化至消息DB(记录成功状态)。

6:到了延迟时间,延迟发送的消息也被Callback服务的监听器监听到后,去检查消息DB。如果未查询到成功的状态,Callback服务需要做补偿,发起RPC通讯,让生产端重新发送。生产端通过接收到的命令中所带的id去数据库查询该业务消息,再重新发送,即跳转到1。

该方案减少了对数据库的存储,保证了性能

消费端幂等性

避免消息的重复消费

消费端实现幂等性,接收到多条相同的消息,但不会重复消费,即收到多条一样的消息。

方案:

1.唯一ID + 指纹码机制

  • 唯一ID + 指纹码(业务规则、时间戳等拼接)机制,利用数据库主键去重
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码 未查询到就insert,如有说明已处理过该消息,返回失败
  • 优点:实现简单
  • 缺点:高并发下有数据库写入的性能瓶颈
  • 解决方案:根据ID进行分库分表、算法路由

2.利用Redis的原子性

需要考虑的问题:

  • 是否要落库数据库,如落库,数据库和缓存如何做到数据的一致性
  • 不落库,数据存储在缓存中,如何设置定时同步的策略(可靠性保障)

Confirm确认消息

生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。

生产者进行接收应答,用来确认这条消息是否正常发送到Broker是消息可靠性投递的核心保障。

确认机制的流程图

发送消息与监听应答的消息是异步操作。

确认消息的实现

  1. 在channel开启确认模式:channel.confirmSelect();
  2. 在channel添加监听:channel.addConfirmListener(ConfirmListener listener); 返回监听成功和失败的结果,对具体结果进行相应的处理(重新发送、记录日志等待后续处理等)

具体代码:

public class ConfirmProducer {
    private static final String EXCHANGE_NAME = "confirm_exchange";
    private static final String ROUTING_KEY = "confirm.key";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.58.129");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 指定消息的投递模式: 确认模式
        channel.confirmSelect();
        // 发送消息
        String msg = "Send message of confirm demo";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
        // 添加确认监听
        channel.addConfirmListener(new ConfirmListener() {
            // 成功
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("========= Ack ========");
            }
            // 失败
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("========= Nack ========");
            }
        });
    }
}


一文带大家快速掌握RabbitMQ!(三)https://developer.aliyun.com/article/1624435

相关文章
|
2天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1517 4
|
29天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
5天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
501 19
|
2天前
|
存储 SQL 关系型数据库
彻底搞懂InnoDB的MVCC多版本并发控制
本文详细介绍了InnoDB存储引擎中的两种并发控制方法:MVCC(多版本并发控制)和LBCC(基于锁的并发控制)。MVCC通过记录版本信息和使用快照读取机制,实现了高并发下的读写操作,而LBCC则通过加锁机制控制并发访问。文章深入探讨了MVCC的工作原理,包括插入、删除、修改流程及查询过程中的快照读取机制。通过多个案例演示了不同隔离级别下MVCC的具体表现,并解释了事务ID的分配和管理方式。最后,对比了四种隔离级别的性能特点,帮助读者理解如何根据具体需求选择合适的隔离级别以优化数据库性能。
179 1
|
8天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
9天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
451 5
|
7天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
314 2
|
23天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
25天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2608 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析