开篇
怀着佛系心态写的文章,因为发现心急依然看不懂代码,所以只能安慰自己佛系一点,这篇文章希望能够把elasticsearch的index的创建过程讲清楚(包括index但不包括doc的添加过程),希望能够有帮助。
ES的Meta的组成
Meta是用来描述数据的数据。在ES中,Index的mapping结构、配置、持久化状态等就属于meta数据,集群的一些配置信息也属于meta。ES的Meta信息可以简单的理解为包括ClusterState、MetaData、IndexMetaData。
ClusterState
集群中的每个节点都会在内存中维护一个当前的ClusterState,表示当前集群的各种状态。ClusterState中包含一个MetaData的结构,MetaData中存储的内容更符合meta的特征,而且需要持久化的信息都在MetaData中,此外的一些变量可以认为是一些临时状态,是集群运行中动态构建出来的。
public class ClusterState implements ToXContentFragment, Diffable<ClusterState> {
public static final String UNKNOWN_UUID = "_na_";
public static final long UNKNOWN_VERSION = -1;
private final long version;
private final String stateUUID;
private final RoutingTable routingTable;
private final DiscoveryNodes nodes;
private final MetaData metaData;
private final ClusterBlocks blocks;
private final ImmutableOpenMap<String, Custom> customs;
private final ClusterName clusterName;
private final boolean wasReadFromDiff;
}
MetaData
MetaData主要是集群的一些配置,集群所有Index的Meta,所有Template的Meta,所有custom的Meta信息。
public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, ToXContentFragment {
private final String clusterUUID;
private final long version;
private final Settings transientSettings;
private final Settings persistentSettings;
private final Settings settings;
private final ImmutableOpenMap<String, IndexMetaData> indices;
private final ImmutableOpenMap<String, IndexTemplateMetaData> templates;
private final ImmutableOpenMap<String, Custom> customs;
private final transient int totalNumberOfShards;
private final int numberOfShards;
private final String[] allIndices;
private final String[] allOpenIndices;
private final String[] allClosedIndices;
private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;
IndexMetaData
IndexMetaData指具体某个Index的Meta,比如Index的shard、replica、mappings等
public class IndexMetaData implements Diffable<IndexMetaData>, ToXContentFragment {
private final int routingNumShards;
private final int routingFactor;
private final int routingPartitionSize;
private final int numberOfShards;
private final int numberOfReplicas;
private final Index index;
private final long version;
private final long[] primaryTerms;
private final State state;
private final ImmutableOpenMap<String, AliasMetaData> aliases;
private final Settings settings;
private final ImmutableOpenMap<String, MappingMetaData> mappings;
private final ImmutableOpenMap<String, Custom> customs;
private final ImmutableOpenIntMap<Set<String>> inSyncAllocationIds;
private final transient int totalNumberOfShards;
private final DiscoveryNodeFilters requireFilters;
private final DiscoveryNodeFilters includeFilters;
private final DiscoveryNodeFilters excludeFilters;
private final DiscoveryNodeFilters initialRecoveryFilters;
private final Version indexCreatedVersion;
private final Version indexUpgradedVersion;
private final ActiveShardCount waitForActiveShards;
}
索引建立过程
阶段一:校验参数阶段
- 校验当前ClusterState的currentState是否存在该索引,routingTable包含该index、metaData包含该index、alias别名等。
阶段二:配置合并阶段
- 合并template和request传入的mapping、customs 数据,优先级上request配置优先于template。
- 合并template和request的setting,优先级上request配置优先于template。
阶段三:构建IndexSettings阶段
- 构建Settings.Builder indexSettingsBuilder对象,合并templates、request的数据,辅以默认配置值。
阶段四:构建IndexMetaData阶段
- 构建IndexMetaData.Builder的tmpImdBuilder对象并绑定indexSettingsBuilder生成的actualIndexSettings。
- 通过tmpImdBuilder.build()构建构建IndexMetaData tmpImd对象。
阶段五:构建IndexService阶段
- 临时构建indexService对象,IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList())。
阶段六:获取Index和Map阶段
- 获取新创建的index和mapping,Index createdIndex = indexService.index()和MapperService mapperService = indexService.mapperService()。
阶段七:更新mapping到mapperService阶段
- mapperService.merge()合并request和template合并后生成的最新的mappings。
- 生成Map mappingsMetaData,key是创建mapping的指定的type。
阶段八:构建IndexMetaData阶段
- 构建IndexMetaData.Builder indexMetaDataBuilder的indexMetaDataBuilder对象并绑定actualIndexSettings和routingNumShards。
- indexMetaDataBuilder通过primaryTerm设置primaryTerm、putMapping设置mappingMd,putAlias设置aliasMetaData(template和request请求),putCustom设置customIndexMetaData,indexMetaDataBuilder.state设置state。
- 创建indexMetaData对象,通过indexMetaData = indexMetaDataBuilder.build()实现。
阶段九:更新IndexMetaData阶段
- 将IndexMetaData对象更新到MetaData对象中并返回最新的MetaData, MetaData newMetaData = MetaData.builder(currentState.metaData()).put(indexMetaData, false).build()。
阶段十:更新MetaData阶段
- 更新最新的MetaData对象newMetaData对象到ClusterState,ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build()。
public class MetaDataCreateIndexService extends AbstractComponent {
static class IndexCreationTask extends AckedClusterStateUpdateTask<ClusterStateUpdateResponse> {
@Override
/**
* es当前集群的状态ClusterState
*/
public ClusterState execute(ClusterState currentState) throws Exception {
Index createdIndex = null;
String removalExtraInfo = null;
IndexRemovalReason removalReason = IndexRemovalReason.FAILURE;
try {
// 以下是校验ClusterState中是否存在index索引。
// 校验当前currentState是否存在该索引,routingTable包含该index、metaData包含该index、alias别名等。
validator.validate(request, currentState);
// 校验是否已存在别名
for (Alias alias : request.aliases()) {
aliasValidator.validateAlias(alias, request.index(), currentState.metaData());
}
// 以下过程是合并request和template的配置信息
// 从当前的ClusterState查询是否存在别名
List<IndexTemplateMetaData> templates = findTemplates(request, currentState);
// 用来保存请求带过来的mapping
Map<String, Custom> customs = new HashMap<>();
Map<String, Map<String, Object>> mappings = new HashMap<>();
Map<String, AliasMetaData> templatesAliases = new HashMap<>();
List<String> templateNames = new ArrayList<>();
// 保存request的mapping到临时变量mappings
for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
mappings.put(entry.getKey(), MapperService.parseMapping(xContentRegistry, entry.getValue()));
}
// 保存request的customs信息到临时变量customs当中
for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {
customs.put(entry.getKey(), entry.getValue());
}
final Index recoverFromIndex = request.recoverFrom();
// todo 这里不知道是什么判断,大概是不需要从某个索引进行恢复
if (recoverFromIndex == null) {
// 合并template的mapping、customs、alias到request的参数当中
for (IndexTemplateMetaData template : templates) {
templateNames.add(template.getName());
// 合并request和template的mapping变量
for (ObjectObjectCursor<String, CompressedXContent> cursor : template.mappings()) {
String mappingString = cursor.value.string();
// 如果request包含该命名的mapping,就进行合并,mapping以request传入为主,合并命中的template
if (mappings.containsKey(cursor.key)) {
XContentHelper.mergeDefaults(mappings.get(cursor.key),
MapperService.parseMapping(xContentRegistry, mappingString));
} else {
// 如果request不包含该命名的mapping,就直接新增即可
mappings.put(cursor.key,
MapperService.parseMapping(xContentRegistry, mappingString));
}
}
// handle custom
for (ObjectObjectCursor<String, Custom> cursor : template.customs()) {
String type = cursor.key;
IndexMetaData.Custom custom = cursor.value;
IndexMetaData.Custom existing = customs.get(type);
if (existing == null) {
customs.put(type, custom);
} else {
IndexMetaData.Custom merged = existing.mergeWith(custom);
customs.put(type, merged);
}
}
// 以request带的alias作为为主,合并template的alias
for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) {
AliasMetaData aliasMetaData = cursor.value;
if (request.aliases().contains(new Alias(aliasMetaData.alias()))) {
continue;
}
if (templatesAliases.containsKey(cursor.key)) {
continue;
}
if (aliasMetaData.alias().contains("{index}")) {
String templatedAlias = aliasMetaData.alias().replace("{index}", request.index());
aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);
}
aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());
templatesAliases.put(aliasMetaData.alias(), aliasMetaData);
}
}
}
// 以下是创建setting过程,用于创建索引时候使用。
// 合并templates的setting配置
Settings.Builder indexSettingsBuilder = Settings.builder();
if (recoverFromIndex == null) {
// apply templates, here, in reverse order, since first ones are better matching
for (int i = templates.size() - 1; i >= 0; i--) {
indexSettingsBuilder.put(templates.get(i).settings());
}
}
// 合并request的setting并覆盖templates的setting
indexSettingsBuilder.put(request.settings());
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null &&
indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));
}
if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
DiscoveryNodes nodes = currentState.nodes();
final Version createdVersion = Version.min(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
}
if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) {
indexSettingsBuilder.put(SETTING_CREATION_DATE, new DateTime(DateTimeZone.UTC).getMillis());
}
indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
// 以下是创建IndexMetaData的builder的过程
// 组建IndexMetaData的builder
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());
final int routingNumShards;
// routingNumShards的获取逻辑没怎么看懂
if (recoverFromIndex == null) {
Settings idxSettings = indexSettingsBuilder.build();
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
} else {
assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
: "index.number_of_routing_shards should be present on the target index on resize";
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
routingNumShards = sourceMetaData.getRoutingNumShards();
}
// 移除routing_shards配置
indexSettingsBuilder.remove(IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.getKey());
tmpImdBuilder.setRoutingNumShards(routingNumShards);
if (recoverFromIndex != null) {
assert request.resizeType() != null;
prepareResizeIndexSettings(
currentState, mappings.keySet(),
indexSettingsBuilder, recoverFromIndex, request.index(), request.resizeType());
}
// 实际的IndexSetting
final Settings actualIndexSettings = indexSettingsBuilder.build();
// IndexMetaData的builder添加实际的索引的设置
tmpImdBuilder.settings(actualIndexSettings);
if (recoverFromIndex != null) {
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
final long primaryTerm =
IntStream
.range(0, sourceMetaData.getNumberOfShards())
.mapToLong(sourceMetaData::primaryTerm)
.max()
.getAsLong();
for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
tmpImdBuilder.primaryTerm(shardId, primaryTerm);
}
}
// 创建实际的IndexMetaData对象
final IndexMetaData tmpImd = tmpImdBuilder.build();
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
waitForActiveShards = tmpImd.getWaitForActiveShards();
}
if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
"]: cannot be greater than number of shard copies [" +
(tmpImd.getNumberOfReplicas() + 1) + "]");
}
// 创建索引服务IndexService
final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
createdIndex = indexService.index();
// 获取创建IndexService的mapperService
MapperService mapperService = indexService.mapperService();
try {
// 合并request和template合并后生成的最新的mappings
mapperService.merge(mappings, MergeReason.MAPPING_UPDATE, request.updateAllTypes());
} catch (Exception e) {
removalExtraInfo = "failed on parsing default mapping/mappings on index creation";
throw e;
}
if (request.recoverFrom() == null) {
indexService.getIndexSortSupplier().get();
}
final QueryShardContext queryShardContext = indexService.newQueryShardContext(0, null, () -> 0L, null);
for (Alias alias : request.aliases()) {
if (Strings.hasLength(alias.filter())) {
aliasValidator.validateAliasFilter(alias.name(), alias.filter(), queryShardContext, xContentRegistry);
}
}
for (AliasMetaData aliasMetaData : templatesAliases.values()) {
if (aliasMetaData.filter() != null) {
aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(),
queryShardContext, xContentRegistry);
}
}
// 新建mappingsMetaData对象
Map<String, MappingMetaData> mappingsMetaData = new HashMap<>();
for (DocumentMapper mapper : mapperService.docMappers(true)) {
MappingMetaData mappingMd = new MappingMetaData(mapper);
mappingsMetaData.put(mapper.type(), mappingMd);
}
// 以下过程是创建IndexMetaData的过程
// 创建indexMetaDataBuilder对象,执行真正的创建
final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
.settings(actualIndexSettings)
.setRoutingNumShards(routingNumShards);
for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) {
indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId));
}
for (MappingMetaData mappingMd : mappingsMetaData.values()) {
indexMetaDataBuilder.putMapping(mappingMd);
}
for (AliasMetaData aliasMetaData : templatesAliases.values()) {
indexMetaDataBuilder.putAlias(aliasMetaData);
}
for (Alias alias : request.aliases()) {
AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())
.indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();
indexMetaDataBuilder.putAlias(aliasMetaData);
}
for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {
indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());
}
indexMetaDataBuilder.state(request.state());
// 创建indexMetaData对象
final IndexMetaData indexMetaData;
try {
indexMetaData = indexMetaDataBuilder.build();
} catch (Exception e) {
removalExtraInfo = "failed to build index metadata";
throw e;
}
indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetaData.getIndex(),
indexMetaData.getSettings());
// 创建新的MetaData
MetaData newMetaData = MetaData.builder(currentState.metaData())
.put(indexMetaData, false)
.build();
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
if (!request.blocks().isEmpty()) {
for (ClusterBlock block : request.blocks()) {
blocks.addIndexBlock(request.index(), block);
}
}
blocks.updateBlocks(indexMetaData);
// 阻塞blocks并更新Meta元数据
ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();
if (request.state() == State.OPEN) {
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.index()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"index [" + request.index() + "] created");
}
removalExtraInfo = "cleaning up after validating index on master";
removalReason = IndexRemovalReason.NO_LONGER_ASSIGNED;
return updatedState;
} finally {
if (createdIndex != null) {
indicesService.removeIndex(createdIndex, removalReason, removalExtraInfo);
}
}
}
}
}
创建IndexService过程
这部分逻辑暂时没有深入研究,这里只放在这里用于引导提醒一下,具体不做深入研究。
public class IndicesService extends AbstractLifecycleComponent
implements IndicesClusterStateService.AllocatedIndices<IndexShard, IndexService>, IndexService.ShardStoreDeleter {
public synchronized IndexService createIndex(
final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
ensureChangesAllowed();
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
}
final Index index = indexMetaData.getIndex();
if (hasIndex(index)) {
throw new ResourceAlreadyExistsException(index);
}
List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
final IndexEventListener onStoreClose = new IndexEventListener() {
@Override
public void onStoreClosed(ShardId shardId) {
indicesQueryCache.onClose(shardId);
}
};
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
final IndexService indexService =
createIndexService(
"create index",
indexMetaData,
indicesQueryCache,
indicesFieldDataCache,
finalListeners,
indexingMemoryController);
boolean success = false;
try {
indexService.getIndexEventListener().afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
success = true;
return indexService;
} finally {
if (success == false) {
indexService.close("plugins_failed", true);
}
}
}
private synchronized IndexService createIndexService(final String reason,
IndexMetaData indexMetaData,
IndicesQueryCache indicesQueryCache,
IndicesFieldDataCache indicesFieldDataCache,
List<IndexEventListener> builtInListeners,
IndexingOperationListener... indexingOperationListeners) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
reason);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
return indexModule.newIndexService(
nodeEnv,
xContentRegistry,
this,
circuitBreakerService,
bigArrays,
threadPool,
scriptService,
client,
indicesQueryCache,
mapperRegistry,
indicesFieldDataCache,
namedWriteableRegistry
);
}
}
参考文章
Elasticsearch分布式一致性原理剖析(二)-Meta篇
elasticsearch index 之 create index(二)
招聘信息
【招贤纳士】
欢迎热爱技术、热爱生活的你和我成为同事,和贝贝共同成长。
贝贝集团诚招算法、大数据、BI、Java、PHP、android、iOS、测试、运维、DBA等人才,有意可投递zhi.wang@beibei.com。
贝贝集团创建于2011年,旗下拥有贝贝网、贝店、贝贷等平台,致力于成为全球领先的家庭消费平台。
贝贝创始团队来自阿里巴巴,先后获得IDG资本、高榕资本、今日资本、新天域资本、北极光等数亿美金的风险投资。
公司地址:杭州市江干区普盛巷9号东谷创业园(上下班有多趟班车)