五、java客户端使用
RabbitMQ 支持多种语言访问,本次介绍 RabbitMQ Java Client 的一些简单的api使用,如声明 Exchange、Queue,发送消息,消费消息,一些高级 api 会在后面的文章中详细的说明。
5.1、引入 rabbitMQ 依赖包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
5.2、连接服务器
使用给定的参数(host name,端口等等)连接AMQP的服务器。
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection conn = factory.newConnection();
也可以使用通过 URI 方式进行连接。
ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost"); Connection conn = factory.newConnection();
Connection(连接)接口可以被用作创建一个channel(管道),利用 channel(管道)可以进行发送和接收消息,在后面我们会频繁使用到它。
Channel channel = conn.createChannel();
注意,管道使用之后,需要进行关闭。
channel.close(); conn.close();
5.3、创建交换器
不仅可以通过 web页面进行创建交换器,还可以通过代码进行声明(创建的意思)交换器。
//创建exchange,类型是direct类型 channel.exchangeDeclare("ex-hello","direct"); //第三个参数表示是否持久化,同步操作,有返回值 AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("ex-hello","direct",true); System.out.println(ok); //创建带属性的交换器 Map<String,Object> argument = new HashMap<>(); argument.put("alternate-exchange","log"); channel.exchangeDeclare("ex-hello","direct",true,false,argument); //异步创建exchange,没有返回值 channel.exchangeDeclareNoWait("ex-hello","direct",true,false,false,argument); ///判断exchange是否存在,存在的返回ok,不存在的exchange则报错 AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("ex-hello"); System.out.println(declareOk); //删除exchange(可重复执行),删除一个不存在的也不会报错 channel.exchangeDelete("ex-hello");
创建交换器参数解读:
- 第一个参数:表示交换器名称
- 第二个参数:表示交换器类型
- 第三个参数:表示是否持久化,为true表示会将队列持久化存储到硬盘
- 第四个参数:表示是否自动删除,当最后一个绑定(队列或者exchange)被unbind之后,该exchange 自动被删除
- 第五个参数:表示设置参数,参数类型为
Map
5.4、创建队列
同样的,也可以通过代码进行声明队列。
//同步创建队列 channel.queueDeclare(queueName, true, false, false, null); //异步创建队列没有返回值 channel.queueDeclareNoWait(queueName,true,false,false,null); //判断queue是否存在,不存在会抛出异常 channel.exchangeDeclarePassive(queueName); //删除队列 channel.queueDelete(queueName);
创建队列参数解读:
- 第一个参数:表示队列名称
- 第二个参数:表示是否持久化,为true表示会将队列持久化存储到硬盘
- 第三个参数:表示是否排它性,为true表示只对首次声明它的连接可见,会在其连接断开的时候自动删除
- 第四个参数:表示是否自动删除,为true表示有过消费者并且所有消费者都解除订阅了,自动删除队列
- 第五个参数:表示设置参数,参数类型为
Map
5.5、创建绑定
当交换器和队列都创建成功之后,就可以建立绑定关系。
//交换器和队列进行绑定(可重复执行,不会重复创建) channel.queueBind(queueName, exchangeName, routingKey); //异步进行绑定,最后一个参数表示可以带自定义参数 channel.queueBindNoWait(queueName,exchangeName,routingKey,null); //exchange和queue进行解绑(可重复执行) channel.queueUnbind(queueName, exchangeName, routingKey); //exchange与exchange进行绑定(可重复执行,不会重复创建) //第一个参数表示目标交换器 //第二个参数表示原地址交换器 //第三个参数表绑定路由key channel.exchangeBind(destination,source,routingKey); //exchange和exchange进行解绑(可重复执行) channel.exchangeUnbind(destination,source,routingKey);
绑定关系参数解读:
- queueName:队列名称,取自创建的队列名称
- exchangeName:交换器,取自创建的交换器名称
- routingKey:路由键key,自定义
5.6、发送消息
发送消息到交换器就会使用我们上文所提到的channel
管道。
//发送的消息内容 byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
也可以在发送消息前设定一些消息属性。
//自己构建BasicProperties的对象 channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .contentType("text/plain") .deliveryMode(2) .priority(1) .userId("zhangsan") .build()), messageBodyBytes);
发送指定头信息的消息。
Map<String, Object> headers = new HashMap<String, Object>(); headers.put("userName", '"zhangsan'); headers.put("userCode", "123"); //发送消息到交换器 channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .headers(headers) .build()), messageBodyBytes);
发送一个有过期时间的消息,单位:ms。
//设置消息过期时间,单位ms channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .expiration("6000") .build()), messageBodyBytes);
更多用法,可以参见官方 API
5.7、接受消息
从消息队列中接受消息也会使用我们上文所提到的channel
管道。
//监听队列中的消息 channel.basicConsume(queueName,true,new SimpleConsumer(channel));
监听队列消息参数解读:
- 第一个参数:表示需要监听的队列名称
- 第二个参数:表示是否自动确认,如果配置false表示手动确认消息是否收到
- 第三个参数:表示消息处理类
具体的消息处理类需要继承DefaultConsumer
,并重写handleDelivery
方法,代码如下:
public class SimpleConsumer extends DefaultConsumer{ public SimpleConsumer(Channel channel){ super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接受从队列中发送的消息 System.out.println(consumerTag); System.out.println("-----收到消息了---------------"); System.out.println("消息属性为:"+properties); System.out.println("消息内容为:"+new String(body)); } }
如果是手工确认消息,需要在handleDelivery
方法中进行相关的确认,代码如下:
//手动确认 long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false);
5.8、完整demo
5.8.1、发送消息
public class Producer { public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException { //连接RabbitMQ服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("197.168.24.206"); factory.setPort(5672); //创建一个连接 Connection conn = factory.newConnection(); //获得信道 Channel channel = conn.createChannel(); //声明交换器 channel.exchangeDeclare("ex-hello","direct"); //发送的消息内容 byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish("ex-hello", "route-hello", null, messageBodyBytes); //关闭通道 channel.close(); conn.close(); } }
5.8.2、接受消息
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //连接RabbitMQ服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("197.168.24.206"); factory.setPort(5672); //创建一个连接 Connection conn = factory.newConnection(); //获得信道 Channel channel = conn.createChannel(); //声明队列 channel.queueDeclare("queue-hello", true, false, false, null); //声明绑定 channel.queueBind("queue-hello", "ex-hello", "route-hello"); //监听队列中的消息 channel.basicConsume("queue-hello",true,new SimpleConsumer(channel)); TimeUnit.SECONDS.sleep(10); channel.close(); conn.close(); } }
消息处理类SimpleConsumer
public class SimpleConsumer extends DefaultConsumer { public SimpleConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接受从队列中发送的消息 System.out.println(consumerTag); System.out.println("-----收到消息了---------------"); System.out.println("消息属性为:"+properties); System.out.println("消息内容为:"+new String(body)); } }
消息发送成功之后,启动消费者,输出结果如下:
六、总结
整篇文章主要介绍了 RabbitMQ 内部结构、安装步骤、使用教程,以及 java 客户端使用等内容,内容比较长,限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。