MapReduce编程(六) 从HDFS导入数据到Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 一、Elasticsearch for Hadoop安装Elasticsearch for Hadoop并不像logstash、kibana一样是一个独立的软件,而是Hadoop和Elasticsearch交互所需要的jar包。

一、Elasticsearch for Hadoop安装

Elasticsearch for Hadoop并不像logstash、kibana一样是一个独立的软件,而是Hadoop和Elasticsearch交互所需要的jar包。所以,有直接下载和maven导入2种方式。安装之前确保JDK版本不要低于1.8,Elasticsearch版本不能低于1.0。
官网对声明是对Hadoop 1.1.x、1.2.x、2.2.x、2.4.x、2.6.x、2.7.x测试通过,支持较好,其它版本的也并不是不能用。

1.1 官网下载zip安装包

官网给的Elasticsearch for Hadoop的地址https://www.elastic.co/downloads/hadoop,下载后解压后,把dist目录下jar包导入hadoop即可。

1.2 maven方式下载

使用maven管理jar包更加方便,在pom.xml文件中加入以下依赖:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>2.3.3</version>
</dependency>

我之前使用Intellij Idea搭建了一个MapReduce的环境,参考这里MapReduce编程(一) Intellij Idea配置MapReduce编程环境,pom.xml中再加入上面elasticsearch-hadoop的依赖即可,注意版本。

上面的依赖包含了MapReduce、Pig、Hive、Spark等完整的依赖,如果只想单独使用某一个功能,可以细化分别加入。

Map/Reduce:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop-mr</artifactId>
  <version>2.3.3</version>
</dependency>

Apache Hive:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop-hive</artifactId>
  <version>2.3.3</version>
</dependency>

Apache Pig.

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop-pig</artifactId>
  <version>2.3.3</version>
</dependency>

Apache Spark.

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.10</artifactId>
  <version>2.3.3</version>
</dependency>

1.3 将ES-hadoop 的jar包加入环境变量

把ES-HADOOP的jar包导入环境变量,编辑/etc/profile(或.bash_profile),我的路径为:

/usr/local/Cellar/elasticsearch-hadoop-2.4.3

jar包位于elasticsearch-hadoop-2.4.3的dist目录下。编辑/etc/profile,加入一行:

#ES-HADOOP HOME
export EsHadoop_HOME=/usr/local/Cellar/elasticsearch-hadoop-2.4.3
export CLASSPATH=$CLASSPATH:$EsHadoop_HOME/dist/*

最后使profile文件生效:

source /etc/profile

二、准备数据

准备一些测试数据,数据内容为json格式,每行是一条文档。把下列内容保存到blog.json中。

{"id":"1","title":"git简介","posttime":"2016-06-11","content":"svn与git的最主要区别..."}
{"id":"2","title":"ava中泛型的介绍与简单使用","posttime":"2016-06-12","content":"基本操作:CRUD ..."}
{"id":"3","title":"SQL基本操作","posttime":"2016-06-13","content":"svn与git的最主要区别..."}
{"id":"4","title":"Hibernate框架基础","posttime":"2016-06-14","content":"Hibernate框架基础..."}
{"id":"5","title":"Shell基本知识","posttime":"2016-06-15","content":"Shell是什么..."}

启动hadoop,上传blog.json到hdfs:

hadoop fs -put blog.json /work

三、从HDFS读取文档索引到ES

从HDFS读取文档索引到Elasticsearch的代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

import java.io.IOException;

/**
 * Created by bee on 4/1/17.
 */
public class HdfsToES {

    public static class MyMapper extends Mapper<Object, Text, NullWritable,
            BytesWritable> {

        public void map(Object key, Text value, Mapper<Object, Text,
                NullWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            byte[] line = value.toString().trim().getBytes();
            BytesWritable blog = new BytesWritable(line);
            context.write(NullWritable.get(), blog);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "192.168.1.111:9200");
        conf.set("es.resource", "blog/csdn");
        conf.set("es.mapping.id", "id");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf, "hadoop es write test");
        job.setMapperClass(HdfsToES.MyMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 设置输入路径
        FileInputFormat.setInputPaths(job, new Path
                ("hdfs://localhost:9000//work/blog.json"));
        job.waitForCompletion(true);
    }
}

倒入完成后会在Elasticsearch中生成blog索引。查看内容如下:
这里写图片描述

四、API分析

Map过程,按行读入,input kye的类型为Object,input value的类型为Text。输出的key为NullWritable类型,NullWritable是Writable的一个特殊类,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符。MapReduce中如果不需要使用键或值,就可以将键或值声明为NullWritable,这里把输出的key设置NullWritable类型。输出为BytesWritable类型,把json字符串序列化。

因为只需要写入,没有Reduce过程。在main函数中,首先创Configuration()类的一个对象conf,通过conf配置一些参数。

  • conf.setBoolean("mapred.map.tasks.speculative.execution", false);

    关闭mapper阶段的执行推测

  • conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

    关闭reducer阶段的执行推测

  • conf.set("es.nodes", "192.168.1.111:9200");

    配置Elasticsearch的IP和端口

  • conf.set("es.resource", "blog/csdn");

    设置索引到Elasticsearch的索引名和类型名。

  • conf.set("es.mapping.id", "id");

    设置文档id,这个参数”id”是文档中的id字段

  • conf.set("es.input.json", "yes");

    指定输入的文件类型为json。

  • job.setInputFormatClass(TextInputFormat.class);

    设置输入流为文本类型

  • job.setOutputFormatClass(EsOutputFormat.class);

    设置输出为EsOutputFormat类型。

  • job.setMapOutputKeyClass(NullWritable.class);

    设置Map的输出key类型为NullWritable类型

  • job.setMapOutputValueClass(BytesWritable.class);

    设置Map的输出value类型为BytesWritable类型


2017年8月21日更新:

5.4版本的map函数:

    public static class MyMapper extends Mapper<Object, Text, NullWritable,
            Text> {
        private Text line = new Text();
        public void map(Object key, Text value, Mapper<Object, Text,
                NullWritable, Text>.Context context) throws IOException, InterruptedException {

            if(value.getLength()>0){
                line.set(value);
                context.write(NullWritable.get(), line);
            }

        }
    }

五、参考资料

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
157 3
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
84 1
|
9月前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
389 2
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
164 0
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
71 0
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
93 0
|
9月前
|
分布式计算 数据可视化 Hadoop
【分布式计算框架】HDFS常用操作及编程实践
【分布式计算框架】HDFS常用操作及编程实践
256 1
|
8月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
321 0
|
9月前
|
SQL 关系型数据库 MySQL
Hive【基础知识 02-1】【Hive CLI 命令行工具使用】【准备阶段-建库、建表、导入数据、编写测试SQL脚本并上传HDFS】
【4月更文挑战第7天】Hive【基础知识 02-1】【Hive CLI 命令行工具使用】【准备阶段-建库、建表、导入数据、编写测试SQL脚本并上传HDFS】
110 0
|
9月前
|
存储 SQL 分布式计算
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
607 0