深入剖析 rabbitMQ(六)

简介: 在上篇关于消息队列的文章中,我们对 rabbitMQ 有过初步的介绍,本篇将将带你深入剖析 rabbitMQ 内部结构和使用。

五、java客户端使用

RabbitMQ 支持多种语言访问,本次介绍 RabbitMQ Java Client 的一些简单的api使用,如声明 Exchange、Queue,发送消息,消费消息,一些高级 api 会在后面的文章中详细的说明。

5.1、引入 rabbitMQ 依赖包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

5.2、连接服务器

使用给定的参数(host name,端口等等)连接AMQP的服务器。

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

也可以使用通过 URI 方式进行连接。

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

Connection(连接)接口可以被用作创建一个channel(管道),利用 channel(管道)可以进行发送和接收消息,在后面我们会频繁使用到它。

Channel channel = conn.createChannel();

注意,管道使用之后,需要进行关闭。

channel.close();
conn.close();

5.3、创建交换器

不仅可以通过 web页面进行创建交换器,还可以通过代码进行声明(创建的意思)交换器。

//创建exchange,类型是direct类型
channel.exchangeDeclare("ex-hello","direct");
//第三个参数表示是否持久化,同步操作,有返回值
AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("ex-hello","direct",true);
System.out.println(ok);
//创建带属性的交换器
Map<String,Object> argument = new HashMap<>();
argument.put("alternate-exchange","log");
channel.exchangeDeclare("ex-hello","direct",true,false,argument);
//异步创建exchange,没有返回值
channel.exchangeDeclareNoWait("ex-hello","direct",true,false,false,argument);
///判断exchange是否存在,存在的返回ok,不存在的exchange则报错
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("ex-hello");
System.out.println(declareOk);
//删除exchange(可重复执行),删除一个不存在的也不会报错
channel.exchangeDelete("ex-hello");

创建交换器参数解读

  • 第一个参数:表示交换器名称
  • 第二个参数:表示交换器类型
  • 第三个参数:表示是否持久化,为true表示会将队列持久化存储到硬盘
  • 第四个参数:表示是否自动删除,当最后一个绑定(队列或者exchange)被unbind之后,该exchange 自动被删除
  • 第五个参数:表示设置参数,参数类型为Map

5.4、创建队列

同样的,也可以通过代码进行声明队列。

//同步创建队列
channel.queueDeclare(queueName, true, false, false, null);
//异步创建队列没有返回值
channel.queueDeclareNoWait(queueName,true,false,false,null);
//判断queue是否存在,不存在会抛出异常
channel.exchangeDeclarePassive(queueName);
//删除队列
channel.queueDelete(queueName);

创建队列参数解读

  • 第一个参数:表示队列名称
  • 第二个参数:表示是否持久化,为true表示会将队列持久化存储到硬盘
  • 第三个参数:表示是否排它性,为true表示只对首次声明它的连接可见,会在其连接断开的时候自动删除
  • 第四个参数:表示是否自动删除,为true表示有过消费者并且所有消费者都解除订阅了,自动删除队列
  • 第五个参数:表示设置参数,参数类型为Map

5.5、创建绑定

当交换器和队列都创建成功之后,就可以建立绑定关系。

//交换器和队列进行绑定(可重复执行,不会重复创建)
channel.queueBind(queueName, exchangeName, routingKey);
//异步进行绑定,最后一个参数表示可以带自定义参数
channel.queueBindNoWait(queueName,exchangeName,routingKey,null);
//exchange和queue进行解绑(可重复执行)
channel.queueUnbind(queueName, exchangeName, routingKey);
//exchange与exchange进行绑定(可重复执行,不会重复创建)
//第一个参数表示目标交换器
//第二个参数表示原地址交换器
//第三个参数表绑定路由key
channel.exchangeBind(destination,source,routingKey);
//exchange和exchange进行解绑(可重复执行)
channel.exchangeUnbind(destination,source,routingKey);

绑定关系参数解读

  • queueName:队列名称,取自创建的队列名称
  • exchangeName:交换器,取自创建的交换器名称
  • routingKey:路由键key,自定义

5.6、发送消息

发送消息到交换器就会使用我们上文所提到的channel管道。

//发送的消息内容
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

也可以在发送消息前设定一些消息属性。

//自己构建BasicProperties的对象
channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .contentType("text/plain")
               .deliveryMode(2)
               .priority(1)
               .userId("zhangsan")
               .build()),
               messageBodyBytes);

发送指定头信息的消息。

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("userName",  '"zhangsan');
headers.put("userCode", "123");
//发送消息到交换器
channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

发送一个有过期时间的消息,单位:ms。

//设置消息过期时间,单位ms
channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("6000")
               .build()),
               messageBodyBytes);

更多用法,可以参见官方 API

5.7、接受消息

从消息队列中接受消息也会使用我们上文所提到的channel管道。

//监听队列中的消息
channel.basicConsume(queueName,true,new SimpleConsumer(channel));

监听队列消息参数解读

  • 第一个参数:表示需要监听的队列名称
  • 第二个参数:表示是否自动确认,如果配置false表示手动确认消息是否收到
  • 第三个参数:表示消息处理类

具体的消息处理类需要继承DefaultConsumer,并重写handleDelivery方法,代码如下:

public class SimpleConsumer extends DefaultConsumer{
    public SimpleConsumer(Channel channel){
        super(channel);
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  //接受从队列中发送的消息
        System.out.println(consumerTag);
        System.out.println("-----收到消息了---------------");
        System.out.println("消息属性为:"+properties);
        System.out.println("消息内容为:"+new String(body));
    }
}

如果是手工确认消息,需要在handleDelivery方法中进行相关的确认,代码如下:

//手动确认
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

5.8、完整demo

5.8.1、发送消息
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
        //连接RabbitMQ服务器
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("197.168.24.206");
        factory.setPort(5672);
        //创建一个连接
        Connection conn = factory.newConnection();
        //获得信道
        Channel channel = conn.createChannel();
        //声明交换器
        channel.exchangeDeclare("ex-hello","direct");
        //发送的消息内容
        byte[] messageBodyBytes = "Hello, world!".getBytes();
        channel.basicPublish("ex-hello", "route-hello", null, messageBodyBytes);
        //关闭通道
        channel.close();
        conn.close();
    }
}
5.8.2、接受消息
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //连接RabbitMQ服务器
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("197.168.24.206");
        factory.setPort(5672);
        //创建一个连接
        Connection conn = factory.newConnection();
        //获得信道
        Channel channel = conn.createChannel();
        //声明队列
        channel.queueDeclare("queue-hello", true, false, false, null);
        //声明绑定
        channel.queueBind("queue-hello", "ex-hello", "route-hello");
        //监听队列中的消息
        channel.basicConsume("queue-hello",true,new SimpleConsumer(channel));
        TimeUnit.SECONDS.sleep(10);
        channel.close();
        conn.close();
    }
}

消息处理类SimpleConsumer

public class SimpleConsumer extends DefaultConsumer {
    public SimpleConsumer(Channel channel) {
        super(channel);
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //接受从队列中发送的消息
        System.out.println(consumerTag);
        System.out.println("-----收到消息了---------------");
        System.out.println("消息属性为:"+properties);
        System.out.println("消息内容为:"+new String(body));
    }
}

消息发送成功之后,启动消费者,输出结果如下:

113.jpg

六、总结

整篇文章主要介绍了 RabbitMQ 内部结构、安装步骤、使用教程,以及 java 客户端使用等内容,内容比较长,限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 负载均衡
|
22天前
|
消息中间件 大数据 Java
RabbitMQ
RabbitMQ
63 1
|
23天前
|
消息中间件 存储 网络协议
精通 RabbitMQ 系列 02
精通 RabbitMQ 系列 02
25 0
|
7月前
|
消息中间件 存储 数据库
RabbitMQ特殊应用
RabbitMQ特殊应用
35 0
|
9月前
|
消息中间件 存储
RabbitMq
RabbitMq
69 0
|
10月前
|
消息中间件 存储 网络协议
rabbitmq的介绍
rabbitMQ是一个开源的AMQP实现的消息队列中间件,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错,与SpringAMQP完美的整合、API丰富易用。
|
10月前
|
消息中间件 存储 缓存
RabbitMQ到底为什么要使用它?
在多服务体系架构中,必然存在着多个服务之间的调用关系,当用户提交了订单,订单服务会调用支付服务执行用户的金钱操作,执行完毕之后紧接着调用商品服务对商家的商品信息(库存、成交量、收入等)进行更新,执行完毕之后又调用物流服务
|
11月前
|
消息中间件 存储 JSON
关于RabbitMQ
MQ是一种应用程序键一步通讯的技术,MQ是消息队列的缩写(Message Queue) 在MQ中,消息由一个应用程序发送到一个称为队列的中间件中,接着被中间件存储,并最终被另一个或多个消费者应用程序读取和处理; MQ组成:消息——生产者——队列——中间件——消费者!
55 0
|
11月前
|
消息中间件 存储 Java
RabbitMq使用
RabbitMq使用
125 0
|
11月前
|
消息中间件 网络协议 Java
RabbitMQ(3)
RabbitMQ(3)

相关实验场景

更多