RabbitMQ:四种ExChange用法

简介: RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。ExChange和Queue之前是多对多的关系。RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。
RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。
ExChange和Queue之前是多对多的关系。
RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。

一、fanout
当向一个fanout发送一个消息时,RoutingKey的设置不起作用。
消息会被发送给同一个交换机下的所有队列,每个队列接收到的消息是一样的;
一个队列内有所有消费者(包含那些并没有相应RoutingKey的 消费者 ),将平分队列接收到的消息

----------------消息生产者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
String message = "hello world! ";

for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());
}

System.out.println("Sent msg finish");

channel.close();
connection.close();

----------------消息消费者 ----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
// 声明 队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null);

System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,   AMQP.BasicProperties properties, byte[] body) {
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}

System.out.println("Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

二、direct
当向一个direct发送一个消息时, 消息会被发送给同一个交换机下的 拥有相应RoutingKey的队列, 每个队列接收到的消息是一样的;
一个队列内拥有相应RoutingKey的消费者,将平分队列接收到的消息。

----------------消息生产者 ----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
String message = "hello world! ";

for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes());
}

System.out.println("Sent msg is '" + message + "'");

channel.close();
connection.close();

----------------消息消费者 ----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}

System.out.println("1 Received msg='" + message + "'");
}
};

channel.basicConsume(QUEUE_NAME, true, consumer);

三、topic
当向一个topic发送一个消息时 消息会被发送给同一个交换机下的 拥有相应RoutingKey的队列, 每个队列接收到的消息是一样的;
一个队列内有所有消费者(包含那些并没有相应RoutingKey的 消费者 ),将平分队列接收到的消息

----------------消息生产者 ----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
String message = "hello world! ";

// int i=101;
for (int i = 0; i < 100; i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes());
}

System.out.println("Sent msg is '" + message + "'");

channel.close();
connection.close();

----------------消息消费者 ----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列// 把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

System.out.println("1 Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}

System.out.println("1 Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

四、headers
当向一个headers发送一个消息时 消息会被发送给同一个交换机下的 拥有相应RoutingKey或者headers的队列, 每个队列接收到的消息是一样的;
一个队列内有所有消费者(包含那些并没有相应RoutingKey或headers的 消费者 ),将平分队列接收到的消息

----------------消息生产者 ----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null);

// 设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("name", "jack");
headers.put("age", 31);
Builder builder = new Builder();
builder.headers(headers);

String message = "hello world! ";

// int i=101;
for (int i = 0; i < 100; i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", builder.build(), (message + i).getBytes());
}

System.out.println("Sent msg is '" + message + "'");

channel.close();
connection.close();

----------------消息消费者 ----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null);
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String, Object>();
// 这里x-match有两种类型
// all:表示所有的键值对都匹配才能接受到消息
// any:表示只要有键值对匹配就能接受到消息
headers.put("x-match", "all");
headers.put("name", "jack");
headers.put("age", 30);

// 把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", headers);

System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  byte[] body) throws IOException
{

System.out.println("Received start --------------");

for (Entry<String, Object> entry : properties.getHeaders().entrySet())
{
System.out.println(entry.getKey() + "=" + entry.getValue());
}

String message = new String(body, "UTF-8");

System.out.println("msg='" + message + "'");
System.out.println("Received end --------------");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 Java
RabbitMQ之Direct(直连)Exchange解读
RabbitMQ之Direct(直连)Exchange解读
|
6月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
6月前
|
消息中间件 Java
RabbitMQ中的Exchange是什么?它有哪些类型?
RabbitMQ中的Exchange是什么?它有哪些类型?
178 0
|
消息中间件 存储 Java
RabbitMQ之Exchange(交换机)属性及备用交换机解读
RabbitMQ之Exchange(交换机)属性及备用交换机解读
|
消息中间件 存储 Java
RabbitMQ之headers(头部)Exchange解读
RabbitMQ之headers(头部)Exchange解读
|
消息中间件 存储 Java
RabbitMQ之topic(主题)Exchange解读
RabbitMQ之topic(主题)Exchange解读
|
消息中间件 存储 Java
RabbitMQ之Fanout(扇形) Exchange解读
RabbitMQ之Fanout(扇形) Exchange解读
|
消息中间件 存储
RabbitMQ从入门到进阶(Direct exchange)
RabbitMQ从入门到进阶(Direct exchange)
210 0
|
消息中间件 存储
RabbitMQ从入门到进阶(Exchange)
RabbitMQ从入门到进阶(Exchange)
152 0
|
消息中间件 网络架构
9、RabbitMQ教程-Topic Exchange类型的基本使用demo
9、RabbitMQ教程-Topic Exchange类型的基本使用demo
137 0
9、RabbitMQ教程-Topic Exchange类型的基本使用demo