FlinkX 如何读取和写入 Clickhouse?

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文将主要介绍 FlinkX 读取和写入 Clickhouse 的过程及相关参数,核心内容将围绕以下3个问题:1. FlinkX读写Clickhouse支持哪个版本?、2. ClickHouse读写Clickhouse有哪些参数?、3. ClickHouse读写Clickhouse参数都有哪些说明?

本文将主要介绍 FlinkX 读取和写入 Clickhouse 的过程及相关参数,核心内容将围绕以下3个问题,FlinkX 插件下载:

https://github.com/DTStack/flinkx

  1. FlinkX读写Clickhouse支持哪个版本?
  2. ClickHouse读写Clickhouse有哪些参数?
  3. ClickHouse读写Clickhouse参数都有哪些说明?

ClickHouse 读取

一、插件名称

名称:clickhousereader

二、支持的数据源版本

ClickHouse 19.x及以上

三、参数说明

「jdbcUrl」

  • 描述:针对关系型数据库的jdbc连接字符串
  • jdbcUrl参考文档:clickhouse-jdbc官方文档
  • 必选:是
  • 默认值:无

「username」

  • 描述:数据源的用户名
  • 必选:是
  • 默认值:无

「password」

  • 描述:数据源指定用户名的密码
  • 必选:是
  • 默认值:无

「where」

  • 描述:筛选条件,reader插件根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > time。
  • 注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
  • 必选:否
  • 默认值:无

「splitPk」

  • 描述:当speed配置中的channel大于1时指定此参数,Reader插件根据并发数和此参数指定的字段拼接sql,使每个并发读取不同的数据,提升读取速率。注意:推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,FlinkX将报错!如果channel大于1但是没有配置此参数,任务将置为失败。
  • 必选:否
  • 默认值:无

「fetchSize」

  • 描述:读取时每批次读取的数据条数。
  • 注意:此参数的值不可设置过大,否则会读取超时,导致任务失败。
  • 必选:否
  • 默认值:1000

「queryTimeOut」

  • 描述:查询超时时间,单位秒。
  • 注意:当数据量很大,或者从视图查询,或者自定义sql查询时,可通过此参数指定超时时间。
  • 必选:否
  • 默认值:1000

「customSql」

  • 描述:自定义的查询语句,如果只指定字段不能满足需求时,可通过此参数指定查询的sql,可以是任意复杂的查询语句。注意:只能是查询语句,否则会导致任务失败;查询语句返回的字段需要和column列表里的字段严格对应;当指定了此参数时,connection里指定的table无效;当指定此参数时,column必须指定具体字段信息,不能以*号代替;
  • 必选:否
  • 默认值:无

「column」

  • 描述:需要读取的字段。
  • 格式:支持3种格式

1.读取全部字段,如果字段数量很多,可以使用下面的写法:

"column":["*"]

2.只指定字段名称:

"column":["id","name"]

3.指定具体信息:

"column": [{
    "name": "col",
    "type": "datetime",
    "format": "yyyy-MM-dd hh:mm:ss",
    "value": "value"
}]

属性说明:

  1. name:字段名称
  2. type:字段类型,可以和数据库里的字段类型不一样,程序会做一次类型转换
  3. format:如果字段是时间字符串,可以指定时间的格式,将字段类型转为日期格式返回
  4. value:如果数据库里不存在指定的字段,则会报错。如果指定的字段存在,当指定字段的值为null时,会以此value值作为默认值返回
  • 必选:是
  • 默认值:无

「polling」

  • 描述:是否开启间隔轮询,开启后会根据pollingInterval轮询间隔时间周期性的从数据库拉取数据。开启间隔轮询还需配置参数pollingInterval,increColumn,可以选择配置参数startLocation。若不配置参数startLocation,任务启动时将会从数据库中查询增量字段最大值作为轮询的开始位置。
  • 必选:否
  • 默认值:false

「pollingInterval」

描述:轮询间隔时间,从数据库中拉取数据的间隔时间,默认为5000毫秒。
必选:否
默认值:5000

「requestAccumulatorInterval」

  • 描述:发送查询累加器请求的间隔时间。
  • 必选:否
  • 默认值:2

配置示例

1、基础配置

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

2、多通道

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

3、指定customSql

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "select id from tableTest",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

4、增量同步指定startLocation

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "increColumn": "id",
          "startLocation": "20",
          "requestAccumulatorInterval": 2
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

5、间隔轮询

{
  "job": {
    "content": [{
      "reader": {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "bigint",
            "key" : "id"
          }, {
            "name" : "user_id",
            "type" : "bigint",
            "key" : "user_id"
          }, {
            "name" : "name",
            "type" : "varchar",
            "key" : "name"
          } ],
          "username" : "username",
          "password" : "password",
          "connection" : [ {
            "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
            "table" : [ "tableTest" ]
          } ],
          "where": "id > 1",
          "splitPk": "id",
          "fetchSize": 1000,
          "queryTimeOut": 1000,
          "customSql": "",
          "requestAccumulatorInterval": 2,
          "polling": true,
          "pollingInterval": 3000
        },
        "name" : "clickhousereader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      }
    }
  }
}

ClickHouse 写入

一、插件名称

名称:clickhousewriter

二、支持的数据源版本

ClickHouse 19.x及以上

三、参数说明

「jdbcUrl」

  • 描述:针对关系型数据库的jdbc连接字符串
  • 必选:是
  • 默认值:无

「username」

  • 描述:数据源的用户名
  • 必选:是
  • 默认值:无

「password」

  • 描述:数据源指定用户名的密码
  • 必选:是
  • 默认值:无

「column」

  • 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
  • 必选:是
  • 默认值:否
  • 默认值:无

「preSql」

  • 描述:写入数据到目的表前,会先执行这里的一组标准语句
  • 必选:否
  • 默认值:无

「postSql」

  • 描述:写入数据到目的表后,会执行这里的一组标准语句
  • 必选:否
  • 默认值:无

「table」

  • 描述:目的表的表名称。目前只支持配置单个表,后续会支持多表
  • 必选:是
  • 默认值:无

「writeMode」

  • 描述:控制写入数据到目标表采用 insert into 语句,只支持insert操作
  • 必选:是
  • 所有选项:insert
  • 默认值:insert

「batchSize」

  • 描述:一次性批量提交的记录数大小,该值可以极大减少FlinkX与数据库的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成FlinkX运行进程OOM情况
  • 必选:否
  • 默认值:1024

文章来源如下,感兴趣的同学可查看原文:
https://www.aboutyun.com/forum.php?mod=viewthread&tid=29271

更多 Flink 技术问题可在钉钉群交流

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
缓存 API 流计算
Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器
Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
875 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
分布式计算 容灾 大数据
MaxCompute( 原名ODPS)大数据容灾方案与实现(及项目落地实例)专有云
一,背景与概述    复杂系统的灾难恢复是个难题,具有海量数据及复杂业务场景的大数据容灾是个大难题。    MaxCompute是集团内重要数据平台,是自主研发的大数据解决方案,其规模和稳定性在业界都是领先的。
2797 17
|
5月前
|
druid Java 应用服务中间件
五大主流数据库连接池的深度剖析与对比
HikariCP通过优化concurrentBag和fastStatementList等集合,提升了并发的读写效率。它采用threadlocal缓存连接,并大量运用CAS机制,以最大程度地减少lock的使用。从字节码的维度进行代码优化,确保方法尽量控制在35个字节码以内,以提升JVM处理效率。HikariCP在此基础上的进一步优化措施包括:利用ping命令进行mysql连接,以及通过Sharding-JDBC的Driver、Server和Sidecar三个版本,构建灵活多样的生态系统,满足不同需求和环境。对于线上应用,Sharding-JDBC-Driver可提供直连数据库的最优性能,而Sha
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
764 0
|
12月前
|
机器学习/深度学习 数据采集 算法
多维偏好分析及其在实际决策中的应用:基于PCA-KMeans的数据降维与模式识别方法
多维偏好分析(MPA)是市场营销、心理学和公共政策等领域广泛应用的工具,用于研究复杂偏好决策过程。本文通过主成分分析(PCA)和K均值聚类算法对鸢尾花数据集进行降维和模式识别,展示了PCA在保留95.8%方差的同时实现物种分类的有效性,K均值聚类结果与实际物种分类高度一致。该方法揭示了高维数据中的隐含模式,为各领域的实际决策提供了可靠的分析框架,具有重要的应用价值。研究表明,PCA和聚类分析能够有效简化和理解高维偏好数据,帮助决策者制定更有针对性的策略。
450 3
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
576 6
|
SQL 消息中间件 分布式计算
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
349 0
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
安全 网络安全 数据安全/隐私保护