Flink 1.14将数据写入InfluxDB 2.1.1

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: InfluxDB作为时序数据库,在与时间相关的数据记录中,发挥着巨大的作用。下文以flink为例,通过参考Flink第三方扩展(https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb2).自定义source将数据写入influxDB 2.1.1中。

InfluxDB作为时序数据库,在与时间相关的数据记录中,发挥着巨大的作用。下文以flink为例,通过参考Flink第三方扩展(https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb2).

自定义source将数据写入influxDB 2.1.1中。

在完成以下工作时,请确保您已经安装并配置了InfluxDB 2.1.1,如果您还未安装配置,可参考以下文章(https://lrting-top.blog.csdn.net/article/details/122270992):

代码修改

当前版本的 bahir-flink对influxdb的支持为2.0.0,如果直接使用该版本,则会出现认证不通过的情况,此时需要修改部分代码,使用token的认证方式。

具体为,InfluxDBSinkBuilder类中的getInfluxDBClient方法,修改为:

    public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
        final String url = configuration.getString(INFLUXDB_URL);
        final String bucket = configuration.getString(INFLUXDB_BUCKET);
        final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
        final String token = configuration.getString(INFLUXDB_TOKEN);
        final InfluxDBClientOptions influxDBClientOptions =
                InfluxDBClientOptions.builder()
                        .url(url)
                        .authenticateToken(token.toCharArray())
                        .bucket(bucket)
                        .org(organization)
                        .build();
        return InfluxDBClientFactory.create(influxDBClientOptions);
    }

完整代码可参考(https://git.lrting.top/xiaozhch5/drfix):

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 关系型数据库 Kafka
flink cdc 数据问题之数据丢失如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
101 0
|
1月前
|
关系型数据库 MySQL Java
flink cdc 同步问题之多表数据如何同步
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之flink-cdc任务抓取全量的mysql数据不生效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 SQL JSON
Flink问题之source并行度不同导致任务没有数据落地如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
331 0
|
1月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
50 3
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之flink Oraclecdc 捕获19C数据时报错错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
消息中间件 关系型数据库 Kafka
Flink CDC产品常见问题之 Oraclecdc JdbcIncrementalSource 捕获不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
71 2
|
1月前
|
分布式计算 Hadoop Java
Flink CDC产品常见问题之tidb cdc 数据量大了就疯狂报空指针如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。