请问Flink有没有clickhouse的连接器分享下呗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink 提供了与 ClickHouse 集成的连接器,支持通过 JDBC Connector 或 Flink SQL 的方式将数据写入 ClickHouse。以下是关于 Flink 与 ClickHouse 连接器的详细信息和使用方法:
ClickHouse 是一个高性能的列式数据库管理系统,主要用于联机分析(OLAP)。Flink 提供了对 ClickHouse 的支持,主要通过以下两种方式实现数据写入: - JDBC Connector:适用于不同版本的 Flink。 - Flink SQL:通过声明式 SQL 语句直接操作 ClickHouse。
根据知识库资料,ClickHouse 连接器支持以下特性: - 支持类型:仅支持结果表。 - 运行模式:支持流模式和批模式。 - API 类型:支持 SQL。 - 更新或删除数据:当 Flink 结果表的 DDL 上指定了 Primary Key,且参数 ignoreDelete
设置为 false
时,支持更新或删除结果表数据,但性能会显著下降。
此外,ClickHouse 连接器还具有以下特色功能: - 支持直接写入 ClickHouse 的分布式表对应的本地表。 - 对于 EMR 的 ClickHouse,提供 Exactly Once 的语义。
Flink 的 JDBC Connector 在 1.11.0 版本进行了重大重构,分为两个主要版本: - 1.10.1 及之前版本:包名为 flink-jdbc
,支持 Table API (Legacy)。 - 1.11.0 及之后版本:包名为 flink-connector-jdbc
,支持 DataStream 和 Table API (DDL)。
在 Flink 1.10.1 及更早版本中,可以通过 flink-jdbc
包结合 Table API 实现数据写入。以下是具体步骤:
创建项目
使用 Maven 命令创建项目:
mvn archetype:generate
添加依赖
在 pom.xml
文件中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.1</version>
</dependency>
编写程序
示例代码如下:
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val sink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://<clickhouse-host>:<port>/<database>")
.setUsername("<username>")
.setPassword("<password>")
.setQuery("INSERT INTO sink_table (name, grade, rate) VALUES (?, ?, ?)")
.setBatchSize(500)
.setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
.build()
tEnv.registerTableSink("sink", Array("name", "grade", "rate"), Array(Types.STRING, Types.LONG, Types.FLOAT), sink)
tEnv.insertInto(resultTable, "sink")
编译运行
执行以下命令编译并运行项目:
mvn clean package
flink run target/example-0.1.jar
在 Flink 1.11.0 及之后版本中,推荐使用 flink-connector-jdbc
包。以下是具体步骤:
创建项目
同样使用 Maven 创建项目。
添加依赖
在 pom.xml
文件中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
编写程序
示例代码如下:
import org.apache.flink.connector.jdbc.JdbcSink
import java.sql.PreparedStatement
class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
ps.setString(1, v._1)
ps.setLong(2, v._2)
ps.setFloat(3, v._3)
}
}
val sink = JdbcSink.sink(
"INSERT INTO sink_table (name, grade, rate) VALUES (?, ?, ?)",
new CkSinkBuilder(),
JdbcExecutionOptions.builder().withBatchSize(500).build(),
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://<clickhouse-host>:<port>/<database>")
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUsername("<username>")
.withPassword("<password>")
.build()
)
stream.addSink(sink)
编译运行
执行以下命令编译并运行项目:
mvn clean package
flink run target/example-0.1.jar
除了编程方式,还可以通过 Flink SQL 将数据写入 ClickHouse。以下是具体步骤:
创建源表和结果表
在 Flink SQL 中定义源表和结果表:
CREATE TEMPORARY TABLE source_table (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://<clickhouse-host>:<port>',
'userName' = '<username>',
'password' = '<password>',
'tableName' = 'sink_table'
);
插入数据
将源表数据插入到结果表中:
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM source_table;
启动作业
在 Flink 控制台中保存、验证并上线作业。
ignoreDelete=false
,但请注意这会导致性能下降。flink-jdbc
或 flink-connector-jdbc
)。通过以上方法,您可以轻松实现 Flink 与 ClickHouse 的集成,并高效地将数据写入 ClickHouse。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。