2. Hello World
在本教程的这一部分中,我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。我们将介绍 Java API 中的一些细节。
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代表使用者保留的消息缓冲区
创建一个空的工程
next
2.1. 依赖
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
导入pom中
2.2. 消息生产者
public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.92.234.71"); factory.setUsername("admin"); factory.setPassword("123"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()) { /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="hello world"; /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } } }
package com.atguigu.rabbitmq.one; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { //队列名称 public static final String QUENU_NAME="hello"; //发消息 public static void main(String[] args) throws IOException, TimeoutException { //创建工程 ConnectionFactory factory = new ConnectionFactory(); //工厂IP 连接RabbliMQ的队列 //设置id factory.setHost("39.107.43.12"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列中的消息是否持久化 默认消息存储在内存中 否不持久化 (持久化放在磁盘中) *3.该队列是否只供一个消费者进行消费 是否共享 true 可以多个消费者消费 默认false,只能一个消费者消费 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不删除 5.其他参数 */ channel.queueDeclare(QUENU_NAME,false,false,false,null); //发消息了开始 /** * 发送一个消息 * 1.发送到那个交换机 此不考虑不写 * 2.路由key值是那个 本次是队列的名称 * 3.其他参数信息 *4.发送的消息 的消息体 发送要发送他的二进制 才能发送出去消息 */ String message="hello world";//初次使用 channel.basicPublish("",QUENU_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }
遇到的问题是
P14 写请求的代码的时候 我遇到了一个 连接超时的问题 Connection timed out: connect 看了评论是因为防火墙端口少开了一个 mq需要配置2个我只配置了一个15672 然而是需要开放15672和5672两个端口,一个是连接控制台,一个是连接服务
fireawll-cmd --zone=public --add-port=15672/tcp --permanent
fireawll-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
采用下下面同学的建议 完善了一下;希望可以帮到大家
2.3. 消息消费者
public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.92.234.71"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("等待接收消息 ......... "); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message= new String(delivery.getBody()); System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
package com.atguigu.rabbitmq.one; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { //队列的名称 public static String QUENE_NAME="hello"; //接受信息 public static void main(String[] args) throws IOException, TimeoutException { //创建连接的工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.43.12"); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message)->{ System.out.println(new String(message.getBody())); }; //取消消息是的回调 CancelCallback cancelCallback=consumerTag->{ System.out.println("消费被中断"); }; /** * 消费者消费信息 *1.表示消费那个队列 * 2.消费成功之后 是否自动应答ture自动应答 false手动应打 默认false * 3.消费未成功消费的回调 * 4.消费者取消消费的回调 * */ channel.basicConsume(QUENE_NAME,true,deliverCallback,cancelCallback); } }
3. Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
3.1. 轮训分发消息
轮训就是 你一个我一个他一个 一人一个 消息的处理
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
3.1.1. 抽取工具类
public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.92.234.71"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
package com.atguigu.rabbitmq.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 连接工程创建信道 */ public class RabbitMqutils { //创建连接工程 public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.43.12"); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); //返回 return channel; } }