索引命令
创建spu索引
PUT /spu_dev
{
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "my_ik": {
            "type": "custom",
            "tokenizer": "ik_smart",
            "filter": ["my_synonym"]
          }
        },
        "filter": {
          "my_synonym": {
            "type": "synonym",
            "synonyms_path": "analysis/litong_synonym.txt"
          }
        }
      },
      "number_of_shards": 3,
      "number_of_replicas": 2
    }
  },
  "mappings": {
    "properties": {
      "productcode":   { "type": "keyword" },
      "productname":   { "type": "text", "analyzer": "ik_max_word" },
      "cataname":      { "type": "text", "analyzer": "ik_max_word" },
      "cataid":        { "type": "integer" },
      "seriesid":      { "type": "integer" },
      "seriesname":    { "type": "text", "analyzer": "ik_max_word" },
      "multiunitrule": { "type": "keyword" },
      "brandid":       { "type": "integer" },
      "brandname":     { "type": "text", "analyzer": "ik_max_word" },
      "remarks":       { "type": "text" },
      "state":         { "type": "keyword" },
      "oprname":       { "type": "keyword" },
      "pic1":          { "type": "text" },
      "updatetime":    { "type": "date" },
      "createtime":    { "type": "date" }
    }
  }
}
创建sku索引
PUT /sku_test
{
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "my_ik": {
            "type": "custom",
            "tokenizer": "ik_max_word",
            "filter": ["my_synonym"]
          }
        },
        "filter": {
          "my_synonym": {
            "type": "synonym",
            "synonyms_path": "analysis/litong_synonym.txt"
          }
        }
      },
      "number_of_shards": 3,
      "number_of_replicas": 2
    }
  },
  "mappings": {
    "properties": {
      "productcode":       { "type": "keyword" },
      "productname":       { "type": "text", "analyzer": "ik_max_word" },
      "speccode":          { "type": "keyword" },
      "specname":          { "type": "text", "analyzer": "ik_max_word" },
      "seriesid":          { "type": "integer" },
      "seriesname":        { "type": "text", "analyzer": "ik_max_word" },
      "multiunitrule":     { "type": "text" },
      "unitname":          { "type": "text" },
      "brandid":           { "type": "integer" },
      "brandname":         { "type": "text", "analyzer": "ik_max_word" },
      "remarks":           { "type": "text" },
      "state":             { "type": "keyword" },
      "pic1":              { "type": "text" },
      "updatetime":        { "type": "date" },
      "createtime":        { "type": "date" },
      "unifiedsaleprice":  { "type": "double" },
      "salenum":           { "type": "double" },
      "areaid":            { "type": "integer" },
      "areaname":          { "type": "text" },
      "priceid":           { "type": "integer" },
      "cataname":          { "type": "text", "analyzer": "ik_max_word" },
      "cataid":            { "type": "integer" },
      "categoryid":        { "type": "integer" },
      "servicepricecount": { "type": "integer" },
      "activitycount":     { "type": "integer" }
    }
  }
}
删除索引
DELETE spu_dev
常用查询命令
统计文档数量
GET /spu_dev/_count
查询文档列表
GET /spu_dev/_search
{
  "query": {
    "match_all": {}
  }
}
分词bool查询
GET /spu_dev/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "productname": {
              "query": "大",
              "analyzer": "ik_smart"
            }
          }
        }
      ]
    }
  }
}
sku搜索用例
GET /sku_test/_search
{
  "size": 20,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match": {
                  "productname": {
                    "query": "20红三通",
                    "operator": "OR",
                    "analyzer": "my_ik",
                    "prefix_length": 0,
                    "max_expansions": 50,
                    "fuzzy_transpositions": true,
                    "lenient": false,
                    "zero_terms_query": "NONE",
                    "auto_generate_synonyms_phrase_query": true,
                    "boost": 2.0
                  }
                }
              },
              {
                "match": {
                  "specname": {
                    "query": "20红三通",
                    "operator": "OR",
                    "analyzer": "my_ik",
                    "prefix_length": 0,
                    "max_expansions": 50,
                    "fuzzy_transpositions": true,
                    "lenient": false,
                    "zero_terms_query": "NONE",
                    "auto_generate_synonyms_phrase_query": true,
                    "boost": 1.0
                  }
                }
              },
              {
                "match": {
                  "cataname": {
                    "query": "20红三通",
                    "operator": "OR",
                    "analyzer": "my_ik",
                    "prefix_length": 0,
                    "max_expansions": 50,
                    "fuzzy_transpositions": true,
                    "lenient": false,
                    "zero_terms_query": "NONE",
                    "auto_generate_synonyms_phrase_query": true,
                    "boost": 2.0
                  }
                }
              }
            ],
            "adjust_pure_negative": true,
            "boost": 1.0
          }
        },
        {
          "terms": {
            "areaid": [1, 220000, 220200, 220206],
            "boost": 1.0
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1.0
    }
  },
  "sort": [
    { "_score": { "order": "desc" } },
    { "salenum": { "order": "desc" } }
  ],
  "aggregations": {
    "count": {
      "cardinality": { "field": "priceid" }
    }
  }
}
GET /sku_test/_search
{
  "size": 20,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match": {
                  "productname": {
                    "query": "PE给水管",
                    "operator": "OR",
                    "analyzer": "my_ik",
                    "prefix_length": 0,
                    "max_expansions": 50,
                    "fuzzy_transpositions": true,
                    "lenient": false,
                    "zero_terms_query": "NONE",
                    "auto_generate_synonyms_phrase_query": true,
                    "boost": 2.0
                  }
                }
              },
              {
                "match": {
                  "specname": {
                    "query": "PE给水管",
                    "operator": "OR",
                    "analyzer": "my_ik",
                    "prefix_length": 0,
                    "max_expansions": 50,
                    "fuzzy_transpositions": true,
                    "lenient": false,
                    "zero_terms_query": "NONE",
                    "auto_generate_synonyms_phrase_query": true,
                    "boost": 1.0
                  }
                }
              },
              {
                "match": {
                  "cataname": {
                    "query": "PE给水管",
                    "operator": "OR",
                    "analyzer": "my_ik",
                    "prefix_length": 0,
                    "max_expansions": 50,
                    "fuzzy_transpositions": true,
                    "lenient": false,
                    "zero_terms_query": "NONE",
                    "auto_generate_synonyms_phrase_query": true,
                    "boost": 0.1
                  }
                }
              }
            ],
            "adjust_pure_negative": true,
            "boost": 1.0
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1.0
    }
  },
  "sort": [
    { "_score": { "order": "desc" } },
    { "salenum": { "order": "desc" } }
  ],
  "aggregations": {
    "count": {
      "cardinality": { "field": "priceid" }
    }
  }
}
删除索引中的全部数据
POST spu_dev/_delete_by_query
{
  "query": {
    "match_all": {}
  }
}
分词
ik分词器
插件列表—analysis-ik—更新词典
同义词
高级配置—同义词配置—更新词典
重建索引
POST _reindex
{
  "source": {
    "index": "source_index"
  },
  "dest": {
    "index": "destination_index"
  }
}
脚本
分词
import jieba

# Segment every line of spec.txt with jieba.cut_for_search and write the
# resulting tokens, one per line, to spec_dict.txt.

# Load the raw text and break it into one entry per line.
with open("spec.txt", "r", encoding="utf-8") as src:
    entries = src.read().split("\n")

# Tokenise each entry; the tokens of a single entry are joined with newlines.
segmented = ["\n".join(jieba.cut_for_search(entry)) for entry in entries]

# Join the per-entry token groups and store them in the new dictionary file.
with open("spec_dict.txt", "w", encoding="utf-8") as dst:
    dst.write("\n".join(segmented))
去重
# Remove duplicate (and blank) lines from the segmented dictionary file.
filename = "spec_dict.txt"
new_filename = "spec_dict_distinct.txt"

# A dict keeps insertion order, so duplicates are dropped while the words stay
# in first-seen order. Iterating a set of strings would give a different order
# on every run, which makes the output file impossible to diff.
unique_words = {}
with open(filename, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if line:  # skip blank lines
            unique_words[line] = None

with open(new_filename, "w", encoding="utf-8") as f:
    for word in unique_words:
        f.write(word + "\n")
注意事项
- 分词、同义词修改后必须重建索引
- 配合logstash同步时,es索引的字段名必须全小写(logstash的jdbc插件默认会把查询结果的列名转成小写,字段名含大写会和映射对不上)
logstash同步
spu管道-全量
# SPU full sync: loads every non-deleted product from MySQL into Elasticsearch.
# This pipeline has no schedule, no :sql_last_value filter and no
# last_run_metadata_path, matching the shape of the SKU full pipeline; with no
# schedule the jdbc input runs the statement once. Documents are keyed by
# productcode, so re-running it overwrites existing documents.
input {
  jdbc {
    # Fill in host, port and database; the URL parameters must follow a "?".
    jdbc_connection_string => "jdbc:mysql://<host>:<port>/<database>?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowLoadLocalInfile=false"
    jdbc_user => ""
    jdbc_password => ""
    jdbc_driver_library => "/usr/local/service/logstash/extended-files/mysql-connector-java-5.1.40.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    # Column aliases are all lowercase so they match the Elasticsearch mapping.
    statement => "SELECT DISTINCT ep.productCode productcode, ep.productName productname, ep.seriesId seriesid, ep.pic1 pic1, ebs.seriesName seriesname, fcata.cataName cataname, ep.cataId cataid, ep.multiUnitRule multiunitrule, ep.brandId brandid, eb.brandName brandname, ep.remarks remarks, ep.state state, sa.nickName oprname, ep.createTime createtime, ep.updateTime updatetime FROM t_erp_product ep LEFT JOIN ( SELECT ecr.subCataId 'cataId', ec.cataName 'subCataName', GROUP_CONCAT( fec.cataName ORDER BY ecr.subLevel DESC SEPARATOR ' > ' ) 'cataName' FROM t_erp_cata_relation ecr LEFT JOIN t_erp_cata ec ON ecr.subCataId = ec.cataId LEFT JOIN t_erp_cata fec ON ecr.cataId = fec.cataId GROUP BY ecr.subCataId ) fcata ON ep.cataId = fcata.cataId LEFT JOIN t_erp_brand eb ON ep.brandId = eb.brandId LEFT JOIN t_erp_brandseries ebs ON ep.seriesId = ebs.seriesId LEFT JOIN t_sys_account sa ON sa.accId = ep.oprId LEFT JOIN t_erp_productspec eps ON eps.productCode = ep.productCode WHERE ep.state != 'D'"
    type => "jdbc"
  }
}
output {
  elasticsearch {
    hosts => ""
    user => ""
    index => ""
    password => ""
    # Use the product code as the document id so repeated syncs do not create duplicates.
    document_id => "%{productcode}"
  }
}
spu管道-增量
# SPU incremental sync: runs on the cron schedule below (every minute) and
# selects products whose updateTime is newer than the stored tracking value.
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql:///lt_uat_db?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowLoadLocalInfile=false"
    # Both credentials must be complete quoted strings; an unbalanced quote
    # makes the whole config fail to parse.
    jdbc_user => ""
    jdbc_password => ""
    jdbc_driver_library => "/usr/local/service/logstash/extended-files/mysql-connector-java-5.1.40.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    # Column aliases are all lowercase so they match the Elasticsearch mapping.
    statement => "SELECT DISTINCT ep.productCode productcode, ep.productName productname, ep.seriesId seriesid, ep.pic1 pic1, ebs.seriesName seriesname, fcata.cataName cataname, ep.cataId cataid, ep.multiUnitRule multiunitrule, ep.brandId brandid, eb.brandName brandname, ep.remarks remarks, ep.state state, sa.nickName oprname, ep.createTime createtime, ep.updateTime updatetime FROM t_erp_product ep LEFT JOIN ( SELECT ecr.subCataId 'cataId', ec.cataName 'subCataName', GROUP_CONCAT( fec.cataName ORDER BY ecr.subLevel DESC SEPARATOR ' > ' ) 'cataName' FROM t_erp_cata_relation ecr LEFT JOIN t_erp_cata ec ON ecr.subCataId = ec.cataId LEFT JOIN t_erp_cata fec ON ecr.cataId = fec.cataId GROUP BY ecr.subCataId ) fcata ON ep.cataId = fcata.cataId LEFT JOIN t_erp_brand eb ON ep.brandId = eb.brandId LEFT JOIN t_erp_brandseries ebs ON ep.seriesId = ebs.seriesId LEFT JOIN t_sys_account sa ON sa.accId = ep.oprId LEFT JOIN t_erp_productspec eps ON eps.productCode = ep.productCode WHERE ep.state != 'D' and ep.updateTime > :sql_last_value"
    # Track the updatetime column of the result set as :sql_last_value.
    use_column_value => true
    tracking_column => "updatetime"
    tracking_column_type => "timestamp"
    schedule => "* * * * *"
    # File where the last tracked value is persisted between runs.
    last_run_metadata_path => "/usr/local/service/logstash/temp/gd_spu_dev_sql_last_value.yml"
    type => "jdbc"
  }
}
output {
  elasticsearch {
    hosts => ""
    user => ""
    index => ""
    password => ""
    # Use the product code as the document id so repeated syncs do not create duplicates.
    document_id => "%{productcode}"
  }
}
sku管道-全量
# SKU full sync: no schedule and no :sql_last_value filter, so the jdbc input
# runs the statement once and loads every priced SKU. Documents are keyed by
# priceid.
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql:///lt_uat_db?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowLoadLocalInfile=false"
    jdbc_user => ""
    jdbc_password => ""
    jdbc_driver_library => "/usr/local/service/logstash/extended-files/mysql-connector-java-5.1.40.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    # cataid / cataname are selected (via t_erp_cata) so that documents loaded
    # by the full sync carry the same fields as the incremental query and the
    # sku index mapping; the search examples match on cataname.
    statement => "SELECT ep.productCode productcode, ep.productName productname, eps.specCode speccode, eps.specName specname, ep.seriesId seriesid, ep.pic1 pic1, ebs.seriesName seriesname, ep.multiUnitRule multiunitrule, ep.unitName unitname, ep.brandId brandid, eb.brandName brandname, eps.remarks remarks, ep.state state, eps.createTime createtime, eps.updateTime updatetime, sa.areaName areaname, sa.areaId areaid, eup.unifiedSalePrice unifiedsaleprice, eup.priceId priceid, ep.cataId cataid, ec.cataName cataname, IFNULL( t.salenum, 0 ) salenum, ( SELECT COUNT(1) FROM t_erp_costprice ecp WHERE ecp.state = 'A' AND ecp.supplierCode IN (SELECT supplierCode FROM t_wms_supplier_setting WHERE showTag = 1) AND ecp.specCode = eup.specCode AND ecp.areaId IN (SELECT areaId FROM t_sys_area_relation WHERE subAreaId = eup.areaId) ) servicepricecount, COALESCE((SELECT COUNT(1) 'activityCount' FROM t_erp_activity_product eap LEFT JOIN t_erp_activity ea ON eap.activityId = ea.activityId WHERE ea.state = 'A' AND NOW() BETWEEN ea.validStart AND ea.validEnd AND (SELECT COUNT(1) FROM t_sys_area_relation WHERE FIND_IN_SET(areaId,ea.areaIds) AND subAreaId = eup.areaId) > 0 AND eap.specCode = eup.specCode GROUP BY eap.specCode),0) activitycount FROM t_erp_product ep LEFT JOIN t_erp_brand eb ON ep.brandId = eb.brandId LEFT JOIN t_erp_brandseries ebs ON ep.seriesId = ebs.seriesId LEFT JOIN t_erp_productspec eps ON eps.productCode = ep.productCode LEFT JOIN t_erp_unified_price eup ON eps.specCode = eup.specCode LEFT JOIN t_sys_area sa ON eup.areaId = sa.areaId LEFT JOIN ( SELECT sum( itemNum ) salenum, specCode FROM t_erp_saleproduct GROUP BY specCode ) t ON t.specCode = eps.specCode LEFT JOIN t_erp_cata ec ON ec.cataId = ep.cataId WHERE ep.state != 'D' AND eup.unifiedSalePrice IS NOT NULL GROUP BY priceid"
    type => "jdbc"
  }
}
output {
  elasticsearch {
    # hosts needs a value (left blank here like the other connection settings).
    hosts => ""
    user => ""
    index => ""
    password => ""
    # Use the price id as the document id so repeated syncs do not create duplicates.
    document_id => "%{priceid}"
  }
}
sku管道-增量
-- SKU incremental query: one row per unified price (priceid) for active products
-- that have an active cost price and a spec updated after the last tracked value.
SELECT
    ep.productCode        AS productcode,
    ep.productName        AS productname,
    eps.specCode          AS speccode,
    eps.specName          AS specname,
    ep.seriesId           AS seriesid,
    ep.pic1               AS pic1,
    ebs.seriesName        AS seriesname,
    ep.multiUnitRule      AS multiunitrule,
    ep.unitName           AS unitname,
    ep.brandId            AS brandid,
    eb.brandName          AS brandname,
    eps.remarks           AS remarks,
    ep.state              AS state,
    eps.createTime        AS createtime,
    eps.updateTime        AS updatetime,
    sa.areaName           AS areaname,
    sa.areaId             AS areaid,
    eup.unifiedSalePrice  AS unifiedsaleprice,
    eup.priceId           AS priceid,
    ep.cataId             AS cataid,
    ec.cataName           AS cataname,
    IFNULL(t.salenum, 0)  AS salenum,
    (
        SELECT COUNT(1)
        FROM t_erp_costprice ecp
        WHERE ecp.state = 'A'
          AND ecp.supplierCode IN (SELECT supplierCode FROM t_wms_supplier_setting WHERE showTag = 1)
          AND ecp.specCode = eup.specCode
          AND ecp.areaId IN (SELECT areaId FROM t_sys_area_relation WHERE subAreaId = eup.areaId)
    ) AS servicepricecount,
    COALESCE((
        SELECT COUNT(1) AS 'activityCount'
        FROM t_erp_activity_product eap
        LEFT JOIN t_erp_activity ea ON eap.activityId = ea.activityId
        WHERE ea.state = 'A'
          AND NOW() BETWEEN ea.validStart AND ea.validEnd
          AND (
                SELECT COUNT(1)
                FROM t_sys_area_relation
                WHERE FIND_IN_SET(areaId, ea.areaIds)
                  AND subAreaId = eup.areaId
              ) > 0
          AND eap.specCode = eup.specCode
        GROUP BY eap.specCode
    ), 0) AS activitycount
FROM t_erp_product ep
LEFT JOIN t_erp_brand eb          ON ep.brandId = eb.brandId
LEFT JOIN t_erp_brandseries ebs   ON ep.seriesId = ebs.seriesId
LEFT JOIN t_erp_productspec eps   ON eps.productCode = ep.productCode
LEFT JOIN t_erp_unified_price eup ON eps.specCode = eup.specCode
LEFT JOIN t_sys_area sa           ON eup.areaId = sa.areaId
LEFT JOIN (
    SELECT SUM(itemNum) AS salenum, specCode
    FROM t_erp_saleproduct
    GROUP BY specCode
) t                               ON t.specCode = eps.specCode
LEFT JOIN t_erp_cata ec           ON ec.cataId = ep.cataId
INNER JOIN t_erp_costprice cs     ON cs.specCode = eps.specCode
WHERE ep.state = 'A'
  AND eup.unifiedSalePrice IS NOT NULL
  AND cs.state = 'A'
  AND eps.updateTime > :sql_last_value
GROUP BY priceid
注意事项
- logstash同步数据库只能同步内网的,不支持外网
- es的字段名不能用驼峰或者下划线(logstash的jdbc插件默认会把列名转成小写,驼峰字段会和映射对不上),否则会同步失败
- 要指定文档id,否则会一直增加重复文档
- 使用增量同步时,每个管道配置的 last_run_metadata_path(yml文件)不能相同,文件名可以自定义
同步命令使用示例
- 先删除原有的索引
DELETE sku_test
- 然后创建新索引
PUT /sku_test
{
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "my_ik": {
            "type": "custom",
            "tokenizer": "ik_max_word",
            "filter": ["my_synonym"]
          }
        },
        "filter": {
          "my_synonym": {
            "type": "synonym",
            "synonyms_path": "analysis/litong_synonym.txt"
          }
        }
      },
      "number_of_shards": 3,
      "number_of_replicas": 2
    }
  },
  "mappings": {
    "properties": {
      "productcode":       { "type": "keyword" },
      "productname":       { "type": "text", "analyzer": "ik_max_word" },
      "speccode":          { "type": "keyword" },
      "specname":          { "type": "text", "analyzer": "ik_max_word" },
      "seriesid":          { "type": "integer" },
      "seriesname":        { "type": "text", "analyzer": "ik_max_word" },
      "multiunitrule":     { "type": "text" },
      "unitname":          { "type": "text" },
      "brandid":           { "type": "integer" },
      "brandname":         { "type": "text", "analyzer": "ik_max_word" },
      "remarks":           { "type": "text" },
      "state":             { "type": "keyword" },
      "pic1":              { "type": "text" },
      "updatetime":        { "type": "date" },
      "createtime":        { "type": "date" },
      "unifiedsaleprice":  { "type": "double" },
      "salenum":           { "type": "double" },
      "areaid":            { "type": "integer" },
      "areaname":          { "type": "text" },
      "priceid":           { "type": "integer" },
      "cataname":          { "type": "text", "analyzer": "ik_max_word" },
      "cataid":            { "type": "integer" },
      "categoryid":        { "type": "integer" },
      "servicepricecount": { "type": "integer" },
      "activitycount":     { "type": "integer" }
    }
  }
}
- 最后执行命令查看是否同步成功,如果有数据就说明同步成功
GET /sku_test/_count
本地调试的时候需要注意
如果碰到访问不了es、返回403等问题,说明当前出口IP没有加入es的访问白名单(访问控制)
首先要登入腾讯云es的控制台,注意是上海的服务器
然后点击管理集群
点击实例名称进入
点击访问控制
然后点击修改白名单
直接点获取当前ip即可,如果ip满了,就删掉几个没用的,因为公司的ip会经常变
一路确认
保存了之后,如果还是不生效,那么此时就需要重启集群
点击左上角箭头返回
更多里面有重启
尽量选择滚动重启,如果全量重启的话,会影响线上环境,如果追求速度的话就可以点全量重启
等待重启即可,滚动重启一般需要5-7分钟