MongoDB分片迁移原理与源码
源码
下面将从源码角度分析与迁移相关的若干过程,源码基于MongoDB-4.0.3版本。
split chunk
split chunks 一般是在插入、更新、删除数据时,由 mongos 发出到分片的 splitVector 命令,此时分片才会判断是否需要 split。
_runAutosplit()函数
//默认的chunk最大字节数。该大小可以调整,范围为[1,1024]M
const uint64_t ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes{64 * 1024 * 1024};
/*系统会调度一个自动split的任务,而任务会调用下述接口。该接口会确定是否应该分割指定的块,然后执行任何必要的分割。它还可以执行“top chunk”优化,其中包含MaxKey或MinKey的结果块将被移到另一个碎片上,以减轻原始所有者的负载*/
void _runAutosplit(const NamespaceString& nss,
const BSONObj& min,
const BSONObj& max,
long dataWritten) {
//......
//经过一些参数判断,比如判断根据min获取的chunk包含的range是否与要split的range相同;是否打开了自动split等;
//调用splitVector来判断是否需要split
auto splitPoints = uassertStatusOK(splitVector(opCtx.get(),
nss,
cm->getShardKeyPattern().toBSON(),
chunk.getMin(),
chunk.getMax(),
false,
boost::none,
boost::none,
boost::none,
maxChunkSizeBytes));
if (splitPoints.size() <= 1) {
/*没有分割点意味着没有足够的数据可供分割;一个分割点意味着我们有一半的块大小到完整的块大小,所以还没有必要分割*/
return;
}
//......
//进行实际的split操作
uassertStatusOK(splitChunkAtMultiplePoints(opCtx.get(),
chunk.getShardId(),
nss,
cm->getShardKeyPattern(),
cm->getVersion(),
chunkRange,
splitPoints));
//判断是否需要进行balance;包括判断支持的balance设定为kAutoSplitOnly,即只支持在自动split后balance;以及发生split的nss支持balance;
const bool shouldBalance = isAutoBalanceEnabled(opCtx.get(), nss, balancerConfig);
//如果启用了autobalance选项,并且作为顶部块优化的一部分在集合的第一个块或最后一个块进行分割,则平衡结果块。
if (!shouldBalance || topChunkMinKey.isEmpty()) {
return;
}
//尝试将顶部块移出shard,以防止热点停留在单个shard上。这是基于以下假设:后续插入将落在顶部块上。这是因为split触发的一次move。
moveChunk(opCtx.get(), nss, topChunkMinKey);
}
splitVector()函数
/*给定一个块,确定它是否可以分割,如果可以,则返回分割点。这个函数的功能相当于splitVector命令。如果指定了maxSplitPoints,并且有多个“maxSplitPoints”拆分点,则只返回第一个“maxSplitPoints”拆分点。
如果指定了maxChunkObjects,那么它指示拆分每个“maxChunkObjects”的th键。
默认情况下,我们将数据块分割,这样每个新数据块大约有maxChunkSize数据块一半的键。我们只分割“maxChunkObjects”的第一个键,如果它将分割的键数低于默认值。maxChunkSize是块的最大大小(以兆字节为单位)。如果数据块超过这个大小,我们应该分块。虽然maxChunkSize和maxChunkSizeBytes是boost::optional,但至少必须指定一个。如果设置了force,则在块的中点处进行分割。这也有效地使maxChunkSize等于块的大小。
*/
StatusWith<std::vector<BSONObj>> splitVector(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& keyPattern,
const BSONObj& min,
const BSONObj& max,
bool force,
boost::optional<long long> maxSplitPoints,
boost::optional<long long> maxChunkObjects,
boost::optional<long long> maxChunkSize,
boost::optional<long long> maxChunkSizeBytes) {
// maxChunkObjects一直有默认值。kMaxObjectPerChunk=25000
if (!maxChunkObjects) {
maxChunkObjects = kMaxObjectPerChunk;
}
//......
//获取集合相关信息
const long long recCount = collection->numRecords(opCtx);
const long long dataSize = collection->dataSize(opCtx);
/*现在我们已经有了大小估计,检查一下其余的参数,并应用这里指定的最大大小限制。强制分割相当于让maxChunkSize等于当前块的大小,下面的逻辑将把这一大块分成两半*/
if (force) {
maxChunkSize = dataSize;
} else if (!maxChunkSize) {
if (maxChunkSizeBytes) {
maxChunkSize = maxChunkSizeBytes.get();
}
} else {
maxChunkSize = maxChunkSize.get() * 1 << 20;
}
//我们需要一个最大的块大小,除非我们实际上不能找到任何分裂点。
if ((!maxChunkSize || maxChunkSize.get() <= 0) && recCount != 0) {
return {ErrorCodes::InvalidOptions, "need to specify the desired max chunk size"};
}
//如果没有足够的数据来处理多个块,就没有必要继续了。
if (dataSize < maxChunkSize.get() || recCount == 0) {
std::vector<BSONObj> emptyVector;
return emptyVector;
}
//我们将使用平均对象大小和对象数量来找到每个块应该拥有的键数。如果提供了maxChunkSize或maxChunkObjects,我们将按其一半进行拆分。
const long long avgRecSize = dataSize / recCount;
long long keyCount = maxChunkSize.get() / (2 * avgRecSize);
if (maxChunkObjects.get() && (maxChunkObjects.get() < keyCount)) {
log() << "limiting split vector to " << maxChunkObjects.get() << " (from " << keyCount
<< ") objects ";
keyCount = maxChunkObjects.get();
}
/*遍历索引并将第keyCount个键添加到结果中。如果这个键之前出现在结果中,我们就忽略它。这里的不变式是,给定键值的所有实例都位于同一块中。*/
//......
/*使用每个第keyCount个键作为一个分裂点。我们添加初始键作为标记,在结束时移除。如果一个键出现的次数超过块上允许的条目数,我们将发出警告并对下面的键进行拆分。*/
//......
//返回所有分裂点
}
splitChunkAtMultiplePoints()函数会调用splitChunk()函数进行分裂操作
StatusWith<boost::optional<ChunkRange>> splitChunk(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& keyPatternObj,
const ChunkRange& chunkRange,
const std::vector<BSONObj>& splitKeys,
const std::string& shardName,
const OID& expectedCollectionEpoch) {
//......
//将split信息提交到config服务器,使用的“_configsvrCommitChunkSplit”命令
auto request =
SplitChunkRequest(nss, shardName, expectedCollectionEpoch, chunkRange, splitKeys);
auto configCmdObj =
request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
auto cmdResponseStatus =
Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
opCtx,
kPrimaryOnlyReadPreference,
"admin",
configCmdObj,
Shard::RetryPolicy::kIdempotent);
//......
}
balance
MongoDB balancer 是一个后台进程,它监视每个分片上的块的数量。当给定分片上的块数量达到特定的迁移阈值时,平衡器尝试在分片之间自动迁移块,并在每个分片上达到相同数量的块。
切分集群的平衡过程对用户和应用程序层是完全透明的,尽管在此过程中可能会有一些性能影响。
从MongoDB 3.4开始,balancer在config服务器副本集(CSRS)的主节点上运行.
balancer 基本过程大致相同:
- config.shards 读取分片信息;
- config.collections 读取所有集合信息,并且随机排序保存到一个数组中;
- 对每个集合从 config.chunks 读取 chunks 的信息;
- 含有最多 chunks 数量 (maxChunksNum)的分片为源分片,含有最少 chunks 数量(minChunksNum)的分片为目的分片; 如果 maxChunksNum - idealNumberOfChunksPerShardForTag(每个碎片的最优块数的上限) 大于迁移的阈值 (threshold), 那么就是不均衡状态,需要迁移,源分片的 chunks 第一个 chunk 为待迁移的 chunk ,构造一个迁移任务(源分片,目的分片,chunk)。
构造迁移任务时,如果某个集合含有最多数量的分片或者最少数量 chunks 的分片,已经属于某一个迁移任务,那么此集合本轮 balancer 不会发生迁移,即,一个分片不能同时参与多个块的迁移。要从一个分片迁移多个块,平衡器一次迁移一个块。。最后,本次检测出的迁移任务完成以后才开始下次 balancer 过程。
balancer 过程中,会对集合做一次随机排序,当有多个集合的数据需要均衡时,迁移时也是随机的,并不是迁移完一个集合开始下一个集合。
void Balancer::_mainThread() {
//......
// balancer主循环
while (!_stopRequested()) {
BalanceRoundDetails roundDetails;
_beginRound(opCtx.get());
try {
shardingContext->shardRegistry()->reload(opCtx.get());
//判断balance是否打开,如果没有打开,_endRound会sleep 10s(kBalanceRoundDefaultInterval);没有打开包括:balance为off、或者是只在split后进行的balance、或者balance只支持在某个窗口时间;
if (!balancerConfig->shouldBalance()) {
_endRound(opCtx.get(), kBalanceRoundDefaultInterval);
continue;
}
{
//对分片的集合进行splitChunk操作
Status status = _enforceTagRanges(opCtx.get());
if (!status.isOK()) {
warning() << "Failed to enforce tag ranges" << causedBy(status);
} else {
LOG(1) << "Done enforcing tag range boundaries.";
}
//选择需要迁移的chunk
const auto candidateChunks = uassertStatusOK(
_chunkSelectionPolicy->selectChunksToMove(opCtx.get(), _balancedLastTime));
if (candidateChunks.empty()) {
LOG(1) << "no need to move any chunk";
_balancedLastTime = false;
} else {
//为指定的块集合安排迁移,并返回成功处理了多少块。
_balancedLastTime = _moveChunks(opCtx.get(), candidateChunks);
roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
_balancedLastTime);
}
}
//默认的检测周期为 10s, 如果发生了moveChunk, 检测周期为 1s
//const Seconds kBalanceRoundDefaultInterval(10);
//const Seconds kShortBalanceRoundInterval(1);
_endRound(opCtx.get(),
_balancedLastTime ? kShortBalanceRoundInterval
: kBalanceRoundDefaultInterval);
} catch (const std::exception& e) {
//......
_endRound(opCtx.get(), kBalanceRoundDefaultInterval);
}
}
}
selectChunksToMove()函数最终返回所有需要迁移的chunk信息
StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMove(
OperationContext* opCtx, bool aggressiveBalanceHint) {
//......
//读取分片信息
const auto shardStats = std::move(shardStatsStatus.getValue());
if (shardStats.size() < 2) {
return MigrateInfoVector{};
}
//usedShards保存那些已经涉及到某一个块迁移的(from shard, to shard)信息,同一次balance,一个shard只参与一个块的迁移,不管是from shard还是to shard。
std::set<ShardId> usedShards;
//读取所有集合信息,并且随机排序保存到一个数组中
std::shuffle(collections.begin(), collections.end(), _random);
for (const auto& coll : collections) {
//如果集合已经被删了,跳过
if (coll.getDropped()) {
continue;
}
const NamespaceString nss(coll.getNs());
//如果集合不允许balance,掉过
if (!coll.getAllowBalance()) {
LOG(1) << "Not balancing collection " << nss << "; explicitly disabled.";
continue;
}
//获取当前集合需要迁移的chunk信息
auto candidatesStatus = _getMigrateCandidatesForCollection(
opCtx, nss, shardStats, aggressiveBalanceHint, &usedShards);
//此处会判断candidatesStatus结果,如果集合被删了,跳过;如果其他错误,打印日志后跳过
candidateChunks.insert(candidateChunks.end(),
std::make_move_iterator(candidatesStatus.getValue().begin()),
std::make_move_iterator(candidatesStatus.getValue().end()));
}
_getMigrateCandidatesForCollection()函数获取当前集合需要迁移的chunk信息
StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
OperationContext* opCtx,
const NamespaceString& nss,
const ShardStatisticsVector& shardStats,
bool aggressiveBalanceHint,
std::set<ShardId>* usedShards) {
//......
//读取集合的所有chunk信息
//返回该集合建议在shard之间迁移的chunk集合
return BalancerPolicy::balance(shardStats, distribution, aggressiveBalanceHint, usedShards);
}
balance()函数计算集合内各shard上chunk的个数,确定迁移变化的情况。
在4.0中(其实是从3.4)开始,迁移阈值与官方文档中的介绍不符迁移阈值不符;官方文档的描述是3.2版本中的设计。
3.2 版本, chunks 数量小于 20 的时候为 2, 小于 80 的时候为 4, 大于 80 的时候为 8 。也就是说假设两分片集群,某个表有 100 个chunk , 每个分片分别有 47 和 53 个chunk 。那么此时 balance 认为是均衡的,不会发生迁移。
int threshold = 8;
if (balancedLastTime || distribution.totalChunks() < 20) threshold = 2;
else if (distribution.totalChunks() < 80)
threshold = 4;
4.0 版本,chunks 数量差距大于 2 的时候就会发生迁移。
/*返回一组建议的块,根据碎片的指定状态(耗尽、达到最大大小等)和该集合的块的数量移动碎片。如果策略不建议移动任何内容,则返回一个空向量。vector do中的条目都是针对单独的源/目标碎片的,因此不需要串行执行,可以并行调度。
平衡逻辑为每个区域计算每个碎片的最佳块数,如果任何碎片的块数足够高,建议将块移动到低于这个数字shard。
shouldAggressivelyBalance参数导致块的阈值可能会降低碎片之间的差异。
usedShards参数是in/out,它包含一组已经用于迁移的shards。这样我们就不会为同一个碎片返回多个冲突迁移。*/
vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
bool shouldAggressivelyBalance,
std::set<ShardId>* usedShards) {
vector<MigrateInfo> migrations;
// 1) Check for shards, which are in draining mode
// 这一部分是将处于draining模式的shard中的chunk移到其他shard,从该被删除的shard上拿一个chunk,找一个chunk最少的非from shard作为to shard(即即将被移除的shard)
// 2) Check for chunks, which are on the wrong shard and must be moved off of it
// 调整因为Tag设定不匹配引起的chunk内的数据分布shard转换
// 3) for each tag balance
//shouldAggressivelyBalance由最上层的Balancer::_mainThread()中_balancedLastTime赋值,表明上一次迁移round中迁移个数,0为false
//即如果已经在一次迁移中了或集合的块总数少于20,则迁移阈值为1;否则为2
const size_t imbalanceThreshold = (shouldAggressivelyBalance || distribution.totalChunks() < 20)
? kAggressiveImbalanceThreshold
: kDefaultImbalanceThreshold;
vector<string> tagsPlusEmpty(distribution.tags().begin(), distribution.tags().end());
tagsPlusEmpty.push_back("");
for (const auto& tag : tagsPlusEmpty) {
const size_t totalNumberOfChunksWithTag =
(tag.empty() ? distribution.totalChunks() : distribution.totalChunksWithTag(tag));
size_t totalNumberOfShardsWithTag = 0;
for (const auto& stat : shardStats) {
if (tag.empty() || stat.shardTags.count(tag)) {
totalNumberOfShardsWithTag++;
}
}
//计算每个碎片的最优块数的上限
const size_t idealNumberOfChunksPerShardForTag =
(totalNumberOfChunksWithTag / totalNumberOfShardsWithTag) +
(totalNumberOfChunksWithTag % totalNumberOfShardsWithTag ? 1 : 0);
while (_singleZoneBalance(shardStats,
distribution,
tag,
idealNumberOfChunksPerShardForTag,
imbalanceThreshold,
&migrations,
usedShards))
;
}
return migrations;
}
_singleZoneBalance()函数去寻找满足迁移阈值限制的from shard和to shard以及chunk
bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
const string& tag,
size_t idealNumberOfChunksPerShardForTag,
size_t imbalanceThreshold,
vector<MigrateInfo>* migrations,
set<ShardId>* usedShards) {
//获取含有最多chunk数量的分片为源分片,from shard
const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards);
if (!from.isValid())
return false;
//最大的chunk量
const size_t max = distribution.numberOfChunksInShardWithTag(from, tag);
// Do not use a shard if it already has less entries than the optimal per-shard chunk count
if (max <= idealNumberOfChunksPerShardForTag)
return false;
//获取含有最少chunk数量的分片为源分片,to shard
const ShardId to = _getLeastLoadedReceiverShard(shardStats, distribution, tag, *usedShards);
if (!to.isValid()) {
if (migrations->empty()) {
log() << "No available shards to take chunks for zone [" << tag << "]";
}
return false;
}
//最小的chunk量
const size_t min = distribution.numberOfChunksInShardWithTag(to, tag);
// Do not use a shard if it already has more entries than the optimal per-shard chunk count
if (min >= idealNumberOfChunksPerShardForTag)
return false;
//最大的chunk数与每个碎片的最优块数的上限之间的差值
const size_t imbalance = max - idealNumberOfChunksPerShardForTag;
//这个差值超过了迁移阈值,之前算出来的2,则迁移
if (imbalance < imbalanceThreshold)
return false;
//把需要迁移的chunk,构造一个迁移任务
const vector<ChunkType>& chunks = distribution.getChunks(from);
unsigned numJumboChunks = 0;
for (const auto& chunk : chunks) {
if (distribution.getTagForChunk(chunk) != tag)
continue;
if (chunk.getJumbo()) {
numJumboChunks++;
continue;
}
migrations->emplace_back(to, chunk);
invariant(usedShards->insert(chunk.getShard()).second);
invariant(usedShards->insert(to).second);
return true;
}
}
在完成迁移块的选择之后,Balancer::_mainThread()会调用Balancer::_moveChunks(),_moveChunks调用MigrationManager::executeMigrationsForAutoBalance()执行moveChunk。
未完,待续
参考文档
MongoDB官方文档
孤儿文档是怎样产生的(MongoDB orphaned document)
MongoDB疑难解析:为什么升级之后负载升高了?
由数据迁移至MongoDB导致的数据不一致问题及解决方案