1.创建项目(此处忽略)
2.引入依赖
在pom.xml文件中引入如下依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
3.编写生产者
package com.tjrac.rabbitmq.TestDemo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Map; public class Producer { public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址 connectionFactory.setHost("192.168.100.110"); //连接端口;默认为 5672 connectionFactory.setPort(5672); //虚拟主机名称;默认为 / connectionFactory.setVirtualHost("/"); //连接用户名;默认为guest connectionFactory.setUsername("admin"); //连接密码;默认为guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 /** * queue 参数1:队列名称 * durable 参数2:是否定义持久化队列,当mq重启之后,还在 * exclusive 参数3:是否独占本次连接 * ① 是否独占,只能有一个消费者监听这个队列 * ② 当connection关闭时,是否删除队列 * autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除 * arguments 参数5:队列其它参数 */ channel.queueDeclare("simple_queue", true, false, false, null); // 要发送的信息 String message = "Hello RabbitMQ"; /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:配置信息 * 参数4:消息内容 */ channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message); // 关闭资源 channel.close(); connection.close(); } }
4.编写消费者
package com.tjrac.rabbitmq.TestDemo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.110.110");//ip factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/");//虚拟机 默认值/ factory.setUsername("admin");//用户名 factory.setPassword("123456");//密码 //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("simple_queue",true,false,false,null); // 接收消息 DefaultConsumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 ,类似咱们发短信,发送成功会收到一个确认消息 3. callback:回调对象 */ // 消费者类似一个监听程序,主要是用来监听消息 channel.basicConsume("simple_queue",true,consumer); } }