在 EMR 中使用 Mongo-Hadoop

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介:

在 EMR 中使用 Mongo-Hadoop

Mongo-Hadoop 是 MongoDB 推出的用于 Hadoop 系列组件连接 MongoDB 的组件。其原理跟我们上一篇文章介绍的 ES-Hadoop 类似。EMR 中已经集成了 Mongo-Hadoop,用户不用做任何部署配置,即可使用 Mongo-Hadoop。下面我们通过几个例子来展示一下 Mongo-Hadoop 的用法。

准备

在下面这几个例子中,我们使用一个统一的数据模型

{
  "id": long,
  "name": text,
  "age": integer,
  "birth": date
}

由于我们是要通过 Mongo-Hadoop 向 MongoDB 的特定 collection (可以理解成数据库中的表)写数据,因此需要首先确保 MongoDB 上存在这个 collection。为此,首先需要在一台能够连接到 MongoDB 的客户机上运行 mongo client(你可能需要安装一下客户端程序,客户端程序可在 mongo 官网下载)。我们以连接阿里云数据库 MongoDB 版为例:

mongo --host dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717 --authenticationDatabase admin -u root -p 123456

其中 dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com 为 MongoDB 的主机名,3717 为端口号(该端口号根据您的 MongoDB 集群而定,对于自建集群,默认为 27017),-p 为密码(这里假设密码为 123456)。进入交互式页面,运行如下命令,在 company 数据库下创建名为 employees 的 collection:

> use company;
> db.createCollection("employees")

准备一个文件,每一行为一个 json 对象,如下所示,

{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
{"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
{"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}

并保存至 HDFS 指定目录(如 "/mongo-hadoop/employees.txt")。

Mapreduce

在下面这个例子中,我们读取 HDFS 上 /mongo-hadoop 目录下的 json 文件,并将这些 json 文件中的每一行作为一个 document 写入 MongoDB。

package com.aliyun.emr;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Test implements Tool {

  private Configuration conf;

  @Override
  public int run(String[] args) throws Exception {

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    conf.set("mongo.output.uri", "mongodb://<your_username>:<your_password>@dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717/company.employees?authSource=admin");

    Job job = Job.getInstance(conf);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(MongoOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(BSONWritable.class);

    job.setJarByClass(Test.class);
    job.setMapperClass(MongoMapper.class);

    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public static class MongoMapper extends Mapper<Object, Text, Text, BSONWritable> {

    private BSONWritable doc = new BSONWritable();
    private int employeeNo = 1;
    private Text id;

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      if (value.getLength() > 0) {
        doc.setDoc(BasicDBObject.parse(value.toString()));
        id = new Text("employee" + employeeNo++);
        context.write(id, doc);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new Test(), args);
    System.exit(ret);
  }
}

将该代码编译打包为 mr-test.jar, 运行

hadoop jar mr-test.jar com.aliyun.emr.Test -Dmapreduce.job.reduces=0 -libjars mr-test.jar /mongo-hadoop

待任务执行完毕后可以使用 MongoDB 客户端查询结果:

> db.employees.find();
{ "_id" : "employee1", "id" : 1, "name" : "zhangsan", "birth" : "1990-01-01", "addr" : "No.969, wenyixi Rd, yuhang, hangzhou" }
{ "_id" : "employee2", "id" : 2, "name" : "lisi", "birth" : "1991-01-01", "addr" : "No.556, xixi Rd, xihu, hangzhou" }
{ "_id" : "employee3", "id" : 3, "name" : "wangwu", "birth" : "1992-01-01", "addr" : "No.699 wangshang Rd, binjiang, hangzhou" }

Spark

本示例同 Mapreduce 一样,也是向 MongoDB 写入数据,只不过是通过 Spark 来执行。

package com.aliyun.emr;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoOutputFormat;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.bson.BSONObject;
import scala.Tuple2;

public class Test {

  public static void main(String[] args) {

    SparkSession ss = new SparkSession(new SparkContext());

    final AtomicInteger employeeNo = new AtomicInteger(0);
    JavaRDD<Tuple2<Object, BSONObject>> javaRDD =
        ss.read().text("hdfs://emr-header-1:9000/mongo-hadoop/employees.txt")
            .javaRDD().map((Function<Row, Tuple2<Object, BSONObject>>) row -> {
          BSONObject bson = BasicDBObject.parse(row.mkString());
          return new Tuple2<>("employee" + employeeNo.getAndAdd(1), bson);
        });

    JavaPairRDD<Object, BSONObject> documents = JavaPairRDD.fromJavaRDD(javaRDD);

    Configuration outputConfig = new Configuration();
    outputConfig.set("mongo.output.uri", "mongodb://<your_username>:<your_password>@dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717/company.employees?authSource=admin");

    // 将其保存为一个 "hadoop 文件",实际上通过 MongoOutputFormat 写入 mongo。
    documents.saveAsNewAPIHadoopFile(
        "file:///this-is-completely-unused",
        Object.class,
        BSONObject.class,
        MongoOutputFormat.class,
        outputConfig
    );
  }
}

将其打包成 spark-test.jar,运行如下命令执行写入过程

spark-submit --master yarn --class com.aliyun.emr.Test spark-test.jar

待任务执行完毕后可以使用 MongoDB 客户端查询结果。

Hive

这里展示使用 Hive 通过 SQL 来读写 MongoDB 的方法。

首先运行 hive 命令进入交互式环境,先创建一个表

CREATE DATABASE IF NOT EXISTS company;

之后创建一个外部表,表存储在 MongoDB 上。但是创建外部表之前,注意像第一小节中介绍的,需要首先创建 MongoDB Collection —— employees。

下面回到 Hive 交互式控制台,运行如下 SQL 创建一个外部表,MongoDB 连接通过 TBLPROPERTIES 来设置:

CREATE EXTERNAL TABLE IF NOT EXISTS employees(
  id BIGINT,
  name STRING,
  birth STRING,
  addr STRING
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id"}')
TBLPROPERTIES('mongo.uri'='mongodb://<your_username>:<your_password>@dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717/company.employees?authSource=admin');

注意这里通过 SERDEPROPERTIES 把 Hive 的字段 "id" 和 MongoDB 的字段 "_id" 做了映射(用户可以根据自身需要选择做或者不做某些映射)。
另外注意在 Hive 表中我们将 birth 设置成了 STRING 类型。这是因为 Hive 和 MongoDB 对于数据格式处理的不一致造成的问题。Hive 将原始 date 转换后发送给 MongoDB,然后再从 Hive 中查询可能会得到 NULL。

往表中插入一些数据:

INSERT INTO TABLE employees VALUES (1, "zhangsan", "1990-01-01","No.969, wenyixi Rd, yuhang, hangzhou");
INSERT INTO TABLE employees VALUES (2, "lisi", "1991-01-01", "No.556, xixi Rd, xihu, hangzhou");
INSERT INTO TABLE employees VALUES (3, "wangwu", "1992-01-01", "No.699 wangshang Rd, binjiang, hangzhou");

执行查询即可看到结果:

SELECT * FROM employees LIMIT 100;
OK
1    zhangsan    1990-01-01    No.969, wenyixi Rd, yuhang, hangzhou
2    lisi    1991-01-01    No.556, xixi Rd, xihu, hangzhou
3    wangwu    1992-01-01    No.699 wangshang Rd, binjiang, hangzhou
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
分布式计算 Hadoop 大数据
第3期:自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
本期将为大家带来开源大数据平台E-MapReduce与自建Hadoop集群对比,一起来看看吧~
904 1
第3期:自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
|
分布式计算 Hadoop 大数据
自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
本期将为大家带来开源大数据平台E-MapReduce与自建Hadoop集群对比,一起来看看吧~
1064 0
自建Hadoop集群 VS 阿里云EMR,差距居然这么大?
|
弹性计算 分布式计算 安全
自建 Hadoop 数据迁移到阿里云EMR集群
客户在 IDC 或者公有云环境自建 Hadoop 集群,数据集中保存在 HDFS文件系统用于数据分析任务。客户在决定上云之后,会将自建 Hadoop 集群的数据迁移到阿里云自建 Hadoop 集群或者 EMR 集群。本实践方案提供安全和低成本的 HDFS 数据迁移方案。
自建 Hadoop 数据迁移到阿里云EMR集群
|
消息中间件 弹性计算 分布式计算
自建Hadoop数据迁移到阿里云EMR
客户在IDC或者公有云环境自建Hadoop集群,数据集中保存在HDFS文件系统用于数据分析任务。客户在决定上云之后,会将自建Hadoop集群的数据迁移到阿里云自建部署架构图 Hadoop集群或者EMR集群。本实践方案提供安全和低成本的HDFS数据迁移方案。
自建Hadoop数据迁移到阿里云EMR
|
机器学习/深度学习 弹性计算 分布式计算
EMR:一体化Hadoop云上工作平台
Hadoop生态体系日臻完善,如何利用Hadoop生态各项技术与阿里云更好的服务于企业。EMR最新发布的工作流管理、弹性伸缩、异构计算多项功能,更好的助力用户在阿里云上利用Hadoop、Spark生态体系解决企业大数据问题。
2501 0
|
分布式计算 Hadoop
EMR 升级Hadoop 2.8.5
信息摘要: EMR 提供Hadoop 2.8.5,方便开发者使用新版Hadoop功能。适用客户: 所有客户版本/规格功能: EMR-3.18.0及以后版本,Hadoop升级为2.8.5,开发者可以方便地使用新版Hadoop的功能。
|
存储 分布式计算 Shell
EMR(hadoop/hbase/phoenix夸集群数据迁移采坑记录)
一、概述: Hbase(Phoenix)数据迁移方案主要分为 Hadoop层面(distcp)、及Hbase层面(copyTable、export/import、snapshot) 二、以下针对distcp方案详细说明(以亲测阿里EMR为例): st...
2591 0
|
SQL 分布式计算 Java
|
SQL 分布式计算 关系型数据库
自建hadoop集群迁移到EMR之数据迁移篇
自建集群要迁移到EMR集群,往往需要迁移已有数据。本文主要介绍hdfs数据和hive meta数据如何迁移。 前置 已按需求创建好EMR集群。 迁移hdfs数据 主要依靠distcp,核心是打通网络,确定hdfs参数和要迁移内容,测速,迁移。
3812 0