使用了阿里的包,写入clickhouse 阿里云flink-connector-clickhouse写入ClickHouse https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso
测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬 +---------------------------------------------+ | default_catalog.default_database.sink_table | +---------------------------------------------+ | -1 | +---------------------------------------------+
代码如下 package com.daniel import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException }
object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/Users/flink-sql-demo/flink-sql-source.csv"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableClosureCleaner
val tEnv = StreamTableEnvironment.create(env)
val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", DataTypes.STRING()) .field("age", DataTypes.BIGINT()) // .field("sex", DataTypes.STRING()) // .field("grade", DataTypes.INT()) .field("rate", DataTypes.FLOAT()) .build()
tEnv.registerTableSource("source", csvTableSource)
val create_sql = s""" | CREATE TABLE sink_table ( | name VARCHAR |) WITH ( | 'connector' = 'clickhouse', | 'url' = 'clickhouse://****:8080', | 'username' = '', | 'password' = '', | 'database-name' = '**', | 'table-name' = 'live.d_sink_table', | 'sink.batch-size' = '1', | 'sink.partition-strategy' = 'hash', | 'sink.partition-key' = 'name' |) |""".stripMargin
tEnv.executeSql(create_sql);
val result = tEnv.executeSql( "INSERT INTO sink_table SELECT name FROM source" ) result.print() }
}*来自志愿者整理的flink邮件归档
换个第三方工具看看 https://github.com/blynkkk/clickhouse4j
cc.blynk.clickhouse clickhouse4j 1.4.4 *来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。