主要讲述RabbitMQ环境部署,以及简单的测试用例。
前言
我于4月份写过消息队列系列文章,当时只讲解了消息队列的选型,以及RabbitMQ、Kafka、RocketMQ的基本原理,现在选择RabbitMQ进行实战方面的讲解,其实主要是为了将之前的“债”给还上。
RabbitMQ环境搭建
因为我用的是Mac,所以直接可以参考官网:
需要注意的是,一定需要先执行:
brew update
然后再执行:
brew install rabbitmq
之前没有执行brew update,直接执行brew install rabbitmq时,会报各种各样奇怪的错误,其中“403 Forbidde”居多。
但是在执行“brew install rabbitmq”,会自动安装其它的程序,如果你使用源码安装Rabbitmq,因为启动该服务依赖erlang环境,所以你还需手动安装erlang,但是目前官方已经一键给你搞定,会自动安装Rabbitmq依赖的所有程序,是不是很棒!
最后执行成功的输出如下:
启动服务:
# 启动方式1:后台启动 brew services start rabbitmq # 启动方式2:当前窗口启动 cd /usr/local/Cellar/rabbitmq/3.8.19 rabbitmq-server
在浏览器输入:
http://localhost:15672/
会出现RabbitMQ后台管理界面(用户名和密码都为guest):
通过brew安装,一行命令搞定,真香!
RabbitMQ测试
添加账号
首先得启动mq
## 添加账号 ./rabbitmqctl add_user admin admin ## 添加访问权限 ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" ## 设置超级权限 ./rabbitmqctl set_user_tags admin administrator
编码实测
因为代码中引入了java 8的特性,pom引入依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version> </dependency> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins>
开始写代码:
public class RabbitMqTest { //消息队列名称 private final static String QUEUE_NAME = "hello"; @Test public void send() throws java.io.IOException, TimeoutException { //创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建消息通道 Channel channel = connection.createChannel(); //生成一个消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 10; i++) { String message = "Hello World RabbitMQ count: " + i; //发布消息,第一个参数表示路由(Exchange名称),为""则表示使用默认消息路由 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //关闭消息通道和连接 channel.close(); connection.close(); } @Test public void consumer() throws java.io.IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建消息信道 final Channel channel = connection.createChannel(); //消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("[*] Waiting for message. To exist press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } }
执行send()后控制台输出:
[x] Sent 'Hello World RabbitMQ count: 0' [x] Sent 'Hello World RabbitMQ count: 1' [x] Sent 'Hello World RabbitMQ count: 2' [x] Sent 'Hello World RabbitMQ count: 3' [x] Sent 'Hello World RabbitMQ count: 4' [x] Sent 'Hello World RabbitMQ count: 5' [x] Sent 'Hello World RabbitMQ count: 6' [x] Sent 'Hello World RabbitMQ count: 7' [x] Sent 'Hello World RabbitMQ count: 8' [x] Sent 'Hello World RabbitMQ count: 9'
执行consumer()后:
示例中的代码讲解,可以直接参考官网:https://www.rabbitmq.com/tutorials/tutorial-one-java.html