【消息队列】消息中间件RabbitMQ急速入门1

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【消息队列】消息中间件RabbitMQ急速入门

1.RabbitMQ消息队列和核心概念

1.1.RabbitMQ介绍

RabbitMQ是一个开源的AMQP实现,采用erlang语言编写,支持多种客户端,如:Python、java、C、.NET,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错。

1.2.RabbitMQ图解

444fb1fc7b2c43ee9f7fa6b3259780f2.jpg

1.3.RabbitMQ核心概念

Broker

RabbitMQ的服务端程序,一个mq节点就是一个broker

Producer生产者

创建Messsage发送到RabbitMQ中

Consumer消费者

消费队列中的消息

Message消息

生产消费的内容,有消息头和消息体,包括多个属性的配置,如RoutingKey

Channel信道

一条支持多路复用的通道,独立的双向数据流通道,可以发布订阅、接收消息、信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道

Connection连接

RabbitMQ的socket链接,他封装了socket协议相关的部分逻辑,一个链接上可以有多个信道进行通信

Exchange交换机

生产者将消息通过信道发送给交换机,交换机将消息路由给一个或者多个队列中,交换机和对列是多对多的关系

RoutingKey路由键

生产者将消息发送给路由的时候,都会指定一个RoutingKey,交换机和队列之间绑定一个BindingKey,交换机同过RoutingKey匹配BindingKey发送到指定的队列

BindingKey绑定key

交换机和队列绑定的key

Virtual host虚拟主机

用于不同的业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同的名称的Exchange和Queue

默认是:/

自行添加:/dev
    /prod
    /test

1.4.容器化部署RabbitMQ

#拉取镜像
docker pull rabbitmq:management
docker run -d --hostname rabbit_host --name rabbitmq -e RABBITMQ_DEFAULT_USER=lixiang -e RABBITMQ_DEFAULT_PASS=992184xiang. -p 15672:15672 -p 5672:5672 rabbitmq:management
#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中
-e 参数
  RABBITMQ_DEFAULT_USER 用户名
  RABBITMQ_DEFAULT_PASS 密码
  • 主要端口介绍:
4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口
  • 控制台介绍:
  • 默认rabbitmq账号密码 guest/guest,我们这里指定了lixiang和992184xiang.


e1ed279477a246b592f3052861e3db3b.jpg

feb75fde4f64447f908de9a6f072d173.jpg


9eb74c9632e542149a690e1e27d56416.jpg



b44194db0b024ae9af19b6a93a4b1eea.jpg

1.5.Java项目创建整合RabbitMQ

  • 创建SpringBoot项目,加入依赖
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.10.0</version>
    </dependency>
  </dependencies>

2.RabbitMQ工作队列模型实战

2.1.简单队列


2e8cd379172041b3a26c7b6ef234c563.jpg简单队列就是最简单的一种模式,有生产者,消费者还有队列组成,生产者将消息发送给队列,消费者从队列中读取消息完成消费。

编码实现

生产者

编码流程:创建连接信息->创建信道->设置队列信息->发布消息
核心API:connection.createChannel()、channel.queueDeclare()、channel.basicPublish()
/**
 * 消息的生产者
 */
public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) {
        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");
        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();
            //设置队列参数
            /**
             * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
             * 第一个参数:队列名称
             * 第二个参数:持久话配置
             * 第三个参数:是否独占,只能有一个消费者监听队列,发布订阅是独占
             * 第四个参数:当没有消费者消费的时候,消息自动删除
             * 第五个参数:其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //设置消息内容
            String msg = "Hello word";
            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息成功发送到mq中一条");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

消费者

编码流程:创建连接->创建信道->设置队列信息->创建消费者回调函数->消费消息
核心API:queueDeclare()、new DefualtConusmer(channel)、channel.basicConsume()
/**
 * 消息的消费者(持续监听)
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //设置队列参数
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

这种简单队列的模式,系统会为每个队列隐式地绑定一个默认交换机,交换机名称为" (AMQP default)",类型为直连 direct,当你手动创建一个队列时,系统会自动将这个队列绑定到一个名称为空的 Direct 类型的交换机上,绑定的路由键 routing key 与队列名称相同。

2.2.工作队列

0647429720284735bc52c7b6aabb4599.jpeg


工作队列:使用于生产者能力大于消费者的场景,增多消费者节点,有两中策略,默认是round robin轮询策略,还有一个是公平策略

轮询策略

在简单队列的基础上,把消费确认机制改成手动确认,并且加上延时。

核心API:channel.basicAck(envelope.getDeliveryTag(),false)

/**
 * 消息的消费者(持续监听)
 */
public class Recv1 {
    private final static String QUEUE_NAME = "work_ms_rr";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //设置队列参数
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /**
                 * 模拟消费延迟
                 */
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

公平策略

解决消费者能力不足的问题,降低消费时间问题

channel中设置消费者每次消费一个,消费完在进入下一个

//限制消费者每次消费一个,消费完在消费下一个
channel.basicQos(1);
/**
 * 消息的消费者(持续监听)
 */
public class Recv1 {
    private final static String QUEUE_NAME = "work_ms_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //设置队列参数
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //限制消费者每次消费一个,消费完在消费下一个
        channel.basicQos(1);
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /**
                 * 模拟消费延迟
                 */
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("body:"+new String(body,"utf-8"));
                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

3.RabbitMQ交换机和发布订阅模型实战

3.1.RabbitMQ常见的Exchange交换机类型

生产者将消息发送到Exchange,交换机将消息路由到一个或者多个队列中,交换机有四个类型,队列和交换机是多对多的关系。

交换机只负责转发消息,不具备存储消息的能力,如果没有队列和交换机绑定,或者没有符合规则的路由,消息将被丢失。

RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange

交换机类型

  • Direct Exchange定向
  • 将一个队列绑定到交换机上,要求消息的routingKey与路由匹配的bindingKey完全匹配。
  • 例子:如果一个队列绑定到改交换机上要求路由键“aabb”,则只有被标记为aabb的消息才会被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb。

  • Fanout Exchange广播
  • 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
  • Fanout Exchange 交换机转发消息是最快的,用于发布订阅、广播形式。
  • 不处理路由键
Topic Exchange主题
  • 主题交换机是一种发布/订阅的模式,结合了直连交换机和扇形交换机的特点。
  • 将路由键和某种模式进行匹配,此时队列上需要绑定在一个模式上
  • 符号#匹配一个或者多个词,符号*匹配一个词
  • 例子:“abc.#”能够匹配abc.gc.bs,“abc.*”只能匹配abc.hs
Headers Exchange(少用)
  • 根据发送的消息内容中的headers属性进行匹配,在绑定Queue与Exchange时指定一组键值对
  • 当消息发送到RabbitMQ时会取到该消息的headers与与Exchange绑定时指定的键值对进行匹配
  • 完全匹配则消息会路由到该队列,否则不会路由到该队列

RabbitMQ默认自带7个交换机

3.2.RabbitMQ发布订阅模型实战

RabbitMQ发布订阅模型

发布-订阅模型中,消息生产者不在直接面对queue,而是直面exchange,都需要经过exchange俩进行消息的转发,不需要指定routingKey,所有发送到同一个fanout交换机的消息都会被监听这个交换机的队列收到。

发布订阅模型应用场景

  • 微信公众号
  • 新浪微博关注

发布订阅模型通过把消息发送给交换机,交换机转发给对应的绑定队列,交换机绑定的队列是排它独占队列,自动删除。

bd4c5012d8354cec88e129fb6e7010df.jpeg

编码实战

生产者

编码流程:创建连接对象->创建信道->绑定交换机->推送消息

核心API:channel.exchangeDeclare(EXCHANGE_NAME,“fanout”)

/**
 * 消息的生产者
 */
public class Send {
    private final static String EXCHANGE_NAME = "exchange_fanout";
    public static void main(String[] args) {
        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");
        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机,fanout_exchange,广播,第一个参数交换机名称,第二个参数交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //设置消息内容
            String msg = "小滴课堂大课训练营发布";
            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("广播消息发送成功");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

消费者

创建连接对象->创建信道->绑定交换机->获取队列名称->绑定交换机和队列->消费者消费消息

核心API:channel.queueDeclare().getQueue(),channel.queueBind(queue,EXCHANGE_NAME,“”)

/**
 * 消息的消费者(持续监听)
 */
public class Recv2 {
    private final static String EXCHANGE_NAME = "exchange_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
        //获取对列名称
        String queue = channel.queueDeclare().getQueue();
        //绑定交换机和队列,队列名称,交换机名称,routingKey
        channel.queueBind(queue,EXCHANGE_NAME,"");
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /**
                 * 模拟消费延迟
                 */
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("body:"+new String(body,"utf-8"));
                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queue,false,consumer);
    }
}

3.3.RabbitMQ路由模式实战

路由模式简介

交换机类型:Direct

队列和交换机绑定,需要指定一个路由key(BindingKey)

消息生产者发送消息到交换机,需要指定RoutingKey

交换机根据消息的路由key,转发给对应的队列

例子:日志采集系统,一个队列收集错误日志,一个队列收集全部日志

e90347a6e6144c7382f5a31be19a3685.jpeg

编码实战

/**
 * 消息的生产者
 */
public class Send {
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) {
        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");
        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机,直连交换机direct
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //设置消息内容
            String errMsg = "错误日志";
            String infoMsg = "日常日志";
            String warnMsg = "警告日志";
            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            //发送消息指定routingKey
            channel.basicPublish(EXCHANGE_NAME,"errRoutingKey",null,errMsg.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"infoRoutingKey",null,infoMsg.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"warnRoutingKey",null,warnMsg.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息成功发送到mq中一条");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * 消息的消费者(持续监听)
 */
public class Recv {
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"warnRoutingKey");
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}
/**
 * 消息的消费者(持续监听)
 */
public class Recv2 {
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey");
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}

3.4.RabbitMQ主题模式实战

背景

  • 如果业务很多路由key,怎末维护?
  • topic交换机,支持通配符,功能强大
  • 工作中基本都是topic模式

主题模式简介

交换机是topic,可以实现发布订阅模式fanout和路由模式Direct的功能,更加灵活,支持模式匹配和通配符匹配。

交换机通过通配符转发到相应的队列,*代表一个词,#代表一个或者多个词。

注意:交换机和队列绑定时用的binding使用通配符的路由键,生产者和交换机绑定要使用具体的路由键。

e2f45084bba5455289ff1a56beefce20.jpeg

例子:日志采集系统,一个队列中收集全部的订单日志信息,order.log.#,一个队列中收集群全部的日志消息*.log.#

编码实战

/**
 * 消息的生产者
 */
public class Send {
    private final static String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] args) {
        //创建连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setHost("8.140.116.67");
        factory.setVirtualHost("/dev");
        //创建连接
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机,直连交换机direct
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //设置消息内容
            String errMsg = "错误日志";
            String infoMsg = "日常日志";
            String warnMsg = "警告日志";
            //推送消息
            /**
             * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
             * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串
             * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同
             * 第三个参数:配置信息
             * 第四个参数:消息体内容,转换成字节数组
             */
            channel.basicPublish(EXCHANGE_NAME,"order.log.error",null,errMsg.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"order.log.info",null,infoMsg.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"order.log.warn",null,warnMsg.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息成功发送到mq中一条");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * 消息的消费者(持续监听)
 */
public class Recv {
    private final static String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}
/**
 * 消息的消费者(持续监听)
 */
public class Recv2 {
    private final static String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建配置连接信息对象
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("8.140.116.67");
        factory.setUsername("admin");
        factory.setPassword("992184xiang.");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //创建连接,消费者一版不自动关闭,因为持续监听
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,direct直连模式
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        //获取队列名称
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"*.log.#");
        //消费者消费消息,创建消费者,把信道传进去
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };
        //消费消息,自动确认
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 第一个参数:队列名称
         * 第二个参数:自动确认消费完成
         * 第三个参数:消费者的回调函数
         */
        channel.basicConsume(queueName,true,consumer);
    }
}

3.5.RabbitMQ多种工作模式的总结

简单模式

一个生产者、一个消费者,不用指定交换机,使用默认的交换机。

e812f1e690fd4e348fcf7c87dccb5f8e.jpeg

工作模式

一个生产者,多个消费,可以有轮询和公平策略,不用指定交换机,使用默认的交换机。

b43bbbfceaf34494961a841f06635a59.jpeg

发布订阅模式

fanout类型交换机,通过交换机和队列绑定,不用指定绑定的路由键,生产者发送消息到交换机,fanout交换机直接进行转发,消息不用指定routingKey路由键

28312997508c4ce0832db9272a54ec44.jpeg

路由模式

direct类型交换机,交换机和队列绑定,指定的绑定的路由键,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息指定的RoutingKey要和交换机绑定队列的bindingKey一致进行转发。

e56b6a9a3ea2484eb7ea8708eea07a7c.jpeg

主题模式

topic交换机,交换机和队列绑定,指定绑定的通配符,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要制定routingKey路由键

e0666fe5992649f9a349fdf5ddc444d4.jpeg

3.6.JAVA整合RabbitMQ的核心API

创建配置连接对象

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort("5672");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/dev");
//创建连接对象
Connection connection = factory.newConnection();

创建信道

Channel channel = connection.createChannel();

绑定队列

/**
  * 第一个参数:队列的名称 
  * 第二个参数:是否持久化配置
  * 第三个参数:是否独占,发布订阅模型一般独占
  * 第四个参数:自动删除,当没有消费者的消费消息的时候,是否自动删除消息
  * 第五个参数:其他参数
  **/
channel.queueDeclare(queueName,false,false,false,null);

推送消息

/**
  * 第一个参数:交换机名称 
  * 第二个参数:队列名称
  * 第三个参数:配置信息
  * 第四个参数:发送消息的字节数组
  **/
channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8));

消费消息

        //消费者消费消息,创建消费者,把信道传进去
Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body,"utf-8"));
            }
        };
/**
  * 第一个参数:队列名称 
  * 第二个参数:是否自动消费完成
  * 第三个参数:消费者的回调函数
  **/
channel.basicConsume(queueName,true,consumer);

限制消费者每次只能消费一个

channel.basicQos(1);

手工确认消费消息

channel.basicAck(envelope.getDeliveryTag,false);

绑定交换机

channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);

获取队列名称

String queueName = channel.queueDeclare().getQueue();

绑定交换机和队列

channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
83 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
23天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
63 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
63 7
|
21天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
29天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
52 1
|
2月前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
1月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门