Flink 数据源 DataSource是这个样子的?(三)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:

7、更多自定义数据源(如 Kafka)

例如在收集日志时,Kafka 消息中间件用得比较多,可以通过官方集成的方法 new FlinkKafkaConsumer 进行添加 Kafka 数据源

测试类位置在:

cn.sevenyuan.datasource.custom.DataSourceFromKafka

DataStreamSource<String> dataStreamSource = env.addSource(
    new FlinkKafkaConsumer<String>(
            KafkaUtils.TOPIC_STUDENT,
            new SimpleStringSchema(),
            props
    )).setParallelism(1);

7.1、测试场景示意19.jpg


测试场景如上图,模拟一个 A 应用系统,不断的往 Kafka 发送消息,接着我们的 Flink 监听到 Kafka 的数据变动,搜集在一个时间窗口内(例如 10s)的数据,对窗口内的数据进行转换操作,最后进行存储(简单演示,使用的是 Print 打印)



7.2、前置环境安装和启动


如果在本地测试 Kafka 数据源,需要做这三步前置操作:

1. 安装 ZooKeeper,启动命令: zkServer start2. 安装 Kafka,启动命令:kafka-server-start /usr/local/etc/kafka/server.properties3. 安装 Flink,启动单机集群版的命令 /usr/local/Cellar/apache-flink/1.9.0/libexec/bin/start-cluster.sh

7.3、模拟应用系统

在终端中,通过 Kafka 命令创建名字为 studentTopic


$kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic student

启动以下代码的 main 方法,通过 while 循环,每隔 3s 往 kafka 发送一条消息:

KafkaUtils.java

public class KafkaUtils {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC_STUDENT = "student";
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    public static void writeToKafka() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // 制造传递的对象
        int randomInt = RandomUtils.nextInt(1, 100);
        Student stu = new Student(randomInt, "name" + randomInt, randomInt, "=-=");
        stu.setCheckInTime(new Date());
        // 发送数据
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_STUDENT, null, null, JSON.toJSONString(stu));
        producer.send(record);
        System.out.println("kafka 已发送消息 : " + JSON.toJSONString(stu));
        producer.flush();
    }
    public static void main(String[] args) throws Exception {
        while (true) {
            Thread.sleep(3000);
            writeToKafka();
        }
    }
}

在该工具类中,设定了很多静态变量,例如主题名字、key 序列化类、value的序列化类,之后可以在其它类中进行复用。

点击 main 方法后,可以在控制台终端看到每隔三秒(checkInTime 间隔时间),我们的消息成功的发送出去了。

kafka 已发送消息 : {"address":"=-=","age":49,"checkInTime":1571845900050,"id":49,"name":"name49"}
kafka 已发送消息 : {"address":"=-=","age":92,"checkInTime":1571845903371,"id":92,"name":"name92"}
kafka 已发送消息 : {"address":"=-=","age":72,"checkInTime":1571845906391,"id":72,"name":"name72"}
kafka 已发送消息 : {"address":"=-=","age":19,"checkInTime":1571845909413,"id":19,"name":"name19"}
kafka 已发送消息 : {"address":"=-=","age":34,"checkInTime":1571845912435,"id":34,"name":"name34"}

7.4、启动 Flink 程序

DataSourceFromKafka.java

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 省略 kafka 的参数配置,具体请看代码
    Properties props = new Properties();
    DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>(
            KafkaUtils.TOPIC_STUDENT,
            new SimpleStringSchema(),
            props
    )).setParallelism(1);
    // 数据转换 & 打印
    // 从 kafka 读数据,然后进行 map 映射转换
    DataStream<Student> dataStream = dataStreamSource.map(value -> JSONObject.parseObject(value, Student.class));
    // 不需要 keyBy 分类,所以使用 windowAll,每 10s 统计接收到的数据,批量插入到数据库中
    dataStream
            .timeWindowAll(Time.seconds(10))
            .apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
                    List<Student> students = Lists.newArrayList(values);
                    if (students.size() > 0) {
                        System.out.println("最近 10 秒汇集到 " + students.size() + " 条数据");
                        out.collect(students);
                    }
                }
            })
            .print();
        env.execute("test custom kafka datasource");
    }

上述代码主要有三个步骤,获取 Kafka数据源 —> 数据转换(通过 map 映射操作,时间窗口搜集数据) —> 最后的数据存储(简单的 print)。

点击执行代码后,我们就能在控制台中看到如下输出结果:

20.jpg

可以看到,按照发送消息的速度,我们能够在 10s 内搜集到 3-4 条数据,从输出结果能够验证 Flink 程序的正确性。

安装操作请参考网上资源,更多详细添加 Kafka 数据源的操作可以看项目中的测试类 DataSourceFromKafkazhisheng 写的 Flink 从 0 到 1 学习 —— 如何自定义 Data Source


8、总结

本章总结大致可以用下面这张思维导图概括:

21.jpg

  • 集合、文件:读取本地数据,比较适合在本地测试时使用
  • 套接字:监听主机地址和端口号,获取数据,比较少用
  • 自定义数据源:一般常用的数据源,例如 KafkaHiveRabbitMQ,官方都有集成的依赖,通过 POM 进行引用即可使用,还有想要自己扩展的话,通过继承 RichSourceFunction,重写里面的方法,就能够获取自定义的数据

本文主要写了 Flink 提供的数据源使用,介绍了集合、文件、套接字和自定义数据源的例子。当然请根据自己的用途,选择使用合适的数据源,如有疑惑或不对之处请与我讨论~


项目地址

https://github.com/Vip-Augus/flink-learning-note


git clone https://github.com/Vip-Augus/flink-learning-note
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 关系型数据库 MySQL
Flink数据源问题之转换异常如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
100 2
|
1月前
|
消息中间件 关系型数据库 MySQL
Flink CDC 数据源问题之数据变动如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
|
1月前
|
SQL 消息中间件 关系型数据库
Flink数据源问题之读取mysql报错如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
|
1月前
|
消息中间件 SQL Kafka
Flink数据源问题之定时扫描key如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
|
1月前
|
存储 Oracle 关系型数据库
Flink CDC 数据源问题之连接释放冲突如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
100 0
|
1月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
81 3
|
1月前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 SQL Kubernetes
实时计算 Flink版产品使用合集之多线程环境中,遇到 env.addSource 添加数据源后没有执行到 env.execut,是为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
11天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之是否支持异构数据源之间的数据映射关系
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之在重试失败后如何通过回调的方式来手动关闭数据源连接
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。