漫谈流式计算的一致性

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

参考,

http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/

http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/

 

image

对于batch分析,fault-tolerant很容易做,失败只需要replay,就可以完美做到容错。

对于streaming分析, 数据流本身是动态,没有所谓的开始或结束,虽然可以replay buffer的部分数据,但fault-tolerant做起来会复杂的多

当前主流的一些streaming分析平台,都有一些各自特有的fault-tolerant的机制,在此分析和总结一下,

无状态流数据处理,

这是种比较简单的流式数据的场景,典型的应用是数据ETL,数据存储,数据流过是没有状态的

保证at least once语义, 
分钟级别,Storm的acker机制,就可以很好的保证, http://storm.apache.org/documentation/Guaranteeing-message-processing.html 
message没有被正确处理,收到ack时,可以选择重发,这样每条message对可以保证被处理到,但可能会被重复处理

小时,天级别,利用kafka的replay,一般达到天级别的cache

保证exactly once语义, 
对于无状态数据流,其实只要依赖最终存储的去重性(deduplication), 就可以达到exactly once 
比如对于数据库,通过unique key和insert ignore就可以解决这个问题,无论你之前重复处理多少次,最终我只存储一次。

如果最终存储不支持去重,或者场景比较复杂不仅仅是存储,比如做叠加计数 或 update 
做叠加计数,当前的机制,你无法知道这个message是否加过 
做update的时候,更新的时序性很重要,这个是ack机制无法保证的

Storm 0.7就提供transactional topology特性,http://storm.apache.org/documentation/Transactional-topologies.html

首先给message加上transaction id,这样有两个好处,可以保证时序性,在写入存储的时候,可以按transaction id顺序写入 
并且在可以外部存储上记录当前最新的transaction id,保证相同的transaction,不会被重复写入 
这个是transactional topology的核心思路,这样确实是可以保证强一致性,exactly once语义 
但这个方案只适用于无状态,或是依赖外部存储的,状态必须要存储在外部存储上

至于使用batch,或将topology分为processing和commit阶段,都是对性能的优化,并不会提升一致性的保障 
但由于使用micro-batch是必须的,所以也称这类方案是micro-batch方案,除了transactional topology,还有Apache Spark Streaming 
micro-batch的坏处, 
1. 改变编程模型,伪流式 
2. windows based聚合的限制,只能是micro-batch的倍数,比如micro-batch是3分钟,你想做个5分钟聚合,没法做 
2. 延迟变大,如果本身秒级别,但如果micro-batch是1分钟,那延迟就至少1分钟 

有状态流数据处理,

典型的场景,就是windows-based的聚合或计算,比如计算1分钟内的计数或平均值,这样会有部分数据需要cache在内存中 
这样当fail-over时,如何可以恢复cache,并保证exactly once语义

最直接的想法,

局部的snapshot

每个component对cache定期做snapshot,然后在fail-over后,各自恢复自己的cache, 
这样做的问题, 
1. snapshot很难增量做,如果cache比较大,成本会比较高 
2. snapshot只能定期做,会有部分丢失 
3. 最关键的,对于分布式系统,各个compoent独立的进行snapshot,很难达到同一个状态,每个component的处理速度都是不一样的,有的处理到n做了snapshot,而有的可能做到n+1才做, 
缺乏一个统一的参照系。

 

change-log 
每个 component,当接收到一个 message 的时候,产生一条 change log 记录该 message 和更新的状态,存入 transactional log 和数据库 
当做 fail-over 的时候,只需要每个 component 将数据库中的 log,拿出来 replay 即可 
这种方式使用的平台如 Google Cloud Dataflow,Apache Samza

对于 Apache Samza,会将 change log 放入kafka中,

image

当fail-over后,每个task从相应的kafka topic里面读出change-log,完成local state的replay

这样做的好处,是不用直接去snapshot local cache,如果cache比较大的话,这样是比较划算的 
但是如果数据流很big的话,这样做也不合适了,因为change-log会非常大

 

Distributed Snapshots (Apache Flink),全局的 snapshot

针对前面提到的局部 snapshot 最关键的问题,提出全局 snapshot 的方法, 
其实最大的问题仍然是分布式系统的根本问题,统一参照系的问题,如何让每个 component 在同一的状态下,进行 snapshot

这个原理来自 Chandy and Lamport, 1985,的paper “Distributed Snapshots: Determining Global States of Distributed Systems”

http://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

局部的snapshot会有的问题,

状态丢失,如下图,但状态中传输的时候,对P和Q进行snapshot,会导致队列中的绿蓝橙状态丢失

image

状态重复,brown状态中P和Q的snapshot里面同时出现

image

怎么解这样的问题?分布式系统中缺乏统一参照系的情况下,只有通过通信才能确定偏序的问题 
所以这里使用marker来做组件间的同步,并防止丢失状态,会同时对组件,以及队列同时做snapshot, 如下图

image

P做snapshot,然后发送marker到Q 
Q收到marker的时候,知道P做了snapshot,那么我也要做snapshot 
同时还要对PQ channel做snapshot,此时channel中有个green,但是由于green是在marker后面的,说明它在P的snapshot里面已经做过,不需要再做,所以此时PQ的snapshot为空 
Q在做完snapshot后,还需要把marker返回给P,因为在过程中orange从Q被发送到P 
当P收到Q返回的marker时,由于P的snapshot已经做过,无法改变 
所以把orange放在QP channel的snapshot中

最终做出的全局的snapshot为,

P(red, green, blue)
channel PQ ()
Q(brown, pink)
channel QP (orange)
这样就解决了状态丢或重复的问题

 

Flink’s distributed snapshotting实现基于stream barriers

image

可见,barrier可以将流拆分成一段段的数据,每个barrier都是一个snapshot点,但是这种拆分不同于micro-batch,并不会影响到正常的流式处理 
在DAG,即有向无环图的case下,是不需要对channel做snapshot的,场景会比较简单 
只是每个组件收到barrier的时候去做snapshot就好,该算法的几个前提: 
1. 网络可靠,消息FIFO; 
2. channel可以block,unblock,支持对所有output channel进行广播 
3. 可自动识别注入的barrier

完成过程如图,这是个有两条入边的case,相对复杂些 
当收到一条channel的barrier时,需要先block该channel,然后等待另一个channel中的barrier 
当两条channel的barrier都到达时,说明达到统一状态,进行checkpoint 
然后unblock之前block的channel,并对所有的output channel广播该barrier

image

当DAG上的所有组件都完成snapshot时,那么一个全局的snapshot就完成了,以barrier为唯一标识

比较抽象,下图以kafka为例子解释一下,https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

image

对于kafka而言,不同的partition需要不同的线程读, 
图中,4个source thread分别从4个partition读取数据 
其中由唯一的master来发起checkpoint流程, 
过程是, 
1. Master给所有的source thread发checkpoint请求 
2. source thread接收到cp请求后,会记录当前的offset,比如5791,并做该offset的message前发出streaming barrier 
    并将offset返回给master

3. 这样master收到所有source的ack offset,就相当于对source做了snapshot,恢复时只需要将相应的source置到该offset即可 
4. 中间每个组件,当收到所有input channel的barrier时,将cp存入数据库,并通知Master 
5. 层层下去,直到所有Sink节点,最终节点,完成snapshot

6. master接收到所有节点的做完cp的ack,知道这次checkpoint全部完成

这个方案的最大的问题是,当多个input channel时,需要等所有的barrier到齐,这个明显会增加latency 
Flink的优化是,不等,看到barrier就打snapshot,这样的问题就是无法保证exactly once,会重复, 
因为后来的barrier打checkpoint时会覆盖先前的cp, 
此时barrier先到的channel已经处理了一些barrier之后的数据,这部分结果也会存在cp中

但当fail-over的时候,因为replay是根据你发送barrier的offset来重发的,所以这部分会重复


本文章摘自博客园,原文发布日期:2015-11-18 

目录
相关文章
|
16天前
|
消息中间件 缓存 Serverless
在进行实时数据处理时,FaaS 如何保证数据的一致性和处理的实时性?
在进行实时数据处理时,FaaS 如何保证数据的一致性和处理的实时性?
|
1月前
|
算法 Java 关系型数据库
漫谈分布式数据复制和一致性!
漫谈分布式数据复制和一致性!
|
3月前
|
缓存 数据库 算法
实时强一致性
【8月更文挑战第17天】
43 1
|
3月前
|
数据处理 流计算
流计算引擎数据问题之保证流计算的正确性如何解决
流计算引擎数据问题之保证流计算的正确性如何解决
25 0
|
存储 关系型数据库 MySQL
如何实现基于Flink的高吞吐、精确一致性数据入湖
APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。
|
存储 负载均衡 关系型数据库
基于Flink的高吞吐精确一致性入湖实现
AnalyticDB助力企业降本增效,构建企业级数据分析平台
基于Flink的高吞吐精确一致性入湖实现
|
存储 分布式计算 Oracle
「分布式计算」什么是严格一致性和最终一致性?
「分布式计算」什么是严格一致性和最终一致性?
|
算法 架构师 Java
分布式一致性算法有哪些|学习笔记
快速学习分布式一致性算法有哪些
133 0
|
算法 Java Nacos
分布式事务与分布式一致性的区别 | 学习笔记
快速学习 分布式事务与分布式一致性的区别
159 0
分布式一致性
本文从分布式一致性问题出发,介绍了各种一致性算法,希望通过该文能让大家对分布式系统有一定的认识。