🌊RabbitMQ - 简单案例
Hello world
我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者
在下图中,“ P” 是我们的生产者,“ C” 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区
连接的时候,需要开启 5672 端口
- pom.xml
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
- 消息生产者
💧发送消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("123456"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 也就是是否用完就删除 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完毕"); } }
- 消息消费者
获取“生产者”发出的消息
import com.rabbitmq.client.*; public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("等待接收消息........."); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; /** * 消费者消费消息 - 接受消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 * 4.消息被取消时的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
Work Queues
Work Queues——工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
轮训分发消息
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
1、抽取工具类
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
2、启动两个工作线程来接受消息
import com.oddfar.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 这是一个工作线程,相当于之前的消费者 */ public class Worker01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //消息接受 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); System.out.println("接收到消息:" + receivedMessage); }; //消息被取消 CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }; System.out.println("C1 消费者启动等待消费.................. "); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
选中 Allow multiple instances
启动后
3、启动一个发送消息线程
public class Task01 { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完成:" + message); } } }
- 结果展示
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例(下):https://developer.aliyun.com/article/1389934?spm=a2c6h.13148508.setting.29.4fea4f0ervlqra