Flink 数据源 DataSource是这个样子的?(二)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
实时计算 Flink 版,5000CU*H 3个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:

6、自定义 DataSource

从前面介绍中看到,Flink 提供了一个 addSource(SourceFunction<OUT>) 的方法,其中 SourceFunction 是实现自定义数据源的关键接口,而我们常用来扩展的是它的抽象子类 RichSourceFunction


6.1、RichSourceFunction

进行自定义扩展数据源前,来看下这个类的继承体系:18.jpg

下面是我测试的一个场景:

  1. 启动 Redis,手动不断设置某个 key的值,模拟应用不断对它的修改
  2. Flink 读取 Redis 数据源,进行数据加工
  3. 存储加工后的数据(例如放入数据库或者简单打印出来)

于是乎,创建了一个自定义的 Redis 数据源,重写上面图中提到的方法

MyRedisDataSourceFunction.java

public class MyRedisDataSourceFunction extends RichSourceFunction<String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // noop
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            String maxNumber = RedisUtils.get("maxNumber", String.class);
            ctx.collect(StringUtils.isBlank(maxNumber) ? "0" : maxNumber);
            // 隔 1 s 执行程序
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        // noop
    }
    @Override
    public void close() throws Exception {
        super.close();
        RedisUtils.close();
    }
}

从上面代码可以看出,我在 run 方法中,通过 while 循环,不断从 Redis中获取数据,关于缓存的相关操作,封装到了 RedisUtils,感兴趣的可以下载项目来看看。

由于偷懒,opencancel 是没有做操作,在关闭方法中,也只是简单释放了 jedis 连接。


6.2、验证自定义数据源结果

DataSourceFromRedis.java

public class DataSourceFromRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> customSource = 
            env.addSource(new MyRedisDataSourceFunction());
        SingleOutputStreamOperator<String> operator = customSource
                .map((MapFunction<String, String>) value -> "当前最大值为 : " + value);
        operator.print();
        env.execute("test custom redis datasource function");
    }
}

上面代码,主要核心在于 env.addSource(new MyRedisDataSourceFunction()),从我们自定义的 Redis 数据源中获取数据,编写好代码后,进行打包并通过 flink run 执行。

为了方便,我直接在本地 IDEA 中,点击了绿色执行按钮,进行本地调试,接着来修改数据源和查看输出结果。

一、修改 Redis 中的数据

$ redis-cli -h localhost -p 6379
> set maxNumber 100
> set maxNumber 200
> set maxNumber 300
> set maxNumber 400

二、查看控制台输出结果

3> 当前最大值为 : 100
4> 当前最大值为 : 100
6> 当前最大值为 : 200
7> 当前最大值为 : 200
1> 当前最大值为 : 200
2> 当前最大值为 : 300
....

可以看到数据源的修改,我们的程序能够正常接收到并进行处理。当然这个 Demo 只是用来演示,用来演示我们可以基于变动的数据源进行更多复杂的操作,从而来达到数据处理想要的目的。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
消息中间件 关系型数据库 MySQL
Flink数据源问题之转换异常如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
137 2
|
7月前
|
SQL 消息中间件 关系型数据库
Flink数据源问题之读取mysql报错如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
124 0
|
7月前
|
消息中间件 SQL Kafka
Flink数据源问题之定时扫描key如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
|
7月前
|
存储 Oracle 关系型数据库
Flink CDC 数据源问题之连接释放冲突如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
177 0
|
7月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
147 3
|
7月前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
消息中间件 SQL Kubernetes
实时计算 Flink版产品使用合集之多线程环境中,遇到 env.addSource 添加数据源后没有执行到 env.execut,是为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
48 0
|
5月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之如何对接Oracle数据源
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之是否支持异构数据源之间的数据映射关系
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
下一篇
DataWorks