阿里云 E-MapReduce(简称EMR)是运行在阿里云平台上的一种大数据处理的系统解决方案。ClickHouse 作为开源的列式存储数据库,主要用于在线分析处理查询(OLAP),能够使用 SQL 查询实时生成分析数据报告。而阿里云 EMR ClickHouse 则提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。
本系列文章将从以下几个方面详细介绍 EMR ClickHouse 的操作指南:
- ClickHouse 概述
- 快速入门
- ClickHouse 运维
- 数据导入(本文)
- 常见问题
EMR ClickHouse 操作指南 —数据导入
一、从 Spark 导入数据至 ClickHouse
——介绍如何将 Spark 中的数据导入至 ClickHouse 集群
前提条件
- 开发工具
- 本地安装了 Java JDK 8。
- 本地安装了 Maven 3.x。
- 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。
背景信息
关于 Spark 的更多介绍,请参见简介。
代码示例
代码示例如下。
package com.company.packageName import java.util.Properties import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import com.google.common.collect.ImmutableMap import org.apache.spark.internal.Logging import org.apache.spark.sql.{SaveMode, SparkSession} case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double) object CKDataImporter extends Logging { private var dbName: String = "default" private var tableName: String = "" private var ckHost: String = "" private var ckPort: String = "8123" private var user: String = "default" private var password: String = "" private var local: Boolean = false def main(args: Array[String]): Unit = { parse(args.toList) checkArguments() val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName" logInfo(s"Use jdbc: $jdbcUrl") logInfo(s"Use table: $tableName") val spark = getSparkSession // generate test data val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => { val rand = ThreadLocalRandom.current() val randString = (0 until rand.nextInt(10, 20)) .map(_ => rand.nextLong()) .mkString("") Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()) }) val df = spark.createDataFrame(rdd) df.write .mode(SaveMode.Append) .jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password)) } private def printUsageAndExit(exitCode: Int = 0): Unit = { logError("Usage: java -jar /path/to/CKDataImporter.jar [options]") logError(" --dbName 设置ClickHouse数据库的名称,默认为default") logError(" --tableName 设置ClickHouse库中表的名称") logError(" --ckHost 设置ClickHouse地址") logError(" --ckPort 设置ClickHouse端口,默认为8123") logError(" --user 设置ClickHouse所使用的用户名") logError(" --password 设置ClickHouse用户的密码,默认为空") logError(" --local 设置此程序使用Spark Local模式运行") System.exit(exitCode) } @tailrec private def parse(args: List[String]): Unit = args match { case ("--help" | "-h") :: _ => printUsageAndExit() case "--dbName" :: value :: tail => dbName = value parse(tail) case "--tableName" :: value :: tail => tableName = value parse(tail) case "--ckHost" :: value :: tail => ckHost = value parse(tail) case "--ckPort" :: value :: tail => ckPort = value parse(tail) case "--user" :: value :: tail => user = value parse(tail) case "--password" :: value :: tail => password = value parse(tail) case "--local" :: tail => local = true parse(tail) case Nil => case _ => printUsageAndExit(1) } private def checkArguments(): Unit = { if ("".equals(tableName) || "".equals(ckHost)) { printUsageAndExit(2) } } private def getCKJdbcProperties( user: String, password: String, batchSize: String = "1000", socketTimeout: String = "300000", numPartitions: String = "8", rewriteBatchedStatements: String = "true"): Properties = { val kvMap = ImmutableMap.builder() .put("driver", "ru.yandex.clickhouse.ClickHouseDriver") .put("user", user) .put("password", password) .put("batchsize", batchSize) .put("socket_timeout", socketTimeout) .put("numPartitions", numPartitions) .put("rewriteBatchedStatements", rewriteBatchedStatements) .build() val properties = new Properties properties.putAll(kvMap) properties } private def getSparkSession: SparkSession = { val builder = SparkSession.builder() if (local) { builder.master("local[*]") } builder.appName("ClickHouse-Data-Importer") builder.getOrCreate() } }
操作流程
步骤一:创建 ClickHouse 表
- 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点。
- 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
- 创建 ClickHouse 信息。
i. 执行如下命令,创建数据库 clickhouse_database_name。
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。
ii. 执行如下命令,创建表 clickhouse_table_name_local。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}') ORDER BY id
说明:表名您可以自定义,但请确保表名是以 _local 结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。
iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表 clickhouse_table_name_all。
说明: 表名您可以自定义,但请确保表名是以 _all 结尾。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
步骤二:编译并打包
- 下载并解压CKDataImporter示例到本地。
- 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
mvn clean package
根据您pom.xml文件中artifactId的信息,下载文件中的target目录下会出现CKDataImporter-1.0.0.jar的JAR包。
步骤三:提交作业
- 使用 SSH 方式登录 Spark 集群,详情请参见使用SSH连接主节点。
- 执行如下命令提交作业。
spark-submit --master yarn \ --class com.aliyun.emr.CKDataImporter \ /CKDataImporter-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host}
参数 | 说明 |
dbName | ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name。 |
tableName | ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all。 |
ckHost | ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址。 |
获取主节点的 IP 地址
- 进入详情页面。
- 登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 单击上方的集群管理页签。
- 在集群管理页面,单击相应集群所在行的详情。
- 在集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。
二、从 Flink 导入数据至 ClickHouse
——介绍如何将 Flink 中的数据导入至 ClickHouse 集群
前提条件
- 开发工具
- 本地安装了 Java JDK 8。
- 本地安装了 Maven 3.x。
- 本地安装了用于 Java 或 Scala 开发的 IDE,推荐 IntelliJ IDEA,且已配置完成 JDK 和 Maven 环境。
背景信息
关于 Flink 的更多介绍,请参见Apache Flink。
代码示例
代码示例如下。
- 流处理
package com.company.packageName import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream} object StreamingJob { case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double) private var dbName: String = "default" private var tableName: String = "" private var ckHost: String = "" private var ckPort: String = "8123" private var user: String = "default" private var password: String = "" def main(args: Array[String]) { parse(args.toList) checkArguments() // set up the streaming execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) val insertIntoCkSql = s""" | INSERT INTO $tableName ( | id, key1, value1, key2, value2 | ) VALUES ( | ?, ?, ?, ?, ? | ) |""".stripMargin val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName" println(s"jdbc url: $jdbcUrl") println(s"insert sql: $insertIntoCkSql") val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(jdbcUrl) .setUsername(user) .setPassword(password) .setQuery(insertIntoCkSql) .setBatchSize(1000) .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE) .build() val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => { val rand = ThreadLocalRandom.current() val randString = (0 until rand.nextInt(10, 20)) .map(_ => rand.nextLong()) .mkString("") Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()) }) val table = table2RowDataStream(tableEnv.fromDataStream(data)) sink.emitDataStream(table.javaStream) // execute program env.execute("Flink Streaming Scala API Skeleton") } private def printUsageAndExit(exitCode: Int = 0): Unit = { println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]") println(" --dbName 设置ClickHouse数据库的名称,默认为default") println(" --tableName 设置ClickHouse库中表的名称") println(" --ckHost 设置ClickHouse地址") println(" --ckPort 设置ClickHouse端口,默认为8123") println(" --user 设置ClickHouse所使用的用户名") println(" --password 设置ClickHouse用户的密码,默认为空") System.exit(exitCode) } @tailrec private def parse(args: List[String]): Unit = args match { case ("--help" | "-h") :: _ => printUsageAndExit() case "--dbName" :: value :: tail => dbName = value parse(tail) case "--tableName" :: value :: tail => tableName = value parse(tail) case "--ckHost" :: value :: tail => ckHost = value parse(tail) case "--ckPort" :: value :: tail => ckPort = value parse(tail) case "--user" :: value :: tail => user = value parse(tail) case "--password" :: value :: tail => password = value parse(tail) case Nil => case _ => printUsageAndExit(1) } private def checkArguments(): Unit = { if ("".equals(tableName) || "".equals(ckHost)) { printUsageAndExit(2) } } }
- 批处理
package com.company.packageName import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import org.apache.flink.Utils import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet} object BatchJob { case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double) private var dbName: String = "default" private var tableName: String = "" private var ckHost: String = "" private var ckPort: String = "8123" private var user: String = "default" private var password: String = "" def main(args: Array[String]) { parse(args.toList) checkArguments() // set up the batch execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = BatchTableEnvironment.create(env) val insertIntoCkSql = s""" | INSERT INTO $tableName ( | id, key1, value1, key2, value2 | ) VALUES ( | ?, ?, ?, ?, ? | ) |""".stripMargin val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName" println(s"jdbc url: $jdbcUrl") println(s"insert sql: $insertIntoCkSql") val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(jdbcUrl) .setUsername(user) .setPassword(password) .setQuery(insertIntoCkSql) .setBatchSize(1000) .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE) .build() val data = env.fromCollection(1 to 1000).map(i => { val rand = ThreadLocalRandom.current() val randString = (0 until rand.nextInt(10, 20)) .map(_ => rand.nextLong()) .mkString("") Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()) }) val table = table2RowDataSet(tableEnv.fromDataSet(data)) sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table)) // execute program env.execute("Flink Batch Scala API Skeleton") } private def printUsageAndExit(exitCode: Int = 0): Unit = { println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]") println(" --dbName 设置ClickHouse数据库的名称,默认为default") println(" --tableName 设置ClickHouse库中表的名称") println(" --ckHost 设置ClickHouse地址") println(" --ckPort 设置ClickHouse端口,默认为8123") println(" --user 设置ClickHouse所使用的用户名") println(" --password 设置ClickHouse用户的密码,默认为空") System.exit(exitCode) } @tailrec private def parse(args: List[String]): Unit = args match { case ("--help" | "-h") :: _ => printUsageAndExit() case "--dbName" :: value :: tail => dbName = value parse(tail) case "--tableName" :: value :: tail => tableName = value parse(tail) case "--ckHost" :: value :: tail => ckHost = value parse(tail) case "--ckPort" :: value :: tail => ckPort = value parse(tail) case "--user" :: value :: tail => user = value parse(tail) case "--password" :: value :: tail => password = value parse(tail) case Nil => case _ => printUsageAndExit(1) } private def checkArguments(): Unit = { if ("".equals(tableName) || "".equals(ckHost)) { printUsageAndExit(2) } } }
操作流程
步骤一:创建 ClickHouse 表
- 使用 SSH 方式登录 ClickHouse 集群,详情请参见使用SSH连接主节点。
- 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
- 创建 ClickHouse 信息。
i. 执行如下命令,创建数据库 clickhouse_database_name。
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
阿里云 EMR 会为 ClickHouse 集群自动生成一个名为 cluster_emr 的集群。数据库名您可以自定义。
ii. 执行如下命令,创建表 clickhouse_table_name_local。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}') ORDER BY id
说明:表名您可以自定义,但请确保表名是以_local结尾。layer、shard 和 replica 是阿里云 EMR 为 ClickHouse 集群自动生成的宏定义,可以直接使用。
iii. 执行如下命令,创建与表 clickhouse_table_name_local 字段定义一致的表clickhouse_table_name_all。
说明: 表名您可以自定义,但请确保表名是以_all结尾。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
步骤二:编译并打包
- 下载并解压flink-clickhouse-demo.tgz示例到本地。
- 在 CMD 命令行中,进入到下载文件中 pom.xml 所在的目录下,执行如下命令打包文件。
mvn clean package
根据您 pom.xml 文件中 artifactId 的信息,下载文件中的 target 目录下会出现 flink-clickhouse-demo-1.0.0.jar 的JAR包。
步骤三:提交作业
- 使用 SSH 方式登录 Flink 集群,详情请参见使用SSH连接主节点。
- 执行如下命令提交作业。代码示例如下:
- 流作业
flink run -m yarn-cluster \ -c com.aliyun.emr.StreamingJob \ flink-clickhouse-demo-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host}
- 批作业
flink run -m yarn-cluster \ -c com.aliyun.emr.BatchJob \ flink-clickhouse-demo-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host}
参数 | 说明 |
dbName | ClickHouse集群数据库的名称,默认为default。本文示例为clickhouse_database_name。 |
tableName | ClickHouse集群数据库中表的名称。本文示例为clickhouse_table_name_all。 |
ckHost | ClickHouse集群的Master节点的内网IP地址或公网IP地址。ip地址获取方式,请参见获取主节点的IP地址。 |
获取主节点的 IP 地址
- 进入详情页面。
- 登录阿里云E-MapReduce控制台。
- 在顶部菜单栏处,根据实际情况选择地域和资源组。
- 单击上方的集群管理页签。
- 在集群管理页面,单击相应集群所在行的详情。
- 在集群基础信息页面的主机信息区域,获取主节点的内网或公网IP地址。
三、从 HDFS 导入数据至 ClickHouse
——可以通过 HDFS 表引擎或表函数读写数据
前提条件
使用 HDFS 表引擎读写数据
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name ( name1 [type1], name2 [type2], ... ) Engine = HDFS(uri, format)
其中,涉及参数描述如下表所示。
参数 | 描述 |
db |
数据库名。 |
table_name |
表名。 |
name1/name2 |
列名。 |
tyep1/type2 |
列的类型。 |
uri |
HDFS上文件的地址。 说明
|
format |
文件的类型。 |
示例
- 创建业务表和 HDFS 表
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
iii. 执行以下命令,创建数据库 hdfs。
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
iv. 执行以下命令,创建表 orders。
CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
说明 本文示例是将示例数据上传到了HDFS集群的根目录下。代码中的192.168.**.**
为HDFS集群的emr-header-1节点的内网IP地址,您可以在EMR控制台集群管理页签中的主机列表页面查看。
v. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
vi. 执行以下命令,创建业务表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date);
vii. 执行以下命令,创建业务表 orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = Distributed(cluster_emr, product, orders, rand());
- 使用 HDFS 表引擎导入数据
i. 下载并上传示例数据orders.csv至 HDFS 集群的目录下。
说明 本文示例上传到了 HDFS 集群的根目录下。
ii. 执行以下命令,导入数据。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs.orders;
iii. 执行以下命令,检查数据一致性。
SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;
- 使用 HDFS 表引擎导出数据
i. 执行以下命令,构造数据。
INSERT INTO product.orders_all VALUES \ (60333391,'2021-08-04 11:26:01',49358700,89) \ (38826285,'2021-08-03 10:47:29',25166907,27) \ (10793515,'2021-07-31 02:10:31',95584454,68) \ (70246093,'2021-08-01 00:00:08',82355887,97) \ (70149691,'2021-08-02 12:35:45',68748652,1) \ (87307646,'2021-08-03 19:45:23',16898681,71) \ (61694574,'2021-08-04 23:23:32',79494853,35) \ (61337789,'2021-08-02 07:10:42',23792355,55) \ (66879038,'2021-08-01 16:13:19',95820038,89);
ii. 执行以下命令,导出数据。
INSERT INTO hdfs.orders SELECT uid, date, skuId, order_revenue FROM product.orders_all;
iii. 执行以下命令,可以检查数据一致性。
SELECT a.* FROM hdfs.orders RIGHT ANTI JOIN product.orders_all a USING uid;
使用 HDFS 表函数读写数据
语法
hdfs(uri, format, structure);
其中,涉及参数描述如下表所示。
参数 | 描述 |
uri |
HDFS上文件的地址。 说明
|
format |
文件的类型。 |
structure |
表中字段的类型。例如,column1 UInt32,column2 String。 |
示例
- 创建数据库和业务表
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
iii. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
iv. 执行以下命令,创建业务表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date);
v. 执行以下命令,创建业务表 orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = Distributed(cluster_emr, product, orders, rand());
- 使用 HDFS 表引擎导入数据
i. 下载并上传示例数据orders.csv至HDFS集群的目录下。
说明 本文示例上传到了 HDFS 集群的根目录下。
ii. 执行以下命令,导入数据。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
iii. 执行以下命令,可以检查数据一致性。
SELECT a.* FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') a LEFT ANTI JOIN product.orders_all USING uid;
- 使用 HDFS 表引擎导出数据
i. 执行以下命令,构造数据。
INSERT INTO product.orders_all VALUES \ (60333391,'2021-08-04 11:26:01',49358700,89) \ (38826285,'2021-08-03 10:47:29',25166907,27) \ (10793515,'2021-07-31 02:10:31',95584454,68) \ (70246093,'2021-08-01 00:00:08',82355887,97) \ (70149691,'2021-08-02 12:35:45',68748652,1) \ (87307646,'2021-08-03 19:45:23',16898681,71) \ (61694574,'2021-08-04 23:23:32',79494853,35) \ (61337789,'2021-08-02 07:10:42',23792355,55) \ (66879038,'2021-08-01 16:13:19',95820038,89);
ii. 执行以下命令,导出数据。
INSERT INTO FUNCTION hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') SELECT uid, date, skuId, order_revenue FROM product.orders_all;
iii. 执行以下命令,可以检查数据一致性。
SELECT a.* FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') RIGHT ANTI JOIN product.orders_all a
配置
EMR ClickHouse 允许使用对 HDFS 进行配置:
- 全局生效的 HDFS 配置。
<hdfs> <dfs_default_replica>3</dfs_default_replica> </hdfs>
- HDFS 参数的详细信息,请参见官网文档HDFS Configuration Reference。
说明 查询参数时将下划线(_)替换为半角句号(.)即可。例如,您要查询EMR中的参数dfs_default_replica
,则可以在官网文档中搜索dfs.default.replica
。
- 仅对${user}用户生效的 HDFS 配置,用户配置与全局配置相同的键不同值时,会覆盖全局配置
<hdfs_${user}> <dfs_default_replica>3</dfs_default_replica> </hdfs_${user}>
四、从 RDS 导入数据至 ClickHouse 集群
——可以通过 RDS MySQL 表引擎或表函数导入数据至 ClickHouse 集群
前提条件
- 已购买 RDS,详情请参见创建RDS MySQL实例。
- 已创建 ClickHouse 集群,详情请参见创建集群。
使用 RDS MySQL 表引擎导入数据
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], ... ) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
其中,涉及参数描述如下表所示。
参数 | 描述 |
db |
数据库名。 |
table_name |
表名。 |
cluster |
集群标识。 |
name1/name2 |
列名。 |
tyep1/type2 |
列的类型。 |
host:port |
RDS MySQL的地址,可以在RDS MySQL管理控制台中数据库连接中进行查看。 |
database |
RDS MySQL中的数据库名。 |
table |
RDS MySQL中的表名。 |
user |
用户名,该用户具有访问上述RDS MySQL中库中的表的权限。 |
password |
user 对应的密码。 |
replace_query |
是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。 |
on_duplicate_clause |
会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1 ,此时需要指定on_duplicate_clause 为UPDATE c2 = c2 + 1 。 |
示例
- 在 RDS MySQL 实例中,创建原始数据表并导入原始数据。
i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL。
ii. 执行以下命令,创建原始数据表。
CREATE TABLE `origin`.`orders` ( `uid` int(10) unsigned DEFAULT NULL, `date` datetime DEFAULT NULL, `skuId` int(10) unsigned DEFAULT NULL, `order_revenue` int(10) unsigned DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8
iii. 执行以下命令,导入原始数据。
INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89), (38826285, '2021-08-03 10:47:29', 25166907, 27), (10793515, '2021-07-31 02:10:31', 95584454, 68), (70246093, '2021-08-01 00:00:08', 82355887, 97), (70149691, '2021-08-02 12:35:45', 68748652, 1), (87307646, '2021-08-03 19:45:23', 16898681, 71), (61694574, '2021-08-04 23:23:32', 79494853, 35), (61337789, '2021-08-02 07:10:42', 23792355, 55), (66879038, '2021-08-01 16:13:19', 95820038, 89);
- 在 ClickHouse 集群中,执行以下操作。
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行以下命令,进入 ClickHouse 客户端。
clickhouse-client
iii. 执行以下命令,创建数据库 mysql。
CREATE DATABASE IF NOT EXISTS mysql;
iv. 执行以下命令,创建表 orders。
CREATE TABLE mysql.orders ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = MySQL('host:port', 'origin', 'orders', 'user', 'password');
v. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
vi. 执行以下命令,创建业务表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date);
vii. 执行以下命令,创建业务表orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = Distributed(cluster_emr, product, orders, rand());
viii. 执行以下命令,导入数据。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM mysql.orders;
ix. 执行以下命令,查询数据。
SELECT a.* FROM mysql.orders AS a ANTI LEFT JOIN product.orders_all USING (uid);
说明 查询数据为空时正常。
使用 RDS MySQL 表函数导入数据
语法
mysql('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
其中,涉及参数描述如下表所示。
参数 | 描述 |
host:port |
RDS MySQL的地址,您可以在RDS MySQL管理控制台中的数据库连接中查看。 |
database |
RDS MySQL中的数据库名。 |
table |
RDS MySQL中的表名。 |
user |
用户名,该用户具有访问上述RDS MySQL中库中的表的权限。 |
password |
user 对应的密码。 |
replace_query |
是否将INSERT INTO查询转换为REPLACE INTO的标志。设置为1,表示替换查询。 |
on_duplicate_clause |
会被添加到INSERT语句中。例如,INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1 ,此时需要指定on_duplicate_clause 为UPDATE c2 = c2 + 1 。 |
示例
- 在 RDS MySQL 实例中,创建表并插入数据。
i. 连接 MySQL 实例,详情请参见通过客户端、命令行连接RDS MySQL。
ii. 执行以下命令,创建表 orders。
CREATE TABLE `origin`.`orders` ( `uid` int(10) unsigned DEFAULT NULL, `date` datetime DEFAULT NULL, `skuId` int(10) unsigned DEFAULT NULL, `order_revenue` int(10) unsigned DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8
iii. 执行以下命令,插入数据。
INSERT INTO `origin`.`orders` VALUES(60333391, '2021-08-04 11:26:01', 49358700, 89), (38826285, '2021-08-03 10:47:29', 25166907, 27), (10793515, '2021-07-31 02:10:31', 95584454, 68), (70246093, '2021-08-01 00:00:08', 82355887, 97), (70149691, '2021-08-02 12:35:45', 68748652, 1), (87307646, '2021-08-03 19:45:23', 16898681, 71), (61694574, '2021-08-04 23:23:32', 79494853, 35), (61337789, '2021-08-02 07:10:42', 23792355, 55), (66879038, '2021-08-01 16:13:19', 95820038, 89) ;
- 在 ClickHouse 集群中,执行以下操作。
i. 使用 SSH 方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
iii. 执行以下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
iv.执行以下命令,创建表 orders。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date);
v. 执行以下命令,创建表 orders_all。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = Distributed(cluster_emr, product, orders, rand());
vi. 执行以下命令,导入数据。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM mysql('host:port', 'origin', 'orders', 'user', 'password');
vii. 执行以下命令,查询数据。
SELECT a.* FROM mysql('host:port', 'origin', 'orders', 'user', 'password') AS a ANTI LEFT JOIN product.orders_all USING (uid);
说明 查询数据为空时正常。
如果您需要导出数据,则将业务表数据写入 MySQL 表函数即可。写入命令如下。
INSERT INTO FUNCTION mysql('host:port', 'origin', 'orders', 'user', 'password') FROM product.orders_all;
五、从 Kafka 导入数据至 ClickHouse
——可以通过 Kafka 表引擎导入数据至 ClickHouse 集群
前提条件
使用限制
Kafka集群和ClickHouse集群需要在同一VPC下。
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'host1:port1,host2:port2', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format';
其中,涉及参数描述如下表所示。
参数 | 描述 |
db |
数据库名。 |
table_name |
表名。 |
cluster |
集群标识。 |
name1/name2 |
列名。 |
tyep1/type2 |
列的类型。 |
kafka_broker_list |
Kafka Broker的地址及端口。 Kafka集群所有节点的内网IP地址及端口,您可以在EMR控制台集群管理页签中的主机列表页面查看。 |
kafka_topic_list |
订阅的Topic名称。 |
kafka_group_name |
Kafka consumer的分组名称。 |
kafka_format |
数据的类型。例如,CSV和JSONEachRow等,详细信息请参见Formats for Input and Output Data。 |
示例
- 在 ClickHouse 集群中执行以下操作。
i. 使用SSH方式登录 ClickHouse 集群,详情请参见登录集群。
ii. 执行如下命令,进入 ClickHouse 客户端。
clickhouse-client
iii. 执行如下命令,创建数据库 kafka。
CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;
说明 数据库名您可以自定义。本文示例中的cluster_emr
是集群默认的标识,如果您修改过,请填写正确的集群标识,您也可以在EMR控制台ClickHouse服务的配置页面,在搜索区域搜索clickhouse_remote_servers参数查看。
iv. 执行如下命令,创建 Kafka 表。
CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = Kafka() SETTINGS kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', kafka_topic_list = 'clickhouse_test', kafka_group_name = 'clickhouse_test', kafka_format = 'CSV';
kafka_broker_list
为 Kafka 集群所有节点的内网 IP 地址及端口,您可以在 EMR 控制台集群管理页签中的主机列表页面查看。其余参数含义请参见语法。
v. 执行如下命令,创建数据库 product。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
vi. 执行以下命令,创建本地表。
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date);
vii. 执行以下命令,创建分布式表。
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = Distributed(cluster_emr, product, orders, rand());
viii. 执行以下命令,创建 MATERIALIZED VIEW 自动导数据。
CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS SELECT * FROM kafka.consumer;
- 在 Kafka 集群中执行以下操作。
i. 使用 SSH 方式登录 Kafka 集群,详情请参见登录集群。
ii. 在 Kafka 集群的命令行窗口,执行如下命令运行 Kafka 的生产者。
/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test
iii. 执行以下命令,输入测试数据。
38826285,2021-08-03 10:47:29,25166907,27 10793515,2021-07-31 02:10:31,95584454,68 70246093,2021-08-01 00:00:08,82355887,97 70149691,2021-08-02 12:35:45,68748652,1 87307646,2021-08-03 19:45:23,16898681,71 61694574,2021-08-04 23:23:32,79494853,35 61337789,2021-08-02 07:10:42,23792355,55 66879038,2021-08-01 16:13:19,95820038,89
- 在 ClickHouse 命令窗口中,执行以下命令,可以查看从 Kafka 中导入至 ClickHouse 集群的数据。
您可以校验查询到的数据与源数据是否一致。
SELECT * FROM product.orders_all;
后续
您已经学习了数据导入,本系列还包括其他内容:
- ClickHouse 概述
- 快速入门
- ClickHouse 运维
- 常见问题
获取更详细的信息,点击下方链接查看:
EMR官网:https://www.aliyun.com/product/emapreduce
EMR ClickHouse :https://help.aliyun.com/document_detail/212195.html
扫描下方二维码加入 EMR 相关产品钉钉交流群一起参与讨论吧!