ES-hadoop写数据到阿里云Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: ES-Hadoop是一个用于Elasticsearch和Hadoop进行交互的开源独立库,在Hadoop和Elasticsearch之间起到桥梁的作用,本文基于阿里云E-MapReduce和阿里云Elasticsearch,演示如何通过ES-Hadoop连通Hadoop生态系统和Elasticsearch。

Elasticsearch是一个基于Lucene的分布式搜索引擎,具有分布式、全文检索、近实时搜索和分析、高可用、模式自由、RESTFul API等诸多优点,在实时搜索、日志处理(ELK)、大数据分析等领域有着广泛的应用。Hadoop是一个由Apache基金会所开发的分布式系统基础架构,核心组件有HDFS和MapReduce,分别提供海量数据存储和海量数据计算。

1

图1 ES-Hadoop简介

Elasticsearch for Apache Hadoop是一个用于Elasticsearch和Hadoop进行交互的开源独立库,简称ES-Hadoop,在Hadoop和Elasticsearch之间起到桥梁的作用,完美地把Hadoop的批处理优势和Elasticsearch强大的全文检索引擎结合起来。

ES-Hadoop开辟了更加广阔的应用空间,通过ES-Hadoop可以索引Hadoop中的数据到Elasticsearch,充分利用其查询和聚合分析功能,也可以在Kibana中做进一步的可视化分析,同时也可以把Elasticsearch中的数据放到Hadoop生态系统中做运算,ES-Hadoop支持Spark、Spark、 Streaming、SparkSQL,除此之外,不论是使用Hive、 Pig、Storm、Cascading还是运行单独的Map/Reduce,通过ES-Hadoop提供的接口都支持从Elasticsearch中进行索引和查询操作。

本文基于阿里云E-MapReduce和阿里云Elasticsearch,演示如何通过ES-Hadoop连通Hadoop生态系统和Elasticsearch。

一、云服务配置

2

图2 阿里云产品地图

1.1 开通专有网络VPC

因为在公网访问推送数据安全性较差,为保证阿里云Elasticsearch访问环境安全,购买阿里云ES产品,对应区域下必须要有 VPC 和 虚拟交换机,因此首先开通VPC专有网络。按路径:阿里云首页-->产品-->网络->专有网络VPC,然后选择立即开通,进入到管理控制台界面,新建专有网络。
_2018_06_21_9_47_25

图3 创建专有网络
创建完成之后在控制台中可以进行管理:

_2018_06_21_9_53_12

图4 专有网络管理

更多关于专有网络VPC的文档参考这里:专有网络 VPC

1.2 开通阿里云Elasticsearch

按路径:阿里云首页-->产品-->数据库-->Elasticsearch或阿里云首页-->产品-->大数据基础服务-->Elasticsearch进入到阿里云Elasticsearch产品界面,新用户可以免费试用30天

进入到购买入口,阿里云Elasticsearch提供了按月和按量两种付费模式,选择已经创建的专有网络并设置登录密码。

_2018_06_21_9_48_31

图5 阿里云Elasticsearch选购配置
购买成功后,按路径:控制台-->大数据(数加)-->Elasticsearch,可以看到新创建的Elasticsearch集群实例。

_2018_06_21_3_31_27

图6 阿里云Elasticsearch实例列表页

点击"管理"菜单,进入集群管理界面:

_2018_06_21_3_31_43

图7 阿里云Elasticsearch集群管理

点击"Kibana控制台"按钮即可进入到Kibana操作界面:

9

图8 阿里云Elasticsearch集群Kibana操作管理界面

点击"集群监控"按钮进入到监控界面:

10

图9 阿里云Elasticsearch集群监控界面

1.3 开通阿里云E-MapReduce

按路径:阿里云首页-->产品-->大数据基础服务-->E-MapReduce,之后进入到购买界面:

_2018_06_21_9_49_17

图10 阿里云E-MapReduce软件配置
下一步进行付费配置、网络配置和节点硬件配置:

_2018_06_21_9_49_33

图11 阿里云E-MapReduce硬件配置
最后设置集群名称、日志路径和集群登录密码:

_2018_06_21_9_49_57

图12 阿里云E-MapReduce基础配置

其中日志路径存储在OSS之上,如果没有创建bucket,需要到OSS管理控制台创建新的bucket。bucket的区域要和EMR集群一直,EMR集群为华东1区,这里的bucket区域也选择华东1区:

_2018_06_21_3_29_25

图13 阿里云OSS创建bucket

bucket创建完成以后就可以新建目录、上传文件:
_2018_06_21_3_38_10

图14 阿里云OSS文件管理

最后确定,完成EMR集群的创建:

_2018_06_21_9_51_22

图15 阿里云E-MapReduce确认页
集群创建成功后在集群列表中查看:
_2018_06_21_3_44_20
图16 阿里云E-MapReduce集群列表
点击管理,可以查看master节点和data节点的详细信息:

_2018_06_21_3_46_01

图17 阿里云E-MapReduce集群详细信息
公网IP可以直接访问,远程登录:
ssh root@你的公网IP

使用jps命令查看后台进程:

[root@emr-header-1 ~]# jps
16640 Bootstrap
17988 RunJar
19140 HistoryServer
18981 WebAppProxyServer
14023 Jps
15949 gateway.jar
16621 ZeppelinServer
1133 EmrAgent
15119 RunJar
17519 ResourceManager
1871 Application
19316 JobHistoryServer
1077 WatchDog
17237 SecondaryNameNode
16502 NameNode
16988 ApacheDsTanukiWrapper
18429 ApplicationHistoryServer

二、编写EMR写数据到ES的MR作业

推荐使用maven来进行项目管理,首先创建一个maven工程。操作步骤如下:

1.安装 Maven。首先确保计算机已经正确安装安装maven

2.生成工程框架。在工程根目录处执行如下命令:

mvn archetype:generate -DgroupId=com.aliyun.emrtoes -DartifactId=emrtoes -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

mvn 会自动生成一个空的 Sample 工程,工程名为emrtoes(和指定的artifactId一致),里面包含一个简单的 pom.xml 和 App 类(类的包路径和指定的 groupId 一致)

3.加入 Hadoop 和ES-Hadoop依赖。使用任意 IDE 打开这个工程,编辑 pom.xml 文件。在 dependencies 内添加如下内容:

    <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-common</artifactId>
         <version>2.7.3</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>2.7.3</version>
     </dependency>
      <dependency>
          <groupId>org.elasticsearch</groupId>
          <artifactId>elasticsearch-hadoop-mr</artifactId>
          <version>5.5.3</version>
      </dependency>

4.添加打包插件。由于使用了第三方库,需要把第三方库打包到jar文件中,在pom.xml中添加maven-assembly-plugin插件的坐标:

    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.aliyun.emrtoes.EmrToES</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

5.编写代码。在com.aliyun.emrtoes包下和 App 类平行的位置添加新类 EmrToES.java。内容如下:

package com.aliyun.emrtoes;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import java.io.IOException;

public class EmrToES {

    public static class MyMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text line = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.getLength() > 0) {
                line.set(value);
                context.write(NullWritable.get(), line);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        //阿里云 Elasticsearch X-PACK用户名和密码
        conf.set("es.net.http.auth.user", "你的X-PACK用户名");
        conf.set("es.net.http.auth.pass", "你的X-PACK密码");

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "你的Elasticsearch内网地址");
        conf.set("es.port", "9200");
        conf.set("es.nodes.wan.only", "true");
        conf.set("es.resource", "blog/yunqi");
        conf.set("es.mapping.id", "id");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf, "EmrToES");
        job.setJarByClass(EmrToES.class);

        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

6.编译并打包。在工程的目录下,执行如下命令:

mvn clean package 

执行完毕以后,可在工程目录的 target 目录下看到一个emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar,这个就是作业 jar 包。

_2018_06_21_3_49_00

图18 IDEA中EMR写ES工程目录截图

三、EMR中完成作业

3.1 测试数据

把下面的数据写入到blog.json中:

{"id":"1","title":"git简介","posttime":"2016-06-11","content":"svn与git的最主要区别..."}
{"id":"2","title":"ava中泛型的介绍与简单使用","posttime":"2016-06-12","content":"基本操作:CRUD ..."}
{"id":"3","title":"SQL基本操作","posttime":"2016-06-13","content":"svn与git的最主要区别..."}
{"id":"4","title":"Hibernate框架基础","posttime":"2016-06-14","content":"Hibernate框架基础..."}
{"id":"5","title":"Shell基本知识","posttime":"2016-06-15","content":"Shell是什么..."}

上传到阿里云E-MapReduce集群,使用scp远程拷贝命令上传文件:

scp blog.json root@你的弹性公网IP:/root

把blog.json上传至HDFS:

hadoop fs -mkdir /work
hadoop fs -put blog.json /work

3.2 上传JAR包

把maven工程target目录下的jar包上传至阿里云E-MapReduce集群:

scp target/emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar root@YourIP:/root

3.3 执行MR作业

hadoop jar emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar /work/blog.json

运行成功的话,控制台会输出如下图所示信息:

_2018_06_21_3_54_54

图19 阿里云E-MapReduce运行MR作业截图
命令查询Elasticsearch中的数据:
curl -u elastic -XGET es-cn-v0h0jdp990001rta9.elasticsearch.aliyuncs.com:9200/blog/_search?pretty

_2018_06_21_3_57_28

图20 命令查看阿里云Elasticsearch中的数据
或者在Kibana中查看:

_2018_06_21_3_58_21

图21 Kibana中查看阿里云Elasticsearch中的数据

四、API分析

Map过程,按行读入,input kye的类型为Object,input value的类型为Text。输出的key为NullWritable类型,NullWritable是Writable的一个特殊类,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符。MapReduce中如果不需要使用键或值,就可以将键或值声明为NullWritable,这里把输出的key设置NullWritable类型。输出为BytesWritable类型,把json字符串序列化。

因为只需要写入,没有Reduce过程。配置参数说明如下:

  • conf.set("es.net.http.auth.user", "你的X-PACK用户名");
    设置X-PACK的用户名
  • conf.set("es.net.http.auth.pass", "你的X-PACK密码");
    设置X-PACK的密码
  • conf.setBoolean("mapred.map.tasks.speculative.execution", false);

关闭mapper阶段的执行推测

  • conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

关闭reducer阶段的执行推测

  • conf.set("es.nodes", "你的Elasticsearch内网地址");

配置Elasticsearch的IP和端口

  • conf.set("es.resource", "blog/yunqi");

设置索引到Elasticsearch的索引名和类型名。

  • conf.set("es.mapping.id", "id");

设置文档id,这个参数”id”是文档中的id字段

  • conf.set("es.input.json", "yes");

指定输入的文件类型为json。

  • job.setInputFormatClass(TextInputFormat.class);

设置输入流为文本类型

  • job.setOutputFormatClass(EsOutputFormat.class);

设置输出为EsOutputFormat类型。

  • job.setMapOutputKeyClass(NullWritable.class);

设置Map的输出key类型为NullWritable类型

  • job.setMapOutputValueClass(BytesWritable.class);

设置Map的输出value类型为BytesWritable类型

  • FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    传入HDFS上的文件路径

五、总结

首先,总结一下实践过程中遇到的问题:

1.ClassNotFoundException异常
遇到找不到EsOutputFormat所在类,导致的ClassNotFoundException异常:

java.lang.NoClassDefFoundError: org/elasticsearch/hadoop/mr/EsOutputFormat

解决办法,使用maven-assembly-plugin插件,把第三方库打到jar包中。

2. 连接不到Elasticsearch集群

连接不到Elasticsearch集群的第一个原因是没有配置X-PACK 的用户名和密码,加上以下两行配置:

 conf.set("es.net.http.auth.user", "你的X-PACK用户名");
 conf.set("es.net.http.auth.pass", "你的X-PACK密码");

第二个原因就是EMR集群和Elasticsearch集群网络不通,在创建集群的时尽量选择同一区域,比如EMR集群在华东1区,Elasticsearch集群也在华东1区,事先用Ping命令测试。

第三个原因是端口,一般TCP端口(比如使用Java客户端)是9300,ES-Hadoop中使用的仍然是9200端口.

3.Reduce过程中格式错误
注意测试文件中每一行都是一个JSON,在设置中加上:

conf.set("es.input.json", "yes");

否则会出现解析文件格式异常。

最后,ES-Hadoop连通了Hadoop和Elasticsearch两个大数据生态圈,本博客做了写数据的实践案例,更多资料请参考阿里云E-MapReduce 阿里云Elasticsearch

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
30天前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
28 1
|
13天前
|
存储 人工智能 自然语言处理
Elasticsearch Inference API增加对阿里云AI的支持
本文将介绍如何在 Elasticsearch 中设置和使用阿里云的文本生成、重排序、稀疏向量和稠密向量服务,提升搜索相关性。
59 14
Elasticsearch Inference API增加对阿里云AI的支持
|
26天前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
156 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
30天前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
31 4
|
30天前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
49 3
|
30天前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
32 2
|
30天前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
61 1
|
30天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
77 0
|
30天前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
34 0
|
30天前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
44 0