如何将不同类型数据导入Elaticsearch中?

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 题记Elaticsearch的原理明白了以后,手头有很多不同类型的数据,如:1)单条数据,如程序中自己构造的JSON格式数据;2)符合Elasticsearch索引规范的批量数据;3)日志文件,格式*.log;4)结构化数据,存储在mysql、oracle等关系型数据库中;5)非结构化数据,存储在mongo中;如何将这些数据导入到Elasticsearch中呢?接下来,本文将逐个介绍。

image.png

1、单条索引导入elasticsearch

该方法类似mysql的insert 语句,用于插入一条数据。


[root@yang json_input]# curl -XPUT 'http://192.168.1.1:9200/blog/article/1' -d '

> {

> "title":"New version of Elasticsearch released!",

> "content":"Version 1.0 released today!",

> "tags":["announce","elasticsearch","release"]

> }'

1

2

3

4

5

6

结果查看如下所示:


[root@yang json_input]# curl -XGET 'http://192.168.1.1:9200/blog/article/1?pretty'

{

 "_index" : "blog",

 "_type" : "article",

 "_id" : "1",

 "_version" : 1,

 "found" : true,

 "_source" : {

 "title" : "New version of Elasticsearch released!",

 "content" : "Version 1.0 released today!",

 "tags" : [ "announce", "elasticsearch", "release" ]

 }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

图形化显示如下:

image.png2、批量索引导入到 elasticsearch。

(1)索引结构映射

类似于SQL创建模式描述数据,Mapping控制并定义结构。


[root@yang json_input]# cat mapping.json

{

"book" : {

"_all": {

"enabled": false

},

"properties" : {

"author" : {

"type" : "string"

},

"characters" : {

"type" : "string"

},

"copies" : {

"type" : "long",

"ignore_malformed" : false

},

"otitle" : {

"type" : "string"

},

"tags" : {

"type" : "string"

},

"title" : {

"type" : "string"

},

"year" : {

"type" : "long",

"ignore_malformed" : false,

"index" : "analyzed"

},

"available" : {

"type" : "boolean"

}

}

}

}

[root@yang json_input]# curl -XPUT 'http://110.0.11.120:9200/library/book/_mapping' -d @mapping.json

{"acknowledged":true}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

(2)批量索引,将构造好的JSON信息和数据导入elasticsearch

Elasticsearch可以合并多个请求至单个包中,而这些包可以单个请求的形式传送。如此,可以将多个操作结合起来:


1)在索引中增加或更换现有文档(index);

2)从索引中移除文档(delete);

3)当索引中不存在其他文档定义时,在索引中增加新文档(create)。


为了获得较高的处理效率,选择这样的请求格式。它假定,请求的每一行包含描述操作说明的JSON对象,第二行为JSON对象本身。


可以把第一行看做信息行,第二行行为数据行。唯一的例外是Delete操作,它只包含信息行。


举例如下:


[root@yang json_input]# cat documents_03.json

{ "index": {"_index": "library", "_type": "book", "_id": "1"}}

{ "title": "All Quiet on the Western Front","otitle": "Im Westen nichts Neues","author": "Erich Maria Remarque","year": 1929,"characters": ["Paul Bäumer", "Albert Kropp", "Haie Westhus", "Fredrich Müller", "Stanislaus Katczinsky", "Tjaden"],"tags": ["novel"],"copies": 1, "available": true, "section" : 3}

{ "index": {"_index": "library", "_type": "book", "_id": "2"}}

{ "title": "Catch-22","author": "Joseph Heller","year": 1961,"characters": ["John Yossarian", "Captain Aardvark", "Chaplain Tappman", "Colonel Cathcart", "Doctor Daneeka"],"tags": ["novel"],"copies": 6, "available" : false, "section" : 1}

{ "index": {"_index": "library", "_type": "book", "_id": "3"}}

{ "title": "The Complete Sherlock Holmes","author": "Arthur Conan Doyle","year": 1936,"characters": ["Sherlock Holmes","Dr. Watson", "G. Lestrade"],"tags": [],"copies": 0, "available" : false, "section" : 12}

{ "index": {"_index": "library", "_type": "book", "_id": "4"}}

{ "title": "Crime and Punishment","otitle": "Преступлéние и наказáние","author": "Fyodor Dostoevsky","year": 1886,"characters": ["Raskolnikov", "Sofia Semyonovna Marmeladova"],"tags": [],"copies": 0, "available" : true}

1

2

3

4

5

6

7

8

9

为了执行批量请求,Elasticsearch提供了_bulk端点,形式是/_bulk,或者是/index_name/_bulk, 甚至是/index_name/type_name/_bulk。


Elasticsearch会返回每个操作的具体的信息,因此对于大批量的请求,响应也是巨大的。


3)执行结果如下所示:

[root@yang json_input]# curl -s -XPOST '10.0.1.30:9200/_bulk' --data-binary @documents_03.json

{"took":150,"errors":false,"items":[{"index":{"_index":"library","_type":"book","_id":"1","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"2","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"3","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"4","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}}]}

1

2

执行结果如下图所示:

image.png

3、使用Logstash将 log文件导入elasticsearch

以下以项目实战的 MprobeDebug.log导入到ES中。


[root@yang logstash_conf]# tail -f MrobeDebug.log

[DEBUG][2015-07-23 23:59:58,138] : After CurProbe.Update()....lineNo:233, function:main

[DEBUG][2015-07-23 23:59:58,594] : lineNo:960, function:MNetworker::MessageTranslator, revoke nMsgRes = m_MsgPool.PeekMessage(CurMsg);

[DEBUG][2015-07-23 23:59:58,608] : ProbeTaskType_FTP lineNo:148, function:TempProbe::Update

........

1

2

3

4

5

核心配置文件要求如下:


[root@yang logstash_conf]# cat three.conf

input {

file {

path=> "/opt/logstash/bin/logstash_conf/MrobeDebug.log"

type=>"ttlog"

}

}


output {

elasticsearch {

hosts => "110.10.11.120:9200"

index => "tt_index"

}

stdout { codec => json_lines }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

导入结果如下:

合计导入数据:200414条。



4、从Mysql/Oracle关系型数据库向Elasticsearch导入数据

参见:

http://blog.csdn.net/laoyang360/article/details/51747266

http://blog.csdn.net/laoyang360/article/details/51824617


5、从MongoDB非关系型数据库向Elasticsearch导入数据

参见:

http://blog.csdn.net/laoyang360/article/details/51842822


使用插件:mongo-connector

1)mongo与副本集成员连接

2)初始化副本集配置

3)Mongo与ES同步操作

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
7月前
|
SQL 关系型数据库 MySQL
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
360 0
|
存储 分布式计算 大数据
MaxCompute数据类型映射
MaxCompute数据类型映射
|
6月前
|
分布式计算 DataWorks MaxCompute
DataWorks产品使用合集之需要将mysql 表(有longtext类型字段) 迁移到odps,但odps好像没有对应的类型支持,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
存储 分布式计算 关系型数据库
实时数仓 Hologres产品使用合集之创建外部表时提示不支持ODPS的datetime数据类型,该怎么解决
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
6月前
|
JSON 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在使用CDAS语法同步MySQL数据到Hologres时,如果开启了字段类型宽容模式,MySQL中的JSON类型会被转换为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
数据库 关系型数据库 MySQL
DTS库支持多种数据源类型
【6月更文挑战第2天】DTS库支持多种数据源类型
84 3
|
7月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在DataWorks中,使用JSON解析函数将MySQL表中的字段解析成多个字段将这些字段写入到ODPS(MaxCompute)中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
212 3
|
SQL 存储 大数据
更改 HIVE 表字段数据类型有哪些注意事项?
更改 HIVE 表字段数据类型有哪些注意事项?
|
7月前
|
分布式计算 关系型数据库 大数据
MaxCompute产品使用合集之怎么才可以将 PostgreSQL 中的 geometry 空间类型字段同步到 MaxCompute 或另一个 PostgreSQL 数据库
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7月前
|
SQL Java 数据库
Sqoop【付诸实践 02】Sqoop1最新版 全库导入 + 数据过滤 + 字段类型支持 说明及举例代码(query参数及字段类型强制转换)
【2月更文挑战第10天】Sqoop【付诸实践 02】Sqoop1最新版 全库导入 + 数据过滤 + 字段类型支持 说明及举例代码(query参数及字段类型强制转换)
322 0