Flink 数据源 DataSource是这个样子的?(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:

6、自定义 DataSource

从前面介绍中看到,Flink 提供了一个 addSource(SourceFunction<OUT>) 的方法,其中 SourceFunction 是实现自定义数据源的关键接口,而我们常用来扩展的是它的抽象子类 RichSourceFunction


6.1、RichSourceFunction

进行自定义扩展数据源前,来看下这个类的继承体系:18.jpg

下面是我测试的一个场景:

  1. 启动 Redis,手动不断设置某个 key的值,模拟应用不断对它的修改
  2. Flink 读取 Redis 数据源,进行数据加工
  3. 存储加工后的数据(例如放入数据库或者简单打印出来)

于是乎,创建了一个自定义的 Redis 数据源,重写上面图中提到的方法

MyRedisDataSourceFunction.java

public class MyRedisDataSourceFunction extends RichSourceFunction<String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // noop
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            String maxNumber = RedisUtils.get("maxNumber", String.class);
            ctx.collect(StringUtils.isBlank(maxNumber) ? "0" : maxNumber);
            // 隔 1 s 执行程序
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        // noop
    }
    @Override
    public void close() throws Exception {
        super.close();
        RedisUtils.close();
    }
}

从上面代码可以看出,我在 run 方法中,通过 while 循环,不断从 Redis中获取数据,关于缓存的相关操作,封装到了 RedisUtils,感兴趣的可以下载项目来看看。

由于偷懒,opencancel 是没有做操作,在关闭方法中,也只是简单释放了 jedis 连接。


6.2、验证自定义数据源结果

DataSourceFromRedis.java

public class DataSourceFromRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> customSource = 
            env.addSource(new MyRedisDataSourceFunction());
        SingleOutputStreamOperator<String> operator = customSource
                .map((MapFunction<String, String>) value -> "当前最大值为 : " + value);
        operator.print();
        env.execute("test custom redis datasource function");
    }
}

上面代码,主要核心在于 env.addSource(new MyRedisDataSourceFunction()),从我们自定义的 Redis 数据源中获取数据,编写好代码后,进行打包并通过 flink run 执行。

为了方便,我直接在本地 IDEA 中,点击了绿色执行按钮,进行本地调试,接着来修改数据源和查看输出结果。

一、修改 Redis 中的数据

$ redis-cli -h localhost -p 6379
> set maxNumber 100
> set maxNumber 200
> set maxNumber 300
> set maxNumber 400

二、查看控制台输出结果

3> 当前最大值为 : 100
4> 当前最大值为 : 100
6> 当前最大值为 : 200
7> 当前最大值为 : 200
1> 当前最大值为 : 200
2> 当前最大值为 : 300
....

可以看到数据源的修改,我们的程序能够正常接收到并进行处理。当然这个 Demo 只是用来演示,用来演示我们可以基于变动的数据源进行更多复杂的操作,从而来达到数据处理想要的目的。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
消息中间件 关系型数据库 MySQL
Flink数据源问题之转换异常如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
121 2
|
4月前
|
SQL 消息中间件 关系型数据库
Flink数据源问题之读取mysql报错如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
|
4月前
|
消息中间件 SQL Kafka
Flink数据源问题之定时扫描key如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
|
4月前
|
存储 Oracle 关系型数据库
Flink CDC 数据源问题之连接释放冲突如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
147 0
|
4月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
109 3
|
4月前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 SQL Kubernetes
实时计算 Flink版产品使用合集之多线程环境中,遇到 env.addSource 添加数据源后没有执行到 env.execut,是为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之如何对接Oracle数据源
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之是否支持异构数据源之间的数据映射关系
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之在重试失败后如何通过回调的方式来手动关闭数据源连接
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。