ElasticSearch-2.0.0集群安装配置与API使用实践

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:

ElasticSearch是基于全文搜索引擎库Lucene构建的分布式搜索引擎,我们可以直接使用ElasticSearch实现分布式搜索系统的搭建与使用,都知道,Lucene只是一个搜索框架,它提供了搜索引擎操作的基本API,如果要实现一个能够使用的搜索引擎系统,还需要自己基于Lucene的API去实现,工作量很大,而且还需要很好地掌握Lucene的底层实现原理。
ElasticSearch是一个完整的分布式搜索引擎系统,它的一些基本特性包括如下:

  • 全文检索
  • 提供插件机制,可以共享重用插件的功能
  • 分布式文件存储
  • 分布式实时索引和搜索
  • 实时统计分析
  • 可以横向扩展,支持大规模数据的搜索
  • 简单易用的RESTful API
  • 基于Replication实现了数据的高可用特性
  • 与其他系统的集成
  • 支持结构化和非结构化数据
  • 灵活的Schema设计(Mappings)
  • 支持多编程语言客户端

我个人感觉,ElasticSearch尽量屏蔽底层Lucene相关的技术细节,让你根本无从感觉底层Lucene相关的内容,这样你可以省去了了解Lucene 的成本,学习曲线比较平缓,不像Solr,如果想要构造负责的查询(Query),还是要对Lucene有所了解的。另外,在分布式设计方面,ElasticSearch更轻量一些,用起来更简单,而使用Solr的分布式分片功能需要使用SolrCloud,它基于ZooKeeper来实现配置管理,以及Replication功能,而且Solr需要使用Web容器来部署,相对来说有点复杂一些(我个人之前使用的SolrCloud版本大概是3.1~3.5左右,比较早,现在可能更加完善了)。

基本概念

我们熟悉一下ElasticSearch中涉及到的一些基本概念:

  • 索引(Index)

索引(Index)是文档的集合,它是根据实际业务逻辑进行划分的,通常会把相对独立且具有相似结构或者性质的数据作为文档,放在一起,形成一个索引,比如,用户相关信息可以作为一个索引,交易相关信息也可应作为另一个索引。

  • 类型(Type)

类型(Type)是索引内部的一个逻辑划分,在一个索引内部可以定义多个类型(Type),类型将一个索引在逻辑上划分为多个集合,每个类型包含多个属性(字段)。比如,我们基于手机客户端应用App,创建一个了用户相关信息的索引,然后再在这个索引内部定义多个类型:基本信息类型、设备信息类型、行为信息类型,基本信息类型中包含用户编号、证件号码、名称、手机号码、年龄、出生日期,设备信息类型包括设备类型、设备名称、App版本号、渠道来源、系统版本、IMEI、mac地址,用户行为信息包含用户编号、事件编号、事件类型、时间、浏览页面代码、地区编码,这样有3个类型在一个索引当中。ElasticSearch中类型,与HBase中列簇(Column Family)的概念很相似。

  • 文档(Document)

文档(Document)是索引的基本单元,它与关系数据库中的一条记录相类似,包含了一组属性信息,同时包含一个唯一标识这一组属性值的ID,通过该ID可以更新一个文档,也可以删除一个文档。

  • 分片(Shards)&副本(Replicas)

一个索引是很多文档的集合,将一个索引进行分割,分成多个片段(一个索引的子集),每一个片段称为一个分片(Shard),这样划分可以很好地管理索引,跨节点存储,为分布式存储于搜索提供了便利。副本(Replica)是为了保证一个分片(Shard)的可用性,冗余复制存储,当一个分片对应的数据无法读取时,可以读取其副本,正常提供搜索服务。

集群安装配置

ElasticSearch集群安装配置非常容易,安装可以执行如下命令行:

1 wgethttps://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.0.0/elasticsearch-2.0.0.zip
2 unzip elasticsearch-2.0.0.zip

拿出集群的一个节点的进行配置,修改配置文件config/elasticsearch.yml的内容,如下所示:

01 # ======================== Elasticsearch Configuration =========================
02 #
03 # NOTE: Elasticsearch comes with reasonable defaults for most settings.
04 # Before you set out to tweak and tune the configuration, make sure you
05 # understand what are you trying to accomplish and the consequences.
06 #
07 # The primary way of configuring a node is via this file. This template lists
08 # the most important settings you may want to configure for a production cluster.
09 #
10 # Please see the documentation for further information on configuration options:
11 # <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html>
12 #
13 # ---------------------------------- Cluster -----------------------------------
14 #
15 # Use a descriptive name for your cluster:
16 #
17 cluster.name: dw_search_engine
18 #
19 # ------------------------------------ Node ------------------------------------
20 #
21 # Use a descriptive name for the node:
22 #
23 node.name: esnode-01
24 #
25 # Add custom attributes to the node:
26 #
27 # node.rack: r1
28 #
29 # ----------------------------------- Paths ------------------------------------
30 #
31 # Path to directory where to store the data (separate multiple locations by comma):
32 #
33 path.data: /data/dw_search_storage
34 #
35 # Path to log files:
36 #
37 path.logs: /tmp/es/logs
38 #
39 # ----------------------------------- Memory -----------------------------------
40 #
41 # Lock the memory on startup:
42 #
43 # bootstrap.mlockall: true
44 #
45 # Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
46 # available on the system and that the owner of the process is allowed to use this limit.
47 #
48 # Elasticsearch performs poorly when the system is swapping the memory.
49 #
50 # ---------------------------------- Network -----------------------------------
51 #
52 # Set the bind adress to a specific IP (IPv4 or IPv6):
53 #
54 network.host: 10.10.2.62
55 #
56 # Set a custom port for HTTP:
57 #
58 http.port: 9200
59 #
60 # For more information, see the documentation at:
61 # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
62 #
63 # ---------------------------------- Gateway -----------------------------------
64 #
65 # Block initial recovery after a full cluster restart until N nodes are started:
66 #
67 # gateway.recover_after_nodes: 3
68 #
69 # For more information, see the documentation at:
70 # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html>
71 #
72 # --------------------------------- Discovery ----------------------------------
73 #
74 # Elasticsearch nodes will find each other via unicast, by default.
75 #
76 # Pass an initial list of hosts to perform discovery when new node is started:
77 # The default list of hosts is ["127.0.0.1", "[::1]"]
78 #
79 discovery.zen.ping.unicast.hosts: ["es-01", "es-02"]
80 #
81 # Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
82 #
83 # discovery.zen.minimum_master_nodes: 3
84 #
85 # For more information, see the documentation at:
86 # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
87 #
88 # ---------------------------------- Various -----------------------------------
89 #
90 # Disable starting multiple nodes on a single system:
91 #
92 # node.max_local_storage_nodes: 1
93 #
94 # Require explicit names when deleting indices:
95 #
96 # action.destructive_requires_name: true

其它节点的配置,在保证基本存储目录相同的前提下,可以根据需要修改如下几个参数:

1 node.name
2 network.host
3 http.port

最后,在每个节点上分别启动ElasticSearch,执行如下命令:

1 cd elasticsearch-2.0.0
2 bin/elasticsearch -d

然后可以查看Web管理界面,需要安装插件elasticsearch-head,后面会介绍,Web管理界面,如下所示:

上图中,我们已经创建了一个索引,可以看到节点的状态,及其分片(Shard)的情况。

RESTful API基本操作

尤其是在进行搜索的时候,为了使得其他系统能够与ElasticSearch搜索系统很好地解耦合,使用ElasticSearch提供的RESTful API是一种不错的选择。下面,我们介绍RESTful API的基本操作。

  • 插件管理

插件的存放目录为elasticsearch-2.0.0/plugins/,插件都是基于该存储目录进行操作的。
安装插件:

1 bin/plugin install analysis-icu
2 bin/plugin install mobz/elasticsearch-head

可以从不同的位置安装插件,上面第一个称为Core Elasticsearch plugin,它是Elasticsearch提供的,会从Elasticsearch上下载并安装;上面第一个是从Github上自动下载安装。还有其他的方式安装,如从特定的文件系统等进行安装。
列出插件:

1 bin/plugin list

删除插件:

1 bin/plugin remove analysis-icu

安装完一个插件,我们可以查看,例如查看elasticsearch_head插件,查看如下链接:

1 http://10.10.2.62:9200/_plugin/head/
  • 创建索引
1 curl -XPUT 'http://10.10.2.62:9200/basis_device_info/'

创建的索引名称为basis_device_info,我们也可以不指定一个索引对应的Mappings,而是在索引的时候自动生成Mappings,所以如果没有指定一个索引的Mappings,则这个索引可以支持任何的Mappings。同样可知,一个索引可以自动地增加不同的type,非常灵活。
也可以指定索引的基本配置,如分片(Shard)数目、副本(Replica)数目,如下所示:

1 curl -XPUT 'http://10.10.2.62:9200/basis_device_info /' -d '{
2 "settings" : {
3 "index" : {
4 "number_of_shards" : 10,
5 "number_of_replicas" : 1
6 }
7 }
8 }'

默认是5个分片,不进行复制,上面配置表示索引basis_device_info有10个分片,每个分片1个副本。
下面在创建索引的时候,指定设计的schema,即配置mappings,如下所示:

01 curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d '
02 {
03 "mappings": {
04 "user": {
05 "_all": { "enabled": false },
06 "properties": {
07 "installid": { "type": "string" },
08 "appid": { "type": "string" },
09 "channel": { "type": "string", "index": "analyzed" },
10 "version": { "type": "string" },
11 "osversion": { "type": "string" },
12 "device_name": { "type": "string", "index": "analyzed" },
13 "producer": { "type": "string" },
14 "device_type": { "type": "string" },
15 "resolution": { "type": "string", "index": "analyzed" },
16 "screen_size": { "type": "string", "index": "analyzed" },
17 "mac": { "type": "string", "index": "not_analyzed" },
18 "idfa": { "type": "string" },
19 "idfv": { "type": "string", "index": "not_analyzed" },
20 "imei": { "type": "string", "index": "not_analyzed" },
21 "create_time": {
22 "type": "date",
23 "format": "yyyy-MM-dd HH:mm:ss",
24 "index": "not_analyzed"
25 }
26 }
27 }
28 }
29 }'

上面创建了索引basis_device_info,同时type为user,有了mappings,我们就知道需要索引的数据的格式了。

  • 删除索引
1 curl -XDELETE 'http://10.10.2.62:9200/basis_device_info/'

删除索引basis_device_info。

  • 索引文档
01 curl -PUT'http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA' -d '{
02 "installid": "0000000L",
03 "appid": "0",
04 "udid": "CC49E748588490D41BFB89584007B0FA",
05 "channel": "wulei1",
06 "version": "3.1.2",
07 "osversion": "8.1",
08 "device_name": "iPhone Retina4 Simulator",
09 "producer": "apple",
10 "device_type": "1",
11 "resolution": "640*1136",
12 "screen_size": "320*568",
13 "mac": "600308A20C5E",
14 "idfa": "dbbbs-fdsfa-fafda-321saf",
15 "idfv": "4283FAE1-19EB-4FA9-B739-8148F76BC8C3",
16 "imei": "af-sfd0fdsa-fad-ff",
17 "create_time": "2015-01-14 20:32:05"
18 }'

基于我们前面创建的type为user的索引,索引一个文档,文档_id为CC49E748588490D41BFB89584007B0FA,文档内容为一个用户设备信息,使用JSON格式表示。

  • 批量索引

批量索引,可以根据自己熟悉的编程语言或者脚本来实现,ElasticSearch也提供了一些客户端库。下面我们首先根据数据文件,构造成ElasticSearch索引支持的JSON格式,导出文件,然后通过curl工具去进行批量索引,实际上使用的是ElasticSearch提供的bulk API来实现的。
首先处理原始带索引数据,代码如下所示:

01 package org.shirdrn.es;
02
03 import java.io.BufferedReader;
04 import java.io.BufferedWriter;
05 import java.io.Closeable;
06 import java.io.File;
07 import java.io.FileReader;
08 import java.io.FileWriter;
09
10 import net.sf.json.JSONObject;
11
12 import com.google.common.base.Throwables;
13
14 public class EsIndexingClient {
15
16 public static void closeQuietly(Closeable... closeables) {
17 if(closeables != null) {
18 for(Closeable closeable : closeables) {
19 try {
20 closeable.close();
21 } catch (Exception e) { }
22 }
23 }
24 }
25
26 public static void main(String[] args) {
27 String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
28 String out = "C:\\Users\\yanjun\\Desktop\\basis_device_info.json";
29 File in = new File(f);
30 BufferedReader reader = null;
31 BufferedWriter writer = null;
32 try {
33 writer = new BufferedWriter(new FileWriter(out));
34 reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
35 String line = null;
36 while((line = reader.readLine()) != null) {
37 String[] a = line.split("\t", -1);
38 if(a.length == 16) {
39 String udid = a[2];
40
41 JSONObject c = new JSONObject();
42 c.put("_index", "basis_device_info");
43 c.put("_type", "user");
44 c.put("_id", udid);
45
46 JSONObject index = new JSONObject();
47 index.put("index", c);
48
49 JSONObject doc = new JSONObject();
50 doc.put("installid", a[0]);
51 doc.put("appid", a[1]);
52 doc.put("udid", a[2]);
53 doc.put("channel", a[3]);
54 doc.put("version", a[4]);
55 doc.put("osversion", a[5]);
56 doc.put("device_name", a[6]);
57 doc.put("producer", a[7]);
58 doc.put("device_type", a[8]);
59 doc.put("resolution", a[9]);
60 doc.put("screen_size", a[10]);
61 doc.put("mac", a[11]);
62 doc.put("idfa", a[12]);
63 doc.put("idfv", a[13]);
64 doc.put("imei", a[14]);
65 doc.put("create_time", a[15]);
66
67 writer.write(index.toString() + "\n");
68 writer.write(doc.toString() + "\n");
69 }
70 }
71
72 } catch (Exception e) {
73 throw Throwables.propagate(e);
74 } finally {
75 closeQuietly(reader, writer);
76 }
77
78 }
79 }

运行代码,输出的数据文件为basis_device_info.json,该文件的格式了,示例如下所示:

1 {"index":{"_index":"basis_device_info","_type":"user","_id":"1c207122a4b2c9632212ab86bac10f60"}}
2 {"installid":"00000002","appid":"0","udid":"1c207122a4b2c9632212ab86bac10f60","channel":"itings","version":"3.1.1","osversion":"4.1.2","device_name":"Lenovo P770","producer":"Lenovo","device_type":"0","resolution":"540*960","screen_size":"4.59","mac":"d4:22:3f:83:17:06","idfa":"","idfv":"","imei":"861166023335745","create_time":"2015-01-14 19:39:35"}
3 {"index":{"_index":"basis_device_info","_type":"user","_id":"FA6B1B98E6FF4E6994A1505A996F6102"}}
4 {"installid":"00000003","appid":"0","udid":"FA6B1B98E6FF4E6994A1505A996F6102","channel":"appstore","version":"3.1.1","osversion":"8.1.2","device_name":"iPhone 6Plus","producer":"apple","device_type":"1","resolution":"640*1136","screen_size":"320*568","mac":"020000000000","idfa":"84018625-A3C9-47A8-88D0-C57C12F80520","idfv":"9D1E2514-9DC8-47A8-ABD0-129FC0FB3171","imei":"","create_time":"2015-01-14 19:41:21"}
5 {"index":{"_index":"basis_device_info","_type":"user","_id":"8c5fe70b2408f184abcbe4f34b8f23c3"}}
6 {"installid":"00000004","appid":"0","udid":"8c5fe70b2408f184abcbe4f34b8f23c3","channel":"itings","version":"3.1.1.014","osversion":"4.2.2","device_name":"2014011","producer":"Xiaomi","device_type":"0","resolution":"720*1280","screen_size":"4.59","mac":"0c:1d:af:4f:48:9f","idfa":"","idfv":"","imei":"865763025472173","create_time":"2015-01-14 19:46:37"}

奇数编号行的内容为索引的指令信息,包括索引名称(_index)、类型(_type)、唯一标识(_id),偶数编号行的内容为实际待索引的文档数据。
然后,通过curl命令来进行批量索引,执行如下命令:

1 curl -s -XPOST http://10.10.2.62:9200/basis_device_info/_bulk --data-binary"@basis_device_info.json"
  • 搜索文档

简单的搜索,可以通过GET方式搜索,如下所示:

1 http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA
2
3
4 http://10.10.2.62:9200/basis_device_info/user/_search?q=channel:B-hicloud

上面第一个根据唯一的_id进行搜索,结果返回0个或者1个文档;第二个通过指定GET方式参数,其中_search和q是ElasticSearch内置的接口关键字,通过指定字段名称和搜索关键词的方式进行搜索,结果以JSON格式返回。

  • Request Body搜索

可以设置请求的body内容,能够支持更加复杂的查询条件然后请求搜索,如下所示:

1 curl -XGET 'http://10.10.2.245:9200/basis_device_info/user/_search' -d '{
2 "query" : {
3 "term" : { "udid": "bc0af2ca66a96725b8b0e0056d4213b6" }
4 }
5 }'

结果示例,如下所示:

1 {"took":11,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":9.45967,"hits":[{"_index":"basis_device_info","_type":"user","_id":"bc0af2ca66a96725b8b0e0056d4213b6","_score":9.45967,"_source":{"installid":"00000FPq","appid":"0","udid":"bc0af2ca66a96725b8b0e0056d4213b6","channel":"B-hicloud","version":"3.1.1","osversion":"4.4.2","device_name":"H60-L02","producer":"HUAWEI","device_type":"0","resolution":"720*1184","screen_size":"4.64","mac":"ec:cb:30:c4:93:e3","idfa":"","idfv":"","imei":"864103021536104","create_time":"2015-01-18 01:29:16"}}]}}
  • 基于Lucene查询语法搜索

如果熟悉Lucene查询(Query),可以构造通过构造复杂的Term关系字符串来进行搜索,示例如下所示:

1 curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
2 {
3 "query": {
4 "query_string": { "query": "(channel:baidu OR device_name:HUAWEI)" }
5 }
6 }'

查询query字符串的含义是:从channel字段搜索baidu,从device_name字段搜索HUAWEI,然后两者取并集,这实际上一个布尔查询,返回最终结果。

  • 使用multi_match搜索

ElasticSearch支持给定搜索关键词,从多个字段中进行搜索,示例如下所示:

1 curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
2 {
3 "query": {
4 "multi_match" : {
5 "query": "HTC",
6 "fields": [ "channel", "device_name" ]
7 }
8 }
9 }'

这样,只要在channel和device_name两个字段中出现关键词HTC,则都返回结果,结果应该是两个字段匹配上的文档集合的并集。

  • 支持Filter搜索

可以在制定Filter进行搜索。例如下面是一个按照时间范围进行过滤,得到搜索结果的查询:

01 curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
02 {
03 "query": {
04 "filtered": {
05 "query": { "match_all": {} },
06 "filter" : {
07 "range" : {
08 "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
09 }
10 }
11 }
12 }
13 }'
  • 分页搜索

ElasticSearch支持分页搜索,可以通过在RESTful连接中指定size和from参数,来进行分页搜索,如下所示:

01 curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search?size=10&from=20' -d '
02 {
03 "query": {
04 "filtered": {
05 "query": { "match_all": {} },
06 "filter" : {
07 "range" : {
08 "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
09 }
10 }
11 }
12 }
13 }'

上面搜索的含义是:按照时间范围搜索,从第20个文档开始,返回10个文档,相当于一页取10个文档。

Java客户端

如果熟悉Java语言,而不想使用脚本等其他方式操作ElasticSearch搜索集群,则可以使用ElasticSearch提供的Java客户端API来编码实现,能够更加灵活地控制。ElasticSearch提供的Java客户端支持全部常用操作,如更新索引、索引文档、搜索文档、删除索引等等操作,而且还支持其他一些功能,如同步异步模式、explain查询等,下面我们通过代码来了解一下。
如果使用Maven管理Java代码,可以在pom.xml文件中加入如下依赖:

1 <dependency>
2 <groupId>org.elasticsearch</groupId>
3 <artifactId>elasticsearch</artifactId>
4 <version>2.0.0</version>
5 </dependency>

创建一个ElasticSearch客户端,代码如下所示:

1 // create & configure client
2 Settings settings = Settings.settingsBuilder()
3 .put("cluster.name", "dw_search_engine")
4 .put("client.transport.sniff", true)
5 .build();
6 final Client client = TransportClient.builder().settings(settings).build()
7 .addTransportAddress(newAddress("es-01", 9300))
8 .addTransportAddress(newAddress("es-02", 9300));

可以将你的ElasticSearch集群的节点通过上面的addTransportAddress方法,都与Client对象关联起来,这样在操作ElasticSearch集群中的索引/更新/删除/搜索文档的时候,就能够自动感知。上面newAddress方法如下:

1 private static InetSocketTransportAddress newAddress(String host, int port) throwsUnknownHostException {
2 return new InetSocketTransportAddress(InetAddress.getByName(host), port);
3 }

另外,也可以通过在配置文件elasticsearch.yml中指定相关配置,例如:

1 cluster.name: dw_search_engine
2 client.transport.sniff: true
3 client.transport.ping_timeout: 10s
4 client.transport.nodes_sampler_interval: 10s

那么,创建客户端需要从配置文件中读取配置内容,具体可以查看官方文档。

  • 准备工作

索引的时候,我们是从一个本地文件中读取数据,并构建索引文档需要的格式,然后请求ElasticSearch集群执行索引操作,下面代码是一些基本准备工作:

1 final String index = "basis_device_info";
2 final String type = "user";
3
4 // index documents
5 String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
6 File in = new File(f);

从文件中,每次读取一行记录,然后构建一个JSON格式字符串,通过XContentBuilder来表示,代码如下所示:

01 protected static XContentBuilder createSource(String[] a) throws IOException {
02 return jsonBuilder()
03 .startObject()
04 .field("installid", a[0])
05 .field("appid", a[1])
06 .field("udid", a[2])
07 .field("channel", a[3])
08 .field("version", a[4])
09 .field("osversion", a[5])
10 .field("device_name", a[6])
11 .field("producer", a[7])
12 .field("device_type", a[8])
13 .field("resolution", a[9])
14 .field("screen_size", a[10])
15 .field("mac", a[11])
16 .field("idfa", a[12])
17 .field("idfv", a[13])
18 .field("imei", a[14])
19 .field("create_time", a[15])
20 .endObject();
21 }

下面我们从API的功能入手,分别详细说明,并附加代码展示用法。

  • 创建索引

可以直接通过Java客户端库来创建索引,代码如下所示:

1 protected static void createIndex(final Client client, String index) {
2 Map<String, Object> indexSettings = Maps.newHashMap();
3 indexSettings.put("number_of_shards", "4");
4 indexSettings.put("number_of_replicas", "1");
5 CreateIndexRequest createIndexRequest = new CreateIndexRequest(
6 index, Settings.settingsBuilder().put(indexSettings).build());
7 CreateIndexResponse createIndexResponse = client.admin().indices().create(createIndexRequest).actionGet();
8 System.out.println(createIndexResponse);
9 }
  • 创建Mappings

通过Java客户端创建Mappings,相对比较复杂一点,需要拼接对应的JSON字符串,实现代码如下所示:

01 protected static void createMappings(final Client client, String index) throwsIOException, InterruptedException, ExecutionException {
02 XContentBuilder basisInfoMapping = jsonBuilder()
03 .startObject()
04 .startObject("_all")
05 .field("enabled", "false")
06 .endObject()
07 .startObject("properties")
08 .startObject("id")
09 .field("type", "string")
10 .endObject()
11 .startObject("name")
12 .field("type", "string")
13 .field("index", "analyzed")
14 .endObject()
15 .startObject("age")
16 .field("type", "int")
17 .endObject()
18 .startObject("birthday")
19 .field("type", "date")
20 .field("format", "yyyy-MM-dd HH:mm:ss")
21 .field("index", "not_analyzed")
22 .endObject()
23 .endObject()
24 .endObject();
25
26 XContentBuilder deviceInfoMapping = jsonBuilder()
27 .startObject()
28 .startObject("_all")
29 .field("enabled", "false")
30 .endObject()
31 .startObject("properties")
32 .startObject("udid")
33 .field("type", "string")
34 .endObject()
35 .startObject("device_name")
36 .field("type", "string")
37 .field("index", "analyzed")
38 .endObject()
39 .startObject("privoder")
40 .field("type", "string")
41 .field("index", "analyzed")
42 .endObject()
43 .startObject("os_version")
44 .field("type", "string")
45 .endObject()
46 .endObject()
47 .endObject();
48
49 PutMappingRequest putMappingRequest = Requests.putMappingRequest(index)
50 .type("basic_info")
51 .source(basisInfoMapping)
52 .type("device_info")
53 .source(deviceInfoMapping);
54
55 System.out.println(putMappingRequest.indicesOptions());
56
57 PutMappingResponse putMappingResponse = client.admin().indices().putMapping(putMappingRequest).get();
58 System.out.println(putMappingResponse);
59 }

上面代码创建了一个名称为app_user_info的索引,该索引具有basic_info和device_info这2个type,可以通过elasticsearch_head插件,在Web管理页面上查看对应的索引信息。

  • 索引单个文档

从文件中读取数据,一条记录构造一个文档,然后执行索引,代码如下所示:

01 protected static void indexDocs(final Client client, final String index, final String type, File in) {
02 BufferedReader reader = null;
03 try {
04 reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
05 String line = null;
06 while((line = reader.readLine()) != null) {
07 String[] a = line.split("\t", -1);
08 if(a.length == 16) {
09 String udid = a[2];
10 IndexResponse response =
11 client
12 .prepareIndex(index, type, udid)
13 .setSource(createSource(a))
14 .get();
15 System.out.println(response.toString());
16 }
17 }
18
19 } catch (Exception e) {
20 throw Throwables.propagate(e);
21 } finally {
22 closeQuietly(reader);
23 }
24 }
  • 批量索引

批量索引有多种方式,首先,通过Bulk API进行索引,我们自己控制每一个batch的大小,代码如下所示:

01 protected static void indexBulk(final Client client, final String index, final String type, File in) {
02 BulkRequestBuilder bulkRequest = client.prepareBulk();
03 final int batchSize = 100;
04 int counter = 0;
05 BufferedReader reader = null;
06 try {
07 reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
08 String line = null;
09 while((line = reader.readLine()) != null) {
10 String[] a = line.split("\t", -1);
11 if(a.length == 16) {
12 String udid = a[2];
13 IndexRequestBuilder indexRequestBuilder =
14 client
15 .prepareIndex(index, type, udid)
16 .setSource(createSource(a));
17 bulkRequest.add(indexRequestBuilder);
18 if(++counter >= batchSize) {
19 System.out.println(!bulkRequest.get().hasFailures());
20 counter = 0;
21 bulkRequest = client.prepareBulk();
22 }
23 }
24 }
25
26 } catch (Exception e) {
27 throw Throwables.propagate(e);
28 } finally {
29 System.out.println(!bulkRequest.get().hasFailures());
30 closeQuietly(reader);
31 }
32 }

另一种方式,是根据ElasticSearch提供的Bulk Processor来实现,只需要设置相关参数,就可以实现批量索引,这种方式更加灵活,示例如下所示:

01 protected static void indexUsingBulkProcessor(final Client client, final String index,final String type, File in) throws InterruptedException {
02 String name = "device_info_processor";
03 int bulkActions = 1000;
04 ByteSizeValue bulkSize = new ByteSizeValue(100, ByteSizeUnit.MB);
05 TimeValue flushInterval = TimeValue.timeValueSeconds(60);
06 int concurrentRequests = 12;
07
08 // create bulk processor
09 final BulkProcessor bulkProcessor = BulkProcessor.builder(client, newBulkProcessor.Listener() {
10
11 public void afterBulk(long id, BulkRequest req, BulkResponse resp) {
12 System.out.println("id=" + id + ", resp=" + resp);
13 }
14
15 public void afterBulk(long id, BulkRequest req, Throwable cause) {
16 System.out.println("id=" + id + ", req=" + req + ", cause=" + cause);
17 }
18
19 public void beforeBulk(long id, BulkRequest req) {
20 System.out.println("id=" + id + ", req=" + req);
21 }
22
23 })
24 .setName(name)
25 .setBulkActions(bulkActions)
26 .setBulkSize(bulkSize)
27 .setFlushInterval(flushInterval)
28 .setConcurrentRequests(concurrentRequests)
29 .build();
30
31 // index documents
32 BufferedReader reader = null;
33 try {
34 reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
35 String line = null;
36 while((line = reader.readLine()) != null) {
37 String[] a = line.split("\t", -1);
38 if(a.length == 16) {
39 String udid = a[2];
40 bulkProcessor.add(new IndexRequest(index, type, udid).source(createSource(a)));
41 }
42 }
43
44 } catch (Exception e) {
45 throw Throwables.propagate(e);
46 } finally {
47 closeQuietly(reader);
48
49 // close bulk processor
50 bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
51 }
52 }

可以通过实现自定义的BulkProcessor.Listener,它提供了Hook的功能,比如,索引某个文档失败的话,可以在Hook方法中增加处理,实现重试的功能;再比如,如果索引成功,给其他系统服务一个回调,等等。

  • 更新文档

更新文档中的某些字段,需要指定id的值,以及需要更新的字段的值,代码如下所示:

01 protected static void updateDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
02 String id = "60e90ddcb1a61622028b8d92112a646c";
03 UpdateRequest updateRequest = new UpdateRequest(index, type, id);
04 updateRequest.doc(jsonBuilder()
05 .startObject()
06 .field("channel", "h-google")
07 .field("appid", "1")
08 .endObject());
09 UpdateResponse response = client.update(updateRequest).get();
10 System.out.println(response);
11 }

如果更新文档的时候,文档不存在,则需要先执行索引操作,再进行更新操作,将这两个操作合并到一起,使用upsert操作,代码如下所示:

01 protected static void upsertDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
02 String id = "fdd5ff7f56b613f0acb2c20a1ebc35e4";
03 IndexRequest indexRequest = new IndexRequest(index, type, id).source(jsonBuilder()
04 .startObject()
05 .field("installid", "00000BSe")
06 .field("appid", "0")
07 .field("udid", "fdd5ff7f56b613f0acb2c20a1ebc35e4")
08 .field("channel", "A-wandoujia")
09 .field("version", "3.1.1")
10 .field("resolution", "960*540")
11 .field("mac", "00:08:22:be:1b:b7")
12 .field("device_type", "0")
13 .field("device_name", "HTC")
14 .field("producer", "alps")
15 .field("create_time", "2015-01-17 17:15:36")
16 .endObject());
17
18 UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(jsonBuilder()
19 .startObject()
20 .field("resolution", "540*960")
21 .field("channel", "h-baidu")
22 .field("version", "3.1.1")
23 .field("imei", "861622010000056")
24 .endObject())
25 .upsert(indexRequest);
26 UpdateResponse response = client.update(updateRequest).get();
27 System.out.println(response);
28 }
  • 删除文档

删除文档,需要指定文档的id的值,代码如下所示:

1 protected static void deleteDoc(final Client client, final String index, final String type) {
2 String id = "60e90ddcb1a61622028b8d92112a646c";
3 DeleteResponse response = client.prepareDelete(index, type, id).get();
4 System.out.println(response);
5 }
  • 搜索文档

搜索文档,可以根据需要构造指定的查询(Query),可以设置过滤器等等,然后提交搜索,示例代码如下所示:

01 protected static void searchDocs(final Client client, final String index, final String type) {
02 SearchResponse response = client
03 .prepareSearch(index)
04 .setTypes(type)
05 .setQuery(QueryBuilders.termQuery("device_name", "xiaomi"))
06 .setPostFilter(QueryBuilders.rangeQuery("create_time").from("2015-01-16 00:00:00").to("2015-01-16 23:59:59"))
07 .setFrom(30).setSize(10).setExplain(true)
08 .execute()
09 .actionGet();
10 System.out.println(response);
11 }

查询(Query)的构造有很多的方式,比如构造布尔查询,指定与、或、非关系,然后提交搜索。执行搜索,可以设置搜索文档的起始偏移位置以及每次取多少个结果文档,这便能实现分页功能。

其他话题

ElasticSearch最经典的软件栈组合就是ELK(ElasticSearch Logstash Kibana),其中ElasticSearch提供了实时查询分析数据的功能,是一个非常通用的搜索引擎系统,而Logstash是一个日志管理工具,能够收集日志,对日志进行管理,Kibana是一个基于页面的前端展示工具,非常方便地使ElasticSearch中的数据可视化,具体使用起来如何,如果感兴趣可以尝试一下。
另外,ElasticSearch也被好多开源大数据系统所拥抱,比如Cloudera的CDH也整合了ElasticSearch作为搜索系统,ElasticSearch也可以和其他系统,如Hadoop、HBase等进行整合,使用领域比较广泛。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
5天前
|
数据采集 人工智能 运维
从企业级 RAG 到 AI Assistant,阿里云Elasticsearch AI 搜索技术实践
本文介绍了阿里云 Elasticsearch 推出的创新型 AI 搜索方案
从企业级 RAG 到 AI Assistant,阿里云Elasticsearch AI 搜索技术实践
|
2天前
|
数据采集 人工智能 运维
从企业级 RAG 到 AI Assistant,阿里云Elasticsearch AI 搜索技术实践
本文介绍了阿里云 Elasticsearch 推出的创新型 AI 搜索方案。
|
10天前
|
存储 API 计算机视觉
自学记录HarmonyOS Next Image API 13:图像处理与传输的开发实践
在完成数字版权管理(DRM)项目后,我决定挑战HarmonyOS Next的图像处理功能,学习Image API和SendableImage API。这两个API支持图像加载、编辑、存储及跨设备发送共享。我计划开发一个简单的图像编辑与发送工具,实现图像裁剪、缩放及跨设备共享功能。通过研究,我深刻体会到HarmonyOS的强大设计,未来这些功能可应用于照片编辑、媒体共享等场景。如果你对图像处理感兴趣,不妨一起探索更多高级特性,共同进步。
67 11
|
5天前
|
人工智能 数据可视化 API
自学记录鸿蒙API 13:Calendar Kit日历功能从学习到实践
本文介绍了使用HarmonyOS的Calendar Kit开发日程管理应用的过程。通过API 13版本,不仅实现了创建、查询、更新和删除日程等基础功能,还深入探索了权限请求、日历配置、事件添加及查询筛选等功能。实战项目中,开发了一个智能日程管理工具,具备可视化管理、模糊查询和智能提醒等特性。最终,作者总结了模块化开发的优势,并展望了未来加入语音助手和AI推荐功能的计划。
117 1
|
20天前
|
存储 人工智能 API
(Elasticsearch)使用阿里云 infererence API 及 semantic text 进行向量搜索
本文展示了如何使用阿里云 infererence API 及 semantic text 进行向量搜索。
|
2月前
|
XML JSON 缓存
深入理解RESTful API设计原则与实践
在现代软件开发中,构建高效、可扩展的应用程序接口(API)是至关重要的。本文旨在探讨RESTful API的核心设计理念,包括其基于HTTP协议的特性,以及如何在实际应用中遵循这些原则来优化API设计。我们将通过具体示例和最佳实践,展示如何创建易于理解、维护且性能优良的RESTful服务,从而提升前后端分离架构下的开发效率和用户体验。
|
1月前
|
机器学习/深度学习 搜索推荐 API
淘宝/天猫按图搜索(拍立淘)API的深度解析与应用实践
在数字化时代,电商行业迅速发展,个性化、便捷性和高效性成为消费者新需求。淘宝/天猫推出的拍立淘API,利用图像识别技术,提供精准的购物搜索体验。本文深入探讨其原理、优势、应用场景及实现方法,助力电商技术和用户体验提升。
|
2月前
|
缓存 API 开发者
构建高效后端服务:RESTful API设计原则与实践
【10月更文挑战第43天】在数字化时代的浪潮中,后端服务的稳定性和效率成为企业竞争力的关键。本文将深入探讨如何构建高效的后端服务,重点介绍RESTful API的设计原则和实践技巧,帮助开发者提升服务的可用性、可扩展性和安全性。通过实际代码示例,我们将展示如何将这些原则应用到日常开发工作中,以确保后端服务能够支撑起现代Web和移动应用的需求。
|
30天前
|
监控 搜索推荐 测试技术
电商API的测试与用途:深度解析与实践
在电子商务蓬勃发展的今天,电商API成为连接电商平台、商家、消费者和第三方开发者的重要桥梁。本文深入探讨了电商API的核心功能,包括订单管理、商品管理、用户管理、支付管理和物流管理,并介绍了有效的测试技巧,如理解API文档、设计测试用例、搭建测试环境、自动化测试、压力测试、安全性测试等。文章还详细阐述了电商API的多样化用途,如商品信息获取、订单管理自动化、用户数据管理、库存同步、物流跟踪、支付处理、促销活动管理、评价管理、数据报告和分析、扩展平台功能及跨境电商等,旨在为开发者和电商平台提供有益的参考。
40 0
|
2月前
|
安全 测试技术 API
构建高效RESTful API:后端开发的艺术与实践####
在现代软件开发的浩瀚星空中,RESTful API如同一座桥梁,连接着前端世界的绚丽多彩与后端逻辑的深邃复杂。本文旨在探讨如何精心打造一款既高效又易于维护的RESTful API,通过深入浅出的方式,剖析其设计原则、实现技巧及最佳实践,为后端开发者提供一份实用的指南。我们不深入晦涩的理论,只聚焦于那些能够即刻提升API品质与开发效率的关键点,让你的API在众多服务中脱颖而出。 ####
35 0