Apache Flink源码解析之stream-sink

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 上一篇我们谈论了Flink stream source,它作为流的数据入口是整个DAG(有向无环图)拓扑的起点。那么与此对应的,流的数据出口就是跟source对应的Sink。这是我们本篇解读的内容。 SinkFunction 跟SourceFunction对应,Flink针对Sink的根接口被称为SinkFunction。

上一篇我们谈论了Flink stream source,它作为流的数据入口是整个DAG(有向无环图)拓扑的起点。那么与此对应的,流的数据出口就是跟source对应的Sink。这是我们本篇解读的内容。

SinkFunction

SourceFunction对应,Flink针对Sink的根接口被称为SinkFunction。继承自Function这一标记接口。SinkFunction接口只提供了一个方法:

    void invoke(IN value) throws Exception;

该方法提供基于记录级别的调用(也就是每个被输出的记录都会调用该接口一次)。上面方法的参数value即为需要输出的记录。

SinkFunction相对来说比较简洁,下面我们来看一下它的实现者。

内置的SinkFunction

同样,我们先来看一下完整的类型继承体系:

flink-stream-sink_all-class-diagram

DiscardingSink

这是最简单的SinkFunction的实现,它的实现等同于没有实现(其实现为空方法)。它的作用就是将记录丢弃掉。它的主要场景应该是那些无需最终处理结果的记录。

WriteSinkFunction

WriteSinkFunction是一个抽象类。该类的主要作用是将需要输出的tuples(元组)作为简单的文本输出到指定路径的文件中去,元组被收集到一个list中去,然后周期性得写入文件。

WriteSinkFunction的构造器接收两个参数:

  • path : 需要写入的文件路径
  • format : WriteFormat的实例,用于指定写入数据的格式

在构造器中,它调用方法cleanFile,该方法用于初始化指定path的文件。初始化的行为是:如果不存在则创建,如果存在则清空

invoke方法的实现:

    public void invoke(IN tuple) {

        tupleList.add(tuple);
        if (updateCondition()) {
            format.write(path, tupleList);
            resetParameters();
        }

    }

从实现来看,其先将需要sink的元组加入内部集合。然后调用updateCondition方法。该方法是WriteSinkFunction定义的抽象方法。用于实现判断将tupleList写入文件以及清空tupleList的条件。接着将集合中的tuple写入到指定的文件中。最后又调用了resetParameters方法。该方法同样是一个抽象方法,它的主要用途是当写入的场景是批量写入时,可能会有一些状态参数,该方法就是用于对状态进行reset。

WriteSinkFunctionByMillis

该类是WriteSinkFunction的实现类。它支持以指定的毫秒数间隔将tuple批量写入文件。间隔由构造器参数millis指定。在内部,WriteSinkFunctionlastTime维护上一次写入的时间状态。它主要涉及上面提到的两个抽象方法的实现:

    protected boolean updateCondition() {
        return System.currentTimeMillis() - lastTime >= millis;
    }

updateCondition的实现很简单,拿当前主机的当前时间戳跟上一次的执行时间戳状态作对比:如果大于指定的间隔,则条件为真,触发写入。

    protected void resetParameters() {
        tupleList.clear();
        lastTime = System.currentTimeMillis();
    }

resetParameters实现是先清空tupleList,然后将lastTime老的时间戳状态覆盖为最新时间戳。

WriteFormat

一个写入格式的抽象类,提供了两种实现:

  • WriteFormatAsText : 以原样文本的形式写入指定路径的文件
  • WriteFormatAsCsv : 以csv格式写入指定文件

RichSinkFunction

RichSinkFunction通过继承AbstractRichFunction为实现一个rich SinkFunction提供基础(AbstractRichFunction提供了一个open/close方法对,以及获取运行时上下文对象手段)。RichSinkFunction也是抽象类,它有三个具体实现。

SocketClientSink

支持以socket的方式将数据发送到特定目标主机所在的服务器作为flink stream的sink。数据被序列化为byte array然后写入到socket。该sink支持失败重试模式的消息发送。该sink 可启用autoFlush,如果启用,那么会导致吞吐量显著下降,但延迟也会降低。该方法的构造器,提供的参数:

  • hostName : 待连接的server的host name
  • port : server的端口
  • schema :SerializationSchema的实例,用于序列化对象。
  • maxNumRetries : 最大重试次数(-1为无限重试)
  • autoflush : 是否自动flush

重试的策略在invoke方法中,当发送失败时进入到异常捕捉块中进行。

OutputFormatSinkFunction

一个将记录写入OutputFormat的SinkFunction的实现。

OutputFormat :定义被消费记录的输出接口。指定了最终的记录如何被存储,比如文件就是一种存储实现。

PrintSinkFunction

该实现用于将每条记录输出到标准输出流(stdOut)或标准错误流(stdErr)。在输出时,如果当前task的并行subtask实例个数大于1,也就是说当前task是并行执行的(同时存在多个实例),那么在输出每条记录之前会输出一个prefix前缀。prefix为在全局上下文中当前subtask的位置。

常见连接器中的Sink

Flink自身提供了一些针对第三方主流开源系统的连接器支持,它们有:

  • elasticsearch
  • flume
  • kafka(0.8/0.9版本)
  • nifi
  • rabbitmq
  • twitter

这些第三方系统(除了twitter)的sink,无一例外都是继承自RichSinkFunction

小结

这篇文章我们主要谈及了Flink的stream sink相关的设计、实现。当然这个主题还没有完全谈完,还会有后续篇幅继续解读。



原文发布时间为:2016-05-07


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
21天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
505 13
Apache Flink 2.0-preview released
|
25天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
59 3
|
11天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
40 2
|
30天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
52 1
|
29天前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
SQL 消息中间件 分布式计算
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(上)
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(上)
267 0
|
数据采集 分布式计算 Kubernetes
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(下)
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(下)
294 0
|
存储 SQL 传感器
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析2
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析2
600 0
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析2
|
SQL 消息中间件 分布式计算
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析1
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析1
342 0
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析1
|
SQL 消息中间件 分布式计算
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析3
【Flink】(04)Apache Flink 漫谈系列 —— 实时计算 Flink 与 Alibaba Cloud Realtime Compute 剖析3
173 0

推荐镜像

更多