ElasticSearch is a distributed search engine built on top of Lucene, the full-text search library, and it can be used directly to build and run a distributed search system. As is well known, Lucene is only a search framework: it provides the basic APIs for search engine operations, so building a usable search engine system on top of it means implementing a great deal yourself, which is a lot of work and also requires a solid grasp of Lucene's internals.
ElasticSearch, by contrast, is a complete distributed search engine system. Its basic features include:
- Full-text search
- A plugin mechanism, so plugin functionality can be shared and reused
- Distributed document storage
- Distributed real-time indexing and search
- Real-time statistics and analytics
- Horizontal scalability, supporting search over large-scale data
- A simple, easy-to-use RESTful API
- High availability of data, implemented via replication
- Integration with other systems
- Support for structured and unstructured data
- Flexible schema design (Mappings)
- Client libraries for many programming languages
My personal impression is that ElasticSearch does its best to hide the Lucene-level technical details, to the point where you barely notice Lucene underneath at all. This saves you the cost of learning Lucene and keeps the learning curve gentle; with Solr, on the other hand, building complex queries (Query) still requires some understanding of Lucene. In terms of distributed design, ElasticSearch is also more lightweight and simpler to use, whereas Solr's distributed sharding requires SolrCloud, which relies on ZooKeeper for configuration management and replication, and Solr additionally has to be deployed in a web container, making it somewhat more complicated (the SolrCloud version I used was around 3.1~3.5, which is quite old; it may be much more polished now).
Basic Concepts
Let's first get familiar with some of the basic concepts involved in ElasticSearch:
An index (Index) is a collection of documents, partitioned according to the actual business logic: data that is relatively independent and has a similar structure or nature is grouped together as documents to form one index. For example, user-related information can be one index, while transaction-related information can be another.
A type (Type) is a logical partition inside an index: one index can define multiple types, each splitting the index into a logical collection, and each type contains multiple properties (fields). For example, for a mobile App we might create an index of user-related information and then define several types inside it: basic information, device information, and behavior information. The basic information type contains the user id, identity document number, name, phone number, age, and date of birth; the device information type contains the device type, device name, App version, acquisition channel, OS version, IMEI, and MAC address; the behavior information type contains the user id, event id, event type, time, viewed-page code, and region code. That gives three types within one index. A type in ElasticSearch is quite similar to a column family (Column Family) in HBase.
A document (Document) is the basic unit of an index, similar to a row in a relational database: it holds a set of properties together with an ID that uniquely identifies that set of values; through the ID a document can be updated or deleted.
An index is a collection of many documents. Splitting an index produces multiple segments (subsets of the index), each of which is called a shard (Shard); this split makes the index easy to manage and to store across nodes, which is what makes distributed storage and search possible. A replica (Replica) is a redundant copy kept to guarantee a shard's availability: when the data of a shard cannot be read, its replica can be read instead and search service continues normally.
Cluster Installation and Configuration
Installing and configuring an ElasticSearch cluster is very easy. To install, run the following on each node:

```
# download elasticsearch-2.0.0.zip from the official Elasticsearch release site first, then:
unzip elasticsearch-2.0.0.zip
```
Take one node of the cluster and edit its configuration file config/elasticsearch.yml, as shown below:

```
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please see the documentation for further information on configuration options.
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: dw_search_engine
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
# Add custom attributes to the node:
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/dw_search_storage
#
# Path to log files:
#
path.logs: /tmp/es/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
# bootstrap.mlockall: true
#
# Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
# available on the system and that the owner of the process is allowed to use this limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 10.10.2.62
#
# Set a custom port for HTTP:
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
# gateway.recover_after_nodes: 3
#
# --------------------------------- Discovery ----------------------------------
#
# Elasticsearch nodes will find each other via unicast, by default.
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["es-01", "es-02"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
# discovery.zen.minimum_master_nodes: 3
#
# ---------------------------------- Various -----------------------------------
#
# Disable starting multiple nodes on a single system:
#
# node.max_local_storage_nodes: 1
#
# Require explicit names when deleting indices:
#
# action.destructive_requires_name: true
```
For the other nodes, provided the basic storage directories stay the same, you only need to adjust a few parameters per node, mainly the node name and the bind address, as sketched below.
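A minimal sketch of the per-node overrides (the node name es-02 and the IP here are illustrative assumptions, not values from the original):

```
node.name: es-02          # a descriptive per-node name (assumed value)
network.host: 10.10.2.63  # each node binds to its own IP (assumed value)
```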
Finally, start ElasticSearch on each node by executing the command below.
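The standard launcher script does this; the -d flag runs it as a daemon:

```
bin/elasticsearch -d
```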
Then you can open the web management UI, which requires the elasticsearch-head plugin (introduced below). In that UI you can see the index we created, the status of each node, and how the shards (Shard) are allocated.
RESTful API Basics
Especially when it comes to searching, using the RESTful API provided by ElasticSearch is a good choice, because it keeps other systems nicely decoupled from the search system. Below, we walk through the basic operations of the RESTful API.
Plugins are stored under elasticsearch-2.0.0/plugins/, and all plugin operations work against that directory.
Installing plugins:

```
bin/plugin install analysis-icu
bin/plugin install mobz/elasticsearch-head
```
Plugins can be installed from different locations. The first command above installs what is called a core Elasticsearch plugin: it is provided by Elasticsearch and is downloaded and installed from the official repository. The second automatically downloads and installs from GitHub. There are other installation sources as well, such as a specific file system location, as sketched below.
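For example, a plugin archive already on disk can be installed from a file URL (the path here is a made-up example):

```
bin/plugin install file:///opt/plugins/analysis-icu-2.0.0.zip
```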
Listing installed plugins:
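The plugin script provides a list command for this:

```
bin/plugin list
```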
Removing a plugin:

```
bin/plugin remove analysis-icu
```
After installing a plugin we can verify it; for example, the elasticsearch-head plugin can be viewed at the link below.
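Site plugins such as head are served by the node itself, so assuming the es-01 host configured earlier, the URL would be:

```
http://es-01:9200/_plugin/head/
```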
The index we create is named basis_device_info. We can also choose not to specify the Mappings for an index and instead have them generated automatically at indexing time, so an index created without explicit Mappings can accept documents of any shape. Likewise, an index can automatically gain new types, which is very flexible.
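A minimal sketch of creating such an index through the REST API (es-01 is the node configured earlier):

```
curl -XPUT 'http://es-01:9200/basis_device_info'
```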
You can also specify basic index settings, such as the number of shards (Shard) and replicas (Replica), as follows:

```
curl -XPUT 'http://es-01:9200/basis_device_info' -d '
{
  "settings" : {
    "number_of_shards" : 10,
    "number_of_replicas" : 1
  }
}'
```
The defaults are 5 shards and 1 replica per shard; the settings above give the index basis_device_info 10 shards, each with 1 replica.
Next, we specify the designed schema, i.e. the mappings, when creating the index, as shown below:

```
curl -XPUT 'http://es-01:9200/basis_device_info' -d '
{
  "mappings" : {
    "user" : {
      "_all" : { "enabled" : false },
      "properties" : {
        "installid" : { "type" : "string" },
        "appid" : { "type" : "string" },
        "channel" : { "type" : "string", "index" : "analyzed" },
        "version" : { "type" : "string" },
        "osversion" : { "type" : "string" },
        "device_name" : { "type" : "string", "index" : "analyzed" },
        "producer" : { "type" : "string" },
        "device_type" : { "type" : "string" },
        "resolution" : { "type" : "string", "index" : "analyzed" },
        "screen_size" : { "type" : "string", "index" : "analyzed" },
        "mac" : { "type" : "string", "index" : "not_analyzed" },
        "idfa" : { "type" : "string" },
        "idfv" : { "type" : "string", "index" : "not_analyzed" },
        "imei" : { "type" : "string", "index" : "not_analyzed" },
        "create_time" : {
          "type" : "date",
          "format" : "yyyy-MM-dd HH:mm:ss",
          "index" : "not_analyzed"
        }
      }
    }
  }
}'
```
This creates the index basis_device_info with the type user; once the mappings are in place, we know the exact format of the data to be indexed.
Deleting the index basis_device_info is just as simple.
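A DELETE request on the index URL removes it:

```
curl -XDELETE 'http://es-01:9200/basis_device_info'
```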
Indexing a single document (fields whose values were elided in the original listing are omitted here):

```
curl -XPUT 'http://es-01:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA' -d '
{
  "installid" : "0000000L",
  "udid" : "CC49E748588490D41BFB89584007B0FA",
  "device_name" : "iPhone Retina4 Simulator",
  "resolution" : "640*1136",
  "screen_size" : "320*568",
  "mac" : "600308A20C5E",
  "idfa" : "dbbbs-fdsfa-fafda-321saf",
  "idfv" : "4283FAE1-19EB-4FA9-B739-8148F76BC8C3",
  "imei" : "af-sfd0fdsa-fad-ff",
  "create_time" : "2015-01-14 20:32:05"
}'
```
Based on the index with type user created earlier, this indexes one document whose _id is CC49E748588490D41BFB89584007B0FA; the document body is one user's device information, expressed in JSON.
Bulk indexing can be implemented in whatever programming language or scripting tool you are familiar with, and ElasticSearch also provides client libraries. Below, we first transform a data file into the JSON format that ElasticSearch's bulk indexing expects, write it out to a file, and then submit it for bulk indexing with curl; under the hood this uses ElasticSearch's bulk API.
First, process the raw data to be indexed; the code is shown below:

```
package org.shirdrn.es;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;

import net.sf.json.JSONObject;

import com.google.common.base.Throwables;

public class EsIndexingClient {

    public static void closeQuietly(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                try {
                    closeable.close();
                } catch (Exception e) { }
            }
        }
    }

    public static void main(String[] args) {
        String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
        String out = "C:\\Users\\yanjun\\Desktop\\basis_device_info.json";
        File in = new File(f);
        BufferedReader reader = null;
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter(out));
            reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
            String line = null;
            while ((line = reader.readLine()) != null) {
                String[] a = line.split("\t", -1);
                // action line: tells the bulk API which index/type/id the next line belongs to
                JSONObject c = new JSONObject();
                c.put("_index", "basis_device_info");
                c.put("_type", "user");
                c.put("_id", a[2]); // the udid is used as the document id
                JSONObject index = new JSONObject();
                index.put("index", c);
                // source line: the actual document to be indexed
                JSONObject doc = new JSONObject();
                doc.put("installid", a[0]);
                doc.put("appid", a[1]);
                doc.put("udid", a[2]);
                doc.put("channel", a[3]);
                doc.put("version", a[4]);
                doc.put("osversion", a[5]);
                doc.put("device_name", a[6]);
                doc.put("producer", a[7]);
                doc.put("device_type", a[8]);
                doc.put("resolution", a[9]);
                doc.put("screen_size", a[10]);
                doc.put("mac", a[11]);
                doc.put("idfa", a[12]);
                doc.put("idfv", a[13]);
                doc.put("imei", a[14]);
                doc.put("create_time", a[15]);
                writer.write(index.toString() + "\n");
                writer.write(doc.toString() + "\n");
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        } finally {
            closeQuietly(reader, writer);
        }
    }
}
```
Running the code produces the output file basis_device_info.json, whose format looks like the following:
```
{"index":{"_index":"basis_device_info","_type":"user","_id":"1c207122a4b2c9632212ab86bac10f60"}}
{"installid":"00000002","appid":"0","udid":"1c207122a4b2c9632212ab86bac10f60","channel":"itings","version":"3.1.1","osversion":"4.1.2","device_name":"Lenovo P770","producer":"Lenovo","device_type":"0","resolution":"540*960","screen_size":"4.59","mac":"d4:22:3f:83:17:06","idfa":"","idfv":"","imei":"861166023335745","create_time":"2015-01-14 19:39:35"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"FA6B1B98E6FF4E6994A1505A996F6102"}}
{"installid":"00000003","appid":"0","udid":"FA6B1B98E6FF4E6994A1505A996F6102","channel":"appstore","version":"3.1.1","osversion":"8.1.2","device_name":"iPhone 6Plus","producer":"apple","device_type":"1","resolution":"640*1136","screen_size":"320*568","mac":"020000000000","idfa":"84018625-A3C9-47A8-88D0-C57C12F80520","idfv":"9D1E2514-9DC8-47A8-ABD0-129FC0FB3171","imei":"","create_time":"2015-01-14 19:41:21"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"8c5fe70b2408f184abcbe4f34b8f23c3"}}
{"installid":"00000004","appid":"0","udid":"8c5fe70b2408f184abcbe4f34b8f23c3","channel":"itings","version":"3.1.1.014","osversion":"4.2.2","device_name":"2014011","producer":"Xiaomi","device_type":"0","resolution":"720*1280","screen_size":"4.59","mac":"0c:1d:af:4f:48:9f","idfa":"","idfv":"","imei":"865763025472173","create_time":"2015-01-14 19:46:37"}
```
The odd-numbered lines carry the indexing action information, namely the index name (_index), type (_type), and unique id (_id); the even-numbered lines carry the actual documents to be indexed.
Then run the bulk indexing through curl, as shown below.
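A sketch of the request, assuming basis_device_info.json is in the current directory and es-01 is reachable:

```
curl -XPOST 'http://es-01:9200/_bulk?pretty' --data-binary @basis_device_info.json
```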
Simple searches can be issued as plain GET requests, as shown below.
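Two minimal sketches (the field/keyword pair in the second request is an illustrative assumption):

```
# 1. look a document up by its unique _id
curl -XGET 'http://es-01:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA'
# 2. search a field for a keyword via the q parameter
curl -XGET 'http://es-01:9200/basis_device_info/user/_search?q=channel:appstore'
```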
The first searches by the unique _id and returns zero or one document; the second passes GET parameters, where _search and q are ElasticSearch's built-in interface keywords, and searches a given field for a keyword, returning the results as JSON.
You can also put the query in the request body, which supports far more complex query conditions, for example:

```
curl -XPOST 'http://es-01:9200/basis_device_info/user/_search' -d '
{
  "query" : {
    "term" : { "udid" : "bc0af2ca66a96725b8b0e0056d4213b6" }
  }
}'
```
An example result looks like this:

```
{"took":11,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":9.45967,"hits":[{"_index":"basis_device_info","_type":"user","_id":"bc0af2ca66a96725b8b0e0056d4213b6","_score":9.45967,"_source":{"installid":"00000FPq","appid":"0","udid":"bc0af2ca66a96725b8b0e0056d4213b6","channel":"B-hicloud","version":"3.1.1","osversion":"4.4.2","device_name":"H60-L02","producer":"HUAWEI","device_type":"0","resolution":"720*1184","screen_size":"4.64","mac":"ec:cb:30:c4:93:e3","idfa":"","idfv":"","imei":"864103021536104","create_time":"2015-01-18 01:29:16"}}]}}
```
If you are familiar with Lucene queries (Query), you can build complex term-relation strings for searching, for example:

```
curl -XPOST 'http://es-01:9200/basis_device_info/user/_search' -d '
{
  "query" : {
    "query_string" : { "query" : "(channel:baidu OR device_name:HUAWEI)" }
  }
}'
```
The meaning of the query string is: search the channel field for baidu and the device_name field for HUAWEI, then take the union of the two; this is effectively a boolean query, and the combined result is returned.
ElasticSearch also supports taking one search keyword and looking it up across multiple fields, for example:

```
curl -XPOST 'http://es-01:9200/basis_device_info/user/_search' -d '
{
  "query" : {
    "multi_match" : {
      "query" : "HTC",
      "fields" : [ "channel", "device_name" ]
    }
  }
}'
```
With this, any document in which the keyword HTC appears in either the channel or the device_name field is returned; the result is the union of the document sets matched on each field.
A filter can also be specified for a search. For example, the following query filters the search results to a time range:

```
curl -XPOST 'http://es-01:9200/basis_device_info/user/_search' -d '
{
  "query" : {
    "filtered" : {
      "query" : { "match_all" : {} },
      "filter" : {
        "range" : {
          "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
        }
      }
    }
  }
}'
```
ElasticSearch supports paged search: specify the size and from parameters in the request, as follows:

```
curl -XPOST 'http://es-01:9200/basis_device_info/user/_search' -d '
{
  "from" : 20,
  "size" : 10,
  "query" : {
    "filtered" : {
      "query" : { "match_all" : {} },
      "filter" : {
        "range" : {
          "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
        }
      }
    }
  }
}'
```
The search above means: filter by the time range, start from the 20th document, and return 10 documents, i.e. a page of 10.
Java Client
If you know Java and would rather not drive the ElasticSearch cluster through scripts or other means, you can code against the Java client API that ElasticSearch provides, which gives you more flexible control. The Java client supports all the common operations, such as creating and updating indices, indexing documents, searching documents, and deleting indices, as well as extras such as synchronous/asynchronous modes and explain queries. Let's look at some code.
If you manage your Java code with Maven, add the following dependency to pom.xml:

```
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>2.0.0</version>
</dependency>
```
Create an ElasticSearch client, as shown below:

```
Settings settings = Settings.settingsBuilder()
        .put("cluster.name", "dw_search_engine")
        .put("client.transport.sniff", true)
        .build();
final Client client = TransportClient.builder().settings(settings).build()
        .addTransportAddress(newAddress("es-01", 9300))
        .addTransportAddress(newAddress("es-02", 9300));
```
All the nodes of your ElasticSearch cluster can be attached to the Client object through the addTransportAddress method above, so that indexing, updating, deleting, and searching documents against the cluster is automatically node-aware. The newAddress method used above is:
```
private static InetSocketTransportAddress newAddress(String host, int port) throws UnknownHostException {
    return new InetSocketTransportAddress(InetAddress.getByName(host), port);
}
```
Alternatively, the relevant settings can be specified in the configuration file elasticsearch.yml, for example:

```
cluster.name: dw_search_engine
client.transport.sniff: true
client.transport.ping_timeout: 10s
client.transport.nodes_sampler_interval: 10s
```
In that case, creating the client requires reading the settings from the configuration file; see the official documentation for the details.
When indexing, we read the data from a local file, build the format the index documents require, and then ask the ElasticSearch cluster to perform the indexing. The basic preparation looks like this:

```
final String index = "basis_device_info";
final String type = "user";

String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
File in = new File(f);
```
Each record read from the file is turned into a JSON string, represented by an XContentBuilder, with the following code:

```
protected static XContentBuilder createSource(String[] a) throws IOException {
    return jsonBuilder()
            .startObject()
            .field("installid", a[0])
            .field("appid", a[1])
            .field("udid", a[2])
            .field("channel", a[3])
            .field("version", a[4])
            .field("osversion", a[5])
            .field("device_name", a[6])
            .field("producer", a[7])
            .field("device_type", a[8])
            .field("resolution", a[9])
            .field("screen_size", a[10])
            .field("mac", a[11])
            .field("idfa", a[12])
            .field("idfv", a[13])
            .field("imei", a[14])
            .field("create_time", a[15])
            .endObject();
}
```
Below we go through the API feature by feature, explaining each in turn with code that shows the usage.
An index can be created directly through the Java client library, as shown below:

```
protected static void createIndex(final Client client, String index) {
    Map<String, Object> indexSettings = Maps.newHashMap();
    indexSettings.put("number_of_shards", "4");
    indexSettings.put("number_of_replicas", "1");
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(
            index, Settings.settingsBuilder().put(indexSettings).build());
    CreateIndexResponse createIndexResponse = client.admin().indices().create(createIndexRequest).actionGet();
    System.out.println(createIndexResponse);
}
```
Creating Mappings through the Java client is a little more involved, since the corresponding JSON has to be assembled. An implementation looks like this (a few field names that were elided in the original listing are replaced with assumed ones, marked in the comments):

```
protected static void createMappings(final Client client, String index) throws IOException, InterruptedException, ExecutionException {
    // mapping for the basic_info type; user_id and name are assumed field names
    XContentBuilder basisInfoMapping = jsonBuilder()
            .startObject()
                .startObject("_all")
                    .field("enabled", "false")
                .endObject()
                .startObject("properties")
                    .startObject("user_id")              // assumed field name
                        .field("type", "string")
                    .endObject()
                    .startObject("name")                 // assumed field name
                        .field("type", "string")
                        .field("index", "analyzed")
                    .endObject()
                    .startObject("birthday")
                        .field("type", "date")
                        .field("format", "yyyy-MM-dd HH:mm:ss")
                        .field("index", "not_analyzed")
                    .endObject()
                .endObject()
            .endObject();
    // mapping for the device_info type; the first field name is likewise assumed
    XContentBuilder deviceInfoMapping = jsonBuilder()
            .startObject()
                .startObject("_all")
                    .field("enabled", "false")
                .endObject()
                .startObject("properties")
                    .startObject("device_type")          // assumed field name
                        .field("type", "string")
                    .endObject()
                    .startObject("device_name")
                        .field("type", "string")
                        .field("index", "analyzed")
                    .endObject()
                    .startObject("privoder")
                        .field("type", "string")
                        .field("index", "analyzed")
                    .endObject()
                    .startObject("os_version")
                        .field("type", "string")
                    .endObject()
                .endObject()
            .endObject();
    PutMappingRequest putMappingRequest = Requests.putMappingRequest(index)
            .type("basic_info")
            .source(basisInfoMapping)
            .type("device_info")
            .source(deviceInfoMapping);
    System.out.println(putMappingRequest.indicesOptions());
    PutMappingResponse putMappingResponse = client.admin().indices().putMapping(putMappingRequest).get();
    System.out.println(putMappingResponse);
}
```
The code above creates an index named app_user_info with the two types basic_info and device_info; the resulting index information can be viewed on the web management page of the elasticsearch-head plugin.
Read the data from the file, build one document per record, and index it, as shown below:

```
protected static void indexDocs(final Client client, final String index, final String type, File in) {
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            String udid = a[2]; // the udid is used as the document id
            IndexResponse response = client
                    .prepareIndex(index, type, udid)
                    .setSource(createSource(a))
                    .get();
            System.out.println(response.toString());
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
    }
}
```
There are several ways to do bulk indexing. First, through the Bulk API, controlling the size of each batch ourselves:

```
protected static void indexBulk(final Client client, final String index, final String type, File in) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    final int batchSize = 100;
    int counter = 0;
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            String udid = a[2];
            IndexRequestBuilder indexRequestBuilder = client
                    .prepareIndex(index, type, udid)
                    .setSource(createSource(a));
            bulkRequest.add(indexRequestBuilder);
            if (++counter >= batchSize) {
                System.out.println(!bulkRequest.get().hasFailures());
                counter = 0;
                bulkRequest = client.prepareBulk();
            }
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
    }
    // flush the tail batch (guard against submitting an empty bulk request)
    if (bulkRequest.numberOfActions() > 0) {
        System.out.println(!bulkRequest.get().hasFailures());
    }
}
```
The other way is to use the Bulk Processor provided by ElasticSearch: set a few parameters and the batching is handled for you, which is more flexible. For example:

```
protected static void indexUsingBulkProcessor(final Client client, final String index, final String type, File in) throws InterruptedException {
    String name = "device_info_processor"; // processor name (how it was wired in was elided in the original)
    int bulkActions = 1000;
    ByteSizeValue bulkSize = new ByteSizeValue(100, ByteSizeUnit.MB);
    TimeValue flushInterval = TimeValue.timeValueSeconds(60);
    int concurrentRequests = 12;

    final BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        @Override
        public void afterBulk(long id, BulkRequest req, BulkResponse resp) {
            System.out.println("id=" + id + ", resp=" + resp);
        }
        @Override
        public void afterBulk(long id, BulkRequest req, Throwable cause) {
            System.out.println("id=" + id + ", req=" + req + ", cause=" + cause);
        }
        @Override
        public void beforeBulk(long id, BulkRequest req) {
            System.out.println("id=" + id + ", req=" + req);
        }
    })
    .setBulkActions(bulkActions)
    .setBulkSize(bulkSize)
    .setFlushInterval(flushInterval)
    .setConcurrentRequests(concurrentRequests)
    .build();

    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            String udid = a[2];
            bulkProcessor.add(new IndexRequest(index, type, udid).source(createSource(a)));
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
    }
    bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
}
```
Implementing a custom BulkProcessor.Listener gives you hook points: for example, if indexing a batch fails, you can add handling in the hook to retry it; or, if indexing succeeds, you can give some other service a callback, and so on; see the sketch below.
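A minimal sketch of such a listener (the retry strategy and the name retryingListener are assumptions, not part of the original):

```
BulkProcessor.Listener retryingListener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long id, BulkRequest req) {
        // nothing to do before the batch is sent
    }
    @Override
    public void afterBulk(long id, BulkRequest req, BulkResponse resp) {
        if (resp.hasFailures()) {
            // per-document failures: log them for later inspection
            System.err.println("bulk " + id + " failures: " + resp.buildFailureMessage());
        }
    }
    @Override
    public void afterBulk(long id, BulkRequest req, Throwable cause) {
        // the whole batch failed (e.g. connection loss): retry it once synchronously
        try {
            client.bulk(req).actionGet();
        } catch (Exception e) {
            System.err.println("retry of bulk " + id + " failed: " + e);
        }
    }
};
```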
To update certain fields of a document, specify the id together with the new field values, as shown below:
```
protected static void updateDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
    String id = "60e90ddcb1a61622028b8d92112a646c";
    UpdateRequest updateRequest = new UpdateRequest(index, type, id);
    updateRequest.doc(jsonBuilder()
            .startObject()
            .field("channel", "h-google")
            .endObject());
    UpdateResponse response = client.update(updateRequest).get();
    System.out.println(response);
}
```
If the document being updated may not exist yet, it would have to be indexed first and then updated; the upsert operation merges these two steps into one, as shown below:
```
protected static void upsertDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
    String id = "fdd5ff7f56b613f0acb2c20a1ebc35e4";
    // used when the document does not exist yet
    IndexRequest indexRequest = new IndexRequest(index, type, id).source(jsonBuilder()
            .startObject()
            .field("installid", "00000BSe")
            .field("udid", "fdd5ff7f56b613f0acb2c20a1ebc35e4")
            .field("channel", "A-wandoujia")
            .field("version", "3.1.1")
            .field("resolution", "960*540")
            .field("mac", "00:08:22:be:1b:b7")
            .field("device_type", "0")
            .field("device_name", "HTC")
            .field("producer", "alps")
            .field("create_time", "2015-01-17 17:15:36")
            .endObject());
    // applied when the document already exists
    UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(jsonBuilder()
            .startObject()
            .field("resolution", "540*960")
            .field("channel", "h-baidu")
            .field("version", "3.1.1")
            .field("imei", "861622010000056")
            .endObject())
            .upsert(indexRequest);
    UpdateResponse response = client.update(updateRequest).get();
    System.out.println(response);
}
```
Deleting a document requires its id, as shown below:
```
protected static void deleteDoc(final Client client, final String index, final String type) {
    String id = "60e90ddcb1a61622028b8d92112a646c";
    DeleteResponse response = client.prepareDelete(index, type, id).get();
    System.out.println(response);
}
```
To search documents, construct whatever query (Query) you need, optionally set filters and so on, and submit the search; example code:
```
protected static void searchDocs(final Client client, final String index, final String type) {
    SearchResponse response = client
            .prepareSearch(index)
            .setTypes(type)
            .setQuery(QueryBuilders.termQuery("device_name", "xiaomi"))
            .setPostFilter(QueryBuilders.rangeQuery("create_time").from("2015-01-16 00:00:00").to("2015-01-16 23:59:59"))
            .setFrom(30).setSize(10).setExplain(true)
            .execute()
            .actionGet();
    System.out.println(response);
}
```
Queries (Query) can be constructed in many ways, for example as a boolean query specifying AND, OR, and NOT relations before submitting the search. When executing a search, you can also set the starting offset and the number of result documents to fetch each time, which is how paging is implemented; see the sketch below.
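A minimal sketch of a boolean query through the Java API (the field/value pairs are illustrative assumptions):

```
// must = AND, should = OR, mustNot = NOT
QueryBuilder query = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("channel", "appstore"))
        .should(QueryBuilders.termQuery("device_name", "iphone"))
        .mustNot(QueryBuilders.termQuery("device_type", "0"));
SearchResponse response = client.prepareSearch(index)
        .setTypes(type)
        .setQuery(query)
        .setFrom(0).setSize(10)   // paging: starting offset and page size
        .execute().actionGet();
System.out.println(response);
```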
Other Topics
The classic software stack built around ElasticSearch is ELK (ElasticSearch, Logstash, Kibana): ElasticSearch provides real-time query and analysis over the data and is a very general-purpose search engine system; Logstash is a log management tool that collects and manages logs; Kibana is a web-based front end that makes it very convenient to visualize the data held in ElasticSearch. If you are curious how it works in practice, give it a try.
In addition, ElasticSearch has been embraced by many open-source big data systems; for example, Cloudera's CDH integrates ElasticSearch as its search system, and ElasticSearch can also be integrated with other systems such as Hadoop and HBase, so its range of applications is quite broad.