一、引入 MQ 的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> </dependency>
二、配置文件中写入 MQ 的配置信息
#rabbitmq 配置信息 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
三、创建生产者
package com.zyxx.rabbitmq.entity; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ClassName producer * 生产者 * @Author Lizhou * @Date 2019-08-02 9:54:54 * @Version 1.0 **/ public class ProducerTest { public static void main(String[] args) { // 创建一个连接工厂,用于生成与RabbitMQ进行连接 ConnectionFactory factory = new ConnectionFactory(); // 根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了) factory.setHost(MqEntity.HOST); factory.setUsername(MqEntity.USERNAME); factory.setPassword(MqEntity.PASSWORD); try { // 由连接工厂生成与RabbitMQ的连接 Connection connection = factory.newConnection(); // 创建一条通道 Channel newChannel = connection.createChannel(); // 由这个通道声明一个队列 newChannel.queueDeclare(MqEntity.QUEUENAME, false, false, true, null); // 要发送的消息 String sendMessage = "Hello,我是生产者!!!"; newChannel.basicPublish(MqEntity.EXCHANGE_NAME, MqEntity.QUEUENAME, null, sendMessage.getBytes()); System.out.println("消息发送成功!!!"); // 关闭通道 newChannel.close(); // 关闭连接 connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
四、创建消费者
package com.zyxx.test; import com.rabbitmq.client.*; import java.io.IOException; /** * @ClassName Consumer * 消费者 * @Author Lizhou * @Date 2019-08-02 9:57:57 * @Version 1.0 **/ public class ConsumerTest { public static void main(String[] args) { // 创建一个连接工厂,用于生成与RabbitMQ进行连接 ConnectionFactory factory=new ConnectionFactory(); // 根据这个连接工厂设置RabbitMQ所在的主机,账号密码和端口号等(默认情况下就不需要账号密码和端口了) factory.setHost(MqEntity.HOST); factory.setUsername(MqEntity.USERNAME); factory.setPassword(MqEntity.PASSWORD); Connection connection; try { // 创建一条通道,通道创建好后,关联相关的队列 connection = factory.newConnection(); Channel newChannel = connection.createChannel(); newChannel.queueDeclare(MqEntity.QUEUENAME, false, false, true, null); Consumer consumer = new DefaultConsumer(newChannel){ // 重写handleDelivery方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String getMessage = new String(body, "UTF-8"); System.out.println("接收到消息:'" + getMessage + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 newChannel.basicConsume(MqEntity.QUEUENAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
五、创建 MQ 常量类
package com.zyxx.test; /** * @author lizhou * @date 2019-08-02 */ public final class MqEntity { /** * 队列名称 */ public final static String QUEUENAME="mq.test_queu1"; /** * 主机名 */ public final static String HOST="localhost"; /** * 用户名 */ public final static String USERNAME="guest"; /** * 密码 */ public final static String PASSWORD="guest"; /** * 交换机名 */ public final static String EXCHANGE_NAME=""; }
- 六、启动生产者,发送消息
消息发送成功!!!
- 七、启动消费者,接收消息
接收到消息:‘Hello,我是生产者!!!’
最后,你还可以创建两个 SpringBoot 项目,一边用来发,一边用来收,可实现实时通信。