Flink之DataStream数据源

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Flink Source之文件数据源


Flink系统支持将文件内容读取到系统中,并转换成分布式数据集DataStream进行数据处理。


在 DataStream API中,可以在read File方法中指定文件读取类型(Watch Type)检测文件变换时间间隔

(interva)、文件路径过滤条件(File Filter)等参数,其中Watch Type共分为两种模式:

PROCESS CONTINUOUSLY和 PROCESS ONCE模式在 PROCESS CONTINUOUSLY模式下,一且检测到文件内容发生变化,Fink会将该文件全部内容加载到Fink系统中进行处理。而在PROCESS ONCE模式下,当文件内容发生变化时,只会将变化的数据读取至Fink中,在这种情况下数据只会被读取和处理一次。


可以看出,在PROCESS CONTINUOUSLY模式下是无法实现Excatly Once级别数据一致性保障的,而在PROCESSONCE模式,可以保证数据Excatly Once级别的一致性保证。但是需要注意的是,如果使用文件作为数据源,当某个节点异常停止的时候,这种情况下 Checkpoints不会更新,如果数据一直不断地在生成将导致该节点数据形成积压,可能需要耗费非常长的时间从最新的checkpoint中恢复应用。


示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:29 下午
 */
public class FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.readFile(
                new TextInputFormat(new Path("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv"))
                , "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv");
        stream.print();
        env.execute("stream");
    }
}

(2)Flink Source之Socket数据源


Flink支持从Socket端口中接入数据,在StreamExecutionEnvironment调用socketTextStream方法。该方法参数分别为Ip地址和端口,也可以同时传人字符串切割符delimiter和最大尝试次数maxRetry,其中delimiter负责将数据切割成Records数据格式;maxRetry在端口异常的情况,通过指定次数进行重连,如果设定为0,则Flink程序直接停止,不再尝试和端口进行重连。如下代码是使用socketTextStream方法实现了将数据从本地9999ロ中接入数据并转换成 DataStream数据集的操作:

DataStream<String> stream = env.socketTextStream("localhost",9999);

在linux系统下执行

nc -lk 9999

示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:29 下午
 */
public class SocketSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.socketTextStream("bigdata-pro-m07",9999);
        stream.print();
        env.execute("stream");
    }
}

(3)Flink Source之集合数据源


Fink可以直接将Java或Scaa程序中集合类(Collection)转换成 DataStream数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中。目前Fink支持从Java util。 Collection和 Javautil。 Iterator序列中转换成DataStream数据集。这种方式非常适合调式Flink本地程序,但需要注意的是,集合内容的数据结构类型必须要一致,否则可能会出现数据转换异常。1.png



示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:29 下午
 */
public class CollectionSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Integer>> stream = env.fromElements(
                new Tuple2<>("spark",2),
                new Tuple2<>("hbase",3)
        );
        stream.print();
        env.execute("stream");
    }
}

(4)Flink Source之外部数据源


前面提到的数据源类型都是一些基本的数据接入方式,例如从文件、Socket端口中接入数据,其实质是实现了不同的 SourceFunction, Fink将其封装成高级API,减少了用户的使用成本。对于流式计算类型的应用,数据大部分都是从外部第三方系统中获取,为此Flink通过实现 Source Function定义了非常丰富的第三方数据连接器,基本覆盖了大部分的高性能存储介质以及中间件等,其中部分连接器是仅支持读取数据,例如 TwitterStreaming API、Nety等;另外一部分仅支持数据输出(smhk,不支持数据输入(s)例如 Apache Cassandra、 Elasticsearch、 Hadoop FileSystem等。还有一部分是既支持数据输入,也支持数据输出,例如 Apache Kafka、Amazon Kinesis、 RabbitMQ等连接器。


2.png

示例代码:

package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:38 下午
 */
public class OutSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Integer,Integer>> stream = env.addSource(new OutSourceFunction());
        stream.print();
        env.execute("stream");
    }
}
package com.aikfk.flink.datastream.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/10 9:39 下午
 */
public class OutSourceFunction implements SourceFunction<Tuple2<Integer,Integer>> {
    private int count = 0;
    @Override
    public void run(SourceContext sourceContext) throws Exception {
        while (count < 1000){
            int first = (int) (Math.random() * 10);
            sourceContext.collect(new Tuple2<>(first,first));
            count++;
            Thread.sleep(100L);
        }
    }
    @Override
    public void cancel() {
    }
}




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 消息中间件 关系型数据库
Flink数据源问题之读取mysql报错如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
51 0
|
3月前
|
消息中间件 关系型数据库 MySQL
Flink数据源问题之转换异常如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
33 2
|
3月前
|
消息中间件 SQL Kafka
Flink数据源问题之定时扫描key如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
41 0
|
3月前
|
存储 Oracle 关系型数据库
Flink CDC 数据源问题之连接释放冲突如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
73 0
|
3月前
|
消息中间件 关系型数据库 MySQL
Flink CDC 数据源问题之数据变动如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
34 1
|
2月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
55 3
|
5天前
|
存储 算法 API
Flink DataStream API 批处理能力演进之路
本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。
148 1
Flink DataStream API 批处理能力演进之路
|
9天前
|
Java 大数据 API
[AIGC] Flink入门教程:理解DataStream API(Java版)
[AIGC] Flink入门教程:理解DataStream API(Java版)
|
3月前
|
消息中间件 SQL canal
Flink转换问题之DataStream转成table失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
SQL 机器学习/深度学习 HIVE
Flink数据源问题之无法写入数据如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
36 2