Linux下RabbitMQ安装与使用

简介: Linux下RabbitMQ安装与使用

下载安装

下载链接「RabbitMQ」等文件 https://www.aliyundrive.com/s/tPdkW7xHC2a

点击链接保存,或者复制本段内容,打开「阿里云盘」APP ,无需下载极速在线查看,视频原画倍速播放。

安装erlang

安装socat

安装rabbitmq

启动rabbitmq并查看进程

启动rabbitmq管理插件

宿主机就可以访问管理界面

默认 rabbitmq 给我们提供了一个guest的账户 ,密码也是guest

但是我们RabbitMQ是装在Linux上的,所以必须使用IP访问

所以需要使用命令再创建一个rabbitmq的管理员账户:

rabbitmqctl add_user root root

用户创建好之后,再给用户管理员的角色:

rabbitmqctl set_user_tags root administrator

然后使用root root就可以登录rabbitmq管理界面了

SpringBoot中使用

生产者和消费者都需要添加依赖

<!-- springboot rabbitmq(amqp) -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 生产者(生产消息)
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
  @Autowired
  private RabbitSender reRabbitSender;
  
  @Test
  public void testSender() throws Exception {
    Map<String, Object> properties = new HashMap<String, Object>();
    properties.put("attr1", "12345");
    properties.put("attr2", "abcde");
    reRabbitSender.send("hello rabbitmq!", properties);
  }
}
import java.util.Map;
import java.util.UUID;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  
  /**
   *  这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
   */
  final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    /**
     *  @param correlationData 作为一个唯一的标识
     *  @param ack broker 是否落盘成功 
     *  @param cause 失败的一些异常信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      System.err.println("消息ACK结果:" + ack + ", correlationData: " + correlationData.getId());
    }
  };
  
  /**
   *  对外发送消息的方法
   * @param message   具体的消息内容
   * @param properties  额外的附加属性
   * @throws Exception
   */
  public void send(Object message, Map<String, Object> properties) throws Exception {
    // 附加属性封装到MessageHeaders中
    MessageHeaders mhs = new MessageHeaders(properties);
    // 构造消息
    Message<?> msg = MessageBuilder.createMessage(message, mhs);
    // 设置回调函数
    rabbitTemplate.setConfirmCallback(confirmCallback);
    //  指定业务唯一的iD
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    
    MessagePostProcessor mpp = new MessagePostProcessor() {
      @Override
      public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
          throws AmqpException {
        System.err.println("---> post to do: " + message);
        return message;
      }
    };
    
    rabbitTemplate.convertAndSend("exchange-1",
        "springboot.rabbit", 
        msg, mpp, correlationData);
    
  }
  
}
  • 消费者(消费消息)
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class RabbitReceive {
  
  /**
   *  组合使用监听
   *  @RabbitListener @QueueBinding @Queue @Exchange
   * @param message
   * @param channel
   * @throws Exception
   */
  @RabbitListener(bindings = @QueueBinding(
          value = @Queue(value = "queue-1", durable = "true"),
          exchange = @Exchange(name = "exchange-1",
          durable = "true",
          type = "topic",
          ignoreDeclarationExceptions = "true"),
          key = "springboot.*"
        )
      )
  @RabbitHandler
  public void onMessage(Message message, Channel channel) throws Exception {
    //  1. 收到消息以后进行业务端消费处理
    System.err.println("-----------------------");
    System.err.println("消费消息:" + message.getPayload());
    //  2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收
    //  spring.rabbitmq.listener.simple.acknowledge-mode=manual
    Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
  }
  
}
  • 测试
    生产者每次生产一条消息, 消费者都能消费到
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
NoSQL Linux 网络安全
Linux安装Redis(详细教程)
Linux安装Redis(详细教程)
45 2
|
6天前
|
IDE Linux 开发工具
Linux 系统上安装
在Linux和Mac上安装Lua 5.3.0只需下载源码,解压,编译和安装。Windows用户可选择SciTE IDE或通过LuaForWindows在Github或Google Code下载安装。创建 HelloWorld.lua,使用`lua HelloWorld.lua`运行显示&quot;Hello World!&quot;。另可参考LuaDist官方推荐方式安装。
|
6天前
|
NoSQL 关系型数据库 MySQL
涉及rocketMQ,jemeter等性能测试服务器的安装记录
涉及rocketMQ,jemeter等性能测试服务器的安装记录
22 1
|
6天前
|
弹性计算 分布式计算 Hadoop
Linux(阿里云)安装Hadoop(详细教程+避坑)
Linux(阿里云)安装Hadoop(详细教程+避坑)
27 3
|
6天前
|
监控 JavaScript 网络协议
Linux系统之安装uptime-kuma服务器监控面板
【5月更文挑战第12天】Linux系统之安装uptime-kuma服务器监控面板
19 0
|
6天前
|
Ubuntu Linux Shell
minio服务端以Linux服务形式安装
minio服务端以Linux服务形式安装
258 6
|
6天前
|
关系型数据库 MySQL Linux
Linux下安装MySQL
Linux下安装MySQL
22 0
|
6天前
|
安全 Linux 网络安全
【操作系统】实验一 Linux操作系统安装
【操作系统】实验一 Linux操作系统安装
16 3
|
6天前
|
消息中间件 Shell
rabbitmq安装erlang环境后没生效
rabbitmq安装erlang环境后没生效
771 7
|
6天前
|
Oracle 关系型数据库 Linux
SuSE linux server 11通过SAP来安装oracle11g
SuSE linux server 11通过SAP来安装oracle11g
10 0