1.RabbitMQ消息队列和核心概念
1.1.RabbitMQ介绍
RabbitMQ是一个开源的AMQP实现,采用erlang语言编写,支持多种客户端,如:Python、java、C、.NET,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错。
1.2.RabbitMQ图解
1.3.RabbitMQ核心概念
Broker
RabbitMQ的服务端程序,一个mq节点就是一个broker
Producer生产者
创建Messsage发送到RabbitMQ中
Consumer消费者
消费队列中的消息
Message消息
生产消费的内容,有消息头和消息体,包括多个属性的配置,如RoutingKey
Channel信道
一条支持多路复用的通道,独立的双向数据流通道,可以发布订阅、接收消息、信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道
Connection连接
RabbitMQ的socket链接,他封装了socket协议相关的部分逻辑,一个链接上可以有多个信道进行通信
Exchange交换机
生产者将消息通过信道发送给交换机,交换机将消息路由给一个或者多个队列中,交换机和对列是多对多的关系
RoutingKey路由键
生产者将消息发送给路由的时候,都会指定一个RoutingKey,交换机和队列之间绑定一个BindingKey,交换机同过RoutingKey匹配BindingKey发送到指定的队列
BindingKey绑定key
交换机和队列绑定的key
Virtual host虚拟主机
用于不同的业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同的名称的Exchange和Queue
默认是:/
自行添加:/dev /prod /test
1.4.容器化部署RabbitMQ
#拉取镜像 docker pull rabbitmq:management docker run -d --hostname rabbit_host --name rabbitmq -e RABBITMQ_DEFAULT_USER=lixiang -e RABBITMQ_DEFAULT_PASS=992184xiang. -p 15672:15672 -p 5672:5672 rabbitmq:management #介绍 -d 以守护进程方式在后台运行 -p 15672:15672 management 界面管理访问端口 -p 5672:5672 amqp 访问端口 --name:指定容器名 --hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中 -e 参数 RABBITMQ_DEFAULT_USER 用户名 RABBITMQ_DEFAULT_PASS 密码
- 主要端口介绍:
4369 erlang 发现口 5672 client 端通信口 15672 管理界面 ui 端口 25672 server 间内部通信口
- 控制台介绍:
- 默认rabbitmq账号密码 guest/guest,我们这里指定了lixiang和992184xiang.
1.5.Java项目创建整合RabbitMQ
- 创建SpringBoot项目,加入依赖
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency> </dependencies>
2.RabbitMQ工作队列模型实战
2.1.简单队列
简单队列就是最简单的一种模式,有生产者,消费者还有队列组成,生产者将消息发送给队列,消费者从队列中读取消息完成消费。
编码实现
生产者
编码流程:创建连接信息->创建信道->设置队列信息->发布消息 核心API:connection.createChannel()、channel.queueDeclare()、channel.basicPublish()
/** * 消息的生产者 */ public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) { //创建连接参数 ConnectionFactory factory = new ConnectionFactory(); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setHost("8.140.116.67"); factory.setVirtualHost("/dev"); //创建连接 try(Connection connection = factory.newConnection()){ //创建信道 Channel channel = connection.createChannel(); //设置队列参数 /** * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) * 第一个参数:队列名称 * 第二个参数:持久话配置 * 第三个参数:是否独占,只能有一个消费者监听队列,发布订阅是独占 * 第四个参数:当没有消费者消费的时候,消息自动删除 * 第五个参数:其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //设置消息内容 String msg = "Hello word"; //推送消息 /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串 * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同 * 第三个参数:配置信息 * 第四个参数:消息体内容,转换成字节数组 */ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("消息成功发送到mq中一条"); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
消费者
编码流程:创建连接->创建信道->设置队列信息->创建消费者回调函数->消费消息 核心API:queueDeclare()、new DefualtConusmer(channel)、channel.basicConsume()
/** * 消息的消费者(持续监听) */ public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //设置队列参数 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("envelope:"+envelope); System.out.println("properties:"+properties); System.out.println("body:"+new String(body,"utf-8")); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(QUEUE_NAME,true,consumer); } }
这种简单队列的模式,系统会为每个队列隐式地绑定一个默认交换机,交换机名称为" (AMQP default)",类型为直连 direct,当你手动创建一个队列时,系统会自动将这个队列绑定到一个名称为空的 Direct 类型的交换机上,绑定的路由键 routing key 与队列名称相同。
2.2.工作队列
工作队列:使用于生产者能力大于消费者的场景,增多消费者节点,有两中策略,默认是round robin轮询策略,还有一个是公平策略
轮询策略
在简单队列的基础上,把消费确认机制改成手动确认,并且加上延时。
核心API:channel.basicAck(envelope.getDeliveryTag(),false)
/** * 消息的消费者(持续监听) */ public class Recv1 { private final static String QUEUE_NAME = "work_ms_rr"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //设置队列参数 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /** * 模拟消费延迟 */ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("consumerTag:"+consumerTag); System.out.println("envelope:"+envelope); System.out.println("properties:"+properties); System.out.println("body:"+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
公平策略
解决消费者能力不足的问题,降低消费时间问题
channel中设置消费者每次消费一个,消费完在进入下一个
//限制消费者每次消费一个,消费完在消费下一个 channel.basicQos(1);
/** * 消息的消费者(持续监听) */ public class Recv1 { private final static String QUEUE_NAME = "work_ms_fair"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //设置队列参数 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //限制消费者每次消费一个,消费完在消费下一个 channel.basicQos(1); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /** * 模拟消费延迟 */ try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("body:"+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
3.RabbitMQ交换机和发布订阅模型实战
3.1.RabbitMQ常见的Exchange交换机类型
生产者将消息发送到Exchange,交换机将消息路由到一个或者多个队列中,交换机有四个类型,队列和交换机是多对多的关系。
交换机只负责转发消息,不具备存储消息的能力,如果没有队列和交换机绑定,或者没有符合规则的路由,消息将被丢失。
RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange
交换机类型
- Direct Exchange定向
- 将一个队列绑定到交换机上,要求消息的routingKey与路由匹配的bindingKey完全匹配。
- 例子:如果一个队列绑定到改交换机上要求路由键“aabb”,则只有被标记为aabb的消息才会被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb。
- Fanout Exchange广播
- 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
- Fanout Exchange 交换机转发消息是最快的,用于发布订阅、广播形式。
- 不处理路由键
Topic Exchange主题
- 主题交换机是一种发布/订阅的模式,结合了直连交换机和扇形交换机的特点。
- 将路由键和某种模式进行匹配,此时队列上需要绑定在一个模式上
- 符号#匹配一个或者多个词,符号*匹配一个词
- 例子:“abc.#”能够匹配abc.gc.bs,“abc.*”只能匹配abc.hs
Headers Exchange(少用)
- 根据发送的消息内容中的headers属性进行匹配,在绑定Queue与Exchange时指定一组键值对
- 当消息发送到RabbitMQ时会取到该消息的headers与与Exchange绑定时指定的键值对进行匹配
- 完全匹配则消息会路由到该队列,否则不会路由到该队列
RabbitMQ默认自带7个交换机
3.2.RabbitMQ发布订阅模型实战
RabbitMQ发布订阅模型
发布-订阅模型中,消息生产者不在直接面对queue,而是直面exchange,都需要经过exchange俩进行消息的转发,不需要指定routingKey,所有发送到同一个fanout交换机的消息都会被监听这个交换机的队列收到。
发布订阅模型应用场景
- 微信公众号
- 新浪微博关注
发布订阅模型通过把消息发送给交换机,交换机转发给对应的绑定队列,交换机绑定的队列是排它独占队列,自动删除。
编码实战
生产者
编码流程:创建连接对象->创建信道->绑定交换机->推送消息
核心API:channel.exchangeDeclare(EXCHANGE_NAME,“fanout”)
/** * 消息的生产者 */ public class Send { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) { //创建连接参数 ConnectionFactory factory = new ConnectionFactory(); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setHost("8.140.116.67"); factory.setVirtualHost("/dev"); //创建连接 try(Connection connection = factory.newConnection()){ //创建信道 Channel channel = connection.createChannel(); //绑定交换机,fanout_exchange,广播,第一个参数交换机名称,第二个参数交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //设置消息内容 String msg = "小滴课堂大课训练营发布"; //推送消息 /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串 * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同 * 第三个参数:配置信息 * 第四个参数:消息体内容,转换成字节数组 */ channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("广播消息发送成功"); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
消费者
创建连接对象->创建信道->绑定交换机->获取队列名称->绑定交换机和队列->消费者消费消息
核心API:channel.queueDeclare().getQueue(),channel.queueBind(queue,EXCHANGE_NAME,“”)
/** * 消息的消费者(持续监听) */ public class Recv2 { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT); //获取对列名称 String queue = channel.queueDeclare().getQueue(); //绑定交换机和队列,队列名称,交换机名称,routingKey channel.queueBind(queue,EXCHANGE_NAME,""); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /** * 模拟消费延迟 */ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("body:"+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(queue,false,consumer); } }
3.3.RabbitMQ路由模式实战
路由模式简介
交换机类型:Direct
队列和交换机绑定,需要指定一个路由key(BindingKey)
消息生产者发送消息到交换机,需要指定RoutingKey
交换机根据消息的路由key,转发给对应的队列
例子:日志采集系统,一个队列收集错误日志,一个队列收集全部日志
编码实战
/** * 消息的生产者 */ public class Send { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) { //创建连接参数 ConnectionFactory factory = new ConnectionFactory(); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setHost("8.140.116.67"); factory.setVirtualHost("/dev"); //创建连接 try(Connection connection = factory.newConnection()){ //创建信道 Channel channel = connection.createChannel(); //绑定交换机,直连交换机direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //设置消息内容 String errMsg = "错误日志"; String infoMsg = "日常日志"; String warnMsg = "警告日志"; //推送消息 /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串 * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同 * 第三个参数:配置信息 * 第四个参数:消息体内容,转换成字节数组 */ //发送消息指定routingKey channel.basicPublish(EXCHANGE_NAME,"errRoutingKey",null,errMsg.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"infoRoutingKey",null,infoMsg.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"warnRoutingKey",null,warnMsg.getBytes(StandardCharsets.UTF_8)); System.out.println("消息成功发送到mq中一条"); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
/** * 消息的消费者(持续监听) */ public class Recv { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,direct直连模式 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //获取队列名称 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"warnRoutingKey"); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("envelope:"+envelope); System.out.println("properties:"+properties); System.out.println("body:"+new String(body,"utf-8")); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(queueName,true,consumer); } }
/** * 消息的消费者(持续监听) */ public class Recv2 { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,direct直连模式 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //获取队列名称 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queueName,EXCHANGE_NAME,"errRoutingKey"); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("envelope:"+envelope); System.out.println("properties:"+properties); System.out.println("body:"+new String(body,"utf-8")); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(queueName,true,consumer); } }
3.4.RabbitMQ主题模式实战
背景
- 如果业务很多路由key,怎末维护?
- topic交换机,支持通配符,功能强大
- 工作中基本都是topic模式
主题模式简介
交换机是topic,可以实现发布订阅模式fanout和路由模式Direct的功能,更加灵活,支持模式匹配和通配符匹配。
交换机通过通配符转发到相应的队列,*代表一个词,#代表一个或者多个词。
注意:交换机和队列绑定时用的binding使用通配符的路由键,生产者和交换机绑定要使用具体的路由键。
例子:日志采集系统,一个队列中收集全部的订单日志信息,order.log.#,一个队列中收集群全部的日志消息*.log.#
编码实战
/** * 消息的生产者 */ public class Send { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) { //创建连接参数 ConnectionFactory factory = new ConnectionFactory(); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setHost("8.140.116.67"); factory.setVirtualHost("/dev"); //创建连接 try(Connection connection = factory.newConnection()){ //创建信道 Channel channel = connection.createChannel(); //绑定交换机,直连交换机direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //设置消息内容 String errMsg = "错误日志"; String infoMsg = "日常日志"; String warnMsg = "警告日志"; //推送消息 /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 第一个参数:交换机名称,简单队列没有交换机,所以指定为空串 * 第二个参数:RoutingKey,简单队列的routingKey和队列的名称相同 * 第三个参数:配置信息 * 第四个参数:消息体内容,转换成字节数组 */ channel.basicPublish(EXCHANGE_NAME,"order.log.error",null,errMsg.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"order.log.info",null,infoMsg.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"order.log.warn",null,warnMsg.getBytes(StandardCharsets.UTF_8)); System.out.println("消息成功发送到mq中一条"); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
/** * 消息的消费者(持续监听) */ public class Recv { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,direct直连模式 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //获取队列名称 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error"); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("envelope:"+envelope); System.out.println("properties:"+properties); System.out.println("body:"+new String(body,"utf-8")); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(queueName,true,consumer); } }
/** * 消息的消费者(持续监听) */ public class Recv2 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { //创建配置连接信息对象 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("8.140.116.67"); factory.setUsername("admin"); factory.setPassword("992184xiang."); factory.setVirtualHost("/dev"); factory.setPort(5672); //创建连接,消费者一版不自动关闭,因为持续监听 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,direct直连模式 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //获取队列名称 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queueName,EXCHANGE_NAME,"*.log.#"); //消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("envelope:"+envelope); System.out.println("properties:"+properties); System.out.println("body:"+new String(body,"utf-8")); } }; //消费消息,自动确认 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 第一个参数:队列名称 * 第二个参数:自动确认消费完成 * 第三个参数:消费者的回调函数 */ channel.basicConsume(queueName,true,consumer); } }
3.5.RabbitMQ多种工作模式的总结
简单模式
一个生产者、一个消费者,不用指定交换机,使用默认的交换机。
工作模式
一个生产者,多个消费,可以有轮询和公平策略,不用指定交换机,使用默认的交换机。
发布订阅模式
fanout类型交换机,通过交换机和队列绑定,不用指定绑定的路由键,生产者发送消息到交换机,fanout交换机直接进行转发,消息不用指定routingKey路由键
路由模式
direct类型交换机,交换机和队列绑定,指定的绑定的路由键,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息指定的RoutingKey要和交换机绑定队列的bindingKey一致进行转发。
主题模式
topic交换机,交换机和队列绑定,指定绑定的通配符,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要制定routingKey路由键
3.6.JAVA整合RabbitMQ的核心API
创建配置连接对象
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort("5672"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/dev"); //创建连接对象 Connection connection = factory.newConnection();
创建信道
Channel channel = connection.createChannel();
绑定队列
/** * 第一个参数:队列的名称 * 第二个参数:是否持久化配置 * 第三个参数:是否独占,发布订阅模型一般独占 * 第四个参数:自动删除,当没有消费者的消费消息的时候,是否自动删除消息 * 第五个参数:其他参数 **/ channel.queueDeclare(queueName,false,false,false,null);
推送消息
/** * 第一个参数:交换机名称 * 第二个参数:队列名称 * 第三个参数:配置信息 * 第四个参数:发送消息的字节数组 **/ channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8));
消费消息
//消费者消费消息,创建消费者,把信道传进去 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("envelope:"+envelope); System.out.println("properties:"+properties); System.out.println("body:"+new String(body,"utf-8")); } };
/** * 第一个参数:队列名称 * 第二个参数:是否自动消费完成 * 第三个参数:消费者的回调函数 **/ channel.basicConsume(queueName,true,consumer);
限制消费者每次只能消费一个
channel.basicQos(1);
手工确认消费消息
channel.basicAck(envelope.getDeliveryTag,false);
绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
获取队列名称
String queueName = channel.queueDeclare().getQueue();
绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");