开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

时序数据库influxdb2的Flink CDC。有做过么?

时序数据库influxdb2的Flink CDC。有大佬做过么?

展开
收起
cuicuicuic 2023-10-22 22:39:18 129 0
2 条回答
写回答
取消 提交回答
  • Flink CDC目前并不直接支持InfluxDB 2.0。InfluxDB 2.0使用的是TSI(Time Series Index)作为其存储引擎,这与传统的数据库有所不同,因此可能需要特殊的适配器来实现CDC。

    然而,有一些开源项目正在尝试实现这一点。例如,InfluxDB-Flink项目就是由InfluxData公司开发的一个Flink connector,它支持InfluxDB 2.0的CDC。

    此外,你也可以考虑使用InfluxDB Cloud Sync服务,它可以将数据从InfluxDB 2.0复制到另一个支持CDC的数据库,如PostgreSQL或MySQL,然后再使用Flink的CDC connector进行数据处理。

    2023-10-23 10:34:13
    赞同 展开评论 打赏
  • Flink CDC 可以支持 InfluxDB 2.x 版本,但是需要使用一个专门的流式连接器来实现。这个连接器是 Apache Bahir 项目的一部分,它提供了一个 Source 和一个 Sink,分别可以从 InfluxDB 读取和写入数据。

    这个连接器的使用方法如下:

    • 项目中添加以下依赖:

    org.apache.bahir
    flink-connector-influxdb2_2.12
    1.1-SNAPSHOT
    • Flink 程序中创建一个 InfluxDBSource 或 InfluxDBSink,并提供相应的参数和接口实现。例如:

    InfluxDBSource influxDBSource = InfluxBSource.builder()
    .setDeserializer(new TestDeserializer())
    .build();

    InfluxDBSink influxDBSink = InfluxDBSink.builder()
    .setInfluxDBSchemaSerializer(new TestSerializer())
    .setInfluxDBUrl("http://localhost:8086")
    .setUsername("username")
    .setPassword("password")
    .setBucket("bucket")
    .setOrganization("organization")
    .build();

    • 将 Source 或 Sink 添加到DataStream 或 Table API 中,并执行 Flink 程序。
    2023-10-23 09:52:24
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载