分片集群的运维一直都是MongoDB比较复杂的部分,最近遇到了较多客户对于MongoDB 6.0版本分片集群实例的数据均衡情况存在疑问,特写此文章介绍下MongoDB 6.0.3版本中内核关于Balancer的改动。
历史行为
Mongos会根据集合分片键将数据和命令分散到不同Shard上去,Shard上的数据是以Chunk为最小单位进行迁移的。哈希分片默认每个分片上创建2个Chunk,范围分片默认在主分片上创建1个Chunk。
每个Chunk默认最大64MB,插入/更新命令将Chunk写满后会自动进行分裂(AutoSplit)。分裂后会造成单分片上Chunk数量增加,在Balancer开启的情况下,每10s(kBalanceRoundDefaultInterval)会进行一次探测,发现有Chunk数量不相等的情况且数量差异超出阈值,就会进行Chunk的迁移,达到数据分布均衡的目的。
关于Chunk数量的差异阈值如下:
3.2版本
- Chunk总数 < 20,迁移阈值为2
- 20 ≤ Chunk总数 < 80,迁移阈值为4
- 80 ≤ Chunk总数,迁移阈值为8
- 3.4、4.0版本
- 默认迁移阈值为2
- 如果Chunk总数小于20个,或者上次迁移的Chunk数量小于20个,迁移阈值则为1
- 4.2、4.4、5.0版本
- 迁移阈值为1
这里的阈值信息在6.0之前,官方文档一直没有更新,实际代码和文档中是不一致的
产生问题
Chunk数量差异做为迁移标准可能会导致一些非预期情况,比如由于分片键区分度不明显导致的Jumbo Chunk、范围分片键含递增属性导致的写入倾斜,等一系列数据分布不均问题。
以前者举例,分片键的区分度不足可能会导致同一个Chunk中所有文档对应的分片键都是一样的,文档全部堆积在一个Chunk中,导致这个Chunk变的非常大,超过默认最大64MB的设定。这种Jumbo Chunk无法自动根据分片键的差异来进行分割,因此最终会出现不同分片上的Chunk数量是相同的,但分片的磁盘空间使用大小却不同的情况。这样就和我们预期的数据均衡分布相违背了,同时Jumbo Chunk会导致Move Chunk失败,从而无法删除分片,遇到后只能手动维护,处理起来比较麻烦。
同时,频繁的Move Chunk会消耗很多CPU资源,这可能会对业务上造成一定影响,严重时甚至无法返回命令结果,利用真实的数据大小作为迁移标准,也会一定程度上减少这类问题。
现在行为
MongoDB内核从6.0.3小版本开始更改了Balancer的默认行为决策。
对应阿里云MongoDB的内核小版本为7.0.1
- 探测是否需要Move Chunk的标准变成了:不同分片上同一集合的数据大小[1]情况
- 需要迁移的阈值变成了集合大小的差异超过384MB[2],即三倍的Chunk Size
- 阿里云MongoDB从6.0版本开始,每个Chunk默认最大128MB(kDefaultMaxChunkSizeBytes)
- Chunk在写满后不会进行自动分割[3],只会在迁移前进行必要的分割[4]
- 块(Chunk)现在被称为范围(Range)
moveChunk
被moveRange
取代了[5]- 对于集合分片前不再需要对db执行
enableSharding
命令[6]
现在不能通过在Mongos上执行sh.status()来查看各分片Chunk数量判断数据是否均衡,Chunk的数量差异会随着分片策略以及分片键值的不同,有较大区别;需要通过getShardDistribution()
来查看每个集合的数据分布情况。后续我们也无需关注Chunk这个逻辑上的概念,只专注于集合数据大小本身即可。
这里的data是指db.coll.stats().size,即未压缩的数据空间大小,会大于磁盘上压缩后的数据大小
下面的SQL将"myDB"更换为所需要查询的DB名称即可。
vardbName="myDB"; db.getSiblingDB(dbName).getCollectionNames().forEach(function(collName) { print("————————————————————————"); print("Collection: "+collName); db.getSiblingDB(dbName).getCollection(collName).getShardDistribution(); });
源码解析
基于MongoDB 6.0.3 内核版本,部分实现和之前版本有差异
ConfigServer的主节点启动后,会执行一个onStepUpComplete()
函数,会调用包含的所有service对此虚函数的实现。
voidReplicationCoordinatorImpl::signalDrainComplete(OperationContext*opCtx, longlongtermWhenBufferIsEmpty) noexcept { ... LOGV2(6015310, "Starting to transition to primary."); ... ReplicaSetAwareServiceRegistry::get(_service).onStepUpComplete(opCtx, firstOpTime.getTerm()); ... } voidReplicaSetAwareServiceRegistry::onStepUpComplete(OperationContext*opCtx, longlongterm) { std::for_each(_services.begin(), _services.end(), [&](ReplicaSetAwareInterface*service) { service->onStepUpComplete(opCtx, term); }); }
类的继承关系为Balancer
->ReplicaSetAwareServiceConfigSvr
->ReplicaSetAwareService
,Balancer类中的onStepUpComplete()
函数会创建一个线程执行_mainThread()
并将其赋值给_thread
变量。
Balancer主要有两个相关的线程。一个是在此处赋值的_thread
,主要负责集合数据平衡的判定以及构建对应请求;另一个是在_mainThread()中会赋值的_actionStreamConsumerThread
,主要负责查看是否需要进行Chunk碎片整理以及构建对应请求。
Chunk要进行碎片整理是因为,分片集合的数据被不必要分割成大量的小Chunk后,可能导致在该集合上CRUD操作的执行时间变长,希望通过将一些较小的Chunk合并为一个较大的Chunk,来减少Chunk的数量,从而缩短CRUD操作的执行时间。
如果没有单独使用configureCollectionBalancing
对集合设置defragmentCollection: true
,这里不会进行碎片整理操作。同时7.0的版本相较于6.0又做了一些改动,后续普遍情况下,不需要对集合Chunk进行碎片整理,因此本文对此改动不做过多介绍。
voidBalancer::onStepUpComplete(OperationContext*opCtx, longlongterm) { initiateBalancer(opCtx); } voidBalancer::initiateBalancer(OperationContext*opCtx) { stdx::lock_guard<Latch>scopedLock(_mutex); invariant(_state==kStopped); _state=kRunning; invariant(!_thread.joinable()); invariant(!_actionStreamConsumerThread.joinable()); invariant(!_threadOperationContext); _thread=stdx::thread([this] { _mainThread(); }); }
_mainThread
过程如下:
balancerConfig->refreshAndCheck()
查询config库更新元信息配置,有差异则更新成员变量
- _balancerSettings <=> {"_id":"balancer"}
- _maxChunkSizeBytes <=> {"_id":"chunksize"}
- _shouldAutoSplit <=> {"_id":"autosplit"}
_commandScheduler->start()
开启命令调度器
_workerThreadHandle = stdx::thread([this] { _workerThread(); });
- 依赖
_stateUpdatedCV
变量决定此时是否需要执行命令
- _stateUpdatedCV是一个继承了
std::condition_variable_any
自定义类型变量,利用父函数的.wait()
/.notifyAll()
等达到多线程联动的目的
- 不断的根据
_unsubmittedRequestIds
,在_requests
中拿到对应要执行的请求 _submit()
利用(*_executor)->scheduleRemoteCommand()
真正执行命令拿到结果
_consumeActionStreamLoop()
开启第二个主要线程,处理Chunk碎片整理相应逻辑
- 依赖
_defragmentationCondVar
变量决定此时是否需要执行命令,类似_stateUpdatedCV getNextStreamingAction()
查看下一个需要碎片整理的动作以及对应的集合/范围等信息
- mergeAction/dataSizeAction/splitVectorAction/splitAction
- 利用std::visit调用实际命令类型对应的构建请求函数
_commandScheduler->requestMergeChunks()
_buildAndEnqueueNewRequest()
构建并插入对应请求
- 将添加到
_requests
这个unordered_map中 - 将
requestId
添加到_unsubmittedRequestIds
这个vector中 - 利用
_stateUpdatedCV.notify_all()
通知scheduler消费
进入主循环:
refreshAndCheck()
查询config库更新元信息配置- 此时不需要balance则跳过此次循环
- Balancer处于关闭状态(kOff)
- 只需要AutoSplit时Balancer(kAutoSplitOnly)
- 此时不处于设定的Balancer Window时间
_defragmentationCondVar.notify_all();
- 对于所有集合进行
_defragmentationPolicy->startCollectionDefragmentation()
_defragmentationStates
放置集合碎片整理阶段元信息
_splitChunksIfNeeded()
根据ShardTagRange查看是否需要分割块[4]
selectChunksToSplit()
查找是否有需要分割的块
getCollections()
获取所有集合shuffle()
打乱集合顺序- 对于
config.system.sessions
集合,调用getSplitCandidatesForSessionsCollection()
返回需要分割的信息
- 如果chunk数量小于
minNumChunksForSessionsCollection
(默认为1024),将此集合的key划分成minNumChunksForSessionsCollection向上四舍五入的二的幂次方个 - 本质是为了防止
config.system.sessions
因为数据太少导致不会被分割,而集中在主分片,导致主分片负载高的问题
- 对于其他普通集合,调用
_getSplitCandidatesForCollection()
返回需要分割的信息。如果没有Shard Tag的话,这里不会做分割处理。
createCollectionDistributionStatus()
获取这个集合所有分片上的Chunk信息
shardToChunksMap[chunkEntry.getShardId()].push_back(chunk);
addTagsFromCatalog()
从Catalog的标签中加载集合现有TagRange情况
getSplitCandidatesToEnforceTagRanges()
根据自定义TagRange添加分割点
- 找到第一个chunk_max > range_min的chunk作为chunkAtZoneMin
- 找到第一个chunk_max > range_max的chunk作为chunkAtZoneMax
- 如果chunkAtZoneMin的chunk_min不等于range_min,就在range_min处添加分割点
- 如果chunkAtZoneMax的chunk_min和chunk_max都不等于range_max,就在range_max处添加分割点
splitChunkAtMultiplePoints()
发送分割块的命令到对应的分片上
runCommandWithFixedRetryAttempts()
发送splitChunk命令bsonExtractTypedField()
获取返回结果中的shouldMigrate字段
_defragmentationPolicy->selectChunksToMove()
查找是否有需要合并的Chunk碎片。如果没有对集合开启碎片整理的话,这里不会做相关操作。_chunkSelectionPolicy->selectChunksToMove()
查找是否有需要移动的块
getCollections()
获取所有集合shuffle()
打乱集合顺序getDataSizeInfoForCollections()
获取分片上集合大小信息[1]
- 如果
gBalanceAccordingToDataSize
这个变量为true才会进入这段逻辑。src/mongo/s/sharding_feature_flags.idl中定义的此变量,6.0版本默认为true。 - 向分片发送内部命令
_shardsvrGetStatsForBalancing
获取collSize
- 按批次调用
_getMigrateCandidatesForCollection()
读取Chunk分布,一批含20个集合
balance()
确定具体迁移信息(根据之前是否通过getDataSizeInfoForCollections()获取了collDataSizeInfo来做逻辑判定,此处只阐述按数据集大小作为标准的分支逻辑)
- 先处理要删除的Draining Shard
_getLeastLoadedReceiverShard()
查找含有集合数据最少的分片作为to shard
- 其次是不满足Shard Tag的
_getLeastLoadedReceiverShard()
查找含有集合数据最少的分片作为to shard
- 最后是不满足数据均衡的:
_singleZoneBalanceBasedOnDataSize()
以数据大小为标准
_getMostOverloadedShard()
查找含有集合数据最多的分片作为from shard_getLeastLoadedReceiverShard()
查找含有集合数据最少的分片作为to shardif (fromSize - toSize < 3 * collDataSizeInfo.maxChunkSizeBytes) { return false; }
查看是否满足迁移阈值[2]- 构造迁移任务
_moveChunks
执行块的移动
- 大版本是6.0以下的会走
_commandScheduler->requestMoveChunk()
- 6.0及以上走
_commandScheduler->requestMoveRange()
[5]
- 将已有信息包装成
ShardsvrMoveRange
类 _buildAndEnqueueNewRequest()
构建并插入对应请求
- 将添加到
_requests
这个unordered_map中 - 将
requestId
添加到_unsubmittedRequestIds
这个vector中 - 利用
_stateUpdatedCV.notify_all()
通知scheduler消费
- 一次循环结束,如果没有做迁移就睡眠1s,如果有迁移就睡眠10s
voidBalancer::_mainThread() { // ...Client::initThread("Balancer"); // ...LOGV2(21856, "CSRS balancer is starting"); // ...constSecondskInitBackoffInterval(10); autobalancerConfig=shardingContext->getBalancerConfiguration(); while (!_stopRequested()) { StatusrefreshStatus=balancerConfig->refreshAndCheck(opCtx.get()); // ...break; } LOGV2(6036605, "Starting command scheduler"); // ..._actionStreamConsumerThread=stdx::thread([&] { _consumeActionStreamLoop(); }); LOGV2(6036606, "Balancer worker thread initialised. Entering main loop."); autolastDrainingShardsCheckTime{Date_t::fromMillisSinceEpoch(0)}; while (!_stopRequested()) { BalanceRoundDetailsroundDetails; _beginRound(opCtx.get()); try { // ... StatusrefreshStatus=balancerConfig->refreshAndCheck(opCtx.get()); // ...if (!balancerConfig->shouldBalance() ||_stopRequested() ||_clusterChunksResizePolicy->isActive()) { if (balancerConfig->getBalancerMode() ==BalancerSettingsType::BalancerMode::kOff&&Date_t::now() -lastDrainingShardsCheckTime>=kDrainingShardsCheckInterval) { constautodrainingShardNames{getDrainingShardNames(opCtx.get())}; if (!drainingShardNames.empty()) { LOGV2_WARNING(6434000, "Draining of removed shards cannot be completed because the ""balancer is disabled", "shards"_attr=drainingShardNames); } lastDrainingShardsCheckTime=Date_t::now(); } LOGV2_DEBUG(21859, 1, "Skipping balancing round because balancing is disabled"); _endRound(opCtx.get(), kBalanceRoundDefaultInterval); continue; } // ... { LOGV2_DEBUG(21860, 1, "Start balancing round. waitForDelete: {waitForDelete}, ""secondaryThrottle: {secondaryThrottle}", "Start balancing round", "waitForDelete"_attr=balancerConfig->waitForDelete(), "secondaryThrottle"_attr=balancerConfig->getSecondaryThrottle().toBSON()); // ... { OperationContext*ctx=opCtx.get(); autoallCollections=Grid::get(ctx)->catalogClient()->getCollections(ctx, {}); for (constauto&coll : allCollections) { _defragmentationPolicy->startCollectionDefragmentation(ctx, coll); } } Statusstatus=_splitChunksIfNeeded(opCtx.get()); if (!status.isOK()) { LOGV2_WARNING(21878, "Failed to split chunks due to {error}", "Failed to split chunks", "error"_attr=status); } else { LOGV2_DEBUG(21861, 1, "Done enforcing tag range boundaries."); } stdx::unordered_set<ShardId>usedShards; constautochunksToDefragment=_defragmentationPolicy->selectChunksToMove(opCtx.get(), &usedShards); constautochunksToRebalance=uassertStatusOK( _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), &usedShards)); if (chunksToRebalance.empty() &&chunksToDefragment.empty()) { LOGV2_DEBUG(21862, 1, "No need to move any chunk"); _balancedLastTime=0; } else { _balancedLastTime=_moveChunks(opCtx.get(), chunksToRebalance, chunksToDefragment); roundDetails.setSucceeded( static_cast<int>(chunksToRebalance.size() +chunksToDefragment.size()), _balancedLastTime); ShardingLogging::get(opCtx.get()) ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON()) .ignore(); } LOGV2_DEBUG(21863, 1, "End balancing round"); } MillisecondsbalancerInterval=_balancedLastTime?kShortBalanceRoundInterval : kBalanceRoundDefaultInterval; // ..._endRound(opCtx.get(), balancerInterval); } catch (constDBException&e) { // ... } } // ...LOGV2(21867, "CSRS balancer is now stopped"); }
Shard在收到moveRange命令后,执行ShardsvrMoveRangeCommand
对应的typedRun()
- 创建migrationSourceManager变量时,在其构造函数中会根据MigrationInfo中只设置了minKey没有设置maxKey来判断此次是一个特殊的MoveRange
- 然后执行
autoSplitVector()
来根据Chunk大小计算出需要迁移的maxKey
avgDocSize = dataSize / totalLocalCollDocuments;
计算集合中每条文档的平均大小maxDocsPerChunk = maxChunkSizeBytes / avgDocSize;
计算每个Chunk包含的最多文档条数- 从minKey开始遍历,直到第maxDocsPerChunk条文档,作为分割点,不断向前找直到找出
limit+1
个分割点(此处limit为1)。
- 如果上一次的分割点对应的Shard Key和这次的一样,那么不会记录这次的点位(分片键区分度较低)
- 为了防止切割后的Chunk过小,如果被分割出来的最右侧Chunk包含的文档个数小于maxDocsPerChunk的0.8倍,则会重新计算分割点(最多计算最后三个分割点)
maxDocsPerNewChunk = maxDocsPerChunk - ((maxDocsPerChunk - numScannedKeys) / (nSplitPointsToReposition + 1));
- 本质是将所有文档条数加在一起,除以保留分割点个数+1,得到新的每个Chunk包含的最多文档条数后再去计算分割点。即
(nSplitPointsToReposition * maxDocsPerChunk + numScannedKeys) / nSplitPointsToReposition + 1
算出所有分割点后,只保留第limit(1)个,返回对应的点位作为maxKey
- 从minKey到maxKey这些文档会被逐步复制到to shard上(Upsert),然后批量删除掉from shard上的孤儿文档
staticvoid_runImpl(OperationContext*opCtx, ShardsvrMoveRange&&request, WriteConcernOptions&&writeConcern) { // ...MigrationSourceManagermigrationSourceManager( opCtx, std::move(request), std::move(writeConcern), donorConnStr, recipientHost); migrationSourceManager.startClone(); migrationSourceManager.awaitToCatchUp(); migrationSourceManager.enterCriticalSection(); migrationSourceManager.commitChunkOnRecipient(); migrationSourceManager.commitChunkMetadataOnConfig(); } MigrationSourceManager::MigrationSourceManager(OperationContext*opCtx, ShardsvrMoveRange&&request, WriteConcernOptions&&writeConcern, ConnectionStringdonorConnStr, HostAndPortrecipientHost) : //... { LOGV2(22016, "Starting chunk migration donation {requestParameters} with expected collection epoch ""{collectionEpoch}", "Starting chunk migration donation", "requestParameters"_attr=redact(_args.toBSON({})), "collectionEpoch"_attr=_args.getEpoch()); // ...if (!_args.getMax().is_initialized()) { constauto&min=*_args.getMin(); constautocm=collectionMetadata.getChunkManager(); constautoowningChunk=cm->findIntersectingChunkWithSimpleCollation(min); constautomax=computeMaxBound(_opCtx, nss(), min, owningChunk, cm->getShardKeyPattern(), _args.getMaxChunkSizeBytes()); _args.getMoveRangeRequestBase().setMax(max); _moveTimingHelper.setMax(max); } // ...} BSONObjcomputeMaxBound(OperationContext*opCtx, constNamespaceString&nss, constBSONObj&min, constChunk&owningChunk, constShardKeyPattern&skPattern, constlonglongmaxChunkSizeBytes) { auto [splitKeys, _] =autoSplitVector( opCtx, nss, skPattern.toBSON(), min, owningChunk.getMax(), maxChunkSizeBytes, 1); if (splitKeys.size()) { returnstd::move(splitKeys.front()); } returnowningChunk.getMax(); }
6.0版本还新增了很多flag变量来做逻辑特判
- 在insert命令时,有一条链路是
_insertDocuments()
=>onInserts()
=>incrementChunkOnInsertOrUpdate()
,在incrementChunkOnInsertOrUpdate()
中决定了是否要自动分割。
判定的条件第一个为gNoMoreAutoSplitter
是否为false,此参数在src/mongo/s/sharding_feature_flags.idl
中定义,在6.0的版本默认为true,因此6.0的版本在插入/更新数据的时候不会进行自动分割[3]
if (!feature_flags::gNoMoreAutoSplitter.isEnabled( serverGlobalParams.featureCompatibility) &&balancerConfig->getShouldAutoSplit() &&chunkManager.allowAutoSplit() &&chunkWritesTracker->shouldSplit(maxChunkSizeBytes)) { autochunkSplitStateDriver=ChunkSplitStateDriver::tryInitiateSplit(chunkWritesTracker); if (chunkSplitStateDriver) { ChunkSplitter::get(opCtx).trySplitting(std::move(chunkSplitStateDriver), nss, chunk.getMin(), chunk.getMax(), dataWritten); } }
- 在shardCollection的时候会走
CreateCollectionCoordinator::_checkCommandArguments
的检查逻辑,判断创建的这个集合元数据是否合理。
判定的条件为gEnableShardingOptional
是否为false,此参数在src/mongo/s/sharding_feature_flags.idl
中定义,在6.0的版本默认为true,因此6.0的版本无需sh.enableSharding("dbName")
对库进行enableSharding后才能对集合进行分片[6]
if (!feature_flags::gEnableShardingOptional.isEnabled( serverGlobalParams.featureCompatibility)) { constautodbEnabledForSharding= [&, this] { autocatalogCache=Grid::get(opCtx)->catalogCache(); autodbInfo=uassertStatusOK(catalogCache->getDatabase(opCtx, nss().db())); if (!dbInfo->getSharded()) { sharding_ddl_util::linearizeCSRSReads(opCtx); dbInfo=uassertStatusOK(catalogCache->getDatabaseWithRefresh(opCtx, nss().db())); } returndbInfo->getSharded(); }(); uassert(ErrorCodes::IllegalOperation, str::stream() <<"sharding not enabled for db "<<nss().db(), dbEnabledForSharding); }
现象复盘
经过上面的源码分析我们可以发现,如果分片集群我们只使用最基本的功能,不添加Shard Tag,不启用碎片整理,那么Chunk是永远不会被自动“分割”的。但实际上我们会发现,在某些写入情况下Chunk的数量并不是一直固定的,虽然数据分布是均衡的,但可能还是会引起疑问,为什么某个分片的Chunk数量增加了?
以含递增趋势的范围分片键举例,会产生一个分片只有1个Chunk,而另一个分片有非常多Chunk的情况:
哈希分片出现这样的情况比较少,因为分片键值会被哈希成较均衡的分布情况
- 默认在主分片ShardA上创建一个Chunk1,包含了
minKey ~ maxKey
- 由于递增属性导致写入集中在ShardA上,ShardA_Chunk1不断增大直至达到迁移阈值
- Balancer将
minKey ~ splitKey1
迁移到ShardB
minKey ~ splitKey1
在ShardB_Chunk1上splitKey1 ~ maxKey
在ShardA_Chunk1上
- 继续写入更大的Key,依旧落在ShardA_Chunk1上,直至迁移阈值
- Balancer将splitKey1 ~ splitKey2迁移到ShardB,由于ShardB的Chunk1最大Key为splitKey1,因此复制过来的splitKey1 ~ splitKey2会放到ShardB_Chunk2上
minKey ~ splitKey1
在ShardB_Chunk1上splitKey1 ~ splitKey2
在ShardB_Chunk2上splitKey2 ~ maxKey
在ShardA_Chunk1上
- 不断进行这样的写入,最终ShardB会包含非常多Chunk,而ShardA只有1个Chunk