1,启动流程
Producer如何感知要发送消息的broker(brokerAddrTable中的值是怎么获得的)?
- producer本地集合中没有,会根据指定topic到namesrv获取TopicPublishInfo,并放入本地集合。
- 定时从namesrv更新topic路由信息。
Producer与broker间的心跳
Producer定时发送心跳,将producer信息(其实就是procduer的group)定时发送到broker(brokerAddrTable集合中列出的)。
master接收消息,slave拷贝master
Producer发送消息,只发送到broker_master机器,通过broker的主从复制机制拷贝到broker_slave。
2,如何发送消息
producer轮询某topic下的所有queue的方式 实现发送方的负载均衡。
Topic下的所有队列如何理解?
Broker | Topic | queue | 注册namesrv队列(Topic_A) |
---|---|---|---|
broker1 | Topic_A | queue0 , queue1 | broker1_queue0 ,broker1_queue1 |
broker2 | Topic_A | queue0, queue2, queue3 | broker2_queue0,broker2_queue1,broker2_queue2 |
broker3 | Topic_A | queue0 | broker3_queue0 |
Producer如何实现轮询队列?
Producer从namesrv获得topic_A路由信息TopicPublishInfo。
//Topic_A的所有的队列
private List<MessageQueue> messageQueueList
//自增整型
private volatile ThreadLocalIndex sendWhichQueue
/**
*选择一个发送队列
*lastBrokerName不为空,代表上次选择的queue失败,本次避开同一个queue
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName){
//计算队列下标
//sendWhichQueue.getAndIncrement()每次调用+1
Math.abs(sendWhichQueue.getAndIncrement()) % this.messageQueueList.size()
}
Producer发消息系统重试?
//消息发送失败重试次数默认为2
private int retryTimesWhenSendFailed = 2;
//发送消息超时时间
private int sendMsgTimeout = 3000;
发送失败,换个队列继续发送所需条件:
1. 重试次数不到retryTimesWhenSendFailed (默认2次)
2. 发送此条消息花费时间还没有到sendMsgTimeout (默认3000毫秒)