ES代码总结2

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:
一、简介

ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。

这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/

二、基本用法

Elasticsearch集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含多个文档(documents),然后每个文档包含多个字段(Fields),这种面向文档型的储存,也算是NoSQL的一种吧。

ES比传统关系型数据库,对一些概念上的理解:

Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices   -> Types  -> Documents -> Fields

从创建一个Client到添加、删除、查询等基本用法:

1、创建Client

public ElasticSearchService(String ipAddress, int port) {
        client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress(ipAddress,
                        port));
    }

这里是一个TransportClient。

ES下两种客户端对比:

TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。

Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。

2、创建/删除Index和Type信息

复制代码
 1 // 创建索引
 2     public void createIndex() {
 3         client.admin().indices().create(new CreateIndexRequest(IndexName))
 4                 .actionGet();
 5     }
 6 
 7     // 清除所有索引
 8     public void deleteIndex() {
 9         IndicesExistsResponse indicesExistsResponse = client.admin().indices()
10                 .exists(new IndicesExistsRequest(new String[] { IndexName }))
11                 .actionGet();
12         if (indicesExistsResponse.isExists()) {
13             client.admin().indices().delete(new DeleteIndexRequest(IndexName))
14                     .actionGet();
15         }
16     }
17     
18     // 删除Index下的某个Type
19     public void deleteType(){
20         client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();
21     }
22 
23     // 定义索引的映射类型
24     public void defineIndexTypeMapping() {
25         try {
26             XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
27             mapBuilder.startObject()
28             .startObject(TypeName)
29                 .startObject("properties")
30                     .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()
31                     .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()
32                     .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
33                     .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
34                     .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
35                     .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
36                     .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
37                     .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()
38                 .endObject()
39             .endObject()
40             .endObject();
41 
42             PutMappingRequest putMappingRequest = Requests
43                     .putMappingRequest(IndexName).type(TypeName)
44                     .source(mapBuilder);
45             client.admin().indices().putMapping(putMappingRequest).actionGet();
46         } catch (IOException e) {
47             log.error(e.toString());
48         }
49     }
复制代码

这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。

注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。

详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html

个人注:

设置mapping信息 这段代码是在ES中的索引库index和类型都已经建立完之后,再向type中插入数据之前设置要插入数据对应的mappings信息...

如果给一个还没有创建的索引库,类型 设置mapping信息 可以参考如下代码:

也可以实现判断一下这个索引库index 或者指定索引库index对应的类型type到底是否存在...再设置mapping信息.

注意:插入数据之后是无法修改索引库对应的mapping信息的...只能删了重新创建. 如下部分我在项目中使用的代码:

复制代码
 1     public void setMappings(){
 2         //mappings
 3         try {
 4             XContentBuilder mappings = XContentFactory.jsonBuilder();
 5             mappings.startObject()
 6                 .startObject(EsUtil.mmobjectTypename)
 7                     .startObject("properties")
 8                         .startObject("name").field("type","text").endObject()
 9                         .startObject("search_createtime").field("type","text").endObject()
10                     .endObject()
11                 .endObject()
12             .endObject();
13             boolean exists = transportClient.admin().indices()
14                     .prepareExists(EsUtil.indexname)
15                     .execute().actionGet().isExists();
16             if(exists){
17                 PutMappingRequest putMappingRequest = Requests.
18                         putMappingRequest(EsUtil.indexname).type(EsUtil.mmobjectTypename)
19                         .source(mappings);
20                 transportClient.admin().indices().putMapping(putMappingRequest).actionGet();
21             }else{
22                 CreateIndexRequestBuilder prepareCreate = transportClient.admin().indices().prepareCreate(EsUtil.indexname);
23                 prepareCreate.addMapping(EsUtil.mmobjectTypename, mappings).execute().actionGet();
24             }
25         } catch (IOException e) {
26             e.printStackTrace();
27         }
28     }
复制代码

关于设置mappings信息还可以参考如下:

http://stackoverflow.com/questions/23552845/configure-elasticsearch-mapping-with-java-api

3、索引数据

复制代码
 1 // 批量索引数据
 2     public void indexHotSpotDataList(List<Hotspotdata> dataList) {
 3         if (dataList != null) {
 4             int size = dataList.size();
 5             if (size > 0) {
 6                 BulkRequestBuilder bulkRequest = client.prepareBulk();
 7                 for (int i = 0; i < size; ++i) {
 8                     Hotspotdata data = dataList.get(i);
 9                     String jsonSource = getIndexDataFromHotspotData(data);
10                     if (jsonSource != null) {
11                         bulkRequest.add(client
12                                 .prepareIndex(IndexName, TypeName,
13                                         data.getId().toString())
14                                 .setRefresh(true).setSource(jsonSource));
15                     }
16                 }
17 
18                 BulkResponse bulkResponse = bulkRequest.execute().actionGet();
19                 if (bulkResponse.hasFailures()) {
20                     Iterator<BulkItemResponse> iter = bulkResponse.iterator();
21                     while (iter.hasNext()) {
22                         BulkItemResponse itemResponse = iter.next();
23                         if (itemResponse.isFailed()) {
24                             log.error(itemResponse.getFailureMessage());
25                         }
26                     }
27                 }
28             }
29         }
30     }
31 
32     // 索引数据
33     public boolean indexHotspotData(Hotspotdata data) {
34         String jsonSource = getIndexDataFromHotspotData(data);
35         if (jsonSource != null) {
36             IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
37                     TypeName).setRefresh(true);
38             requestBuilder.setSource(jsonSource)
39                     .execute().actionGet();
40             return true;
41         }
42 
43         return false;
44     }
45 
46     // 得到索引字符串
47     public String getIndexDataFromHotspotData(Hotspotdata data) {
48         String jsonString = null;
49         if (data != null) {
50             try {
51                 XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
52                 jsonBuilder.startObject().field(IDFieldName, data.getId())
53                         .field(SeqNumFieldName, data.getSeqNum())
54                         .field(IMSIFieldName, data.getImsi())
55                         .field(IMEIFieldName, data.getImei())
56                         .field(DeviceIDFieldName, data.getDeviceID())
57                         .field(OwnAreaFieldName, data.getOwnArea())
58                         .field(TeleOperFieldName, data.getTeleOper())
59                         .field(TimeFieldName, data.getCollectTime())
60                         .endObject();
61                 jsonString = jsonBuilder.string();
62             } catch (IOException e) {
63                 log.equals(e);
64             }
65         }
66 
67         return jsonString;
68     }
复制代码

ES支持批量和单个数据索引。

4、查询获取数据

复制代码
 1 // 获取少量数据100个
 2     private List<Integer> getSearchData(QueryBuilder queryBuilder) {
 3         List<Integer> ids = new ArrayList<>();
 4         SearchResponse searchResponse = client.prepareSearch(IndexName)
 5                 .setTypes(TypeName).setQuery(queryBuilder).setSize(100)
 6                 .execute().actionGet();
 7         SearchHits searchHits = searchResponse.getHits();
 8         for (SearchHit searchHit : searchHits) {
 9             Integer id = (Integer) searchHit.getSource().get("id");
10             ids.add(id);
11         }
12         return ids;
13     }
14 
15     // 获取大量数据
16     private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
17         List<Integer> ids = new ArrayList<>();
18         // 一次获取100000数据
19         SearchResponse scrollResp = client.prepareSearch(IndexName)
20                 .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
21                 .setQuery(queryBuilder).setSize(100000).execute().actionGet();
22         while (true) {
23             for (SearchHit searchHit : scrollResp.getHits().getHits()) {
24                 Integer id = (Integer) searchHit.getSource().get(IDFieldName);
25                 ids.add(id);
26             }
27             scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
28                     .setScroll(new TimeValue(600000)).execute().actionGet();
29             if (scrollResp.getHits().getHits().length == 0) {
30                 break;
31             }
32         }
33 
34         return ids;
35     }
复制代码

这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。

5、聚合(Aggregation Facet)查询 

复制代码
 1 // 得到某段时间内设备列表上每个设备的数据分布情况<设备ID,数量>
 2     public Map<String, String> getDeviceDistributedInfo(String startTime,
 3             String endTime, List<String> deviceList) {
 4 
 5         Map<String, String> resultsMap = new HashMap<>();
 6 
 7         QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
 8         QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
 9         QueryBuilder queryBuilder = QueryBuilders.boolQuery()
10                 .must(deviceQueryBuilder).must(rangeBuilder);
11 
12         TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE)
13                 .field(DeviceIDFieldName);
14         SearchResponse searchResponse = client.prepareSearch(IndexName)
15                 .setQuery(queryBuilder).addAggregation(termsBuilder)
16                 .execute().actionGet();
17         Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");
18         if (terms != null) {
19             for (Terms.Bucket entry : terms.getBuckets()) {
20                 resultsMap.put(entry.getKey(),
21                         String.valueOf(entry.getDocCount()));
22             }
23         }
24         return resultsMap;
25     }
复制代码

Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。

详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html

三、集群配置

配置文件elasticsearch.yml

集群名和节点名:

#cluster.name: elasticsearch

#node.name: "Franz Kafka"

是否参与master选举和是否存储数据

#node.master: true

#node.data: true

分片数和副本数

#index.number_of_shards: 5
#index.number_of_replicas: 1

master选举最少的节点数,这个一定要设置为整个集群节点个数的一半加1,即N/2+1

#discovery.zen.minimum_master_nodes: 1

discovery ping的超时时间,拥塞网络,网络状态不佳的情况下设置高一点

#discovery.zen.ping.timeout: 3s

注意,分布式系统整个集群节点个数N要为奇数个!!

如何避免ElasticSearch发生脑裂(brain split):http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/

即使集群节点个数为奇数,minimum_master_nodes为整个集群节点个数一半加1,也难以避免脑裂的发生,详情看讨论:https://github.com/elastic/elasticsearch/issues/2488

四、Elasticsearch插件

1、elasticsearch-head是一个elasticsearch的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head

2、elasticsearch-sql:使用SQL语法查询elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql

github地址:https://github.com/NLPchina/elasticsearch-sql

3、elasticsearch-bigdesk是elasticsearch的一个集群监控工具,可以通过它来查看ES集群的各种状态。

安装:./bin/plugin -install lukas-vlcek/bigdesk

访问:http://192.103.101.203:9200/_plugin/bigdesk/

4、elasticsearch-servicewrapper插件是ElasticSearch的服务化插件,

在https://github.com/elasticsearch/elasticsearch-servicewrapper下载该插件后,解压缩,将service目录拷贝到elasticsearch目录的bin目录下。

而后,可以通过执行以下语句安装、启动、停止ElasticSearch:

sh elasticsearch install

sh elasticsearch start

sh elasticsearch stop

 

参考:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch

=============================================

java代码操作ES的settings配置信息


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6709936.html,如需转载请自行联系原作者

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
3月前
|
JavaScript
es5和es6的区别
es5和es6的区别
20 0
|
3月前
|
存储 JavaScript 前端开发
|
21天前
|
前端开发 JavaScript
前端最常见的es6,es7,es8方法
【4月更文挑战第3天】 前端最常见的es6,es7,es8方法
22 5
|
24天前
|
前端开发 JavaScript API
ES6和ES5的区别
ES6和ES5的区别
11 0
|
3月前
|
JSON 前端开发 数据格式
ES7、ES8、ES9、ES10、ES11、ES12都增加了那些新特性?(一)
ES7、ES8、ES9、ES10、ES11、ES12都增加了那些新特性?(一)
|
3月前
|
前端开发 JavaScript Java
ES7、ES8、ES9、ES10、ES11、ES12都增加了那些新特性?(二)
ES7、ES8、ES9、ES10、ES11、ES12都增加了那些新特性?(二)
|
8月前
|
JavaScript 前端开发
ES5、ES6和ES2015有什么区别?
ES5、ES6和ES2015有什么区别?
250 0
|
10月前
|
JavaScript
|
10月前
|
JavaScript 前端开发 Java
每天3分钟,重学ES6-ES12(八)ES11 ES12新增内容
每天3分钟,重学ES6-ES12(八)ES11 ES12新增内容
78 0