Flink实战(五) - DataStream API编程(上)

简介: Flink实战(五) - DataStream API编程(上)

1 概述

Flink中的DataStream程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。

最初从各种源(例如,消息队列,套接字流,文件)创建数据流。

结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。

Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。 执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。


有关Flink API基本概念的介绍,请参阅

https://blog.csdn.net/qq_33589510/article/details/89893394


2 入门案例

以下程序是流窗口字数统计应用程序的完整工作示例,它在5秒窗口中对来自Web套接字的单词进行计数。 您可以复制并粘贴代码以在本地运行它。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        dataStream.print();
        env.execute("Window WordCount");
    }
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
nc -lk 9999

只需键入一些单词就可以返回一个新单词。 这些将是字数统计程序的输入。 如果要查看大于1的计数,请在5秒内反复键入相同的单词(如果不能快速输入,则将窗口大小从5秒增加☺)。

  • Socket输入
  • 11.png
  • 程序输出

12.png

创建一个新数据流,其中包含从套接字无限接收的字符串。 接收的字符串由系统的默认字符集解码,使用“\ n”作为分隔符。 当socket关闭时,阅读器立即终止。

13.png

Scala版本

14.png

3 Data source

源是您的程序从中读取输入的位置。可以使用

StreamExecutionEnvironment.addSource(sourceFunction)

将源附加到程序

Flink附带了许多预置实现的源函数,但你可以通过为非并行源实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源。


可以从StreamExecutionEnvironment访问几个预定义的流源:


3.1 基于文件

readTextFile(path)

TextInputFormat逐行读取文本文件,即符合规范的文件,并将它们作为字符串返回。


readFile(fileInputFormat, path)

按指定的文件输入格式指定读取(一次)文件。


readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

这是前两个内部调用的方法。它path根据给定的内容读取文件fileInputFormat。根据提供的内容watchType,此源可以定期监视(每intervalms)新数据(FileProcessingMode.PROCESS_CONTINUOUSLY)的路径,或者处理当前在路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用该pathFilter,用户可以进一步排除正在处理的文件。


实现:

在引擎盖下,Flink将文件读取过程分为两个子任务


目录监控

数据读取

这些子任务中的每一个都由单独的实体实现。监视由单个非并行(并行性= 1)任务实现,而读取由并行运行的多个任务执行。

后者的并行性等于工作并行性。单个监视任务的作用是扫描目录(定期或仅一次,具体取决于watchType),找到要处理的文件,将它们分层分割,并将这些拆分分配给下游读卡器。读者是那些将阅读实际数据的人。每个分割仅由一个读取器读取,而读取器可以逐个读取多个分割。

1.如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾追加数据将导致其所有内容被重新处理。

2.如果watchType设置为FileProcessingMode.PROCESS_ONCE,则源扫描路径一次并退出,而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之后关闭源将导致不再有检查点。这可能会导致节点发生故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

3.2 基于Socket

socketTextStream

从套接字读取。数据元可以用分隔符分隔。

3.3 基于集合

fromCollection(Collection)

从Java Java.util.Collection创建数据流。集合中的所有数据元必须属于同一类型。


fromCollection(Iterator, Class)

从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。


fromElements(T …)

从给定的对象序列创建数据流。所有对象必须属于同一类型。


fromParallelCollection(SplittableIterator, Class)

并行地从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。


generateSequence(from, to)

并行生成给定间隔中的数字序列。


3.4 自定义数据源方式SourceFunction

使用用户定义的源函数为任意源功能创建DataStream。

默认情况下,源具有1的并行性。

15.png

16.png

要启用并行执行,用户定义的源应

  • 实现ParallelSourceFunction

17.png

18.png

或继承RichParallelSourceFunction

19.png

20.png

在这些情况下,生成的源将具有环境的并行性。

要改变它,然后调用DataStreamSource.setParallelism(int)

21.png

  • addSource
    附加新的源函数。例如,要从Apache Kafka中读取,您可以使用 addSource(new FlinkKafkaConsumer08<>(…))
  • 22.png
  • 23.png
  • 24.png
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
3月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
3月前
|
缓存 自然语言处理 API
阿里巴巴国际站关键字搜索 API 实战:3 步搞定多语言适配 + 限流破局,询盘量提升 40%
跨境电商API开发常陷合规、多语言、限流等坑。本文详解从国际合规(GDPR/CCPA)到参数优化、数据结构化及区域化搜索的全链路方案,附Python代码模板与缓存重试架构,助力提升调用成功率至99%+,精准询盘增长42%。
|
4月前
|
数据采集 缓存 API
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
本文详解小红书笔记详情API的开发对接、实战场景与收益模式,涵盖注册避坑、签名生成、数据解析全流程,并分享品牌营销、内容创作、SAAS工具等落地应用,助力开发者高效掘金“种草经济”。
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
|
3月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
3月前
|
开发者 API 机器学习/深度学习
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
本文详解淘宝、1688、义乌购三大平台图片搜索接口的核心特点、调用流程与实战代码。涵盖跨平台对比、参数配置、响应解析及避坑指南,支持URL/Base64上传,返回商品ID、价格、销量等关键信息,助力开发者快速实现商品识别与比价功能。
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
|
4月前
|
人工智能 运维 监控
阿里云 API 聚合实战:破解接口碎片化难题,3 类场景方案让业务响应提速 60%
API聚合破解接口碎片化困局,助力开发者降本增效。通过统一中间层整合微服务、第三方接口与AI模型,实现调用次数减少60%、响应提速70%。阿里云实测:APISIX+函数计算+ARMS监控组合,支撑百万级并发,故障定位效率提升90%。
368 0
|
4月前
|
JSON API 调度
Midjourney 技术拆解与阿里云开发者实战指南:从扩散模型到 API 批量生成
Midjourney深度解析:基于优化Stable Diffusion,实现文本到图像高效生成。涵盖技术架构、扩散模型原理、API调用、批量生成系统及阿里云生态协同,助力开发者快速落地AIGC图像创作。
650 0
|
SQL API 流计算
Flink关系型API简介
在接触关系型API之前,用户通常会采用DataStream、DataSet API来编写Flink程序,它们都提供了丰富的处理能力,以DataStream为例,它有如下这些优点: 富有表现力的流处理,包括但不限于:转换数据,更新状态,定义窗口、聚合,事件时间语义,有状态且保证正确性等; 高度自定义.
1707 0
|
5月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
553 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄