Flink之DataStream数据源

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Flink Source之文件数据源


Flink系统支持将文件内容读取到系统中,并转换成分布式数据集DataStream进行数据处理。


在 DataStream API中,可以在read File方法中指定文件读取类型(Watch Type)检测文件变换时间间隔

(interva)、文件路径过滤条件(File Filter)等参数,其中Watch Type共分为两种模式:

PROCESS CONTINUOUSLY和 PROCESS ONCE模式在 PROCESS CONTINUOUSLY模式下,一且检测到文件内容发生变化,Fink会将该文件全部内容加载到Fink系统中进行处理。而在PROCESS ONCE模式下,当文件内容发生变化时,只会将变化的数据读取至Fink中,在这种情况下数据只会被读取和处理一次。


可以看出,在PROCESS CONTINUOUSLY模式下是无法实现Excatly Once级别数据一致性保障的,而在PROCESSONCE模式,可以保证数据Excatly Once级别的一致性保证。但是需要注意的是,如果使用文件作为数据源,当某个节点异常停止的时候,这种情况下 Checkpoints不会更新,如果数据一直不断地在生成将导致该节点数据形成积压,可能需要耗费非常长的时间从最新的checkpoint中恢复应用。


示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:29 下午
 */
public class FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.readFile(
                new TextInputFormat(new Path("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv"))
                , "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv");
        stream.print();
        env.execute("stream");
    }
}

(2)Flink Source之Socket数据源


Flink支持从Socket端口中接入数据,在StreamExecutionEnvironment调用socketTextStream方法。该方法参数分别为Ip地址和端口,也可以同时传人字符串切割符delimiter和最大尝试次数maxRetry,其中delimiter负责将数据切割成Records数据格式;maxRetry在端口异常的情况,通过指定次数进行重连,如果设定为0,则Flink程序直接停止,不再尝试和端口进行重连。如下代码是使用socketTextStream方法实现了将数据从本地9999ロ中接入数据并转换成 DataStream数据集的操作:

DataStream<String> stream = env.socketTextStream("localhost",9999);

在linux系统下执行

nc -lk 9999

示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:29 下午
 */
public class SocketSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.socketTextStream("bigdata-pro-m07",9999);
        stream.print();
        env.execute("stream");
    }
}

(3)Flink Source之集合数据源


Fink可以直接将Java或Scaa程序中集合类(Collection)转换成 DataStream数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中。目前Fink支持从Java util。 Collection和 Javautil。 Iterator序列中转换成DataStream数据集。这种方式非常适合调式Flink本地程序,但需要注意的是,集合内容的数据结构类型必须要一致,否则可能会出现数据转换异常。1.png



示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:29 下午
 */
public class CollectionSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Integer>> stream = env.fromElements(
                new Tuple2<>("spark",2),
                new Tuple2<>("hbase",3)
        );
        stream.print();
        env.execute("stream");
    }
}

(4)Flink Source之外部数据源


前面提到的数据源类型都是一些基本的数据接入方式,例如从文件、Socket端口中接入数据,其实质是实现了不同的 SourceFunction, Fink将其封装成高级API,减少了用户的使用成本。对于流式计算类型的应用,数据大部分都是从外部第三方系统中获取,为此Flink通过实现 Source Function定义了非常丰富的第三方数据连接器,基本覆盖了大部分的高性能存储介质以及中间件等,其中部分连接器是仅支持读取数据,例如 TwitterStreaming API、Nety等;另外一部分仅支持数据输出(smhk,不支持数据输入(s)例如 Apache Cassandra、 Elasticsearch、 Hadoop FileSystem等。还有一部分是既支持数据输入,也支持数据输出,例如 Apache Kafka、Amazon Kinesis、 RabbitMQ等连接器。


2.png

示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:38 下午
 */
public class OutSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Integer,Integer>> stream = env.addSource(new OutSourceFunction());
        stream.print();
        env.execute("stream");
    }
}
package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:39 下午
 */
public class OutSourceFunction implements SourceFunction<Tuple2<Integer,Integer>> {
    private int count = 0;
    @Override
    public void run(SourceContext sourceContext) throws Exception {
        while (count < 1000){
            int first = (int) (Math.random() * 10);
            sourceContext.collect(new Tuple2<>(first,first));
            count++;
            Thread.sleep(100L);
        }
    }
    @Override
    public void cancel() {
    }
}




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
SQL API 数据处理
实时计算 Flink版产品使用合集之DataStream方式是否可以实现oracle--&gt;的数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
212 3
|
13天前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
63 0
|
5月前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 SQL Kubernetes
实时计算 Flink版产品使用合集之多线程环境中,遇到 env.addSource 添加数据源后没有执行到 env.execut,是为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
59 0
|
13天前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
19 0
|
3月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之如何对接Oracle数据源
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之怎么使用DataStream生成结果表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
存储 监控 数据处理
Flink⼤状态作业调优实践指南:Datastream 作业篇
本文整理自俞航翔、陈婧敏、黄鹏程老师所撰写的大状态作业调优实践指南。
56541 5
Flink⼤状态作业调优实践指南:Datastream 作业篇