一、安装rabbitmq并在管理页创建交换器和队列以及绑定
来到docker官网找一个带management的版本,https://hub.docker.com/_/rabbitmq
然后下载、运行
访问服务器ip:15672,使用默认用户名和密码 guest进行登录
创建三个自定义交换器,headers类型不推荐了
然后创建几个队列
然后选择交换器,绑定队列,需要注意的是direct类型的Routing key需要完全匹配,fanout类型不需要匹配Routing key,topic类型需要进行模糊匹配,*代表一个词,#代表0个或多个
topic类型的绑定如下
发送消息
二、创建项目并测试发送与接受消息
选择web和rabbitmq依赖就行了
配置文件里配置下host啥的
然后写个测试,发送一条消息试试
非常的nice,它直接报错了,然后把put方法拿出来写,它好了,这里原理暂时不太懂,先不深究
它默认使用的是jdk序列化
为了方便观察,改成json格式吧,使用这个转换器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
也可以写个配置类替换
package com.example.amqp.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author baikunlong * @date 2020/9/19 19:06 */ @Configuration public class MyAMQPConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
运行发送方法,再运行接受方法即可看到消息,在这里接收后,消息就被消费了
在网页查看需要选择第二项才能消费掉消息
在启动类使用@EnableRabbit注解开启基于注解的RabbitMQ模式,写一个service,使用@RabbitListener注解监听消息队列
package com.example.amqp.service; import com.example.amqp.bean.Book; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * @author baikunlong * @date 2020/9/19 19:24 */ @Service public class BookService { @RabbitListener(queues = "bkl.news") public void receive(Book book){ System.out.println("收到消息了:"+book); } }
然后运行测试类的发送消息方法,即可收到消息,这里的交换器类型为fanout,即所有队列都会收到这条消息
要获取MessageProperties把入参改成Message即可
@RabbitListener(queues = "bkl") public void receive(Message message){ System.out.println("message.getBody():"+ message.getBody()); System.out.println("message.getMessageProperties():"+message.getMessageProperties()); }
三、使用AmqpAdmin动态创建交换器和队列
@Resource AmqpAdmin amqpAdmin; @Test void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange.direct")); System.out.println("创建Exchange成功"); amqpAdmin.declareQueue(new Queue("amqpAdmin.queue")); System.out.println("创建队列成功"); amqpAdmin.declareBinding(new Binding("amqpAdmin.queue", Binding.DestinationType.QUEUE, "amqpAdmin.exchange.direct","amqbAdmin.xxx",null)); System.out.println("绑定amqpAdmin.exchange.direct到amqpAdmin.queue"); }
成功的创建了交换器和队列并绑定了在一起