交换机工作模式(四种)
fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的。(广播,直接绑定队列)
我们之前的哪里就是fanout形式。
direct:根据RoutingKey匹配消息路由到指定的队列。(绝对匹配)
topic:生产者指定RoutingKey消息根据消费端指定的队列通过模糊匹配方式进行相应转发。(模糊匹配)
headers:根据发送消息内容中的headers属性来匹配。
有了交换机之后就有了更强大的能力,可以根据交换机的模式来完成更加复杂的功能。
一、fanout模式
1.1、基本概念
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JBGMtaQg-1651541063735)(C:\Users\93997\AppData\Roaming\Typora\typora-user-images\image-20210925070020024.png)]
应用场景:例如我们要通过rabbitmq来进行日志记录,一般会以两种形式,控制台以及存储到磁盘方式进行落库存盘。无论哪种方式,其内容本质是相同的,此时我们可以使用fanout方式。
特点:
使用fanout的特点就是只有当消费者连接之后生产者发送的消息才会生效,在此之前生产者产生的会默认直接丢弃,这也就解决了消息积压的问题。(对于之前的一些日志记录丢弃掉也没有关系)
若是运行多个服务,那么每一个服务都能够收到同一份消息,而不像之前案例一样均匀分配任务了!!!
1.2、代码实操
目的(实现效果):发送的多个日志信息,每个服务都能够收到相同数量且同样内容的信息。
生产者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Optional; /** * @ClassName EmitLog * @Author ChangLu * @Date 2021/9/25 7:10 * @Description TODO */ public class EmitLog { //核心:指定交换机名称 private static String EXCHANGE_NAME = "LOGS"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.118.128"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //绑定一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //向指定的交换机发送消息 String msg = "this is a log info!"; //第二个参数是routingkey,第三个参数是基本属性 channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送成功!"); }catch (Exception e){ System.out.println("消息发送失败,失败原因:"+e.getMessage()); } Optional.ofNullable(channel).ifPresent(EmitLog::close); Optional.ofNullable(connection).ifPresent(EmitLog::close); } //反射进行资源关闭 public static <T> void close(T t){ Class<?> aClass = t.getClass(); Optional.ofNullable(t).ifPresent(c-> { try { Method closeMethod = aClass.getDeclaredMethod("close"); closeMethod.setAccessible(true); closeMethod.invoke(t, null); } catch (Exception e){ e.printStackTrace(); } }); } }
消费者
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @ClassName ReceiveLog * @Author ChangLu * @Date 2021/9/25 7:23 * @Description TODO */ public class ReceiveLog { private static String EXCHANGE_NAME = "LOGS"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.118.128"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定指定的交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //生成一个临时的随机的queue,并绑定交换机与队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"");//第三个参数为routingkey DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("接收到消息:"+msg); } }; //进行消费,需要指定队列名称 channel.basicConsume(queueName,true,consumer); } }
演示:生产者连发两条消息,启动的两个消费者服务都能够接收到相同的两份信息
二、direct模式
2.1、基本概念
概念图
可以看到下图,相同的队列可以指定相同的routingkeys,这是不受影响的。
说明
场景:有些情况下,对于日志而言我们进行升级,可能不需要把所有的日志都存储到磁盘上,只需要在磁盘中存储错误日志,例如error类型的,日志分为不同等级嘛。
在磁盘中只存储error,而在控制台里把所有消息都打印出来。涉及到不同的消费者接收消息不一致的情况此时非常建议使用direct模式。
实现效果:生产者服务在消费者服务启动前发送的消息,不会放入到队列中存储起来,只有在消费者服务启动后发送的消息才会被进行分发处理。
实现方式:设置交换机为direct,并且需要额外设置routingkey,可以将key理解为对指定信息感兴趣。
2.2、代码实操
目的(实现效果):不同类型的日志交由不同的服务处理,例如info、debug、warning交由消费者服务1处理;error交由消费者服务2处理。
生产者:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; /** * @ClassName EmitLogs * @Author ChangLu * @Date 2021/9/25 9:44 * @Description 生产者:绑定交换机direct_LOGS,发送四个消息,每个消息匹配一个routingkeys(info、debug、error、warning) */ public class EmitLogs { //核心:指定交换机名称,需要跟之前的不一样 private static String EXCHANGE_NAME = "direct_LOGS"; public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.118.128"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定一个Direct类型交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String msg = "sended msg"; //发送三条消息,每个消息绑定对应的routingkey,发送给指定的一个交换机 String INFO_MESSAGE = "info-"+msg; channel.basicPublish(EXCHANGE_NAME,"info",null,INFO_MESSAGE.getBytes(StandardCharsets.UTF_8)); System.out.println("成功发送消息:"+INFO_MESSAGE); String DEBUG_MESSAGE = "debug-"+msg; channel.basicPublish(EXCHANGE_NAME,"debug",null,DEBUG_MESSAGE.getBytes(StandardCharsets.UTF_8)); System.out.println("成功发送消息:"+DEBUG_MESSAGE); String WARNING_MESSAGE = "warning-"+msg; channel.basicPublish(EXCHANGE_NAME,"warning",null,WARNING_MESSAGE.getBytes(StandardCharsets.UTF_8)); System.out.println("成功发送消息:"+WARNING_MESSAGE); String ERROR_MESSAGE = "warning-"+msg; channel.basicPublish(EXCHANGE_NAME,"error",null,ERROR_MESSAGE.getBytes(StandardCharsets.UTF_8)); System.out.println("成功发送消息:"+ERROR_MESSAGE); channel.close(); connection.close(); } }
消费者1:
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @ClassName ReceiveLogs * @Author ChangLu * @Date 2021/9/25 9:50 * @Description 消费者:绑定交换机direct_LOGS,并且设置三个routingkeys:info、debug、warning */ public class ReceiveLogs1 { private static String EXCHANGE_NAME = "direct_LOGS"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.118.128"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定指定的交换机类型——direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //生成一个随机的临时的queue String queueName = channel.queueDeclare().getQueue(); //一个交换机同时绑定三个routingkeys channel.queueBind(queueName,EXCHANGE_NAME,"info"); channel.queueBind(queueName,EXCHANGE_NAME,"debug"); channel.queueBind(queueName,EXCHANGE_NAME,"warning"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("接收到消息:"+msg); } }; //进行消费,需要指定队列名称 channel.basicConsume(queueName,true,consumer); } }
消费者2:大部分内容与消费者1相同,只不过就指定的routingkeys不一样而已
//绑定direct类型交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); //绑定指定routingkeys:error channel.queueBind(queueName,EXCHANGE_NAME,"error");
测试一下:生产者发送了四个不同routingkeys的消息,两个消费者接受对应routingkeys信息
三、topic模式
3.1、基本概念
我们可对routingkey进行模糊匹配,如使用*或#来进行匹配!
*可以代替一个单词
#可以替代零个或多个单词
我们可以对routingkeys进行.分割的不同类型区分,上面对应着消费者服务来对生产者产生的信息进行过滤,每个队列能够拿到对应交换机可模糊匹配的执行信息,来达到区分的效果!
3.2、实战
目的:有10条记录其中包含了不同类型的内容,生产者发送消息的同时每条记录都附带对应详细的routingkey;消费者则会定义指定的routingkey模糊表达式,用于进行模糊匹配拿到生产者发送过来的信息记录条数。
生产者
这里对方法进行了抽离,实际上基本配置与之前都相同。这里的话准备了一个routingkey数组用于分别描述各组信息与实体信息传递出去:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @ClassName TopicProduce * @Author ChangLu * @Date 2021/9/25 10:57 * @Description 生产者:交换机类型为topic */ public class TopicProduce { private static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Util.doBefore(); Channel channel = Util.channel; //指定交换机为topic channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //定义关键字,不同的消息在发送时都指定routingKeys String[] routingKeys = new String[]{ "quick.orange.rabbit","quick.orange.fox","quick.brown","quick.orange.male.rabbit", "lazy.orange.elephant","lazy.brown.fox","lazy.pink.rabbit","lazy.orange.male.rabbit", "orange" }; for (String routingKey : routingKeys) { String msg = "信息内容:"+routingKey; //发送 channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("已发送"+msg); } Util.close(); } static class Util{ private static Connection connection = null; public static Channel channel = null; public static void doBefore() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.118.128"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); connection = connectionFactory.newConnection(); channel = connection.createChannel(); } public static void close() throws IOException, TimeoutException { if(channel!=null){ channel.close(); } if(connection!=null){ connection.close(); } } } }
消费者
消费者1:基本配置都相同,不再重复展示,匹配的routingkey为"*.orange.*"
private static String EXCHANGE_NAME = "topic_exchange"; ... //指定交换机为topic channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); //指定routingkeys,可搭配*或#进行匹配 String bindingKey = "*.orange.*"; channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("收到消息:"+msg); } }); ...
消费者2:匹配的routingkey为"quick.*"、"lazy.orange.#"、"quick.orange.fox"
private static String EXCHANGE_NAME = "topic_exchange"; //指定交换机为topic channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); //指定routingkeys,可搭配*或#进行匹配 String bindingKey = "quick.*"; channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey String bindingkey2 = "lazy.orange.#"; channel.queueBind(queueName,EXCHANGE_NAME,bindingkey2); String bindingkey3 = "quick.orange.fox"; channel.queueBind(queueName,EXCHANGE_NAME,bindingkey3); channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("收到消息:"+msg); } }); ...
测试:可以看到对应的消费者服务都能够进行精确匹配拿到对应的信息
精炼总结
直接指定队列形式:消费者服务没有启动前发送的消息,在消费者服务启动后仍然能够收到,因为此前发送的消息被存储到队列中。
就是一、二章节中的案例。
交换机类型:第三章节案例
fanout:①交换机发送的消息都一致的发送给现有的队列中。(每个队列都能够收到相同的消息)②在使用fanout类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉!
应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。
direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃。
应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。
topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃。
headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)
整理者:长路 时间:2021.9.25
类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉!
应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。
direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃。
应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。
topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃。
headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)