阿里云 E-MapReduce ClickHouse 操作指南 04 期 — 数据导入

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 阿里云 E-MapReduce(简称 EMR )是运行在阿里云平台上的一种大数据处理的系统解决方案。ClickHouse 作为开源的列式存储数据库,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。而阿里云 EMR ClickHouse 则提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。

阿里云 E-MapReduce(简称EMR)是运行在阿里云平台上的一种大数据处理的系统解决方案。ClickHouse 作为开源的列式存储数据库,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。而阿里云 EMR ClickHouse 则提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。


本系列文章将从以下几个方面详细介绍 EMR ClickHouse 的操作指南:


EMR ClickHouse 操作指南 —数据导入

lALPD4PvON_W-SnNAm7NA-0_1005_622.png

一、从 Spark 导入数据至 ClickHouse

——介绍如何将 Spark 中的数据导入至 ClickHouse 集群

前提条件

  • 开发工具
  • 本地安装了 Java JDK 8。
  • 本地安装了 Maven 3.x。
  • 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。


背景信息

关于 Spark 的更多介绍,请参见简介

代码示例

代码示例如下。

package com.company.packageName
import java.util.Properties
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import com.google.common.collect.ImmutableMap
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}
case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
object CKDataImporter extends Logging {
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  private var local: Boolean = false
  def main(args: Array[String]): Unit = {
    parse(args.toList)
    checkArguments()
    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    logInfo(s"Use jdbc: $jdbcUrl")
    logInfo(s"Use table: $tableName")
    val spark = getSparkSession
    // generate test data
    val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")
      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })
    val df = spark.createDataFrame(rdd)
    df.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password))
  }
  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    logError("Usage: java -jar /path/to/CKDataImporter.jar [options]")
    logError("  --dbName      设置ClickHouse数据库的名称,默认为default")
    logError("  --tableName   设置ClickHouse库中表的名称")
    logError("  --ckHost      设置ClickHouse地址")
    logError("  --ckPort      设置ClickHouse端口,默认为8123")
    logError("  --user        设置ClickHouse所使用的用户名")
    logError("  --password    设置ClickHouse用户的密码,默认为空")
    logError("  --local       设置此程序使用Spark Local模式运行")
    System.exit(exitCode)
  }
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case "--local" :: tail =>
      local = true
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }
  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }
  private def getCKJdbcProperties(
      user: String,
      password: String,
      batchSize: String = "1000",
      socketTimeout: String = "300000",
      numPartitions: String = "8",
      rewriteBatchedStatements: String = "true"): Properties = {
    val kvMap = ImmutableMap.builder()
      .put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .put("user", user)
      .put("password", password)
      .put("batchsize", batchSize)
      .put("socket_timeout", socketTimeout)
      .put("numPartitions", numPartitions)
      .put("rewriteBatchedStatements", rewriteBatchedStatements)
      .build()
    val properties = new Properties
    properties.putAll(kvMap)
    properties
  }
  private def getSparkSession: SparkSession = {
    val builder = SparkSession.builder()
    if (local) {
      builder.master("local[*]")
    }
    builder.appName("ClickHouse-Data-Importer")
    builder.getOrCreate()
  }
}

操作流程

步骤一:创建 ClickHouse 表

  1. 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
  1. 创建 ClickHouse 信息。

i. 执行如下命令,创建数据库 clickhouse_database_name

CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。


ii. 执行如下命令,创建表 clickhouse_table_name_local

CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
  id            UInt32,
  key1            String,
  value1        UInt8,
  key2            Int64,
  value2        Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id


说明:表名您可以自定义,但请确保表名是以 _local 结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。


 iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表 clickhouse_table_name_all

说明: 表名您可以自定义,但请确保表名是以 _all 结尾。

CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
  id                    UInt32,
  key1                  String,
  value1                UInt8,
  key2                  Int64,
  value2                Float64
) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());

步骤二:编译并打包

  1. 下载并解压CKDataImporter示例到本地。
  2. 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
mvn clean package

根据您pom.xml文件中artifactId的信息,下载文件中的target目录下会出现CKDataImporter-1.0.0.jar的JAR包。


步骤三:提交作业

  1. 使用 SSH 方式登录 Spark 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令提交作业。
spark-submit --master yarn \
             --class com.aliyun.emr.CKDataImporter \
             /CKDataImporter-1.0.0.jar \
             --dbName clickhouse_database_name \
             --tableName clickhouse_table_name_all \
             --ckHost ${clickhouse_host}
参数 说明
dbName ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name
tableName ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all
ckHost ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址

获取主节点的 IP 地址

  1. 进入详情页面。
  1. 登录阿里云E-MapReduce控制台
  2. 在顶部菜单栏处,根据实际情况选择地域和资源组
  3. 单击上方的集群管理页签。
  4. 集群管理页面,单击相应集群所在行的详情
  1. 集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。

image.png

二、从 Flink 导入数据至 ClickHouse

——介绍如何将 Flink 中的数据导入至 ClickHouse 集群

前提条件

  • 开发工具
  • 本地安装了 Java JDK 8。
  • 本地安装了 Maven 3.x。
  • 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。


背景信息

关于 Flink 的更多介绍,请参见Apache Flink

代码示例

代码示例如下。

  • 流处理
package com.company.packageName
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream}
object StreamingJob {
  case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  def main(args: Array[String]) {
    parse(args.toList)
    checkArguments()
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    val insertIntoCkSql =
      s"""
        | INSERT INTO $tableName (
        |   id, key1, value1, key2, value2
        | ) VALUES (
        |   ?, ?, ?, ?, ?
        | )
        |""".stripMargin
    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    println(s"jdbc url: $jdbcUrl")
    println(s"insert sql: $insertIntoCkSql")
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(jdbcUrl)
      .setUsername(user)
      .setPassword(password)
      .setQuery(insertIntoCkSql)
      .setBatchSize(1000)
      .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
      .build()
    val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")
      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })
    val table = table2RowDataStream(tableEnv.fromDataStream(data))
    sink.emitDataStream(table.javaStream)
    // execute program
    env.execute("Flink Streaming Scala API Skeleton")
  }
  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
    println("  --dbName      设置ClickHouse数据库的名称,默认为default")
    println("  --tableName   设置ClickHouse库中表的名称")
    println("  --ckHost      设置ClickHouse地址")
    println("  --ckPort      设置ClickHouse端口,默认为8123")
    println("  --user        设置ClickHouse所使用的用户名")
    println("  --password    设置ClickHouse用户的密码,默认为空")
    System.exit(exitCode)
  }
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }
  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }
}
  • 批处理
package com.company.packageName
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import org.apache.flink.Utils
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet}
object BatchJob {
  case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  def main(args: Array[String]) {
    parse(args.toList)
    checkArguments()
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    val insertIntoCkSql =
      s"""
        | INSERT INTO $tableName (
        |   id, key1, value1, key2, value2
        | ) VALUES (
        |   ?, ?, ?, ?, ?
        | )
        |""".stripMargin
    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    println(s"jdbc url: $jdbcUrl")
    println(s"insert sql: $insertIntoCkSql")
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(jdbcUrl)
      .setUsername(user)
      .setPassword(password)
      .setQuery(insertIntoCkSql)
      .setBatchSize(1000)
      .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
      .build()
    val data = env.fromCollection(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")
      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })
    val table = table2RowDataSet(tableEnv.fromDataSet(data))
    sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table))
    // execute program
    env.execute("Flink Batch Scala API Skeleton")
  }
  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
    println("  --dbName      设置ClickHouse数据库的名称,默认为default")
    println("  --tableName   设置ClickHouse库中表的名称")
    println("  --ckHost      设置ClickHouse地址")
    println("  --ckPort      设置ClickHouse端口,默认为8123")
    println("  --user        设置ClickHouse所使用的用户名")
    println("  --password    设置ClickHouse用户的密码,默认为空")
    System.exit(exitCode)
  }
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }
  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }
}

操作流程

步骤一:创建 ClickHouse 表

  1. 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
  1. 创建 ClickHouse 信息。

i. 执行如下命令,创建数据库 clickhouse_database_name

CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。


ii. 执行如下命令,创建表 clickhouse_table_name_local

CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
  id            UInt32,
  key1            String,
  value1        UInt8,
  key2            Int64,
  value2        Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id


说明:表名您可以自定义,但请确保表名是以_local结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。


 iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表clickhouse_table_name_all

说明: 表名您可以自定义,但请确保表名是以_all结尾。

CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
  id                    UInt32,
  key1                  String,
  value1                UInt8,
  key2                  Int64,
  value2                Float64
) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());

步骤二:编译并打包

  1. 下载并解压flink-clickhouse-demo.tgz示例到本地。
  2. 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
mvn clean package

根据您 pom.xml 文件中 artifactId 的信息,下载文件中的 target 目录下会出现 flink-clickhouse-demo-1.0.0.jar 的JAR包。


步骤三:提交作业

  1. 使用 SSH 方式登录 Flink 集群,详情请参见使用SSH连接主节点
  2. 执行如下命令提交作业。代码示例如下:
  • 流作业
flink run -m yarn-cluster \
          -c com.aliyun.emr.StreamingJob \
          flink-clickhouse-demo-1.0.0.jar \
          --dbName clickhouse_database_name \
          --tableName clickhouse_table_name_all \
          --ckHost ${clickhouse_host}
  • 批作业
flink run -m yarn-cluster \
          -c com.aliyun.emr.BatchJob \
          flink-clickhouse-demo-1.0.0.jar \
          --dbName clickhouse_database_name \
          --tableName clickhouse_table_name_all \
          --ckHost ${clickhouse_host}
参数 说明
dbName ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name
tableName ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all
ckHost ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址

获取主节点的 IP 地址

  1. 进入详情页面。
  1. 登录阿里云E-MapReduce控制台
  2. 在顶部菜单栏处,根据实际情况选择地域和资源组
  3. 单击上方的集群管理页签。
  4. 集群管理页面,单击相应集群所在行的详情
  1. 集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。

image.png


三、从 HDFS 导入数据至 ClickHouse

——可以通过 HDFS 表引擎或表函数读写数据

前提条件


使用 HDFS 表引擎读写数据

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
  name1 [type1],
  name2 [type2],
  ...
)
Engine = HDFS(uri, format)

其中,涉及参数描述如下表所示。

参数 描述
db 数据库名。
table_name 表名。
name1/name2 列名。
tyep1/type2 列的类型。
uri HDFS上文件的地址。


说明

  • 不可以是目录地址。
  • 文件所属的目录需要存在,如果不存在,则写数据时会报错。
format 文件的类型。

示例

  1. 创建业务表和 HDFS 表

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 hdfs。

CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;

iv. 执行以下命令,创建表 orders。

CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');


说明 本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的192.168.**.**为HDFS集群的emr-header-1节点的内网IP地址,您可以在EMR控制台集群管理页签中的主机列表页面查看。

v.  执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

vi. 执行以下命令,创建业务表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

vii. 执行以下命令,创建业务表 orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());


  1. 使用 HDFS 表引擎导入数据

i. 下载并上传示例数据orders.csv至 HDFS 集群的目录下。
说明 本文示例上传到了 HDFS 集群的根目录下。

ii. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  hdfs.orders;

iii. 执行以下命令,检查数据一致性。

SELECT
  a.*
FROM
  hdfs.orders a
LEFT ANTI JOIN
  product.orders_all
USING uid;
  1. 使用 HDFS 表引擎导出数据

i. 执行以下命令,构造数据。

INSERT INTO product.orders_all VALUES \
  (60333391,'2021-08-04 11:26:01',49358700,89) \
  (38826285,'2021-08-03 10:47:29',25166907,27) \
  (10793515,'2021-07-31 02:10:31',95584454,68) \
  (70246093,'2021-08-01 00:00:08',82355887,97) \
  (70149691,'2021-08-02 12:35:45',68748652,1)  \
  (87307646,'2021-08-03 19:45:23',16898681,71) \
  (61694574,'2021-08-04 23:23:32',79494853,35) \
  (61337789,'2021-08-02 07:10:42',23792355,55) \
  (66879038,'2021-08-01 16:13:19',95820038,89);

ii. 执行以下命令,导出数据。

INSERT INTO hdfs.orders
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
 product.orders_all;

iii. 执行以下命令,可以检查数据一致性。

SELECT
  a.*
FROM
  hdfs.orders
RIGHT ANTI JOIN
  product.orders_all a
USING uid;


使用 HDFS 表函数读写数据

语法

hdfs(uri, format, structure);

其中,涉及参数描述如下表所示。

参数 描述
uri HDFS上文件的地址。

说明

  • 不可以是目录地址。
  • 文件所属的目录需要存在,如果不存在,则写数据时会报错。
format 文件的类型。
structure 表中字段的类型。例如,column1 UInt32,column2 String。

示例

  1. 创建数据库和业务表

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

iv. 执行以下命令,创建业务表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

v. 执行以下命令,创建业务表 orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
  1. 使用 HDFS 表引擎导入数据

i. 下载并上传示例数据orders.csv至HDFS集群的目录下。

说明 本文示例上传到了 HDFS 集群的根目录下。

ii. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');

iii. 执行以下命令,可以检查数据一致性。

SELECT
  a.*
FROM
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') a
LEFT ANTI JOIN
  product.orders_all
USING uid;
  1. 使用 HDFS 表引擎导出数据

i. 执行以下命令,构造数据。

INSERT INTO product.orders_all VALUES \
  (60333391,'2021-08-04 11:26:01',49358700,89) \
  (38826285,'2021-08-03 10:47:29',25166907,27) \
  (10793515,'2021-07-31 02:10:31',95584454,68) \
  (70246093,'2021-08-01 00:00:08',82355887,97) \
  (70149691,'2021-08-02 12:35:45',68748652,1)  \
  (87307646,'2021-08-03 19:45:23',16898681,71) \
  (61694574,'2021-08-04 23:23:32',79494853,35) \
  (61337789,'2021-08-02 07:10:42',23792355,55) \
  (66879038,'2021-08-01 16:13:19',95820038,89);

ii. 执行以下命令,导出数据。

INSERT INTO FUNCTION
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  product.orders_all;

iii. 执行以下命令,可以检查数据一致性。

SELECT
  a.*
FROM
  hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
RIGHT ANTI JOIN
  product.orders_all a

配置

EMR ClickHouse 允许使用对 HDFS 进行配置:

  • 全局生效的 HDFS 配置。
<hdfs>
  <dfs_default_replica>3</dfs_default_replica>
</hdfs>


说明 查询参数时将下划线(_)替换为半角句号(.)即可。例如,您要查询EMR中的参数dfs_default_replica,则可以在官网文档中搜索dfs.default.replica


  • 仅对${user}用户生效的 HDFS 配置,用户配置与全局配置相同的键不同值时,会覆盖全局配置
<hdfs_${user}>
  <dfs_default_replica>3</dfs_default_replica>
</hdfs_${user}>

四、从 RDS 导入数据至 ClickHouse 集群

——可以通过 RDS MySQL 表引擎或表函数导入数据至 ClickHouse 集群

前提条件


使用 RDS MySQL 表引擎导入数据

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

其中,涉及参数描述如下表所示。

参数 描述
db 数据库名。
table_name 表名。
cluster 集群标识。
name1/name2 列名。
tyep1/type2 列的类型。
host:port RDS MySQL的地址,可以在RDS MySQL管理控制台中数据库连接中进行查看。
database RDS MySQL中的数据库名。
table RDS MySQL中的表名。
user 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。
password user对应的密码。
replace_query 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。
on_duplicate_clause 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,此时需要指定on_duplicate_clauseUPDATE c2 = c2 + 1

示例

  1. 在 RDS MySQL 实例中,创建原始数据表并导入原始数据。

i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL

ii. 执行以下命令,创建原始数据表。

CREATE TABLE `origin`.`orders` (
  `uid` int(10) unsigned DEFAULT NULL,
  `date` datetime DEFAULT NULL,
  `skuId` int(10) unsigned DEFAULT NULL,
  `order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

iii. 执行以下命令,导入原始数据。

INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
       (38826285, '2021-08-03 10:47:29', 25166907, 27),
       (10793515, '2021-07-31 02:10:31', 95584454, 68),
       (70246093, '2021-08-01 00:00:08', 82355887, 97),
       (70149691, '2021-08-02 12:35:45', 68748652, 1),
       (87307646, '2021-08-03 19:45:23', 16898681, 71),
       (61694574, '2021-08-04 23:23:32', 79494853, 35),
       (61337789, '2021-08-02 07:10:42', 23792355, 55),
       (66879038, '2021-08-01 16:13:19', 95820038, 89);
  1. 在 ClickHouse 集群中,执行以下操作。

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行以下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 mysql。

CREATE DATABASE IF NOT EXISTS mysql;

iv. 执行以下命令,创建表 orders。

CREATE TABLE mysql.orders
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = MySQL('host:port', 'origin', 'orders', 'user', 'password');

v. 执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

vi. 执行以下命令,创建业务表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

vii. 执行以下命令,创建业务表orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());

viii. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  mysql.orders;

ix. 执行以下命令,查询数据。

SELECT a.*
FROM mysql.orders AS a
ANTI LEFT JOIN product.orders_all USING (uid);


说明 查询数据为空时正常。


使用 RDS MySQL 表函数导入数据

语法

mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])

其中,涉及参数描述如下表所示。

参数 描述
host:port RDS MySQL的地址,您可以在RDS MySQL管理控制台中的数据库连接中查看。
database RDS MySQL中的数据库名。
table RDS MySQL中的表名。
user 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。
password user对应的密码。
replace_query 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。
on_duplicate_clause 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,此时需要指定on_duplicate_clauseUPDATE c2 = c2 + 1

示例

  1. 在 RDS MySQL 实例中,创建表并插入数据。

i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL

ii. 执行以下命令,创建表 orders。

CREATE TABLE `origin`.`orders` (
  `uid` int(10) unsigned DEFAULT NULL,
  `date` datetime DEFAULT NULL,
  `skuId` int(10) unsigned DEFAULT NULL,
  `order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

iii. 执行以下命令,插入数据。

INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
       (38826285, '2021-08-03 10:47:29', 25166907, 27),
       (10793515, '2021-07-31 02:10:31', 95584454, 68),
       (70246093, '2021-08-01 00:00:08', 82355887, 97),
       (70149691, '2021-08-02 12:35:45', 68748652, 1),
       (87307646, '2021-08-03 19:45:23', 16898681, 71),
       (61694574, '2021-08-04 23:23:32', 79494853, 35),
       (61337789, '2021-08-02 07:10:42', 23792355, 55),
       (66879038, '2021-08-01 16:13:19', 95820038, 89) ;
  1. 在 ClickHouse 集群中,执行以下操作。

i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行以下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

iv.执行以下命令,创建表 orders。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

v. 执行以下命令,创建表 orders_all。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());

vi. 执行以下命令,导入数据。

INSERT INTO product.orders_all
SELECT
  uid,
  date,
  skuId,
  order_revenue
FROM
  mysql('host:port', 'origin', 'orders', 'user', 'password');

vii. 执行以下命令,查询数据。

SELECT a.*
FROM
  mysql('host:port', 'origin', 'orders', 'user', 'password') AS a
ANTI LEFT JOIN product.orders_all USING (uid);


说明 查询数据为空时正常。

如果您需要导出数据,则将业务表数据写入 MySQL 表函数即可。写入命令如下。

INSERT INTO FUNCTION
  mysql('host:port', 'origin', 'orders', 'user', 'password')
FROM
  product.orders_all;

五、从 Kafka 导入数据至 ClickHouse

——可以通过 Kafka 表引擎导入数据至 ClickHouse 集群

前提条件

使用限制

Kafka集群和ClickHouse集群需要在同一VPC下。

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:port1,host2:port2',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format';

其中,涉及参数描述如下表所示。

参数 描述
db 数据库名。
table_name 表名。
cluster 集群标识。
name1/name2 列名。
tyep1/type2 列的类型。
kafka_broker_list Kafka Broker的地址及端口。

Kafka集群所有节点的内网IP地址及端口,您可以在EMR控制台集群管理页签中的主机列表页面查看。

kafka_topic_list 订阅的Topic名称。
kafka_group_name Kafka consumer的分组名称。
kafka_format 数据的类型。例如,CSV和JSONEachRow等,详细信息请参见Formats for Input and Output Data

示例

  1. 在 ClickHouse 集群中执行以下操作。

i. 使用SSH方式登录 ClickHouse 集群,详情请参见登录集群

ii. 执行如下命令,进入 ClickHouse 客户端。

clickhouse-client

iii. 执行如下命令,创建数据库 kafka。

CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;


说明 数据库名您可以自定义。本文示例中的cluster_emr是集群默认的标识,如果您修改过,请填写正确的集群标识,您也可以在EMR控制台ClickHouse服务的配置页面,在搜索区域搜索clickhouse_remote_servers参数查看。

iv. 执行如下命令,创建 Kafka 表。

CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = Kafka()
SETTINGS
  kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
  kafka_topic_list = 'clickhouse_test',
  kafka_group_name = 'clickhouse_test',
  kafka_format = 'CSV';

kafka_broker_list为 Kafka 集群所有节点的内网 IP 地址及端口,您可以在 EMR 控制台集群管理页签中的主机列表页面查看。其余参数含义请参见语法。

image.png

v. 执行如下命令,创建数据库 product。

CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

vi. 执行以下命令,创建本地表。

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

vii. 执行以下命令,创建分布式表。

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());

viii. 执行以下命令,创建 MATERIALIZED VIEW 自动导数据。

CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;
  1. 在 Kafka 集群中执行以下操作。

i. 使用 SSH 方式登录 Kafka 集群,详情请参见登录集群

ii. 在 Kafka 集群的命令行窗口,执行如下命令运行 Kafka 的生产者。

/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test

iii. 执行以下命令,输入测试数据。

38826285,2021-08-03 10:47:29,25166907,27
10793515,2021-07-31 02:10:31,95584454,68
70246093,2021-08-01 00:00:08,82355887,97
70149691,2021-08-02 12:35:45,68748652,1
87307646,2021-08-03 19:45:23,16898681,71
61694574,2021-08-04 23:23:32,79494853,35
61337789,2021-08-02 07:10:42,23792355,55
66879038,2021-08-01 16:13:19,95820038,89

image.png

  1. 在 ClickHouse 命令窗口中,执行以下命令,可以查看从 Kafka 中导入至 ClickHouse 集群的数据。

您可以校验查询到的数据与源数据是否一致。

SELECT * FROM product.orders_all;

image.png


后续

您已经学习了数据导入,本系列还包括其他内容:




获取更详细的信息,点击下方链接查看:

EMR官网:https://www.aliyun.com/product/emapreduce

EMR ClickHouse :https://help.aliyun.com/document_detail/212195.html


扫描下方二维码加入 EMR 相关产品钉钉交流群一起参与讨论吧!

lALPD26eQMAeAf_NAd7NAvs_763_478.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
存储 分布式计算 数据库
阿里云国际版设置数据库云分析工作负载的 ClickHouse 版
阿里云国际版设置数据库云分析工作负载的 ClickHouse 版
|
4月前
|
存储 大数据 关系型数据库
从 ClickHouse 到阿里云数据库 SelectDB 内核 Apache Doris:快成物流的数智化货运应用实践
目前已经部署在 2 套生产集群,存储数据总量达百亿规模,覆盖实时数仓、BI 多维分析、用户画像、货运轨迹信息系统等业务场景。
|
5月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之如何对ClickHouse进行操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
存储 Cloud Native 大数据
国内独家|阿里云瑶池发布ClickHouse企业版:云原生Serverless新体验
全面升级为云原生架构,支持云原生按需弹性Serverless能力,解决了长期困扰用户的集群扩展效率和平滑性问题。
国内独家|阿里云瑶池发布ClickHouse企业版:云原生Serverless新体验
|
6月前
|
存储 容灾 Cloud Native
阿里云ClickHouse企业版正式商业化,为开发者提供容灾性更好、性价比更高的实时数仓
2024年4月23日,阿里云联合 ClickHouse Inc. 成功举办了企业版商业化发布会。阿里云 ClickHouse 企业版是阿里云和 ClickHouse 原厂 ClickHouse. Inc 独家合作的存算分离的云原生版本,支持资源按需弹性 Serverless,帮助企业降低成本的同时,为企业带来更多商业价值。
523 1
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之sql读取mysql写入clickhouse,该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
云安全 运维 安全
阿里云国际站ATT&CK 多产品安全实践
本文根据MITRE ATT&CK的Cloud Matrix攻防知识图谱的·解读,介绍如何在阿里云国际站上通过多产品的组合实践,加强您的云安全防护能力,更好地达到安全运营的效果。
276 1
阿里云国际站ATT&CK 多产品安全实践
|
6月前
|
存储 缓存 运维
阿里云数据库 ClickHouse 云原生版产品解析
ClickHouse 介绍ClickHouse 是一款当前非常流行的开源在线分析型数据库。ClickHouse 主要应用于实时数仓构建、大数据加速分析、宽表日志分析等通用场景,服务于流量漏斗分析,用户行为分析,人群圈选,用户画像,广告投放人群评估、ABTest 、大促分析,CDP/DMP 等业务场景...
184 0
|
5月前
|
存储 关系型数据库 数据库
【DDIA笔记】【ch2】 数据模型和查询语言 -- 多对一和多对多
【6月更文挑战第7天】该文探讨数据模型,比较了“多对一”和“多对多”关系。通过使用ID而不是纯文本(如region_id代替&quot;Greater Seattle Area&quot;),可以实现统一、避免歧义、简化修改、支持本地化及优化搜索。在数据库设计中,需权衡冗余和范式。文档型数据库适合一对多但处理多对多复杂,若无Join,需应用程序处理。关系型数据库则通过外键和JOIN处理这些关系。文章还提及文档模型与70年代层次模型的相似性,层次模型以树形结构限制了多对多关系处理。为克服层次模型局限,发展出了关系模型和网状模型。
59 6
|
5月前
|
XML NoSQL 数据库
【DDIA笔记】【ch2】 数据模型和查询语言 -- 概念 + 数据模型
【6月更文挑战第5天】本文探讨了数据模型的分析,关注点包括数据元素、关系及不同类型的模型(关系、文档、图)与Schema模式。查询语言的考量涉及与数据模型的关联及声明式与命令式编程。数据模型从应用开发者到硬件工程师的各抽象层次中起着简化复杂性的关键作用,理想模型应具备简洁直观和可组合性。
39 2

相关产品

  • 开源大数据平台 E-MapReduce