108.【RabbitsMQ】(六)

简介: 108.【RabbitsMQ】
(2).创建消费者

1.消费者1

1. 声明队列 queue1
2. 声明交换机 (name,tyepe.isConsist)
3. 队列绑定交换机并指明runtingKey
package com.jsxs.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("Lwt121788..");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String queueName="queue1";
        String exchangeName="topic_exchange";
        String exchangeType="topic";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.exchangeDeclare(exchangeName,exchangeType,true);  // 交换机名字 交换机类型 是否持久化
        channel.queueBind(queueName,exchangeName,"log.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange();
                String routingKey = envelope.getRoutingKey();
                String s = new String(body, "UTF-8");
                System.out.println("交换机是: " + exchange + " 路由是: " + routingKey + " 消息是:" + s);
            }
        };
        channel.basicConsume(queueName,true,defaultConsumer);
        //资源不关闭
    }
}

2.消费者2

1. 声明队列 queue1
2. 声明交换机 (name,tyepe.isConsist)
3. 队列绑定交换机并指明runtingKey
package com.jsxs.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("Lwt121788..");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String queueName="queue2";
        String exchangeName="topic_exchange";
        String exchangeType="topic";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.exchangeDeclare(exchangeName,exchangeType,true);  // 交换机名字 交换机类型 是否持久化
        channel.queueBind(queueName,exchangeName,"log.#");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange();
                String routingKey = envelope.getRoutingKey();
                String s = new String(body, "UTF-8");
                System.out.println("交换机是: " + exchange + " 路由是: " + routingKey + " 消息是:" + s);
            }
        };
        channel.basicConsume(queueName,true,defaultConsumer);
        //资源不关闭
    }
}

我们发送的消息是四条,但是我们web上是六条。原因是:我们利用模糊查询,查询到的数据是六条。所以是六条。

(3).简单总结
  1. Topic主题模式需要设置类型为topic的交换机交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。
  2. Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,所以显得更加灵活。

8.RabbitMQ入门案列-Headers

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

(1).创建生产者
1.声明两个队列
2. 声明一个交换机并指定交换机的类型为 headers
3. 将队列与交换机进行绑定并 绑定hashMap
package com.jsxs.rabbitmq.header;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Hashtable;
import java.util.concurrent.TimeoutException;
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.设置连接的操作
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String queueName1="queue1";
        String queueName2="queue2";
        String exchangeName="exchange_header";
        String exchangeType="headers";
        //  声明队列 队列名
        channel.queueDeclare(queueName1,false,false,false,null);
        channel.queueDeclare(queueName2,false,false,false,null);
        //  声明交换机:  交换机的蜜罐子/交换机的类型/是否持久化
        channel.exchangeDeclare(exchangeName,exchangeType,true);  // 类型声明为: header
        //  进行交换机的绑定
        Hashtable<String, Object> header_consumer1 = new Hashtable<>();
        header_consumer1.put("inform_type_consumer1","consumer1");
        Hashtable<String, Object> header_consumer2 = new Hashtable<>();
        header_consumer2.put("inform_type_consumer2","consumer2");
        //  参数:   队列名/交换机名/是否持久化/header
        channel.queueBind(queueName1,exchangeName,"",header_consumer1);
        channel.queueBind(queueName2,exchangeName,"",header_consumer2);
        //  发送十条消息
        for (int i = 0; i < 10; i++) {
            String message="inform to producer: "+i;
            Hashtable<String, Object> headers = new Hashtable<>();
            headers.put("inform_type_consumer1","consumer1");
            headers.put("inform_type_consumer2","consumer2");
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            builder.headers(headers);
            channel.basicPublish(exchangeName,"",builder.build(),message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Send to email: " + message);
        }
        //  关闭资源
        channel.close();
        connection.close();
    }
}
(2).创建消费者
  1. 第一个消费者
package com.jsxs.rabbitmq.header;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Hashtable;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String queueName1="queue1";
        String exchangeName="exchange_header";
        Hashtable<String, Object> headers_consumer1 = new Hashtable<>();
        headers_consumer1.put("inform_type_consumer1","consumer1");
        channel.queueBind(queueName1,exchangeName,"",headers_consumer1);
        channel.queueDeclare(queueName1,false,false,false,null);
        //  我们开始对其
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange(); // 交换机
                String routingKey = envelope.getRoutingKey(); //路由
                long deliveryTag = envelope.getDeliveryTag();  //消息Id mq在channel中用来标识消息的id,可用于确认消息已接受
                String s = new String(body, "UTF-8");
                System.out.println("交换机: "+exchange+" 路由:"+routingKey+" 消息ID "+deliveryTag+"---->"+s);
            }
        };
        channel.basicConsume(queueName1,true,defaultConsumer);
    }
}
  1. 第二个消费者
package com.jsxs.rabbitmq.header;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Hashtable;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String queueName2="queue2";
        String exchangeName="exchange_header";
        Hashtable<String, Object> headers_consumer2 = new Hashtable<>();
        headers_consumer2.put("inform_type_consumer2","consumer2");
        channel.queueBind(queueName2,exchangeName,"",headers_consumer2);
        channel.queueDeclare(queueName2,false,false,false,null);
        //  我们开始对其
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange(); // 交换机
                String routingKey = envelope.getRoutingKey(); //路由
                long deliveryTag = envelope.getDeliveryTag();  //消息Id mq在channel中用来标识消息的id,可用于确认消息已接受
                String s = new String(body, "UTF-8");
                System.out.println("交换机: "+exchange+" 路由:"+routingKey+" 消息ID "+deliveryTag+"---->"+s);
            }
        };
        channel.basicConsume(queueName2,true,defaultConsumer);
    }
}

生产者提供20条

9.RabbitMQ使用场景

(1).解耦、销峰、异步

同步异步的问题(串行)

串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

总共开销时间-> 1+2+3+4;  串行执行
public void makeOrder(){
    //1.发送订单
    //2.发送短信服务
    //3.发送email服务
    //4.发送app服务
}

并行方式 异步线程池

并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

public void test(){
    //异步
    theadpool.submit(new Callable<Object>{
        //1.发送短信服务
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //2.
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //3.
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //4.
    })
}

存在问题

  1. 耦合度高
  2. 需要自己写线程池自己维护成本太高
  3. 出现了消息可能会丢失,需要你自己做消息补偿
  4. 如何保证消息的可靠性你自己写
  5. 如果服务器承载不了,你需要自己去写高可用

异步消息队列的方式

好处:

  1. 完全解耦,用 MQ建立桥接
  2. 独立的线程池和运行模型
  3. 出现了消息可能会丢失,MQ有持久化功能
  4. 如何保证消息的可靠性,死信队列和消息转移等
  5. 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用

按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍

  1. 削峰: 运行效率提高,可以处理更多的请求,减轻服务器压力。
  2. 解耦: 独立的线程池,灵活性更高一些。
  3. 异步: 异步执行
(2).高内聚、低耦合

好处:

  1. 完全解耦,用 MQ建立桥接
  2. 有独立的线程池和运行模型
  3. 出现了消息可能会丢失,MQ有持久化功能
  4. 如何保证消息的可靠性,死信队列和消息转移等
  5. 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用

按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍.

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
12月前
|
Rust 安全 前端开发
为什么 Rust 备受开发者青睐?
在本篇文章中,作者介绍了 Rust 是什么,它的历史以及 Rust 是如何备受开发者和行业的青睐。希望本篇文章能帮助读者对 Rust 这门语言有一个大概的了解。
137687 43
|
12月前
|
Java
【java每日一题,数论】最大公约数,最大质因数,欧拉筛
【java每日一题,数论】最大公约数,最大质因数,欧拉筛
|
12月前
|
消息中间件 存储 Java
108.【RabbitsMQ】(二)
108.【RabbitsMQ】
78 0
|
12月前
|
Java
【Java每日一题,01背包问题】 kkksc03考前临时抱佛脚
【Java每日一题,01背包问题】 kkksc03考前临时抱佛脚
|
12月前
|
机器学习/深度学习 人工智能 自然语言处理
|
12月前
|
消息中间件
|
12月前
|
存储 JSON Java
109.【Java最全腾讯地图接口】(四)
109.【Java最全腾讯地图接口】
109 1
|
12月前
|
存储 关系型数据库 MySQL
2023年MySQL实战核心技术前言篇(前言可能比较枯燥,下一篇开始后就会让你热血沸腾)
2023年MySQL实战核心技术前言篇(前言可能比较枯燥,下一篇开始后就会让你热血沸腾)
133 1
|
12月前
|
弹性计算 开发者
阿里云弹性计算经济型e实例火爆发布!
性价比首选,特惠云服务器,个人开发者、学生、小微企业大众的福音燃情上线啦!价格低至0.5元/天!!! 全用户群低价首选,续费不涨价!!
|
12月前
|
存储 关系型数据库 数据库
数据库内核那些事|PolarDB X-Engine:如何构建1/10成本的事务存储引擎?
X-Engine引擎是PolarDB为用户提供的低成本,高性价比的解决方案,LSM-tree分层存储结合标准zstd压缩,在性能和成本做到了很好的平衡。在标准sysbench场景下,存储空间相比InnoDB引擎减少60%,读写性能降低10-20%。
数据库内核那些事|PolarDB X-Engine:如何构建1/10成本的事务存储引擎?