安装
1. 安装RabbitMq所需的环境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc- c++ kernel-devel m4 ncurses-devel tk tc xz -y
2. 下载安装包
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
3.安装
- 安装erlang语言环境
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
- 安装socat加解密软件
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
- 安装rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
默认安装到了/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5 目录下
4. 修改配置
- 修改rabbit.app配置
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app # 将<<guest>> 修改为 guest , 否则只能通过localhost访问 {loopback_users, [<<"guest">>]} -> {loopback_users, ["guest"]}
- 修改本机系统文件
vim /etc/rabbitmq/rabbitmq-env.conf #添加 NODENAME=rabbit
5. 验证服务器是否可用
启动服务
[root@centos7lzj rabbitmq]# rabbitmq-server start & [1] 14460 [root@centos7lzj rabbitmq]# RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /var/log/rabbitmq/rabbit.log ###### ## /var/log/rabbitmq/rabbit-sasl.log ########## Starting broker... completed with 0 plugins. [root@centos7lzj rabbitmq]# lsof -i:5672 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME beam 14559 rabbitmq 48u IPv6 45280 0t0 TCP *:amqp (LISTEN)
- 执行管控台插件,可以浏览器管理端控制
[root@centos7lzj rabbitmq]# rabbitmq-plugins enable rabbitmq_management The following plugins have been enabled: mochiweb webmachine rabbitmq_web_dispatch amqp_client rabbitmq_management_agent rabbitmq_management Applying plugin configuration to rabbit@centos7lzj... started 6 plugins.
- 6. 在浏览器中访问
ip:15672 如: 172.20.140.111:15672
账号:guest
密码:guest
常用命令
- 启动服务: rabbitmqctl start_app
- 关闭服务:rabbitmqctl stop_app
代码Demo
由于所有demo都要创建连接,自定义一个获取连接的工厂类
public class MyConnectionFactory { public static Connection connection(){ //username 和 password 可在管理端自行创建 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.20.140.111"); factory.setPort(5672); factory.setUsername("lzj"); factory.setPassword("123456"); try { return factory.newConnection(); } catch (IOException | TimeoutException e) { e.printStackTrace(); throw new RuntimeException("创建连接失败: "+e.getMessage()); } } }
Quick-Start
- 消费者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import java.nio.charset.StandardCharsets; /** * 消费者 * * @author Zijian Liao * @since 1.0.0 */ public class QuickStartConsumer { private final static String QUEUE_NAME = "hello2"; public static void main(String[] argv) throws Exception { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); /** * durable:是否持久化,rabbitmq的队列是放在内存当中的,重启之后队列将会丢失,为true则会将队列持久化到磁盘上 * 注意:该参数只是判断队列持久化与否,与消息无关,虽然队列持久化了,但消息是不会持久化的,若队列中存在消息,重启之后消息会丢失 * 若想消息持久化,可以使用自定义消息,见Message的demo * exclusive: 是否独占,为true表示该队列只能有一个消费者消费,若有第二个消费者再次声明,将会抛出异常 * 为false时可以有多个消费者一起消费,每条消息只能由一个消费者消费,默认是轮询的方式,一个消费者一条 * 注意:开启此参数后(true),消费者断开队列将自动删除,durable的效果被无效 * autoDelete:是否自动删除,为true表示当最后一个消费者连接断开之后自动删除,durable的效果被无效 */ channel.queueDeclare(QUEUE_NAME, true, false, true, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,queueingConsumer); while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); } } }
- 生产者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者 * * @author Zijian Liao * @since 1.0.0 */ public class QuickStartProducer { static String QUEUE_NAME = "hello1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); String message = "Hello World!"; for (int i = 0; i < 5; i++) { channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
直接交换机
- 消费者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.nio.charset.StandardCharsets; /** * 直接交换机-消费者 * * @author Zijian Liao * @since 1.0.0 */ public class DirectConsumer { static String QUEUE_NAME = "hello-direct"; static String EXCHANGE_NAME = "my-direct"; static String EXCHANGE_TYPE = "direct"; static String ROUTING_KEY = "direct-key"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,queueingConsumer); while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); } } }
- 生产者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 直接交换机-生产者 * * @author Zijian Liao * @since 1.0.0 */ public class DirectProducer { /** * routingKey必须与消费者中声明的相同,否则消息无法正确发送 */ static String ROUTING_KEY = "direct-key"; static String EXCHANGE_NAME = "my-direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); String message = "Hello Direct!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
Topic交换机
- 生产者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.nio.charset.StandardCharsets; /** * Topic交换机-消费者 * * @author Zijian Liao * @since 1.0.0 */ public class TopicConsumer { static String QUEUE_NAME = "hello-topic"; static String EXCHANGE_NAME = "my-topic"; static String EXCHANGE_TYPE = "topic"; /** * * :匹配一个单词 如 topic-key.* 可以匹配 topic-key.1 topic-key.a topic-key.a-b,但不能匹配 topic-key.a.b * # :匹配多个单词 如 topic-key.# 可以匹配 topic-key.1 也可以匹配 topic-key.a.b topic-key.a.b.c * 也可以这样玩,放到前面 #.key 匹配 1.key 2.key a.b.key 或者放到中间 */ static String ROUTING_KEY = "topic-key.*"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); //如果有多个队列绑定了该交换机,则生产者发消息时,每个队列都会接到消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,queueingConsumer); while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); } } }
- 生产者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Topic交换机-生产者 * * @author Zijian Liao * @since 1.0.0 */ public class TopicProducer { static String ROUTING_KEY = "topic-key.a"; static String EXCHANGE_NAME = "my-topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); String message = "Hello topic!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
扇形交换机
- 消费者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.nio.charset.StandardCharsets; /** * 扇形交换机-消费者 * * @author Zijian Liao * @since 1.0.0 */ public class FanoutConsumer { static String QUEUE_NAME = "hello-fanout"; static String EXCHANGE_NAME = "my-fanout"; static String EXCHANGE_TYPE = "fanout"; /** * 扇形交换机routingKey无效 */ static String ROUTING_KEY = "fanout-key"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,queueingConsumer); while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); } } }
- 生产者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 扇形交换机-生产者 * * @author Zijian Liao * @since 1.0.0 */ public class FanoutProducer { /** * 扇形交换机routingKey无效 */ static String ROUTING_KEY = "everything"; static String EXCHANGE_NAME = "my-fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); String message = "Hello fanout!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
自定义消息
- 消费者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.nio.charset.StandardCharsets; /** * 自定义消息属性-消费者, 这里直接把topic的demo拿过来 * * @author Zijian Liao * @since 1.0.0 */ public class MessageConsumer { static String QUEUE_NAME = "hello-message1"; static String EXCHANGE_NAME = "my-message"; static String EXCHANGE_TYPE = "topic"; /** * * :匹配一个单词 如 topic-key.* 可以匹配 topic-key.1 topic-key.a topic-key.a-b,但不能匹配 topic-key.a.b * # :匹配多个单词 如 topic-key.# 可以匹配 topic-key.1 也可以匹配 topic-key.a.b topic-key.a.b.c * 也可以这样玩,放到前面 #.key 匹配 1.key 2.key a.b.key 或者放到中间 */ static String ROUTING_KEY = "message-key.*"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,true,queueingConsumer); while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); AMQP.BasicProperties properties = delivery.getProperties(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); System.out.println("properties: " + properties.toString()); } } }
- 生产者
import com.my.rabbit.MyConnectionFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeoutException; /** * 自定义消息属性-生产者 * * @author Zijian Liao * @since 1.0.0 */ public class MessageProducer { static String ROUTING_KEY = "message-key.a"; static String EXCHANGE_NAME = "my-message"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MyConnectionFactory.connection(); Channel channel = connection.createChannel(); String message = "Hello topic!"; Map<String, Object> headers = new HashMap<>(2,1); headers.put("name","jack"); headers.put("age",20); /** * deliveryMode 1.不持久化 2.持久化 若队列没有持久化,消息持久化无意义,重启队列都没了 * expiration 过期时间 单位毫秒 使用该参数后则持久化无效 * contentEncoding 消息内容编码 * messageId 消息id * correlationId 关联id * headers 消息头 */ AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // .expiration("10000") .contentEncoding("UTF-8") .messageId(UUID.randomUUID().toString()) .correlationId(UUID.randomUUID().toString()) .headers(headers) .build(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicProperties, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }