Flink实战(五) - DataStream API编程

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 1 概述Flink中的DataStream程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。 最初从各种源(例如,消息队列,套接字流,文件)创建数据流。 结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。

1 概述

Flink中的DataStream程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。

最初从各种源(例如,消息队列,套接字流,文件)创建数据流。

结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。

Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。 执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

  • 有关Flink API基本概念的介绍,请参阅
    基本概念

2 入门案例

以下程序是流窗口字数统计应用程序的完整工作示例,它在5秒窗口中对来自Web套接字的单词进行计数。 您可以复制并粘贴代码以在本地运行它。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
nc -lk 9999

只需键入一些单词就可以返回一个新单词。 这些将是字数统计程序的输入。 如果要查看大于1的计数,请在5秒内反复键入相同的单词(如果不能快速输入,则将窗口大小从5秒增加)。

  • Socket输入
  • 程序输出

创建一个新数据流,其中包含从套接字无限接收的字符串。 接收的字符串由系统的默认字符集解码,使用“ n”作为分隔符。 当socket关闭时,阅读器立即终止。

  • Scala版本
    3 Data source源是您的程序从中读取输入的位置。可以使用

StreamExecutionEnvironment.addSource(sourceFunction)
将源附加到程序
Flink附带了许多预置实现的源函数,但你可以通过为非并行源实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源。

可以从StreamExecutionEnvironment访问几个预定义的流源:

3.1 基于文件

  • readTextFile(path)
    TextInputFormat逐行读取文本文件,即符合规范的文件,并将它们作为字符串返回。
  • readFile(fileInputFormat, path)
    按指定的文件输入格式指定读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
    这是前两个内部调用的方法。它path根据给定的内容读取文件fileInputFormat。根据提供的内容watchType,此源可以定期监视(每intervalms)新数据(FileProcessingMode.PROCESS_CONTINUOUSLY)的路径,或者处理当前在路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用该pathFilter,用户可以进一步排除正在处理的文件。

实现:

在引擎盖下,Flink将文件读取过程分为两个子任务

  • 目录监控
  • 数据读取

这些子任务中的每一个都由单独的实体实现。监视由单个非并行(并行性= 1)任务实现,而读取由并行运行的多个任务执行。

后者的并行性等于工作并行性。单个监视任务的作用是扫描目录(定期或仅一次,具体取决于watchType),找到要处理的文件,将它们分层分割,并将这些拆分分配给下游读卡器。读者是那些将阅读实际数据的人。每个分割仅由一个读取器读取,而读取器可以逐个读取多个分割。

如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾追加数据将导致其所有内容被重新处理。
如果watchType设置为FileProcessingMode.PROCESS_ONCE,则源扫描路径一次并退出,而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之后关闭源将导致不再有检查点。这可能会导致节点发生故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

3.2 基于Socket

  • socketTextStream
    从套接字读取。数据元可以用分隔符分隔。

3.3 基于集合

  • fromCollection(Collection)
    从Java Java.util.Collection创建数据流。集合中的所有数据元必须属于同一类型。
  • fromCollection(Iterator, Class)
    从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。
  • fromElements(T ...)
    从给定的对象序列创建数据流。所有对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class)
    并行地从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。
  • generateSequence(from, to)
    并行生成给定间隔中的数字序列。

3.4 自定义数据源方式SourceFunction

使用用户定义的源函数为任意源功能创建DataStream。

默认情况下,源具有1的并行性。

要启用并行执行,用户定义的源应

  • 实现ParallelSourceFunction

  • 或继承RichParallelSourceFunction


在这些情况下,生成的源将具有环境的并行性。
要改变它,然后调用DataStreamSource.setParallelism(int)

  • addSource
    附加新的源函数。例如,要从Apache Kafka中读取,您可以使用 addSource(new FlinkKafkaConsumer08<>(...))

4 算子

算子将一个或多个DataStream转换为新的DataStream。程序可以将多个转换组合成复杂的数据流拓扑。

本节介绍了基本转换,应用这些转换后的有效物理分区以及对Flink 算子链接的见解。

4.1 filter

DataStream→DataStream

  • 计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器
  • Scala
  • Java4.2 unionDataStream *→DataStream

两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流

如果将数据流与自身联合,则会在结果流中获取两次数据元

  • Scala
  • Java

split拆分

DataStream→SplitStream

根据某些标准将流拆分为两个或更多个流。

select

SplitStream→DataStream

从拆分流中选择一个或多个流。

  • Scala
  • Java

5 Data Sinks

数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。Flink带有各种内置输出格式,这些格式封装在DataStreams上的算子操作后面:

  • writeAsText()/ TextOutputFormat
    按字符串顺序写入数据元。通过调用每个数据元的toString()方法获得字符串。
  • writeAsCsv(...)/ CsvOutputFormat
    将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
  • print()/ printToErr()
    在标准输出/标准错误流上打印每个数据元的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将与生成输出的任务的标识符一起添加。
  • writeUsingOutputFormat()/ FileOutputFormat
    自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • writeToSocket
    根据一个套接字将数据元写入套接字 SerializationSchema
  • addSink
    调用自定义接收器函数。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系统实现为接收器函数。

数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。Flink带有各种内置输出格式,这些格式封装在DataStreams上的 算子操作后面:

writeAsText()/ TextOutputFormat- 按字符串顺序写入元素。通过调用每个元素的toString()方法获得字符串。

writeAsCsv(...)/ CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。

print()/ printToErr() - 在标准输出/标准错误流上打印每个元素的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将与生成输出的任务的标识符一起添加。

writeUsingOutputFormat()/ FileOutputFormat- 自定义文件输出的方法和基类。支持自定义对象到字节的转换。

writeToSocket - 根据a将元素写入套接字 SerializationSchema

addSink - 调用自定义接收器函数。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系统实现为接收器函数。

请注意,write*()方法DataStream主要用于调试目的。他们没有参与Flink的检查点,这意味着这些函数通常具有至少一次的语义。刷新到目标系统的数据取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的数据元都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

要将流可靠,准确地一次传送到文件系统,请使用flink-connector-filesystem。此外,通过该.addSink(...)方法的自定义实现可以参与Flink的精确一次语义检查点。

参考

DataStream API

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
1月前
|
JSON Shell API
免费导航规划API接口详解:调用指南与实战示例
该接口由接口盒子提供,支持根据起点、终点及途经点生成驾车或步行导航路线,可获取详细或简化导航数据。提供GET/POST请求方式,需传入坐标信息及用户认证参数,适用于物流路径优化、步行导航、旅游路线规划等场景。
|
30天前
|
人工智能 API 定位技术
MCP 开发实战:手把手教你封装高德地图与 arXiv API
本教程为 MCP(Model Context Protocol)开发实战第二阶段,带你从零封装第三方 API 为 AI 模型可用工具。通过高德地图地理编码与 arXiv 论文检索两个实例,涵盖项目搭建、工具声明、资源定义、错误处理等核心内容,助你快速上手 MCP 开发并集成至 Claude 使用。
|
1月前
|
缓存 前端开发 API
阿里巴巴国际站关键字搜索 API 实战:从多条件筛选到商品列表高效获客
本文详解了如何通过阿里巴巴国际站的关键字搜索接口实现多条件商品搜索功能,涵盖接口调用、参数设置、分页处理及数据解析,并提供可复用的 Python 实现代码,助力开发者高效构建跨境电商商品搜索系统。
|
1月前
|
机器学习/深度学习 JSON API
2025最新版天猫图片搜索API全解析:从图像识别到商品匹配实战
天猫图片搜索API(拍立淘)基于深度学习与CNN技术,实现以图搜商品,支持图片URL或二进制上传,适用于比价、推荐等场景。2025版新增多模态搜索优化与相似度动态调整。接口支持POST/GET请求,返回商品详情及排序结果,示例代码提供Python请求方式。
|
2月前
|
缓存 安全 测试技术
精选API实战问答,解决开发中的疑难杂症
这是一份精选的 API 开发高频问题与解决方案合集,涵盖基础概念、设计规范、安全认证、调试测试、性能优化等十大主题。每日学习 1-2 个问题,结合实战应用,快速掌握 API 开发核心技能,高效解决开发中的各类疑难杂症,适合开发者构建系统化知识体系。
|
2月前
|
存储 数据采集 监控
电商数据分析实战:利用 API 构建商品价格监控系统
在电商运营中,商品价格直接影响转化率和竞争力。本文介绍如何构建一套自动化价格监控系统,覆盖京东、淘宝双平台,实现数据采集、存储、分析与智能告警,助力企业实时掌握价格动态,优化定价策略。
|
2月前
|
存储 数据采集 API
小红书笔记详情API深度解析与实战指南(2025年最新版)
本文深入解析小红书开放平台笔记详情API的进阶使用与合规策略,涵盖接口升级、数据维度扩展、调用优化等内容,并提供Python调用示例及数据清洗存储方案。结合电商导购、舆情监控等实战场景,助力开发者高效获取并应用内容资产,同时强调数据隐私与平台政策合规要点,帮助构建稳定、安全的数据应用体系。
|
消息中间件 Java 关系型数据库
Flink实战(五) - DataStream API编程(下)
Flink实战(五) - DataStream API编程(下)
345 0
Flink实战(五) - DataStream API编程(下)
|
消息中间件 监控 Java
Flink实战(五) - DataStream API编程(上)
Flink实战(五) - DataStream API编程(上)
602 0
Flink实战(五) - DataStream API编程(上)

热门文章

最新文章