2.Hello,world
在本教程的这一部分中,我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印
出来的消费者。我们将介绍 Java API 中的一些细节。
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代
表使用者保留的消息缓冲区
2.1实现代码
导入依赖
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!-- 操作文件流的一个依赖 --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
消息生产者:
public class Producer { private final static String QUEUE_NAME = "hello";//ctrl+shift+u public static void main(String[] args) throws IOException, TimeoutException { //创建一个工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.174.128"); factory.setUsername("admin"); factory.setPassword("admin"); // Connection connection = factory.newConnection(); Channel channel = connection.createChannel();//创建信道 /** * queueDeclare(String queue, boolean durable, boolean exclusive, * boolean autoDelete, Map<String, Object> arguments) * 生成一个队列(这里使用默认的交换机..) * 1.队列名称 * 2.队列里面的消息是否持久化,默认消息存储在内存中 * 3.该队列是否只提供一个消费者进行消费,是否进行共享, true:可以多个消费者消费 * 4.是否自动删除 最后一个消费者断开连接以后,该队列是否自动删除 true:自动删除. * 5.其他参数 * */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message = "hello,world"; /** *向哪个交换机的哪个队列发送消息 * 1.发送到那个交换机(为空表示是默认交换机) * 2.路由的key是哪个 匹配队列 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送外币"); } }
消息消费者:
public class Consumer { private final static String QUEUE_NAME = "hello";//ctrl+shift+u public static void main(String[] args) throws Exception{ //创建一个工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.174.128"); factory.setUsername("admin"); factory.setPassword("admin"); //channer实现自动close接口,自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel();//创建信道 //声明接收消息 DeliverCallback deliverCallback=( consumerTag, message)->{ System.out.println(new String(message.getBody())); }; //取消消息时的回调 比如生产者拒绝消费时 CancelCallback cancelCallback = consumerTag->{ System.out.println("消息消费被中断..."); }; /** * String basicConsume(String queue, boolean autoAck, * DeliverCallback deliverCallback, CancelCallback cancelCallback) *消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否自动应答,true代表自动应答,false代表手动应答 * 3.消费者消费成功的回调. * 4.消费者未消费成功的回调. * */ //上面那俩声明只是作为消费调用的参数. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
注意:当出现超时的错误时,要在防火墙中开放5672端口
3.Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业(进程之间是竞争式的)。当有多个工作线程时,这些工作线程将一起处理这些任务。
图解:
3.1. 轮训分发消息
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
3.1.1. 抽取工具类
/** * 连接工厂创建信道的工具类 */ public class RabbitMqUtils { //得到一个连接的channel 创建TCP连接 public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.174.128"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();//创建信道 return channel; } }
3.1.2. 编写生产者,消费者
//生产者 public class Task01 { public static final String QUEUE_NAME="hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //从控制台当中接收信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送完成:"+message); } } } //工作线程(消费者) public class Work01 { public static final String QUEUE_NAME="hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //消息的接收 DeliverCallback deliverCallback = ( consumerTag, message)->{ System.out.println("接收的消息:"+new String(message.getBody())); }; //消息接收失败的回调 CancelCallback cancelCallback = consumerTag->{ System.out.println(consumerTag+"消费者取消消费接口执行回调逻辑."); }; System.out.println("C3等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
通过开启IDEA的Allow parallel run按钮,开启多线程的运行.
运行结果:
3.2消息应答
3.2.1. 概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费 者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了, rabbitmq 可以把该消息删除了。
3.2.2 手动应答代码
生产者
public class Task02 { private static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //创建队列 channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); System.out.println("请输入消息:"); while(scanner.hasNext()){ String message = scanner.next(); //设置生产者发送消息为持久化消息(要求保存到磁盘中) 默认是保存到内存中. channel.basicPublish("",TASK_QUEUE_NAME,false,message.getBytes("UTF-8")); System.out.println("生产者发送消息:"+message); } } }
消费者:
public class Work03 { private static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1等待接收消息处理时间较短"); DeliverCallback deliverCallback =( consumerTag, message)->{ SleepUtils.sleep(1); System.out.println("接收到消息:"+new String(message.getBody())); /** * 手动应答 * 1.消息标记tag * 2.是否批量应答 false:不批量应答信道中的消息 true:批量处理. */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback =consumerTag->{ System.out.println("消费者取消消费接口回调逻辑.."); }; boolean autoAck = false;//关闭自动应答,采用手动应答. channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); } } public class Work04 { private static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C2等待接收消息处理时间较长"); DeliverCallback deliverCallback =( consumerTag, message)->{ //沉睡30s SleepUtils.sleep(10); System.out.println("接收到消息:"+new String(message.getBody())); /** * 手动应答 * 1.消息标记tag * 2.是否批量应答 false:不批量应答信道中的消息 true:批量处理. */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback =consumerTag->{ System.out.println("消费者取消消费接口回调逻辑.."); }; boolean autoAck = false;//关闭自动应答,采用手动应答. channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); } }
运行结果:
3.3. RabbitMQ 持久化
3.3.1. 概念
刚刚我们已经看到了如何处理任务不丢失的情况(生产者检测规则下的消费者是否活着,如果活着就让其处理,如果死了就让其他消费者处理),但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这做。确保消息不会丢失需要做两件事: 我们需要 将队列和消息都标记为持久 化
3.3.2. 队列如何实现持久化
之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);//消息持久化
这个时候即使重启 rabbitmq 队列也依然存在
3.3.3. 消息实现持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性
//设置生产者发送消息为持久化消息(要求保存到磁盘中) 默认是保存到内存中. channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边课件发布确认章节。
3.3.4 不公平分发
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
意思就是 如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。
注:通过测试可以看到对于处理慢的消费者,其处理的任务量就少,处理快的,其处理的任务量就多,简而言之:能者多劳
3.3.5 预期值
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区(用来存放已经消费但未手动确认的消息),因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos(x) 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。 一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。
消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度. 虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器).
应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
总结:
以上的意思就是说我们可以根据,每个消费者的情况给与处理任务的权重,比如消费者A prefetch=2; 消费者B prefetch = 5; 这里的数值就是预期值,也等于消费者在消费任务时队列中最多的任务数.
这样比如我一共有七个任务,那么就会有两个被A处理,5个被B处理. 即使A处理的效率高也是一样,因为我们人为的决定了预期值.