在 EMR 中使用 ES-Hadoop

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介:

在 EMR 中使用 ES-Hadoop

ES-Hadoop 是 Elasticsearch(ES) 推出的专门用于对接 Hadoop 生态的工具,使得用户可以使用 Mapreduce(MR)、Spark、Hive 等工具处理 ES 上的数据(ES-Hadoop 还包含另外一部分:将 ES 的索引 snapshot 到 HDFS,对于该内容本文暂不讨论)。众所周知,Hadoop 生态的长处是处理大规模数据集,但是其缺点也很明显,就是当用于交互式分析时,查询时延会比较长。而 ES 是这方面的好手,对于很多查询类型,特别是 ad-hoc 查询,基本可以做到秒级。ES-Hadoop 的推出提供了一种组合两者优势的可能性。使用 ES-Hadoop,用户只需要对自己代码做出很小的改动,即可以快速处理存储在 ES 中的数据,并且能够享受到 ES 带来的加速效果。

ES-Hadoop 的逻辑是将 ES 作为 MR/Spark/Hive 等数据处理引擎的“数据源”,在计算存储分离的架构中扮演存储的角色。这和 MR/Spark/Hive 的其他数据源并无差异。但相对于其他数据源, ES 具有更快的数据选择过滤能力。这种能力正是分析引擎最为关键的能力之一。

EMR 中已经添加了对 ES-Hadoop 的支持,用户不需要做任何配置即可使用 ES-Hadoop。下面我们通过几个例子,介绍如何在 EMR 中使用 ES-Hadoop。

准备

ES 有自动创建索引的功能,能够根据输入数据自动推测数据类型。这个功能在某些情况下很方便,避免了用户很多额外的操作,但是也产生了一些问题。最重要的问题是 ES 推测的类型和我们预期的类型不一致。比如我们定义了一个字段叫 age,INT 型,在 ES 索引中可能被索引成了 LONG 型。在执行一些操作的时候会带来类型转换问题。为此,我们建议手动创建索引。

在下面几个例子中,我们将使用同一个索引 company 和一个类型 employees(ES 索引可以看成一个 database,类型可以看做 database 下的一张表),该类型定义了四个字段(字段类型均为 ES 定义的类型):

{
  "id": long,
  "name": text,
  "age": integer,
  "birth": date
}

在 kibana 中运行如下命令创建索引(或用相应的 curl 命令)

PUT company
{
  "mappings": {
    "employees": {
      "properties": {
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "birth": {
          "type": "date"
        },
        "addr": {
          "type": "text"
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "5",
      "number_of_replicas": "1"
    }
  }
}

其中 settings 中的索引参数可根据需要设定,也可以不具体设定 settings。

准备一个文件,每一行为一个 json 对象,如下所示,

{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
{"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
{"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}

并保存至 HDFS 指定目录(如 "/es-hadoop/employees.txt")。

Mapreduce

在下面这个例子中,我们读取 hdfs 上 /es-hadoop 目录下的 json 文件,并将这些 json 文件中的每一行作为一个 document 写入 es。写入过程由 EsOutputFormat 在 map 阶段完成。

这里对 ES 的设置主要是如下几个选项

  • es.nodes: ES 节点,为 host:port 格式。对于阿里云托管式 ES,此处应为阿里云提供的 ES 访问域名
  • es.net.http.auth.user: 用户名
  • es.net.http.auth.pass: 用户密码
  • es.nodes.wan.only: 对于阿里云托管式 ES,此处应设置为 true
  • es.resource: ES 索引和类型
  • es.input.json: 如果原始文件为 json 类型,设置为 true,否则,需要在 map 函数中自己解析原始数据,生成相应的 Writable 输出

注意:

  • 关闭 map 和 reduce 的推测执行机制
package com.aliyun.emr;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class Test implements Tool {

  private Configuration conf;

  @Override
  public int run(String[] args) throws Exception {

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.set("es.nodes", "<your_es_host>:9200");
    conf.set("es.net.http.auth.user", "<your_username>");
    conf.set("es.net.http.auth.pass", "<your_password>");
    conf.set("es.nodes.wan.only", "true");
    conf.set("es.resource", "company/employees");
    conf.set("es.input.json", "yes");

    Job job = Job.getInstance(conf);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(EsOutputFormat.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setJarByClass(Test.class);
    job.setMapperClass(EsMapper.class);

    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
    private Text doc = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      if (value.getLength() > 0) {
        doc.set(value);
        context.write(NullWritable.get(), doc);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new Test(), args);
    System.exit(ret);
  }
}

将该代码编译打包为 mr-test.jar, 上传至装有 emr 客户端的机器(如 gateway,或者 EMR cluster 任意一台机器)。

在装有 EMR 客户端的机器上运行如下命令执行 mapreduce 程序:

hadoop jar mr-test.jar com.aliyun.emr.Test -Dmapreduce.job.reduces=0 -libjars mr-test.jar /es-hadoop

即可完成向 ES 写数据。具体写入的数据可以通过 kibana 查询(或者通过相应的 curl 命令):

GET
{
  "query": {
    "match_all": {}
  }
}

Spark

本示例同 mapreduce 一样,也是向 ES 的一个索引写入数据,只不过是通过 spark 来执行。这里 spark 借助 JavaEsSpark 类将一份 RDD 持久化到 es。同上述 mapreduce 程序一样,用户也需要注意几个选项的设置。

package com.aliyun.emr;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.spark_project.guava.collect.ImmutableMap;

public class Test {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setAppName("Es-test");
    conf.set("es.nodes", "<your_es_host>:9200");
    conf.set("es.net.http.auth.user", "<your_username>");
    conf.set("es.net.http.auth.pass", "<your_password>");
    conf.set("es.nodes.wan.only", "true");

    SparkSession ss = new SparkSession(new SparkContext(conf));
    final AtomicInteger employeesNo = new AtomicInteger(0);
    JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("hdfs://emr-header-1:9000/es-hadoop/employees.txt")
        .javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees" + employeesNo.getAndAdd(1), row.mkString()));

    JavaEsSpark.saveToEs(javaRDD, "company/employees");
  }
}

将其打包成 spark-test.jar,运行如下命令执行写入过程

spark-submit --master yarn --class com.aliyun.emr.Test spark-test.jar

待任务执行完毕后可以使用 kibana 或者 curl 命令查询结果。

除了 spark rdd 操作,es-hadoop 还提供了使用 sparksql 来读写 ES。详细请参考 ES-Hadoop 官方页面

Hive

这里展示使用 Hive 通过 SQL 来读写 ES 的方法。

首先运行 hive 命令进入交互式环境,先创建一个表

CREATE DATABASE IF NOT EXISTS company;

之后创建一个外部表,表存储在 ES 上,通过 TBLPROPERTIES 来设置对接 ES 的各个选项:

CREATE EXTERNAL table IF NOT EXISTS employees(
  id BIGINT,
  name STRING,
  birth TIMESTAMP,
  addr STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource' = 'tpcds/ss',
    'es.nodes' = '<your_es_host>',
    'es.net.http.auth.user' = '<your_username>',
    'es.net.http.auth.pass' = '<your_password>',
    'es.nodes.wan.only' = 'true',
    'es.resource' = 'company/employees'
);

注意在 Hive 表中我们将 birth 设置成了 TIMESTAMP 类型,而在 ES 中我们将其设置成了 DATE 型。这是因为 Hive 和 ES 对于数据格式处理不一致。在写入时,Hive 将原始 date 转换后发送给 ES 可能会解析失败,相反在读取时,ES 返回的格式 Hive 也可能解析失败。参见这里

往表中插入一些数据:

INSERT INTO TABLE employees VALUES (1, "zhangsan", "1990-01-01","No.969, wenyixi Rd, yuhang, hangzhou");
INSERT INTO TABLE employees VALUES (2, "lisi", "1991-01-01", "No.556, xixi Rd, xihu, hangzhou");
INSERT INTO TABLE employees VALUES (3, "wangwu", "1992-01-01", "No.699 wangshang Rd, binjiang, hangzhou");

执行查询即可看到结果:

SELECT * FROM employees LIMIT 100;
OK
1    zhangsan    1990-01-01    No.969, wenyixi Rd, yuhang, hangzhou
2    lisi    1991-01-01    No.556, xixi Rd, xihu, hangzhou
3    wangwu    1992-01-01    No.699 wangshang Rd, binjiang, hangzhou
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
分布式计算 Hadoop 大数据
第3期:自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
本期将为大家带来开源大数据平台E-MapReduce与自建Hadoop集群对比,一起来看看吧~
918 1
第3期:自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
|
分布式计算 Hadoop 大数据
自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
本期将为大家带来开源大数据平台E-MapReduce与自建Hadoop集群对比,一起来看看吧~
1072 0
自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
|
弹性计算 分布式计算 安全
自建 Hadoop 数据迁移到阿里云EMR集群
客户在 IDC 或者公有云环境自建 Hadoop 集群,数据集中保存在 HDFS文件系统用于数据分析任务。客户在决定上云之后,会将自建 Hadoop 集群的数据迁移到阿里云自建 Hadoop 集群或者 EMR 集群。本实践方案提供安全和低成本的 HDFS 数据迁移方案。
自建 Hadoop 数据迁移到阿里云EMR集群
|
消息中间件 弹性计算 分布式计算
自建Hadoop数据迁移到阿里云EMR
客户在IDC或者公有云环境自建Hadoop集群,数据集中保存在HDFS文件系统用于数据分析任务。客户在决定上云之后,会将自建Hadoop集群的数据迁移到阿里云自建部署架构图 Hadoop集群或者EMR集群。本实践方案提供安全和低成本的HDFS数据迁移方案。
自建Hadoop数据迁移到阿里云EMR
|
机器学习/深度学习 弹性计算 分布式计算
EMR:一体化Hadoop云上工作平台
Hadoop生态体系日臻完善,如何利用Hadoop生态各项技术与阿里云更好的服务于企业。EMR最新发布的工作流管理、弹性伸缩、异构计算多项功能,更好的助力用户在阿里云上利用Hadoop、Spark生态体系解决企业大数据问题。
2507 0
|
分布式计算 Hadoop
EMR 升级Hadoop 2.8.5
信息摘要: EMR 提供Hadoop 2.8.5,方便开发者使用新版Hadoop功能。适用客户: 所有客户版本/规格功能: EMR-3.18.0及以后版本,Hadoop升级为2.8.5,开发者可以方便地使用新版Hadoop的功能。
|
存储 分布式计算 Shell
EMR(hadoop/hbase/phoenix夸集群数据迁移采坑记录)
一、概述: Hbase(Phoenix)数据迁移方案主要分为 Hadoop层面(distcp)、及Hbase层面(copyTable、export/import、snapshot) 二、以下针对distcp方案详细说明(以亲测阿里EMR为例): st...
2594 0
|
SQL 分布式计算 关系型数据库
自建hadoop集群迁移到EMR之数据迁移篇
自建集群要迁移到EMR集群,往往需要迁移已有数据。本文主要介绍hdfs数据和hive meta数据如何迁移。 前置 已按需求创建好EMR集群。 迁移hdfs数据 主要依靠distcp,核心是打通网络,确定hdfs参数和要迁移内容,测速,迁移。
3816 0