公众号merlinsea
Topic Exchange 主题交换机
1、主题交换机是⼀种发布/订阅的模式,结合了直连交换机与扇形交换机的特点 2、将路由键和某模式进⾏匹配,匹配成功就进行转发 3、符号“#”匹配⼀个或多个词,符号“*”匹配不多不少⼀个词
例⼦:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
生产者代码
在发送消息的时候需要指定消息的key,是完整的key
public class Send { //交换机的名称,必须保证生产方和消费方一致 private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,topic交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); String error = "error日志"; String info = "info日志"; String debug = "debug日志"; channel.basicPublish(EXCHANGE_NAME,"order.log.error",null,error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"order.log.info",null,info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"product.log.debug",null,debug.getBytes(StandardCharsets.UTF_8)); System.out.println("TOPIC消息发送成功"); } } }
消费者代码
消费者会创建队列,在队列绑定交换机的时候要指定binding key的通配符。
public class Recv2 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机, channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, 需要指定routingkey // *匹配一个词,#匹配多个词 channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }