Linux下RabbitMQ安装与使用

简介: Linux下RabbitMQ安装与使用

下载安装

下载链接「RabbitMQ」等文件 https://www.aliyundrive.com/s/tPdkW7xHC2a

点击链接保存,或者复制本段内容,打开「阿里云盘」APP ,无需下载极速在线查看,视频原画倍速播放。

安装erlang

安装socat

安装rabbitmq

启动rabbitmq并查看进程

启动rabbitmq管理插件

宿主机就可以访问管理界面

默认 rabbitmq 给我们提供了一个guest的账户 ,密码也是guest

但是我们RabbitMQ是装在Linux上的,所以必须使用IP访问

所以需要使用命令再创建一个rabbitmq的管理员账户:

rabbitmqctl add_user root root

用户创建好之后,再给用户管理员的角色:

rabbitmqctl set_user_tags root administrator

然后使用root root就可以登录rabbitmq管理界面了

SpringBoot中使用

生产者和消费者都需要添加依赖

<!-- springboot rabbitmq(amqp) -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 生产者(生产消息)
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
  @Autowired
  private RabbitSender reRabbitSender;
  
  @Test
  public void testSender() throws Exception {
    Map<String, Object> properties = new HashMap<String, Object>();
    properties.put("attr1", "12345");
    properties.put("attr2", "abcde");
    reRabbitSender.send("hello rabbitmq!", properties);
  }
}
import java.util.Map;
import java.util.UUID;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  
  /**
   *  这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
   */
  final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    /**
     *  @param correlationData 作为一个唯一的标识
     *  @param ack broker 是否落盘成功 
     *  @param cause 失败的一些异常信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      System.err.println("消息ACK结果:" + ack + ", correlationData: " + correlationData.getId());
    }
  };
  
  /**
   *  对外发送消息的方法
   * @param message   具体的消息内容
   * @param properties  额外的附加属性
   * @throws Exception
   */
  public void send(Object message, Map<String, Object> properties) throws Exception {
    // 附加属性封装到MessageHeaders中
    MessageHeaders mhs = new MessageHeaders(properties);
    // 构造消息
    Message<?> msg = MessageBuilder.createMessage(message, mhs);
    // 设置回调函数
    rabbitTemplate.setConfirmCallback(confirmCallback);
    //  指定业务唯一的iD
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    
    MessagePostProcessor mpp = new MessagePostProcessor() {
      @Override
      public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
          throws AmqpException {
        System.err.println("---> post to do: " + message);
        return message;
      }
    };
    
    rabbitTemplate.convertAndSend("exchange-1",
        "springboot.rabbit", 
        msg, mpp, correlationData);
    
  }
  
}
  • 消费者(消费消息)
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class RabbitReceive {
  
  /**
   *  组合使用监听
   *  @RabbitListener @QueueBinding @Queue @Exchange
   * @param message
   * @param channel
   * @throws Exception
   */
  @RabbitListener(bindings = @QueueBinding(
          value = @Queue(value = "queue-1", durable = "true"),
          exchange = @Exchange(name = "exchange-1",
          durable = "true",
          type = "topic",
          ignoreDeclarationExceptions = "true"),
          key = "springboot.*"
        )
      )
  @RabbitHandler
  public void onMessage(Message message, Channel channel) throws Exception {
    //  1. 收到消息以后进行业务端消费处理
    System.err.println("-----------------------");
    System.err.println("消费消息:" + message.getPayload());
    //  2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收
    //  spring.rabbitmq.listener.simple.acknowledge-mode=manual
    Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
  }
  
}
  • 测试
    生产者每次生产一条消息, 消费者都能消费到
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4天前
|
Linux 开发工具 C语言
Linux 安装 gcc 编译运行 C程序
Linux 安装 gcc 编译运行 C程序
23 0
|
5天前
|
Ubuntu Linux Python
Linux(15)Ubuntu安装ninja构建工具
Linux(15)Ubuntu安装ninja构建工具
15 0
|
7天前
|
NoSQL Linux 测试技术
Redis的安装(Linux版)
Redis的安装(Linux版)
151 1
|
16天前
|
消息中间件 Java Linux
RocketMQ的下载与安装(全网最细保姆级别教学)
RocketMQ的下载与安装(全网最细保姆级别教学)
74 0
|
17天前
|
缓存 Linux 测试技术
安装【银河麒麟V10】linux系统--并挂载镜像
安装【银河麒麟V10】linux系统--并挂载镜像
90 0
|
17天前
|
Linux C语言
linux yum安装ffmpeg 图文详解
linux yum安装ffmpeg 图文详解
39 0
|
17天前
|
NoSQL Linux Redis
linux 下和win下安装redis 并添加开机自启 图文详解
linux 下和win下安装redis 并添加开机自启 图文详解
17 0
|
17天前
|
Linux
linux yum 安装rar和unrar
linux yum 安装rar和unrar
57 0
|
2天前
|
关系型数据库 MySQL Java
Linux 安装 JDK、MySQL、Tomcat(图文并茂)
Linux 安装 JDK、MySQL、Tomcat(图文并茂)
15 2
|
2天前
|
负载均衡 Java 应用服务中间件
nginx安装在linux上
nginx安装在linux上
23 2