暂时未有相关云产品技术能力~
1、基础篇:零基础如何学 Elasticsearch ?常见问题:“ES 零基础入门书籍看什么比较好?”认知前提:书籍的速度已远落后于 ES 版本更新的速度。ES 几乎每个月发布一个版本,更新很快。市面上的书籍,尤其国外翻译书籍还大多是:1.x,2.x,5.x ,6.x 的版本,更新较慢。有几本7.X的书籍,我没有买实体书,不太有发言权。不过:1.X——6.X全部实体书我都买了,也都看完了,可以说有发言权。相比于实体书籍,我更推荐官方文档,但鉴于“零基础”读者对英文的恐惧,书籍、视频、学习路线等推荐列表如下:1.1 零基础书籍推荐推荐1:《这就是搜索引擎》这是一本介绍搜索引擎原理的书,有了搜索引擎原理的认知再去理解 Elasticsearch 会有“居高临下”俯视的感觉。推荐理由 1:长销书(比畅销书高一个level)、作者是中科院软件所博士、包含大量图解、通俗易懂。推荐理由 2:貌似在 medcl 大神(Elasti中文社区创始人、Elastic中国第1位员工) 书架也有这本书。推荐 2:《Elasticsearch 权威指南》2.X 中文翻译电子书地址如下:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html推荐理由 1:虽然基于 2.X,但底层原理的介绍并不过时,值得一看。推荐理由 2:wood大叔(携程架构师、Elastic 中文社区排名第1名)也曾经多次推荐且他看过2遍以上+。推荐 3:《Elasticsearch 实战》纸质书这本书是《Elasticsearch IN ACTION》 的翻译版本。推荐理由 1:作者之一已入职 Elasticsearch。推荐理由 2:翻译作者是极客时间数学课的作者:黄申博士,翻译还可以,至少比《 Lucene 实战 》翻译好 100 倍。推荐理由 3:这本书是 Medcl 大神写的序。推荐4:结合 ECE 考纲、《Elasticsearch 最少必要知识》电子教程啃官方文档。《Elasticsearch 最少必要知识》 地址:https://www.yuque.com/deep_elasticsearch/tzcm9n推荐理由 1:ECE 认证考试是以考带练,入门学习和进阶的绝佳方式,没有之一。推荐理由 2:单独看官方文档不知道重点,而 ECE 考纲就是重点。推荐理由 3:单独看官方文档 + ECE大纲没有头绪,《Elasticsearch 最少必要知识》帮梳理清楚了。1.2 零基础入门视频教程推荐推荐 1:阮一鸣老师 7.1+ 极客时间视频地址:https://time.geekbang.org/course/intro/100030501推荐 2:魏彬老师慕课网的视频(6.X 版本)地址:https://coding.imooc.com/class/181.html推荐 3:李猛老师视频课程(最新版本)地址:https://www.gupaoedu.cn/course-es.html推荐 4:中华石杉老师的 2.X 视频教程地址:https://www.roncoo.com/view/80推荐 5:死磕 Elasticsearch 最少必要知识系列直播(7.x + 版本,推进中)地址:http://t.cn/RmwM3N9因为每个人的“口味”不同,正所谓“众口难调”,再好口碑的课程也有人觉得“鸡肋”。所以,上面列举了几乎所有 Elasticsearch 视频教程,总有适合你的一款。1.3 “零基础”是相对的、不要怕!所谓“零基础”,Mysql 数据库的一些常规操作基础是有的吧?推荐结合已有知识体系,类比学习:MySQL ElasticSearchTable IndexTable TypeRow DocumentColumn FieldSchema MappingIndex Everything is indexedSQL Query DSLselect * from … GET http://…update table set … POST http://…group by、avg、sum Aggregations去重 distinct cardinality数据迁移 reindex通过类比:就可以由已有认知过度到未知认知,便于快速习得技能。建议把基础内容分为两部分,part 1:增删改查这一部分相对好理解,或者可以借助已有知识体系类比加深理解。进一步细分为:索引的增删改查文档的增删改查Mapping 的增删改查template 的增删改查......这些知识有了,基本的操作基本都经过手了。part 2:非增删改查进一步细分为:数据预处理 Ingest自定义分词检索分类聚合分类节点角色分类集群备份与快照集群安全跨集群部署冷热集群架构跨机房跨机架检索......有了上面的分类,一个知识点一个知识点的攻克就可以了。1.4 形成自己的学习路线图我之前梳理的学习路线图如下:重磅 | Elasticsearch7.X学习路线图学习路线因人而异,上面的路线图仅供参考。需要结合自己的实践和认知,找到属于自己的一条学习路线。1.5 “零基础”学习方法论——光看不够,搭建环境 + 练起来更重要!书籍和视频一个道理,看再多不如练习一遍。这里强调的练:是《刻意练习》书籍中推崇的概念“练习”的练,不是盲目的练习、而是有目的、有目标的刻意练习。你看星球考过的 46 位认证工程师,都是刻意练习 + 对官方文档非常熟练的结果。最简单、最轻量化的方式:搭建好环境(单节点 Elasticsearch + kibana),用 kibana 自带的三个示例数据就可以练习起来。部署方式一:如果 PC 内存足够,搭建个虚拟机就可以开搞了。部署方式二:如果对 docker 很熟悉,docker 部署 ELK 也非常快。部署方式三:一、二都不喜欢,自己买个云服务器(最少2核4G)就够了。结合自己的需求,用自己最擅长的方式。以上,期望有助于你快速入门!2、进阶篇:实践加深认知基础篇强调练起来。进阶篇会进一步强调“练”的重要性。2.1 刻意学习一遍官方文档程序猿DD大佬说过:要切实的落实某个框架,全面了解和掌握的方式永远都是“官方文档 + 学会读源代码”。是的,如果说入门我们可以看博客、看视频,站在别人“肩上”走的更快。那么进阶的话,系统的过一遍官方文档就很有必要了。如果感觉上来就全英文有些吃力,可以 7.X 最新版本的英文结合 2.X 中文文档一起来看。如果感觉吃力,可以看一下技术博客资源:全网最牛逼的 Elasticsearch 天团博客集合2.2 实战中刻意练习在产品或项目架构、开发、运维的过程中,遇到问题,不要仅局限在解决问题本身,多刨根问底,问问底层原理是什么?举例 1:range query 对数值类型还是 keyword 类型来讲,哪种数据类型会更快?举例 2:index sort 真的很快吗?适用于什么场景?底层是如何实现的?这时候有遇到不明白的,可以翻阅官方文档,查看github issue记录,翻阅源代码,社区内讨论等......通过不断求证提升认知。2.3 上学下帮的练习社区中每天都有来自全国 N 多公司线上实战问题,看看别人都遇到了哪些问题?用了哪些技术栈?国内Elastic社区地址:https://elasticsearch.cn/国外Elastic社区地址:https://discuss.elastic.co/2.3.1 向上学向比自己厉害的人学习。社区社群中有大量的大牛,即便很“偏门”、“刁钻”的问题也能有非常独到的见解和思路,这就很值得我们去学习。学习什么?学习他们拆解问题、分析问题、定位问题以及解决问题的思路。2.3.2 向下帮帮助需要帮助的人,大家都有知识盲区。帮助别人的同时个人也能得到飞速的提升。正所谓:“讲一遍有一遍的收货”。对于求助人来说,“能把问题讲清楚、问题就解决了一大半”。而对于解答者来说:“输出倒逼输入,帮助别人排查问题,本质就是进一步提升自己的认知”。别的不说,社区 wood 大叔的文章基本就是解决自己或者别人问题而累积的干货,对 Elastic 学习者来说大有裨益。
2、Elasticsearch 异步搜索发布的版本Elasitcsearch V7.7.0版本。3、Elasticsearch 异步搜索适用场景异步搜索允许用户在异步搜索结果可用时检索它们,从而消除了仅在查询完全完成后才最终响应的情况。4、Elasticsearch 异步搜索实战4.1 执行异步检索执行如下操作的前提是:待异步检索的索引数据量非常大(其实小了也可以,但数据量大更契合一些)。否则普通索引会直接返回结果数据。拿个普通索引试验一下:POST kibana_sample_data_flights/_async_search?size=0{ "sort": [ { "timestamp": { "order": "asc" } } ], "aggs": { "sale_date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } }}返回结果:{ "is_partial" : false, "is_running" : false, "start_time_in_millis" : 1628663114252, "expiration_time_in_millis" : 1629095114252, "response" : { "took" : 23, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 10000, "relation" : "gte" }, "max_score" : null, "hits" : [ ] },为了达到异步检索的目的,可以用推荐的测试写入数据的工具:https://github.com/oliver006/elasticsearch-test-data你是不是也困惑,没有测试数据或者没有一定数量的测试数据?小工具来了。产生100W+数据,一条指令:python es_test_data.py --es_url=http://172.21.0.14:19205 --count=1000000结果如下:Done - total docs uploaded: 1000000, took 71 seconds可以结合自己业务场景优化一下,python 代码编写,很适合封装成自己的小工具。POST test_data/_async_search?size=0{ "sort": [ { "last_updated": { "order": "asc" } } ], "aggs": { "sale_date": { "date_histogram": { "field": "last_updated", "calendar_interval": "1d" } } }}返回结果如下:{ "id" : "FjUxQURkZFZyUVVlUUNydjVSZXhmWGcedFJCVnRVSVhSdVM0emN2YXZfTU9ZQToyNzE3MTcy", "is_partial" : true, "is_running" : true, "start_time_in_millis" : 1628662256012, "expiration_time_in_millis" : 1629094256012, "response" : { "took" : 1008, "timed_out" : false, "terminated_early" : false, "num_reduce_phases" : 0, "_shards" : { "total" : 1, "successful" : 0, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 0, "relation" : "gte" }, "max_score" : null, "hits" : [ ] } }}如果看不到上述结果,说明数据量还不够大。可以再导入一些。核心返回参数解释一下:id——可用于监控其进度、检索其结果和/或删除它的异步搜索的标识符。is_partial——当查询不再运行时,指示在所有分片上搜索是失败还是成功完成。在执行查询时,is_partial 始终设置为 true。is_running——搜索是否仍在执行中或已完成。total——总体而言,将在多少个分片上执行搜索。successful——有多少分片已成功完成搜索。4.2 查看异步检索GET /_async_search/FjFoeU8xMHJKUW9pd1dzN1g2Rm9wOGcedFJCVnRVSVhSdVM0emN2YXZfTU9ZQToyNjYyNjk54.3 查看异步检索状态GET /_async_search/status/FjUxQURkZFZyUVVlUUNydjVSZXhmWGcedFJCVnRVSVhSdVM0emN2YXZfTU9ZQToyNzE3MTcy/4.4 删除/中止异步检索DELETE /_async_search/FjFoeU8xMHJKUW9pd1dzN1g2Rm9wOGcedFJCVnRVSVhSdVM0emN2YXZfTU9ZQToyNjYyNjk55、官方文档地址https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html6、小结异步搜索认证考试环节,大家了解就可以,能找到官方文档位置,知道有哪几个API。实战业务环节结合业务需求去选型使用即可。但,更倾向于数据量极大的业务传统同步请求体验差的场景。你实战业务场景有没有使用异步检索?欢迎留言讨论。
Q1:Elasticsearch可以根据检索词在doc中的词频进行检索排序嘛?Q2:求教 ES 可以查询某个索引中某个text类型字段的词频数量最大值和词所在文档数最大值么?例:索引中有两个文档 doc1:{"text":""} 分词结果有两个北京,一个南京 doc2:{"text":""} 分词结果有一个北京想要一下结果:北京:词频3,文档量2 南京:词频1,文档量1Q3:对某些文章的词频统计除了用fielddata之外还有没有效率比较高的解决办法呢?目前统计有时候会遇到十万级的文章数直接在通过聚合效率上不是特别理想。如上三个问题都可以归结为:Elasticsearch 文档词频统计问题。该问题在检索、统计领域应用的非常多。那么 Elasticsearch 如何实现词频统计呢?有必要梳理一下。2、词频统计探讨之前的文章《Elasticsearch词频统计实现与原理解读》,解决的是:Q3 提及的某索引中特定关键词统计的问题。解决方案是:text 字段开启 fielddata,咱们在《长津湖影评可视化那么,对于给定文档的词频统计呢?原来开启 fielddata 的方案就可以实现,举例如下:DELETE message_indexPUT message_index{ "mappings": { "properties": { "message": { "analyzer": "ik_smart", "type": "text", "fielddata": "true" } } }}POST message_index/_bulk{"index":{"_id":1}}{"message":"沉溺于「轻易获得高成就感」的事情:沉溺于有意无意地寻求用很小付出获得很大「huibao」的偏方,哪怕huibao是虚拟的"}{"index":{"_id":2}}{"message":"过度追求“短期huibao”可以先思考这样一个问题:为什么玩王者荣耀沉溺我们总是停不下来huibao"}{"index":{"_id":3}}{"message":"过度追求的努力无法带来超额的huibao,就因此放弃了努力。这点在聪明人身上尤其明显。以前念本科的时候身在沉溺"}POST message_index/_search{ "size": 0, "query": { "term": { "_id": 1 } }, "aggs": { "messages": { "terms": { "size": 10, "field": "message" } } }}无非在聚合的时候,加上query 语句指定了特定 id 进行检索。这种方法的缺点在于:正如 Q3 所说,聚合效率低。看过上次直播的同学,可能会闪现一种想法,写入前打 tag 的方式能解决吗?可以解决,但有个前提。先画个图解释一下:这个打 tag 的字段非全量,而是特定的指定脚本处理的部分。下一小节详细实现一把。其实,除了开启 fielddata 和 打 tag 之外,在 Elasticsearch 中有 termvectors 接口也能实现文档词频统计。下一小节一并实现。3、词频统计实现3.1 text 开启 fielddata 后聚合方案第 2 部分已有实现说明,不再赘述。3.2 写入前打 tag,写入后聚合统计方案还是用第 2 部分的数据,说明如下:PUT _ingest/pipeline/add_tags_pipeline{ "processors": [ { "append": { "field": "tags", "value": [] } }, { "script": { "description": "add tags", "lang": "painless", "source": """ if(ctx.message.contains('成就')){ ctx.tags.add('成就') } if(ctx.message.contains('王者荣耀')){ ctx.tags.add('王者荣耀') } if(ctx.message.contains('沉溺')){ ctx.tags.add('沉溺') } """ } } ]}POST message_index/_update_by_query?pipeline=add_tags_pipeline{ "query": { "match_all": {} }}POST message_index/_search{ "size":0, "aggs": { "terms_aggs": { "terms": { "field": "tags.keyword" } } }}实现后,结果如下:这种统计的依然是:关键词(key)和文档(doc_count)的统计关系。什么意思呢?"key":“沉溺”,“doc_count”:3 本质含义是:“沉溺”在三个不同的文档中出现了。细心的读者会发现,文档 1 中“沉溺”出现了2次,这种打 tag 统计是不准确的。3.3 term vectors 统计PUT message_index{ "mappings": { "properties": { "message": { "type": "text", "term_vector": "with_positions_offsets_payloads", "store": true, "analyzer": "ik_max_word" } } }}解释一下:term_vector: 检索特定文档字段中分词单元的信息和统计信息。store: 默认未开启存储,需要手动设置为true。with_positions_offsets_payloads 是Lucene 的参数之一,释义如下:可以理解为:存储分词单元(term vector)、位置(Token position)、偏移值(offset)、有效负载(payload,猜测是ES 新增的)。默认会统计词频信息,默认term information 为true。此外,还有 term statistics 和 field statistics 类型供设置和实现不同的统计,详细内容参考官方文档即可。https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-termvectors.html执行:GET message_index/_termvectors/1?fields=message后的返回结果如下:这种基于特定文档的词频统计是传统意义上我们理解的词频统计。默认情况下,term vectors是实时的,而不是接近实时的。可以通过将 realtime 参数设置为 false 来更改。实时就意味着可能会有性能问题。3.4 先分词,后 term vectors 统计在我担心仅 termvectors 可能带来的性能问题的时候,我想到了如下的解决方案。前提:写入之前除了存储 message 字段,加了一个分词结果组合字段,该字段每个词用空格做分隔。message 字段的前置分词需要自己调用 analyzer API 实现。有了切词后的字段,再做统计会更快。具体实现如下:DELETE message_ext_indexPUT message_ext_index{ "mappings": { "properties": { "message_ext": { "type": "text", "term_vector": "with_positions_offsets_payloads", "store": true, "analyzer": "whitespace" } } }}POST message_ext_index/_bulk{"index":{"_id":1}}{"message_ext":"沉溺 于 轻易 获得 高 成就感 的 事情 沉溺 有意 无意 地 寻求 用 很小 付出 获得 很大 huibao 的 偏方 哪怕 huibao 是 虚拟 的"}GET message_ext_index/_termvectors/1?fields=message_ext强调一下:message_ext 使用的 whitespace 分词器。4、小结关于词频统计,本文给出四种方案。只有第3、4种方案结合termvectors 实现是严格意义上的词频统计,其他两种是词频-文档关系的统计。考虑到方式3的实时分词可能的性能问题,扩展想到方案4前置分词的方式,能有效提高统计效率。本质也是空间换时间。你的实战中如何实现的词频统计呢?欢迎留言说一下你的实现方式和思考。参考https://titanwolf.org/Network/Articles/Article?AID=7c417f9f-5bde-4519-9bd5-39957d184a07 https://discuss.elastic.co/t/word-count-frequency-per-field/159910 https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-termvectors.html
这时,可能想到的解决方案:方案一:重新创建索引时添加字段,清除已有数据再重新导入数据。方案二:重新创建索引时添加字段,原索引通过 reindex 写入到新索引。方案三:提前指定数据预处理,结合管道 ingest 重新导入或批量更新 update_by_query 实现。方案四:保留原索引不动,通过script 脚本实现。方案一、二类似,新加字段导入数据即可。方案三、方案四 我们模拟实现一把。2、方案三、四实现一把2.1 方案三 Ingest 预处理实现DELETE news_00001PUT news_00001{ "mappings": { "properties": { "emotion": { "type": "integer" } } }}POST news_00001/_bulk{"index":{"_id":1}}{"emotion":558}{"index":{"_id":2}}{"emotion":125}{"index":{"_id":3}}{"emotion":900}{"index":{"_id":4}}{"emotion":600}PUT _ingest/pipeline/my-pipeline{ "processors": [ { "script": { "description": "Set emotion flag param", "lang": "painless", "source": """ if (ctx['emotion'] < 300 && ctx['emotion'] > 0) ctx['emotion_flag'] = -1; if (ctx['emotion'] >= 300 && ctx['emotion'] <= 700) ctx['emotion_flag'] = 0; if (ctx['emotion'] > 700 && ctx['emotion'] < 1000) ctx['emotion_flag'] = 1; """ } } ]}POST news_00001/_update_by_query?pipeline=my-pipeline{ "query": { "match_all": {} }}方案三的核心:定义了预处理管道:my-pipeline,管道里做了逻辑判定,对于emotion 不同的取值区间,设置 emotion_flag 不同的结果值。该方案必须提前创建管道,可以通过写入时指定缺省管道 default_pipeline 或者结合批量更新实现。实际是两种细分实现方式:方式一:udpate_by_query 批量更新。而更新索引尤其全量更新索引是有很大的成本开销的。方式二:写入阶段指定预处理管道,每写入一条数据预处理一次。2.2 方案四 script 脚本实现POST news_00001/_search{ "query": { "match_all": {} }, "script_fields": { "emotion_flag": { "script": { "lang": "painless", "source": "if (doc['emotion'].value < 300 && doc['emotion'].value>0) return -1; if (doc['emotion'].value >= 300 && doc['emotion'].value<=700) return 0; if (doc['emotion'].value > 700 && doc['emotion'].value<=1000) return 1;" } } }}方案四的核心:通过 script_field 脚本实现。该方案仅是通过检索获取了结果值,该值不能用于别的用途,比如:聚合。还要注意的是:script_field 脚本处理字段会有性能问题。两种方案各有利弊,这时候我们会进一步思考:能不能不改 Mapping、不重新导入数据,就能得到我们想要的数据呢?早期版本不可以,7.11 版本之后的版本有了新的解决方案——Runtime fields 运行时字段。3、Runtime fields 产生背景Runtime fields 运行时字段是旧的脚本字段 script field 的 Plus 版本,引入了一个有趣的概念,称为“读取建模”(Schema on read)。有 Schema on read 自然会想到 Schema on write(写时建模),传统的非 runtime field 类型 都是写时建模的,而 Schema on read 则是另辟蹊径、读时建模。这样,运行时字段不仅可以在索引前定义映射,还可以在查询时动态定义映射,并且几乎具有常规字段的所有优点。Runtime fields在索引映射或查询中一旦定义,就可以立即用于搜索请求、聚合、筛选和排序。4、Runtime fields 解决文章开头问题4.1 Runtime fields 实战求解PUT news_00001/_mapping{ "runtime": { "emotion_flag_new": { "type": "keyword", "script": { "source": "if (doc['emotion'].value > 0 && doc['emotion'].value < 300) emit('-1'); if (doc['emotion'].value >= 300 && doc['emotion'].value<=700) emit('0'); if (doc['emotion'].value > 700 && doc['emotion'].value<=1000) emit('1');" } } }}GET news_00001/_search{ "fields" : ["*"]}4.2 Runtime fields 核心语法解读第一:PUT news_00001/_mapping 是在已有 Mapping 的基础上 更新 Mapping。这是更新 Mapping 的方式。实际上,创建索引的同时,指定 runtime field 原理一致。实现如下:PUT news_00002{ "mappings": { "runtime": { "emotion_flag_new": { "type": "keyword", "script": { "source": "if (doc['emotion'].value > 0 && doc['emotion'].value < 300) emit('-1'); if (doc['emotion'].value >= 300 && doc['emotion'].value<=700) emit('0'); if (doc['emotion'].value > 700 && doc['emotion'].value<=1000) emit('1');" } } }, "properties": { "emotion": { "type": "integer" } } }}第二:更新的什么呢?加了字段,确切的说,加了:runtime 类型的字段,字段名称为:emotion_flag_new,字段类型为:keyword,字段数值是用脚本 script 实现的。脚本实现的什么呢?当 emotion 介于 0 到 300 之间时,emotion_flag_new 设置为 -1 。当 emotion 介于 300 到 700 之间时,emotion_flag_new 设置为 0。当 emotion 介于 700 到 1000 之间时,emotion_flag_new 设置为 1。第三:如何实现检索呢?我们尝试一下传统的检索,看一下结果。我们先看一下 Mapping:{ "news_00001" : { "mappings" : { "runtime" : { "emotion_flag_new" : { "type" : "keyword", "script" : { "source" : "if (doc['emotion'].value > 0 && doc['emotion'].value < 300) emit('-1'); if (doc['emotion'].value >= 300 && doc['emotion'].value<=700) emit('0'); if (doc['emotion'].value > 700 && doc['emotion'].value<=1000) emit('1');", "lang" : "painless" } } }, "properties" : { "emotion" : { "type" : "integer" } } } }}多了一个 runtime 类型的字段:emotion_flag_new。执行:GET news_00001/_search返回结果如下:执行:1. GET news_00001/_search 2. { 3. "query": { 4. "match": { 5. "emotion_flag_new": "-1" 6. } 7. } 8. }返回结果如下:执行:1. GET news_00001/_search 2. { 3. "fields" : ["*"], 4. "query": { 5. "match": { 6. "emotion_flag_new": "-1" 7. } 8. } 9. }返回结果如下:4.3 Runtime fields 核心语法解读为什么加了:field:[*] 才可以返回检索匹配结果呢?因为:Runtime fields 不会显示在:_source 中,但是:fields API 会对所有 fields 起作用。如果需要指定字段,就写上对应字段名称;否则,写 * 代表全部字段。4.4 如果不想另起炉灶定义新字段,在原来字段上能实现吗?其实上面的示例已经完美解决问题了,但是再吹毛求疵一下,在原有字段 emotion 上查询时实现更新值可以吗?实战一把如下:POST news_00001/_search{ "runtime_mappings": { "emotion": { "type": "keyword", "script": { "source": """ if(params._source['emotion'] > 0 && params._source['emotion'] < 300) {emit('-1')} if(params._source['emotion'] >= 300 && params._source['emotion'] <= 700) {emit('0')} if(params._source['emotion'] > 700 && params._source['emotion'] <= 1000) {emit('1')} """ } } }, "fields": [ "emotion" ]}返回结果:解释一下:第一:原来 Mapping 里面 emotion是 integer 类型的。第二:我们定义的是检索时类型,mapping 没有任何变化,但是:检索时字段类型 emotion 在字段名称保持不变的前提下,被修改为:keyword 类型。这是一个非常牛逼的功能!!!早期 5.X、6.X 没有这个功能的时候,实际业务中我们的处理思路如下:步骤一:停掉实时写入;步骤二:创建新索引,指定新 Mapping,新增 emotion_flag 字段。步骤三:恢复写入,新数据会生效;老数据 reindex 到新索引,reindex 同时结合 ingest 脚本处理。有了 Runtime field,这种相当繁琐的处理的“苦逼”日子一去不复回了!5、Runtime fields 适用场景比如:日志场景。运行时字段在处理日志数据时很有用,尤其是当不确定数据结构时。使用了 runtime field,索引大小要小得多,可以更快地处理日志而无需对其进行索引。6、Runtime fields 优缺点优点 1:灵活性强运行时字段非常灵活。主要体现在:需要时,可以将运行时字段添加到我们的映射中。不需要时,轻松删除它们。删除操作实战如下:PUT news_00001/_mapping{ "runtime": { "emotion_flag": null }}也就是说将这个字段设置为:null,该字段便不再出现在 Mapping 中。优点 2:打破传统先定义后使用方式运行时字段可以在索引时或查询时定义。由于运行时字段未编入索引,因此添加运行时字段不会增加索引大小,也就是说 Runtime fields 可以降低存储成本。优点3:能阻止 Mapping 爆炸Runtime field 不被索引(indexed)和存储(stored),能有效阻止 mapping “爆炸”。原因在于 Runtime field 不计算在 index.mapping.total_fields 限制里面。缺点1:对运行时字段查询会降低搜索速度对运行时字段的查询有时会很耗费性能,也就是说,运行时字段会降低搜索速度。7、Runtime fields 使用建议权衡利弊:可以通过使用运行时字段来减少索引时间以节省 CPU 使用率,但是这会导致查询时间变慢,因为数据的检索需要额外的处理。结合使用:建议将运行时字段与索引字段结合使用,以便在写入速度、灵活性和搜索性能之间找到适当的平衡。8、小结本文通过实战中添加字段的问题引出解决问题的几个方案;传统的解决方案大多都需要更改 Mapping、重建索引、reindex 数据等,相对复杂。因而,引申出更为简单、快捷的 7.11 版本后才有的方案——Runtime fields。Runtime fields 的核心知识点如下:Mapping 环节定义;在已有 Mapping 基础上更新;检索时使用 runtime fields 达到动态添加字段的目的;覆盖已有 Mapping 字段类型,保证字段名称一致的情况下,实现特定用途优缺点、适用场景、使用建议。你在实战环节使用 Runtime fields 了吗?效果如何呢?欢迎留言反馈交流。参考https://opster.com/elasticsearch-glossary/runtime-fields/https://www.elastic.co/cn/blog/introducing-elasticsearch-runtime-fieldshttps://dev.to/lisahjung/beginner-s-guide-understanding-mapping-with-elasticsearch-and-kibana-3646https://www.elastic.co/cn/blog/getting-started-with-elasticsearch-runtime-fields
如何系统的学习 Elasticsearch ?刻意练习 Elasticsearch 10000 个小时,鬼知道经历了什么?!重磅 | Elasticsearch 7.X学习路线图Elasticsearch 学习,请先看这一篇!死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招!从实战中来,到实战中去——Elasticsearch 技能更快提升方法论视频 | Elasticsearch 学习,没有数据怎么办?全网最牛逼的 Elasticsearch 天团博客集合如何做一次 Elasticsearch 技术分享?你的 Elasticsearch 难题,官方文档早就有了答案......开干!Elasticsearch官方文档离线访问实操指南严选 | ELK Stack 选书指南02 Elasticsearch 7.X进阶实战视频全网首发!《 Elasticsearch 最少必要知识教程 V1.0 》低调发布视频 | Elasticsearch 7.X 进阶实战私训课03 Elasticsearch 适用场景Elasticsearch Top5典型应用场景Elasticsearch, 你值得拥有!—— 云栖大会 Elasticsearch 场景化应用全景回顾04 Elasticsearch 架构选型指南Elasticsearch架构选型指南——不止是搜索引擎,还有......干货 | Elasticsearch 方案选型必须了解的10件事探究 | Elasticsearch 与传统数据库界限探究 | Elasticsearch 不支持事务有什么好的弥补方案吗?且慢!听说你线上环境准备选型 Elasticsearch SQL 了?从提高 Elasticsearch 搜索体验说开去......大事!!Elasticsearch 和 Kibana 换开源协议了.....05 Elasticsearch 底层原理Elasticsearch 内部数据结构深度解读吃透 | Elasticsearch filter 和 query 的不同图解Elasticsearch之一——索引创建过程Elasticsearch存储深入详解Elasticsearch 缓存深入详解干货 | 吃透Elasticsearch 堆内存Elasticsearch写入原理深入详解关于 Elasticsearch 段合并,这一篇说透了!深入解读 Elasticsearch 热点线程 hot_threadsElasticsearch 线程池和队列问题,请先看这一篇干货 | 一步步拆解 Elasticsearch BM25 模型评分细节Elasticsearch 中为什么会有大量文档插入后变成 deleted?06 Elasticsearch 基础必知必会Elasticsearch 7.0 正式发布,盘他!Elasticsearch 基础但非常有用的功能之一:别名干货 | Elasticsearch 基础但非常有用的功能之二:模板Elasticsearch词频统计实现与原理解读干货 | Elasticsearch、Kibana数据导出实战干货 |《深入理解Elasticsearch》读书笔记干货 | Elasticsearch 布道者Medcl对话携程Wood大叔核心笔记干货 | Elasticsearch7.X Scripting脚本使用详解07 Elasticsearch 集群规划与部署最佳实践上线必备 | 高性能 ES5.X 部署配置清单刨根问底 Elasticsearch 6.2.2 X-Pack 部署及使用详解Elasticsearch 5.X 集群多节点角色配置深入详解关于 Elasticsearch 集群核心配置,腾讯大佬的灵魂9问,你能接住几个?Elasticsearch 生产环境集群部署最佳实践牛逼!Elasticsearch 集群更换节点角色有了更快的方式干货 | Elasticsearch 冷热集群架构实战Elasticsearch集群管理之1——如何高效的添加、删除节点?探究 | Elasticsearch集群规模和容量规划的底层逻辑08 Elasticsearch 数据建模实战干货 | 论 Elasticsearch 数据建模的重要性视频 | Elasticsearch 数据建模实战指南干货 | Elasticsearch5.X Mapping 万能模板从一个实战问题再谈 Elasticsearch 数据建模干货 | Elasticsearch 多表关联设计指南Elasticsearch 6.X 新类型Join深入详解Elasticsearch Nested 选型,先看这一篇!干货 | Elasticsearch Nested类型深入详解干货 | 拆解一个 Elasticsearch Nested 类型复杂查询问题09 Elasticsearch 数据预处理指南Elasticsearch 预处理没有奇技淫巧,请先用好这一招!Elasticsearch 的ETL利器—— Ingest 节点干货 | Logstash Grok数据结构化ETL实战干货 | Logstash自定义正则表达式ETL实战两个 Elasticsearch 线上实战问题及解读10 Elasticsearch 检索实战你必须知道的23个最有用的 Elasticseaerch 检索技巧DSL的诞生 | 复杂 sql 转成 Elasticsearch DSL 深入详解干货 | 全方位深度解读 Elasticsearch 分页查询实战 | Elasticsearch 实现类Google高级检索Elasticsearch 实战 | match_phrase搜不出来,怎么办?探究 | 明明存在,怎么搜索不出来呢?Elasticsearch能检索出来,但不能正确高亮怎么办?实战 | Elasticsearch自定义评分的N种方法Elasticsearch 警惕使用 wildcard 检索!然后呢?你用过Elasticsearch Percolate 反向检索吗?Elasticsearch 多字段查询 best_fields、most_fields、cross_fields,傻傻分不清楚?Elasticsearch 如何实现类主流搜索引擎广告置顶显示效果?Elasticsearch 字段膨胀不要怕,Flattened 类型解千愁!Elasticsearch 6.X 去重详解fingerprint filter 插件——Elasticsearch 去重必备利器一步步拆解解决 Elasticsearch 检索模板问题抢先 | 支持 sql 的 Elasticsearch 6.3全景概览11 Elasticsearch 吃透聚合基于儿童积木玩具图解 Elasticsearch 聚合干货 | 通透理解Elasticsearch聚合Elasticsearch 如何实现查询/聚合不区分大小写?Elasticsearch 聚合数据结果不精确,怎么破?Elasticsearch聚合后分页深入详解Composite 聚合——Elasticsearch 聚合后分页新实现Elasticsearch 高基数聚合性能提升3倍,改动了什么?Elasticsearch聚合优化 | 聚合速度提升5倍!Elasticsearch 聚合性能优化六大猛招12 Elasticsearch 集群高可用性保障干货 | Elasitcsearch7.X集群/索引备份与恢复实战干货 | Elasticsearch 可搜索快照深入详解13 Elasticsearch 安全最佳实战指南成人网站泄露 108 亿数据后,一个 Elasticsearch 爱好者的思考干货 | Elasticsearch 7.1免费安全功能全景认知你的 Elasticsearch在裸奔吗?干货 | Elasticsearch7.X X-Pack 基础安全实操详解14 Elasticsearch 索引生命周期管理(ILM)实战干货 | Elasticsearch 索引生命周期管理探索干货 | Elasticsearch 趋势科技实战分享笔记干货 | Elasticsearch索引管理利器——Curator深入详解干货 | Elasticsearch 索引生命周期管理 ILM 实战指南视频 | Elasticsearch ILM索引生命周期管理Elasticsearch 7.X data stream 深入详解15 Elasticsearch 数据同步,一网打尽!干货 | Debezium 实现 Mysql 到 Elasticsearch 高效实时同步实战 | canal 实现 Mysql 到 Elasticsearch 实时增量同步logstash_output_kafka:Mysql 同步 Kafka 深入详解Elasticsearch 跨网络、跨集群同步选型指南16 Elasticsearch 实战避坑指南Elasticsearch 是一把梭,用起来再说?!好奇?!Elasticsearch 25 个必知必会的默认值Elasticsearch 常见的 8 种错误及最佳实践Elasticsearch常见的5个错误及解决策略干货 | Elasticsearch 6个不明显但很重要的注意事项Elasticsearch 空值处理实战指南17 Elasticsearch 十万个怎么办?Elasticsearch 究竟要设置多少分片数?深究|Elasticsearch 单字段支持的最大字符数?Elasticsearch 如何自定义扩展词库?Elasticsearch 设置默认值的三种方式?Elasticsearch 滞后8个小时等时区问题,一网打尽!探究 | Elasticsearch 如何物理删除给定期限的历史数据?如何不写一行代码把 Mysql json 字符串解析为 Elasticsearch 的独立字段?Elasticsearch 自定义分词同义词环节的这个细节不大好理解......18 Elasticsearch 开发最佳实战指南干货 | Elasticsearch 索引设计实战指南Elasticsearch 实战 | 必要的时候,还得空间换时间!Elasticsearch 架构解析与最佳实践干货 | Elasticsearch 开发人员最佳实战指南Elasticsearch 解决问题之道——请亮出你的DSL!干货 | Elasticsearch 开发实战常用命令清单19 Elasticsearch 运维最佳实战指南干货 | Elasticsearch Top10 指标你不得不关注的 Elasticsearch Top X 关键指标干货 | Elasticsearch 运维实战常用命令清单Elasticsearch 开发运维实战核心 TipsElasitcsearch 开发运维常用命令集锦干货 | Elasticsearch 集群健康值红色终极解决方案干货 | Elasticsearch 集群黄色原因的终极探秘严选 | Elasticsearch 史上最全最常用工具清单Elasticsearch 集群故障排查及修复指南实战 | 一步步排查基于业务场景的Elasticsearch难题!Elasticsearch 线上问题排查——搞一天了,明天还要给客户解决这个问题20 Elasticsearch 检索性能优化为什么Elasticsearch查询变得这么慢了?Elasticsearch 高级调优方法论之——根治慢查询!Elasticsearch 性能调优指南——推荐实战 DSLElasticsearch 大文件检索性能提升20倍实践(干货)让 Elasticsearch 飞起来!——性能优化实践干货Elasticsearch 性能优化实战指南干货 | Elasticsearch通用优化建议思维导图 | Elasticsearch加速检索的15个核心Tips21 Elasticsearch 写入性能优化Elasticsearch:从写入原理谈写入优化干货 | Elasticsearch Reindex性能提升10倍+实战22 Elasticsearch 实战练手小项目干货 | ELK 日志实时分析实战Elasticsearch 实战 | 如何从数千万手机号中识别出情侣号?实战 | Elasticsearch 打造知识库检索系统Elasticsearch 全文检索实战小结Elasticsearch 索引增量统计及定时邮件实现干货 | 知识库全文检索的最佳实践实战 | ELK 实现全量Elastic日报(2017-2019)多维度可视化分析相信坚持的力量!Elastic 日报 1000期+ 了......项目实战 01:将唐诗三百首写入 Elasticsearch 会发生什么?基于 Elasticsearch + kibana 实现 IP 地址分布地图可视化23Elasticsearch 笔试面试必备Elasticsearch 搜索工程师笔试面试,请先看这 10 条建议!擦亮慧眼——找工作避坑指北!今天面试问到一个 Elasticsearch 问题,给我问懵逼了......干货 | BAT等一线大厂 Elasticsearch面试题解读Elasticsearch Top 51 重中之重面试题及答案24 Elasticsearch 认证必备Elastic 认证考试,请先看这一篇!Elastic 认证工程师到底有没有用?全网首发 | Elasticsearch 认证(ECE)最新考试大纲解读Elastic 认证工程师考试最常被问到 Top10 +问题集锦潜心一技、做到极致!——Elastic认证工程师之路视频:手敲脑图串讲 Elastic 认证核心考点干货 | 95后运维小哥20天+通过 Elastic 认证考试经验分享坚持打卡41天,我通过了 Elastic 认证考试! “金三银四“,敢不敢“试”?穿过疫情,如何成就技术人的又一次飞跃?一个 70 后运维老兵的 Elastic 认证工程师之路能拿驾照就能通过 Elastic 认证考试!25 个人提升方法论信息过载的时代,程序员如何破局?升级这十点认知,你就是大佬!认知升级——不做开始爱好者!软技能,程序员编程之外的升值之道!有时间吗?看本书吧《一年顶十年》干货读书笔记坚持是很好的品格——阮一鸣老师微信交流心得打卡学习——应对焦虑的一剂良方!一个好习惯——每年至少更新一次简历你不得不读的好书 ——《此生未完成》读后感更多干货,同步更新 https://elastic.blog.csdn.net/
2、Elasticsarch 字段膨胀Elasticsearch Mapping 如果不做特殊设置,默认为 dynamic。dynamic 的本质就是:不加约束的动态添加字段。这样对某些日志场景,可能会产生大量的未知字段。字段如果持续激增,就会达到 Elasticsearch Mapping 层面的默认上限,对应设置和默认大小为:index.mapping.total_fields.limit:1000。我们把这种非预期字段激增的现象或结果称为:字段膨胀。拿自己线上环境示例,说一下 dynamic 的副作用。在一个实际业务环境,混淆了检索和写入的语法,会导致将检索语句动态认定为新增 Mapping 字段。当然,如果是非常复杂的大 bool 检索语句,会导致 Mapping 变得非常复杂甚至会出现字段膨胀的情况。当然,可行的解决方案就是:dynamic 设置为 false,甚至更为严谨的推荐方式:将 dynamic 设置为 strict。2.1 解决字段膨胀方案一:dynamic 设置为 falsedynamic 设置为 false 后,新来的非 mapping 预设字段数据可以写入,但是:不能被检索,仅支持 Get 获取文档的方式通过 _source 查看详情内容。举例如下:2.2 解决字段膨胀方案二:dynamic 设置为 strictdynamic 一旦设置为:strict,会“阻止一切来犯之敌”,一切索引创建阶段指定的 Mapping 字段之外的字段名称都将会报错。设置为 strict 后,再动态插入数据,会报错如下:{ "error" : { "root_cause" : [ { "type" : "strict_dynamic_mapping_exception", "reason" : "mapping set to strict, dynamic introduction of [cont] within [_doc] is not allowed" } ], "type" : "strict_dynamic_mapping_exception", "reason" : "mapping set to strict, dynamic introduction of [cont] within [_doc] is not allowed" }, "status" : 400}3、Flattened 类型产生的背景如前分析,将 dynamic 设置为 false 或者 strict 不是普适的解决方案 ,如日志场景需求如下:一方面:期望能动态添加字段。strict 过于严谨会导致新字段数据拒绝写入,dynamic 过于松散会字段膨胀。另一方面:不期望索引字段膨胀。这就导致同时满足上述两个方面的 Flattend 字段的诞生。Flattened 中文释义:“压扁、弄平”,实际就是字段扁平化的意思。当面临处理包含大量不可预测字段的文档时,使用 Flattend 类型可以通过将整个 JSON 对象及其嵌套 Nested 字段索引为单个关键字 keyword 类型字段来帮助减少字段总数。Flattened 类型的最早发布在:7.3 版本。4、Flattened 类型解决的根本问题特定日志场景、电商场景,Elasticsearch Mapping 字段数有时是无法预知的。如果随着新写入数据激增,字段也激增,可能带来的后果是什么呢?Elasticsearch 必须为每个新字段更新集群状态,并且必须将此集群状态传递给所有节点。由于跨节点的集群状态传输是单线程操作,因此需要更新的字段映射越多,完成更新所需的时间就越长。这种延迟通常大大降低集群性能,有时会导致整个集群宕机。这被称为“ Mapping 爆炸”(mapping explosion)。这也是 Elasticsearch 从 5.x 及更高版本将索引中的字段数限制为 1000 的原因之一。如果实战业务场景字段数超过 1000,我们必须手动更改默认索引字段限制或者重新考虑架构重构。修改默认值的方式如下:PUT record_infos{ "settings": { "index.mapping.total_fields.limit": 2000 }}Flattened 扁平化字段就是解决:“Mapping 爆炸”问题的。5、Flattened 类型实战解读5.1 Flattened 类型真容千呼万唤始出来,Flattend 真容如下:这和 Integer、long、nested、join 等都属于字段类型的范畴。Flattened 本质是:将原来一个复杂的 Object 或者 Nested 嵌套多字段类型统一映射为偏平的单字段类型。这里要强调的,不管原来内嵌多少个字段,内嵌多少层,有了 Flattend,一下都打平!!5.2 基于 Flattened 类型插入数据基于上面的 Mapping,写入一条数据如下:PUT demo-flattened/_doc/1{ "message": "[5592:1:0309/123054.737712:ERROR:child_process_sandbox_support_impl_linux.cc(79)] FontService unique font name matching request did not receive a response.", "fileset": { "name": "syslog" }, "process": { "name": "org.gnome.Shell.desktop", "pid": 3383 }, "@timestamp": "2020-03-09T18:00:54.000+05:30", "host": { "hostname": "bionic", "name": "bionic" }}这时候再查看 Mapping, 如下:由于 host 字段设置为:Flattened,其下的:hostname、name 字段都不再映射为特定嵌套子字段。5.3 更新 Flattened 字段,添加数据POST demo-flattened/_update/1{ "doc": { "host": { "osVersion": "Bionic Beaver", "osArchitecture": "x86_64" } }}再次查看 Mapping,依然“岿然不动”。继续 Flattened 拉平,没有字段扩增,也就不会再有 “Mapping 爆炸”出现。5.4 Flattened 类型检索以下两种检索都会召回数据:GET demo-flattened/_search{ "query": { "term": { "host": "Bionic Beaver" } }}GET demo-flattened/_search{ "query": { "term": { "host.osVersion": "Bionic Beaver" } }}而,如下的检索,则返回结果为空。GET demo-flattened/_search{ "query": { "match": { "host.osVersion": "bionic beaver" } }}GET demo-flattened/_search{ "query": { "match": { "host.osVersion": "Beaver" } }}为什么呢?由于使用 Flattened 扁平化类型,Elasticsearch 未对该字段进行分析,因此它只会返回匹配字母大小写且完全一致的结果。如上检索结果和 keyword 类型检索结果一致。这也初步暴露出:Flattened 类型的部分缺陷。5.5 Flattend 类型的不足每当面临 Flattened 扁平化对象的决定时,在选型 Elasticsearch 扁平化数据类型时,我们需要考虑以下几个关键限制:Flattened 类型支持的查询类型目前仅限于以下几种:termtermsterms_setprefixrangematch and multi_matchquery_string and simple_query_stringexistsFlattened 不支持的查询类型如下:无法执行涉及数字计算的查询,例如:range query。无法支持高亮查询。尽管支持诸如 term 聚合之类的聚合,但不支持处理诸如“histograms”或“date_histograms”之类的数值数据的聚合。6、小结Flattened 类型的出现,解决了字段膨胀引起的 Mapping 爆炸问题,如果您的生产环境高于7.3版本,有文章开头类似问题,可以小心求证、大胆尝试这种新类型。您生产环境使用 Flattened 类型了吗?您有没有遇到过字段膨胀或“Mapping 爆炸”问题,是如何解决的?欢迎留言说一下您的实战思考!ps:文章标题灵感起源于球友微信交流,对球友表示感谢!参考https://coralogix.com/blog/flattened-datatype-mappings-elasticsearch-tutorial/https://www.elastic.co/guide/en/elasticsearch/reference/master/flattened.html#flattened
2、Nested 产生的背景博文“干货 | Elasticsearch Nested类型深入详解”有讲过,这里再强调一遍。Object 对象类型对数据拉平存储,不能实现精准检索匹配,会召回不满足查询结果的数据,才导致 Nested 的出现。举例如下:PUT laptops-demo/_doc/1{ "id": 1, "name": "联想ThinkPad P1(01CD)", "price": 29999, "brand": "Lenovo", "attributes": [ { "attribute_name": "cpu", "attribute_value": "Intel Core i9-10885H" }, { "attribute_name": "memory", "attribute_value": "32GB" }, { "attribute_name": "storage", "attribute_value": "2TB" } ]}GET laptops-demo/_searchGET laptops-demo/_search{ "query": { "bool": { "must": [ { "match": { "attributes.attribute_name": "cpu" }}, { "match": { "attributes.attribute_value": "32GB" }} ] } }}如上逻辑混乱的检索条件(CPU,8GB 内存混搭的条件),也能召回数据,这不是实际业务需要满足的场景。本质原因:默认不指定 Mapping,如上的 attributes 里的键值对元素映射为:Object 类型。而 Object 存储类似是单个文档拉平存储的,如下所示:{ "id": 1,"name": "联想ThinkPad P1(01CD)","price": 29999,"brand": "Lenovo","attributes.attribute_name": ["cpu", "memory", "storage"], "attributes.attribute_value": [“Intel Core i9-10885H”, “32GB”, “2TB”] }有了 Nested 类型,则可以很好的规避上述检索不精确的问题。3、Nested 如何存储数据的?先将一下上小节例子,用 Nested 类型实现如下:注意:若选型 Nested,必须 Mapping 层面明确设定 Nested。PUT laptops-demo{ "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text", "analyzer": "ik_max_word" }, "price": { "type": "long" }, "brand": { "type": "keyword" }, "attributes": { "type": "nested", "properties": { "attribute_name": { "type": "keyword" }, "attribute_value": { "type": "keyword" } } } } }}还是插入之前数据。PUT laptops-demo/_doc/1{ "id": 1, "name": "联想ThinkPad P1(01CD)", "price": 29999, "brand": "Lenovo", "attributes": [ { "attribute_name": "cpu", "attribute_value": "Intel Core i9-10885H" }, { "attribute_name": "memory", "attribute_value": "32GB" }, { "attribute_name": "storage", "attribute_value": "2TB" } ]}这里想说明的是:对于如下的拼凑且逻辑混乱的检索条件,不再返回数据。GET laptops-demo/_search{ "query": { "nested": { "path": "attributes", "query": { "bool": { "must": [ { "match": { "attributes.attribute_name": "cpu" } }, { "match": { "attributes.attribute_value": "8GB" } } ] } } } }}只有将上面的 “8GB” 换成与 cpu 对应的属性值:“Intel Core i9-10885H”,才会有数据召回。这体现了 Nested 的好处,实现了对象元素的“精准打击(检索)”。嵌套对象将数组中的每个对象作为单独的隐藏文档(hidden separate document)进行索引。拿上面的“联想电脑”的例子,用户看到写入了一个文档(对应文档id为1,包含三组attributes属性对象数据),实际后台是:4 个 Lucene 文档。4个 Lucene 文档组成:1个父 Lucene 文档 (或者叫做:根文档)+ 3 个 Lucene 隐藏文档(nested 对象文档)。具体文档存储大致拆解如下所示:父文档或根文档{ "id": 1, "name": "联想ThinkPad P1(01CD)", "price": 29999, "brand": "Lenovo"}第二个文档{ "attributes.attribute_name": "cpu", "attributes.attribute_value": "Intel Core i9-10885H"}第三个文档{ "attributes.attribute_name": "memory", "attributes.attribute_value": "32GB"}第四个文档{ "attributes.attribute_name": "storage", "attributes.attribute_value": "2TB"}这意味着:如果 attributes 键值对增多,假设上述示例中电脑的相关属性有100个,那么后台就会有:101 个文档。这看似风平浪静的存储,实际造成的风险还是很大的。同样的一份文档写入,若包含 Nested 类型,则意味着有 N 个Nesed 子对象,就会多出 N 个 隐形文档,写入必然会有压力。同样的,检索原来一个文档搞定,现在要在根文档及周边的 N 个隐形文档附件召回数据,势必检索性能也会受到影响。大家似乎能隐约感觉到了:写入慢、更新慢、检索慢的问题所在了!4、Nested 实战问题及解答4.1 Nested 新增或更新子文档操作,为什么需要更新整个文档?嵌套 Nested 文档在物理上位于根文档旁边的 Lucene 段中。这是为什么当只想更改单个嵌套文档时必须重新索引根文档和所有嵌套 Nested 文档的原因。4.2 Nested 文档和父子文档(join 类型)本质区别?6.X 之前的版本:父子文档是独立不同 type 实现,6.X 之后版本父、子文档要必须位于同一个分片上。Nested 的存储不同之处如前所述:嵌套文档和父文档(根文档)必须在段中依次相邻。这样对比下来,仅就更新操作:Join 父子文档是独立的,可以独立更新;而 Nested 嵌套文档需要全文档进行重建(类似:reindex 操作)。4.3 为什么 Nested 写入和更新操作会很慢?当需要:新增数据、修改数据的时候,看似更新子文档,实际整个document 都需要重建(类似:reindex 操作),这是更新慢的根源。因为频繁的更新会产生很多的嵌套文档,则创建的 Lucene 文档量会很大。5、Nesed 选型注意事项第一:如果需要索引对象数组而不是单个对象,请先考虑使用嵌套数据类型Nested。第二:Object、Nested 选型注意:如果不需要对 Nested 子文档精确搜索的就选型 object,需要的选型 Nested。第三 :对于子文档相对更新不频繁的场景,推荐使用:Nested 类型。注意:Nested 需要 Nested 专属 query,Nested 专属聚合实现检索和聚合操作。第四:对于子文档频繁更新的场景,推荐使用 Join 父子文档,其特点是:父、子文档分离,但在同一分片上,写入性能能相对快,但检索性能较 Nested 更慢。Join 类型检索的结果不能同时返回父子文档,一次 join 查询只能返回一种类型的文档(除非:inner_hits)。第五:早期版本 Github issue 有提及:inner_hits 检索的性能问题,新版本已有改善,但使用也要注意,使用前做一下:不加 inner_hits 和加 inner_hits 的响应时间比较。https://github.com/elastic/elasticsearch/issues/14229第六:为避免 Nested 造成的索引膨胀,需要灵活结合如下两个参数,合理设置索引或者模板。更新 Mapping 操作如下:PUT products/_settings{ "index.mapping.nested_fields.limit": 10, "index.mapping.nested_objects.limit": 500}index.mapping.nested_fields.limit 含义:一个索引中不同的 Nested 字段类型个数。如果超出,会报错如下: "reason" : "Limit of nested fields [1] has been exceeded"index.mapping.nested_objects.limit 含义:一个 Nested 字段中对象数目,多了可能导致内存泄露。这个字段,在我验证的时候,没有达到预期,后面我再通过留言补充完善。第七:选型 Nested 类型或者 Join 类型,就势必提前考虑性能问题,建议多做一些压力测试、性能测试。6、小结多表关联是很多业务场景的刚需。但,选型一定要慎重,了解一下各种多表关联类型:Nested、Join 的底层存储和逻辑,能有助于更好的选型。Nested 本质:将 Nested 对象作为隐藏的多个子文档存储,一次更新或写入子文档操作会需要全部文档的更新操作。上周末深圳的 meetup 提及:整合 mongoDB + ES 的解决方案(见文末社区参考链接),mongo DB 实现了多表融合,也就是大宽表一步到位,本质是:空间换时间的方案。该方案我认为脑洞非常大,如果业务层面可行,大家可以借鉴。您使用 Nested 遇到哪些问题?欢迎留言交流。参考https://medium.com/codex/learn-advanced-crud-and-search-queries-for-nested-objects-in-elasticsearch-from-practical-examples-7aebc1408d6fhttps://discuss.elastic.co/t/whats-nested-documents-layout-inside-the-lucene/59944https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.htmlhttps://discuss.elastic.co/t/nested-types-are-slower/90546https://elasticsearch.cn/slides/284
2、时区问题拆解我们通过如下几个问题展开拆解。2.1 Elasticserch 默认时区是?能改吗?官方文档强调:在 Elasticsearch 内部,日期被转换为 UTC时区并存储为一个表示自1970-01-01 00:00:00 以来经过的毫秒数的值。Internally, dates are converted to UTC (if the time-zone is specified) and stored as a long number representing milliseconds-since-the-epoch.https://www.elastic.co/guide/en/elasticsearch/reference/current/date.htmlElasticsearch date 类型默认时区:UTC。正如官方工程师强调(如下截图所示):Elasticsearch 默认时区不可以修改。https://discuss.elastic.co/t/index-creates-in-different-timezone-other-than-utc/148941但,我们可以“曲线救国”,通过:ingest pipeline 预处理方式写入的时候修改时区;logstash filter 环节做时区转换;查询时指定时区;聚合时指定时区。2.2 Kibana 默认时区是?能改吗?kibana 默认时区是浏览器时区。可以修改,修改方式如下:Stack Management -> Advanced Settings ->Timezone for data formatting.2.3 Logstash 默认时区是?能改吗?默认:UTC。可以通过中间:filter 环节进行日期数据处理,包括:转时区操作。小结一下:logstash 默认 UTC 时区。Elasticsearch 默认 UTC 时区。Kibana 默认浏览器时区,基本我们用就是:东八区。如果基于Mysql 同步数据,Mysql 数据是:东八区。我们看一下东8区百度百科定义:东八区(UTC/GMT+08:00)是比世界协调时间(UTC)/格林尼治时间(GMT)快8小时的时区,理论上的位置是位于东经112.5度至127.5度之间,是东盟标准的其中一个候选时区。当格林尼治标准时间为0:00时,东八区的标准时间为08:00。通过上面的定义,能加深对 logstash 同步数据后,数据滞后8小时的理解。3、时区问题解决方案基于上面的分析,如何解决时区问题呢?由于 kibana 支持手动修改时区,不在下文讨论 的范围之内。实战项目中,自己根据业务需求修改即可。那么问题就转嫁为:写入的时候转换成给定时区(如:东8区)就可以了。3.1 方案一:ingest 预处理为东8区时区步骤 1:定义预处理管道:chage_utc_to_asiash(名称自己定义即可)。在该管道中实现了时区转换。步骤 2:创建索引同时指定缺省管道:chage_utc_to_asiash。步骤 3:写入数据(单条或 bulk 批量均可)PUT _ingest/pipeline/chage_utc_to_asiash{ "processors": [ { "date" : { "field" : "my_time", "target_field": "my_time", "formats" : ["yyyy-MM-dd HH:mm:ss"], "timezone" : "Asia/Shanghai" } } ]}PUT my-index-000001{ "settings": { "default_pipeline": "chage_utc_to_asiash" }, "mappings": { "properties": { "my_time": { "type": "date" } } }}PUT my-index-000001/_doc/1{ "my_time": "2021-08-09 08:07:16"} 当写入数据后,执行检索时,kibana dev tool 返回结果如下: "hits" : [ { "_index" : "my-index-000001", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "my_time" : "2021-08-09T08:07:16.000+08:00" } } ]最明显的特征是:多了+08:00 时区(东8区)标志。然后,我们用:kibana discover可视化展示一下:上图中,kibana 采用默认浏览器时区。如果不做上面的 ingest 预处理实现,会怎么样呢?大家如果实现过,肯定会感触很深。需要我们在kibana中切换时间范围,才能找到之前写入的数据。ingest 预处理时区的好处:方便、灵活的实现了写入数据的时区转换。3.2 方案二:logstash 中间 filter 环节处理拿真实同步案例讲解一下时区处理:数据源端:Mysql;数据目的端:Elasticsearch;同步方式:logstash,本质借助:logstash_input_jdbc 插件同步;时区处理:logstash filter 环节 ruby 脚本处理。 如下只给出了中间 filter 环节的脚本:filter { ruby { code => "event.set('timestamp', event.get('publish_time').time.localtime + 8*60*60)" } ruby { code => "event.set('publish_time',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] }}三行脚本含义,解释如下:第一行:将 publish_time 时间加 8 小时处理,赋值给 timestamp。publish_time 到了 logstash 已转成了 UTC 时区了。timestamp 类似似 C 语言中的交换两个数函数中的 temp 临时变量。第二行:将 timestamp 时间赋值给 publish_time。第三行:删除中转字段:timestamp。源数据Mysql 效果:如上两个截图,对比一下区别:publish_time 做了时区处理,两者时间已一致,都是东 8 区。update_time 未做时间处理,写入Elasticsearch 后由东8区时间 10:57:31 转为UTC时区时间 02:57:31,少了8小时。4、检索和聚合的时候指定时区假定我们写入ES前未做时区处理(实战环节常有的场景),但是检索或者聚合的时候想做时区处理可以吗?可以的,具体实现方式如下:POST testindex/_search?pretty{ "query": { "range": { "date": { "gte": "2020-01-01 00:00:00", "lte": "2020-01-03 23:59:59", "format": "yyyy-MM-dd HH:mm:ss", "time_zone": "+08:00" } }}, "size": 0, "aggs": { "per_day": { "date_histogram": { "calendar_interval": "day", "field": "date", "time_zone": "+08:00" } } }}如上示例中,整合了检索和聚合,有两个要点:要点1:range query 中指定时区检索。要点2:data_histogram 聚合中指定时区聚合。5、小结数据写入时间不一致、数据滞后8小时等时区问题的本质是:各个处理端时区不一致,写入源的时区、Kibana默认是本地时区(如中国为:东8区时区),而 logstash、Elasticsearch 是UTC时区。本文给出了两种写入前预处理的解决方案,方案一:基于管道预处理;方案二:基于logstash filter 中间环节过滤。两种方案各有利弊,预处理管道相对更轻量级,实战选型建议根据业务需求。本文最后指出在检索和聚合环节使用时区处理方式。大家在实战中有没有遇到时区问题,是怎么解决的呢?欢迎大家留言交流。参考https://t.zsxq.com/2nYnq76推荐1、Elasticsearch 7.X 进阶实战私训课2、如何系统的学习 Elasticsearch ?3、全网首发!《 Elasticsearch 最少必要知识教程 V1.0 》低调发布4、从实战中来,到实战中去——Elasticsearch 技能更快提升方法论5、刻意练习 Elasticsearch 10000 个小时,鬼知道经历了什么?!6、干货 | Logstash自定义正则表达式ETL实战7、干货 | Logstash Grok 数据结构化ETL实战
2、Elasticsearch Mapping 层面默认值认知前提:严格讲 Elasticsearch 是不支持 Mapping 层面设置数据类型的时候,设置字段的默认值的。有人会说,null value 设置算不算?不算。大家看一下:PUT my-index-000001{ "mappings": { "properties": { "status_code": { "type": "keyword", "null_value": "NULL" } } }}null_value 的本质是将“NULL” 替换 null 值,以使得空值可被索引或者检索。我们期望设置 Mapping 的时候,可以对各种数据类型添加一个任意指定的缺省值。但是 Elasticsearch Mapping 层面不支持,咋办?只能去寻找其他的方案。3、曲线救国实现 Elasticsearch 设置默认值直接给出答案,共三种设置默认值的。3.1 方案 一:pipeline 设置默认值# 创建 append 管道PUT _ingest/pipeline/add_default_pipeline{ "processors": [ { "set": { "field": "sale_count", "value": 1 } } ]}# 创建索引PUT customer{ "mappings":{ "properties":{ "sale_count":{ "type":"integer" }, "major":{ "type":"keyword", "null_value": "NULL" } } }, "settings": { "index":{ "default_pipeline":"add_default_pipeline" } }}插入数据,验证一把:POST customer/_doc/1{ "major":null}返回结果: "max_score" : 1.0, "hits" : [ { "_index" : "customer", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "major" : null, "sale_count" : 1 } } ] }以上的方式,实现了sale_count 的默认值为1 的设置。是借助索引设计层面在 setting 中关联 default_pipeline 实现的。实现方式相对简单,能保证用户在设置索引的前提下,用户只关注写入数据,其他后台预处理管道帮助实现细节。引申一下,针对开篇提到的第二个问题:create_time 借助 pipeline 管道预处理 set processor 实现即可。PUT _ingest/pipeline/create_time_pipeline{ "description": "Adds create_time timestamp to documents", "processors": [ { "set": { "field": "_source.create_time", "value": "{{_ingest.timestamp}}" } } ]}DELETE my_index_0003PUT my_index_0003{ "settings": { "index.default_pipeline": "create_time_pipeline" }}POST my_index_0003/_doc/1{}GET my_index_0003/_searchupdate_time 自己维护更新,业务更新的时刻通过代码或者脚本加上时间戳就可以。3.2 方案 二:update_by_query 通过更新添加默认值POST customer/_doc/2{ "major":null}# 批量更新脚本POST customer/_update_by_query{ "script": { "lang": "painless", "source": "if (ctx._source.major == null) {ctx._source.major = 'student'}" }}POST customer/_search结果是:所有 major 为 null 的,都实现了更新,设置成了:“student"。该方式属于先写入数据,然后实现数据层面的更新,算作设置默认值甚至都有点勉强。3.3 方案 三:借助 pipeline script 更新PUT _ingest/pipeline/update_pipeline{ "processors": [ { "script": { "lang": "painless", "source": """ if (ctx['major'] == null) {ctx['major'] = 'student'} """ } } ]}POST customer/_doc/4{ "major":null}POST customer/_update_by_query?pipeline=update_pipeline{ "query": { "match_all": {} }}结果是:同方案二,也实现了更新。该方案是第二种方案的内卷版本,本质实现基本一致。强调细节不同点,ctx 取值的时候,细节语法不一样:脚本script 操作,访问方式:ctx._source.major。pipeline 预处理脚本操作:访问方式:ctx['major'] 。4、小结本文讲解了 Elasticsearch 实现类关系型数据库默认值的三种方案,只有第一种属于前置设置默认值。后两种都是先写入后设置默认值的脚本更新实现方案。实战方案选型,推荐方案一
在基于日志、指标、实时时间序列的大型系统中,集群的索引也具备类似上图中相通的属性,一个索引自创建之后,不可能无限期的存在下去, 从索引产生到索引“消亡”,也会经历:“生、老、病、死”的阶段。我们把索引的“生、老、病、死”的全过程类比称为索引的生命周期。2、什么是索引生命周期管理?由于自然规律,人会“不可逆转”的由小长到大,由大长到老,且理论上年龄一般不会超过 150 岁(吉尼斯世界纪录:122岁零164天)。索引不是人,理论上:一旦创建了索引,它可以一直存活下去(假定硬件条件允许,寿命是无限的)。索引创建后,它自身是相对静态的,没有“自然规律”牵引它变化,若放任其成长,它只会变成一个数据量极大的臃肿的“大胖子”。这里可能就会引申出来问题:若是时序数据的索引,随着时间的推移,业务索引的数据量会越来越大。但,基于如下的因素:集群的单个分片最大文档数上限:2 的 32 次幂减去 1(20亿左右)。索引最佳实践官方建议:分片大小控制在30GB-50GB,若索引数据量无限增大,肯定会超过这个值。索引大到一定程度,当索引出现健康问题,会导致真个集群核心业务不可用。大索引恢复的时间要远比小索引恢复慢的多得多。索引大之后,检索会很慢,写入和更新也会受到不同程度的影响。某些业务场景,用户更关心最近3天、最近7天的业务数据,大索引会将全部历史数据汇集在一起,不利于这种场景的查询。非常有必要对索引进行管理起来,不再放任其“野蛮长成体弱多病、潜在风险极大的大胖子”,而是限制其分阶段、有目标的、有规律的生长。这种分阶段、有目标的操作和实现,我们称为索引生命周期管理。3、索引生命周期管理的历史演变索引生命周期管理 (ILM) 是在 Elasticsearch 6.6(公测版)首次引入,在 6.7 版本正式推出的一项功能。ILM 是 Elasticsearch 的一部分,主要用来帮助用户管理索引。没有 ILM 之前索引生命周期管理基于:rollover + curator 实现。ILM 是早些年呼声非常高的功能之一,我印象中 2017 年南京的 meetup 中,就有公司说实现了类似的功能。Kibana 7.12.0 索引生命周期管理配置界面如下图所示:4、索引生命周期管理的前提本文演示试用版本:Elasticesarch:7.12.0,Kibana:7.12.0。集群规模:3节点,属性(node_roles)设置分别如下:节点 node-022:主节点+数据节点+热节点(Hot)。节点 node-023:主节点+数据节点+温节点(Warm)。节点 node-024:主节点+数据节点+冷节点(Cold)。4.1 冷热集群架构冷热架构也叫热暖架构,是“Hot-Warm” Architecture的中文翻译。冷热架构本质是给节点设置不同的属性,让每个节点具备了不同的属性。为演示 ILM,需要首先配置冷热架构,三个节点在 elasticsearch.yml 分别设置的属性如下:- node.attr.box_type: hot- node.attr.box_type: warm- node.attr.box_type: cold拿舆情数据举例,通俗解读如下:热节点(Hot):存放用户最关心的热数据。比如:最近3天的数据——近期大火的“曹县牛皮666,我的宝贝”。温节点(Warm):存放前一段时间沉淀的热数据,现在不再热了。比如:3-7天的热点事件——“特斯拉车顶事件”。冷节点(Cold):存放用户不太关心或者关心优先级低的冷数据,很久之前的热点事件。比如:7天前或者很久前的热点事件——去年火热的“后浪视频“、”马老师不讲武德”等。如果磁盘数量不足,冷数据是待删除优先级最高的。如果硬件资源不足,热节点优先配置为 SSD 固态盘。检索优先级最高的是热节点的数据,基于热节点检索数据自然比基于全量数据响应时间要快。更多冷热架构推荐:干货 | Elasticsearch 冷热集群架构实战。4.2 rollover 滚动索引实际Elasticsearch 5.X 之后的版本已经推出:Rollover API。Rollover API解决的是以日期作为索引名称的索引大小不均衡的问题。Rollover API对于日志类的数据非常有用,一般我们按天来对索引进行分割(数据量更大还能进一步拆分),没有Rollover之前,需要在程序里设置一个自动生成索引的模板。推荐阅读:干货 | Elasticsearch索引生命周期管理探索rollover 滚动索引实践一把:# 1、创建基于日期的索引PUT %3Cmy-index-%7Bnow%2Fd%7D-000001%3E{ "aliases": { "my-alias": { "is_write_index": true } }}# 2、批量导入数据PUT my-alias/_bulk{"index":{"_id":1}}{"title":"testing 01"}{"index":{"_id":2}}{"title":"testing 02"}{"index":{"_id":3}}{"title":"testing 03"}{"index":{"_id":4}}{"title":"testing 04"}{"index":{"_id":5}}{"title":"testing 05"}# 3、rollover 滚动索引POST my-alias/_rollover{ "conditions": { "max_age": "7d", "max_docs": 5, "max_primary_shard_size": "50gb" }}GET my-alias/_count# 4、在满足滚动条件的前提下滚动索引PUT my-alias/_bulk{"index":{"_id":6}}{"title":"testing 06"}# 5、检索数据,验证滚动是否生效GET my-alias/_search如上的验证结论是:{ "_index" : "my-index-2021.05.30-000001", "_type" : "_doc", "_id" : "5", "_score" : 1.0, "_source" : { "title" : "testing 05" } }, { "_index" : "my-index-2021.05.30-000002", "_type" : "_doc", "_id" : "6", "_score" : 1.0, "_source" : { "title" : "testing 06" } }_id 为 6 的数据索引名称变成了:my-index-2021.05.30-000002,实现了 后缀 id 自增。这里要强调下,索引滚动变化的三个核心条件:"max_age": "7d", 最长期限 7d,超过7天,索引会实现滚动。"max_docs": 5, 最大文档数 5,超过 5个文档,索引会实现滚动(测试需要,设置的很小)。"max_primary_shard_size": "50gb",主分片最大存储容量 50GB,超过50GB,索引就会滚动。注意,三个条件是或的关系,满足其中一个,索引就会滚动。4.3 shrink 压缩索引压缩索引的本质:在索引只读等三个条件的前提下,减少索引的主分片数。# 设置待压缩的索引,分片设置为5个。PUT kibana_sample_data_logs_ext{ "settings": { "number_of_shards":5 }}# 准备索引数据POST _reindex{ "source":{ "index":"kibana_sample_data_logs" }, "dest":{ "index":"kibana_sample_data_logs_ext" }}# shrink 压缩之前的三个必要条件PUT kibana_sample_data_logs_ext/_settings{ "settings": { "index.number_of_replicas": 0, "index.routing.allocation.require._name": "node-024", "index.blocks.write": true }}# 实施压缩POST kibana_sample_data_logs_ext/_shrink/kibana_sample_data_logs_shrink{ "settings": { "index.number_of_replicas": 0, "index.number_of_shards": 1, "index.codec": "best_compression" }, "aliases": { "kibana_sample_data_logs_alias": {} }}有图有真相:强调一下三个压缩前的条件,缺一不可:"index.number_of_replicas": 0副本设置为 0。"index.routing.allocation.require._name": "node-024" 分片数据要求都集中到一个独立的节点。"index.blocks.write": true索引数据只读。4.4 Frozen 冷冻索引为高效检索,核心业务索引都会保持在内存中,意味着内存使用率会变得很高。对于一些非业务必须、非密集访问的某些索引,可以考虑释放内存,仅磁盘存储,必要的时候再还原检索。这时候,就会用到 Frozen 冷冻索引。除了在内存中维护其元数据,冻结索引在集群上几乎没有开销,并且冷冻索引是只读的。具体使用如下:# 冷冻索引POST kibana_sample_data_logs_shrink/_freeze# 冷冻后,不能写入POST kibana_sample_data_logs_shrink/_doc/1{ "test":"12111"}# 冷冻后,能检索,但不返回具体数据,只返回0。POST kibana_sample_data_logs_shrink/_search# 解除冷冻POST kibana_sample_data_logs_shrink/_unfreeze# 解除冷冻后,可以检索和写入了POST kibana_sample_data_logs_shrink/_search综合上述拆解分析可知:有了:冷热集群架构,集群的不同节点有了明确的角色之分,冷热数据得以物理隔离,SSD 固态盘使用效率会更高。有了:rollover 滚动索引,索引可以基于文档个数、时间、占用磁盘容量滚动升级,实现了索引的动态变化。有了:Shrink 压缩索引、Frozen 冷冻索引,索引可以物理层面压缩、冷冻,分别释放了磁盘空间和内存空间,提高了集群的可用性。除此之外,还有:Force merge 段合并、Delete 索引数据删除等操作,索引的“生、老、病、死”的全生命周期的更迭,已然有了助推器。如上指令单个操作,非常麻烦和繁琐,有没有更为快捷的方法呢?有的!第一:命令行可以 DSL 大综合实现。第二:可以借助 Kibana 图形化界面实现。下面两小节会结合实例解读。5、Elasticsearch ILM 实战5.1 核心概念:不同阶段(Phrase)的功能点(Acitons)注意:仅在 Hot 阶段可以设置:Rollover 滚动。5.2 各生命周期 Actions 设定后两节演示要用。5.2.1 Hot 阶段基于:max_age=3天、最大文档数为5、最大size为:50gb rollover 滚动索引。设置优先级为:100(值越大,优先级越高)。5.2.2 Warm 阶段实现段合并,max_num_segments 设置为1.副本设置为 0。数据迁移到:warm 节点。优先级设置为:50。5.2.3 Cold 阶段冷冻索引数据迁移到冷节点5.2.4 Delete 阶段删除索引关于触发滚动的条件:Hot 阶段的触发条件:手动创建第一个满足模板要求的索引。其余阶段触发条件:min_age,索引自创建后的时间。时间类似:业务里面的 热节点保留 3 天,温节点保留 7 天,冷节点保留 30 天的概念。5.3 DSL 实战索引生命周期管理# step1: 前提:演示刷新需要PUT _cluster/settings{ "persistent": { "indices.lifecycle.poll_interval": "1s" }}# step2:测试需要,值调的很小PUT _ilm/policy/my_custom_policy_filter{ "policy": { "phases": { "hot": { "actions": { "rollover": { "max_age": "3d", "max_docs": 5, "max_size": "50gb" }, "set_priority": { "priority": 100 } } }, "warm": { "min_age": "15s", "actions": { "forcemerge": { "max_num_segments": 1 }, "allocate": { "require": { "box_type": "warm" }, "number_of_replicas": 0 }, "set_priority": { "priority": 50 } } }, "cold": { "min_age": "30s", "actions": { "allocate": { "require": { "box_type": "cold" } }, "freeze": {} } }, "delete": { "min_age": "45s", "actions": { "delete": {} } } } }}# step3:创建模板,关联配置的ilm_policyPUT _index_template/timeseries_template{ "index_patterns": ["timeseries-*"], "template": { "settings": { "number_of_shards": 1, "number_of_replicas": 0, "index.lifecycle.name": "my_custom_policy_filter", "index.lifecycle.rollover_alias": "timeseries", "index.routing.allocation.require.box_type": "hot" } }}# step4:创建起始索引(便于滚动)PUT timeseries-000001{ "aliases": { "timeseries": { "is_write_index": true } }}# step5:插入数据PUT timeseries/_bulk{"index":{"_id":1}}{"title":"testing 01"}{"index":{"_id":2}}{"title":"testing 02"}{"index":{"_id":3}}{"title":"testing 03"}{"index":{"_id":4}}{"title":"testing 04"}# step6:临界值(会滚动)PUT timeseries/_bulk{"index":{"_id":5}}{"title":"testing 05"}# 下一个索引数据写入PUT timeseries/_bulk{"index":{"_id":6}}{"title":"testing 06"}核心步骤总结如下:第一步:创建生周期 policy。第二步:创建索引模板,模板中关联 policy 和别名。第三步:创建符合模板的起始索引,并插入数据。第四步: 索引基于配置的 ilm 滚动。实现效果如下GIF动画(请耐心看完)5.4、Kibana 图形化界面实现索引生命周期管理步骤 1:配置 policy。步骤 2:关联模板。前提条件:模板要自己 DSL 创建,以便关联。PUT _index_template/timebase_template { "index_patterns": ["time_base-*"] }创建起始索引,指定别名和写入。PUT time_base-000001 { "aliases": { "timebase_alias": { "is_write_index": true } } }6、小结索引生命周期管理需要加强对三个概念的认知:横向——Phrase 阶段:Hot、Warm、Cold、Delete 等对应索引的生、老、病、死。纵向——Actions 阶段:各个阶段的动作。横向纵向整合的Policy:实际是阶段和动作的综合体。配置完毕Policy,关联好模板 template,整个核心工作就完成了80%。剩下就是各个阶段 Actions 的调整和优化了。实战表明:用 DSL 实现ILM 比图形化界面更可控、更便于问题排查。ILM 你实际生产环境使用了吗?效果如何?欢迎留言讨论。参考https://www.elastic.co/guide/en/elasticsearch/reference/current/example-using-index-lifecycle-policy.htmlhttps://ptran32.github.io/2020-08-08-hot-warm-cold-elasticsearch/https://www.elastic.co/cn/blog/implementing-hot-warm-cold-in-elasticsearch-with-index-lifecycle-management魏彬老师ET开源课堂
2、 Elasticsearch 支持的三种分页查询方式From + Size 查询Search After 查询Scroll 查询下面我就三种方式的联系与区别、优缺点、适用场景等展开进行解读。2.1 From + size 分页查询2.1.1 From + size 分页查询定义与实战案例如下基础查询:GET kibana_sample_data_flights/_search默认返回前10个匹配的匹配项。其中:from:未指定,默认值是 0,注意不是1,代表当前页返回数据的起始值。size:未指定,默认值是 10,代表当前页返回数据的条数。如下指定条件查询和排序:GET kibana_sample_data_flights/_search{ "from": 0, "size":20, "query": { "match": { "DestWeather": "Sunny" } }, "sort": [ { "FlightTimeHour": { "order": "desc" } } ]}共返回 20 条数据。其中:from + size 两个参数定义了结果页面显示数据的内容。2.1.2 From + size 查询优缺点及适用场景From + size 查询优点支持随机翻页。From + size 查询缺点受制于 max_result_window 设置,不能无限制翻页。存在深度翻页问题,越往后翻页越慢。From + size 查询适用场景第一:非常适合小型数据集或者大数据集返回 Top N(N <= 10000)结果集的业务场景。第二:类似主流 PC 搜索引擎(谷歌、bing、百度、360、sogou等)支持随机跳转分页的业务场景。2.1.3 深度翻页不推荐使用 From + sizeElasticsearch 会限制最大分页数,避免大数据量的召回导致性能低下。Elasticsearch 的 max_result_window 默认值是:10000。也就意味着:如果每页有 10 条数据,会最大翻页至 1000 页。实际主流搜索引擎都翻不了那么多页,举例:百度搜索“上海”,翻到第 76 页,就无法再往下翻页了,提示信息如下截图所示:GET kibana_sample_data_flights/_search{ "from": 0, "size":10001}GET kibana_sample_data_flights/_search{ "from": 10001, "size":10}报错如下:{ "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "Result window is too large, from + size must be less than or equal to: [10000] but was [10001]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting." } ],什么原因?超过了最大窗口的限制,index.max_result_window 默认值为10000。报错信息还同时给出了两个解决方案:方案一:大数据集召回数据使用:scroll api。后面会详细讲解。方案二:调大 index.max_result_window 默认值。PUT kibana_sample_data_flights/_settings{ "index.max_result_window":50000}官方建议:避免过度使用 from 和 size 来分页或一次请求太多结果。不推荐使用 from + size 做深度分页查询的核心原因:搜索请求通常跨越多个分片,每个分片必须将其请求的命中内容以及任何先前页面的命中内容加载到内存中。对于翻页较深的页面或大量结果,这些操作会显著增加内存和 CPU 使用率,从而导致性能下降或节点故障。什么意思呢?GET kibana_sample_data_flights/_search{ "from": 10001, "size": 10}共 10 条数据加载到内存吗?不是的!共:10011 条数据加载到内存,然后经过后台处理后返回了最后 10 条我们想要的数据。那也就意味着,越往后翻页(也就是深度翻页)需要加载的数据量越大,势必会越耗费 CPU + 内存资源,响应也会越慢!2.2 search_after 查询2.2.1 search_after 查询定义与实战案例search_after 查询本质:使用前一页中的一组排序值来检索匹配的下一页。前置条件:使用 search_after 要求后续的多个请求返回与第一次查询相同的排序结果序列。也就是说,即便在后续翻页的过程中,可能会有新数据写入等操作,但这些操作不会对原有结果集构成影响。如何实现呢?可以创建一个时间点 Point In Time(PIT)保障搜索过程中保留特定事件点的索引状态。Point In Time(PIT)是 Elasticsearch 7.10 版本之后才有的新特性。PIT的本质:存储索引数据状态的轻量级视图。如下示例能很好的解读 PIT 视图的内涵。# 创建 PITPOST kibana_sample_data_logs/_pit?keep_alive=1m# 获取数据量 14074POST kibana_sample_data_logs/_count# 新增一条数据POST kibana_sample_data_logs/_doc/14075{ "test":"just testing"}# 数据总量为 14075POST kibana_sample_data_logs/_count# 查询PIT,数据依然是14074,说明走的是之前时间点的视图的统计。POST /_search{ "track_total_hits": true, "query": { "match_all": {} }, "pit": { "id": "48myAwEXa2liYW5hX3NhbXBsZV9kYXRhX2xvZ3MWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAWdG1TOWFMTF9UdTZHdVZDYmhoWUljZwAAAAAAAAEN3RZGOFJCMGVrZVNndTk3U1I0SG81V3R3AAEWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAA" }}有了 PIT,search_after 的后续查询都是基于 PIT 视图进行,能有效保障数据的一致性。search_after 分页查询可以简单概括为如下几个步骤。步骤 1:创建 PIT 视图,这是前置条件不能省。# Step 1: 创建 PITPOST kibana_sample_data_logs/_pit?keep_alive=5m返回结果如下:{ "id" : "48myAwEXa2liYW5hX3NhbXBsZV9kYXRhX2xvZ3MWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAWdG1TOWFMTF9UdTZHdVZDYmhoWUljZwAAAAAAAAEg5RZGOFJCMGVrZVNndTk3U1I0SG81V3R3AAEWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAA"}keep_alive=5m,类似scroll的参数,代表视图保留时间是 5 分钟,超过 5 分钟执行会报错如下: "type" : "search_context_missing_exception", "reason" : "No search context found for id [91600]"步骤 2:创建基础查询语句,这里要设置翻页的条件。# Step 2: 创建基础查询GET /_search{ "size":10, "query": { "match" : { "host" : "elastic" } }, "pit": { "id": "48myAwEXa2liYW5hX3NhbXBsZV9kYXRhX2xvZ3MWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAWdG1TOWFMTF9UdTZHdVZDYmhoWUljZwAAAAAAAAEg5RZGOFJCMGVrZVNndTk3U1I0SG81V3R3AAEWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAA", "keep_alive": "1m" }, "sort": [ {"response.keyword": "asc"} ]}设置了PIT,检索时候就不需要再指定索引。id 是基于步骤1 返回的 id 值。排序 sort 指的是:按照哪个关键字排序。在每个返回文档的最后,会有两个结果值,如下所示: "sort" : [ "200", 4 ]其中,“200”就是我们指定的排序方式:基于 {"response.keyword": "asc"} 升序排列。而 4 代表什么含义呢?4 代表——隐含的排序值,是基于_shard_doc 的升序排序方式。官方文档把这种隐含的字段叫做:tiebreaker (决胜字段),tiebreaker 等价于_shard_doc。tiebreaker 本质含义:每个文档的唯一值,确保分页不会丢失或者分页结果数据出现重复(相同页重复或跨页重复)。步骤3:实现后续翻页。# step 3 : 开始翻页GET /_search{ "size": 10, "query": { "match" : { "host" : "elastic" } }, "pit": { "id": "48myAwEXa2liYW5hX3NhbXBsZV9kYXRhX2xvZ3MWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAWdG1TOWFMTF9UdTZHdVZDYmhoWUljZwAAAAAAAAEg5RZGOFJCMGVrZVNndTk3U1I0SG81V3R3AAEWM2hGWXpxLXFSSGlfSmZIaXJWN0dxUQAA", "keep_alive": "1m" }, "sort": [ {"response.keyword": "asc"} ], "search_after": [ "200", 4 ]}后续翻页都需要借助 search_after 指定前一页的最后一个文档的 sort 字段值。如下代码所示: "search_after": [ "200", 4 ]显然,search_after 查询仅支持向后翻页。2.2.2 search_after 查询优缺点及适用场景search_after 优点不严格受制于 max_result_window,可以无限制往后翻页。ps:不严格含义:单次请求值不能超过 max_result_window;但总翻页结果集可以超过。search_after 缺点只支持向后翻页,不支持随机翻页。search_after 适用场景类似:今日头条分页搜索 https://m.toutiao.com/search不支持随机翻页,更适合手机端应用的场景。2.3 Scroll 遍历查询2.3.1 Scroll 遍历查询定义与实战案例相比于 From + size 和 search_after 返回一页数据,Scroll API 可用于从单个搜索请求中检索大量结果(甚至所有结果),其方式与传统数据库中游标(cursor)类似。如果把 From + size 和 search_after 两种请求看做近实时的请求处理方式,那么 scroll 滚动遍历查询显然是非实时的。数据量大的时候,响应时间可能会比较长。scroll 核心执行步骤如下:步骤 1:指定检索语句同时设置 scroll 上下文保留时间。实际上,scroll 已默认包含了 search_after 的PIT 的视图或快照功能。从 Scroll 请求返回的结果反映了发出初始搜索请求时索引的状态,类似在那一个时刻做了快照。随后对文档的更改(写入、更新或删除)只会影响以后的搜索请求。POST kibana_sample_data_logs/_search?scroll=3m{ "size": 100, "query": { "match": { "host": "elastic" } }}步骤 2:向后翻页继续获取数据,直到没有要返回的结果为止。POST _search/scroll { "scroll" : "3m", "scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFkY4UkIwZWtlU2d1OTdTUjRIbzVXdHcAAAAAAAGmkBZ0bVM5YUxMX1R1Nkd1VkNiaGhZSWNn" }scroll_id 值是步骤 1 返回的结果值。2.3.2 Scroll 遍历查询优缺点及适用场景 scroll 查询优点支持全量遍历。ps:单次遍历的 size 值也不能超过 max_result_window 大小。 scroll 查询缺点响应时间非实时。保留上下文需要足够的堆内存空间。scroll 查询适用场景全量或数据量很大时遍历结果数据,而非分页查询。官方文档强调:不再建议使用scroll API进行深度分页。如果要分页检索超过 Top 10,000+ 结果时,推荐使用:PIT + search_after。3、小结From+ size:需要随机跳转不同分页(类似主流搜索引擎)、Top 10000 条数据之内分页显示场景。search_after:仅需要向后翻页的场景及超过Top 10000 数据需要分页场景。Scroll:需要遍历全量数据场景 。max_result_window:调大治标不治本,不建议调过大。PIT:本质是视图。本文说法有不严谨的地方,以官方文档为准。欢迎大家就自己的分页实践进行留言讨论。参考:1. https://coralogix.com/log-analytics-blog/how-to-optimize-your-elasticsearch-queries-using-pagination2. https://www.javatpoint.com/elasticsearch-pagination3. https://www.elastic.co/guide/en/elasticsearch/reference/7.12/paginate-search-results.html
链接问题2:多套系统使用一套集群,错误日志如下{"message": "failed to execute pipeline for a bulk request" , "stacktrace": ["org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.IngestService$4@5b522103 on EsThreadPoolExecutor[name = node-2/write, queue capacity = 1024, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@19bdbd79[Running, pool size = 5, active threads = 5, queued tasks = 1677, ]]",针对问题 2,初步排查日志,是大量日志写入造成队列满了,造成集群直接拒绝写入了。问题 2 初步解决方案:修改默认值、扩大队列,根据业务后续持续观察队列大小,不再出现上述情形。问题 1、2 都会引出:Elasticsearch 线程池和队列知识点。2、线程池概览Elasticsearch 使用线程池(Thread pool )来管理请求并优化集群中每个节点上的资源使用。3、线程池用途主要线程池包括:搜索(search)、获取(get)和写入(write)等。通过运行以下命令可以看到线程池全貌:GET /_cat/GET /_cat/thread_pool/?v&h=id,name,active,rejected,completed,size,type&pretty&s=type其中:name:代表某一种线程池(写入、检索、刷新或其他)。type:代表线程数类型。通过运行上面的命令可以看到每个节点都有许多不同的线程池、线程池的大小和类型,还可以看到哪些节点拒绝了操作。Elasticsearch根据每个节点中检测到的线程数(number of processors,后面会讲到这个参数)自动配置线程池参数。4、线程池类型4.1 Fixed 类型固定数量的线程,具有固定的队列大小。Fixed 类型线程使用示例如下:thread_pool: write: size: 30 queue_size: 10004.2 Scaling 类型可变数量的线程,Elasticsearch会根据工作负载自动调节线程大小(值介于:core 到 max 之间)。Scaling 类型线程使用示例如下:thread_pool: warmer: core: 1 max: 84.3 fixed_autoqueue_size 线程固定数量的线程,队列大小会动态变化以保持目标响应时间。该功能 8.0+ 版本会废弃,这里也不着重讲解。fixed_autoqueue_size 类型线程使用示例如下:强调了队列大小可变。thread_pool: search: size: 30 queue_size: 500 min_queue_size: 10 max_queue_size: 1000 auto_queue_frame_size: 2000 target_response_time: 1s5、线程池使用举例若要查看哪些线程 CPU 利用率高或花费的时间最长,可以使用以下查询。GET /_nodes/hot_threads该 API 有助于排查性能问题。更多推荐:深入解读 Elasticsearch 热点线程 hot_threads6、线程池和队列认知认知 1:必要时设置:processors值得注意的是,线程池是根据 Elasticsearch 在基础硬件上检测到的线程数(number of processors)设置的。如果检测失败,则应在 elasticsearch.yml 中显式设置硬件中可用的线程数。特别是在一台宿主机配置多个 Elasticsearch 节点实例的情况下,若要修改其中一个节点线程池或队列大小,则要考虑配置 processors 参数。elasticsearch.yml 中设置如下所示:processors: 4PS:Linux 查看线程数方法:grep 'processor' /proc/cpuinfo | sort -u | wc -l认知 2:线程池关联队列设置大多数线程池还具有与之关联的队列,以使 Elasticsearch 可以将请求存储在内存中,同时等待资源变得可用来处理请求。但是,队列通常具有有限的大小,如果超过该大小,Elasticsearch将拒绝该请求。认知 3:很糟糕做法——盲目修改队列大小有时你可能会增加队列的大小以防止请求被拒绝,但要结合资源实际进行修改,千万别盲目修改。实际上,如果值设置的非常大,甚至可能适得其反。因为通过设置更大的队列大小,该节点将需要使用更多的内存来存储队列,这就意味着将剩下相对较少的内存来响应和管理实际请求。此外,增加队列大小还会增加将操作响应保留在队列中的时间长度,从而导致客户端应用程序面临超时问题。以下的莽撞行为,大家是要实战中避免的。认知 4:加强监控通常,唯一有需要增加队列大小的情况是:在请求数量激增导致无法在客户端管理此过程且资源使用率并未达到峰值。你可以借助 Kibana Stack Monitoring 可视化监控指标以更好地了解 Elasticsearch 集群的性能。Kibana 监控面板中的总视图、节点视图、索引视图如下所示:总视图监控节点视图监控索引视图监控上图是:Kibana 7.6 版本中的监控截图,标红的地方是我在批量写入数据。search Rate:检索速率search Latency:检索延时indexing Rate:写入速度indexing Latency:写入延时队列的增加(Growing)表明 Elasticsearch难以满足请求,而拒绝(rejection)则表明队列已经增长到 Elasticsearch 拒绝的程度。需要检查导致队列增加的根本原因,并尝试通过在客户端减轻相关写入或检索操作来平衡对集群线程池的压力。7、线程池线上实战问题及注意事项7.1 线程池和队列修改需要更改配置文件 elasticsearch.yml节点级别配置,而不再支持 5.X 之前的版本动态 setting 修改。重启集群后生效。7.2 reject 拒绝请求的原因有多种类似文章开头问题 2,如果 Elasticsearch 集群开始拒绝索引/写入(index)请求,则可能有多种原因。通常,这表明一个或多个节点无法跟上索引 / 删除 / 更新 / 批量请求的数量,从而导致在该节点上建立队列且队列逐渐累积。一旦索引队列超过队列的设置的最大值(如 elasticsearch.yml 定义的值或者默认值),则该节点将开始拒绝索引请求。排查方法:需要检查线程池的状态,以查明索引拒绝是否总是在同一节点上发生,还是分布在所有节点上。GET /_cat/thread_pool?v如果 reject 仅发生在特定的数据节点上,那么您可能会遇到负载平衡或分片问题。如果 reject 与高 CPU 利用率相关联,那么通常这是 JVM 垃圾回收的结果,而 JVM 垃圾回收又是由配置或查询相关问题引起的。如果集群上有大量分片,则可能存在过度分片的问题。如果观察到节点上的队列拒绝,但监控发现 CPU 未达到饱和,则磁盘写入速度可能存在问题。7.3 写入 bulk 值要递进步长调优不要妄图快速提高写入速度,一下调很大,很大势必会写入 reject。首先尝试一次索引 100 个文档,然后索引 200 个,再索引 400 个,依此类推......当索引写入速度(indexing rate)开始趋于平稳时,便知道已达到数据批量请求的最佳大小。8、小结写入 reject、“429 too many requests” 等都是非常常见的错误,问题多半和线程池和队列大小有关系,需要结合业务场景进行问题排查。本文“抛砖引玉”,给出线程池和队列相关总结知识。您在实战中遇到的类似问题吗?欢迎留言探讨交流。参考https://drscg.tistory.com/640 https://opster.com/elasticsearch-glossary/index-queue-size-is-high/https://opster.com/elasticsearch-glossary/elasticsearch-threadpool/https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html https://opster.com/analysis/elasticsearch-requested-thread-pool-size-for-is-too-large-setting-to-maximum-instead/推荐:全网首发!《 Elasticsearch 最少必要知识教程 V1.0 》低调发布从实战中来,到实战中去——Elasticsearch 技能更快提升方法论Elasticsearch写入原理深入详解Elasticsearch性能优化实战指南让Elasticsearch飞起来!——性能优化实践干货
链接让用户关注程度弱的“冷”数据分散到普通磁盘对应节点上?也就是说“冷热”数据分离是本文讨论的内容。提到冷热集群架构,通常我们关注的问题如下:什么是冷热集群架构,Elasticsearch支持吗?Elasticsearch集群如何设置冷热节点?Elasticsearch集群如何根据数据冷热度写入到不同的节点?当数据不“热”时,如何将数据迁移到“冷”节点?本文在Elasticsearch7.3版本上一 一给出解答。1、什么是冷热架构?官方叫法:热暖架构——“Hot-Warm” Architecture。通俗解读:热节点存放用户最关心的热数据;温节点或者冷节点存放用户不太关心或者关心优先级低的冷数据或者暖数据。图片来源cnblog 毛台的博客1.1 官方解读冷热架构冷热架构是一项十分强大的功能,能够让您将 Elasticsearch 部署划分为“热”数据节点和“冷”数据节点。热数据节点处理所有新输入的数据,并且存储速度也较快,以便确保快速地采集和检索数据。冷节点的存储密度则较大,如需在较长保留期限内保留日志数据,不失为一种具有成本效益的方法。将这两种类型的数据节点结合到一起后,您便能够有效地处理输入数据,并将其用于查询,同时还能在节省成本的前提下在较长时间内保留数据。此架构对日志用例来说尤其大有帮助,因为在日志用例中,人们的大部分精力都会专注于近期的日志(例如最近两周),而较早的日志(由于合规性或者其他原因仍需要保留)则可以接受较慢的查询时间。1.2 典型应用场景一句话:在成本有限的前提下,让客户关注的实时数据和历史数据硬件隔离,最大化解决客户反应的响应时间慢的问题。业务场景描述:每日增量6TB日志数据,高峰时段写入及查询频率都较高,集群压力较大,查询ES时,常出现查询缓慢问题。ES集群的索引写入及查询速度主要依赖于磁盘的IO速度,冷热数据分离的关键为使用SSD磁盘存储热数据,提升查询效率。若全部使用SSD,成本过高,且存放冷数据较为浪费,因而使用普通SATA磁盘与SSD磁盘混搭,可做到资源充分利用,性能大幅提升的目标。2、最最核心的实现原理借助:Elasticsearch的分片分配策略,确切的说是:第一:集群节点层面支持规划节点类型,这是划分热暖节点的前提。第二:索引层面支持将数据路由到给定节点,这为数据写入冷、热节点做了保障。Shard allocation awarenesshttps://www.elastic.co/guide/en/elasticsearch/reference/current/allocation-awareness.htmlIndex-level shard allocation filteringhttps://www.elastic.co/guide/en/elasticsearch/reference/current/shard-allocation-filtering.html#3、7.X版本ES实践一把第一:搭建一个两个节点的集群,划分热、热节点用。节点层面设置节点类型。热节点指定:node.attr.hotwarm_type: hot1暖节点或冷节点指定:node.attr.hotwarm_type: warm1第二:写入操作方案一:索引层面指定路由。PUT /logs_2019-10-01{ "settings": { "index.routing.allocation.require.hotwarm_type": "hot", "number_of_replicas": 0 }}PUT /logs_2019-08-01{ "settings": { "index.routing.allocation.require.hotwarm_type": "warm", "number_of_replicas": 0 }}12345678910111213141516方案二:通过模板指定索引的冷热存储。PUT _template/logs_2019-08-template{ "index_patterns": "logs_2019-08-*", "settings": { "index.number_of_replicas": "0", "index.routing.allocation.require.hotwarm_type": "warm" }}PUT _template/logs_2019-10-template{ "index_patterns": "logs_2019-10-*", "settings": { "index.number_of_replicas": "0", "index.routing.allocation.require.hotwarm_type": "hot" }}第三:效果图详见附件图。第四:借助curator定期迁移数据随着时间发展,当前数据会成为历史数据。历史数据要自动切换到普通磁盘的节点存储,可以借助curator实现。cuator的安装不再追溯,详细请参考官方文档。https://www.elastic.co/guide/en/elasticsearch/client/curator/5.8/command-line.html步骤1:定义cuator.yml,填写Elasticsearch集群配置信息。步骤2:定义action.yml。actions: 1: action: allocation description: >- Apply shard allocation routing to 'require' 'tag=cold' for hot/cold node setup for logstash- indices older than 3 days, based on index_creation date options: key: hotwarm_type value: warm allocation_type: require disable_action: false filters: - filtertype: pattern kind: prefix value: logs_ - filtertype: age source: name direction: older timestring: "%Y-%m-%d" unit: days unit_count: 312345678910111213141516171819202122步骤3:执行迁移。C:\Program Files\elasticsearch-curator>curator.exe --config .\conf\curator.yml .\conf\action.yml2019-10-13 22:28:31,662 INFO Preparing Action ID: 1, "allocation"2019-10-13 22:28:31,662 INFO Creating client object and testing connection2019-10-13 22:28:31,668 INFO Instantiating client object2019-10-13 22:28:31,668 INFO Testing client connectivity2019-10-13 22:28:31,675 INFO Successfully created Elasticsearch client object with provided settings2019-10-13 22:28:31,677 INFO Trying Action ID: 1, "allocation": Apply shard allocation routing to 'require' 'tag=cold' f....2019-10-13 22:28:31,706 INFO Updating 2 selected indices: ['logs_2019-08-01', 'logs_2019-10-01']2019-10-13 22:28:31,706 INFO Updating index setting {'index.routing.allocation.require.hotwarm_type': 'warm'}2019-10-13 22:28:32,559 INFO Action ID: 1, "allocation" completed.2019-10-13 22:28:32,560 INFO Job completed.4、坑:node.attr.hotwarm_type:1单纯搜索官网你是找不到的。因为:node.attr.*,你可以指定type类型、各种结合业务场景你的需要指定的值。包括:官方的:按照磁盘大小设定。和咱们的冷热节点。白话文:就是标定节点划分分类的一个属性类型值。这个坑网友也有疑惑:node属性(tag)如何设置,查资料看到了好几种方法很混乱 - Elastic 中文社区,官方文档说的不是特别清楚。5、线上使用场景来自星友的线上实战反馈如下:我们现有的架构也是冷热分离。热节点使用的是ssd,indexing和search性能都不错,其中保存4天的数据,4天之后数据推到warm节点。warm节点使用的是hdd。在运维过程中,能体会到这种架构的特点是:冷节点或者热节点的离群不会影响另外一个种类型节点的功能;但是如果整个集群中有节点产生stw(Java中一种全局暂停现象,全局停顿,所有Java代码停止,native代码可以执行,但不能与JVM交互;这些现象多半是由于gc引起。),整个集群的性能都会被影响;这种架构能在相对节约成本的前提下极大的提升性能,但是不能完全做到一种类型节点的故障对其他类型节点是无感的。6、小结Elasticsearch6.6版本后已推出索引生命周期管理ilm功能。涵盖了冷热集群的部署和自动化实现。最新实现参考:https://www.elastic.co/guide/en/elasticsearch/reference/7.4/index-lifecycle-management.html官方最早2015年的博客就提到了冷热集群架构的实现,但“再显而易见的道理,也有80%的人可能不知道”并考虑到大家使用场景的参差不齐,才梳理出本篇文章。你的集群使用冷热架构了吗? 欢迎交流。7、Good 参考深入学习1)最新冷热架构官方文档:https://www.elastic.co/cn/blog/deploying-a-hot-warm-logging-cluster-on-the-elasticsearch-service2)最多参考冷热架构文档:https://www.elastic.co/cn/blog/hot-warm-architecture-in-elasticsearch-5-x3)国内最佳实践:elasticsearch冷热数据读写分离https://elasticsearch.cn/article/6127
链接本文在官方文档基础上,结合实际业务场景,在Elasticsearch7.3环境下进行脚本使用解读。1、官方scripting使用建议Avoid scripts——In general, scripts should be avoided.If they are absolutely needed, you should prefer the painless and expressions engines.ebay在性能优化实践中也强调(本文做了扩展延伸):避免使用脚本查询(script query)计算动态字段。例如:我们有一个包含大量剧院信息的索引,我们需要查询以"Down"开头的所有剧院。你可能运行一个如下脚本查询:1POST seats/_search2{3 "query": {4 "bool":{5 "filter": { 6 "script":{7 "script":{8 "lang":"painless",9 "source": "doc['theatre'].value.startsWith('Down')"10 }11 }12 }13 }14 }15}这个查询非常耗费资源,并且减慢整个系统。解决方案:方案一:prefix前缀匹配;实测性能:prefix较scripting性能提升5倍。方案二:索引时考虑添加一个名为“theatre_prefix”的keyword类型字段。然后我们可以查询"theatre_prefix":"Down"。2、ES Scripting历史版本 使用脚本< Elasticsearch 1.4MVEL 脚本< Elasticsearch 5.0Groovy 脚本‘>= Elasticsearch 5.0painless 脚本Groovy 的出现是解决MVEL的安全隐患问题;但Groovy仍存在内存泄露+安全漏洞问题,painless脚本的官宣时间:2016年9月21日。看似很新,截止目前,已经三年左右时间了。正如其名字:无痛。painless的出现是为了用户更方便、高效的使用脚本。https://www.elastic.co/cn/blog/painless-a-new-scripting-language3、Painless Scripting 简介Painless是一种简单,安全的脚本语言,专为与Elasticsearch一起使用而设计。它是Elasticsearch的默认脚本语言,可以安全地用于内联和存储脚本。Painless特点:性能牛逼:Painless脚本运行速度比备选方案(包括Groovy)快几倍。安全性强:使用白名单来限制函数与字段的访问,避免了可能的安全隐患。可选输入:变量和参数可以使用显式类型或动态def类型。上手容易:扩展了java的基本语法,并兼容groove风格的脚本语言特性。特定优化:是ES官方专为Elasticsearch脚本编写而设计。4、Scripting 应用场景认知前提:增删改查能解决业务场景80%的问题,Painless脚本操作一般应用于相对复杂的业务场景中。常见场景举例如下:自定义字段自定义评分自定义更新自定义reindex聚合其他自定义操作5、Scripting 使用模板心中有模板,脚本认知就有了“套路”。1"script": {2 "lang": "...", 3 "source" | "id": "...", 4 "params": { ... } 5 }lang:代表language脚本语言,默认指定为:painless。source:脚本的核心部分,id应用于:stored script。params:传递给脚本使用的变量参数。6、Scripting 实战6.1 自定义字段举例:返回原有Mapping未定义的字段值。如:以my_doubled_field返回my_field字段的翻倍后的结果。1GET my_index/_search2{3 "script_fields": {4 "my_doubled_field": {5 "script": {6 "lang": "expression",7 "source": "doc['my_field'] * multiplier",8 "params": {9 "multiplier": 210 }11 }12 }13 }14}注意:这里脚本语言选择的expression,下一节讲解。如:返回日期字段中的“年”或“月”或“日”等。1GET hockey/_search2{3 "script_fields": {4 "birth_year": {5 "script": {6 "source": "doc.born.value.year"7 }8 }9 }10}6.2 自定义评分1GET my_index/_search2{3 "query": {4 "function_score": {5 "query": {6 "match": {7 "text": "quick brown fox"8 }9 },10 "script_score": {11 "script": {12 "lang": "expression",13 "source": "_score * doc['popularity']"14 }15 }16 }17 }18}6.3 自定义更新Update:将已有字段值赋值给其他字段。1POST hockey/_update/12{3 "script": {4 "lang": "painless",5 "source": """6 ctx._source.last = params.last;7 ctx._source.nick = params.nick8 """,9 "params": {10 "last": "gaudreau",11 "nick": "hockey"12 }13 }14}Update_by_query:满足b开头(注意正则)的字段,末尾添加matched。1POST hockey/_update_by_query2{3 "script": {4 "lang": "painless",5 "source": """6 if (ctx._source.last =~ /b/) {7 ctx._source.last += "matched";8 } else {9 ctx.op = "noop";10 }11 """12 }13}6.4 自定义reindexElasticsearch认证考试题:有index_a包含一些文档, 要求创建索引index_b,通过reindex api将index_a的文档索引到index_b。要求:1)增加一个整形字段,value是index_a的field_x的字符长度;2)再增加一个数组类型的字段,value是field_y的词集合。(field_y是空格分割的一组词,比方"foo bar",索引到index_b后,要求变成["foo", "bar"])1POST _reindex2{3 "conflicts": "proceed",4 "source": {5 "index": "index_a"6 },7 "dest": {8 "index": "index_b"9 },10 "script": {11 "source": "ctx._source.parts = / /.split(ctx._source.address); ctx._source.tag = ctx._source.city.length();"12 }13}语法参考:https://www.elastic.co/guide/en/elasticsearch/painless/7.3/painless-regexes.html6.5 聚合1GET /_search2{3 "aggs" : {4 "genres" : {5 "terms" : {6 "script" : {7 "source": "doc['genre'].value",8 "lang": "painless"9 }10 }11 }12 }1314}6.6 其他自定义操作需要结合业务去实践。7、常见坑及问题7.1 脚本只有Painless吗?显然不是,第6节用到的expression 是Lucene’s expressions 脚本语言。还可以基于脚本引擎自己开发插件实现,https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-engine.html7.2 怎么界定是expressions 还是Painless?"lang": "painless","lang": "expressions ",是唯一区分。7.3 使用painless就百分之百“无痛”,无漏洞后顾之忧了吗?凡事不能绝对。核心注意点:第一:不要root账户下运行Elasticsearch。第二:不要公开ES路径给其他用户。第三:不要公开ES路径到互联网。实战推荐:1、用户在搜索框中键入文本,文本将直接发送到后台的match、match_phrase、Simple query string或 Suggesters.2、作为应用程序开发过程的一部分(而非全部)开放上述查询的脚本。3、使用用户提供的参数运行脚本。4、文档固定的Mapping结构。不推荐:1、用户可以编写任意scripts, queries(检索), _search requests(search请求)。2、文档结构可以用户自定义。8、小结本文讲解了脚本的发展历史、使用场景、应用实战,但相比于实际业务的复杂需求仍然是九牛一毛。实战中,肯定还会遇到这样、那样的问题。一方面:欢迎留言交流。另一方面:多研读官方文档,很多细节值得深究。参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.htmlhttps://www.infoq.cn/article/elasticsearch-performance-tuning-practice-at-ebayhttps://github.com/laoyang360/deep_elasticsearch/blob/master/es_dsl_study/6.scripting.mdhttps://github.com/elastic/elasticsearch/issues/19396https://www.youtube.com/watch?v=3FLEJJ8PsM4https://blog.csdn.net/u013613428/article/details/78134170
链接实际业务实战中,大家或多或少的都会遇到导入、导出问题。根据数据源的不同,基本可以借助:1、程序写入2、数据同步(logstash/flume/cana/es_hadoopl等)来实现关系型数据库(如:Oracle、mysql)、非关系型数据库(如:Mongo、Redis)、大数据(Hadoop、Spark、Hive)到Elasticsearch的写入。而数据的导出,一部分是业务场景需要,如:业务系统中支持检索结果导出为CSV、Json格式等。还有一部分是分析数据的需求:期望借助Kibana工具将仪表盘聚合结果导出、不需要借助程序尽快将满足给定条件的结果数据导出等。这些快速导出的需求,最好借助插件或者第三方工具实现。本文将重点介绍Kibana/Elasticsearch高效导出的插件、工具集。2、期望导出数据格式一般期望导出:CSV、Json格式。3、Kibana导出工具3.1 Kibana 官方导出步骤1:点击Kibana;步骤2:左侧选择数据,筛选字段;步骤3:右侧点击:share->csv reports。步骤4:菜单栏:选择Management->Reporting->下载。以上是kibana6.5.4的实操截图。其他常见报表数据导出:注意:建议7.X以上版本使用。低版本不支持。4、Elasticsearch导出工具4.1 es2csv1、简介:用Python编写的命令行实用程序,用于以Lucene查询语法或查询DSL语法查询Elasticsearch,并将结果作为文档导出到CSV文件中。es2csv 可以查询多个索引中的批量文档,并且只获取选定的字段,这可以缩短查询执行时间。2、地址:https://pypi.org/project/es2csv/3、使用方法:es2csv -u 192.168.1.1:9200 -q '{"_source":{"excludes":["*gxn",,"*kex","vperxs","lpix"]},"query":{"term":{"this_topic":{"value":41}}}}' -r -i sogou_topic -o ~/export.csv14、使用效果:官方最新更新支持5.X版本,实际验证6.X版本也可以使用,导出效率高。5、推荐指数:五星,Elasticsearch导出CSV首选方案。4.2 elasticsearch-dump1、简介:Elasticsearch导入导出工具。支持操作包含但不限于:1)、数据导出导出索引、检索结果、别名或模板为Json导出索引为gzip支持导出大文件切割为小文件支持统一集群不同索引间或者跨索引数据拷贝2)、数据导入支持Json数据、S3数据导入Elasticsearch。2、地址:https://github.com/taskrabbit/elasticsearch-dump3、使用方法:elasticdump \ --input=http://production.es.com:9200/my_index \ --output=query.json \ --searchBody='{"query":{"term":{"username": "admin"}}}'1234如上,将检索结果导出为json文件。更多导入、导出详见github介绍。4、使用效果:早期1.X版本没有reindex操作,使用elasticdump解决跨集群数据备份功能。效果可以。5、推荐指数:五星。Elasticsearch导出json首选方案。4.3 logstash_output_csv步骤1:安装logstash_output_csv工具:D:\ogstash-6.5.4\bin>logstash-plugin.bat install logstash-output-csvValidating logstash-output-csvInstalling logstash-output-csvInstallation successful1234步骤2:配置conf文件核心的:输入input,输出ouput,中间处理filter都在如下的配置文件中。输入:指定ES地址,索引,请求query语句;输出:csv输出地址,输出字段列表。input { elasticsearch { hosts => "127.0.0.1:9200" index => "company_infos" query => ' { "query": { "match_all": {} } } ' }}output { csv { # elastic field name fields => ["no", "name", "age", "company_name", "department", "sex"] # This is path where we store output. path => "D:\logstash-6.5.4\export\csv-export.csv" }}12345678910111213141516171819202122步骤3:执行导出D:\\logstash-6.5.4\bin>logstash -f ../config/logstash_ouput_csv.confSending Logstash logs to D:/2.es_install/logstash-6.5.4/logs which is now configured via log4j2.properties[2019-08-03T23:45:00,914][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified[2019-08-03T23:45:00,934][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.5.4"}[2019-08-03T23:45:03,473][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}[2019-08-03T23:45:04,241][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x34b305d3 sleep>"}[2019-08-03T23:45:04,307][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}[2019-08-03T23:45:04,740][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}[2019-08-03T23:45:05,610][INFO ][logstash.outputs.csv ] Opening file {:path=>"D:/logstash-6.5.4/export/csv-export.csv"}[2019-08-03T23:45:07,558][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"地址:https://medium.com/@shaonshaonty/export-data-from-elasticsearch-to-csv-caaef3a19b695、小结根据业务场景选择导出数据的方式。
链接logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式增量同步数据。回到问题本身:如果库表里没有相关字段,该如何处理呢?本文给出相关探讨和解决方案。1、 binlog认知1.1 啥是 binlog?binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中;作用主要有:1)复制:达到master-slave数据一致的目的。2)数据恢复:通过mysqlbinlog工具恢复数据。3)增量备份。1.2 阿里的Canal实现了增量Mysql同步一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的:增量数据订阅&消费。综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。2、基于binlog的同步方式1)基于kafka Connect的Debezium 开源工程,地址:. https://debezium.io/2)不依赖第三方的独立应用: Maxwell开源项目,地址:http://maxwells-daemon.io/由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对Debezium展开。3、Debezium介绍Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。特点:1)简单。无需修改应用程序。可对外提供服务。2)稳定。持续跟踪每一行的每一处变动。3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。4、同步架构如图,Mysql到ES的同步策略,采取“曲线救国”机制。步骤1: 基Debezium的binlog机制,将Mysql数据同步到Kafka。步骤2: 基于Kafka_connector机制,将kafka数据同步到Elasticsearch。5、Debezium实现Mysql到ES增删改实时同步软件版本:confluent:5.1.2;Debezium:0.9.2_Final;Mysql:5.7.x.Elasticsearch:6.6.15.1 Debezium安装confluent的安装部署参见:http://t.cn/Ef5poZk,不再赘述。Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件目录(share/java)中。MySQL Connector plugin 压缩包的下载地址:https://debezium.io/docs/install/。注意重启一下confluent,以使得Debezium生效。5.2 Mysql binlog等相关配置。Debezium使用MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。核心配置如下,在Mysql机器的/etc/my.cnf的mysqld下添加如下配置。[mysqld]server-id = 223344log_bin = mysql-binbinlog_format = rowbinlog_row_image = fullexpire_logs_days = 101234567然后,重启一下Mysql以使得binlog生效。systemctl start mysqld.service15.3 配置connector连接器。配置confluent路径目录 : /etc创建文件夹命令 :mkdir kafka-connect-debezium1在mysql2kafka_debezium.json存放connector的配置信息 :[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json{ "name" : "debezium-mysql-source-0223", "config": { "connector.class" : "io.debezium.connector.mysql.MySqlConnector", "database.hostname" : "192.168.1.22", "database.port" : "3306", "database.user" : "root", "database.password" : "XXXXXX", "database.whitelist" : "kafka_base_db", "table.whitlelist" : "accounts", "database.server.id" : "223344", "database.server.name" : "full", "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092", "database.history.kafka.topic" : "account_topic", "include.schema.changes" : "true" , "incrementing.column.name" : "id", "database.history.skip.unparseable.ddl" : "true", "transforms": "unwrap,changetopic", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.changetopic.regex":"(.*)", "transforms.changetopic.replacement":"$1-smt" }}1234567891011121314151617181920212223242526注意如下配置:“database.server.id”,对应Mysql中的server-id的配置。“database.whitelist” : 待同步的Mysql数据库名。“table.whitlelist” :待同步的Mysq表名。重要:“database.history.kafka.topic”:存储数据库的Shcema的记录信息,而非写入数据的topic、“database.server.name”:逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。坑一:transforms相关5行配置作用是写入数据格式转换。如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。格式转换相关原理:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/5.4 启动connectorcurl -X POST -H "Content-Type:application/json" --data @mysql2kafka_debezium.json.json http://192.168.1.22:18083/connectors | jq1235.5 验证写入是否成功。查看kafka-topickafka-topics --list --zookeeper localhost:21811此处会看到写入数据topic的信息。注意新写入数据topic的格式:database.schema.table-smt 三部分组成。本示例topic名称:full.kafka_base_db.account-smt。消费数据验证写入是否正常./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning至此,Debezium实现mysql同步kafka完成。6、kafka-connector实现kafka同步Elasticsearch6.1、Kafka-connector介绍见官网:https://docs.confluent.io/current/connect.htmlKafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或者Kafka数据写入目标数据库,也可以自己开发连接器。6.2、kafka到ES connector同步配置配置路径:/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties1配置内容:"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max": "1","topics": "full.kafka_base_db.account-smt","key.ignore": "true","connection.url": "http://192.168.1.22:9200","type.name": "_doc","name": "elasticsearch-sink-test"12345676.3 kafka到ES启动connector启动命令confluent load elasticsearch-sink-test -d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties126.4 Kafka-connctor RESTFul API查看Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令行查看。 curl -X GET http://localhost:8083/connectors17、坑复盘。坑2: 同步的过程中可能出现错误,比如:kafka topic没法消费到数据。排解思路如下:1)确认消费的topic是否是写入数据的topic;2)确认同步的过程中没有出错。可以借助connector如下命令查看。 curl -X GET http://localhost:8083/connectors-xxx/status1坑3: Mysql2ES出现日期格式不能识别。是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。坑4: kafka2ES,ES没有写入数据。排解思路:1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态识别生成。2)通过connetor/status排查出错原因,一步步分析。8、小结binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。对比:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。推荐大家使用。大家有好的同步方式也欢迎留言讨论交流。参考:[1] https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/[2] https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn[3] https://juejin.im/post/5b7c036bf265da43506e8cfd[4] https://debezium.io/docs/connectors/mysql/#configuration[5] https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc
链接2、Elasticsearch 集群颜色变黄色了要不要紧?Elasticsearch集群黄色代表:分配了所有主分片,但至少缺少一个副本。没有数据丢失,因此搜索结果仍将完整。注意:您的高可用性在某种程度上会受到影响。如果更多分片消失,您可能会丢失数据。 将黄色视为应该提示调查的警告。3、Elasticsearch集群健康状态如何排查?3.1 集群状态查看curl -XGET 'http://localhost:9200/_cluster/health?pretty=true' { "cluster_name" : "astrung", "status" : "yellow", "timed_out" : false, "number_of_nodes" : 2, "number_of_data_nodes" : 2, "active_primary_shards" : 22, "active_shards" : 22, "relocating_shards" : 0, "initializing_shards" : 2, "unassigned_shards" : 20}123456789101112133.2 分片状态查看curl -XGET 'http://localhost:9200/_cat/shards?v' index shard prirep state docs store ip node _river 0 p STARTED 2 8.1kb 192.168.1.3 One _river 0 r UNASSIGNED megacorp 4 p STARTED 1 3.4kb 192.168.1.3 One megacorp 4 r UNASSIGNED megacorp 0 p STARTED 2 6.1kb 192.168.1.3 One 12345673.3 查看unsigned 的原因GET /_cluster/allocation/explain13.4 查看集群中不同节点、不同索引的状态GET _cat/shards?h=index,shard,prirep,state,unassigned.reason13.5 Head插件直观排查4、Elasticsearch集群黄色的原因排查及解决方案4.1 原因1:Elasticsearch采用默认配置(5分片,1副本),但实际只部署了单节点集群。由于只有一个节点,因此群集无法放置副本,因此处于黄色状态。elasticsearch 索引的默认配置如下:index.number_of_shards:5index.number_of_replicas:112解决方案如下:您可以将副本计数降低到0或将第二个节点添加到群集,以便可以将主分片和副本分片安全地放在不同的节点上。这样做以后,如果您的节点崩溃,群集中的另一个节点将拥有该分片的副本。(1)设置副本数为0,操作如下:PUT /cs_indexs/_settings{ "number_of_replicas": 0}1234进行段合并,提升访问效率,操作如下:POST /cs_indexs/_forcemerge?max_num_segments=1(2)不再物理扩展集群,将后续所有的索引自动创建的副本设置为 0。PUT /_template/index_defaults { "template": "*", "settings": { "number_of_replicas": 0 }}1234567##4.2 原因2:Elasticsearch分配分片错误。进一步可能的原因:您已经为集群中的节点数过分分配了副本分片的数量,则分片将保持UNASSIGNED状态。其错误码为:ALLOCATION_FAILED。解决方案如下:reroute:重新路由命令允许手动更改群集中各个分片的分配。核心操作如下:POST /_cluster/reroute { "commands": [ { "allocate_replica": { "index": "cs_indexs", "shard": 0, # 重新分配的分片(标记黄色的分片) "node": "es-2" } } ]}123456789101112reroute扩展使用——可以显式地将分片从一个节点移动到另一个节点,可以取消分配,并且可以将未分配的分片显式分配给特定节点。举例使用模板如下:POST /_cluster/reroute{ "commands" : [ { "move" : { "index" : "test", "shard" : 0, "from_node" : "node1", "to_node" : "node2" } }, { "allocate_replica" : { "index" : "test", "shard" : 1, "node" : "node3" } } ]}1234567891011121314151617其中:1)move代表移动;2)allocate_replica 代表重新分配;3)cancel 代表取消;4.3 磁盘使用过载。原因3:磁盘使用超过设定百分比85%。cluster.routing.allocation.disk.watermark.low——控制磁盘使用的低水位线。 它默认为85%,这意味着Elasticsearch不会将分片分配给使用磁盘超过85%的节点。 它也可以设置为绝对字节值(如500mb),以防止Elasticsearch在小于指定的可用空间量时分配分片。解决方案:(1)查看磁盘空间是否超过85%。[root@localhost home]# df -hFilesystem Size Used Avail Use% Mounted on/dev/xvda1 1014M 165M 849M 17% /boot/dev/mapper/cl-home 694G 597G 98G 86% /home1234(2)删除不必要的索引,以释放更多的空间。DELETE cs_indexs1##4.4 磁盘路径权限问题。原因4:磁盘路径权限问题。安全起见,默认Elasticsearch非root账户和启动。相关的Elasticsearch数据路径也是非root权限。解决方案:去数据存储路径排查权限,或者在data的最外层设置:chown -R elasticsearch:elasticsearch data1推荐阅读:干货 | Elasticsearch 集群健康值红色终极解决方案参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/_cluster_health.htmlhttps://www.jianshu.com/p/542ed5a5bdfchttps://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.htmlhttps://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-reroute.html
链接2、为什么是Nginx?Nginx是俄罗斯软件工程师Igor Sysoev开发的免费开源web服务器软件。nginx于2004年发布,c语言开发,聚焦于高性能,高并发和低内存消耗问题。并且具有多种web服务器功能特性:负载均衡,缓存,访问控制,带宽控制,以及高效整合各种应用的能力,这些特性使nginx很适合于现代网站架构。在国内,已经有淘宝、新浪博客、新浪播客、网易新闻、六间房、56.com、Discuz!、水木社区、豆瓣、YUPOO、海内、迅雷在线等多家网站使用 Nginx 作为Web服务器或反向代理服务器。3、Nginx通过反向代理实现负载均衡NginX,是一款高性能的反向代理服务器;也是一个IMAP、POP3、SMTP代理服务器;也是一个Http服务器。也就是说Nginx本身就可以托管网站,进行Http服务处理,也可以作为反向代理服务器使用。反向代理:代理服务器服务的对象不是用户,而是其他内容服务器。用户访问反向代理服务器(nginx服务器),由反向代理服务器决定转发给哪台服务器处理请求。注意:不是反向代理提高了性能,反向代理到后端服务器(N台),通过轮询多台后端服务器来提高逻辑处理能力,达到提高系统性能的效果。显然,Nginx起到的核心作用如下:1、分流请求2、负载均衡4、Linux下nginx的安装4.1.正则表达式库安装1)确保进行了安装了linux常用必备支持库。用命令rpm -qa | grep gcc 检查是否安装了g++、gcc。若未安装请使用命令:# yum install gcc-c++进行安装。2) 下载pcre-8.12.tar.gz, nginx-1.5.0.tar.gz 到 /usr/local/src/nginx目录下。3)解压pcre-8.12.tar.gz # cd /usr/local/src/nginx # tar zxvf pcre-8.12.tar.gz4)进入解压后的目录 # cd pcre-8.125)配置 # ./configure6) 编译 # make7) 安装 # make install1234567891011121314151617184.2 Nginx安装1) 创建用户nginx使用的www用户。 #添加www组 # groupadd www #创建nginx运行账户www并加入到www组,不允许www用户直接登录系统 # useradd -g www www -s /bin/false 2)创建安装目录与日志目录 创建安装目录 # mkdir /usr/local/nginx 创建日志目录 # mkdir /data0/logs/nginx # chown www:www /data0/logs/nginx -R3) 判断系统是否安装了zlib-devel。如果没有安装。使用如下命令进行安装: # yum install -y zlib-devel4)Nginx下载解压切换目录到/usr/local/src/nginx目录下# cd /usr/local/src/nginx下载nginx到该目录将下载的文件解压 # tar zxvf nginx-1.5.0.tar.gz5) 进入Nginx路径 # cd nginx-1.5.06) 配置通常将软件安装在/usr/local/目录下。# ./configure --user=www --group=www --prefix=/usr/local/nginx --with-http_stub_status_module --with-http_ssl_module --with-http_realip_module注意:openssl的安装yum install -y openssl-devel7)编译 # make8) 安装 # make install9) 检查是否安装成功 # cd /usr/local/nginx/sbin # ./nginx -t结果显示: nginx: the configuration file /usr/local/nginx/conf/nginx.conf syntax is ok nginx: configuration file /usr/local/nginx/conf/nginx.conf test is successful则表示安装成功。12345678910111213141516171819202122232425262728293031323334353637384.3 Linux做负载均衡的配置修改/usr/local/nginx/conf/nginx.conf#user www;worker_processes 8;#修改为和服务器CPU核数一样#error_log logs/error.log;#error_log logs/error.log notice;#error_log logs/error.log info;#pid logs/nginx.pid;events { worker_connections 1024;}http { include mime.types; default_type application/octet-stream; #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' # '$status $body_bytes_sent "$http_referer" ' # '"$http_user_agent" "$http_x_forwarded_for"'; #access_log logs/access.log main; sendfile on; #tcp_nopush on; #keepalive_timeout 0; keepalive_timeout 65; #gzip on;#将进行负载均衡的服务器及端口号配置在这里 upstream backend { #ip_hash server 192.168.12.15:19001; } server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { #设置主机头和客户端真实地址,以便服务器获取客户端真实IP# proxy_set_header Host $host;# proxy_set_header X-Real-IP $remote_addr;# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; #禁用缓存# proxy_buffering off; #设置反向代理的地址,将负载均衡的服务器变量backend设为反向代理地址 proxy_pass http://backend;# root html;# index index.html index.htm; } #error_page 404 /404.html; # redirect server error pages to the static page /50x.html # error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } }}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970714.4 Nginx启停1)检查配置文件是否正确 /usr/local/nginx-1.6/sbin/nginx -t./sbin/nginx -V # 可以看到编译选项122)启动 Nginx./sbin/nginx # 默认配置文件 conf/nginx.conf,-c 指定13)停止Nginx./sbin/nginx -s stop或pkill nginx124)重启 Nginx# ./sbin/nginx -s reload或 kill -HUP `cat /usr/local/nginx-1.6/logs/nginx.pid`12推荐阅读[1] Nginx压缩高效配置: https://www.insp.top/article/open-nginx-gzip-module[2] Nginx简介:https://blog.csdn.net/wang379275614/article/details/47777985[3] Nginx介绍:https://blog.csdn.net/hanhuili/article/details/9389571
二、排查思路2.1、业务场景排查问自己几个问题?- 1)集群中数据类型是怎么样的?- 2)集群中有多少数据?- 3)集群中有多少节点数、分片数?- 4)当前集群索引和检索的速率如何?- 5)当前在执行哪种类型的查询或者其他操作?2、建议Htop观察,结合ElaticHQ 观察CPU曲线3、CPU高的时候,建议看一下ES节点的日志,看看是不是有大量的GC。4、查看hot_threads。GET _nodes/hot_threads::: {test}{ikKuXkFvRc-qFCqG99smGg}{VE-uqoiARoONJwomfPwRBw}{127.0.0.1}{127.0.0.1:9300}{ml.machine_memory=8481566720, ml.max_open_jobs=20, ml.enabled=true} Hot threads at 2018-04-09T15:58:21.117Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true: 0.0% (0s out of 500ms) cpu usage by thread 'Attach Listener' unique snapshot unique snapshot unique snapshot unique snapshot unique snapshot unique snapshot unique snapshot unique snapshot unique snapshot unique snapshot12345678910111213141516三、解决方案:3.1、集群负载高,增加新节点以缓解负载。3.2、增加堆内存到系统内存的1半,最大31GB(理论上线32GB).如果机器内存不够,那就加大内存吧。https://github.com/elastic/elasticsearch/issues/10437https://discuss.elastic.co/t/es-high-cpu-usage-when-idle/87950/43.3、插入数据的时候,副本数设置为0.分片数不可以修改,副本数是可以修改的。注意:分片过多,会导致:堆内存压力大。3.4、配置优化Force all memory to be locked, forcing the JVM to never swapbootstrap.mlockall: trueThreadpool SettingsSearch poolthreadpool.search.type: fixedthreadpool.search.size: 20threadpool.search.queue_size: 200Bulk poolthreadpool.bulk.type: fixedthreadpool.bulk.size: 60threadpool.bulk.queue_size: 3000Index poolthreadpool.index.type: fixedthreadpool.index.size: 20threadpool.index.queue_size: 1000Indices settingsindices.memory.index_buffer_size: 30%indices.memory.min_shard_index_buffer_size: 12mbindices.memory.min_index_buffer_size: 96mbCache Sizesindices.fielddata.cache.size: 30%#indices.fielddata.cache.expire: 6h #will be depreciated & Dev recomend not to use itindices.cache.filter.size: 30%#indices.cache.filter.expire: 6h #will be depreciated & Dev recomend not to use itIndexing Settings for Writesindex.refresh_interval: 30s#index.translog.flush_threshold_ops: 50000#index.translog.flush_threshold_size: 1024mbindex.translog.flush_threshold_period: 5mindex.merge.scheduler.max_thread_count: 1123456789101112131415161718192021222324252627282930参考:https://github.com/elastic/elasticsearch/issues/4288
1、 X-Pack 概览X-Pack 简介1)利用 X-Pack 拓展可能性。X-Pack 是集成了多种便捷功能的单个插件 — security、alerting、monitoring、reporting、graph 探索和 machine learning — 您可以在 Elastic Stack 中放心地使用这些功能。2)单就其自身而言,Elastic Stack 就是一款值得考虑的强大工具。X-Pack 将诸多强大功能集合到一个单独的程序包中,更将它带上了一个新的层次。3)x-pack是elasticsearch的一个扩展包,将安全,警告,监视,图形和报告功能捆绑在一个易于安装的软件包中,虽然x-pack被设计为一个无缝的工作,但是你可以轻松的启用或者关闭一些功能。X-Pack 功能介绍X-Pack 提供以下几个级别保护elastic集群1)用户验证2)授权和基于角色的访问控制3)节点/客户端认证和信道加密4)审计通俗讲解:安全防护功能:你是不是,不想别人直接访问你的5601,9200端口,这个,x-pack能办到。实时监控功能:实时监控集群的CPU、磁盘等负载;生成报告功能:图形化展示你的集群使用情况。还有,机器学习等功能。以上这些都是X-pack的核心功能点。2、 X-Pack 注意事项注意:截至Elasticsearch6.2.2(2018-3-11)的版本,x-pack尚处于付费版本,适用期限:1个月。不过,由于Elasticsearch公司已经开源x-pack,不久的将来,有可能到6.3版本,x-pack就可以和kibana一样使用了。3、 X-Pack 安装步骤以下由于特殊原因,我使用windows10安装的。Linux步骤相同。步骤1:安装Elasticsearch,这里我用的最新的版本:ElasticsearchV6.2.2安装head插件的最简单的方法:直接在Chrome中安装插件:http://sina.lt/ftSr以上后红色箭头标注的,都和x-pack有关。后续步骤安装成功后,可以看到。步骤2:安装kibanaV6.2.2步骤3:Elasticsearch下安装xpack。bin/elasticsearch-plugin install x-pack产生缺省的密码——(此步骤非常重要,缺省密码记录下,后续登陆用)。步骤4:启动Elasticsearchbin/elasticsearch启动时,x-pack相关的加载如下:步骤5:设置密码——自动生成密码bin/x-pack/setup-passwords auto步骤6:Kibana下安装x-packbin/kibana-plugin install x-pack注意,在kibana配置文件下设置登陆用户名和密码(步骤3记录的)步骤7:启动kibanakibana安装x-pack后的界面如下:登陆的时候,使用超级管理员用户:elastic和密码登陆。kibana登陆后的效果如下:注意用户权限:还是适用版本:步骤8:head插件身份验证登陆4、X-pack开源关于x-pack开源:http://www.lupaworld.com/article-266921-1.html5、关于X-pack破解由于,我这边使用该功能还不够迫切,我没有尝试。网上有很多,举例:https://www.jianshu.com/p/6acfeabb44f8 (2018-3 ES6.2.2最新版本)后续待ES6.3版本后,深入探究。
本文主要基于kafka connector实现kafka到Elasticsearch全量、增量同步。2、从confluenct说起LinkedIn有个三人小组出来创业了—正是当时开发出Apache Kafka实时信息列队技术的团队成员,基于这项技术Jay Kreps带头创立了新公司Confluent。Confluent的产品围绕着Kafka做的。Confluent Platform简化了连接数据源到Kafka,用Kafka构建应用程序,以及安全,监控和管理您的Kafka的基础设施。confluent组成如下所示:1)Apache Kafka消息分发组件,数据采集后先入Kafka。2)Schema RegistrySchema管理服务,消息出入kafka、入hdfs时,给数据做序列化/反序列化处理。3)Kafka Connect提供kafka到其他存储的管道服务,此次焦点是从kafka到hdfs,并建立相关HIVE表。4)Kafka Rest Proxy提供kafka的Rest API服务。5)Kafka Clients提供Client编程所需SDK。默认端口对应表:组件 | 端口Apache Kafka brokers (plain text):9092Confluent Control Center:9021Kafka Connect REST API:8083REST Proxy:8082Schema Registry REST API:8081ZooKeeper:21813、kafka connector介绍。Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型。通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、 Elastic Search、 Apache Ignite等。KafkaConnect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。kafkaConnect通过Jest实现Kafka对接Elasticsearch。4、kafka connector安装实操非研究性的目的,不建议源码安装。直接从官网down confluent安装即可。地址:https://www.confluent.io/download/如下,解压后既可以使用。[root@kafka_no1 confluent-3.3.0]# pwd/home/confluent/confluent-3.3.0[root@kafka_no1 confluent-3.3.0]# ls -altotal 32drwxrwxr-x. 7 root root 4096 Dec 16 10:08 .drwxr-xr-x. 3 root root 4096 Dec 20 15:34 ..drwxr-xr-x. 3 root root 4096 Jul 28 08:30 bindrwxr-xr-x. 18 root root 4096 Jul 28 08:30 etcdrwxr-xr-x. 2 root root 4096 Dec 21 15:34 logs-rw-rw-r--. 1 root root 871 Jul 28 08:45 READMEdrwxr-xr-x. 10 root root 4096 Jul 28 08:30 sharedrwxrwxr-x. 2 root root 4096 Jul 28 08:45 src5、kafka connector模式Kafka connect 有两种工作模式1)standalone:在standalone模式中,所有的worker都在一个独立的进程中完成。2)distributed:distributed模式具有高扩展性,以及提供自动容错机制。你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。6、kafka connector同步步骤前提:$ confluent start1如下的服务都需要启动:Starting zookeeperzookeeper is [UP] ——对应端口:2181Starting kafkakafka is [UP]——对应端口:9092Starting schema-registryschema-registry is [UP]——对应端口:8081Starting kafka-restkafka-rest is [UP]Starting connectconnect is [UP]可以,netstat -natpl 查看端口是否监听ok。步骤1:创建topic./kafka-topics.sh --create --zookeeper 110.118.7.11 :2181 --replication-factor 3 --partitions 1 --topic test-elasticsearch-sink1步骤2:生产者发布消息假定avrotest topic已经创建。./kafka-avro-console-producer --broker-list 110.118.7.11:9092 --topic test-elasticsearch-sink \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'{"f1": "value1"}{"f1": "value2"}{"f1": "value3"}步骤3:消费者订阅消息测试(验证生产者消息可以接收到)./kafka-avro-console-consumer --bootstrap-server 110.118.7.11:9092 :9092 --topic test-elasticsearch-sink --from-beginning1步骤4:connector传输数据操作到ES./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties \../etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties12注意此处: connect-standalone模式,对应 connect-avro-standalone.properties要修改;如果使用connect-distribute模式,对应的connect-avro-distribute.properties要修改。这里 quickstart-elasticsearch.properties :启动到目的Elasticsearch配置。quickstart-elasticsearch.properties**设置**:name=elasticsearch-sinkconnector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max=1#kafka主题名称,也是对应Elasticsearch索引名称topics= test-elasticsearch-sinkkey.ignore=true#ES url信息connection.url=http://110.18.6.20:9200#ES type.name固定type.name=kafka-connect7、同步效果。curl -XGET 'http:// 110.18.6.20 :9200/test-elasticsearch-sink/_search?pretty'8、连接信息查询REST API-GET /connectors – 返回所有正在运行的connector名。- POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。- GET /connectors/{name} – 获取指定connetor的信息。- GET /connectors/{name}/config – 获取指定connector的配置信息。- PUT /connectors/{name}/config – 更新指定connector的配置信息。- GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。- GET /connectors/{name}/tasks – 获取指定connector正在运行的task。- GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。- PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。- PUT /connectors/{name}/resume – 恢复一个被暂停的connector。- POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用- POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。- DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。9、小结。他山之石,可以攻玉。kafka上的小学生,继续加油!参考:[1]kafka-connect部署及简介:http://t.cn/RiUCaWx[2]connector介绍:http://orchome.com/344[3]英文-同步介绍http://t.cn/RYeZm7P[4]部署&开发http://t.cn/RTeyOEl[5]confluent生态链http://t.cn/RTebVyL[6]快速启动参考:https://docs.confluent.io/3.3.0/quickstart.html[7]ES-connector:http://t.cn/RTecXmc
我的思考如下:1、pdf、Office类的文档如何被ES索引?更确切的说,pdf、Office类文档(word,ppt,excel等)如何导入ES中。如图所示:问题转嫁为:如何将Office类文档、PDF文档导入ES建立索引,并提供全文检索服务?2、Elasticsearch支持的最大待检索字段的长度是多大?ES5.X版本以后,keyword支持的最大长度为32766个UTF-8字符,text对字符长度没有限制。设置ignore_above后,超过给定长度后的数据将不被索引,无法通过term精确匹配检索返回结果。参考我的整理:http://blog.csdn.net/laoyang360/article/details/78207980参考6.0官网解读:https://www.elastic.co/guide/en/elasticsearch/reference/6.0/ignore-above.html参考luncene7.1API: http://t.cn/RYWvuGl3、Office&pdf文档存入Elastisearch注意问题清单少废话,直接上图。4、解析实战代码/***@brief:不同类型的文档解析**@param:文件路径**@return:解析结果*/public static ImExtInfo readXFile(String inputPath){String fileExt = FileUtil.getFileExtension(file);switch(fileExt){case "pdf":imExtInfo = readPdfFile(inputPath);break;case "docx":imExtInfo = readDocxFile(inputPath);break;case "doc":imExtInfo = readDocFile(inputPath);break;case "json":case "txt":case "md":imExtInfo = readTxtFile(inputPath);break;default:logger.info("unknow type " + fileExt);imExtInfo = null;}/***@brief:docx文档解析**@param:文件路径**@return:解析结果*/public static ImExtInfo readDocxFile(String inputPath) {ImExtInfo ImExtInfo = new ImageExtInfo();try {File file = new File(inputPath.trim());//1.获取文件名称String fileTitle = file.getName();//2.获取绝对路径String filePath = file.getAbsolutePath();//3.获取文件内容FileInputStream fis = new FileInputStream(file.getAbsolutePath());XWPFDocument document = new XWPFDocument(fis);List<XWPFParagraph> paragraphs = document.getParagraphs();StringBuilder contentBuilder = new StringBuilder();//System.out.println("Total no of paragraph "+paragraphs.size());for (XWPFParagraph para : paragraphs) {//System.out.println(para.getText());contentBuilder.append(para.getText().trim());}String content = contentBuilder.toString();imExtInfo.setTitle(fileTitle);imExtInfo.setPath(filePath);imExtInfo.setContent(content.trim());// 获取当前时间...String curTimeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());imExtInfo.setCreate_time(curTimeStamp);document.close();fis.close();} catch (Exception e) {e.printStackTrace();}return imExtInfo;}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869以上仅列举核心示例代码。5、小结从功能和性能角度考量,建立知识库的建议如下:1)知识库的核心是数据导入ES,导入ES的核心是各种类型文档的解析;2)提前设定Mapping,定义好字段分词、不分词的策略;3)对于大于1MB一个字段的存储,建议使用fvh高亮方式,在Mapping中一并设置。参考:[1] Java读取Office文档参考:http://poi.apache.org/document/[2] Html2Md参考:https://github.com/pnikosis/jHTML2Md[3] Pdf2Html参考:https://github.com/coolwanglu/pdf2htmlEX[4]OpenOffice参考:https://www.openoffice.org/download/index.html
1、sql语句转成DSL有哪些方法?方案一:借助工具 NLP团体开发的Elasticsearch-sql;2.X安装过,5.X没有再安装。方案二:借助工具ElasticHQ的自动转换模块:方案一、方案二和Github上其他语言开发的sql转DSL工具对简单的sql生成的DSL相对准确,但对于复杂的sql生成的不一定精确。(如上所示)方案三:徒手生成。2、如何根据复杂的sql语句生成ES的DSL查询语句呢?步骤1:拆解where (position=ES or work=ES or content=ES) and academic=本科 and (city=北京 or city=深圳)这个sql语句由几部分组成呢?以and作为拆分,共分为3部分:三个部分sql用and衔接,转换为DSL对应最外层must;第一部分: position=ES or work=ES or content=ES三个子条件sql用or衔接,转换DSL对应should;第二部分: academic=本科单一条件转换为DSL对应term精确匹配;第三部分: city=北京 or city=深圳city的两个or语句转换为DSL对应terms多词精确匹配。上面的sql都用的=号,假定不需要分词,我们统一采用termquery和termsquery实现。引申:如果需要分词,更换为matchquery或者match_parsequery即可。如果需要模糊匹配,更换为wildcardquery接口。步骤2:套bool多条件检索DSL模板复杂bool多条件检索DSL模板:包含了:查询/检索、聚合、排序、指定字段输出、输出起点、输出多少等信息。{ "query": { "bool": { "must": [], "must_not": [], "should": [] } }, "aggs": { "my_agg": { "terms": { "field": "user", "size": 10 } } }, "highlight": { "pre_tags": [ "<em>" ], "post_tags": [ "</em>" ], "fields": { "body": { "number_of_fragments": 1, "fragment_size": 20 }, "title": {} } }, "size": 20, "from": 100, "_source": [ "title", "id" ], "sort": [ { "_id": { "order": "desc" } } ]}123456789101112131415161718192021222324252627282930313233343536373839404142434445简单bool多条件查询DSL模板:只包含查询。{ "query": { "bool": { "must": [], "must_not": [], "should": [] } }}123456789以上根据我们的sql特点,简单模板即能满足要求。步骤3:构造生成DSL根据,步骤1、步骤2,可以构思出根据sql转换后的DSL应该:1)最外层bool2)第二层:must 三个并行条件3)第三层:各自的匹配条件。(存在bool嵌套bool的情况)3、动动手,验证下。3.1 创建索引(自动生成mapping)put test_index_0113.2 提交数据post test_index_01/test_type_01/1{ "no":"1", "city":"北京", "academic":"专科", "content":"ES", "position":"ES", "work":"ES"}post test_index_01/test_type_01/2{ "no":"2", "city":"天津", "academic":"本科", "content":"ES", "position":"ES", "work":"ES"}post test_index_01/test_type_01/3{ "no":"3", "city":"深圳", "academic":"本科", "content":"ES", "position":"ES2", "work":"ES3"}post test_index_01/test_type_01/4{ "no":"4", "city":"北京", "academic":"本科", "content":"ES1", "position":"ES2", "work":"ES"}123456789101112131415161718192021222324252627282930313233343536插入后ES-head插件控制台查询结果:3.3 完成检索post test_index_01/_search{ "query": { "bool": { "must": [ { "terms": { "city.keyword": [ "北京", "深圳" ] } }, { "term": { "academic.keyword": "本科" } }, { "bool": { "should": [ { "term": { "content.keyword": "ES" } }, { "term": { "position.keyword": "ES" } }, { "term": { "work.keyword": "ES" } } ] } } ] } }, "size": 10, "from": 0}123456789101112131415161718192021222324252627282930313233343536373839404142434445注意:没有做分词,做的精确匹配,所以加了”.keyword”。3.4 返回结果{ "took": 1, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 2, "max_score": 1.0577903, "hits": [ { "_index": "test_index_01", "_type": "test_type_01", "_id": "4", "_score": 1.0577903, "_source": { "no": "4", "city": "北京", "academic": "本科", "content": "ES1", "position": "ES2", "work": "ES" } }, { "_index": "test_index_01", "_type": "test_type_01", "_id": "3", "_score": 0.8630463, "_source": { "no": "3", "city": "深圳", "academic": "本科", "content": "ES", "position": "ES2", "work": "ES3" } } ] }}123456789101112131415161718192021222324252627282930313233343536373839404142434、小结实践是检验真理的唯一标准!如有不同意见,欢迎拍砖探讨!
步骤1:卸载已安装的openoffice4。1.1 干掉已安装包和文件[root@dev ~]# rpm -e `rpm -qa |grep openoffice` `rpm -qa |grep ooobasis`[root@dev ~]# rpm -e `rpm -qa |grep openoffice` `rpm -qa |grep ooobasis`[root@dev ~]# rm -rf /opt/openoffice41231.2 查看openoffice是否还存在1)查看安装包是否还在rpm -q openoffice*rpm -q ooobasis*122)查看进程是否还在ps ef|grep soffice13)查看文件是否还在find / -name *openoffice*1步骤2:安装yum install openoffice.org-headless 1(貌似早期版本,实践证明可用)2.1 脚本启动openoffice。1)将如下脚本放到/etc/init.d路径。(其实放哪里都可以)[root@290bc6e083d6 program]# cat /etc/init.d/soffice#!/bin/sh## ooffice openoffice conversion service## Author: Ben Ward, <bdw@producepro.com>## Date: June 12, 2012## chkconfig: 345 50 25# description: Startup script for the Open Office conversion process# pidfile: /var/run/ooffice.pid# Source function library.. /etc/rc.d/init.d/functionsproc="/usr/lib64/libreoffice/program/soffice.bin"options="--headless \"--accept=socket,host=localhost,port=8100;urp;\" &"if [ ! -f $proc ]; then proc="/usr/lib64/openoffice.org3/program/soffice.bin" options="-headless \"-accept=socket,host=localhost,port=8100;urp;\" &"fiprog=oofficePID_FILE=/var/run/ooffice.pidLOG=/var/log/oofficeif [ ! -f $LOG ]; then touch $LOG chmod 644 $LOGfistart() { echo -n $"Starting $prog: " umask 000 daemon $prog $options RETVAL=$? echo sleep 1 pidofproc $proc > $PID_FILE echo "`date +\"%Y-%m-%d %T\"` Starting ooffice." >> $LOG return $RETVAL}stop() { echo -n $"Stopping $prog: " killproc -p $PID_FILE $prog RETVAL=$? echo echo "`date +\"%Y-%m-%d %T\"` Stopping ooffice." >> $LOG return $RETVAL}case "$1" in start) start ;; stop) stop ;; restart) stop sleep 1 start ;; status) status -p $PID_FILE $prog ;; *) echo $"Usage: $prog {start | stop | restart | status}" exit 1esacexit $RETVAL12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273742.2 执行./soffice start 完成启动。启动后,发现有两个父子进程已经启动。2.3 ./soffice stop 停止openoffice服务。结束两个进程。3. 步骤3:重新安装openoffice4.3.1 解压openoffice4安装包。tar -xzvf Apache_OpenOffice_4.1.3_Linux_x86-64_install-rpm_zh-CN.tar.gz13.2 安装所有rpm进入RPMS目录,执行rpm –ivh *.rpm(安装所有rpm文件)13.3 安装 noarch.rpmRPMS目录下有desktop-integration文件夹,进入到desktop-integration目录,里面有四个rpm文件,选择相应的安装即可,这里我选择的是redhat版本。执行rpm -ivh openoffice4.1.3-redhat-menus-4.1.3-9783.noarch.rpm1此时openOffice己经安装完成,默认会安装在/opt下3.4 启动openoffice4进入/opt/openoffice4/program下,执行如下命令启动openoffice。soffice -headless -accept="socket,host=192.168.1.177,port=8100;urp;" -nofirststartwizard &1步骤4 判定openoffice是否已经启动成功。1)查看端口:[root@z11 ~]# ps -ef |grep 8100root 2117 18586 0 14:58 pts/1 00:00:00 /usr/lib64/libreoffice/program/oosplash -headless -accept=socket,host=192.168.1.177,port=8100;urp; -nofirststartwizardroot 2134 2117 99 14:58 pts/1 00:11:49 /usr/lib64/libreoffice/program/soffice.bin -headless -accept=socket,host=192.168.1.177,port=8100;urp; -nofirststartwizardroot 3382 3300 0 15:10 pts/11 00:00:00 grep --color=auto 810012342)查看进程:[root@zsksvr11 ~]# ps -ef | grep sofficeroot 2134 2117 99 14:58 pts/1 00:12:28 /usr/lib64/libreoffice/program/soffice.bin -headless -accept=socket,host=192.168.1.177,port=8100;urp; -nofirststartwizardroot 3476 3300 0 15:11 pts/11 00:00:00 grep --color=auto soffice123至此,问题解决。执行脚本参考:https://pastebin.com/Zcnw7zGN
1、需提前做的工作1)设计好索引以及Mapping;Mapping的目的主要是——设定字段名称、字段类型,哪些字段需要进行全文检索等。12)Java程序中封装好类,和Mapping设定的字段一一对应。2、批量导入步骤分解步骤1:本地文件格式化,统一为Json格式。一个待导入的数据串,存成一个Json文件。步骤2:放置在统一./data路径下。 目录结构如下示意: ./data a_01.json a_02.json a_03.json ... a_100.json123456步骤3:循环遍历./data文件获取包含绝对路径的文件全名,存入linkedlist中。步骤4:遍历linkedlist的每个路径,获取Json信息。步骤5:使用fastjson解析Json,解析成对应设计好的类个各个匹配字段。步骤6:借助bulk**批量曹操API接口,完成本地文件的导入。3、核心接口实现/***@brief:遍历Json,批量插入ES**@param:空**@return:空*/ private static void insertBulkIndex() throws Exception {//Json文件的存储final String JSONFILEINPUT = ESConfig.es_json_path;logger.info("path = " + JSONFILEINPUT);LinkedList<String> curJsonList = FileProcess.getJsonFilePath(JSONFILEINPUT);logger.info("size = " + curJsonList.size());for (int i = 0; i < curJsonList.size(); ++i){//System.out.println(" i = " + i + " " + curJsonList.get(i));String curJsonPath = curJsonList.get(i);ImageInfo curImageInfo = JsonParse.GetImageJson(curJsonPath);//JsonParse.printImageJson(curImageInfo);if (curImageInfo == null){continue;}//遍历插入操作InsertIndex (curImageInfo);}}/***@brief:单条Json插入ES(借助了Jest封装后的API)**@param:空**@return:空*/private static void InsertIndex(AgeInfo ageInfo) throws Exception {JestClient jestClient = JestExa.getJestClient();JsonParse.PrintImageJson( ageInfo );Bulk bulk = new Bulk.Builder().defaultIndex("age_index").defaultType("age_type").addAction(Arrays.asList(new Index.Builder( ageInfo ).build())).build(); JestResult result = jestClient.execute(bulk); if (result.isSucceeded()){ System.out.println("insert success!"); }else{ System.out.println("insert failed"); }}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748最终实现效果为:java -jar bulk_insert.jar ./data/ 1即可实现./data下的所有json循环遍历导入ES。4、使用技术1)文件遍历2)Json解析3)ES批量插入操作5、遇到的坑程序导出Jar包时候,生成jar包报错。由于借助了Jest的源码工程,该工程是由Maven生成的。生成jar包的时候,会一直提示:“Java.long.ClassNotFoundException"1初步定位原因:是maven导致,然后了pom.xml,错误依旧。最终解决方案:重建工程,将代码和依赖的jar包重新导入即可。后记死磕ES,有问题欢迎大家提问探讨!
1、同步原理原有ES专栏中有详解,不再赘述。详细请参考我的专栏:深入详解Elasticsearch以下是通过ES5.4.0, logstash5.4.1 验证成功。可以确认的是2.X版本同样可以验证成功。2、核心配置文件input { stdin { } jdbc { type => "cxx_article_info" # mysql jdbc connection string to our backup databse 后面的test对应mysql中的test数据库 jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => "xxxxx" record_last_run => "true" use_column_value => "true" tracking_column => "id" last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_info" clean_run => "false" # the path to our downloaded jdbc driver jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "500" statement => "select * from cxx_article_info where id > :sql_last_value"#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *"#设定ES索引类型 } jdbc { type => "cxx_user" # mysql jdbc connection string to our backup databse 后面的test对应mysql中的test数据库 jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => "xxxxxx" record_last_run => "true" use_column_value => "true" tracking_column => "id" last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_user_info" clean_run => "false" # the path to our downloaded jdbc driver jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "500" statement => "select * from cxx_user_info where id > :sql_last_value"#以下对应着要执行的sql的绝对路径。#statement_filepath => "/opt/logstash/bin/logstash_mysql2es/department.sql"#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"#设定ES索引类型 }}filter {mutate { convert => [ "publish_time", "string" ] }date { timezone => "Europe/Berlin" match => ["publish_time" , "ISO8601", "yyyy-MM-dd HH:mm:ss"]}#date { # match => [ "publish_time", "yyyy-MM-dd HH:mm:ss,SSS" ] # remove_field => [ "publish_time" ] # }json { source => "message" remove_field => ["message"] }}output {if [type]=="cxxarticle_info" { elasticsearch {#ESIP地址与端口 hosts => "10.100.11.231:9200"#ES索引名称(自己定义的) index => "cxx_info_index"#自增ID编号 # document_id => "%{id}" }}if [type]=="cxx_user" { elasticsearch {#ESIP地址与端口 hosts => "10.100.11.231:9200"#ES索引名称(自己定义的) index => "cxx_user_index"#自增ID编号 # document_id => "%{id}" }}}1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031043、同步成功结果[2017-07-19T15:08:05,438][INFO ][logstash.pipeline ] Pipeline main startedThe stdin plugin is now waiting for input:[2017-07-19T15:08:05,491][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 1[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 1[2017-07-19T15:09:00,730][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 500 OFFSET 0[2017-07-19T15:09:00,731][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT * FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 500 OFFSET 0[2017-07-19T15:10:00,173][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1[2017-07-19T15:10:00,174][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.001000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1123456789101112134、扩展1)多个表无非就是在input里面多加几个类型,在output中多加基础类型判定。举例:if [type]=="cxx_user" 12)input里的type和output if判定的type**保持一致**,该type对应ES中的type。后记死磕ES,有问题欢迎大家提问探讨!
1.步骤详解步骤一:设定my.cnf配置文件。innodb_file_per_table=1验证开关已经打开。mysql> SHOW VARIABLES LIKE 'innodb_file_per_table';+-----------------------+-------+| Variable_name | Value |+-----------------------+-------+| innodb_file_per_table | ON |+-----------------------+-------+1 row in set (0.00 sec)1234567步骤二:创建指定路径存储的分区表。CREATE TABLE orders_list2 ( id INT AUTO_INCREMENT, customer_surname VARCHAR(30), store_id INT, salesperson_id INT, order_date DATE, note VARCHAR(500), INDEX idx (id)) ENGINE = INNODB PARTITION BY LIST(store_id) ( PARTITION p1 VALUES IN (1, 3, 4, 17) INDEX DIRECTORY = '/var/orders/district1' DATA DIRECTORY = '/var/orders/district1', PARTITION p2 VALUES IN (2, 12, 14) INDEX DIRECTORY = '/var/orders/district2' DATA DIRECTORY = '/var/orders/district2', PARTITION p3 VALUES IN (6, 8, 20) INDEX DIRECTORY = '/var/orders/district3' DATA DIRECTORY = '/var/orders/district3', PARTITION p4 VALUES IN (5, 7, 9, 11, 16) INDEX DIRECTORY = '/var/orders/district4' DATA DIRECTORY = '/var/orders/district4', PARTITION p5 VALUES IN (10, 13, 15, 18) INDEX DIRECTORY = '/var/orders/district5' DATA DIRECTORY = '/var/orders/district5');12345678910111213141516171819202122232425262728293031插入记录:insert into orders_list2(id, customer_surname, store_id, salesperson_id, order_date, note)values(1, "yang", 1, 1, CURDATE(), "testing");insert into orders_list2(id, customer_surname, store_id, salesperson_id, order_date, note)values(2, "yang", 2, 2, CURDATE(), "testing");insert into orders_list2(id, customer_surname, store_id, salesperson_id, order_date, note)values(6, "yang", 6, 6, CURDATE(), "testing");insert into orders_list2(id, customer_surname, store_id, salesperson_id, order_date, note)values(8, "yang", 8, 8, CURDATE(), "testing");insert into orders_list2(id, customer_surname, store_id, salesperson_id, order_date, note)values(5, "yang", 5, 5, CURDATE(), "testing");insert into orders_list2(id, customer_surname, store_id, salesperson_id, order_date, note)values(10, "yang", 10, 10, CURDATE(), "testing");123456789101112步骤三:到指定新路径下验证。[root@f033b3fe25e2 orders]# tree.├── district1│ └── test│ └── orders_list2#P#p1.ibd├── district2│ └── test│ └── orders_list2#P#p2.ibd├── district3│ └── test│ └── orders_list2#P#p3.ibd├── district4│ └── test│ └── orders_list2#P#p4.ibd└── district5 └── test └── orders_list2#P#p5.ibd123456789101112131415161718192010 directories, 5 files[root@f033b3fe25e2 orders]# pwd/var/orders123查询验证:mysql> explain partitions select * from orders_list2;+----+-------------+--------------+----------------+------+---------------+------+---------+------+------+-------+| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | Extra |+----+-------------+--------------+----------------+------+---------------+------+---------+------+------+-------+| 1 | SIMPLE | orders_list2 | p1,p2,p3,p4,p5 | ALL | NULL | NULL | NULL | NULL | 11 | NULL |+----+-------------+--------------+----------------+------+---------------+------+---------+------+------+-------+1 row in set (0.00 sec)1234567步骤四:新增分区处理。场景假设:比如最新的数据,我们想存储到SSD硬盘上。可以通过增加指定路径的分区文件达到目的。ALTER TABLE orders_list2 ADD PARTITION (PARTITION p6 VALUES IN (21,22,23)DATA DIRECTORY = '/var/ssd_testing' INDEX DIRECTORY = '/var/ssd_testing');[root@f033b3fe25e2 var]# tree ssd_testing/ssd_testing/└── test └── orders_list2#P#p6.ibd1234567六个分区结果:mysql> explain partitions select * from orders_list2;+----+-------------+--------------+-------------------+------+---------------+------+---------+------+------+-------+| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | Extra |+----+-------------+--------------+-------------------+------+---------------+------+---------+------+------+-------+| 1 | SIMPLE | orders_list2 | p1,p2,p3,p4,p5,p6 | ALL | NULL | NULL | NULL | NULL | 12 | NULL |+----+-------------+--------------+-------------------+------+---------------+------+---------+------+------+-------+1 row in set (0.00 sec)1234567参考:https://dev.mysql.com/doc/refman/5.6/en/tablespace-placing.html
点此链接题记elasticsearch性能测试研究了很久,自己想过通过批量导入数据,然后记录时间,统计CPU、内存等变化,计算得出某个性能指标。但显然,数据量起伏不定,非常不准确。研究发现,github上提供了rally作为elasticsearch的性能测试工具,较好的解决了es性能测试问题。1、esrally功能:es的性能测试工具。esrally不支持windows版本,目前只支持Linux和Mac OS。esrally的运行基于以下两点假设:1)所有运行在同一台机器完成。(未来改假设条件可能会去掉)2)你需要向Elasticsearch索引中添加特定的数据集,然后在其上进行基准查询(benchmarking queries).2、esrally安装:前提:验证是否安装了以下内容(注意版本)Python 3.4+ available as python3 on the path (verify with: python3 --version which should print Python 3.4.0 or higher)pip3 available on the path (verify with pip3 --version)JDK 8git如下所示:[root@laoyang git-2.4.0]# pythonPython 3.5.1 (default, Aug 2 2016, 09:53:20)[GCC 4.4.7 20120313 (Red Hat 4.4.7-17)] on linuxType "help", "copyright", "credits" or "license" for more information.[root@laoyang git-2.4.0]# git --versiongit version 2.4.0[root@laoyang git-2.4.0]# java -versionjava version "1.8.0_91"Java(TM) SE Runtime Environment (build 1.8.0_91-b14)Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)[root@laoyang git-2.4.0]# find / -name "pip3"/usr/bin/pip3/usr/local/python3/bin/pip312345678910111213141516步骤1:安装 Rallypip3 install esrally步骤2:配置 Rally调用: esrally configure 会提示让你输入 java8.home(也就是JAVA_HOME设定的环境变量路径)。仅java8.home使用自己本机的JAVA_HOME环境变量路径,其他采用默认值即可。[root@laoyang .rally]# cat /root/.rally/rally.ini[meta]config.version = 5[system]root.dir = /root/.rally/benchmarkslog.root.dir = logsenv.name = local[source]local.src.dir=/root/.rally/src #注意,此路径并非elaticsearch安装路径。remote.repo.url=https://github.com/elastic/elasticsearch.git[provisioning]local.install.dir = install[runtime]java8.home = /opt/jdk1.8.0_91 #注意,JAVA_HOME 路径为:/opt/jdk1.8.0_91[benchmarks]local.dataset.cache = ${system:root.dir}/data[reporting]datastore.type = elaticsearchdatastore.host = 10.0.1.30datastore.port = 9200datastore.secure = Falsedatastore.user =datastore.password =[tracks]default.url = https://github.com/elastic/rally-tracks1234567891011121314151617181920212223242526272829303132步骤3:运行esrally由于elasticsearch的运行必须非root账户。esrally建议用非root账户执行。3、esrally执行常见问题及解决方案问题1:版本用哪个版本?目前用最新版本 Rally 0.3.2(2016-8-7发行)。问题2:注意gradle需要2.13版本,高版本会不兼容。[root@laoyang caches]# ll /usr/bin/gradlelrwxrwxrwx 1 root root 27 Aug 5 17:13 /usr/bin/gradle -> /opt/gradle-2.13/bin/gradle问题3:安装时候提示 documents.json.bz2 无法下载成功。1)手动下载: http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames/documents.json.bz22)移动到 /root/.rally/benchmarks/data/geonames/(对于root用户)对于非root用户,elaticsearch用户,移动路径为:/home/elasticsearch/benchmarks/data/geonames/问题4: esrally单独运行和加参数运行区别?esrally单独运行:非常方便的指令,等价于 :esrally --pipeline=from-sources --version=current. Rally 将要通过Gradle从源码编译elasticsearch。esrally --pipline=from-distribution --distribution-version=2.3.4, Rally将要从 https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.4/elasticsearch-2.3.4.tar.gz下载elaticsearch2.3.4版本运行。问题5: Could not resolve all dependencies for configuration ‘:benchmarks:classpath’.37 > Could not download shadow.jar (com.github.jengelman.gradle.plugins:shadow:1.2.3)如何解决?root账户下:cd /home/elasticsearch/.rally/benchmarks/srcgradle :distribution:tar:assemble正确提示信息为:deprecated API.Note: Recompile with -Xlint:deprecation for details.:modules:transport-netty4:processResources UP-TO-DATE:modules:transport-netty4:classes:modules:transport-netty4:jar:modules:transport-netty4:copyPluginPropertiesTemplate:modules:transport-netty4:pluginProperties:modules:transport-netty4:bundlePlugin:distribution:buildModules:distribution:tar:buildTar:distribution:tar:generatePomFileForNebulaPublication:distribution:tar:signArchives SKIPPED:distribution:tar:assembleBUILD SUCCESSFUL1234567891011121314154、esrally最终运行结果:Preparing for race (might take a few moments) ... Building from sources .../home/elasticsearch/.rally/benchmarks/races/2016-08-08-07-29-59/local/logs/rally_out.logRacing on track [geonames] and challenge [append-no-conflicts] with car [defaults] Benchmarking indexing at 12183.1 docs/s [100% done] Benchmarking stats (warmup iteration 100/100) [100% done] Benchmarking stats (iteration 100/100) [100% done] Benchmarking search (warmup iteration 100/100) [100% done] Benchmarking search (iteration 100/100) [100% done]------------------------------------------------------ _______ __ _____ / ____(_)___ ____ _/ / / ___/_________ ________ / /_ / / __ \/ __ `/ / \__ \/ ___/ __ \/ ___/ _ \ / __/ / / / / / /_/ / / ___/ / /__/ /_/ / / / __//_/ /_/_/ /_/\__,_/_/ /____/\___/\____/_/ \___/------------------------------------------------------| Metric | Value ||----------------------------------------------------------:|----------:|| Min Indexing Throughput [docs/s] | 11949 || Median Indexing Throughput [docs/s] | 12307 || Max Indexing Throughput [docs/s] | 13349 || Indexing time [min] | 164.045 || Merge time [min] | 32.3815 || Refresh time [min] | 8.82333 || Flush time [min] | 1.63852 || Merge throttle time [min] | 1.45482 || Query latency default (90.0 percentile) [ms] | 68.8676 || Query latency default (99.0 percentile) [ms] | 77.6009 || Query latency default (100 percentile) [ms] | 78.8328 || Query latency term (90.0 percentile) [ms] | 4.63227 || Query latency term (99.0 percentile) [ms] | 10.9506 || Query latency term (100 percentile) [ms] | 13.0573 || Query latency phrase (90.0 percentile) [ms] | 6.35018 || Query latency phrase (99.0 percentile) [ms] | 13.1745 || Query latency phrase (100 percentile) [ms] | 14.2399 || Query latency country_agg_uncached (90.0 percentile) [ms] | 361.913 || Query latency country_agg_uncached (99.0 percentile) [ms] | 384.75 || Query latency country_agg_uncached (100 percentile) [ms] | 392.645 || Query latency country_agg_cached (90.0 percentile) [ms] | 7.71451 || Query latency country_agg_cached (99.0 percentile) [ms] | 17.4704 || Query latency country_agg_cached (100 percentile) [ms] | 18.4851 || Query latency scroll (90.0 percentile) [ms] | 50.4234 || Query latency scroll (99.0 percentile) [ms] | 50.9866 || Query latency scroll (100 percentile) [ms] | 51.2115 || Query latency expression (90.0 percentile) [ms] | 482.02 || Query latency expression (99.0 percentile) [ms] | 485.951 || Query latency expression (100 percentile) [ms] | 491.999 || Query latency painless_static (90.0 percentile) [ms] | 890.382 || Query latency painless_static (99.0 percentile) [ms] | 918.354 || Query latency painless_static (100 percentile) [ms] | 927.282 || Query latency painless_dynamic (90.0 percentile) [ms] | 968.908 || Query latency painless_dynamic (99.0 percentile) [ms] | 1019.14 || Query latency painless_dynamic (100 percentile) [ms] | 1021.97 || Median CPU usage (index) [%] | 887.7 || Median CPU usage (stats) [%] | 94.9 || Median CPU usage (search) [%] | 445.05 || Total Young Gen GC [s] | 89.121 || Total Old Gen GC [s] | 12.274 || Index size [GB] | 3.30111 || Totally written [GB] | 20.2123 || Heap used for segments [MB] | 21.6794 || Heap used for doc values [MB] | 0.113129 || Heap used for terms [MB] | 20.0574 || Heap used for norms [MB] | 0.0761719 || Heap used for points [MB] | 0.770404 || Heap used for stored fields [MB] | 0.6623 || Segment count | 96 || Indices Stats(90.0 percentile) [ms] | 11.5116 || Indices Stats(99.0 percentile) [ms] | 14.1724 || Indices Stats(100 percentile) [ms] | 36.8348 || Nodes Stats(90.0 percentile) [ms] | 11.342 || Nodes Stats(99.0 percentile) [ms] | 13.435 || Nodes Stats(100 percentile) [ms] | 16.4768 |123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172Logs for this race are archived in /home/elasticsearch/.rally/benchmarks/races/2016-08-08-07-29-59/local/logs-geonames-append-no-conflicts-defaults.zip15、问题大讨论https://discuss.elastic.co/t/the-below-bugs-appers-when-running-esrally/57063/25源码 作者danielmitterdorfer一一回复, 最终作者回复:I am very happy too and I am glad you had so much persistence. You also uncovered a few usability problems that I need to tackle.帮作者提了一些建议,促使他升级了一个版本0.3.2。参考:https://github.com/elastic/rally小结前面的确费劲周折,花费了接近3整天实践,期间甚至告诉原作者工具太难用,给作者反复沟通,最终问题解决。看到输出结果的那一刻,整个人很兴奋。坚持到底,相信付出的力量!不断思考、反思中前行!
1. 目前业界有以下几个插件实现ES与Mysql的同步操作。|序号 | 插件名称 |地址 |——:———————— :————————————————-| 1 | elasticsearch-jdbc | https://github.com/jprante/elasticsearch-jdbc |——:———————— :————————————————-| 2 | elasticsearch-river-MySQL | https://github.com/scharron/elasticsearch-|river-mysql |——:———————— :————————————————-| 3 | go-mysql-elasticsearch(国内作者siddontang) https://github.com/siddontang/go-mysql-|elasticsearch |——:———————— :————————————————-| 4 | logstash-input-jdbc https://github.com/logstash-plugins/logstash-|input-jdbc |——:———————— :————————————————-严格意义上elasticsearch-jdbc已经不是第三方插件,已经成为独立的第三方工具。2、插件优缺点对比
1、Elasticsearch安装# cd /usr/local下载:wgethttps://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.3.2/elasticsearch-2.3.2.zip# unzip elasticsearch-2.3.2.zip# mkdir -p /data/logs/elasticsearch# mkdir -p /data/elasticsearch/{data,work,plugins,scripts}# useradd elasticsearch -s /bin/bash# chown -R elasticsearch:elasticsearch /data/logs/elasticsearch /data/elasticsearch2、Elasticsearch配置1)主配置(./config/elasticsearch.yml)qc-hermes-es-1:cluster.name: qc-hermes-search-clusternode.name: qc-hermes-es-1 #同hostname cat /etc/hostnamepath.data: /data/elasticsearch/datapath.logs: /data/logs/elasticsearchpath.plugins: /data/elasticsearch/pluginspath.scripts: /data/elasticsearch/scriptsbootstrap.mlockall: truenetwork.host: 192.168.0.10 #本机IP地址http.port: 9200discovery.zen.ping.unicast.hosts: ["qc-hermes-es-1:9200", "qc-hermes-es-2:9200"]node.master: truenode.data: trueindex.number_of_shards: 8index.number_of_replicas: 11234567891011121314152)服务启动# cd /opt/elasticsearch# su elasticsearch (注意:root账号会报错)$ bin/elasticsearch -d说明:2.x的版本需要以普通用户权限启动,否则会报错,无法正常启动。3)验证启动成功:
操作系统面试之一——程序、进程、线程题注:《面试宝典》操作系统部分错误、漏洞较多,笔者对此参考相关书籍和自己观点进行了重写,供大家参考。一、程序、进程、线程1.程序和进程.进程由两个部分组成:1)操作系统用来管理进程的内核对象。内核对象也是系统用来存放关于进程的统计信息的地方。2)地址空间。它包含所有可执行模块或DLL模块的代码和数据。它还包含动态内存分配的空间。如线程堆栈和堆分配空间。定义使用系统运行资源情况程序计算机指令的集合,它以文件的形式存储在磁盘上。程序是静态实体(passive Entity),在多道程序系统中,它是不能独立运行的,更不能与其他程序并发执行。不使用【程序不能申请系统资源,不能被系统调度,也不能作为独立运行的单位,因此,它不占用系统的运行资源】。进程通常被定义为一个正在运行的程序的实例,是一个程序在其自身的地址空间中的一次执行活动。定义:进程是进程实体(包括:程序段、相关的数据段、进程控制块PCB)的运行过程,是系统进行资源分配和调度的一个独立单位。使用【进程是资源申请、调度和独立运行的单位,因此,它使用系统中的运行资源。】2. 进程与线程如果说操作系统引入进程的目的是为了提高程序并发执行,以提高资源利用率和系统吞吐量。那么操作系统中引入线程的目的,则是为了减少进程并发执行过程中所付出的时空开销,使操作系统能很好的并发执行。 进程process定义了一个执行环境,包括它自己私有的地址空间、一个句柄表,以及一个安全环境;线程则是一个控制流,有他自己的调用栈call stack,记录了它的执行历史。线程由两个部分组成:1)线程的内核对象,操作系统用它来对线程实施管理。内核对象也是系统用来存放线程统计信息的地方。2)线程堆栈,它用于维护线程在执行代码时需要的所有参数和局部变量。当创建线程时,系统创建一个线程内核对象。该线程内核对象不是线程本身,而是操作系统用来管理线程的较小的数据结构。可以将线程内核对象视为由关于线程的统计信息组成的一个小型数据结构。 进程与线程的比较如下:比较进程线程活泼性不活泼(只是线程的容器)活泼地址空间系统赋予的独立的虚拟地址空间(对于32位进程来说,这个地址空间是4GB)在进程的地址空间执行代码。线程只有一个内核对象和一个堆栈,保留的记录很少,因此所需要的内存也很少。因为线程需要的开销比进程少调度仅是资源分配的基本单位独立调度、分派的基本单位并发性仅进程间并发(传统OS)进程间、线程间并发拥有资源资源拥有的基本单位基本上不拥有资源系统开销创建、撤销、切换开销大仅保存少量寄存器内容,开销小。 3. 进程同步进程同步的主要任务:是对多个相关进程在执行次序上进行协调,以使并发执行的诸进程之间能有效地共享资源和相互合作,从而使程序的执行具有可再现性。 同步机制遵循的原则:(1) 空闲让进;(2) 忙则等待(保证对临界区的互斥访问);(3) 有限等待(有限代表有限的时间,避免死等);(4) 让权等待,(当进程不能进入自己的临界区时,应该释放处理机,以免陷入忙等状态)。由于进程同步产生了一系列经典的同步问题“生产者-消费者”问题,“哲学家进餐”问题,“读者-写者”问题。 4. 进程间的通信是如何实现的? 进程通信,是指进程之间的信息交换(信息量少则一个状态或数值,多者则是成千上万个字节)。因此,对于用信号量进行的进程间的互斥和同步,由于其所交换的信息量少而被归结为低级通信。所谓高级进程通信指:用户可以利用操作系统所提供的一组通信命令传送大量数据的一种通信方式。操作系统隐藏了进程通信的实现细节。或者说,通信过程对用户是透明的。高级通信机制可归结为三大类:(1) 共享存储器系统(存储器中划分的共享存储区);实际操作中对应的是“剪贴板”(剪贴板实际上是系统维护管理的一块内存区域)的通信方式,比如举例如下:word进程按下ctrl+c,在ppt进程按下ctrl+v,即完成了word进程和ppt进程之间的通信,复制时将数据放入到剪贴板,粘贴时从剪贴板中取出数据,然后显示在ppt窗口上。(2) 消息传递系统(进程间的数据交换以消息(message)为单位,当今最流行的微内核操作系统中,微内核与服务器之间的通信,无一例外地都采用了消息传递机制。应用举例:邮槽(MailSlot)是基于广播通信体系设计出来的,它采用无连接的不可靠的数据传输。邮槽是一种单向通信机制,创建邮槽的服务器进程读取数据,打开邮槽的客户机进程写入数据。(3) 管道通信系统(管道即:连接读写进程以实现他们之间通信的共享文件(pipe文件,类似先进先出的队列,由一个进程写,另一进程读))。实际操作中,管道分为:匿名管道、命名管道。匿名管道是一个未命名的、单向管道,通过父进程和一个子进程之间传输数据。匿名管道只能实现本地机器上两个进程之间的通信,而不能实现跨网络的通信。命名管道不仅可以在本机上实现两个进程间的通信,还可以跨网络实现两个进程间的通信。同一机器两个进程间通信跨网络通信剪贴板Clipboard可以不可以匿名管道Pipe可以不可以命名管道(点对点单一通信,数据量可较大)Namedpipe可以可以邮槽(一对多,数据量较小,424字节以下)Mailslot可以可以在Win32下提供的进程间通信方式有以下几种:1)剪贴板Clipboard:在16位时代常使用的方式,CWnd类中提供了支持。2)COM/DCOM:通过COM系统的代理存根方式进行进程间数据交换,但只能够表现在对接口函数的调用时传送数据,通过DCOM可以在不同主机间传送数据。3)Dynamic Data Exchange (DDE):在16位时代常使用的方式。4)File Mapping:文件映射,在32位系统中提供的新方法,可用来共享内存。5)Mailslots:邮件槽,在32位系统中提供的新方法,可在不同主机间交换数据,分为服务器方和客户方,双方可以通过其进行数据交换,在Win9X下只支持邮件槽客户。6)Pipes:管道,分为无名管道:在父子进程间交换数据;有名管道:可在不同主机间交换数据,分为服务器方和客户方,在Win9X下只支持有名管道客户。7)RPC:远程过程调用,很少使用,原因有两个:复杂而且与UNIX系统的RPC并不完全兼容。但COM/DCOM的调用是建立在RPC的基础上的。8)Windows Sockets:网络套接口,可在不同主机间交换数据,分为服务器方和客户方。9)WM_COPYDATA:通过发送WM_COPYDATA消息并将数据放在参数中来传递数据给其他进程。5. 线程同步 根据用户模式及内核模式下的同步方式的不同,分类及对比如下: 内核对象/非内核对象含义缺点适用关键代码段(临界区)CriticalSection非内核对象,工作在用户方式下,为用户模式对象从程序代码的角度来控制线程的并发性1.因为在等待进入关键代码段时无法设定超时值,所以其很容易进入死锁状态。2.不能跨进程使用。单个进程中线程间的同步(同步速度快)事件对象Event内核对象所有内核对象中最基本的。速度较慢(相比用户模式实现线程同步)多个进程间的各个线程间实现同步互斥对象Mutex内核对象代表对一个资源的独占式访问信号量Semaphore内核对象使用计数器来控制程序对一个共享资源的访问
2021年11月