Our business uses DRC heavily, with many sharded databases and tables. DRC works in broadcast mode, so when several clients subscribe to the same topic, only one machine can actually hold the subscription at a time. Our previous implementation was a grab-style lock: the machine that started first generally grabbed the locks and ended up holding most of the topics, concentrating all the load on a single machine. We want multiple machines to share these topics evenly and raise the overall consumer-side throughput.
1. Defining the problem
Topics are subscribed unevenly across machines, so overall throughput is low; we want to rebalance them so that messages can be processed in parallel.
Rebalance constraints:
Because a queue can be assigned to at most one consumer, when the number of consumer instances in a consumer group exceeds the number of queues, the extra consumer instances are not assigned any queue at all.
Rebalance hazards:
Beyond the constraint above, a rebalance also carries some more serious hazards:
- Consumption pause: Suppose Consumer 1 is the only consumer and is responsible for all 5 queues. When Consumer 2 is added and a rebalance is triggered, 2 queues must be handed over to it. Consumer 1 has to stop consuming those 2 queues, and they cannot be consumed again until they are actually assigned to Consumer 2.
- Duplicate consumption: Consumer 2 must resume its 2 assigned queues from the offset Consumer 1 had already reached. By default, however, offsets are committed asynchronously: if Consumer 1 has consumed up to offset 10 but only offset 8 has been committed to the broker, Consumer 2 starts from offset 8 and 2 messages are consumed twice. In other words, Consumer 2 does not wait for Consumer 1 to finish committing its offset before the rebalance, so the longer the commit interval, the more duplicate consumption is possible.
- Consumption spike: Because a rebalance can cause duplicate consumption, if many messages need to be re-consumed, or if the pause during the rebalance lasts long enough for messages to back up, a large burst of messages may have to be consumed immediately after the rebalance finishes.
2. Problems to solve
- Detect consumer changes in real time
- Distribute topic subscriptions evenly
- Avoid split brain: a topic must never be subscribed more than once or not at all (which would mean duplicate or missed consumption)
3. A hypothetical scenario
Suppose there are machines A, B, C, D, E and four topics numbered 1, 2, 3, 4. As machines come online, the four topics are divided evenly among the machines that are up. After each reassignment, a machine that has just been assigned a topic must wait for the previous owner to unsubscribe before it subscribes, so that messages are not consumed twice (a sketch of the assignment rule follows the list below).
- After A comes online: A: 1,2,3,4
- After B comes online: A: 1,3  B: 2,4. B must wait for A to unsubscribe from 2 and 4 before it can subscribe; the same rule applies to every step below.
- After C comes online: A: 1,4  B: 2  C: 3
- After D comes online: A: 1  B: 2  C: 3  D: 4
- After E comes online: the assignment stays A: 1  B: 2  C: 3  D: 4; E has nothing to subscribe to.
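The rule that produces the assignments above can be stated simply: sort the topics and the consumers, then deal the topics out round-robin. A minimal Go sketch (assignTopics and the sample values are illustrative, not the actual implementation):

package main

import (
	"fmt"
	"sort"
)

// assignTopics sorts both lists and deals topics out round-robin, so every
// machine that sees the same membership computes exactly the same plan.
func assignTopics(topics, consumers []string) map[string][]string {
	sort.Strings(topics)
	sort.Strings(consumers)
	plan := make(map[string][]string, len(consumers))
	for i, topic := range topics {
		owner := consumers[i%len(consumers)]
		plan[owner] = append(plan[owner], topic)
	}
	return plan
}

func main() {
	topics := []string{"1", "2", "3", "4"}
	// After C comes online: A gets 1 and 4, B gets 2, C gets 3, matching the list above.
	fmt.Println(assignTopics(topics, []string{"A", "B", "C"}))
	// After E comes online there are more consumers than topics, so E gets nothing.
	fmt.Println(assignTopics(topics, []string{"A", "B", "C", "D", "E"}))
}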
4. Detours along the way
- At first we planned to build it on ZooKeeper, so we looked into the Java Curator and go-zookeeper APIs. The rough idea: register topics and consumers, watch the directories for changes, distribute the topics evenly, and have each client re-subscribe or unsubscribe when it sees the directory change.
- The Java version was implemented by my colleague @秋落. We compared notes, thought it was simple, and expected to finish in 3-4 days.
- We started writing code before thinking the problem all the way through. Round after round of testing kept turning up bugs, and split brain (over-subscription and under-subscription) kept appearing. We realized that with only 5 pot lids you can never cover 10 pots.
- We went back, re-examined every step of the flow, rethought the design, worked out the global logic and ordering, and drew a flow diagram. On the diagram the potential conflict points were obvious at a glance. Fixing problems against the diagram beats blindly writing code; for anything even slightly complex, you really do need to think three times before acting.
5. The solution
Design ideas
- I chose ZooKeeper, using its ephemeral nodes, directories, child nodes, sequential nodes, and locks.
- When a client starts, the topic and group information is configured and registered in ZooKeeper, and the machine's unique key (IP + random number + timestamp) is registered under the consumer directory.
- Joining the consumer group is done while holding a lock, which prevents concurrent joins and conflicting consumer-group events from producing conflicting proposals later and causing split brain.
- The topics and the consumer children are both sorted in ascending order and assigned in an interleaved (round-robin) manner, so every machine produces an identical proposal for the same membership.
- Proposals are reviewed; only a proposal that all machines agree on is recorded and executed.
- Recording a proposal is done while holding a lock, which prevents concurrent writes and conflicting proposals that would cause over- or under-subscription.
- To keep proposals recorded and executed in order, they are saved under the proposal directory; after executing a proposal, each machine writes a node with its own key and value under that proposal (a minimal sketch of these steps follows this list).
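Below is a minimal sketch of the startup path using the go-zookeeper client. The import path, addresses, ZooKeeper paths, owner-key format, and the inline round-robin proposal are illustrative assumptions for the sketch, not the project's actual code, and parent paths are assumed to already exist.

package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"sort"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	// Connect to ZooKeeper; the address is an assumption for this sketch.
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	acl := zk.WorldACL(zk.PermAll)

	// Unique owner key: IP + random number + timestamp, as described above.
	owner := fmt.Sprintf("10.0.0.1-%d-%d", rand.Int63(), time.Now().UnixNano())

	// Joining the consumer group holds a lock so concurrent joins cannot
	// produce conflicting proposals.
	joinLock := zk.NewLock(conn, "/test-drc/test-group/join-lock", acl)
	if err := joinLock.Lock(); err != nil {
		panic(err)
	}
	defer joinLock.Unlock()

	// Register this machine as an ephemeral child of the consumer directory;
	// the node disappears with the session, so membership stays accurate.
	if _, err := conn.Create("/test-drc/test-group/consumer/"+owner, nil,
		zk.FlagEphemeral, acl); err != nil {
		panic(err)
	}

	// Watch the consumer directory so membership changes are seen in real time.
	consumers, _, events, err := conn.ChildrenW("/test-drc/test-group/consumer")
	if err != nil {
		panic(err)
	}
	go func() {
		// One-shot watch: the real client would recompute a proposal and re-watch.
		for ev := range events {
			fmt.Println("membership changed:", ev.Type)
		}
	}()

	// Compute the sorted round-robin assignment (section 3) and record it as a
	// sequential node under the proposal directory, so proposals are ordered.
	sort.Strings(consumers)
	topics := []string{"topic-1", "topic-2", "topic-3", "topic-4"}
	plan := map[string][]string{}
	for i, topic := range topics {
		o := consumers[i%len(consumers)]
		plan[o] = append(plan[o], topic)
	}
	data, _ := json.Marshal(plan)
	proposal, err := conn.Create("/test-drc/test-group/proposal/no-", data,
		zk.FlagSequence, acl)
	if err != nil {
		panic(err)
	}
	fmt.Println("recorded proposal:", proposal)
}

The real client additionally has to handle the review step (comparing its own computed plan against the recorded proposal) and wait for the previous owner to unsubscribe before starting a topic client, as described above.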
Solution logic diagram
Solution sequence diagram
Now write the code against the diagrams.
Unit test verification
func TestStepUnit(t *testing.T) {
	conn := connect()
	defer conn.Close()
	defer recursiveDeletePath(conn, "/test-drc/test-group/proposal")
	recursiveDeletePath(conn, "/test-drc/test-group/proposal")
	var group = GetGroupUnit(conn)
	var lockPathList = group.GetLockPathList()
	lockOwnersSupplier := func() []LockOwner {
		return getLockOwner(conn, lockPathList)
	}
	consumerSupplier := func() []string {
		return getChildren(conn, group.consumerPath)
	}
	// First client: it should be the only consumer and hold every topic.
	time.Sleep(time.Second * 1)
	assert.Equal(t, 1, len(consumerSupplier()))
	var maxNodePath = "/test-drc/test-group/proposal/no-0000000000"
	var maxNodeData = getData(conn, maxNodePath)
	var nodeMap map[string][]string
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("error: %+v", err)
		return
	}
	time.Sleep(time.Second * 10)
	var ownerLockList = lockOwnersSupplier()
	log.Infof("nodeMap: %+v", nodeMap)
	log.Infof("ownerLockList: %+v", ownerLockList)
	for _, ownerLock := range ownerLockList {
		// Each held lock must match the latest proposal, the proposal node must
		// record the owner, and the corresponding topic client must have started.
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("client started successfully: %+v", ownerLock.Lock)
	}
	t.Logf("verification with one consumer passed: %+v", group.Owner)
	// Second client joins: re-check against proposal no-0000000001.
	conn2 := connect()
	defer conn2.Close()
	var group2 = GetGroupUnit(conn2)
	time.Sleep(time.Second * 1)
	assert.Equal(t, 2, len(consumerSupplier()))
	maxNodePath = "/test-drc/test-group/proposal/no-0000000001"
	maxNodeData = getData(conn, maxNodePath)
	for k := range nodeMap {
		delete(nodeMap, k)
	}
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("error: %+v", err)
		return
	}
	time.Sleep(time.Second * 15)
	var g1 = filterLockOwner(getLockOwner(conn, lockPathList), func(it LockOwner) bool {
		return it.Owner == group.Owner
	})
	var g2 = filterLockOwner(getLockOwner(conn2, lockPathList), func(it LockOwner) bool {
		return it.Owner == group2.Owner
	})
	for _, ownerLock := range g1 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g1 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g1 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g1 client started successfully: %+v", ownerLock.Lock)
	}
	for _, ownerLock := range g2 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g2 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g2 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group2.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g2 client started successfully: %+v", ownerLock.Lock)
	}
	t.Logf("verification with two consumers passed: %+v, %+v", group.Owner, group2.Owner)
	// Third client joins: re-check against proposal no-0000000002.
	conn3 := connect()
	defer conn3.Close()
	var group3 = GetGroupUnit(conn3)
	time.Sleep(time.Second * 1)
	assert.Equal(t, 3, len(consumerSupplier()))
	maxNodePath = "/test-drc/test-group/proposal/no-0000000002"
	maxNodeData = getData(conn, maxNodePath)
	for k := range nodeMap {
		delete(nodeMap, k)
	}
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("error: %+v", err)
		return
	}
	time.Sleep(time.Second * 15)
	g1 = filterLockOwner(getLockOwner(conn, lockPathList), func(it LockOwner) bool {
		return it.Owner == group.Owner
	})
	g2 = filterLockOwner(getLockOwner(conn2, lockPathList), func(it LockOwner) bool {
		return it.Owner == group2.Owner
	})
	var g3 = filterLockOwner(getLockOwner(conn3, lockPathList), func(it LockOwner) bool {
		return it.Owner == group3.Owner
	})
	for _, ownerLock := range g1 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g1 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g1 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g1 client started successfully: %+v", ownerLock.Lock)
	}
	for _, ownerLock := range g2 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g2 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g2 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group2.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g2 client started successfully: %+v", ownerLock.Lock)
	}
	for _, ownerLock := range g3 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g3 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g3 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group3.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g3 client started successfully: %+v", ownerLock.Lock)
	}
	t.Logf("verification with three consumers passed: %+v, %+v, %+v", group.Owner, group2.Owner, group3.Owner)
	// Fourth client joins: re-check against proposal no-0000000003.
	conn4 := connect()
	defer conn4.Close()
	var group4 = GetGroupUnit(conn4)
	time.Sleep(time.Second * 1)
	assert.Equal(t, 4, len(consumerSupplier()))
	maxNodePath = "/test-drc/test-group/proposal/no-0000000003"
	maxNodeData = getData(conn, maxNodePath)
	for k := range nodeMap {
		delete(nodeMap, k)
	}
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("error: %+v", err)
		return
	}
	time.Sleep(time.Second * 15)
	g1 = filterLockOwner(getLockOwner(conn, lockPathList), func(it LockOwner) bool {
		return it.Owner == group.Owner
	})
	g2 = filterLockOwner(getLockOwner(conn2, lockPathList), func(it LockOwner) bool {
		return it.Owner == group2.Owner
	})
	g3 = filterLockOwner(getLockOwner(conn3, lockPathList), func(it LockOwner) bool {
		return it.Owner == group3.Owner
	})
	var g4 = filterLockOwner(getLockOwner(conn4, lockPathList), func(it LockOwner) bool {
		return it.Owner == group4.Owner
	})
	for _, ownerLock := range g1 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g1 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g1 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g1 client started successfully: %+v", ownerLock.Lock)
	}
	for _, ownerLock := range g2 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g2 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g2 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group2.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g2 client started successfully: %+v", ownerLock.Lock)
	}
	for _, ownerLock := range g3 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g3 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g3 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group3.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g3 client started successfully: %+v", ownerLock.Lock)
	}
	for _, ownerLock := range g4 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g4 lock written successfully: %+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g4 proposal written successfully: %+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group4.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g4 client started successfully: %+v", ownerLock.Lock)
	}
	t.Logf("verification with four consumers passed: %+v, %+v, %+v, %+v", group.Owner, group2.Owner, group3.Owner, group4.Owner)
	// Fifth client joins: there are now more consumers than topics, so it
	// should not be assigned anything.
	conn5 := connect()
	defer conn5.Close()
	var group5 = GetGroupUnit(conn5)
	time.Sleep(time.Second * 15)
	assert.Equal(t, 5, len(consumerSupplier()))
	var g5 = filterLockOwner(getLockOwner(conn5, lockPathList), func(it LockOwner) bool {
		return it.Owner == group5.Owner
	})
	assert.Equal(t, len(g5), 0)
	t.Logf("g5 client was assigned nothing: %+v", len(g5))
	t.Logf("verification with five consumers passed: %+v, %+v, %+v, %+v, %+v", group.Owner, group2.Owner, group3.Owner, group4.Owner, group5.Owner)
}
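Assuming a local ZooKeeper instance is running at whatever address the connect() helper points to, the test can be run in the usual way:

go test -run TestStepUnit -v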
6. Can the consumer side also raise the server's throughput?
Consumer-side throughput is now solved, but the producer side still cannot produce fast enough. DRC also offers a hash mode that hashes a topic into a specified set of partitions.
I am wondering: if each topic were filtered by table, with a separate checkpoint maintained per table, and we started as many clients as there are tables, each connecting to the server to subscribe and consume, could that also raise throughput?
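A rough sketch of that idea, deliberately not tied to the real go-drc-client API (Message, the table names, and the checkpoint values are all placeholders): messages from one topic are routed by table, and each table commits its own checkpoint, so each table's consumption can progress independently.

package main

import (
	"fmt"
	"sync"
)

// Message is a stand-in for a DRC change event; only the fields needed for
// this sketch are included.
type Message struct {
	Table      string
	Checkpoint string
}

// tableCheckpoints keeps one checkpoint per table, so each table's consumption
// can be committed independently of the others.
type tableCheckpoints struct {
	mu     sync.Mutex
	points map[string]string
}

func (t *tableCheckpoints) commit(table, checkpoint string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.points[table] = checkpoint
}

func main() {
	cps := &tableCheckpoints{points: map[string]string{}}
	// In the real setup each table would have its own client subscription; here
	// we simply simulate routing a mixed stream by table name.
	stream := []Message{
		{Table: "order", Checkpoint: "100"},
		{Table: "user", Checkpoint: "42"},
		{Table: "order", Checkpoint: "101"},
	}
	for _, m := range stream {
		// A real handler would process m here; on success, commit per table.
		cps.commit(m.Table, m.Checkpoint)
	}
	fmt.Println(cps.points)
}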
go-drc-client documentation
go-zk-drc-client documentation