带你读《Elastic Stack 实战手册》之76:——4.2.2.Elasticsearch智能巡检开发设计实践(4)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 带你读《Elastic Stack 实战手册》之76:——4.2.2.Elasticsearch智能巡检开发设计实践(4)

《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.2.Elasticsearch智能巡检开发设计实践(3) https://developer.aliyun.com/article/1226092


结语

 

本章详细介绍了智能巡检系统的结构与指标选取、阈值确认,并给出 cluster status 指标采集分析的完整示例代码。有兴趣的读者可以将智能巡检系统通过 Python 脚本或者 Operator 的方式实现。由于 PAAS 系统下管理的大量差异巨大的 Elasticsearch 集群,巡检系统实现主要难点与重点,是抽象出合理的指标以及精细化实现。


上述介绍中可以看到部分指标项是固定场景下必现,且解决方案唯一且简单,例如shard_limit场景,一般通过 API 调整 total_shards_per_node 参数即可恢复,因此这些场景可以设计成自动化修复,达到简单的“自愈”,解放开发维护人员的生产力。


示例代码


# -*- coding:utf-8 -*-
"""
analyse cluster status exception,
匹配常用的case, 返回explain完整结果
"""
import espaas_api
import argparse
import traceback
import json
import logging
import gevent
import time
import datetime
import requests
from libs.log import initlog
from gevent import monkey
monkey.patch_all()
timeout = 10
# 7.10 elasticsearch 有16种决策器
dict = {
    'same_shard': '默认配置下一个节点不允许分配同一shard的多个副本分片', # 不参与统一处理
    'shards_limit': '节点限制单索引shard数,调整相应索引的total_shards_per_node参数',
    'disk_threshold': '磁盘空间达到水位线无法分配shard,调整磁盘容量、节点数或者清理磁盘',
    'max_retry': '达到allocate最大重试次数,尝试调用API手动retry',
    'awareness': '副本分配过多,大于awareness的配置',
    'cluster_rebalance': '集群正在Rebalance,可忽略',
    'concurrent_rebalance': '集群当前正在Rebalance,可忽略',
    'node_version': '分配到的节点版本不一致,请联系管理员调整',
    'replica_after_primary_active': '主分片未active,等待并观察主分片',
    'filter': '未通过filter,请检查配置',
    'enable': '集群enable限制',
    'throttling': '集群正在恢复,需要按照优先级恢复 primary > replica',
    'rebalance_only_when_active': 'Only allow rebalancing when all shards are active within the shard replication group',
    'resize': 'An allocation decider that ensures we allocate the shards of a target index for resize operations next to the source primaries',
    'restore_in_progress': 'This allocation decider prevents shards that have failed to be restored from a snapshot to be allocated',
    'snapshot_in_progress': 'This allocation decider prevents shards that are currently been snapshotted to be moved to other nodes'
}
def check_exception_status_reason_advice(cluster):
    # 省略结果数据入库代码
    res = {
        "status": "",
        "reason": "",
        "advice": "",
        "detail": ""
    }
    # 1.health API
    # 2.node count check
    url = gen_url(cluster) + "/_cat/health?format=json"
    r = requests.get(url, auth=(cluster["username"], cluster["password"]), timeout=timeout)
    if r.status_code == 200:
        try:
            content = json.loads(r.content)
            cluster_status_res = content[0]
            res["status"] = cluster_status_res["status"]
            if cluster_status_res["status"] == "green":
                return res
            if cluster_status_res["node.total"] != cluster["nodes_count"]:
                res["reason"] = "node_left"
                res["advice"] = "请检查k8s环境中node数, 使用`kubectl describe po pod_name`检查crash reason"
                return res
        except:
            traceback.print_exc()
    else:
        res["status"] = "unknown"
        return res
    # 3.get UNASSIGNED shard
    get_all_shard_url = gen_url(cluster) + "_cat/shards?format=json"
get_all_shard_res = requests.get(get_all_shard_url, auth=(cluster["username"], cluster["password"]),
       timeout=timeout)
    unassigned_shard_list = []
     unassigned_shard_final_list = []
    if get_all_shard_res.status_code == 200:
        try:
            shards = json.loads(get_all_shard_res.content)
            for shard in shards:
                if shard["state"] == "UNASSIGNED":
                    unassigned_shard_list.append(shard)
            if len(unassigned_shard_list) > 20:
                unassigned_shard_final_list = unassigned_shard_list[0:20]
            else:
                unassigned_shard_final_list = unassigned_shard_list
        except:
            traceback.print_exc()
    # 4.explain
    # 5.经验值提取出常用建议
    decider_list = []
    explanation_list = []
    has_advice = 0
    explain_res_all = []
    for shard_tmp in unassigned_shard_final_list:
        index_name_tmp = shard_tmp["index"]
        shard_pos = shard_tmp["shard"]
        shard_type = shard_tmp["prirep"]
        is_primary = False
        if shard_type == "p":
            is_primary = True
        explain_url = gen_url(cluster) + "/_cluster/allocation/explain"
        payload = {
            "index": index_name_tmp,
            "shard": shard_pos,
            "primary": is_primary
                }
        explain_res = requests.post(explain_url, json.dumps(payload),
                                    auth=(cluster["username"], cluster["password"]),
                                    headers={"Content-Type": "application/json"}).json()
        explain_res_all.append(explain_res)
        # 整理单个分片所有的decider信息
        same_shard_count = 0
        for decisions_tmp in explain_res["node_allocation_decisions"]:
            for decider_tmp in decisions_tmp['deciders']:
                decider_list.append(decider_tmp['decider'])
                explanation_list.append(decider_tmp['explanation'])
                if decider_tmp['decider'] == "same_shard":
                    same_shard_count = same_shard_count + 1
        tmp_string = ''
        explanation_list_string = tmp_string.join(explanation_list)
        if explanation_list_string.find("ik_max_word") != -1:
            res["reason"] = "max_retry"
            res["advice"] = "可能触发6.8.5Ik分词器bug,请使用其他兼容版本或根据issue fix"
            has_advice = 1
            break
        if explanation_list_string.find("cluster.routing.allocation.enable=primaries") == -1 \
                and explanation_list_string.find("cluster.routing.allocation.enable=replicas") == -1:
            print '正常'
        else:
            res["reason"] = "enable"
            res["advice"] = "cancel cluster.routing.allocation.enable=replicas/primaries settings"
            has_advice = 1
            Break
               if 'same_shard' in decider_list and same_shard_count == cluster["nodes_count"]:
            res["reason"] = "same_shard"
            res["advice"] = "shard副本设置过多,调整副本数量"
            has_advice = 1
            break
        # 取decider与ES决策器keys的交集
        decider_and = list(set(decider_list).intersection(set(dict.keys())))
        reason_and = ''
        advice_and = ''
        if list(set(decider_list).intersection(set(dict.keys()))):
            for x in decider_and:
                if x != 'same_shard':
                    reason_and += x + ';'
                    advice_and += dict[x] + ';'
            res["reason"] = reason_and
            res["advice"] = advice_and
    if has_advice == 0:
        res["detail"] = explain_res_all
    return res
def gen_url(cluster):
    url = 'http://' + cluster["domain"] + ':' + str(cluster["http_port"]) + cluster["h   if 'same_shard' in decider_list and same_shard_count == cluster["nodes_count"]:
            res["reason"] = "same_shard"
            res["advice"] = "shard副本设置过多,调整副本数量"
            has_advice = 1
            break
        # 取decider与ES决策器keys的交集
        decider_and = list(set(decider_list).intersection(set(dict.keys())))
        reason_and = ''
        advice_and = ''
        if list(set(decider_list).intersection(set(dict.keys()))):
            for x in decider_and:
                if x != 'same_shard':
                    reason_and += x + ';'
                    advice_and += dict[x] + ';'
            res["reason"] = reason_and
            res["advice"] = advice_and
    if has_advice == 0:
        res["detail"] = explain_res_all
    return res
def gen_url(cluster):
    url = 'http://' + cluster["domain"] + ':' + str(cluster["http_port"]) + cluster["h   if 'same_shard' in decider_list and same_shard_count == cluster["nodes_count"]:
            res["reason"] = "same_shard"
            res["advice"] = "shard副本设置过多,调整副本数量"
            has_advice = 1
            break
        # 取decider与ES决策器keys的交集
        decider_and = list(set(decider_list).intersection(set(dict.keys())))
        reason_and = ''
        advice_and = ''
        if list(set(decider_list).intersection(set(dict.keys()))):
            for x in decider_and:
                if x != 'same_shard':
                    reason_and += x + ';'
                    advice_and += dict[x] + ';'
            res["reason"] = reason_and
            res["advice"] = advice_and
    if has_advice == 0:
        res["detail"] = explain_res_all
    return res
def gen_url(cluster):
    url = 'http://' + cluster["domain"] + ':' + str(cluster["http_port"]) + cluster["http_path"]
    if url.endswith("/"):
        url = url[:-1]
    return url
def main(cluster_id=None):
    try:
        if cluster_id:
            clusters = espaas_api.get("cluster", "/api/get_cluster_by?cluster_id=%s" % cluster_id)["data"]
        else:
    1488         >   四、应用实践
            clusters = espaas_api.get("cluster", "/api/all_status_exception_cluster")["data"]
        jobs = []
        for cluster in clusters:
            try:
                jobs.append(gevent.spawn(check_exception_status_reason_advice, cluster))
            except:
                traceback.print_exc()
        logging.info("send job number: %s" % len(jobs))
        gevent.joinall(jobs)
    except:
        traceback.print_exc()
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-l", default="-", help="log file")
    parser.add_argument("--level", default="info")
    args = parser.parse_args()
    initlog(level=args.level, log=args.l)
    main()

创作人简介

张妙成,ES PAAS 平台开发,云计算技术爱好者。

个人博客:https://blog.csdn.net/qq_33999844?spm=1001.2014.3001.5343

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
5月前
|
运维 监控 Java
探索Elasticsearch在Java环境下的全文检索应用实践
【6月更文挑战第30天】在大数据背景下,Elasticsearch作为分布式搜索分析引擎,因其扩展性和易用性备受青睐。本文指导在Java环境中集成Elasticsearch,涉及安装配置、使用RestHighLevelClient连接、索引与文档操作,如创建索引、插入文档及全文检索查询。此外,还讨论了高级查询、性能优化和故障排查,帮助开发者高效处理非结构化数据全文检索。
169 0
|
2月前
|
存储 关系型数据库 MySQL
浅谈Elasticsearch的入门与实践
本文主要围绕ES核心特性:分布式存储特性和分析检索能力,介绍了概念、原理与实践案例,希望让读者快速理解ES的核心特性与应用场景。
|
3月前
|
人工智能 自然语言处理 搜索推荐
阿里云Elasticsearch AI搜索实践
本文介绍了阿里云 Elasticsearch 在AI 搜索方面的技术实践与探索。
19145 21
|
1月前
|
开发框架 监控 搜索推荐
GoFly快速开发框架集成ZincSearch全文搜索引擎 - Elasticsearch轻量级替代为ZincSearch全文搜索引擎
本文介绍了在项目开发中使用ZincSearch作为全文搜索引擎的优势,包括其轻量级、易于安装和使用、资源占用低等特点,以及如何在GoFly快速开发框架中集成和使用ZincSearch,提供了详细的开发文档和实例代码,帮助开发者高效地实现搜索功能。
118 0
|
1月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
85 0
|
3月前
|
存储 运维 搜索推荐
运维开发.索引引擎ElasticSearch.倒序索引的概念
运维开发.索引引擎ElasticSearch.倒序索引的概念
52 1
|
5月前
|
存储 监控 固态存储
elasticsearch索引生命周期管理(ILM):原理和实践
elasticsearch索引生命周期管理(ILM):原理和实践
|
5月前
|
存储 JSON API
Elasticsearch中的模板:定义、作用与实践
Elasticsearch中的模板:定义、作用与实践
|
5月前
|
搜索推荐 Java 数据库
Java中的ElasticSearch集成与实践
Java中的ElasticSearch集成与实践
|
6月前
|
数据采集 数据挖掘 索引
Elasticsearch “指纹”去重机制,你实践中用到了吗?
Elasticsearch “指纹”去重机制,你实践中用到了吗?
84 7

相关产品

  • 检索分析服务 Elasticsearch版