java消息队列基础和RabbitMQ相关概念(一)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: java消息队列基础和RabbitMQ相关概念

java消息队列基础和RabbitMQ相关概念


什么是消息队列

消息队列一般简称为 MQ (Messges Queue),是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,是在消息的传输过程中保存消息的容器。消息队列本质上是一个队列,而队列中存放的是一个个消息。

队列是一个数据结构,具有先进先出的特点。而消息队列就是将消息放到队列里,用队列做存储消息的介质。消息的发送放称为生产者,消息的接收方称为消费者。

消息队列由 Broker(消息服务器,核心部分)、Producer(消息生产者)、Consumer(消息消费者)、Topic(主题)、Queue(队列)和Message(消息体)组成。

消息队列特点

消息队列有三个作用,分别是削峰、解耦和异步。

流量削峰:主要用于在高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

假设系统只能处理1000个请求,但这时突然来了3000个请求,如果不加以限制就会造成系统瘫痪。使用消息队列做缓冲,将多余的请求存放在消息队列中,等系统根据自己处理请求的能力去消息队列去。

应用解耦:主要用于当一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理时,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

假设某个服务 A 需要调用服务 B,但是服务 B 突然出现问题,这样会导致服务 A 也会出现问题。如果使用消息队列,当服务 A 执行完成之后,发送一条消息到队列中,服务 B 读取到这条消息,那么它立刻开始进行业务的执行。

异步通信:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。

假设有一个业务,要先执行服务 A ,然后服务 A 去调用服务 B ,当服务 B 完成之后,服务 A 调用服务 C,这个业务需要一步步走下去。当使用了消息队列之后,服务 A 完成之后,可以同时执行服务 B 和 服务 C ,这样就减低业务的响应时间,提高用户体验。

RabbitMQ

MQ优势:

  • 应用解耦:提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量
  • 削峰填谷:提高系统稳定性

MQ劣势:

  • 系统可用性降低
  • 系统复杂度提高
  • 一致性问题

使用MQ需要满足什么条件

  1. 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  2. 容许短暂的不一致性。
  3. 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

AMQP

AMQP,即 Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。

RabbitMQ相关概念

  • Exchange: message到达 broker的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待consumer取走
  • Binding: exchange和queue之间的虚拟连接,binding 中可以包含routing keyBinding信息被保存到exchange 中的查询表中,用于message 的分发依据

RabbitMQ提供了6种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)。

工作模式

1.简单模式

image.png

/*
发送消息
*/
public class Producer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("172.16.98.133");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。
         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        参数:
            1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
            2. routingKey:路由名称
            3. props:配置信息
            4. body:发送消息数据
         */
        String body = "hello rabbitmq~~~";
        //6. 发送消息
        channel.basicPublish("","hello_world",null,body.getBytes());
        //7.释放资源
      channel.close();
      connection.close();
    }
}
/*
接受消息
*/
public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("172.16.98.133");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。
         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);
        //关闭资源?不要
    }
}


2.work queues工作队列模式

image.png

Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

Work Queues与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多个消费者同时对消费消息的测试。

生产者:发送消息和简单模式差别不大

消费者:接受消息需要创建两个,代码差别不大

总结

  1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
  2. Work Queues对于任务过重或任务较多情况使用工作队队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。

3.Pub/Sub订阅模式

image.png

在订阅模型中,多了一个Exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
  • Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
/**
 * 发送消息
 */
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("172.16.98.133");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
       String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());
        //9. 释放资源
        channel.close();
        connection.close();
    }
}
/*
接受消息1
*/
public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("172.16.98.133");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象
         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法
                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);
        //关闭资源?不要
    }
}

接受消息2的消费者只需要在channel.basicConsume(queue2Name,true,consumer);更改队列名称就行

4.Routing路由模式

image.png

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向Exchange '发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key完全一致,才会接收到消息
/**
 * 发送消息
 */
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("172.16.98.133");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
       String exchangeName = "test_direct";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Na me,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        //队列1绑定 error
        channel.queueBind(queue1Name,exchangeName,"error");
        //队列2绑定 info  error  warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");
        String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"warning",null,body.getBytes());
        //9. 释放资源
        channel.close();
        connection.close();
    }
}


消费者只需要修改队列名称就可以和上一次差别不大

总结: Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列

5.Topics通配符模式

image.png

*是一个 #是0或者多个

/**
 * 发送消息
 */
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("172.16.98.133");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
       String exchangeName = "test_topic";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        // routing key  系统的名称.日志的级别。
        //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
        //9. 释放资源
        channel.close();
        connection.close();
    }
}


Topic主题模式可以实现 Pub/Sub发布与订阅模式和Routing 路由模式的功能,只是Topic在配置routing key的时候可以使用通配符,显得更加灵活。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
46 3
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
166 7
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
110 10
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
3月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
3月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
80 4
|
2天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
34 17
|
13天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者