es 算是个数据库,但木有 ODBC/JDBC 这样的“连接桥梁”。或者说这是一种区别于“传统观念”,新的 NoSQL 所带来了新的视角和理念(相对地),自然这些非关系型数据库那么更倾向于采用新的交换数据或者增删改查的模式——其中 HTTP API 就悄悄地成为一种约定成俗的模式,渐渐流行开来。es 亦是如此。于是,無論什么语言都可以通过此 HTTP API 实现通讯。本身官方也提供一批语言的封装以方便开发者调用 es 里面的数据(称作 Client),另外 Client 还提供集群管理任务的 API。
es HTTP API 是典型的 RESTful API,所以 url 表示对资源的操作,例如 POST 新增,PUT 更新,GET 获取,DELETE 删除,HEAD 判断是否存在,此乃 REST API 之共性内容。除此之外,es 的 url 遵循一套规则,如:
http://localhost:9200/<index>/<type>/[<id>]
index可以理解为数据库,type 理解为数据表,例如 /blog/article/1,就表示一个 index 为 blog,type 为aritcle,id 为1的 document。其中 index、type 是必须提供的,id 是可选的,不提供的话 es 会自动生成。
由于 es 本身是用 Java 写成的,那么使用 Java API 调 es 更是 Native 的感覺,理論上速度会更快。另外不得不说是 JavaScript Client。既然人家是 JSON-Friendly 的,那么首先一个 Node.js 的 Client 亦不在话下,因为天然的姻亲缘故使得 JSON 格式交换起来十分顺畅(参见官方 Guide)。好~不扯到 nodejs了,我们详细来看看 Java 的。
TransportClient
如同访问数据库先要连接数据一样,这种连接过程在 es 中称作连接集群 cluster。完成这样工作的是“传输客户端 TransportClient”。
// on startup Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); // on shutdown client.close();
请注意,这里虽然有多个 host,但这不是加入 es 集群,只是指定了多个传输地址(TransportAddress)来查询多个库,其目的在于当一个服务(对应一个 TransportAddress)不可用时,client 会自动发现当前其他可用的 nodes(the current connected nodes)。理论上,Client 可以添加 es 集群中部分或全部 nodes,然后轮询“拿到”一个 node,届时client 可以和 es 集群进行通信和操作。
这里是我本机上的例子,指定了集群的名称并批量插入索引数据。
import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; public class Test { public static void main(String[] args) { Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch").build(); try { Client client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300)); // 批量创建索引 BulkRequestBuilder bulkRequest = client.prepareBulk(); Map<String, Object> map = new HashMap<>(); map.put("name", "Jack"); IndexRequest request = client.prepareIndex("dept", "employee", "3433").setSource(map).request(); bulkRequest.add(request); BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { System.out.println("批量创建索引错误!"); } client.close(); System.out.println("批量创建索引成功"); } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }调用笔记两点:
- 开始的时候使用了 InetAddress.getLocalHost() 虽然能返回本机 ip 但是不能发起连接的。后来改成 InetAddress.getByName("localhost") 才可以。
- 访问端口是 9300 而非 HTTP 的 9200。估计不是走 HTTP 接口而是类似于 RMI 的远程请求。
CRUD
增删改查的 API 有,
- Index 增加
- Get 获取
- Delete 删除
- Update 更新
所有的 CRUD API 均是单索引的 API,index 参数只可送入一个 index 名称,或者 alias (别名)指向一个单索引。
另外下面是多文档的 API,
- Multi Get 列表获取
- Bulk 批量插入、删除
es 接受的格式
所謂文档 document,其结构本质就是一个 JSON。我们把文档进行索引工作就能快速被找到。安装传统的关系型数据库概念,相当于 Create 创建的工作。es 接受的就是这种 JSON 格式的文档。尽管是这种结构,可是还有几种数据的形态,下面我们逐一看看。
JSON 字符串
不消多说,最直接的是手写,
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";
或者 String 转换 byte[] 字节数组也可以,因为最终也是读取其 byte[] 形态,所以其他对象的 byte[] 类型也可以让 es Java API 读取。
Map
不用多说,应该要支持的。
Map<String, Object> json = new HashMap<>();// Java 7 有木有! json.put("user","kimchy"); json.put("postDate",new Date()); json.put("message","trying out Elasticsearch");Jackson
一說到 Json,大家可能会联想到 Java 的转换工具,如 Jackson——同样也是没问题的。
import com.fasterxml.jackson.databind.*; // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse // generate json byte[] json = mapper.writeValueAsBytes(yourbeaninstance);es 工具类
内置的工具类,还可以嵌套别的 builder。
import static org.elasticsearch.common.xcontent.XContentFactory.*; XContentBuilder builder = jsonBuilder() .startObject() // 数组的话是 startArray(String .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() // 数组的话是 endArray() String json = builder.string();// 甚至可以通过 builder 来代替 Jackson 转换
保存起来
前面说了,所謂创建工作就是 index(此处的 index 作动词用)。
import static org.elasticsearch.common.xcontent.XContentFactory.*; String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json) .get();
不过 prepareIndex 没有指定一个 id,现在指定以及换成 buidler 输入,
IndexResponse response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) .get();
client.prepareIndex 参数可以是零个(其后必须使用 index(String) 和 type(String) 方法)、两个(client.prepareIndex(index,type))和三个(client.prepareIndex(index,type,id)),其中 index 类似于库名,type 类似于表名(和前面说过的一样),而 id 则是每条记录的惟一编码。通常情况下我们会把实体的唯一编码作为 es 的 id。如果不给 id 的时候,由 es 自动生成。
IndexResponse 是 es 创建完成索引后,回馈给你的实体。本例响应结果如下,
// Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); // isCreated() is true if the document is a new one, false if it has been updated boolean created = response.isCreated();
你可以从这个实体中获得相关有用的信息。
HTTP index API
另一边厢,可以瞧瞧 HTTP 接口同样效果,是不是更简单?
$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{ "user" : "kimchy", "post_date" : "2009-11-15T14:12:12", "message" : "trying out Elasticsearch" }'
JSON 结果如下,
{ "_shards" : { "total" : 10, "failed" : 0, "successful" : 10 }, "_index" : "twitter", "_type" : "tweet", "_id" : "1", "_version" : 1, "created" : true }
index 批量添加建议使用bulk,效率高(减少通讯次数)。
获取、删除数据
插入记录之后,就可以获取了,
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
另可参考 HTTP API 的。
getResponse.getSourceAsString() 可以返回 JSON 字符串那么反序列化就非常简单了。
Person newPerson = mapper.readValue(getResponse.getSourceAsString(), Person.class);删除,
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
另可参考 HTTP API 的。