13.2.2 分页
elasticsearch 默认情况下只返回top10的数据。而如果要查询更多数据就需要修改分页参数了。
elasticsearch中通过修改from、size参数来控制要返回的分页结果:
- from:开始的页数
- size:文档总数
采用逻辑分页,
13.2.3 深度分页
ES是分布式的,所以会面临深度分页问题。例如按price排序后,获取from = 990,size =10的数据:
1.首先在每个数据分片上都排序并查询前1000条文档。
2.然后将所有节点的结果聚合,在内存中重新排序选出前1000条文档
3.最后从这1000条中,选取从990开始的10条文档
如果搜索页数过深,或者结果集(from + size)越大,对内存和CPU的消耗也越高。因此ES设定结果集查询的上限是10000
解决深度分页:
search after:分页时需要排序,原理是从上一个的排序值开始,查询下一页数据
scroll:原理排序数据形成快照,保存在内存。消耗内存大
from + size:
优点:支持随机翻页
缺点:深度分页问题,默认查询上限(from + size)是10000
场景:百度、京东、谷歌、淘宝这样的随机翻页搜索
after search:
优点:没有查询上限(单次查询的size不超过10000)
缺点:只能向后逐页查询,不支持随机翻页
场景:没有随机翻页需求的搜索,例如手机向下滚动翻页
scroll:
优点:没有查询上限(单次查询的size不超过10000)
缺点:会有额外内存消耗,并且搜索结果是非实时的
场景:海量数据的获取和迁移。从ES7.1开始不推荐,建议用 after search方案。
13.2.4 高亮
高亮:就是在搜索结果中把搜索关键字突出显示。
原理是这样的:
- 将搜索结果中的关键字用标签标记出来
- 在页面中给标签添加css样式
# 高亮结果, 默认情况下ES搜索字段必须高亮的字段一致才会高亮 GET /shier_hotel/_search { "query": { "match": { "all": "上海" } }, "highlight": { "fields": { "all": { "pre_tags": "<em>", "post_tags": "</em>" }, "city": { "pre_tags": "<em>", "post_tags": "</em>" , "require_field_match": "false" // 是否需要搜索字段与高亮字段一直才高亮显示 }, "business": { "pre_tags": "<em>", "post_tags": "</em>" }, "address": { "pre_tags": "<em>", "post_tags": "</em>" } } } }
DSL搜索结果语句总结
13.4 RestClient 查询文档
13.4.1 快速上手
通过使用match_all 演示
基本步骤:
创建SearchRequest对象
准备Request.source(),也就是DSL。
① QueryBuilders来构建查询条件
② 传入Request.source() 的 query() 方法
发送请求,得到结果
解析结果(参考JSON结果,从外到内,逐层解析)
将查询到的结果进行解析,都有对应的JavaAPI
RestAPI中其中构建DSL是通过HighLevelRestClient中的resource()来实现的,其中包含了查询、排序、分页、高亮等所有功能,
RestAPI中其中构建查询条件的核心部分是由一个名为QueryBuilders的工具类提供的,其中包含了各种查询方法
具体实现:
@ @Test void testMatchAll() throws IOException { // 获取request SearchRequest request = new SearchRequest("shier_hotel"); // dsl语句 request.source().query(QueryBuilders.matchAllQuery()); // 发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 解析结果 SearchHits searchHits = response.getHits(); // 查询总条数 long total = searchHits.getTotalHits().value; // 查询结果是数组 SearchHit[] hits = searchHits.getHits(); for (SearchHit searchHit : hits) { // 得到source String source = searchHit.getSourceAsString(); // 反序列化 HotelDoc hotelDoc = JSON.parseObject(source, HotelDoc.class); // 打印结果 System.out.println(hotelDoc); } System.out.println("搜索结果有:" + total + "条"); }
13.4.2 全文检索查询
全文检索的match和multi_match查询与match_all的API基本一致。差别是查询条件,也就是query的部分。
同样是利用QueryBuilders提供的方法:
/** * Match查询 * @throws IOException */ @Test void testMatch() throws IOException { // 获取request SearchRequest request = new SearchRequest("shier_hotel"); // dsl语句 request.source().query(QueryBuilders.matchQuery("all","上海")); // 发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 解析结果 handleResponse(response); } private static void handleResponse(SearchResponse response) { // 解析结果 SearchHits searchHits = response.getHits(); // 查询总条数 long total = searchHits.getTotalHits().value; System.out.println("搜索结果有:" + total + "条"); // 查询结果是数组 SearchHit[] hits = searchHits.getHits(); for (SearchHit searchHit : hits) { // 得到source String source = searchHit.getSourceAsString(); // 反序列化 HotelDoc hotelDoc = JSON.parseObject(source, HotelDoc.class); // 打印结果 System.out.println(hotelDoc); } }
抽取相同的代码 :Ctrl + Alt + m
13.4.3 布尔查询与精确查询
精确查询常见的有term查询和range查询,同样利用QueryBuilders实现
/** * 布尔查询 * @throws IOException */ @Test void testBool() throws IOException { // 获取request SearchRequest request = new SearchRequest("shier_hotel"); // 准备BooleanQuery BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); // 添加term boolQuery.must(QueryBuilders.termQuery("city", "杭州")); // 添加range boolQuery.filter(QueryBuilders.rangeQuery("price").lte(250)); request.source().query(QueryBuilders.termQuery("city","上海")); // 发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 解析结果 handleResponse(response); }
要构建查询条件,只要记住一个类:QueryBuilders
13.4.4 排序、分页
搜索结果的排序和分页是与query同级的参数,对应的API关系:
@Test void testPageAndSort() throws IOException { // 页码,每页大小 int page = 1, size = 5; // 1.准备Request SearchRequest request = new SearchRequest("shier_hotel"); // 2.准备DSL // 2.1.query request.source().query(QueryBuilders.matchAllQuery()); // 2.2.排序 sort request.source().sort("price", SortOrder.ASC); // 2.3.分页 from、size request.source().from((page - 1) * size).size(5); // 3.发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析响应 handleResponse(response); }
13.4.5 高亮
高亮API包括请求DSL构建和结果解析两部分。我们先看请求的DSL构建:
解析
代码
@Test void testHighlight() throws IOException { // 1.准备Request SearchRequest request = new SearchRequest("shier_hotel"); // 2.准备DSL // 2.1.query request.source().query(QueryBuilders.matchQuery("all", "如家")); // 2.2.高亮 request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false)); // 3.发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析响应 handleHighResponse(response); } public void handleHighResponse(SearchResponse response){ // 4.解析响应 SearchHits searchHits = response.getHits(); // 4.1.获取总条数 long total = searchHits.getTotalHits().value; System.out.println("共搜索到" + total + "条数据"); // 4.2.文档数组 SearchHit[] hits = searchHits.getHits(); // 4.3.遍历 for (SearchHit hit : hits) { // 获取文档source String json = hit.getSourceAsString(); // 反序列化 HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); // 获取高亮结果 Map<String, HighlightField> highlightFields = hit.getHighlightFields(); if (!CollectionUtils.isEmpty(highlightFields)) { // 根据字段名获取高亮结果 HighlightField highlightField = highlightFields.get("name"); if (highlightField != null) { // 获取高亮值 String name = highlightField.getFragments()[0].string(); // 覆盖非高亮结果 hotelDoc.setName(name); } } System.out.println("hotelDoc = " + hotelDoc); } }
13.5 黑马旅游案例
案例1:实现黑马旅游的酒店搜索功能,完成关键字搜索和分页
先实现其中的关键字搜索功能,实现步骤如下:
- 定义实体类,接收前端请求
@Data public class RequestParams { private String key; private Integer page; private Integer size; private String sortBy; }
定义controller接口,接收页面请求,调用IHotelService的search方法
/** * @author Shier * CreateTime 2023/4/24 19:16 */ @Data public class PageResult { private Long total; private List<HotelDoc> hotels; public PageResult() { } public PageResult(Long total, List<HotelDoc> hotels) { this.total = total; this.hotels = hotels; } }
@RestController @RequestMapping("/hotel") public class HotelController { @Autowired private HotelService hotelService; @PostMapping("/list") public PageResult search(@RequestBody RequestParams params){ return hotelService.search(params); } }
定义IHotelService中的search方法,利用match查询实现根据关键字搜索酒店信息
@Autowired private RestHighLevelClient restClient; @Override public PageResult search(RequestParams params) { try { // 1.准备Request SearchRequest request = new SearchRequest("shier_hotel"); // 2.准备DSL // 2.1.query关键字搜素哦 String key = params.getKey(); if (key == null || "".equals(key)) { request.source().query(QueryBuilders.matchAllQuery()); } else { request.source().query(QueryBuilders.matchQuery("all", key)); } // 2.2.分页 int size = params.getSize(); int page = params.getPage(); request.source().from(page - 1).size(size); // 3.发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 return handleHighResponse(response); } catch (IOException e) { throw new RuntimeException(e); } } public PageResult handleHighResponse(SearchResponse response) { // 4.解析响应 SearchHits searchHits = response.getHits(); // 4.1.获取总条数 long total = searchHits.getTotalHits().value; System.out.println("共搜索到" + total + "条数据"); // 4.2.文档数组 SearchHit[] hits = searchHits.getHits(); // 4.3.遍历 List<HotelDoc> hotels = new ArrayList<>(); for (SearchHit hit : hits) { // 获取文档source String json = hit.getSourceAsString(); // 反序列化 HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); hotels.add(hotelDoc); } // 封装返回 return new PageResult(total, hotels); }
案例2:添加品牌、城市、星级、价格等过滤功能
步骤:
1.修改RequestParams类,添加brand、city、starName、minPrice、maxPrice等参数
@Dat@Data public class RequestParams { private String key; private Integer page; private Integer size; private String sortBy; private String brand; private String city; private String starName; private Integer minPrice; private Integer maxPrice; }
2.修改search方法的实现,在关键字搜索时,如果brand等参数存在,对其做过滤
过滤条件包括:
- city精确匹配
- brand精确匹配
- starName精确匹配
- price范围过滤
注意事项:
- 多个条件之间是AND关系,组合多条件用BooleanQuery
- 参数存在才需要过滤,做好非空判断
@Override public PageResult search(RequestParams params) { try { // 1.准备Request SearchRequest request = new SearchRequest("shier_hotel"); // 2.准备DSL // 2.1.query关键字搜索 // BoolQuery构建 BoolQueryBuilder boolQuery = builderBasicQuery(params); request.source().query(boolQuery); // 2.2.分页 int size = params.getSize(); int page = params.getPage(); request.source().from(page - 1).size(size); // 3.发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 return handleHighResponse(response); } catch (IOException e) { throw new RuntimeException(e); } } private static BoolQueryBuilder builderBasicQuery(RequestParams params) { BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); // 关键字搜索 String key = params.getKey(); if (key == null || "".equals(key)) { boolQuery.must(QueryBuilders.matchAllQuery()); } else { boolQuery.must(QueryBuilders.matchQuery("all", key)); } // 城市过滤 if (params.getCity() != null && !params.getCity().equals("")) { boolQuery.filter(QueryBuilders.termQuery("city", params.getCity())); } // 品牌过滤 if (params.getBrand() != null && !params.getBrand().equals("")) { boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand())); } // 星级过滤 if (params.getStarName() != null && !params.getStarName().equals("")) { boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName())); } //价格过滤 if (params.getMinPrice() != null && params.getMaxPrice() != null) { boolQuery.filter(QueryBuilders.rangeQuery("price") .gte(params.getMinPrice()) .lte(params.getMaxPrice())); } return boolQuery; }
案例3:我附近的酒店
前端页面点击定位后,会将你所在的位置发送到后台。
距离排序与普通字段排序有所差异,API如下:
我们要根据这个坐标,将酒店结果按照到这个点的距离升序排序。
实现思路如下:
- 修改RequestParams参数,接收location字段
private String location; • 1
- 修改search方法业务逻辑,如果location有值,添加根据geo_distance排序的功能
// 2.3 距离排序 String location = params.getLocation(); if (location != null && !location.equals("")) { request.source().sort( SortBuilders. geoDistanceSort("location", new GeoPoint(location)) .order(SortOrder.ASC) .unit(DistanceUnit.KILOMETERS) ); }
// 获取排序值,显示值 Object[] sortValues = hit.getSortValues(); if (sortValues.length > 0) { Object value = sortValues[0]; hotelDoc.setDistance(value); }
案例4:让指定的酒店在搜索结果中排名置顶
实现步骤分析:
1.给HotelDoc类添加isAD字段,Boolean类型
2.挑选几个你喜欢的酒店,给它的文档数据添加isAD字段,值为true
# 添加广告 POST /shier_hotel/_update/1393017952 { "doc": { "isAD": true } } POST /shier_hotel/_update/1455383931 { "doc": { "isAD": true } } POST /shier_hotel/_update/728180 { "doc": { "isAD": true } } POST /shier_hotel/_update/200215365 { "doc": { "isAD": true } }
3.修改search方法,添加function score功能,给isAD值为true的酒店增加权重
先看看DSL对应的Java代码:
// 算分控制 FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery( boolQuery, //原始查询 new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // 相关性算分查询 new FunctionScoreQueryBuilder.FilterFunctionBuilder( QueryBuilders.termQuery("isAd", true),// 过滤条件 ScoreFunctionBuilders.weightFactorFunction(10) //算分函数 ) }); request.source().query(functionScoreQuery);
排序
前端会传递sortBy参数,就是排序方式,我们需要判断sortBy值是什么:
- default:相关度算分排序,这个不用管,es的默认排序策略
- score:根据酒店的score字段排序,也就是用户评价,降序
- price:根据酒店的price字段排序,就是价格,升序
高亮
给黑马旅游添加搜索关键字高亮效果
前端已经给标签写好CSS样式了。我们只需要负责服务端高亮即可。
如果你在前面写过了这个高亮的测试,那就是复制粘贴就可以了的
十四、数据聚合
14.1 聚合分类
聚合aggregations可以实现对文档数据的统计、分析、运算。聚合常见的有三类:
桶(Bucket)集合:用来对文档做分组
TermAggregations:按照文档字段值分组
DateHistogram:按照日期阶梯分组,例如:一周为一组,一个月为一组
度量(Metric)集合:用来计算一些值,比如最大值,最小值,平均值等数值类
Avg:求平均值
Max:求最大值
Min:求最小值
Stats:求max、min、avg、sum等
管道(pipeline)聚合
参与集合的字段类型
- keyword
- 数值
- 日期
- 布尔
14.2 DSL实现Bucket聚合
举例:现在,我们要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。
类型为term类型
DSL示例:
聚合三要素
- 聚合名称
- 聚合类型
- 聚合字段
聚合可配置属性
- size:结果数量
- order:排序方式
- field:字段
例子实现:
# 聚合类型 GET /shier_hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 10 } } } }
14.2.1 聚合结果排序
# 聚合类型,自定义排序规则 GET /shier_hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 10, "order": { "_count": "asc" } } } } }
14.2.1 限定范围
默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可
# 聚合类型,限定聚合搜索的范围 GET /shier_hotel/_search { "query": { "range": { "price": { "lte": 200 } } }, "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 10, "order": { "_count": "asc" } } } } }
14.3 DSL实现Metrics聚合
我们要求获取每个品牌的用户评分的min、max、avg等值
- Avg:
- Max:
- Min:
- Stats:求max、min、avg、sum等
语法格式:
例子说明:
# 嵌套聚合类型, GET /shier_hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 10 , "order": { // 排序 "scoreAgg.avg": "desc" } }, "aggs": { //嵌套排序, "scoreAgg": { "stats": { "field": "score" } } } } } }
14.4 RestClient实现
Java的RestClient使用,先看请求组装:
聚合结果解析:
具体实现:
/** * 聚合数据搜索 */ @Test void testAggregation() throws IOException { // 1. 准备request请求 SearchRequest request = new SearchRequest("shier_hotel"); // 2.DSL参数 request.source().size(0); request.source().aggregation( AggregationBuilders .terms("brandAgg") .field("brand") .size(20)); // 3.发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 4.解析结果 // 4.1 拿到最外层的aggregation Aggregations aggregations = response.getAggregations(); // 4.2拿到聚合的名称 Terms brandTerms = aggregations.get("brandAgg"); // 4.3拿到聚合里面的桶 List<? extends Terms.Bucket> buckets = brandTerms.getBuckets(); // 4.4遍历数组获取key for (Terms.Bucket bucket : buckets) { String brandName = bucket.getKeyAsString(); System.out.println(brandName); } }
14.5 在IUserService中定义方法,实现对品牌、城市、星级的聚合
需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的
需要实现一个接口
对接前端接口
/** * 聚合搜索 * * @return */ @Override public Map<String, List<String>> filters(RequestParams params) { try { // 获取request SearchRequest request = new SearchRequest("shier_hotel"); // DSL参数 request.source().size(0); buliderAggregation(request); // 查询信息 builderBasicQuery(params,request); // 发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 解析结果 Map<String, List<String>> resultMap = new HashMap<>(); Aggregations aggregations = response.getAggregations(); // 根据多个名称,获取到聚合结果 List<String> brandList = getAggByName(aggregations,"brandAgg"); resultMap.put("品牌", brandList); List<String> starList = getAggByName(aggregations,"starAgg"); resultMap.put("星级", starList); List<String> cityList = getAggByName(aggregations,"cityAgg"); resultMap.put("城市", cityList); return resultMap; } catch (IOException e) { throw new RuntimeException(e); } } /** * 根据不同的名称聚合 * @param aggregations * @param aggName * @return */ private static List<String> getAggByName(Aggregations aggregations,String aggName) { Terms brandTerms = aggregations.get(aggName); // 获取桶 List<? extends Terms.Bucket> buckets = brandTerms.getBuckets(); List<String> brandList = new ArrayList<>(); for (Terms.Bucket bucket : buckets) { String key = bucket.getKeyAsString(); brandList.add(key); } return brandList; } /** * 聚合条件 * * @param request */ private static void buliderAggregation(SearchRequest request) { request.source().aggregation( AggregationBuilders .terms("brandAgg") .field("brand") .size(200)); request.source().aggregation( AggregationBuilders .terms("cityAgg") .field("city") .size(200)); request.source().aggregation( AggregationBuilders .terms("starAgg") .field("starName") .size(200)); }
接口:
/** * 聚合搜索 */ Map<String , List<String >> filters(RequestParams params);
Controller对应接口:
@PostMapping("/filters") public Map<String, List<String>> getFilters(@RequestBody RequestParams params){ return hotelService.filters(params); }
十五、自动补全
当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图:
15.1 拼音分词器
① 解压
② 上传到虚拟机中,elasticsearch的plugin目录
③ 重启elasticsearch
④ 测试,安装好就去测试一下:
# 拼音分词器 POST /_analyze { "text": ["上海酒店最豪华"], "analyzer": "pinyin" }
15.2 自定义分词器
character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词,还有ik_smart
tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等例如:
我们可以在创建索引库时,通过settings来配置自定义的analyzer(分词器):
例子:
# 自定义拼音分词器 PUT /test { "settings": { "analysis": { "analyzer": { "my_analyzer": { "tokenizer": "ik_max_word", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } } } # 拼音分词器 POST /test/_analyze { "text": ["上海酒店最豪华"], "analyzer": "pinyin" }
例子:
POST /test/_doc/1 { "id": 1, "name": "狮子" } POST /test/_doc/2 { "id": 2, "name": "虱子" } GET /test/_search { "query": { "match": { "name": "狮子" } } }
拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。
创建的要和搜索的拼音分词器要分开
因此字段在创建倒排索引时应该用my_analyzer分词器;字段在搜索时应该使用ik_smart分词器
# 自定义拼音分词器 PUT /test { "settings": { "analysis": { "analyzer": { "my_analyzer": { "tokenizer": "ik_max_word", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "name":{ "type": "text", "analyzer": "my_analyzer", "search_analyzer": "ik_smart" } } } }
此时去搜索狮子,就只有狮子,而不会出现虱子
15.3 completion suggester查询
elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:
参与补全查询的字段必须是completion类型。
字段的内容一般是用来补全的多个词条形成的数组具体语法如下:
例子:
# 自动补全的索引库 PUT testauto { "mappings": { "properties": { "title":{ "type": "completion" } } } } # 示例数据 POST testauto/_doc { "title": ["Sony", "WH-1000XM3"] } POST testauto/_doc { "title": ["SK-II", "PITERA"] } POST testauto/_doc { "title": ["Nintendo", "switch"] } # 自动补全查询 POST /testauto/_search { "suggest": { "titleSuggest": { "text": "sk", "completion": { "field": "title", "skip_duplicates": true, "size": 10 } } } }
15.3.1 实现hotel索引库的自动补全、拼音搜索功能
实现思路如下:
1.修改hotel索引库结构,设置自定义拼音分词器
2.修改索引库的name、all字段,使用自定义分词器
3.索引库添加一个新字段suggestion,类型为completion类型,使用自定义的分词器
4.给HotelDoc类添加suggestion字段,内容包含brand、business
5.重新导入数据到hotel库
注意:name、all是可分词的,自动补全的brand、business是不可分词的,要使用不同的分词器组合
# 酒店数据索引库 PUT /shier_hotel { "settings": { "analysis": { "analyzer": { "text_anlyzer": { "tokenizer": "ik_max_word", "filter": "py" }, "completion_analyzer": { "tokenizer": "keyword", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "id":{ "type": "keyword" }, "name":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword", "copy_to": "all" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart" }, "suggestion":{ "type": "completion", "analyzer": "completion_analyzer" } } } } # 自动补全查询 POST /shier_hotel/_search { "suggest": { "titleSuggest": { "text": "sd", "completion": { "field": "suggestion", "skip_duplicates": true, "size": 10 } } } }
15.4 RestAPI实现自动补全
具体测试实现:
/** * 自动补全 */ @Test void testAutoSuggest() throws IOException { // 1. 准备request请求 SearchRequest request = new SearchRequest("shier_hotel"); // 2.DSL参数 request.source().suggest(new SuggestBuilder().addSuggestion( "suggestions", SuggestBuilders.completionSuggestion("suggestion") .prefix("s") .skipDuplicates(true) .size(10) )); // 3.发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); // 4. 解析结果 Suggest suggest = response.getSuggest(); CompletionSuggestion suggestions = suggest.getSuggestion("suggestions"); List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions(); for (CompletionSuggestion.Entry.Option option : options) { String text = option.getText().toString(); System.out.println(text); } }
15.4.1 实现酒店搜索页面输入框的自动补全
具体实现:
/** * 自动补全 * * @param prefix * @return */ @Override public List<String> getSuggestions(String prefix) { try { // request请求 SearchRequest request = new SearchRequest("shier_hotel"); // DSl 参数 request.source().suggest(new SuggestBuilder().addSuggestion( "suggestions", SuggestBuilders.completionSuggestion("suggestion") .prefix(prefix) .skipDuplicates(true) .size(10) )); // 发送请求 SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); Suggest suggest = response.getSuggest(); CompletionSuggestion suggestions = suggest.getSuggestion("suggestions"); List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions(); List<String> list = new ArrayList<>(options.size()); for (CompletionSuggestion.Entry.Option option : options) { String text = option.getText().toString(); list.add(text); } return list; } catch (IOException e) { throw new RuntimeException(e); } }
十六、数据同步
16.1 数据同步问题分析
elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步。
在微服务中,负责酒店管理(操作mysql )的业务与负责酒店搜索(操作elasticsearch )的业务可能在两个不同的微服务上,数据同步该如何实现呢?
16.1.1 同步调用解决
- 先写入数据库
- 再去调用索引库接口
- 更新es
存在耦合问题
优点:实现简单,粗暴
缺点:业务耦合度高
16.1.2 异步通知
- 写入数据库
- 通过mq发送信息
- 监听消息
- 更新es
优点:低耦合,实现难度一般
缺点:依赖mq的可靠性
16.1.3 监听binlog
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nlXMUerX-1682515505297)(https://gitee.com/kcsen/img/raw/master/picGo/image-20230425214540917.png)]
优点:完全解除服务间耦合
缺点:开启binlog增加数据库负担、实现复杂度高
方案一:同步调用
简单粗暴
业务耦合高
方案二:异步通知
- 低耦合
- 依赖mq的可靠性
方案三:监听binlog
- 耦合低、实现复杂
16.2 利用MQ实现mysql与elasticsearch数据同步
当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
导入hotel-admin项目,启动并测试酒店数据的CRUD
声明exchange、queue、RoutingKey
在hotel-admin中的增、删、改业务中完成消息发送
在hotel-demo中完成消息监听,并更新elasticsearch中数据
启动并测试数据同步功能具体的发送请求过程:
实现:
声明exchange、queue、RoutingKey
/** * @author Shier * CreateTime 2023/4/26 16:56 * 交换机声明 */ public class MqConstants { /** * 交换机 */ public static final String HOTEL_EXCHANGE ="hotel.topic"; /** * 监听新增和修改的队列 */ public static final String HOTEL_INSET_QUEUE ="hotel.insert_queue"; /** * 监听删除的队列 */ public static final String HOTEL_DELETE_QUEUE ="hotel.delete_queue"; /** * 新增或修改的RoutingKey */ public static final String HOTEL_INSERT_KEY ="hotel.insert_key"; /** * 删除的RoutingKey */ public static final String HOTEL_DELETE_KEY ="hotel.delete_key"; }
/** * @author Shier * CreateTime 2023/4/26 17:06 * */ @Configuration public class MqConfig { /** * 交换机 */ @Bean public TopicExchange topicExchange(){ return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false); } /** * 新增监听队列 */ @Bean public Queue insertQueue(){ return new Queue(MqConstants.HOTEL_INSET_QUEUE,true); } /** * 删除监听队列 */ @Bean public Queue deleteQueue(){ return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true); } /** * 绑定新增事件 */ @Bean public Binding insertQueueBinding(){ return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY); } /** * 绑定删除事件 */ @Bean public Binding deleteQueueBinding(){ return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY); } }
增、删、改业务中完成消息发送
public class HotelController { @Autowired private RabbitTemplate rabbitTemplate; /** * 新增酒店 * @param hotel */ @PostMapping public void saveHotel(@RequestBody Hotel hotel){ hotelService.save(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); } /** * 修改 * @param hotel */ @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); } /** * 删除 * @param id */ @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id); } }
完成消息监听,并更新elasticsearch中数据
/** * @author Shier * CreateTime 2023/4/26 17:25 */ @Component public class HotelListener { @Autowired private IHotelService hotelService; /** * 监听酒店新增或修改的业务 * @param id */ @RabbitListener(queues = MqConstants.HOTEL_INSET_QUEUE) public void listenHotelInsertOrUpdate(Long id){ hotelService.insertById(id); } /** * 监听酒店删除的业务 * @param id */ @RabbitListener(queues = MqConstants.HOTEL_INSET_QUEUE) public void listenHotelDelete(Long id){ hotelService.deleteById(id); } }
十七、elasticsearch集群
17.1 ES集群结构
单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
- 海量数据存储问题:将索引库从逻辑上分为N个分片(shard),存储到多个节点
- 单点故障问题:将分片数据在不同节点备份(replica)
17.2 ES集群部署
我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间
17.2.1.创建es集群
首先编写一个docker-compose文件,内容如下:
version: '2.2' services: es01: image: elasticsearch:7.17.9 container_name: es01 environment: - node.name=es01 - cluster.name=es-docker-cluster # 集群名称 - discovery.seed_hosts=es02,es03 # 其他集群的IP地址,docker容器名称 - cluster.initial_master_nodes=es01,es02,es03 # 主从节点 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # JVM堆内存大小 volumes: - data01:/usr/share/elasticsearch/data # 数据卷位置 ports: - 9200:9200 networks: - elastic es02: image: elasticsearch:7.17.9 container_name: es02 environment: - node.name=es02 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data02:/usr/share/elasticsearch/data ports: - 9201:9200 networks: - elastic es03: image: elasticsearch:7.127.9 container_name: es03 environment: - node.name=es03 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es02 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data03:/usr/share/elasticsearch/data networks: - elastic ports: - 9202:9200 volumes: data01: driver: local data02: driver: local data03: driver: local networks: elastic: driver: bridge
es运行需要修改一些linux系统权限,修改/etc/sysctl.conf
文件
vi /etc/sysctl.conf • 1
添加下面的内容:
vm.max_map_count=262144 • 1
然后执行命令,让配置生效:
sysctl -p
通过docker-compose启动集群:
docker-compose up -d
17.2.2 集群状态监控
安装解压,然后进入bin目录,运行cerebro.bat文件,然后访问https://localhots:9000
17.2.3 创建索引库
- 利用kibana的DevTools创建索引库
PUT /itcast { "settings": { "number_of_shards": 3, // 分片数量 "number_of_replicas": 1 // 副本数量 }, "mappings": { "properties": { // mapping映射定义 ... } } }
- 利用cerebro创建索引库
利用cerebro还可以创建索引库:
填写索引库信息:
点击右下角的create按钮:
查看分片效果
回到首页,即可查看索引库分片效果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1ntEbT83-1682515505303)(…/…/…/…/Desktop/assets/image-20210602221914483.png)]
17.3 ES集群的节点角色
elasticsearch中集群节点有不同的职责划分
ES可以同时拥有以上的四个节点类型,但是在开发当中不一定都有每一个集群节点,都拥有这样四种类型
17.4 ES集群的分布式查询
elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。
17.5 ES集群的脑裂
默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。
为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
17.6 ES集群的分布式存储
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
说明:
- _routing默认是文档的id
- 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
17.6.1 新增文档浏览
- 深蓝色:主分片
- 浅蓝色:副分片
17.6.2 ES集群的分布式查询
elasticsearch的查询分成两个阶段:
- scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
- gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
分布式新增确定分片
coordinating node根据 id 做hash运算,得到结果对shard数量取余,余数就是对应的分片
分布式查询的两个阶段
分散阶段: coordinating node将查询请求分发给不同分片
收集阶段:将查询结果汇总到coordinating node ,整理并返回给用户
17.7 ES集群的故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。
当三个节点当中的node1节点挂了,就会自动选择其他的主节点,并且把故障节点上的数据迁移到健康的节点上。
故障转移:
- master宕机后,EligibleMaster选举为新的主节点。
- master节点监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。
L_INSERT_KEY,hotel.getId());
}
/** * 删除 * @param id */ @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id); }
完成消息监听,并更新elasticsearch中数据 ```java /** * @author Shier * CreateTime 2023/4/26 17:25 */ @Component public class HotelListener { @Autowired private IHotelService hotelService; /** * 监听酒店新增或修改的业务 * @param id */ @RabbitListener(queues = MqConstants.HOTEL_INSET_QUEUE) public void listenHotelInsertOrUpdate(Long id){ hotelService.insertById(id); } /** * 监听酒店删除的业务 * @param id */ @RabbitListener(queues = MqConstants.HOTEL_INSET_QUEUE) public void listenHotelDelete(Long id){ hotelService.deleteById(id); } }
十七、elasticsearch集群
17.1 ES集群结构
单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
海量数据存储问题:将索引库从逻辑上分为N个分片(shard),存储到多个节点
单点故障问题:将分片数据在不同节点备份(replica)
[外链图片转存中…(img-xCunf5lE-1682515505300)]
17.2 ES集群部署
我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间
17.2.1.创建es集群
首先编写一个docker-compose文件,内容如下:
version: '2.2' services: es01: image: elasticsearch:7.17.9 container_name: es01 environment: - node.name=es01 - cluster.name=es-docker-cluster # 集群名称 - discovery.seed_hosts=es02,es03 # 其他集群的IP地址,docker容器名称 - cluster.initial_master_nodes=es01,es02,es03 # 主从节点 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # JVM堆内存大小 volumes: - data01:/usr/share/elasticsearch/data # 数据卷位置 ports: - 9200:9200 networks: - elastic es02: image: elasticsearch:7.17.9 container_name: es02 environment: - node.name=es02 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data02:/usr/share/elasticsearch/data ports: - 9201:9200 networks: - elastic es03: image: elasticsearch:7.127.9 container_name: es03 environment: - node.name=es03 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es02 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data03:/usr/share/elasticsearch/data networks: - elastic ports: - 9202:9200 volumes: data01: driver: local data02: driver: local data03: driver: local networks: elastic: driver: bridge
es运行需要修改一些linux系统权限,修改/etc/sysctl.conf
文件
vi /etc/sysctl.conf • 1
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p • 1
通过docker-compose启动集群:
docker-compose up -d
17.2.2 集群状态监控
安装解压,然后进入bin目录,运行cerebro.bat文件,然后访问https://localhots:9000
17.2.3 创建索引库
- 利用kibana的DevTools创建索引库
PUT /itcast { "settings": { "number_of_shards": 3, // 分片数量 "number_of_replicas": 1 // 副本数量 }, "mappings": { "properties": { // mapping映射定义 ... } } }
利用cerebro创建索引库
利用cerebro还可以创建索引库:
[外链图片转存中…(img-HGj8TqZi-1682515505301)]
填写索引库信息:
[外链图片转存中…(img-Xua3g7oA-1682515505301)]
点击右下角的create按钮:
[外链图片转存中…(img-CWJNp7yW-1682515505302)]
查看分片效果
回到首页,即可查看索引库分片效果:
[外链图片转存中…(img-1ntEbT83-1682515505303)]
17.3 ES集群的节点角色
elasticsearch中集群节点有不同的职责划分:
ES可以同时拥有以上的四个节点类型,但是在开发当中不一定都有每一个集群节点,都拥有这样四种类型
17.4 ES集群的分布式查询
elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。
17.5 ES集群的脑裂
默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。
为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
17.6 ES集群的分布式存储
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
- _routing默认是文档的id
- 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
17.6.1 新增文档浏览
- 深蓝色:主分片
- 浅蓝色:副分片
17.6.2 ES集群的分布式查询
elasticsearch的查询分成两个阶段:
scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
[外链图片转存中…(img-hjNm18eh-1682515505306)]
分布式新增确定分片
coordinating node根据 id 做hash运算,得到结果对shard数量取余,余数就是对应的分片
分布式查询的两个阶段
分散阶段: coordinating node将查询请求分发给不同分片
收集阶段:将查询结果汇总到coordinating node ,整理并返回给用户
17.7 ES集群的故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。
[外链图片转存中…(img-JyouVSog-1682515505307)]
当三个节点当中的node1节点挂了,就会自动选择其他的主节点,并且把故障节点上的数据迁移到健康的节点上。
故障转移:
master宕机后,EligibleMaster选举为新的主节点。
master节点监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。