阿里云 E-MapReduce(简称EMR)是运行在阿里云平台上的一种大数据处理的系统解决方案。ClickHouse 作为开源的列式存储数据库,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。而阿里云 EMR ClickHouse 则提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。
本系列文章将从以下几个方面详细介绍 EMR ClickHouse 的操作指南:
EMR ClickHouse 操作指南 —数据导入

一、从 Spark 导入数据至 ClickHouse
——介绍如何将 Spark 中的数据导入至 ClickHouse 集群
前提条件
- 本地安装了 Java JDK 8。
- 本地安装了 Maven 3.x。
- 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。
- 已创建 Spark 集群,详情请参见创建集群。
- 已创建 ClickHouse 集群,详情请参见创建集群。
背景信息
关于 Spark 的更多介绍,请参见简介。
代码示例
代码示例如下。
package com.company.packageName
import java.util.Properties
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import com.google.common.collect.ImmutableMap
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}
case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
object CKDataImporter extends Logging {
private var dbName: String = "default"
private var tableName: String = ""
private var ckHost: String = ""
private var ckPort: String = "8123"
private var user: String = "default"
private var password: String = ""
private var local: Boolean = false
def main(args: Array[String]): Unit = {
parse(args.toList)
checkArguments()
val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
logInfo(s"Use jdbc: $jdbcUrl")
logInfo(s"Use table: $tableName")
val spark = getSparkSession
// generate test data
val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => {
val rand = ThreadLocalRandom.current()
val randString = (0 until rand.nextInt(10, 20))
.map(_ => rand.nextLong())
.mkString("")
Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
})
val df = spark.createDataFrame(rdd)
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password))
}
private def printUsageAndExit(exitCode: Int = 0): Unit = {
logError("Usage: java -jar /path/to/CKDataImporter.jar [options]")
logError(" --dbName 设置ClickHouse数据库的名称,默认为default")
logError(" --tableName 设置ClickHouse库中表的名称")
logError(" --ckHost 设置ClickHouse地址")
logError(" --ckPort 设置ClickHouse端口,默认为8123")
logError(" --user 设置ClickHouse所使用的用户名")
logError(" --password 设置ClickHouse用户的密码,默认为空")
logError(" --local 设置此程序使用Spark Local模式运行")
System.exit(exitCode)
}
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--help" | "-h") :: _ =>
printUsageAndExit()
case "--dbName" :: value :: tail =>
dbName = value
parse(tail)
case "--tableName" :: value :: tail =>
tableName = value
parse(tail)
case "--ckHost" :: value :: tail =>
ckHost = value
parse(tail)
case "--ckPort" :: value :: tail =>
ckPort = value
parse(tail)
case "--user" :: value :: tail =>
user = value
parse(tail)
case "--password" :: value :: tail =>
password = value
parse(tail)
case "--local" :: tail =>
local = true
parse(tail)
case Nil =>
case _ =>
printUsageAndExit(1)
}
private def checkArguments(): Unit = {
if ("".equals(tableName) || "".equals(ckHost)) {
printUsageAndExit(2)
}
}
private def getCKJdbcProperties(
user: String,
password: String,
batchSize: String = "1000",
socketTimeout: String = "300000",
numPartitions: String = "8",
rewriteBatchedStatements: String = "true"): Properties = {
val kvMap = ImmutableMap.builder()
.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.put("user", user)
.put("password", password)
.put("batchsize", batchSize)
.put("socket_timeout", socketTimeout)
.put("numPartitions", numPartitions)
.put("rewriteBatchedStatements", rewriteBatchedStatements)
.build()
val properties = new Properties
properties.putAll(kvMap)
properties
}
private def getSparkSession: SparkSession = {
val builder = SparkSession.builder()
if (local) {
builder.master("local[*]")
}
builder.appName("ClickHouse-Data-Importer")
builder.getOrCreate()
}
}
操作流程
步骤一:创建 ClickHouse 表
- 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点。
- 执行如下命令,进入 ClickHouse 客户端。
- 创建 ClickHouse 信息。
i. 执行如下命令,创建数据库 clickhouse_database_name。
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。
ii. 执行如下命令,创建表 clickhouse_table_name_local。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
id UInt32,
key1 String,
value1 UInt8,
key2 Int64,
value2 Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id
说明:表名您可以自定义,但请确保表名是以 _local 结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。
iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表 clickhouse_table_name_all。
说明: 表名您可以自定义,但请确保表名是以 _all 结尾。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
id UInt32,
key1 String,
value1 UInt8,
key2 Int64,
value2 Float64
) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
步骤二:编译并打包
- 下载并解压CKDataImporter示例到本地。
- 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
根据您pom.xml文件中artifactId的信息,下载文件中的target目录下会出现CKDataImporter-1.0.0.jar的JAR包。
步骤三:提交作业
- 使用 SSH 方式登录 Spark 集群,详情请参见使用SSH连接主节点。
- 执行如下命令提交作业。
spark-submit --master yarn \
--class com.aliyun.emr.CKDataImporter \
/CKDataImporter-1.0.0.jar \
--dbName clickhouse_database_name \
--tableName clickhouse_table_name_all \
--ckHost ${clickhouse_host}
参数 | 说明 |
dbName | ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name。 |
tableName | ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all。 |
ckHost | ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址。 |
获取主节点的 IP 地址
- 进入详情页面。
- 登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 单击上方的集群管理页签。
- 在集群管理页面,单击相应集群所在行的详情。
- 在集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。

二、从 Flink 导入数据至 ClickHouse
——介绍如何将 Flink 中的数据导入至 ClickHouse 集群
前提条件
- 本地安装了 Java JDK 8。
- 本地安装了 Maven 3.x。
- 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。
- 已创建 Flink 集群,详情请参见创建集群。
- 已创建 ClickHouse 集群,详情请参见创建集群。
背景信息
关于 Flink 的更多介绍,请参见Apache Flink。
代码示例
代码示例如下。
package com.company.packageName
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream}
object StreamingJob {
case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
private var dbName: String = "default"
private var tableName: String = ""
private var ckHost: String = ""
private var ckPort: String = "8123"
private var user: String = "default"
private var password: String = ""
def main(args: Array[String]) {
parse(args.toList)
checkArguments()
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val insertIntoCkSql =
s"""
| INSERT INTO $tableName (
| id, key1, value1, key2, value2
| ) VALUES (
| ?, ?, ?, ?, ?
| )
|""".stripMargin
val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
println(s"jdbc url: $jdbcUrl")
println(s"insert sql: $insertIntoCkSql")
val sink = JDBCAppendTableSink
.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl(jdbcUrl)
.setUsername(user)
.setPassword(password)
.setQuery(insertIntoCkSql)
.setBatchSize(1000)
.setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
.build()
val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => {
val rand = ThreadLocalRandom.current()
val randString = (0 until rand.nextInt(10, 20))
.map(_ => rand.nextLong())
.mkString("")
Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
})
val table = table2RowDataStream(tableEnv.fromDataStream(data))
sink.emitDataStream(table.javaStream)
// execute program
env.execute("Flink Streaming Scala API Skeleton")
}
private def printUsageAndExit(exitCode: Int = 0): Unit = {
println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
println(" --dbName 设置ClickHouse数据库的名称,默认为default")
println(" --tableName 设置ClickHouse库中表的名称")
println(" --ckHost 设置ClickHouse地址")
println(" --ckPort 设置ClickHouse端口,默认为8123")
println(" --user 设置ClickHouse所使用的用户名")
println(" --password 设置ClickHouse用户的密码,默认为空")
System.exit(exitCode)
}
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--help" | "-h") :: _ =>
printUsageAndExit()
case "--dbName" :: value :: tail =>
dbName = value
parse(tail)
case "--tableName" :: value :: tail =>
tableName = value
parse(tail)
case "--ckHost" :: value :: tail =>
ckHost = value
parse(tail)
case "--ckPort" :: value :: tail =>
ckPort = value
parse(tail)
case "--user" :: value :: tail =>
user = value
parse(tail)
case "--password" :: value :: tail =>
password = value
parse(tail)
case Nil =>
case _ =>
printUsageAndExit(1)
}
private def checkArguments(): Unit = {
if ("".equals(tableName) || "".equals(ckHost)) {
printUsageAndExit(2)
}
}
}
package com.company.packageName
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import org.apache.flink.Utils
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet}
object BatchJob {
case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
private var dbName: String = "default"
private var tableName: String = ""
private var ckHost: String = ""
private var ckPort: String = "8123"
private var user: String = "default"
private var password: String = ""
def main(args: Array[String]) {
parse(args.toList)
checkArguments()
// set up the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
val insertIntoCkSql =
s"""
| INSERT INTO $tableName (
| id, key1, value1, key2, value2
| ) VALUES (
| ?, ?, ?, ?, ?
| )
|""".stripMargin
val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
println(s"jdbc url: $jdbcUrl")
println(s"insert sql: $insertIntoCkSql")
val sink = JDBCAppendTableSink
.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl(jdbcUrl)
.setUsername(user)
.setPassword(password)
.setQuery(insertIntoCkSql)
.setBatchSize(1000)
.setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
.build()
val data = env.fromCollection(1 to 1000).map(i => {
val rand = ThreadLocalRandom.current()
val randString = (0 until rand.nextInt(10, 20))
.map(_ => rand.nextLong())
.mkString("")
Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
})
val table = table2RowDataSet(tableEnv.fromDataSet(data))
sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table))
// execute program
env.execute("Flink Batch Scala API Skeleton")
}
private def printUsageAndExit(exitCode: Int = 0): Unit = {
println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
println(" --dbName 设置ClickHouse数据库的名称,默认为default")
println(" --tableName 设置ClickHouse库中表的名称")
println(" --ckHost 设置ClickHouse地址")
println(" --ckPort 设置ClickHouse端口,默认为8123")
println(" --user 设置ClickHouse所使用的用户名")
println(" --password 设置ClickHouse用户的密码,默认为空")
System.exit(exitCode)
}
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--help" | "-h") :: _ =>
printUsageAndExit()
case "--dbName" :: value :: tail =>
dbName = value
parse(tail)
case "--tableName" :: value :: tail =>
tableName = value
parse(tail)
case "--ckHost" :: value :: tail =>
ckHost = value
parse(tail)
case "--ckPort" :: value :: tail =>
ckPort = value
parse(tail)
case "--user" :: value :: tail =>
user = value
parse(tail)
case "--password" :: value :: tail =>
password = value
parse(tail)
case Nil =>
case _ =>
printUsageAndExit(1)
}
private def checkArguments(): Unit = {
if ("".equals(tableName) || "".equals(ckHost)) {
printUsageAndExit(2)
}
}
}
操作流程
步骤一:创建 ClickHouse 表
- 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点。
- 执行如下命令,进入 ClickHouse 客户端。
- 创建 ClickHouse 信息。
i. 执行如下命令,创建数据库 clickhouse_database_name。
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。
ii. 执行如下命令,创建表 clickhouse_table_name_local。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
id UInt32,
key1 String,
value1 UInt8,
key2 Int64,
value2 Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id
说明:表名您可以自定义,但请确保表名是以_local结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。
iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表clickhouse_table_name_all。
说明: 表名您可以自定义,但请确保表名是以_all结尾。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
id UInt32,
key1 String,
value1 UInt8,
key2 Int64,
value2 Float64
) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
步骤二:编译并打包
- 下载并解压flink-clickhouse-demo.tgz示例到本地。
- 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
根据您 pom.xml 文件中 artifactId 的信息,下载文件中的 target 目录下会出现 flink-clickhouse-demo-1.0.0.jar 的JAR包。
步骤三:提交作业
- 使用 SSH 方式登录 Flink 集群,详情请参见使用SSH连接主节点。
- 执行如下命令提交作业。代码示例如下:
flink run -m yarn-cluster \
-c com.aliyun.emr.StreamingJob \
flink-clickhouse-demo-1.0.0.jar \
--dbName clickhouse_database_name \
--tableName clickhouse_table_name_all \
--ckHost ${clickhouse_host}
flink run -m yarn-cluster \
-c com.aliyun.emr.BatchJob \
flink-clickhouse-demo-1.0.0.jar \
--dbName clickhouse_database_name \
--tableName clickhouse_table_name_all \
--ckHost ${clickhouse_host}
参数 | 说明 |
dbName | ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name。 |
tableName | ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all。 |
ckHost | ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址。 |
获取主节点的 IP 地址
- 进入详情页面。
- 登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 单击上方的集群管理页签。
- 在集群管理页面,单击相应集群所在行的详情。
- 在集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。

三、从 HDFS 导入数据至 ClickHouse
——可以通过 HDFS 表引擎或表函数读写数据
前提条件
- 已创建 HDFS 集群,详情请参见创建集群。
- 已创建 ClickHouse 集群,详情请参见创建集群。
使用 HDFS 表引擎读写数据
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2],
...
)
Engine = HDFS(uri, format)
其中,涉及参数描述如下表所示。
参数 | 描述 |
db | 数据库名。 |
table_name | 表名。 |
name1/name2 | 列名。 |
tyep1/type2 | 列的类型。 |
uri | HDFS上文件的地址。
说明 - 不可以是目录地址。
- 文件所属的目录需要存在,如果不存在,则写数据时会报错。
|
format | 文件的类型。 |
示例
- 创建业务表和 HDFS 表
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
iii. 执行以下命令,创建数据库 hdfs。
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
iv. 执行以下命令,创建表 orders。
CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
说明 本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的192.168.**.**
为HDFS集群的emr-header-1节点的内网IP地址,您可以在EMR控制台集群管理页签中的主机列表页面查看。
v. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
vi. 执行以下命令,创建业务表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
vii. 执行以下命令,创建业务表 orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
- 使用 HDFS 表引擎导入数据
i. 下载并上传示例数据orders.csv至 HDFS 集群的目录下。
说明 本文示例上传到了 HDFS 集群的根目录下。
ii. 执行以下命令,导入数据。
INSERT INTO product.orders_all
SELECT
uid,
date,
skuId,
order_revenue
FROM
hdfs.orders;
iii. 执行以下命令,检查数据一致性。
SELECT
a.*
FROM
hdfs.orders a
LEFT ANTI JOIN
product.orders_all
USING uid;
- 使用 HDFS 表引擎导出数据
i. 执行以下命令,构造数据。
INSERT INTO product.orders_all VALUES \
(60333391,'2021-08-04 11:26:01',49358700,89) \
(38826285,'2021-08-03 10:47:29',25166907,27) \
(10793515,'2021-07-31 02:10:31',95584454,68) \
(70246093,'2021-08-01 00:00:08',82355887,97) \
(70149691,'2021-08-02 12:35:45',68748652,1) \
(87307646,'2021-08-03 19:45:23',16898681,71) \
(61694574,'2021-08-04 23:23:32',79494853,35) \
(61337789,'2021-08-02 07:10:42',23792355,55) \
(66879038,'2021-08-01 16:13:19',95820038,89);
ii. 执行以下命令,导出数据。
INSERT INTO hdfs.orders
SELECT
uid,
date,
skuId,
order_revenue
FROM
product.orders_all;
iii. 执行以下命令,可以检查数据一致性。
SELECT
a.*
FROM
hdfs.orders
RIGHT ANTI JOIN
product.orders_all a
USING uid;
使用 HDFS 表函数读写数据
语法
hdfs(uri, format, structure);
其中,涉及参数描述如下表所示。
参数 | 描述 |
uri | HDFS上文件的地址。
说明 - 不可以是目录地址。
- 文件所属的目录需要存在,如果不存在,则写数据时会报错。
|
format | 文件的类型。 |
structure | 表中字段的类型。例如,column1 UInt32,column2 String。 |
示例
- 创建数据库和业务表
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
iii. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
iv. 执行以下命令,创建业务表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
v. 执行以下命令,创建业务表 orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
- 使用 HDFS 表引擎导入数据
i. 下载并上传示例数据orders.csv至HDFS集群的目录下。
说明 本文示例上传到了 HDFS 集群的根目录下。
ii. 执行以下命令,导入数据。
INSERT INTO product.orders_all
SELECT
uid,
date,
skuId,
order_revenue
FROM
hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
iii. 执行以下命令,可以检查数据一致性。
SELECT
a.*
FROM
hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') a
LEFT ANTI JOIN
product.orders_all
USING uid;
- 使用 HDFS 表引擎导出数据
i. 执行以下命令,构造数据。
INSERT INTO product.orders_all VALUES \
(60333391,'2021-08-04 11:26:01',49358700,89) \
(38826285,'2021-08-03 10:47:29',25166907,27) \
(10793515,'2021-07-31 02:10:31',95584454,68) \
(70246093,'2021-08-01 00:00:08',82355887,97) \
(70149691,'2021-08-02 12:35:45',68748652,1) \
(87307646,'2021-08-03 19:45:23',16898681,71) \
(61694574,'2021-08-04 23:23:32',79494853,35) \
(61337789,'2021-08-02 07:10:42',23792355,55) \
(66879038,'2021-08-01 16:13:19',95820038,89);
ii. 执行以下命令,导出数据。
INSERT INTO FUNCTION
hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
SELECT
uid,
date,
skuId,
order_revenue
FROM
product.orders_all;
iii. 执行以下命令,可以检查数据一致性。
SELECT
a.*
FROM
hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
RIGHT ANTI JOIN
product.orders_all a
配置
EMR ClickHouse 允许使用对 HDFS 进行配置:
<hdfs>
<dfs_default_replica>3</dfs_default_replica>
</hdfs>
说明 查询参数时将下划线(_)替换为半角句号(.)即可。例如,您要查询EMR中的参数dfs_default_replica
,则可以在官网文档中搜索dfs.default.replica
。
- 仅对${user}用户生效的 HDFS 配置,用户配置与全局配置相同的键不同值时,会覆盖全局配置
<hdfs_${user}>
<dfs_default_replica>3</dfs_default_replica>
</hdfs_${user}>
四、从 RDS 导入数据至 ClickHouse 集群
——可以通过 RDS MySQL 表引擎或表函数导入数据至 ClickHouse 集群
前提条件
使用 RDS MySQL 表引擎导入数据
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
其中,涉及参数描述如下表所示。
参数 | 描述 |
db | 数据库名。 |
table_name | 表名。 |
cluster | 集群标识。 |
name1/name2 | 列名。 |
tyep1/type2 | 列的类型。 |
host:port | RDS MySQL的地址,可以在RDS MySQL管理控制台中数据库连接中进行查看。 |
database | RDS MySQL中的数据库名。 |
table | RDS MySQL中的表名。 |
user | 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。 |
password | user 对应的密码。 |
replace_query | 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。 |
on_duplicate_clause | 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1 ,此时需要指定on_duplicate_clause 为UPDATE c2 = c2 + 1 。 |
示例
- 在 RDS MySQL 实例中,创建原始数据表并导入原始数据。
i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL。
ii. 执行以下命令,创建原始数据表。
CREATE TABLE `origin`.`orders` (
`uid` int(10) unsigned DEFAULT NULL,
`date` datetime DEFAULT NULL,
`skuId` int(10) unsigned DEFAULT NULL,
`order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
iii. 执行以下命令,导入原始数据。
INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
(38826285, '2021-08-03 10:47:29', 25166907, 27),
(10793515, '2021-07-31 02:10:31', 95584454, 68),
(70246093, '2021-08-01 00:00:08', 82355887, 97),
(70149691, '2021-08-02 12:35:45', 68748652, 1),
(87307646, '2021-08-03 19:45:23', 16898681, 71),
(61694574, '2021-08-04 23:23:32', 79494853, 35),
(61337789, '2021-08-02 07:10:42', 23792355, 55),
(66879038, '2021-08-01 16:13:19', 95820038, 89);
- 在 ClickHouse 集群中,执行以下操作。
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行以下命令,进入 ClickHouse 客户端。
iii. 执行以下命令,创建数据库 mysql。
CREATE DATABASE IF NOT EXISTS mysql;
iv. 执行以下命令,创建表 orders。
CREATE TABLE mysql.orders
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
ENGINE = MySQL('host:port', 'origin', 'orders', 'user', 'password');
v. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
vi. 执行以下命令,创建业务表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
vii. 执行以下命令,创建业务表orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
viii. 执行以下命令,导入数据。
INSERT INTO product.orders_all
SELECT
uid,
date,
skuId,
order_revenue
FROM
mysql.orders;
ix. 执行以下命令,查询数据。
SELECT a.*
FROM mysql.orders AS a
ANTI LEFT JOIN product.orders_all USING (uid);
说明 查询数据为空时正常。
使用 RDS MySQL 表函数导入数据
语法
mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
其中,涉及参数描述如下表所示。
参数 | 描述 |
host:port | RDS MySQL的地址,您可以在RDS MySQL管理控制台中的数据库连接中查看。 |
database | RDS MySQL中的数据库名。 |
table | RDS MySQL中的表名。 |
user | 用户名,该用户具有访问上述RDS MySQL中库中的表的权限。 |
password | user 对应的密码。 |
replace_query | 是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。 |
on_duplicate_clause | 会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1 ,此时需要指定on_duplicate_clause 为UPDATE c2 = c2 + 1 。 |
示例
- 在 RDS MySQL 实例中,创建表并插入数据。
i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL。
ii. 执行以下命令,创建表 orders。
CREATE TABLE `origin`.`orders` (
`uid` int(10) unsigned DEFAULT NULL,
`date` datetime DEFAULT NULL,
`skuId` int(10) unsigned DEFAULT NULL,
`order_revenue` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
iii. 执行以下命令,插入数据。
INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89),
(38826285, '2021-08-03 10:47:29', 25166907, 27),
(10793515, '2021-07-31 02:10:31', 95584454, 68),
(70246093, '2021-08-01 00:00:08', 82355887, 97),
(70149691, '2021-08-02 12:35:45', 68748652, 1),
(87307646, '2021-08-03 19:45:23', 16898681, 71),
(61694574, '2021-08-04 23:23:32', 79494853, 35),
(61337789, '2021-08-02 07:10:42', 23792355, 55),
(66879038, '2021-08-01 16:13:19', 95820038, 89) ;
- 在 ClickHouse 集群中,执行以下操作。
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
iii. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
iv.执行以下命令,创建表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
v. 执行以下命令,创建表 orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
vi. 执行以下命令,导入数据。
INSERT INTO product.orders_all
SELECT
uid,
date,
skuId,
order_revenue
FROM
mysql('host:port', 'origin', 'orders', 'user', 'password');
vii. 执行以下命令,查询数据。
SELECT a.*
FROM
mysql('host:port', 'origin', 'orders', 'user', 'password') AS a
ANTI LEFT JOIN product.orders_all USING (uid);
说明 查询数据为空时正常。
如果您需要导出数据,则将业务表数据写入 MySQL 表函数即可。写入命令如下。
INSERT INTO FUNCTION
mysql('host:port', 'origin', 'orders', 'user', 'password')
FROM
product.orders_all;
五、从 Kafka 导入数据至 ClickHouse
——可以通过 Kafka 表引擎导入数据至 ClickHouse 集群
前提条件
- 已创建 Kafka 集群,详情请参见创建集群。
- 已创建 ClickHouse 集群,详情请参见创建集群。
使用限制
Kafka集群和ClickHouse集群需要在同一VPC下。
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host1:port1,host2:port2',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format';
其中,涉及参数描述如下表所示。
参数 | 描述 |
db | 数据库名。 |
table_name | 表名。 |
cluster | 集群标识。 |
name1/name2 | 列名。 |
tyep1/type2 | 列的类型。 |
kafka_broker_list | Kafka Broker的地址及端口。 Kafka集群所有节点的内网IP地址及端口,您可以在EMR控制台集群管理页签中的主机列表页面查看。 |
kafka_topic_list | 订阅的Topic名称。 |
kafka_group_name | Kafka consumer的分组名称。 |
kafka_format | 数据的类型。例如,CSV和JSONEachRow等,详细信息请参见Formats for Input and Output Data。 |
示例
- 在 ClickHouse 集群中执行以下操作。
i. 使用SSH方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
iii. 执行如下命令,创建数据库 kafka。
CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;
说明 数据库名您可以自定义。本文示例中的cluster_emr
是集群默认的标识,如果您修改过,请填写正确的集群标识,您也可以在EMR控制台ClickHouse服务的配置页面,在搜索区域搜索clickhouse_remote_servers参数查看。
iv. 执行如下命令,创建 Kafka 表。
CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
kafka_topic_list = 'clickhouse_test',
kafka_group_name = 'clickhouse_test',
kafka_format = 'CSV';
kafka_broker_list
为 Kafka 集群所有节点的内网 IP 地址及端口,您可以在 EMR 控制台集群管理页签中的主机列表页面查看。其余参数含义请参见语法。

v. 执行如下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
vi. 执行以下命令,创建本地表。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
vii. 执行以下命令,创建分布式表。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
viii. 执行以下命令,创建 MATERIALIZED VIEW 自动导数据。
CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;
- 在 Kafka 集群中执行以下操作。
i. 使用 SSH 方式登录 Kafka 集群,详情请参见登录集群。
ii. 在 Kafka 集群的命令行窗口,执行如下命令运行 Kafka 的生产者。
/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test
iii. 执行以下命令,输入测试数据。
38826285,2021-08-03 10:47:29,25166907,27
10793515,2021-07-31 02:10:31,95584454,68
70246093,2021-08-01 00:00:08,82355887,97
70149691,2021-08-02 12:35:45,68748652,1
87307646,2021-08-03 19:45:23,16898681,71
61694574,2021-08-04 23:23:32,79494853,35
61337789,2021-08-02 07:10:42,23792355,55
66879038,2021-08-01 16:13:19,95820038,89

- 在 ClickHouse 命令窗口中,执行以下命令,可以查看从 Kafka 中导入至 ClickHouse 集群的数据。
您可以校验查询到的数据与源数据是否一致。
SELECT * FROM product.orders_all;

后续
您已经学习了数据导入,本系列还包括其他内容:
获取更详细的信息,点击下方链接查看:
EMR官网:https://www.aliyun.com/product/emapreduce
EMR ClickHouse :https://help.aliyun.com/document_detail/212195.html
扫描下方二维码加入 EMR 相关产品钉钉交流群一起参与讨论吧!
