十、Spring Data集成Elasticsearch
10.1、Spring Data介绍
Spring Data 是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce 框架和云计算数据服务。 Spring Data 可以极大的简化 JPA(Elasticsearch„)的写法,可以在几乎不用写实现的情况下,实现对数据的访问和操作。除了 CRUD 外,还包括如分页、排序等一些常用的功能。Spring Data 的官网
Spring Data 常用的功能模块大致有:
10.2、Spring Data Elasticsearch介绍
Spring Data Elasticsearch 基于 spring data API 简化 Elasticsearch 操作,将原始操作Elasticsearch 的客户端 API 进行封装 。Spring Data 为 Elasticsearch 项目提供集成搜索引擎。Spring Data Elasticsearch POJO 的关键功能区域为中心的模型与 Elastichsearch 交互文档和轻松地编写一个存储索引库数据访问层。官方网站
10.3、Spring Data Elasticsearch 版本对比
目前最新 springboot 对应 Elasticsearch7.6.2,Spring boot2.3.x 一般可以兼容 Elasticsearch7.x。
10.4、集成开始
10.4.1、导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> 复制代码
10.4.2、编写application.proerties
# es 服务地址 elasticsearch.host=127.0.0.1 # es 服务端口 elasticsearch.port=9200 # 配置日志级别,开启 debug 日志 logging.level.com.atguigu.es=debug 复制代码
10.4.3、编写实体类
// @Document:代表一个文档记录 ,indexName:用来指定索引名称,type:用来指定索引类型 @Document(indexName = "dangdang",type = "book") @Data @AllArgsConstructor @NoArgsConstructor public class Book { // @Id:将对象中id和ES中_id映射 @Id private String id; // analyzer:用来指定使用哪种分词器 @Field(type = FieldType.Text,analyzer ="ik_max_word") private String name; // @eld:用来指定ES中的字段对应Mapping ,type:用来指定ES中存储类型 @Field(type = FieldType.Date) @JsonFormat(pattern="yyyy-MM-dd") private Date createDate; @Field(type = FieldType.Keyword) private String author; @Field(type = FieldType.Text,analyzer ="ik_max_word") private String content; } 复制代码
10.4.4、编写BookRepository
public interface BookRepository extends ElasticsearchRepository<Book,String> { } 复制代码
10.4.5、插入文档
这种方式根据实体类中中配置自动在ElasticSearch创建索引、类型以及映射。
@SpringBootTest(classes = Application.class) @RunWith(SpringRunner.class) public class TestSpringBootDataEs { @Autowired private BookRepository bookRespistory; /** * 添加索引和更新索引 id 存在更新 不存在添加 */ @Test public void testSaveOrUpdate(){ Book book = new Book(); book.setId("21"); book.setName("xiaolin"); book.setCreateDate(new Date()); book.setAuthor("李白"); book.setContent("这是中国的好人,这真的是一个很好的人,李白很狂"); bookRespistory.save(book); } } 复制代码
10.4.6、删除文档
/** * 删除一条文档 */ @Test public void testDelete(){ Book book = new Book(); book.setId("21"); bookRespistory.delete(book); } 复制代码
10.4.7、查询
/** * 查询所有 */ @Test public void testFindAll(){ Iterable<Book> books = bookRespistory.findAll(); for (Book book : books) { System.out.println(book); } } /** * 查询一个 */ @Test public void testFindOne(){ Optional<Book> byId = bookRespistory.findById("21"); System.out.println(byId.get()); } 复制代码
10.4.8、查询排序
/** * 排序查询 */ @Test public void testFindAllOrder(){ Iterable<Book> books = bookRespistory.findAll(Sort.by(Sort.Order.asc("createDate"))); books.forEach(book -> System.out.println(book) ); } 复制代码
10.4.9、分页查询并排序
@Test public void testSearchPage() throws IOException { SearchRequest searchRequest = new SearchRequest(); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.from(0).size(2).sort("age", SortOrder.DESC).query(QueryBuilders.matchAllQuery()); searchRequest.indices("ems").types("emp").source(sourceBuilder); SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = search.getHits().getHits(); for (SearchHit hit : hits) { System.out.println(hit.getSourceAsString()); } } 复制代码
10.4.10、高亮查询
//高亮查询 @Test public void testHighter() throws IOException, ParseException { SearchRequest searchRequest = new SearchRequest("ems"); List<Emp> emps = new ArrayList<>(); searchRequest.types("emp"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.termQuery("content","武侠"))//设置条件 .sort("age", SortOrder.DESC) .from(0)//起始条数 (当前页-1)*size .size(20) .highlighter(new HighlightBuilder().field("*").requireFieldMatch(false).preTags("<span style='color:red'").postTags("</span>")); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); for (SearchHit hit : hits) { //原始文档 Map<String,Object> sourceAsMap = hit.getSourceAsMap(); Emp emp = new Emp(); emp.setId(hit.getId()); emp.setName(sourceAsMap.get("name").toString()); emp.setAge(Integer.parseInt(sourceAsMap.get("age").toString())); emp.setBir(new SimpleDateFormat("yyyy-MM-dd").parse(sourceAsMap.get("bir").toString())); emp.setContent(sourceAsMap.get("content").toString()); emp.setAddress(sourceAsMap.get("address").toString()); //高亮字段 Map<String, HighlightField> highlightFields = hit.getHighlightFields(); if (highlightFields.containsKey("content")){ emp.setContent(highlightFields.get("content").fragments()[0].toString()); } if (highlightFields.containsKey("name")){ emp.setName(highlightFields.get("name").fragments()[0].toString()); } if (highlightFields.containsKey("address")){ emp.setAddress(highlightFields.get("address").fragments()[0].toString()); } emps.add(emp); } emps.forEach(emp -> { System.out.println(emp.toString()); }); } 复制代码
10.5、封装工具类
@Component public class ElasticsearchUtil { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtil.class); @Autowired private TransportClient transportClient; private static TransportClient client; /** * @PostContruct是spring框架的注解 * spring容器初始化的时候执行该方法 */ @PostConstruct public void init() { client = this.transportClient; } /** * 创建索引 * * @param index * @return */ public static boolean createIndex(String index) { if(!isIndexExist(index)){ LOGGER.info("Index is not exits!"); } CreateIndexResponse indexResponse = client.admin().indices().prepareCreate(index).execute().actionGet(); LOGGER.info("执行建立成功?" + indexResponse.isAcknowledged()); return indexResponse.isAcknowledged(); } /** * 删除索引 * * @param index * @return */ public static boolean deleteIndex(String index) { if(!isIndexExist(index)) { LOGGER.info("Index is not exits!"); } DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet(); if (dResponse.isAcknowledged()) { LOGGER.info("delete index " + index + " successfully!"); } else { LOGGER.info("Fail to delete index " + index); } return dResponse.isAcknowledged(); } /** * 判断索引是否存在 * * @param index * @return */ public static boolean isIndexExist(String index) { IndicesExistsResponse inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index)).actionGet(); if (inExistsResponse.isExists()) { LOGGER.info("Index [" + index + "] is exist!"); } else { LOGGER.info("Index [" + index + "] is not exist!"); } return inExistsResponse.isExists(); } /** * 数据添加,正定ID * * @param jsonObject 要增加的数据 * @param index 索引,类似数据库 * @param type 类型,类似表 * @param id 数据ID * @return */ public static String addData(JSONObject jsonObject, String index, String type, String id) { IndexResponse response = client.prepareIndex(index, type, id).setSource(jsonObject).get(); LOGGER.info("addData response status:{},id:{}", response.status().getStatus(), response.getId()); return response.getId(); } /** * 数据添加 * * @param jsonObject 要增加的数据 * @param index 索引,类似数据库 * @param type 类型,类似表 * @return */ public static String addData(JSONObject jsonObject, String index, String type) { return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase()); } /** * 通过ID删除数据 * * @param index 索引,类似数据库 * @param type 类型,类似表 * @param id 数据ID */ public static void deleteDataById(String index, String type, String id) { DeleteResponse response = client.prepareDelete(index, type, id).execute().actionGet(); LOGGER.info("deleteDataById response status:{},id:{}", response.status().getStatus(), response.getId()); } /** * 通过ID 更新数据 * * @param jsonObject 要增加的数据 * @param index 索引,类似数据库 * @param type 类型,类似表 * @param id 数据ID * @return */ public static void updateDataById(JSONObject jsonObject, String index, String type, String id) { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index(index).type(type).id(id).doc(jsonObject); client.update(updateRequest); } /** * 通过ID获取数据 * * @param index 索引,类似数据库 * @param type 类型,类似表 * @param id 数据ID * @param fields 需要显示的字段,逗号分隔(缺省为全部字段) * @return */ public static Map<String,Object> searchDataById(String index, String type, String id, String fields) { GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id); if (StringUtils.isNotEmpty(fields)) { getRequestBuilder.setFetchSource(fields.split(","), null); } GetResponse getResponse = getRequestBuilder.execute().actionGet(); return getResponse.getSource(); } /** * 使用分词查询,并分页 * * @param index 索引名称 * @param type 类型名称,可传入多个type逗号分隔 * @param startPage 当前页 * @param pageSize 每页显示条数 * @param query 查询条件 * @param fields 需要显示的字段,逗号分隔(缺省为全部字段) * @param sortField 排序字段 * @param highlightField 高亮字段 * @return */ public static EsPage searchDataPage(String index, String type, int startPage, int pageSize, QueryBuilder query, String fields, String sortField, String highlightField) { SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index); if (StringUtils.isNotEmpty(type)) { searchRequestBuilder.setTypes(type.split(",")); } searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH); // 需要显示的字段,逗号分隔(缺省为全部字段) if (StringUtils.isNotEmpty(fields)) { searchRequestBuilder.setFetchSource(fields.split(","), null); } //排序字段 if (StringUtils.isNotEmpty(sortField)) { searchRequestBuilder.addSort(sortField, SortOrder.DESC); } // 高亮(xxx=111,aaa=222) if (StringUtils.isNotEmpty(highlightField)) { HighlightBuilder highlightBuilder = new HighlightBuilder(); //highlightBuilder.preTags("<span style='color:red' >");//设置前缀 //highlightBuilder.postTags("</span>");//设置后缀 // 设置高亮字段 highlightBuilder.field(highlightField); searchRequestBuilder.highlighter(highlightBuilder); } //searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery()); searchRequestBuilder.setQuery(query); // 分页应用 searchRequestBuilder.setFrom(startPage).setSize(pageSize); // 设置是否按查询匹配度排序 searchRequestBuilder.setExplain(true); //打印的内容 可以在 Elasticsearch head 和 Kibana 上执行查询 LOGGER.info("\n{}", searchRequestBuilder); // 执行搜索,返回搜索响应信息 SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); long totalHits = searchResponse.getHits().totalHits; long length = searchResponse.getHits().getHits().length; LOGGER.debug("共查询到[{}]条数据,处理数据条数[{}]", totalHits, length); if (searchResponse.status().getStatus() == 200) { // 解析对象 List<Map<String, Object>> sourceList = setSearchResponse(searchResponse, highlightField); return new EsPage(startPage, pageSize, (int) totalHits, sourceList); } return null; } /** * 使用分词查询 * * @param index 索引名称 * @param type 类型名称,可传入多个type逗号分隔 * @param query 查询条件 * @param size 文档大小限制 * @param fields 需要显示的字段,逗号分隔(缺省为全部字段) * @param sortField 排序字段 * @param highlightField 高亮字段 * @return */ public static List<Map<String, Object>> searchListData(String index, String type, QueryBuilder query, Integer size, String fields, String sortField, String highlightField) { SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index); if (StringUtils.isNotEmpty(type)) { searchRequestBuilder.setTypes(type.split(",")); } if (StringUtils.isNotEmpty(highlightField)) { HighlightBuilder highlightBuilder = new HighlightBuilder(); // 设置高亮字段 highlightBuilder.field(highlightField); searchRequestBuilder.highlighter(highlightBuilder); } searchRequestBuilder.setQuery(query); if (StringUtils.isNotEmpty(fields)) { searchRequestBuilder.setFetchSource(fields.split(","), null); } searchRequestBuilder.setFetchSource(true); if (StringUtils.isNotEmpty(sortField)) { searchRequestBuilder.addSort(sortField, SortOrder.DESC); } if (size != null && size > 0) { searchRequestBuilder.setSize(size); } //打印的内容 可以在 Elasticsearch head 和 Kibana 上执行查询 LOGGER.info("\n{}", searchRequestBuilder); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); long totalHits = searchResponse.getHits().totalHits; long length = searchResponse.getHits().getHits().length; LOGGER.info("共查询到[{}]条数据,处理数据条数[{}]", totalHits, length); if (searchResponse.status().getStatus() == 200) { // 解析对象 return setSearchResponse(searchResponse, highlightField); } return null; } /** * 高亮结果集 特殊处理 * * @param searchResponse * @param highlightField */ private static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) { List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>(); StringBuffer stringBuffer = new StringBuffer(); for (SearchHit searchHit : searchResponse.getHits().getHits()) { searchHit.getSourceAsMap().put("id", searchHit.getId()); if (StringUtils.isNotEmpty(highlightField)) { System.out.println("遍历 高亮结果集,覆盖 正常结果集" + searchHit.getSourceAsMap()); Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments(); if (text != null) { for (Text str : text) { stringBuffer.append(str.string()); } //遍历 高亮结果集,覆盖 正常结果集 searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString()); } } sourceList.add(searchHit.getSourceAsMap()); } return sourceList; } } 复制代码
十一、ElasticSearch集群
11.1、相关概念
11.1.1、集群(cluster)
一个集群就是由一个或多个节点组织在一起,它们共同持有你整个的数据,并一起提供索引和搜索功能。一个集群 由一个唯一的名字标识,这个名字默认就是elasticsearch
。这个名字是重要的,因为一个节点只能通过指定某个集群的名字,来加入这个集群。在产品环境中显式地设定这个名字是一个好习惯,但是使用默认值来进行测试/开发也是不错的。
11.1.2、节点
一个节点是你集群中的一个服务器,作为集群的一部分,它存储你的数据,参与集群的索引和搜索功能。和集群类似,一个节点也是由一个名字来标识的,默认情况下,这个名字是一个随机的漫威漫画角色的名字,这个名字会在启动的时候赋予节点。这个名字对于管理工作来说挺重要的,因为在这个管理过程中,你会去确定网络中的哪些服务器对应于Elasticsearch集群中的哪些节点。
一个节点可以通过配置集群名称的方式来加入一个指定的集群。默认情况下,每个节点都会被安排加入到一个叫 做“elasticsearch”的集群中,这意味着,如果你在你的网络中启动了若干个节点,并假定它们能够相互发现彼此,它们将会自动地形成并加入到一个叫做“elasticsearch”的集群中。
在一个集群里,只要你想,可以拥有任意多个节点。而且,如果当前你的网络中没有运行任何Elasticsearch节点, 这时启动一个节点,会默认创建并加入一个叫做“elasticsearch”的集群。
11.1.3、分片和复制(shards & replicas)
一个索引可以存储超出单个结点硬件限制的大量数据。比如,一个具有10亿文档的索引占据1TB的磁盘空间,而任一节点都没有这样大的磁盘空间;或者单个节点处理搜索请求,响应太慢。为了解决这个问题,Elasticsearch提供了将索引划分成多份的能力,这些份就叫做分片。
当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置 到集群中的任何节点上。 分片之所以重要,主要有两方面的原因: 允许你水平分割/扩展你的内容容量允许你在分片(潜在地,位于多个节点上)之上进行分布式的、并行的操作,进而提高性能/吞吐量 至于一个分片怎样分布,它的文档怎样聚合回搜索请求,是完全由Elasticsearch管理的,对于作为用户的你来说,这些都是透明的。
在一个网络/云的环境里,失败随时都可能发生,在某个分片/节点不知怎么的就处于离线状态,或者由于任何原因 消失了。这种情况下,有一个故障转移机制是非常有用并且是强烈推荐的。为此目的,Elasticsearch允许你创建分 片的一份或多份拷贝,这些拷贝叫做复制分片,或者直接叫复制。
复制之所以重要,主要有两方面的原因: 在分片/节点失败的情况下,提供了高可用性。因为这个原因,注意到复制分片从不与原/主要 (original/primary)分片置于同一节点上是非常重要的。 扩展你的搜索量/吞吐量,因为搜索可以在所有的复制上并行运行 。
总之,每个索引可以被分成多个分片。一个索引也可以被复制0次(意思是没有复制)或多次。一旦复制了,每个 索引就有了主分片(作为复制源的原来的分片)和复制分片(主分片的拷贝)之别。分片和复制的数量可以在索引创建的时候指定。在索引创建之后,你可以在任何时候动态地改变复制数量,但是不能改变分片的数量。
默认情况下,Elasticsearch中的每个索引被分片5个主分片和1个复制,这意味着,如果你的集群中至少有两个节点,你的索引将会有5个主分片和另外5个复制分片(1个完全拷贝),这样的话每个索引总共就有10个分片。一个 索引的多个分片可以存放在集群中的一台主机上,也可以存放在多台主机上,这取决于你的集群机器数量。主分片和复制分片的具体位置是由ES内在的策略所决定的。
11.2、集群架构图
11.3、搭建集群
将原有ES安装包复制三份
cp -r elasticsearch-6.2.4/ master/ cp -r elasticsearch-6.2.4/ slave1/ cp -r elasticsearch-6.2.4/ slave2/ 复制代码
删除复制目录中data目录
# 由于复制目录之前使用过因此需要在创建集群时将原来数据删除 rm -rf master/data rm -rf slave1/data rm -rf slave2/data 复制代码
编辑没有文件夹中config目录中jvm.options文件跳转启动内存
# 此步骤是调整启动时占JVM内存的大小,也可以关机去调整虚拟机的大小,因为一般的虚拟机都只可以启动一个elasticsearch服务,启动集群的时需要更大的内存,分别在下面配置文件中加入: -Xms512m -Xmx512m vim master/config/jvm.options vim slave1/config/jvm.options vim slave2/config/jvm.options 复制代码
分别修改三个文件夹中config目录中elasticsearch.yml文件
vim master/config/elasticsearch.yml vim salve1/config/elasticsearch.yml vim slave2/config/elasticsearch.yml # 分别修改如下配置: cluster.name: my-es #集群名称(集群名称必须一致) node.name: es-03 #节点名称(节点名称不能一致) network.host: 0.0.0.0 #监听地址(必须开启远程权限,并关闭防火墙) http.port: 9200 #监听端口(在一台机器时服务端口不能一致) discovery.zen.ping.unicast.hosts: ["172.30.2.175:9301", "172.30.2.201:9302"] #另外两个节点的ip gateway.recover_after_nodes: 3 #集群可做master的最小节点数 transport.tcp.port: 9300 #集群TCP端口(在一台机器搭建必须修改) 9301 9302 9303 复制代码
启动多个es
./master/bin/elasticsearch ./slave1/bin/elasticsearch ./slave2/bin/elasticsearch 复制代码
查看节点状态
curl http://10.102.115.3:9200 curl http://10.102.115.3:8200 curl http://10.102.115.3:7200 复制代码
查看集群健康
http://10.102.115.3:9200/_cat/health?v 复制代码
11.4、安装elasticsearch-head插件
有两种方式安装elasticsearch-head插件:
- 基于Node.js的部署
- 基于Chrome浏览器插件部署
11.4.1、方式一
elasticsearch-head下载到本地
git clone git://github.com/mobz/elasticsearch-head.git 复制代码
安装nodejs
# 没有wget的请先安装yum install -y wget wget http://cdn.npm.taobao.org/dist/node/latest-v8.x/node-v8.1.2-linux-x64.tar.xz 复制代码
解压缩nodejs
xz -d node-v10.15.3-linux-arm64.tar.xz 解压为tar包 复制代码
配置环境变量
mv node-v10.15.3-linux-arm64 nodejs mv nodejs /usr/nodejs vim /etc/profile # 在文件中书写下面两个配置,具体的路径自行更改 export NODE_HOME=/usr/nodejs export PATH=$PATH:$JAVA_HOME/bin:$NODE_HOME/bin 复制代码
进入elasticsearch-head的目录
npm config set registry https://registry.npm.taobao.org npm install npm run start 复制代码
编写elastsearch.yml配置文件开启head插件的访问
http.cors.enabled: true http.cors.allow-origin: "*" 复制代码
启动访问head插件 默认端口9100
http://ip:9100 查看集群状态 复制代码
11.4.2、基于Chrome浏览器插件
插件的百度链接:密码6666
下载插件后,到chrome的扩展程序里→打开开发者模式→加载已解压的扩展程序→上传刚才的es-head压缩包, 打开浏览器看右上角的es-head图标插件, 即安装成功。