MQ 里的一个 Consumer ID 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 Consumer ID 下通常会挂载多个 Consumer 实例。订阅关系一致指的是同一个 Consumer ID 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer ID 下所有的实例需在以下两方面均保持一致:
订阅的 Topic 必须一致;
订阅的 Topic 中的 Tag 必须一致。
如上图所示,一个 Consumer ID 也可以订阅多个 Topic,但是这个 Consumer ID 里的多个消费者实例的订阅关系一定要保持一致。
下文给出了订阅关系不一致的错误代码示例。
【例一】以下例子中,同一个 Consumer ID 下的两个实例订阅的 Topic 不一致。
Consumer 实例 1-1:
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_1");
- Consumer consumer = ONSFactory.createConsumer(properties);
- consumer.subscribe("jodie_test_A", "*", new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println(message.getMsgID());
- return Action.CommitMessage;
- }
- });
Consumer 实例1-2:
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_1");
- Consumer consumer = ONSFactory.createConsumer(properties);
- consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println(message.getMsgID());
- return Action.CommitMessage;
- }
- });
【例二】以下例子中,同一个 Consumer ID 下订阅 Topic 的 Tag 不一致。Consumer 实例2-1 订阅了 TagA,而 Consumer 实例2-2 未指定 Tag。
Consumer 实例2-1:
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_2");
- Consumer consumer = ONSFactory.createConsumer(properties);
- consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println(message.getMsgID());
- return Action.CommitMessage;
- }
- });
Consumer 实例2-2:
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_2");
- Consumer consumer = ONSFactory.createConsumer(properties);
- consumer.subscribe("jodie_test_A ", "*", new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println(message.getMsgID());
- return Action.CommitMessage;
- }
- });
【例三】此例中,错误的原因有俩个:
- 同一个 Consumer ID 下订阅 Topic 个数不一致。
- 同一个 Consumer ID 下订阅 Topic 的 Tag 不一致。
Consumer 实例3-1:
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_3");
- Consumer consumer = ONSFactory.createConsumer(properties);
- consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println(message.getMsgID());
- return Action.CommitMessage;
- }
- });
- consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println(message.getMsgID());
- return Action.CommitMessage;
- }
- });
Consumer 实例3-2:
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_3");
- Consumer consumer = ONSFactory.createConsumer(properties);
- consumer.subscribe("jodie_test_A ", "TagB", new MessageListener() {
- public Action consume(Message message, ConsumeContext context) {
- System.out.println(message.getMsgID());
- return Action.CommitMessage;
- }
- });