drc-client动态负载均衡加大消费侧吞吐量

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 在业务中经常要用drc,多分库分表,drc使用广播模式,如果多个client启动订阅topic,同时只能有一台机器订阅,我们之前的实现是抢夺式锁,前启动的机器基本会抢占锁,并持有多个topic,压力都在单机上面,我们想让多台机器均分这些topic,加大消费侧的整体吞吐量。1.定义问题多台机器订阅topic不均匀,整体吞吐量低,希望重新Rebalance从而提升消息的并行处理能力。Rebalance

在业务中经常要用drc,多分库分表,drc使用广播模式,如果多个client启动订阅topic,同时只能有一台机器订阅,我们之前的实现是抢夺式锁,前启动的机器基本会抢占锁,并持有多个topic,压力都在单机上面,我们想让多台机器均分这些topic,加大消费侧的整体吞吐量。

1.定义问题

多台机器订阅topic不均匀,整体吞吐量低,希望重新Rebalance从而提升消息的并行处理能力。

Rebalance限制:

由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。

Rebalance危害: 

除了以上限制,更加严重的是,在发生Rebalance时,存在着一些危害,如下所述:

  • 消费暂停: 考虑在只有Consumer 1的情况下,其负责消费所有5个队列;在新增Consumer 2,触发Rebalance时,需要分配2个队列给其消费。那么Consumer 1就需要停止这2个队列的消费,等到这两个队列分配给Consumer 2后,这两个队列才能继续被消费。
  • 重复消费: Consumer 2 在消费分配给自己的2个队列时,必须接着从Consumer 1之前已经消费到的offset继续开始消费。然而默认情况下,offset是异步提交的,如consumer 1当前消费到offset为10,但是异步提交给broker的offset为8;那么如果consumer 2从8的offset开始消费,那么就会有2条消息重复。也就是说,Consumer 2 并不会等待Consumer1提交完offset后,再进行Rebalance,因此提交间隔越长,可能造成的重复消费就越多。 
  • 消费突刺: 由于rebalance可能导致重复消费,如果需要重复消费的消息过多;或者因为rebalance暂停时间过长,导致积压了部分消息。那么都有可能导致在rebalance结束之后瞬间可能需要消费很多消息。

2.要解决的问题

  • 实时感知消费者变化
  • 平均订阅topic
  • 脑裂问题:不能多订阅和少订阅(重复消费和少消费)

3.假设问题

假设有机器A,B,C,D,E,有topic4个编号1,2,3,4,当机器上线后按照上线机器数量平均分配这4个topic,重新分配后需要等待上一个占有者退订topic,这次分配到topic的机器才能订阅,避免重复消费消息。

  • A上线后,分配方式  A:1,2,3,4
  • B上线后,分配方式  A:1,3  B:2,4    B需要等待A退订2,4后才能订阅,下面都一样
  • C上线后,分配方式  A:1,4  B:2    C:3
  • D上线后,分配方式  A:1     B:2    C:3  D:4
  • E上线后,分配方式  A:1     B:2    C:3  D:4,维持不变E,没有可订阅的

4.走过的弯路

  1. 刚开想着用zk实现,就调研了下java- Curator ,和 go-zookeeper 的Api,大概想着注册topic和consumer并且监听目录变化,平均分配后,各个客户端监听到目录变化,重新订阅和退订就可以了。
  2. java版的由我的同事@秋落 来实现,我们对了下觉得挺简单,3-4天就搞定了。
  3. 在没有完全想透彻的情况下就开始写代码了,经过多轮测试不断发现bug,脑裂情况一直出现 多订阅少订阅 ,意识到 拿着5个锅盖永远改不严实10个锅
  4. 重新整理和思考每一流程环节,重新构思设计,将全局逻辑和时序想明白了,画好流程图,在图上对可能出现的冲突点,一目了然,按图修复问题比盲目写代码太重要了,稍微复杂点的逻辑真的需要 三思而后行

5.解决方案

方案构思
  • 我选用ZooKeeper,使用它临时节点,目录,子节点,序列节点,锁等特点
  • 启动客户端时,将topic信息,分组信息配置好,注册到zk,并且将本机器唯一主键 (Ip+随机数+时间戳) 注册到consumer目录
  • 加入consumer组持有锁,解决并发加入,消费组Event冲突,对后面预案冲突,产生脑裂
  • 对topic和consumer-childern 升序排列,进行交叉分配,保证每次分配多机器议案一致
  • 对议案进行评审,多机器 达成共识的议案 ,录入并执行
  • 录入预案持有锁,解决并发录入,议案冲突,产生多订阅和少订阅
  • 为了保证议案的有序录入和执行,将议案保存到proposal目录,每台机器执行完成需要在执行议案下保存自己主键节点和值
方案逻辑图

方案时序图

照着图写代码吧

单元测试验证
func TestStepUnit(t *testing.T) {
	conn := connect()
	defer conn.Close()
	defer recursiveDeletePath(conn, "/test-drc/test-group/proposal")
	recursiveDeletePath(conn, "/test-drc/test-group/proposal")

	var group = GetGroupUnit(conn)

	var lockPathList = group.GetLockPathList()

	lockOwnersSupplier := func() []LockOwner {
		return getLockOwner(conn, lockPathList)
	}

	consumerSupplier := func() []string {
		return getChildren(conn, group.consumerPath)
	}

	time.Sleep(time.Second * 1)
	assert.Equal(t, 1, len(consumerSupplier()))
	var maxNodePath = "/test-drc/test-group/proposal/no-0000000000"
	var maxNodeData = getData(conn, maxNodePath)
	var nodeMap map[string][]string
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("erro:%+v", err)
		return
	}
	time.Sleep(time.Second * 10)
	var ownerLockList = lockOwnersSupplier()
	log.Infof("nodeMap: %+v", nodeMap)
	log.Infof("ownerLockList: %+v", ownerLockList)
	for _,ownerLock := range ownerLockList {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("客户端成功启动%+v", ownerLock.Lock)
	}
	t.Logf("启动一个验证成功%+v", group.Owner)

	conn2 := connect()
	defer conn2.Close()
	var group2 = GetGroupUnit(conn2)
	time.Sleep(time.Second * 1)
	assert.Equal(t, 2, len(consumerSupplier()))

	maxNodePath = "/test-drc/test-group/proposal/no-0000000001"
	maxNodeData = getData(conn, maxNodePath)
	for k, _ := range nodeMap {
		delete(nodeMap, k)
	}
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("erro:%+v", err)
		return
	}
	time.Sleep(time.Second * 15)

	var g1 = filterLockOwner(getLockOwner(conn, lockPathList), func(it LockOwner) bool {
		return it.Owner == group.Owner
	})
	var g2 = filterLockOwner(getLockOwner(conn2, lockPathList), func(it LockOwner) bool {
		return it.Owner == group2.Owner
	})

	for _, ownerLock := range g1 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g1Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g1议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g1客户端成功启动%+v", ownerLock.Lock)
	}

	for _, ownerLock := range g2 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g2Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g2议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group2.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g2客户端成功启动%+v", ownerLock.Lock)
	}
	t.Logf("启动二个验证成功: %+v,%+v", group.Owner, group2.Owner)



	conn3 := connect()
	defer conn3.Close()
	var group3 = GetGroupUnit(conn3)
	time.Sleep(time.Second * 1)
	assert.Equal(t, 3, len(consumerSupplier()))


	maxNodePath = "/test-drc/test-group/proposal/no-0000000002"
	maxNodeData = getData(conn, maxNodePath)
	for k, _ := range nodeMap {
		delete(nodeMap, k)
	}
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("erro:%+v", err)
		return
	}
	time.Sleep(time.Second * 15)

	g1 = filterLockOwner(getLockOwner(conn, lockPathList), func(it LockOwner) bool {
		return it.Owner == group.Owner
	})
	g2 = filterLockOwner(getLockOwner(conn2, lockPathList), func(it LockOwner) bool {
		return it.Owner == group2.Owner
	})
	var g3 = filterLockOwner(getLockOwner(conn3, lockPathList), func(it LockOwner) bool {
		return it.Owner == group3.Owner
	})

	for _, ownerLock := range g1 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g1Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g1议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g1客户端成功启动%+v", ownerLock.Lock)
	}

	for _, ownerLock := range g2 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g2Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g2议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group2.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g2客户端成功启动%+v", ownerLock.Lock)
	}
	for _, ownerLock := range g3 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g3Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g3议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group3.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g3客户端成功启动%+v", ownerLock.Lock)
	}
	t.Logf("启动三个验证成功: %+v, %+v, %+v", group.Owner, group2.Owner, group3.Owner)


	conn4 := connect()
	defer conn4.Close()
	var group4 = GetGroupUnit(conn4)
	time.Sleep(time.Second * 1)
	assert.Equal(t, 4, len(consumerSupplier()))


	maxNodePath = "/test-drc/test-group/proposal/no-0000000003"
	maxNodeData = getData(conn, maxNodePath)
	for k, _ := range nodeMap {
		delete(nodeMap, k)
	}
	if err := json.Unmarshal([]byte(maxNodeData), &nodeMap); err != nil {
		t.Fatalf("erro:%+v", err)
		return
	}
	time.Sleep(time.Second * 15)
	g1 = filterLockOwner(getLockOwner(conn, lockPathList), func(it LockOwner) bool {
		return it.Owner == group.Owner
	})
	g2 = filterLockOwner(getLockOwner(conn2, lockPathList), func(it LockOwner) bool {
		return it.Owner == group2.Owner
	})
	g3 = filterLockOwner(getLockOwner(conn3, lockPathList), func(it LockOwner) bool {
		return it.Owner == group3.Owner
	})
	var g4 = filterLockOwner(getLockOwner(conn4, lockPathList), func(it LockOwner) bool {
		return it.Owner == group4.Owner
	})

	for _, ownerLock := range g1 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g1Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g1议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g1客户端成功启动%+v", ownerLock.Lock)
	}

	for _, ownerLock := range g2 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g2Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g2议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group2.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g2客户端成功启动%+v", ownerLock.Lock)
	}
	for _, ownerLock := range g3 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g3Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g3议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group3.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g3客户端成功启动%+v", ownerLock.Lock)
	}
	for _, ownerLock := range g4 {
		assert.Equal(t, StringInSlice(ownerLock.Lock, nodeMap[ownerLock.Owner]), true)
		t.Logf("g4Lock成功写入%+v", ownerLock.Lock)
		assert.Equal(t, existsPath(conn, buildString(maxNodePath, "/", ownerLock.Owner)), true)
		t.Logf("g4议案成功写入%+v", buildString(maxNodePath, ownerLock.Owner))
		assert.Equal(t, group4.topicClientMap[ownerLock.Lock].started, true)
		t.Logf("g4客户端成功启动%+v", ownerLock.Lock)
	}
	t.Logf("启动四个验证成功: %+v, %+v, %+v, %+v", group.Owner, group2.Owner, group3.Owner, group4.Owner)

	conn5 := connect()
	defer conn5.Close()
	var group5 = GetGroupUnit(conn5)
	time.Sleep(time.Second * 15)
	assert.Equal(t, 5, len(consumerSupplier()))
	var g5 = filterLockOwner(getLockOwner(conn5, lockPathList), func(it LockOwner) bool {
		return it.Owner == group5.Owner
	})
	assert.Equal(t, len(g5), 0)
	t.Logf("g5客户端没有分配到%+v", len(g5))
	t.Logf("启动五个验证成功: %+v, %+v, %+v, %+v, %+v", group.Owner, group2.Owner, group3.Owner, group4.Owner, group5.Owner)
}

6.能在消费侧提升server的生产力吗?

消费侧的吞吐量解决了,生产侧也存在者生产不足的问题,drc也提供了hash版的,将topi-hash成指定的分配,

我在想如果每个topic按照table的维度过滤,把一个topic按照table的维度过滤,单独维护每个table的checkpoint,启动table数量多client去和server建立连接去订阅消费,是不是在消费侧也能提升生产力。

go-drc-client 文档地址

go-zk-drc-client 文档地址

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
负载均衡 Dubbo 应用服务中间件
Nginx系列教程(11) - HTTP动态负载均衡(一)
Nginx系列教程(11) - HTTP动态负载均衡(一)
196 0
|
负载均衡 应用服务中间件 Linux
Nginx系列教程(12) - HTTP动态负载均衡(二)
Nginx系列教程(12) - HTTP动态负载均衡(二)
209 1
|
缓存 运维 负载均衡
通过Nginx、Consul、Upsync实现动态负载均衡和服务平滑发布
前段时间顺利地把整个服务集群和中间件全部从UCloud迁移到阿里云,笔者担任了架构和半个运维的角色。这里详细记录一下通过Nginx、Consul、Upsync实现动态负载均衡和服务平滑发布的核心知识点和操作步骤,整个体系已经在生产环境中平稳运行。编写本文使用的虚拟机系统为CentOS7.x,虚拟机的内网IP为192.168.56.200。
571 0
通过Nginx、Consul、Upsync实现动态负载均衡和服务平滑发布
|
JSON 缓存 负载均衡
通过 Consul+OpenResty 实现无reload动态负载均衡
动态Nginx负载均衡的配置,可以通过Consul+Consul-Template方式,但是每次发现配置变更都需要reload Nginx,通过balancer_by_lua除可以实现动态负载均衡外,还可以实现个性化负载均衡算法。
2656 0
|
6月前
|
负载均衡 算法 应用服务中间件
面试题:Nginx有哪些负载均衡算法?Nginx位于七层网络结构中的哪一层?
字节跳动面试题:Nginx有哪些负载均衡算法?Nginx位于七层网络结构中的哪一层?
129 0
|
6月前
|
负载均衡 应用服务中间件 API
Nginx配置文件详解Nginx负载均衡Nginx静态配置Nginx反向代理
Nginx配置文件详解Nginx负载均衡Nginx静态配置Nginx反向代理
171 4
|
5月前
|
缓存 负载均衡 算法
解读 Nginx:构建高效反向代理和负载均衡的秘密
解读 Nginx:构建高效反向代理和负载均衡的秘密
122 2
|
4月前
|
负载均衡 算法 应用服务中间件
nginx自定义负载均衡及根据cpu运行自定义负载均衡
nginx自定义负载均衡及根据cpu运行自定义负载均衡
85 1
|
4月前
|
运维 负载均衡 算法
SLB与NGINX的异同是什么
SLB与NGINX的异同是什么
436 2
|
6月前
|
负载均衡 应用服务中间件 nginx
解决nginx配置负载均衡时invalid host in upstream报错
在Windows环境下,配置Nginx 1.11.5进行负载均衡时遇到问题,服务无法启动。错误日志显示“invalid host in upstream”。检查发现上游服务器列表中,192.168.29.128的主机地址无效。负载均衡配置中,两个服务器地址前误加了"http://"。修正方法是删除上游服务器列表和proxy_pass中的"http://"。问题解决后,Nginx服务应能正常启动。
494 4
解决nginx配置负载均衡时invalid host in upstream报错