kafka ETL架构解析
基于Kafka connect+函数计算的轻量计算解决方案
——竹恩
消息队列Kafka产品研发专家
KafkaETL基于kafkaconnect加函数计算,为云上用户提供了一套数据流转加数据计算的一站式解决方案。
一、问题背景
在大数据云计算时代,一个复杂的大型系统一般都是由许多处理特定任务的子系统构成,各个子系统由不同的团队开发,因此各个系统在数据、数据的格式和内容上会存在天然的不一致性。
当数据需要在各个子系统之间进行流转的时候,就需要对数据进行格式化处理,以消除各个系统之间的数据差异。此外,还可能需要收集来自各个子系统的异构数据,对采集到的数据做一些加工和计算,然后投递到数据仓库进行后续的数据分析。
上述流程中,可以抽象出两个典型的场景:数据流转和数据计算。数据流转场景主要面对的是异构系统之间数据如何进行流转,数据计算场景主要面对的是如何在数据流转的过程中进行数据的加工计算。
二、数据流转场景
在数据流转场景中,需要将各种关系型数据库和非关系型数据库中的数据导入到数据仓库,或是将MySQL中的数据导入到Elasticsearch,来提高查询体验,又比如将一些数据导入到图形数据库中。
实现上述需求,需要解决以下几个问题:
第一,各个不同源之间的数据如何拷贝;
第二,如何满足传递的实时性,比如MySQL里的变更需要马上在elastic社区中反映出来,不然会导致后台数据变更后用户却查不到最新的消息;除此之外,还需要保证数据拷贝的高可用性、可伸缩性以及可扩展性。
而传统的方案可能是为各个数据源之间都专门实现一个数据拷贝工具,但是此方案会带来以下问题:
首先,为每个场景都写一个专门的工具,工作量非常大。
其次,业务耦合会非常严重。一旦上层的密码发生了变化,下层就需要修改代码,所以上层需要感知到所有下层的存在。
因此,专门的工具可行性不高。
而Kafka connect正是为解决以上异构数据同步问题而生的。它的解决思路是在各个数据源之间加一层消息中间件,所有数据都经过消息中间件进行存储和分发。这样做的好处有二:其一,通过消息中间件做异步解耦,所有系统只需要和消息中间件通信;其二,需要开发的解析工作工具数量也从原来的n²个变成了2n个。
Kafkaconnect主要用于连接消息系统和数据源,根据数据的流向,连接可以分为source connector和sink connector。其原理也很简单,source connect负责解析来源数据,转换成标准格式的消息,通过Kafkaproducer发送到Kafka block中。同理,sink connect则是通过Kafkaconsumer消费对应的topic,然后投递到目标系统中。
整个过程中,Kafka connect统一解决了任务调度、消息系统交互、自动扩缩容、容错以及监控等问题,大大减少了重复劳动。
如何将来源系统的数据解析成message,或将message解析成目标系统中的数据,需要根据不同的数据系统以不同的方式实现的。
对于目前主流的系统,各大厂商皆有相应的connect实现。比如消息系统Kafka版提供了全托管、免运维的Kafkaconnect,用于消息队列Kafka版与其他阿里云服务之间的数据同步。消息队列Kafka版支持了MySQL source connector、oss sink connector、MaxComputesink connector以及FC sink connector等主流的connector。用户只需要在消息队列Kafka控制台的图形界面上做一些配置,就可以一键拉起connect任务进行数据同步。
三、数据计算场景
Kafka connect也提供了一些transformer解决了一小部分数据转换的需求,但是依旧缺乏实时计算能力。为了应对以上产品的数据实时处理需求,市场上出现了许多优秀的处理工具。
从最初的hadoop、hive到spark、Flink以及Kafka stream等,都提供了对应的组件模块和上下游解决方案,但是这些处理方案都或多或少存在一些问题:
①处理框架比较重,占用资源多,比如当下流行的spark和flink都需要先搭建一个集群。而集群本身的运行就需要不少资源,集群的规模一般按照流量分值进行配置,大多数时候都是浪费的。
②在诸多框架中,需要根据实际需求做选技术选型。
③后期可能需要专门的团队或者人员去运行维护,过程中需要消耗大量的学习成本和维护成本。
针对部分无状态的简单计算,函数计算或许是一个很好的选择。阿里云上的函数计算是事件驱动的全托管计算服务。使用函数计算时,用户无需采购与管理服务器等基础资源,只需编写上传代码即可,函数计算会帮助用户准备好计算资源,弹性、可靠的运行任务,并提供日志查询、性能监控和报警等功能。
总的来说,函数计算以简单易用的方式为用户提供了计算能力,并能满足绝大多数计算场景的需求。
四、Kafka ETL数据流转计算一站式解决方案
阿里云上消息队列kafka版近期推出了kafka ETL组件,通过kafka+kafkaconnect+函数计算的架构,能够很好地应对数据转储和实时计算问题,具有轻量、学习成本低、开发周期短、资源动态伸缩、简单快速等优点。
上述云原生一站式解决方案,通过kafkaconnect作为实时处理任务的触发器,能够实时接收到topic中的新数据,然后转发至函数计算,触发实时数据处理任务。
在数据流转阶段,大量异构系统中的数据以各种方式汇集到kafka中,然后以kafka为中心做后续的处理。作为数据数据流转中的一环,kafka connect除了保障数据的实时性以外,还解决了任务调度、与交易系统交互、自动扩缩容、容错以及监控等问题,大大减少了重复劳动。
数据到达函数计算以后,会自动触发用户编写的数据处理逻辑,对原始消息内容进行计算,最后函数计算可以将加工完成的数据投递到用户指定的目标端,比如投递回消息队列kafka,或投递到MaxComputer进行下一步的数据分析。
上述任务的配置、创建和运行都只需要通过云上kafka控制台的图形界面进行操作即可完成。
五、Kafka ETL示例
上图是kafka ETL的应用示例。用户需要从一个电商业务系统中采集日志,存储到kafka,对日志进行加工后将其投递到两个目标端:一是将所有数据投递到MaxComputer进行数据分析,二是将支付数据投递到Elistic Search中进行检索。
那么,如何利用kafka ETL来实现上述流程?
第一步,采集原始日志到消息队列kafkatopic中。这里可以使用一些比较成熟的开源组件,比如FileBeat、logstash、flume等,将用户应用端的日志消息投递到kafka中。当然这里也可以做一些简单的转换,但是一般不建议这么做,还是建议保留一份原始信息,因为原始的日志可能来自各个关联的应用,内容和格式上会存在些许差异。
示例中的订单应用生成了一条消息,日志中包含一个用户的ID action、订单ID以及当前状态。支付应用中又生成了另一条日志,同样包含以上信息,只是在格式上会存在一些小差别。这两条来自不同子系统的日志都被采集到kafka的user_order_raw topic中,最终对应的到Topic里两条key=null,value为日志的原始内容。
由于来源系统日志格式不一样,topic里包含的两条消息格式也存在一定的差别。
第二步,对topic中的消息做简单的数据加工。数据到达kafka的topic之后,kafka connect会消费消息并投递到函数计算中。再由函数计算对数据进行加工计算,计算的目标是抽取userID、action、orderID以及status,并将数据都转换成为大写字母,形成统一的格式。最终,所有处理后的消息发往MaxCompute进行分析。此外,还需要额外筛选出action=pay所有消息发往Elastic Search中。
以上步骤可以在kafka控制台图形界面上创建一个ETL任务,用户选择数据来源时,只需要选择topic:user_order_raw,然后写一段对数据的处理代码。这里,ETL已经提供了部分模板,用户在模板的基础上做稍许改动即可。
在本示例中,用户只需要写一段从不同格式的日志中抽取userID、action、orderID以及status的代码,然后将所有处理过的消息路由到不同的目标topic即可。
第三步,函数计算处理完的消息投递回kafka。经过这一步处理,所有消息被路由到目标topic:user_order_processed。此topic中会包含两条消息,消息的key=null,value如上图左所示。另外 action=pay的消息还会被路由到topic:use_order_pay_info,此topic中会包含一条消息:key=null,value上图右所示。可以看到此时的消息格式已经统一。
最后一步,将处理完的消息再次投递到目标端。将topic:user_order_process中所有处理完的订单相关消息投递到MaxCompute中进行数据分析,将topic:user_order_pay_info中的支付信息投递到Elastic Search中进行后续搜索。
针对最后一步,可以一键创建kafka connect任务,将数据投递到目标端。
上述完整流程可以总结如下:在数据到达kafka之后,用户只需要在消息队列kafka控制台创建一个ETL任务,写一小段处理代码即可。上述第二步处理完数据之后,可以不经过第三步投递回kafka,而是在处理完之后直接将消息路由到MaxCompute和es中。
本设计中采用的方式是将处理完的数据再次发送回kafka,并再次投递到目标系统中。这种方式可以在kafka端保留一份处理后的数据,用户也可以比较灵活地对这份数据做进一步的处理,或继续投递到其他第三方系统。
六、消息队列Kafka ETL差异化优势
通过上述的事例,可以看出kafka ETL具有如下优势:
① 开箱即用免运维。
②支持快速上线,一站式体验,技术投入小,节省大量时间成本。
③节省成本,用户不需要额外购买流计算等重产品。
④控制台上的相关日志,信息比较全面,便于问题的排查。
⑤支持秒级弹缩。
接下来,分享一下阿里云消息队列kafka在内核层面的差异化优势。
当前众多的云厂商都在售卖kafka。面对竞争,阿里云上的消息队列kafka版在发展过程中,除了解决了易用性和稳定性方面的问题,还做到了有区分度,在内核层面做出了自己的核心竞争力和优势。
消息队列kafka版的topic支持云存储和local存储这两种存储引擎。其中 local topic指的就是以kafka原生的方式存储数据,保留开源kafka的全部特性,且百分百兼容了开源kafka。而云存储则是本文接下来要介绍的重点。
消息队列kafka通过自研云存储引擎彻底解决了原生kafka一些深层的bug,以及因为本身架构导致的难以解决的问题,实现了支持海量分区、通过多副本技术降低存储成本以及支持无缝弹缩迁移这三大主要特性。
1、支持海量分区
在消息引擎中常见的消息存储方式有碎片式存储和集中式存储,碎片式存储通常以topic或分区维度进行存储,其主要优势是架构比较简单,可以针对topic或分区,控制持久化的容量。kafka在架构上是基于分区的碎片式存储,在分区规模不大的情况下,可以通过磁盘的顺序读写,获得较高的读消息读写性能。
通常情况下,一般规格的kafka集群可以支持到千级别的分区规模。如果分区规模持续扩大,且大部分分区都有读写请求时,原本的顺序读写就变成了随机读写,会导致kafka的读写性能急剧下降。
不同于碎片式存储,集中式存储则将所有消息集中存储到一个commitLog中。然后根据topic和分区信息构建队列,队列通常作为索引使用。相比于碎片式存储,集中式存储的主要优势是支持的分区数多,并且很容易通过删除旧的commitLog来控制磁盘的水位。
在阿里云的消息队列kafka版中,底层自研云存储引擎正式采用了集中式存储方式。云存储引擎相比kafka原生的存储引擎的主要优势有两点:一是解决了kafka分区规模扩大时性能急剧下降的问题,相比于原生kafka千级别的分区规模,其支持的分区规模可以达到10万级别;二是在大量分区同时写入的场景下,相比原生kafka的碎片式存储,自研云存储引擎能够获得更好的性能,同时我们对写入耗时做了优化,减少了毛刺的产生。
2、多副本技术优化
为了保证Kafka集群的高可靠和高可用性,我们为所有topic设置了3副本存储。机器宕机时,Kafka可以快速从可用的Follower副本中选出新的Leader,接替宕机机器上的Leader继续提供服务。
消息队列Kafka在选择块存储设备时,选择了阿里云上的云盘。云盘是阿里云为云服务器ECS提供的数据块级别的块存储产品,具有低时延、高性能、持久性、高可靠等特点。云盘本身采用了分布式三副本机制,为ECS实例提供了极强的数据可靠性和可用性保证。在此背景下,由于使用了云盘,在Kafka层面设置3副本,实际会有9个副本。同时,由于Kafka层面的Follower需要主动从Leader同步数据,这也会消耗集群的计算和网络资源,将用户的业务流量扩大至3倍。但是,如果在Kafka层面设置单副本,由于Kafka本身不能利用到云盘的三副本能力,其高可用性就不能保证。
因此,如何利用好云盘的三副本能力,降低存储成本和网络成本,就成了我们面临的一大挑战。
我们通过接入自研云存储引擎,解决了存储成本和网络成本问题。其核心原理主要是:在自研存储引擎中引入了逻辑队列和物理队列两个概念。逻辑队列是暴露给用户的概念,可以理解为客户端看到的partition;而物理队列则用于实际存储数据。这里会通过映射关系将逻辑队列和物理队列关联到一起。
在阿里的自研引擎中,所有的分区在逻辑上都是单副本的。数据的可靠性和可用性由云盘底层的三副本机制保证。发送到特定逻辑partition的数据,会根据映射关系写入到对应的物理队列中。同理,消费的时候也是根据映射关系从实际的物理队列中拉取。
那么,云存储是如何做到容错和高可用的呢?
如上图,在节点-0的ECS宕机时,可以通过QueueMapper秒级切换逻辑队列0的映射关系到节点1中的已有队列Queue-3,或新增一个物理队列Queue-4。此时,发往逻辑队列-0的消息,将被路由到Queue-3或者Queue-4中。这种情况下,用户的发送业务不会受到影响,依旧可以继续发送成功,并且最新的消息也能被消费到。
上述流程在Failover期间会存在一个问题:逻辑队列-0在节点-0上的消息暂时不能消费。但是,对大多数应用场景来说,短暂的部分消息消费延迟并不是大问题,只要不影响发送即可。
在节点-0的ECS宕机后,阿里云备用ECS会迅速生成新的机器替换节点-0,挂载原有云盘并分钟级时间内恢复节点-0服务。节点-0恢复后,只需要重新将逻辑队列-0的映射关系切回Queue-0,系统即可重新恢复原有状态。此时,发送/消费依旧能保持原生Kafka的特性。
通过以上方式,能够将存储成本节省到原生Kafka的1/3。同时,由于在Kafka层面的副本数是1,避免了Follower从Leader中同步数据的操作,网络流量也节省到原生Kafka的1/3。
3、弹性扩缩能力
弹性扩缩能力是消息队列的核心能力之一。由于Kafka服务端节点是有状态的,因此新增了若干节点之后,需要重新均衡各个Topic的队列,使得客户端往集群中发送或消费的流量,能均衡地打到后端各个服务节点上。
开源kafka在水平扩展了机器之后,实现数据均衡的主要方式有两种:
第一种,在新的broker中新增队列。这种方式主要的痛点是:
①系统状态发生改变,需要客户端主动重启,否则无法消费新分区;
② Kafka设计上的问题导致分片数无法下降,后续无法缩容。
第二种,数据迁移。这种方式的主要痛点是:①流量复制,产生网络风暴,干扰正常使用;②均衡与数据量有关,如果数据量巨大,可能要花费几天来迁移。
那么,云存储引擎是怎么解决以上弹缩问题的的?
前文提到,消息队列Kafka引入了两级队列:第一级为逻辑队列,第二级为物理队列,即阿里自研的云存储队列。逻辑队列对外暴露,物理队列则用于存储实际数据,通过QueueMapper模块维护逻辑队列与物理队列之间的映射关系。
一个逻辑队列可以由多个物理队列拼接而成,通过位点分段映射来保证顺序。扩容时,只需要将逻辑队列指向新机器上的物理队列即可,这样新写入的消息就可以根据新的映射关系,直接写入到新加的机器。同样,消费时也可以根据位点分段映射关系,找到实际的物理队列,然后从物理队列中读取消息。
上述方案通过两级队列分段映射的方式,解决了消息队列弹缩和迁移问题,它具有如下优点:
第一,服务端扩缩容后,不变更队列数量,保持系统状态不变。
第二,扩缩容时无需迁移数据,耗时短,可以在秒级时间内完成Topic队列重新均衡。
第三,兼顾了吞吐与扩展性,不影响原有消息队列的性能。