Flink实战(五) - DataStream API编程(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink实战(五) - DataStream API编程(上)

1 概述

Flink中的DataStream程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。

最初从各种源(例如,消息队列,套接字流,文件)创建数据流。

结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。

Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。 执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。


有关Flink API基本概念的介绍,请参阅

https://blog.csdn.net/qq_33589510/article/details/89893394


2 入门案例

以下程序是流窗口字数统计应用程序的完整工作示例,它在5秒窗口中对来自Web套接字的单词进行计数。 您可以复制并粘贴代码以在本地运行它。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        dataStream.print();
        env.execute("Window WordCount");
    }
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
nc -lk 9999

只需键入一些单词就可以返回一个新单词。 这些将是字数统计程序的输入。 如果要查看大于1的计数,请在5秒内反复键入相同的单词(如果不能快速输入,则将窗口大小从5秒增加☺)。

  • Socket输入
  • 11.png
  • 程序输出

12.png

创建一个新数据流,其中包含从套接字无限接收的字符串。 接收的字符串由系统的默认字符集解码,使用“\ n”作为分隔符。 当socket关闭时,阅读器立即终止。

13.png

Scala版本

14.png

3 Data source

源是您的程序从中读取输入的位置。可以使用

StreamExecutionEnvironment.addSource(sourceFunction)

将源附加到程序

Flink附带了许多预置实现的源函数,但你可以通过为非并行源实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源。


可以从StreamExecutionEnvironment访问几个预定义的流源:


3.1 基于文件

readTextFile(path)

TextInputFormat逐行读取文本文件,即符合规范的文件,并将它们作为字符串返回。


readFile(fileInputFormat, path)

按指定的文件输入格式指定读取(一次)文件。


readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

这是前两个内部调用的方法。它path根据给定的内容读取文件fileInputFormat。根据提供的内容watchType,此源可以定期监视(每intervalms)新数据(FileProcessingMode.PROCESS_CONTINUOUSLY)的路径,或者处理当前在路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用该pathFilter,用户可以进一步排除正在处理的文件。


实现:

在引擎盖下,Flink将文件读取过程分为两个子任务


目录监控

数据读取

这些子任务中的每一个都由单独的实体实现。监视由单个非并行(并行性= 1)任务实现,而读取由并行运行的多个任务执行。

后者的并行性等于工作并行性。单个监视任务的作用是扫描目录(定期或仅一次,具体取决于watchType),找到要处理的文件,将它们分层分割,并将这些拆分分配给下游读卡器。读者是那些将阅读实际数据的人。每个分割仅由一个读取器读取,而读取器可以逐个读取多个分割。

1.如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾追加数据将导致其所有内容被重新处理。

2.如果watchType设置为FileProcessingMode.PROCESS_ONCE,则源扫描路径一次并退出,而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之后关闭源将导致不再有检查点。这可能会导致节点发生故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

3.2 基于Socket

socketTextStream

从套接字读取。数据元可以用分隔符分隔。

3.3 基于集合

fromCollection(Collection)

从Java Java.util.Collection创建数据流。集合中的所有数据元必须属于同一类型。


fromCollection(Iterator, Class)

从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。


fromElements(T …)

从给定的对象序列创建数据流。所有对象必须属于同一类型。


fromParallelCollection(SplittableIterator, Class)

并行地从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。


generateSequence(from, to)

并行生成给定间隔中的数字序列。


3.4 自定义数据源方式SourceFunction

使用用户定义的源函数为任意源功能创建DataStream。

默认情况下,源具有1的并行性。

15.png

16.png

要启用并行执行,用户定义的源应

  • 实现ParallelSourceFunction

17.png

18.png

或继承RichParallelSourceFunction

19.png

20.png

在这些情况下,生成的源将具有环境的并行性。

要改变它,然后调用DataStreamSource.setParallelism(int)

21.png

  • addSource
    附加新的源函数。例如,要从Apache Kafka中读取,您可以使用 addSource(new FlinkKafkaConsumer08<>(…))
  • 22.png
  • 23.png
  • 24.png
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
13天前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
66 0
|
13天前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
59 0
|
13天前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
20 0
|
2月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
53 1
|
2月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
62 0
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之怎么使用DataStream生成结果表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
4月前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
244 0
|
2天前
|
编解码 监控 API
直播源怎么调用api接口
调用直播源的API接口涉及开通服务、添加域名、获取API密钥、调用API接口、生成推流和拉流地址、配置直播源、开始直播、监控管理及停止直播等步骤。不同云服务平台的具体操作略有差异,但整体流程简单易懂。