ES-hadoop写数据到阿里云Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: ES-Hadoop是一个用于Elasticsearch和Hadoop进行交互的开源独立库,在Hadoop和Elasticsearch之间起到桥梁的作用,本文基于阿里云E-MapReduce和阿里云Elasticsearch,演示如何通过ES-Hadoop连通Hadoop生态系统和Elasticsearch。

Elasticsearch是一个基于Lucene的分布式搜索引擎,具有分布式、全文检索、近实时搜索和分析、高可用、模式自由、RESTFul API等诸多优点,在实时搜索、日志处理(ELK)、大数据分析等领域有着广泛的应用。Hadoop是一个由Apache基金会所开发的分布式系统基础架构,核心组件有HDFS和MapReduce,分别提供海量数据存储和海量数据计算。

1

图1 ES-Hadoop简介

Elasticsearch for Apache Hadoop是一个用于Elasticsearch和Hadoop进行交互的开源独立库,简称ES-Hadoop,在Hadoop和Elasticsearch之间起到桥梁的作用,完美地把Hadoop的批处理优势和Elasticsearch强大的全文检索引擎结合起来。

ES-Hadoop开辟了更加广阔的应用空间,通过ES-Hadoop可以索引Hadoop中的数据到Elasticsearch,充分利用其查询和聚合分析功能,也可以在Kibana中做进一步的可视化分析,同时也可以把Elasticsearch中的数据放到Hadoop生态系统中做运算,ES-Hadoop支持Spark、Spark、 Streaming、SparkSQL,除此之外,不论是使用Hive、 Pig、Storm、Cascading还是运行单独的Map/Reduce,通过ES-Hadoop提供的接口都支持从Elasticsearch中进行索引和查询操作。

本文基于阿里云E-MapReduce和阿里云Elasticsearch,演示如何通过ES-Hadoop连通Hadoop生态系统和Elasticsearch。

一、云服务配置

2

图2 阿里云产品地图

1.1 开通专有网络VPC

因为在公网访问推送数据安全性较差,为保证阿里云Elasticsearch访问环境安全,购买阿里云ES产品,对应区域下必须要有 VPC 和 虚拟交换机,因此首先开通VPC专有网络。按路径:阿里云首页-->产品-->网络->专有网络VPC,然后选择立即开通,进入到管理控制台界面,新建专有网络。
_2018_06_21_9_47_25

图3 创建专有网络
创建完成之后在控制台中可以进行管理:

_2018_06_21_9_53_12

图4 专有网络管理

更多关于专有网络VPC的文档参考这里:专有网络 VPC

1.2 开通阿里云Elasticsearch

按路径:阿里云首页-->产品-->数据库-->Elasticsearch或阿里云首页-->产品-->大数据基础服务-->Elasticsearch进入到阿里云Elasticsearch产品界面,新用户可以免费试用30天

进入到购买入口,阿里云Elasticsearch提供了按月和按量两种付费模式,选择已经创建的专有网络并设置登录密码。

_2018_06_21_9_48_31

图5 阿里云Elasticsearch选购配置
购买成功后,按路径:控制台-->大数据(数加)-->Elasticsearch,可以看到新创建的Elasticsearch集群实例。

_2018_06_21_3_31_27

图6 阿里云Elasticsearch实例列表页

点击"管理"菜单,进入集群管理界面:

_2018_06_21_3_31_43

图7 阿里云Elasticsearch集群管理

点击"Kibana控制台"按钮即可进入到Kibana操作界面:

9

图8 阿里云Elasticsearch集群Kibana操作管理界面

点击"集群监控"按钮进入到监控界面:

10

图9 阿里云Elasticsearch集群监控界面

1.3 开通阿里云E-MapReduce

按路径:阿里云首页-->产品-->大数据基础服务-->E-MapReduce,之后进入到购买界面:

_2018_06_21_9_49_17

图10 阿里云E-MapReduce软件配置
下一步进行付费配置、网络配置和节点硬件配置:

_2018_06_21_9_49_33

图11 阿里云E-MapReduce硬件配置
最后设置集群名称、日志路径和集群登录密码:

_2018_06_21_9_49_57

图12 阿里云E-MapReduce基础配置

其中日志路径存储在OSS之上,如果没有创建bucket,需要到OSS管理控制台创建新的bucket。bucket的区域要和EMR集群一直,EMR集群为华东1区,这里的bucket区域也选择华东1区:

_2018_06_21_3_29_25

图13 阿里云OSS创建bucket

bucket创建完成以后就可以新建目录、上传文件:
_2018_06_21_3_38_10

图14 阿里云OSS文件管理

最后确定,完成EMR集群的创建:

_2018_06_21_9_51_22

图15 阿里云E-MapReduce确认页
集群创建成功后在集群列表中查看:
_2018_06_21_3_44_20
图16 阿里云E-MapReduce集群列表
点击管理,可以查看master节点和data节点的详细信息:

_2018_06_21_3_46_01

图17 阿里云E-MapReduce集群详细信息
公网IP可以直接访问,远程登录:
ssh root@你的公网IP

使用jps命令查看后台进程:

[root@emr-header-1 ~]# jps
16640 Bootstrap
17988 RunJar
19140 HistoryServer
18981 WebAppProxyServer
14023 Jps
15949 gateway.jar
16621 ZeppelinServer
1133 EmrAgent
15119 RunJar
17519 ResourceManager
1871 Application
19316 JobHistoryServer
1077 WatchDog
17237 SecondaryNameNode
16502 NameNode
16988 ApacheDsTanukiWrapper
18429 ApplicationHistoryServer

二、编写EMR写数据到ES的MR作业

推荐使用maven来进行项目管理,首先创建一个maven工程。操作步骤如下:

1.安装 Maven。首先确保计算机已经正确安装安装maven

2.生成工程框架。在工程根目录处执行如下命令:

mvn archetype:generate -DgroupId=com.aliyun.emrtoes -DartifactId=emrtoes -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

mvn 会自动生成一个空的 Sample 工程,工程名为emrtoes(和指定的artifactId一致),里面包含一个简单的 pom.xml 和 App 类(类的包路径和指定的 groupId 一致)

3.加入 Hadoop 和ES-Hadoop依赖。使用任意 IDE 打开这个工程,编辑 pom.xml 文件。在 dependencies 内添加如下内容:

    <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-common</artifactId>
         <version>2.7.3</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>2.7.3</version>
     </dependency>
      <dependency>
          <groupId>org.elasticsearch</groupId>
          <artifactId>elasticsearch-hadoop-mr</artifactId>
          <version>5.5.3</version>
      </dependency>

4.添加打包插件。由于使用了第三方库,需要把第三方库打包到jar文件中,在pom.xml中添加maven-assembly-plugin插件的坐标:

    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.aliyun.emrtoes.EmrToES</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

5.编写代码。在com.aliyun.emrtoes包下和 App 类平行的位置添加新类 EmrToES.java。内容如下:

package com.aliyun.emrtoes;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import java.io.IOException;

public class EmrToES {

    public static class MyMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text line = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.getLength() > 0) {
                line.set(value);
                context.write(NullWritable.get(), line);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        //阿里云 Elasticsearch X-PACK用户名和密码
        conf.set("es.net.http.auth.user", "你的X-PACK用户名");
        conf.set("es.net.http.auth.pass", "你的X-PACK密码");

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "你的Elasticsearch内网地址");
        conf.set("es.port", "9200");
        conf.set("es.nodes.wan.only", "true");
        conf.set("es.resource", "blog/yunqi");
        conf.set("es.mapping.id", "id");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf, "EmrToES");
        job.setJarByClass(EmrToES.class);

        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

6.编译并打包。在工程的目录下,执行如下命令:

mvn clean package 

执行完毕以后,可在工程目录的 target 目录下看到一个emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar,这个就是作业 jar 包。

_2018_06_21_3_49_00

图18 IDEA中EMR写ES工程目录截图

三、EMR中完成作业

3.1 测试数据

把下面的数据写入到blog.json中:

{"id":"1","title":"git简介","posttime":"2016-06-11","content":"svn与git的最主要区别..."}
{"id":"2","title":"ava中泛型的介绍与简单使用","posttime":"2016-06-12","content":"基本操作:CRUD ..."}
{"id":"3","title":"SQL基本操作","posttime":"2016-06-13","content":"svn与git的最主要区别..."}
{"id":"4","title":"Hibernate框架基础","posttime":"2016-06-14","content":"Hibernate框架基础..."}
{"id":"5","title":"Shell基本知识","posttime":"2016-06-15","content":"Shell是什么..."}

上传到阿里云E-MapReduce集群,使用scp远程拷贝命令上传文件:

scp blog.json root@你的弹性公网IP:/root

把blog.json上传至HDFS:

hadoop fs -mkdir /work
hadoop fs -put blog.json /work

3.2 上传JAR包

把maven工程target目录下的jar包上传至阿里云E-MapReduce集群:

scp target/emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar root@YourIP:/root

3.3 执行MR作业

hadoop jar emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar /work/blog.json

运行成功的话,控制台会输出如下图所示信息:

_2018_06_21_3_54_54

图19 阿里云E-MapReduce运行MR作业截图
命令查询Elasticsearch中的数据:
curl -u elastic -XGET es-cn-v0h0jdp990001rta9.elasticsearch.aliyuncs.com:9200/blog/_search?pretty

_2018_06_21_3_57_28

图20 命令查看阿里云Elasticsearch中的数据
或者在Kibana中查看:

_2018_06_21_3_58_21

图21 Kibana中查看阿里云Elasticsearch中的数据

四、API分析

Map过程,按行读入,input kye的类型为Object,input value的类型为Text。输出的key为NullWritable类型,NullWritable是Writable的一个特殊类,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符。MapReduce中如果不需要使用键或值,就可以将键或值声明为NullWritable,这里把输出的key设置NullWritable类型。输出为BytesWritable类型,把json字符串序列化。

因为只需要写入,没有Reduce过程。配置参数说明如下:

  • conf.set("es.net.http.auth.user", "你的X-PACK用户名");
    设置X-PACK的用户名
  • conf.set("es.net.http.auth.pass", "你的X-PACK密码");
    设置X-PACK的密码
  • conf.setBoolean("mapred.map.tasks.speculative.execution", false);

关闭mapper阶段的执行推测

  • conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

关闭reducer阶段的执行推测

  • conf.set("es.nodes", "你的Elasticsearch内网地址");

配置Elasticsearch的IP和端口

  • conf.set("es.resource", "blog/yunqi");

设置索引到Elasticsearch的索引名和类型名。

  • conf.set("es.mapping.id", "id");

设置文档id,这个参数”id”是文档中的id字段

  • conf.set("es.input.json", "yes");

指定输入的文件类型为json。

  • job.setInputFormatClass(TextInputFormat.class);

设置输入流为文本类型

  • job.setOutputFormatClass(EsOutputFormat.class);

设置输出为EsOutputFormat类型。

  • job.setMapOutputKeyClass(NullWritable.class);

设置Map的输出key类型为NullWritable类型

  • job.setMapOutputValueClass(BytesWritable.class);

设置Map的输出value类型为BytesWritable类型

  • FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    传入HDFS上的文件路径

五、总结

首先,总结一下实践过程中遇到的问题:

1.ClassNotFoundException异常
遇到找不到EsOutputFormat所在类,导致的ClassNotFoundException异常:

java.lang.NoClassDefFoundError: org/elasticsearch/hadoop/mr/EsOutputFormat

解决办法,使用maven-assembly-plugin插件,把第三方库打到jar包中。

2. 连接不到Elasticsearch集群

连接不到Elasticsearch集群的第一个原因是没有配置X-PACK 的用户名和密码,加上以下两行配置:

 conf.set("es.net.http.auth.user", "你的X-PACK用户名");
 conf.set("es.net.http.auth.pass", "你的X-PACK密码");

第二个原因就是EMR集群和Elasticsearch集群网络不通,在创建集群的时尽量选择同一区域,比如EMR集群在华东1区,Elasticsearch集群也在华东1区,事先用Ping命令测试。

第三个原因是端口,一般TCP端口(比如使用Java客户端)是9300,ES-Hadoop中使用的仍然是9200端口.

3.Reduce过程中格式错误
注意测试文件中每一行都是一个JSON,在设置中加上:

conf.set("es.input.json", "yes");

否则会出现解析文件格式异常。

最后,ES-Hadoop连通了Hadoop和Elasticsearch两个大数据生态圈,本博客做了写数据的实践案例,更多资料请参考阿里云E-MapReduce 阿里云Elasticsearch

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
28天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
21 0
|
1月前
|
关系型数据库 MySQL 数据挖掘
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
DTS 作为阿里云核心的数据交互引擎,以其高效的实时数据流处理能力和广泛的数据源兼容性,为用户构建了一个安全可靠、可扩展、高可用的数据架构桥梁。阿里云数据库 SelectDB 通过与 DTS 联合,为用户提供了简单、实时、极速且低成本的事务数据分析方案。用户可以通过 DTS 数据传输服务,一键将自建 MySQL / RDS MySQL / PolarDB for MySQL 数据库,迁移或同步至阿里云数据库 SelectDB 的实例中,帮助企业在短时间内完成数据迁移或同步,并即时获得深度洞察。
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
|
1月前
|
SQL 人工智能 数据挖掘
阿里云DMS,身边的智能化数据分析助手
生成式AI颠覆了人机交互的传统范式,赋予每个人利用AI进行低门槛数据分析的能力。Data Fabric与生成式AI的强强联合,不仅能够实现敏捷数据交付,还有效降低了数据分析门槛,让人人都能数据分析成为可能!阿里云DMS作为阿里云统一的用数平台,在2021年初就开始探索使用Data Fabric理念构建逻辑数仓来加速企业数据价值的交付,2023年推出基于大模型构建的Data Copilot,降低用数门槛,近期我们将Notebook(分析窗口)、逻辑数仓(Data Fabric)、Data Copilot(生成式AI)进行有机组合,端到端的解决用数难题,给用户带来全新的分析体验。
110132 118
阿里云DMS,身边的智能化数据分析助手
|
1月前
|
消息中间件 存储 关系型数据库
【微服务】mysql + elasticsearch数据双写设计与实现
【微服务】mysql + elasticsearch数据双写设计与实现
68 2
|
1月前
|
监控 安全 Linux
【Elasticsearch专栏 14】深入探索:Elasticsearch使用Logstash的日期过滤器删除旧数据
使用Logstash的日期过滤器可以有效删除Elasticsearch中的旧数据,释放存储空间并提高集群性能。通过配置Logstash,可以指定索引模式、筛选时间戳早于特定阈值的文档,并在输出阶段删除这些旧数据。执行配置时,需确保Logstash与Elasticsearch连接正常,并监控日志以确保操作安全。定期执行此操作可确保旧数据不会过多积累。总之,Logstash的日期过滤器提供了一种简单而高效的方法,帮助管理和优化Elasticsearch中的数据。
|
1月前
|
存储 搜索推荐 Java
|
3月前
|
存储 分布式计算 Hadoop
Hadoop:驭服数据洪流的利器
在当今信息大爆炸的时代,海量数据成为企业决策的重要依据。本文将介绍大规模数据处理框架Hadoop的概念与实践,探讨其在解决大数据应用中的重要性和优势。从分布式计算、高可靠性、扩展性等方面深入剖析Hadoop的工作原理,并结合实例说明如何利用Hadoop来处理海量数据,为读者提供了解和运用Hadoop的基础知识。
|
3月前
|
存储 缓存 数据库
PB数据毫秒级搜索之Elasticsearch(二)基础了解
PB数据毫秒级搜索之Elasticsearch(二)基础了解
90 0
|
2月前
|
存储 数据可视化 数据管理
基于阿里云服务的数据平台架构实践
本文主要介绍基于阿里云大数据组件服务,对企业进行大数据平台建设的架构实践。
717 2
|
1月前
|
监控 Java 测试技术
【Elasticsearch专栏 13】深入探索:Elasticsearch使用Curator工具删除Elasticsearch中的历史数据
使用Curator工具可以有效管理Elasticsearch中的旧数据,通过编写YAML配置文件定义删除操作。配置中指定了基于索引名称前缀和年龄的过滤器,确保仅删除符合条件的旧索引。执行删除操作时,Curator会应用过滤器识别目标索引,并向Elasticsearch发送删除请求。通过设置选项,如忽略空列表和超时时间,可以确保操作的灵活性和稳定性。使用Curator不仅释放了存储空间,还提高了查询性能,是维护Elasticsearch健康的重要工具