RabbitMQ(2)

简介: RabbitMQ(2)

交换机

RabbitMQ中,生产者发送信息不会直接将消息投递到队列中,而是将消息投递到交换机中,再由交换机转发到具体的队列中,队列再将消息以推送或者拉取方式给消费进行消费。

交换机类型:

  1. direct直接类型;
  2. topic主题类型;
  1. headers标题类型;不常用
  2. fanout扇出类型。

无名类型:使用空字符串来指定

fanout交换机

生产者

public class Producer {
    public static final String QUEUE="hello";
    public static void main(String args[]) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        String message="hello";
        channel.basicPublish("exchange2","",null,message.getBytes());
        System.out.println("发送成功");
    }
}

消费者

public class Consumer {
    public static void main(String args[]) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare("exchange2","fanout");
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"exchange2","");
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        channel.basicConsume(queue,true,deliverCallback,consumerTag->{});
    }
}

Direct Exchange 直连交换其

这里整合Springboot代码如下:

application.yml文件

spring:
  rabbitmq:
    username: admin
    password: admin
    host: 182.92.167.13
    port: 5672

Configuration配置类

@Configuration
public class DirectExchange1 {
    @Bean
    public Queue queue(){
        return new Queue("queue2",true);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("exchange3",true,false);
    }
    @Bean
    Binding binding(){
        Binding binding= BindingBuilder.bind(queue()).to(directExchange()).with("123");
        return binding;
    }
}

生产者

rabbitTemplate.convertAndSend("exchange3","123","message");

消费者


@Component
@RabbitListener(queues = "queue2")
public class rabbitmq {
    @RabbitHandler
    public void consume(String message){
        System.out.println(message);
    }
}

Topic Exchange代码省略

死信队列

  死信,就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。


死信的来源


消息 TTL(存活时间) 过期


队列达到最大长度(队列满了,无法再添加数据到队列中)


消息被拒绝并且(不放回队列中: requeue=false)


生产者

 MessageProperties messageProperties=new MessageProperties();
        messageProperties.setExpiration("20000");
        Message message1=new Message("hello".getBytes(),messageProperties);
        rabbitTemplate.convertAndSend("exchange4","2.1",message1);

Configuration配置

@Configuration
public class RabbitmqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("exchange4");
    }
    @Bean
    public Queue queue(){
        Map<String,Object> map=new HashMap<>();
        map.put("x-dead-letter-exchange","exchange4_1");
        map.put("x-dead-letter-routing-key","error.er");
        return new Queue("queue3",true,false,false,map);
    }
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(topicExchange()).with("2.*");
    }
    @Bean
    public TopicExchange topicExchange1(){
        return new TopicExchange("exchange4_1");
    }
    @Bean
    public Queue queue1(){
        return new Queue("queue3_1");
    }
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(queue1()).to(topicExchange1()).with("error.#");
    }
}

消费者

@Component
public class rabbitmq {
    @RabbitListener(queues = "queue3_1")
    public void consume(String msg, Message message, Channel channel){
       System.out.println(msg);
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 SQL Java
Rabbitmq
Rabbitmq
56 1
|
1月前
|
消息中间件 数据库 存储
一文带大家快速掌握RabbitMQ!(二)
一文带大家快速掌握RabbitMQ!
一文带大家快速掌握RabbitMQ!(二)
|
1月前
|
消息中间件 网络性能优化 API
一文带大家快速掌握RabbitMQ!(三)
一文带大家快速掌握RabbitMQ!
|
2月前
|
消息中间件 存储 Java
rabbitmq(一)
rabbitmq(一)
|
6月前
|
消息中间件 大数据 Java
RabbitMQ
RabbitMQ
95 1
|
6月前
|
消息中间件 存储 缓存
rabbitMQ
rabbitMQ
45 0
|
6月前
|
消息中间件 存储 网络协议
精通 RabbitMQ 系列 02
精通 RabbitMQ 系列 02
52 0
|
6月前
|
消息中间件 存储 负载均衡
什么是RabbitMQ?
RabbitMQ是一个开源的消息代理软件,用于在分布式系统中传递消息。它实现了高级消息队列协议(AMQP),提供了一种可靠的、强大的、灵活的消息传递机制,使得不同应用程序或组件之间可以轻松地进行通信。
64 0
|
消息中间件 存储 缓存
RabbitMQ到底为什么要使用它?
在多服务体系架构中,必然存在着多个服务之间的调用关系,当用户提交了订单,订单服务会调用支付服务执行用户的金钱操作,执行完毕之后紧接着调用商品服务对商家的商品信息(库存、成交量、收入等)进行更新,执行完毕之后又调用物流服务
|
消息中间件 存储 网络协议
rabbitmq的介绍
rabbitMQ是一个开源的AMQP实现的消息队列中间件,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错,与SpringAMQP完美的整合、API丰富易用。