本文主要介绍RocketMQ的使用,主要内容如下:
一、 RocketMQ集群
1. RocketMQ集群部署结构
1) Name Server
Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
2) Broker
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有NameServer。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。
3) Producer
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。
4) Consumer
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。
当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。
二、 事务
1. MQ与DB一致性原理
1) 流程图
上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。
MQ消息、DB操作一致性方案:
² 发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。
² 执行DB操作;DB执行成功Commit DB操作,DB执行失败Rollback DB操作。
² 如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败,回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。
² MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,如果发现消息未COMMIT,则通过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE。
说明:
² 上面依DB为例,其实此处可以是任何业务或者数据源。
² 以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是clint jar提供的状态,在MQ服务器内部是一个数字。
2) 示例代码
a) 创建Producer
创建TransactionMQProducer,并注册TransactionCheckListener。
TransactionCheckListener transactionCheckListener = new TransactionCheckListener() {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
try {
Object msg = HessianUtils.decode(messageExt.getBody());
return getCommitStatus(messageExt.getTopic(), messageExt.getTags(), msg);
} catch (IOException e) {
logger.error(e.getMessage(), e);
return LocalTransactionState.COMMIT_MESSAGE;
}
}
};
// 设置事务会查监听器
producer.setTransactionCheckListener(transactionCheckListener);
b) 发送事务消息
Message message = new Message(topic, tag, HessianUtils.encode(msg));
// 发送事务性消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message arg0, Object arg1) {
Boolean result = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
// insert or update db
return true;
} catch (Exception e) {
logger.error("insert / update failed!", e);
status.setRollbackOnly();
return false;
}
}
});
if (result == null || !result) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}, null);
if (sendResult.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {
logger.error("send transaction msg failed! topic={}, tags={}, message={}, sendResult={}", topic, tag, msg, JSON.toJSONString(sendResult));
return sendResult;
}
return sendResult;
2. 案例分析
1) 单机环境下的事务示意图
如下为A给B转账的例子。
1 |
锁定A的账户 |
2 |
锁定B的账户 |
3 |
检查A账户是否有1元 |
4 |
A的账户扣减1元 |
5 |
给B的账户加1元 |
6 |
解锁B的账户 |
7 |
解锁A的账户 |
以上过程在代码层面甚至可以简化到在一个事物中执行两条sql语句。
2) 集群环境下事务
和单机事务不同,A、B账户可能不在同一个DB中,此时无法像在单机情况下使用事物来实现。此时可以通过一下方式实现,将转账操作分成两个操作。
a) A账户
1 |
锁定A的账户 |
2 |
检查A账户是否有1元 |
3 |
A的账户扣减1元 |
4 |
解锁A的账户 |
b) MQ消息
A账户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转账系统,转账系统来给B账号加钱。
c) B账户
1 |
锁定B的账户 |
2 |
给B的账户加1元 |
3 |
解锁B的账户 |
d) 原理
大事物 = 小事物 + 异步
3. 系统之间数据一致性方案
以上是交易系统和其他系统之间保持最终一致性的解决方案。
三、 顺序消息
1. 顺序消息缺陷
发送顺序消息无法利用集群Fail Over特性 消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。
2. 原理
produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。
注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue
3. Producer
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
4. Consumer
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
四、 最佳实践
1. Producer
1) Topic
一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags 在broker做消息过滤。
2) key
每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key 尽可能唯一,这样可以避免潜在的哈希冲突。
//订单Id
String orderId="20034568923546";
message.setKeys(orderId);
3) 日志
消息发送成功或者失败,要打印消息日志,务必要打印 send result 和key 字段。
4) send
send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。
SEND_OK:消息发送成功
FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
2. Consumer
1) 幂等
RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,此时务必做好幂等。
2) 日志
消费时记录日志,以便后续定位问题。
3) 批量消费
尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
五、 参考资料
1. 文档
见附件
2. 博客
分布式开放消息系统(RocketMQ)的原理与实践
http://www.jianshu.com/p/453c6e7ff81c
RocketMQ事务消费和顺序消费详解
http://www.cnblogs.com/520playboy/p/6750023.html
ZeroCopy
http://www.linuxjournal.com/article/6345
IO方式的性能数据
http://stblog.baidu-tech.com/?p=851