HelloWord
在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列-RabbitMO.代表使用者保留的消息缓冲区
第一步:导入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.yc</groupId> <artifactId>rabbitmq-hello</artifactId> <version>1.0-SNAPSHOT</version> <!--指定jdk编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency> <!--操作文件源的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies> </project>
第二步:创建生产者
//生产者:发消息 public class Producer { //队列名称 public static final String QUEUE_NAME = "hello"; //发消息 public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //工厂IP 连接RabbitMQ的队列 factory.setHost("192.168.80.128"); //用户名 factory.setUsername("admin"); //密码 factory.setPassword("123"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); //生成一个队列 /* * 1.队列名称 * 2.队列里面的消息是否持久化(磁盘)默认情况消息存储在内存中 * 3.该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费false:只能一个消费者消费 *4.是否自动剧除最后一个消贫者端开连接以后该队列是否自动鹏除 true自动鹏除false不自动翮除 * 5.其它参数*/ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发消息 String message = "hello world";//初次使用 /*发送一个消息 * 1.发送到哪个交换机 * 2.路由的Key值是哪个,本次是队列的名称 * 3.其它参数信息 * 4.发送消息的消息体*/ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }
第三步:创建消费者
//消费者 接收消息的 public class Consumer { //队列名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.80.128"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明 DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println(new String(message.getBody())); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag->{ System.out.println("消费消息被中断"); }; //消费者消费消息 //1.消费哪个队列 //2.消费成功之后是否要自动应答 //3.消费者未成功消费的回调 //4.消费者取消消费的回调 channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
工作队列
因为你为了确保同一条消息被其中一个工作线程接收到了之后,其它工作就不能消费的到了
三者之间的关系必须是竞争的关系
因为
这部分代码来来回回都是重复的,所以我们可以抽取连接工厂工具类
public class RabbitMqUtils { //得到一个连接的channel public static Channel getChannel() throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.80.128"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
工作线程代码
//这是一个工作线程(相当于之前的消费者) public class Worker01 { //队列的名称 public static final String QUEUE_NAME = "hello"; //接收消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println(new String(message.getBody())); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag->{ System.out.println("消费消息被中断"); }; //消息的接收 channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
启动两个工作线程
工作队列(生产者代码)
public class Task01 { //队列名称 public static final String QUEUE_NAME = "hello"; //发送大量消息 public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台当中接受信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成"); } } }
工作队列(结果成功)
消息应答
我们都知道消费者它完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况?
为了保证消息在发送过程中不丢失,rabbitmq_引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。
自动应答
消息发送后,立即被认为已经传送成功了,这种模式需要在高吞吐量和数据传输安全性方面做权衡,使得内存耗尽,最终这些消费者线程被操作系统杀死,这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
手动消息应答
A.Channel. basicAck(用于肯定确认)
RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel. basicNack(用于否定确认)
C.Channel. basicReject(用于否定确认)
与Channel. basicNack相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了
multiple的解释
multiple 的 true和 false 代表不同意思
rue 代表批量应答channel上未应答的消息
比如说channel上有传送tag 的消息5,6,7,8 当前tag是8那么此时5-8的这些还未应答的消息都会被确认收到消息应答
false同上面相比
只会应答 tag=8的消息5,6,7这三个消息依然不会被确认收到消息应答