作者:俏巴
概述
初次使用AMQP的过程中,总是容易被AMQP支持的消息模型绕晕,这里结合官方的教程,对AMQP的消息模型做一个简要总结,供参考。目前官方给出了六种消息发送/接收模型,这里主要介绍前五种消息模型。
消息模型
1、Hello World
简单模式就是生产者将消息发送到队列、消费者从队列中获取消息。一条消息对应一个消费者。
示例代码说明:
测试使用的是阿里云的AMQP消息队列服务,具体的代码配置过程可以参考阿里云官方链接。
工具类
import AMQP.AliyunCredentialsProvider;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
// 初始化参数设置
String AccessKey= "********";
String SecretKey = "********";
Long Uid = ********16617278L;
String VhostName = "********";
String host = "********16617278.mq-amqp.cn-hangzhou-a.aliyuncs.com";
// 定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置服务地址
connectionFactory.setHost(host);
// 端口
connectionFactory.setPort(5672);
// 设置用户名、密码、vhost
connectionFactory.setCredentialsProvider(new AliyunCredentialsProvider(AccessKey,SecretKey,Uid));
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(5000);
connectionFactory.setVirtualHost(VhostName);
// 通过工厂获取连接对象
Connection connection = connectionFactory.newConnection();
return connection;
}
}
发送端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// hello world 单个消费者和接收者
public class Send {
private final static String Queue_name = "helloDemo";
public static void main(String[] args) throws Exception {
// 获取连接及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Queue_name,false,false,false,null);
//消息内容
String message = "Hello World!";
// 1、交换机,此处无 2、发送到那个队列 3、属性 4、消息内容
channel.basicPublish("",Queue_name,null,message.getBytes());
System.out.println("发送数据:" + message);
// 关闭连接
channel.close();
connection.close();
}
}
消费端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver {
private final static String Queue_name = "helloDemo";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 开始消费消息
channel.basicConsume(Queue_name, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理
System.out.println("message receive: ");
System.out.println("Received: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
Thread.sleep(100000);
channel.close();
connection.close();
}
}
2、Work Queues
一条消息可以被多个消费者尝试接收,最终只有一个消费者能够获取到消息。
发送端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// 1:N 消费者各自接收消息
public class Sender {
private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(queueName,false,false,false,null);
for (int i = 0; i < 100; i++) {
String message = "workqueues message " + i;
channel.basicPublish("",queueName,null,message.getBytes());
System.out.println("发送消息: " + message);
Thread.sleep(10);//休眠
}
// 关闭连接
channel.close();
connection.close();
}
}
消费端示例代码1
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver1 {
private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(queueName,false,false,false,null);
channel.basicQos(1);//告诉服务器,在没有确认当前消息完成之前,不要给我发新的消息。
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理
System.out.println("message receive1: ");
System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
}
};
channel.basicConsume(queueName,false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
}
}
消费端示例代码2
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver2 {
private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(queueName,false,false,false,null);
channel.basicQos(1);//告诉服务器,在没有确认当前消息完成之前,不要给我发新的消息。
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理
System.out.println("message receive2: ");
System.out.println("Received2: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
}
};
channel.basicConsume(queueName,false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
}
}
3、Publish/Subscribe
一条消息可以被多个消费者同时获取,生产者将消息发送给交换机,消费者将自己对应的队列注册到交换机,当发送消息后,所有注册的队列的消费者都可以收到消息。
发送端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Sender {
private static String Exchange_Name = "ExchangeDemo";//声明交换机
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
// Producer 将消息发送到 Exchange ,由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃),Exchange 按照相应的 Binding 逻辑将消息路由到 Queue。
channel.exchangeDeclare(Exchange_Name,"fanout");
String message = "Exchange message demo";
// 消息发送端交换机,如果此时没有队列绑定,则消息会丢失,因为交换机没有存储消息的能力
channel.basicPublish(Exchange_Name,"",null,message.getBytes());
System.out.println("发送消息: " + message);
}
}
消费端示例代码1
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub1 {
private static String Exchange_Name = "ExchangeDemo";//声明交换机
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("testqueue1",false,false,false,null);
// 绑定到交换机
channel.queueBind("testqueue1",Exchange_Name,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{
System.out.println("sub1: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testqueue1",false,consumer);
}
}
消费端示例代码2
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub2 {
private static String Exchange_Name = "ExchangeDemo";//声明交换机
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("testqueue2",false,false,false,null);
// 绑定到交换机
channel.queueBind("testqueue2",Exchange_Name,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{
System.out.println("sub2: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testqueue2",false,consumer);
}
}
4、Routing
生产者将消息发送到type为direct模式的交换机,消费者的队列将自己绑定到路由的时候给自己绑定一个key,只有生产者发送的消息key和绑定的key一致时,消费者才能收到对应的消息。
发送端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Sender {
private static final String ExchangeName = "Rout_Change";//路由消息交换机
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(ExchangeName,"direct");
channel.basicPublish(ExchangeName,"key3",null,"route 消息".getBytes());
channel.close();
connection.close();
}
}
消费端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub {
private static final String ExchangeName = "Rout_Change";//路由消息交换机
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("testroutequeue1",false,false,false,null);
// 绑定交换机
// 参数3 标记 绑定到交换机的时候会有一个标记,只有和它一样标记的消息才会别消费到
channel.queueBind("testroutequeue1",ExchangeName,"key1");
channel.queueBind("testroutequeue1",ExchangeName,"key2");//绑定多个标记
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理
System.out.println("message route receive1: ");
System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
}
};
channel.basicConsume("testroutequeue1",false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
}
}
5、Topics
该类型与 Direct 类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配,即该类型 Exchange 使用 Routing key 模式匹配和字符串比较的方式将消息路由至绑定的 Queue。
示例:
Routing key 为 use.stock 的消息会转发给绑定匹配模式为 .stock, use.stock, . 和 #.use.stock.# 的 Queue; 表是匹配一个任意词组,# 表示匹配 0 个或多个词组。
发送端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Sender {
private static String Exchange_Name = "Exchange_Topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange类型为Topic 也就是通配符模式
channel.exchangeDeclare(Exchange_Name,"topic");
channel.basicPublish(Exchange_Name,"abc.1.2",null,"Topic 模式消息".getBytes());
// 关闭通道和连接
channel.close();
connection.close();
}
}
接收端示例代码
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub {
private static String ExchangeName = "Exchange_Topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topicqueue",false,false,false,null);
// 绑定交换机
// 参数3 标记 绑定到交换机的时候会有一个标记,只有和它一样标记的消息才会别消费到
channel.queueBind("topicqueue",ExchangeName,"key.*");
channel.queueBind("topicqueue",ExchangeName,"abc.#");//绑定多个标记
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理
System.out.println("message route receive1: ");
System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 参数2 false为确认收到消息, true为拒绝收到消息
}
};
channel.basicConsume("topicqueue",false,consumer);// 参数2 手动确认,代表我们收到消息后需要手动确认告诉服务器我们收到消息了
}
}