使用Hive读写ElasticSearch中的数据(转载)

简介: 转自:http://lxw1234.com/archives/2015/12/585.htm关键字:hive、elasticsearch、integration、整合ElasticSearch已经可以与YARN、Hadoop、Hive、Pig、Spark、Flume等大数据技术框架整合起来使用,尤其是在添加数据的时候,可以使用分布式任务来添加索引数据,尤其是在数据平台上,很多数据存储在Hive中,使用Hive操作ElasticSearch中的数据,将极大的方便开发人员。

转自:http://lxw1234.com/archives/2015/12/585.htm


关键字:hive、elasticsearch、integration、整合

ElasticSearch已经可以与YARN、Hadoop、Hive、Pig、Spark、Flume等大数据技术框架整合起来使用,尤其是在添加数据的时候,可以使用分布式任务来添加索引数据,尤其是在数据平台上,很多数据存储在Hive中,使用Hive操作ElasticSearch中的数据,将极大的方便开发人员。这里记录一下Hive与ElasticSearch整合,查询和添加数据的配置使用过程。基于Hive0.13.1、Hadoop-cdh5.0、ElasticSearch 2.1.0。

通过Hive读取与统计分析ElasticSearch中的数据

ElasticSearch中已有的数据

_index:lxw1234

_type:tags

_id:用户ID(cookieid)

字段:area、media_view_tags、interest

img_aa562f602ebcad3ff3d2a6a452e97bb0.jpe

Hive建表

由于我用的ElasticSearch版本为2.1.0,因此必须使用elasticsearch-hadoop-2.2.0才能支持,如果ES版本低于2.1.0,可以使用elasticsearch-hadoop-2.1.2.

下载地址:https://www.elastic.co/downloads/hadoop

add jar file:///home/liuxiaowen/elasticsearch-hadoop-2.2.0-beta1/dist/elasticsearch-hadoop-hive-2.2.0-beta1.jar;

CREATE EXTERNAL TABLE lxw1234_es_tags(

cookieidstring,

areastring,

media_view_tagsstring,

intereststring

)

STORED BY'org.elasticsearch.hadoop.hive.EsStorageHandler'

TBLPROPERTIES(

'es.nodes'='172.16.212.17:9200,172.16.212.102:9200',

'es.index.auto.create'='false',

'es.resource'='lxw1234/tags',

'es.read.metadata'='true',

'es.mapping.names'='cookieid:_metadata._id, area:area, media_view_tags:media_view_tags, interest:interest');

注意:因为在ES中,lxw1234/tags的_id为cookieid,要想把_id映射到Hive表字段中,必须使用这种方式:

‘es.read.metadata’ = ‘true’,

‘es.mapping.names’ = ‘cookieid:_metadata._id,…’

在Hive中查询数据

img_17e303260327d6a3f6970ba6051e7fca.jpe

数据已经可以正常查询。

执行SELECT COUNT(1) FROM lxw1234_es_tags;Hive还是通过MapReduce来执行,每个分片使用一个Map任务:

img_bfdbb789712bf3c512163fb8dfcf4cac.jpe

可以通过在Hive外部表中指定search条件,只查询过滤后的数据。比如,下面的建表语句会从ES中搜索_id=98E5D2DE059F1D563D8565的记录:

CREATE EXTERNAL TABLE lxw1234_es_tags_2(

cookieidstring,

areastring,

media_view_tagsstring,

intereststring

)

STORED BY'org.elasticsearch.hadoop.hive.EsStorageHandler'

TBLPROPERTIES(

'es.nodes'='172.16.212.17:9200,172.16.212.102:9200',

'es.index.auto.create'='false',

'es.resource'='lxw1234/tags',

'es.read.metadata'='true',

'es.mapping.names'='cookieid:_metadata._id, area:area, media_view_tags:media_view_tags, interest:interest',

'es.query'='?q=_id:98E5D2DE059F1D563D8565'

);

hive>select*fromlxw1234_es_tags_2;

OK

98E5D2DE059F1D563D8565四川|成都购物|1购物|1

Timetaken:0.096seconds,Fetched:1row(s)

如果数据量不大,可以使用Hive的Local模式来执行,这样不必提交到Hadoop集群:

在Hive中设置:

sethive.exec.mode.local.auto.inputbytes.max=134217728;

sethive.exec.mode.local.auto.tasks.max=10;

sethive.exec.mode.local.auto=true;

setfs.defaultFS=file:///;

hive>selectarea,count(1)ascntfromlxw1234_es_tagsgroupbyarea orderbycnt desc limit20;

Automaticallyselectinglocalonly modeforquery

Totaljobs=2

LaunchingJob1outof2

…..

Executionlog at:/tmp/liuxiaowen/liuxiaowen_20151211133030_97b50138-d55d-4a39-bc8e-cbdf09e33ee6.log

Jobrunningin-process(localHadoop)

Hadoopjob informationfornull:number of mappers:0;number of reducers:0

2015-12-1113:30:59,648nullmap=100%,reduce=100%

EndedJob=job_local1283765460_0001

Executioncompleted successfully

MapredLocaltask succeeded

OK

北京|北京10

四川|成都4

重庆|重庆3

山西|太原3

上海|上海3

广东|深圳3

湖北|武汉2

陕西|西安2

福建|厦门2

广东|中山2

福建|三明2

山东|济宁2

甘肃|兰州2

安徽|合肥2

湖南|长沙2

湖南|湘西2

河南|洛阳2

江苏|南京2

黑龙江|哈尔滨2

广西|南宁2

Timetaken:13.037seconds,Fetched:20row(s)

hive>

很快完成了查询与统计。

通过Hive向ElasticSearch中写数据

Hive建表

add jar file:///home/liuxiaowen/elasticsearch-hadoop-2.2.0-beta1/dist/elasticsearch-hadoop-hive-2.2.0-beta1.jar;

CREATE EXTERNAL TABLE lxw1234_es_user_tags(

cookieidstring,

areastring,

gendercode STRING,

birthday STRING,

jobtitle STRING,

familystatuscode STRING,

haschildrencode STRING,

media_view_tagsstring,

order_click_tags STRING,

search_egine_tags STRING,

intereststring)

STORED BY'org.elasticsearch.hadoop.hive.EsStorageHandler'

TBLPROPERTIES(

'es.nodes'='172.16.212.17:9200,172.16.212.102:9200',

'es.index.auto.create'='true',

'es.resource'='lxw1234/user_tags',

'es.mapping.id'='cookieid',

'es.mapping.names'='area:area,

gendercode:gendercode,

birthday:birthday,

jobtitle:jobtitle,

familystatuscode:familystatuscode,

haschildrencode:haschildrencode,

media_view_tags:media_view_tags,

order_click_tags:order_click_tags,

search_egine_tags:search_egine_tags,

interest:interest');

这里要注意下:如果是往_id中插入数据,需要设置’es.mapping.id’ = ‘cookieid’参数,表示Hive中的cookieid字段对应到ES中的_id,而es.mapping.names中不需要再映射,这点和读取时候的配置不一样。

关闭Hive推测执行,执行INSERT:

SET hive.mapred.reduce.tasks.speculative.execution=false;

SET mapreduce.map.speculative=false;

SET mapreduce.reduce.speculative=false;

INSERT overwrite TABLE lxw1234_es_user_tags

SELECT cookieid,

area,

gendercode,

birthday,

jobtitle,

familystatuscode,

haschildrencode,

media_view_tags,

order_click_tags,

search_egine_tags,

interest

FROM source_table;

注意:如果ES集群规模小,而source_table数据量特别大、Map任务数太多的时候,会引发错误:

Causedby:org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:

FOUND unrecoverable error[172.16.212.17:9200]returnedTooManyRequests(429)-rejected

execution of org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase$1@b6fa90f

ONEsThreadPoolExecutor[bulk,queue capacity=50,

org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@22e73289[Running,pool size=32,active threads=32,queued tasks=52,completed tasks=12505]];

Bailingout..

原因是Map任务数太多,并发发送至ES的请求数过多。

这个和ES集群规模以及bulk参数设置有关,目前还没弄明白。

减少source_table数据量(即减少Map任务数)之后,没有出现这个错误。

执行完成后,在ES中查询lxw1234/user_tags的数据:

curl-XGET http://172.16.212.17:9200/lxw1234/user_tags/_search?pretty -d '

{

"query":{

"match":{

"area":"成都"

}

}

}'

img_d9e229a268ae9b62b5f0cb41da84ffe3.jpe

数据已经写入到ElasticSearch中。

总结

使用Hive将数据添加到ElasticSearch中还是非常实用的,因为我们的数据都是在HDFS上,通过Hive可以查询的。

另外,通过Hive可以查询ES数据,并在其上做复杂的统计与分析,但性能一般,比不上使用ES原生API,亦或是还没有掌握使用技巧,后面继续研究。

相关阅读:

ElasticSearch集群安装配置

ElasticSearch与Hive整合官方文档

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
4月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
78 4
|
4月前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
322 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
150 3
|
4月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
68 2
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
171 0
|
4月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
330 0
|
6月前
|
存储 缓存 监控
|
6月前
|
自然语言处理 索引
ElasticSearch 实现分词全文检索 - 测试数据准备
ElasticSearch 实现分词全文检索 - 测试数据准备
74 1
|
6月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
85 6