之前文章有讲到借助:fingerprint filter 插件实现去重。
近期又有社群小伙伴问到同样的问题,这里一并梳理回复如下。
1、python 脚本实现文档去重
这里讲的实现,借助 python 脚本实现。
- 前置条件:
由于涉及 8.X 版本 Elasticsearch 以安全方式的连接,这里需要 python 升级到 3.10+ 版本才可以。
1.1 实现前提
标定文档重复标记——一般文档中几个字段或者全部字段重复,才认为文档是一样的。
业务层面自己指定就可用 md5
值实现。
对于新闻类类线上业务的文档举例:
- 网易新闻
https://3g.163.com/news/article/H5APDMGH00019UD6.html
- 新浪新闻
https://news.sina.com.cn/sx/2022-04-19/detail-imcwiwst2726757.shtml
如果拿文章标题(title) + 正文内容(content)内容组合取 md5,然后对比的话,两者发布网站不同,但内容可以认为是重复的。
1.2 实现原理
- Step 1:scan遍历全部文档,生成文档 md5。
- Step2:生成字典,字典两部分组成,md5 值是 key,value 是一个数组,里面存的是文档id。
- Step3:遍历字典的value部分大于1的值,就代表存在重复文档。
- Step4:删除重复文档。
2、实现代码
#!/usr/local/bin/python3 from elasticsearch import Elasticsearch, helpers import hashlib import ssl # 全局变量 ES_HOST = 'https://192.168.1.10:9200' ES_USER = 'elastic' ES_PASSWORD = '9Z=T2wOWIXXXXXXXX' CERT_FINGERPRINT = "a4d0fe1eb7e1fd9874XXXXXXXXXX" # 全局词典 dict_of_duplicate_docs = {} # https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/config.html # 要求python 版本3.10及以上 # fingerprint 生成方式,方式一:Elasticsearch 首次启动的时候自动生成。 # 方式二:借助命令行再次生成(结果同方式一) # bash-4.2$ openssl x509 -fingerprint -sha256 -in config/certs/http_ca.crt # SHA256 Fingerprint=A4:D0:FE:1E:B7:E1:FD:98:74:A4:10:6F:E1:XXXXXXXX def es_init(): es = Elasticsearch( ES_HOST, ssl_assert_fingerprint=CERT_FINGERPRINT, basic_auth=(ES_USER, ES_PASSWORD), verify_certs=False ) return es # 对每个文档生成唯一id(自定义生成) def populate_dict_of_duplicate_docs(hit): combined_key = "" # 三个字段决定索引是否重复,自动是根据业务指定的 keys_to_include_in_hash = ["CAC", "FTSE", "SMI"] for mykey in keys_to_include_in_hash: combined_key += str(hit['_source'][mykey]) _id = hit["_id"] # 基于三个字段的组合,生成md5值,便于后续去重用。 hashval = hashlib.md5(combined_key.encode('utf-8')).digest() # 生成键值对词典,key使用md5值,value 为数组类型。 # 相同的 key 值也就是相同的文档,value 是文档id列表 dict_of_duplicate_docs.setdefault(hashval, []).append(_id) print(dict_of_duplicate_docs) # 待去重索引遍历 def scroll_over_all_docs(): es = es_init() for hit in helpers.scan(es, index='stocks'): populate_dict_of_duplicate_docs(hit) # 去重处理函数 def loop_over_hashes_and_remove_duplicates(): es = es_init() for hashval, array_of_ids in dict_of_duplicate_docs.items(): # 对id列表长度大于1的做去重处理操作 if len(array_of_ids) > 1: print(" Duplicate docs hash=%s ****" % hashval) # 获取相同的文档 matching_docs = es.mget(index="stocks", ids= array_of_ids[0:len(array_of_ids)-1]) for doc in matching_docs['docs']: print("doc=%s\n" % doc) es.delete(index="stocks", id = doc['_id']) def main(): scroll_over_all_docs() loop_over_hashes_and_remove_duplicates() main()
代码的核心:
使用了 8.X 版本的 Elasticsearch 访问方式。借助:fingerprint 访问实现。
fingerprint 两种获取方式:
方式一:Elasticsearch 启动的时候已经包含。
方式二:可以借助命令行再生成。
openssl x509 -fingerprint -sha256 -in config/certs/http_ca.crt
3、小结
文章给出 8.X 版本实现文档去重的完整思路和Python 代码实现,加上之前讲解的 logstash fingerprint filter 插件实现去重实现,共2种方案解决文档重复问题。
你的项目实战环节有没有遇到文档去重问题、删除重复文档问题?如何解决的?欢迎留言交流。
参考
https://github.com/deric/es-dedupe/blob/master/esdedupe/esdedupe.py
https://github.com/alexander-marquardt/deduplicate-elasticsearch
https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/
推荐阅读
更短时间更快习得更多干货!
和全球 1700+ Elastic 爱好者一起精进!
比同事抢先一步学习进阶干货!