一、Publish/Subscribe 订阅模式
订阅模式中多了一个交换机角色
- P:生产者,不再发送消息给队列,而是发送消息给X(交换机)
- C:消费者,消息的接收者
- Queue:接收、缓存消息
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。如何操作,取决于Exchange的类型。常见的类型有:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,将消息交给符合指定路由(routing key)的队列
- Topic:通配符,将消息交给符合路由模式(routing pattern)的队列
- 交换机只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与交换机绑定,或者没有符合路由规则的队列,那么消息会丢失。
二、示例
基础代码及工具类全都在第一篇中,需要请查看《RabbitMQ工作模式(一)》
1、生产者代码
packagecom.cui.producer; importcom.cui.common.RabbitMqUtils; importcom.rabbitmq.client.BuiltinExchangeType; importcom.rabbitmq.client.Channel; importcom.rabbitmq.client.Connection; importjava.io.IOException; publicclassProducer_PubSub { publicstaticvoidmain(String[] args) throwsIOException { //获得连接Connectionconnection=RabbitMqUtils.getConnection(); //创建channelChannelchannel=connection.createChannel(); //创建交换机/*** exchange:交换机名称* type:交换机类型* DIRECT("direct"),定向* FANOUT("fanout"),广播* TOPIC("topic"),通配符* HEADERS("headers");参数匹配* durable:是否持久化* autoDelete:是否自动删除* internal:内部使用。一般是false* arguments:参数* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)*/Stringexchange="test_fanout_exchange"; channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null); //创建队列StringqueueName1="test_fanout_queue1"; StringqueueName2="test_fanout_queue2"; channel.queueDeclare(queueName1, true, false, false, null); channel.queueDeclare(queueName2, true, false, false, null); //绑定交换机和队列,fanout模式下路由设置为空channel.queueBind(queueName1, exchange, ""); channel.queueBind(queueName2, exchange, ""); //发送消息Stringmessage="信息:这是一条fanout模式下的消息"; channel.basicPublish(exchange, "", null, message.getBytes()); //释放资源RabbitMqUtils.close(channel, connection); } }
2、消费者代码
packagecom.cui.consumer; importcom.cui.common.RabbitMqUtils; importcom.rabbitmq.client.*; importjava.io.IOException; /*** 消费者1 Consumer_PubSub1* 消费者2 Consumer_PubSub2 复制1,注意修改queueName2*/publicclassConsumer_PubSub1 { publicstaticvoidmain(String[] args) throwsIOException { Connectionconnection=RabbitMqUtils.getConnection(); Channelchannel=connection.createChannel(); StringqueueName1="test_fanout_queue1"; //接收消息Consumerconsumer=newDefaultConsumer(channel){ publicvoidhandleDelivery(StringconsumerTag, Envelopeenvelope, AMQP.BasicPropertiesproperties, byte[] body) throwsIOException { Stringmessage=newString(body, "UTF-8"); System.out.println("接收到消息:"+message); System.out.println("将信息打印到控制台"); } }; channel.basicConsume(queueName1, true, consumer); } }
3、小结
- 订阅模式同一个交换机有多个队列,多个消费者监听各自队列