消息队列中的topic exchange

简介: 消息队列中的topic exchange

公众号merlinsea


Topic Exchange 主题交换机


1、主题交换机是⼀种发布/订阅的模式,结合了直连交换机与扇形交换机的特点 2、将路由键和某模式进⾏匹配,匹配成功就进行转发 3、符号“#”匹配⼀个或多个词,符号“*”匹配不多不少⼀个词

例⼦:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。

640.jpg


生产者代码

在发送消息的时候需要指定消息的key,是完整的key

public class Send {
    //交换机的名称,必须保证生产方和消费方一致
    private final static String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("39.107.221.166");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        //JDK7语法,自动关闭,创建连接
        try (Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {
            //绑定交换机,topic交换机
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
            String error = "error日志";
            String info = "info日志";
            String debug = "debug日志";
            channel.basicPublish(EXCHANGE_NAME,"order.log.error",null,error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"order.log.info",null,info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"product.log.debug",null,debug.getBytes(StandardCharsets.UTF_8));
            System.out.println("TOPIC消息发送成功");
        }
    }
}

消费者代码

消费者会创建队列,在队列绑定交换机的时候要指定binding key的通配符。

public class Recv2 {
    private final static String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("39.107.221.166");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列, 需要指定routingkey
        // *匹配一个词,#匹配多个词
        channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body="+new String(body,"utf-8"));
                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName,false,consumer);
    }
}


相关文章
|
5月前
|
消息中间件 Kubernetes RocketMQ
消息队列 MQ产品使用合集之topic是怎么选择分布在哪里brocker上面的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 测试技术 Apache
消息队列 MQ产品使用合集之在测试环境中拥有大量的topic会有什么影响
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
110 1
|
5月前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 RocketMQ
消息队列 MQ操作报错合集之无法自动创建topic,该怎么办
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
164 0
|
消息中间件
消息队列中的direct exchange
消息队列中的direct exchange
|
消息中间件
消息队列中的fanout exchange
消息队列中的fanout exchange
|
域名解析 消息中间件 Kubernetes
【消息队列】解决ERR 1 [topic/channel] (: no such host
【消息队列】解决ERR 1 [topic/channel] (: no such host
317 0
|
数据采集 消息中间件 存储
基于TableStore构建简易海量Topic消息队列
前言 消息队列,通常有两种场景,一种是发布者订阅模式,一种是生产者消费者模式。发布者订阅模式,即发布者生产消息放入队列,多个监听的消费者都会收到同一份消息,也就是每个消费者收到的消息是一样的。生产者消费者模式,生产者生产消息放入队列,多个消费者同时监听队列,谁先抢到消息就会从队列中取走消息,最终每个消息只会有一个消费者拥有。
6376 0