
阿里云技术专家,表格存储产品研发
前言之前参加DataFun社区举办的线上活动(2022.3.26 DataFunSummit大数据存储架构峰会),作为讲师分享了一站式物联网存储产品的架构设计的主题,本文是当天的演讲稿整理,在ATA上面也记录一下。感谢DataFun社区和整理编辑李瑶。演讲内容导读本文将结合一个常见的物联网场景分享其存储引擎架构和设计。通过对团队在过去一段时间承接的一些设备元数据案例进行总结,结合他们的需求和存储架构的演进,介绍在存储引擎层的一些沉淀和思考。今天的介绍会围绕下面四部分展开:业务场景介绍存储引擎迭代历程自研新存储总结和展望会重点介绍这个场景下用户选型的存储部分的历代架构,也会展开说说这些架构的优缺点,中间会涉及到一些存储引擎的原理。第三部分会介绍下我们通过自研新存储引擎如何承接这个场景,并解决之前架构遇到的各项问题。最后做一个总结和展望。场景介绍首先介绍下业务场景,以及这里会有一些什么问题需要去优化和解决。我们的场景是一个物联网设备元数据管理平台,需要接入多样化的设备数据,比如汽车终端设备、无人贩售机等。设备数据中很重要的一环就是设备元数据。设备元数据主要维护设备的基础信息,包括设备的id、设备类型、设备状态等,还可能会根据不同类型有一些扩展属性,例如车联网中的设备需要存储他的当前地理坐标等。设备元数据与设备时序数据不同,一个设备通常仅保留有限条记录,尤其是最新的状态信息。在上述场景下,最基础的需求就是存储需求,把元数据进行统一规整的存储。我们需要接入不同类型的设备。随着物联网的发展,设备数量级会有爆发性增长,可能会达到亿级。同时,这些设备状态可能会在线高频更新,比如地理位置等信息可能会频繁更新。在累积了海量数据后,会有一些拓展需求,比如去做一些复杂的设备检索和圈选,还会有一些活动带来业务访问量的激增。基于这些需求,我们的业务架构大体如下:在各种设备会有一个我们的客户端,定期上传设备元数据信息。在云上,有一个接入网关服务器,接入各类设备的原始消息数据,经过处理我们会使用各类存储进行数据存储。通常会用消息队列做消息的写入。数据库做设备元数据的长期存储,缓存承载一些热数据的高频查询。下文将重点展开介绍数据库存储这部分的设计。存储引擎迭代第一代架构第一代架构比较简单和直观。使用 MySQL 单库存储我们的设备元数据,CRUD 都基于 SQL 接口进行访问。在活跃设备数量级万级别,终端类型个位数级别,峰值读写百 QPS 级别,也就是访问量不是很大的情况下,这套架构可以很好地满足我们的需求。整套架构的优势就是开发便捷,架构简单。但是随着业务发展,尤其是我们定位做一个设备统一管理平台,接入不同类型的设备。在万物互联时代,设备的量级可能突破亿级别甚至更高。随着存储量级增长,活跃设备带来的更新操作也会大幅增长,峰值读写可能达到10k+ QPS。这时单机的MySQL就很难满足我们的需求了,我们很自然的想到一个架构的升级,即基于 MySQL 进行分库分表。MySQL分库分表可以在一定程度上满足我们的需求,但整体来说这样一套架构在物联网场景还是有些笨重,也存在一些问题。当面临业务改造,我们可以借助一些相对成熟的 MySQL 中间件来降低改造开销,但依然存在维护这样一个分布式MySQL集群的运维成本。在物联网设备场景下,通常对完整的事务能力要求没那么高,随着数据的写入规模增长,这套架构可能难以满足弹性的诉求。同时,在遇到一些复杂查询和分析需求时,比如前文提到的设备圈选,在海量设备中基于一些“组合条件”选中部分设备,并拉取这些设备的一些信息。这种场景在MySQL分库分表的架构也很难满足需求。架构还需要进一步演进。第二代架构业务会引入一个开源搜索引擎 ElasticSearch,形成一个MySQL+ES组合的方式,MySQL承接一些基础的查询,而ES负责复杂的检索和查询。ES 擅长全文检索,多字段组合查询等,并且 ES 本身是分布式的架构对亿到数十亿的设备存储并不是一件难事。这个架构拓展了 MySQL 的查询能力,但是本身并没有解决分库分表的维护成本。由于 ES 本身并不是一个写入即可见的存储系统,通常不太能直接替换MySQL 做整个业务的主存储。因此业务方不得不同时维护两套存储系统。有些业务方会进一步优化,把分库分表的MySQL替换成 Hbase,分布式 NoSQL 数据库。替换的原因主要在于,MySQL是关系模型,而物联网设备云存储对关系模型并没有很强的依赖,这时Hbase 相对灵活的存储模型(schema free)就更有优势。Hbase天然是分布式的架构,并且采用存储计算分离的架构,做计算节点或者存储节点扩容都相对 MySQL 要容易很多。Hbase 加上 ES 的组合可以在扩展性和功能上很好的满足我们这一场景的需求,业务的能力也可以提升到存储亿级别设备,100k+的峰值读写请求,大量复杂查询也可以得到支持。我们再来仔细对比一下已经提到的三个存储系统,MySQL,Hbase 和 ES。MySQL是单机的架构,采用B+树索引,有着丰富的生态,以及内嵌的索引,提供SQL查询的能力。它擅长中小数据集合下的CRUD,对一致性、事务关系模型要求比较高的情况下,MySQL是一个比较好的选择。Hbase是分布式架构,采用了LSM的索引结构,有着自己的大数据生态,没有原生SQL引擎,相比于MySQL最大的特点就是存储计算分离,在存储或计算达到瓶颈时可以分别去做扩容,这样可以大大降低成本。同时它采用Range分片,这样可以做灵活的切片和快捷的定位查找。它还具有写入能力强的特点。Hbase擅长海量数据的存储和简单查询。ES也是分布式架构,使用Lucene倒排,有着自己的ELK生态,同样没有原生SQL。它虽然使用分布式架构,但并非存储计算分离,而是用本地磁盘读写,因此要保证数据可靠性,可能要维护多个副本。数据分片采用Hash策略。同时基于Lucene特点,它的更新能力相对Hbase较弱,它擅长的场景是全文索引和复杂查询。再进一步对比Hbase和ES:Hbase和ES分区方式不同。以数据ID列0-100为例,Hbase采用Range分区,分区1承载了0-60,分区2是剩下的61-100。而ES采取Hash方式,数据不是连续排布,而是离散分开的。同时,Hbase的数据组织方式是连续排序的方式,如果查询20-30,那么可以很快定位到分区1,找到是哪一个文件,把文件中的数据通过连续定位扫描出来。ES文件存储采用倒排方式,通过倒排列组合去查询出数据。通过以上对比,可以看出Hbase和ES是互补的,Hbase 擅长高并发的简单读写,ES 更新能力大不如 Hbase,但是复杂查询能力,全文检索等能力也是 Hbase 所不能做的。因此用户往往需要结合Hbase和ES。下表是对前述架构优缺点的一个总结。那 Hbase + ES 是不是就已经满足所有需求了呢?其实还是存在一些问题的。Hbase 和 ES 不论那个存储组件单拿出来都已经需要不小的运维能力,同时 Hbase 到 ES 也没有非常成熟的数据同步方案,前面也提到 ES 的写入能力和 Hbase 不太对等,所以 ES 峰值写入可能扛不住。我们在做查询的时候也很难实现统一查询,需要分别去两套系统里面做对应的查询,想原生使用 SQL 也更是奢求。因此我们希望通过自研一套架构,充分利用这些存储引擎的优势,并解决存在的问题。自研新存储架构设计我们所面对的业务需求可以总结为三大方面:海量设备数据接入、高频更新,以及圈选、复杂检索和分析。基于这些需求,我们希望这样去设计架构:底层是DFS分布式文件系统,脱离本地盘,并将计算和存储分离,解决成本和扩展的问题。在计算层,分区策略吸取Hbase和ES各自优势,同时支持Range分区和Hash分区。为了同时满足简单查询和复杂查询,会有两种不同的存储类型,主存储和索引节点。为了应对业务激增,我们还会有只读节点的扩展能力。由于维护两套存储系统,可能会用两套系统分别去查询,查询接口可能并不很友好,所以我们希望提供原生API和SQL的方式给用户使用。基于以上设计,我们的架构就会演进成如下图所示:如果希望保留MySQL,可以通过MySQL的binlog复制到我们的存储引擎中来,由我们的系统承担所有的查询能力。或者可以直接用我们的存储来替换掉MySQL,实现统一的元数据存储。为了满足我们的设计需求,存储引擎需要包括以下核心功能:首先是采用分布式架构,并且存储与计算分离;使用LSM存储引擎,可以承载更高并发的写入能力,同时支持CDC数据订阅能力,解决数据同步问题;自动分区扩展,当业务量上涨,快速增加计算节点,需要做分区扩容,对一些高并发的读场景,做一些只读副本的扩展;多元索引,包括二级索引、倒排索引、多维度空间索引等。介绍了这些概念和设计。我们再来深入的就海量设备圈选这一场景,看一下我们在自研存储引擎时遇到的问题,以及如何进一步优化。场景分析上图中的SQL是场景的一个样例,比如查询最近一小时都没有活跃过的设备,并且设备版本号>xxx,状态和类型满足某些条件的设备是哪些。实现时,首先从接入层传入,通过索引获取设备ID,再通过主存储去查询设备的其它属性,返回数据。当查询量很大时,可能无法一次返回,就需要循环迭代的去翻页查询。循环过程如上图所示:根据条件构造请求基于条件拉取过滤命中数据,开发返回第一批 ID 和 nexttoken 用来翻页根据 ID 获取反查获取所需所有列,返回数据和 nexttoken根据nexttoken 循环进入 2亿到数十亿级别设备中,根据条件进行圈选,召回的数据从百级别到千万级别,希望做到近实时,对于千万级别数据,吞吐可以达到 200w/s,同时不影响线上其他访问。而实际上假设我们一次查询拉取1000条数据需要耗时30~50ms,一秒的吞吐数据也就在几万行级别,性能远不及预期。我们不难想到除了一些过滤优化降低查询延时等常见手段外,我们最直接获取更高吞吐的方式就是加并发。我们可以让业务设计一个分桶key,所有的数据都带上这个key,在查询的时候我们可以带上这个key 让不同分桶的数据可以并发。比如有20个桶,吞吐就可以翻20倍。理论上确实如此,但是我们很快发现这样仍然达不到百万级别,原因是什么呢?是因为我们的查询会有读扩散,每次查询需要路由到很多节点查询,并发高了以后整个集群的请求量会随着一个索引的节点数以及请求并发数倍数增长。这时候不仅上限很容易触发,其他在线读写也会受到影响。我们需要理解存储原理做一些优化。这里就引入了一个概念“routing key”,即路由key,虽然整个索引的数据是 hash 打散的,但是我们可以定制一些打散路由规则,类似前面分桶,我们在存储层也基于一个规则进行分部,每次查询的时候分桶key 变成这个路由key,单一请求只会路由到一个分区节点,这样大大减少了读扩散带来的开销。我们的吞吐此时可以达到百万级别/s。下图分别是优化前和优化后。优化以后,我们发现性能已经接近满足业务需求。但是随着吞吐百万数据,对我们的内存,cpu的开销还是大幅增长。如何再进一步优化单次请求对服务的压力开销呢?我们进一步剖析存储的原理不难发现,一次请求会访问多个文件,从多个文件中捞取我们要的数据,但是因为涉及到单次访问数据的限制,很多命中的数据不能一次返回,我们需要反复交互进行读取。但是这种读取是没有上下文的,每一次请求过来都要重新去构造中间的数据结构,也没有保留哪些文件访问过哪些没有访问过,存储引擎需要重新过滤数据,获取对应的完整结果。这也正是导致CPU和内容开销大的原因。我们要进一步优化的方式就是增加上下文的能力。我们提供了一个针对这种海量数据圈选的接口ParallelScan。在接入层和存储节点进行交互,生成上下文ID,在每次请求时都会带上这个ID。同时在存储节点中会维护这些ID,保留当前有哪些文件需要被查询,也会保留每次查询命中的文件,记录这些上下文。这样业务方不再需要自己去设置分桶或routing key去实现并发,而是天然的通过系统去实现并发,我们会根据分片数,以及每个分片有多少数据去切分并发,一方面让切割更加均衡,同时也做到对业务方的无感知。这样的设计经过我们实测会减少大约80%的开销。最后再来总结对比一下我们的自研存储引擎架构和传统架构:总结和展望以上通过一个具体场景介绍了我们自研的存储系统,并与传统开源系统进行了对比。统一的存储引擎带来了架构简化,满足了多存储引擎的需求,在解决了数据的基础存储的同时,支持高并发读写,以及复杂查询。我们的系统也在快速迭代中,有如下一些工作在进行中:多介质分层存储 (SSD,HDD,OSS)基于现有 SQL 能力进行扩展分布式 OLAP 能力存储引擎支持更强大的计算能力,更高效地支撑 OLAP 计算。
表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。表格存储在 21 年 9 月正式公测了 SQL 功能,使得你在享受表格存储全托管,灵活的存储能力之外,可以让你的业务迁移更加平顺。经过几个月的公测和反馈,我们今天正式发布表格存储 SQL 商业化版本,基于这个版本,您可以放心的进行生产业务的迁移。目前我们提供控制台,官方 cli,官方 Java SDK 和 Go SDK,以及表格存储的 JDBC Driver,Go database driver 等多种方式进行 SQL 的访问。 具体可以参考我们的官方文档。本文主要介绍下表格存储 SQL 功能的使用场景和功能,基本概念和 SQL 使用最佳实践。具体计费详细介绍可以参考SQL查询计量计费。使用场景表格存储 SQL 和之前的产品功能一致,是全托管免运维的,当使用 SQL 访问表格存储和 SDK 直接读取数据在费用上一致,不会额外收费。这样在可以享受访问接口的便利性和计算能力同时,对于一些之前需要拉取较多数据二次计算的场景可以大幅减少客户端的数据传输,提升访问性能。那我们这里我们简单总结下表格存储 SQL 适合的场景。基础数据查询GetRow / GetRange 的替换。如果您的场景是大量是用表格存储进行基于主键的点查,范围查询。那么你可以很方便的替换成 SQL 的查询,同时我们可以实现线上几万每秒到几十万每秒的高并发访问能力。基于多元索引的检索与统计聚合使用多元索引的各类多字段检索,统计聚合能力等。如果您的场景是使用多元索引,进行多字段组合查询,又或者是全文检索,统计聚合查询等,这些您可以不再需要理解 SDK 的写法,直接使用 SQL 来进行数据链路的访问。相信在查询相对复杂的情况下,SQL 可以更方便您进行查询语义的表达。主表与索引组合查询主表,索引的组合查询使用。我们提供自动索引选择能力,可以在您查询时,根据查询条件预估查询开销,进行智能的索引选择。轻量级统计聚合轻量级的聚合分析,对于主表和全局二级索引,我们支持基于索引命中后进行扫描 10w 条记录的实时在线聚合计算(如有更大规模计算可以联系我们调整)。对于能命中多元索引查询,我们会利用索引引擎的计算能力不限制单次计算数据量。时序模型的查询和分析时序模型的各类查询场景,表格存储推出了针对时序场景的定制化存储引擎和分析能力。您也可以使用同样的访问入口,针对时序模型的数据进行各类查询,这里我们会提供一些针对时序场景的定制算子支持。对于目前具体支持的 SQL 语法列表可以在我们的官方文档中查看。如果您的场景没在上述列表中,您也希望通过 SQL 来进行访问,您可以通过工单或者钉钉群找到我们进行反馈,我们的 SQL 功能在快速迭代,会有更多更强大的能力持续发布。基本概念表格存储是一个 Schema free 的结构化存储产品,所以在您使用之际,只需要定义主键,不需要定义完整行的结构。但是在 SQL 查询中,通常是需要有 Schema 概念的,便于做数据类的的检查,投影,计算等操作。所以我们定义了 SQL 绑定概念。即在第一次使用SQL的时候,对一张表,需要通过 Create Table 语句进行表 Schema 定义。这个语句和普通的创建表没有区别,但是对于已经在 OTS 的表,他不会重新创建数据表,是进行一个 Schema 定义并在SQL 中关联。关联后即可以对已有的表格存储表开启 SQL 查询。表格存储之前使用实例,表和索引这些概念表示对应的资源,对应在 SQL 中我们也有标准的映射,具体如下:名词描述数据库按照数据结构来组织、存储和管理数据的仓库。一个数据库中可以包含一个或者多个表,映射为表格存储的实例表由行和列组成,映射为表格存储的表。索引映射为表格存储的二级索引或者多元索引。进行建表(前置条件表格存储中该表已经存在,通过 SQL 语法定义 Schema):建表语法CREATE TABLE [IF NOT EXISTS] table_name(column_name data_type [NOT NULL | NULL],... | PRIMARY KEY(key_part[,key_part]));建表样例CREATE TABLE exampletable1 (id BIGINTPRIMARY KEY, colvalue BIGINT, content MEDIUMTEXT);你可以通过控制台页面,或者 SDK 执行这条 SQL 语句。其中如果您使用了控制台,控制台可以根据表中已有数据做一些探测,自动帮助您生成建表语句。如下图所示点击确认后,探测出的 Schema 我们会自动产出一条 SQL 语句。在创建了绑定表以后,就可以像查询 MySQL 一样进行数据查询了。最佳实践同样的查询需求,不同的 SQL 写法或者存储引擎也会产生很大的性能差别。您可以参考我们的 "索引选择策略" 和 "计算下推"。更优的索引选择会减少单次扫描的数据量,提升性能,计算下推会把可能的算子推到存储层,减少存储层和计算引擎之间的数据交互,这两部分的能力都可以大幅降低整体的访问延时。这里我们举两个例子做为说明:场景1 基于索引进行多字段组合查询假设我们有一张订单表,表中会存储订单的明细各类字段。其中在表格存储中的主键定义如下(备注:场景中的案例使用测试数据,非生产样例):在表格存储中的数据样例如下:针对这张表,如果我们基于主键查询,可以很快获取结果例如SELECT * FROM `ordertable` where `order` = "xxx"这是因为这样的一个查询条件会被利用主键索引,变成一个小范围的 range 查询,如果你指定了全部主键就会自然的变成点查询。那如果我们希望通过属性列来查询数据性能会怎么样呢?熟悉存储原理的同学不难理解,如果没有其他额外索引,我们需要通过属性列进行过滤,那往往可能会触发整表的数据扫描再返回最终结果。这时候性能可能会大不及预期。如果我们创建了一个多元索引,包含您要查询的所有字段。性能会非常的快:这是因为,我们 SQL 会根据您的索引情况,进行动态选择,把 SQL 查询转化成对应的 Search 查询,利用倒排索引的特性,快速执行整条 SQL 语句。场景2 计算下推我们继续使用这张订单表,假设我们的查询除了过滤还有一些统计聚合。在没有多元索引的情况下,SQL 会先根据 Filter 捞取命中的数据,然后在执行统计聚合。有多元索引的情况下,SQL 会利用 Search 支持轻量级聚合的能力。例如如下 SQL 场景,我们希望按照用户维度查看购买 top 20 的订单金额信息。SELECT sum(productprice) as total, customerid FROM `ordertable` group by customerid order by total desc LIMIT 20;在没有多元索引的场景下,如果您的订单总量很大,会触发到我们的扫描上限(有放大需求可以加钉钉群或者工单咨询)。同时也会导致性能较差,如果使用了多元索引,就可以在秒级别进行这样的聚合查询。最后对于 SQL 的使用性能问题,索引如何优化问题,欢迎加入我们的交流群讨论。总结更详细的功能介绍,欢迎参考表格存储官网文档,可以查看具体的 SQL 语法,用例,限制项等。想了解更多表格存储的用法或者咨询欢迎加群讨论:我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。
功能介绍表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。表格存储正式发布了 SQL 功能,满足用户业务平滑迁移到表格存储并可以继续通过 SQL 方式访问表格存储,表格存储在传统的NoSQL结构化存储之上,提供云原生的 SQL 引擎能力,SQL查询兼容 MySQL 的查询语法,同时提供基础的SQL DDL 建表能力。对于已有数据表,通过执行 CREATE TABLE 一键自动建立映射关系后,您可以使用SQL方式访问表中数据。我们这次发布用户可以通过控制台进行部分 SQL 功能的访问,后续我们会发布 SDK 以及更多的 SQL 功能。基本概念名词描述数据库按照数据结构来组织、存储和管理数据的仓库。一个数据库中可以包含一个或者多个表,映射为表格存储的实例表由行和列组成,映射为表格存储的表。索引映射为表格存储的二级索引或者多元索引。快速上手下面就来结合控制台操作介绍下具体如何使用我们的 SQL 功能。登录控制台进入实例管理页面点击 “SQL查询” 切换进入 SQL 页面。第一次使用你会发现怎么我的“数据库”下面没有表,别慌,点击➕按钮,进行绑定,对于存量表,这个绑定只是用来辅助 SQL 定义 Schema,不会对你的数据产生任何影响,可以放心使用。点击加好自动生成创建对实例下的存量表的映射 SQL,你可以根据属性列的情况,对 SQL 做一些修改。选中 SQL 后点击执行 SQL,创建对应的表映射。此时可以开始进行数据查询,执行结果栏会展示查询的 SQL 结果。其他更详细的功能介绍,欢迎参考表格存储官网文档,可以查看具体的 SQL 语法,用例,限制项等。想了解更多表格存储的用法或者咨询欢迎加群讨论:我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。
开篇表格存储做为一款结构化存储系统,近期发布了新功能 SQL,大幅简化了查询的门槛,用户无需学习繁琐的 SDK,也不用区分表,索引等不同的接口,可以像访问传统的 MySQL 这类数据库一样,使用 SQL 的方式访问云原生的结构化大数据存储。下面我们就来具体实操下,看看查询用起来顺不顺手。操作举例话不多说我们就开始进入表格存储的控制台进行一番操作。实例详情页这里我们准备了两张订单表。右上角有 SQL 查询 标签,点击这里即可进入我们的 SQL 查询页面。SQL 查询页面1. 表绑定进入页面,你会发现,在数据库“sqltest” 下面并没有表,不用慌,这是因为这些表不是通过 SQL 引擎创建出来的,本身表格存储并不要求您定义完整的表 Schema,即 Schema free 的灵活方式,但是为了实现 SQL 查询,我们需要这里定义一下 Schema,注意这里的定义不会影响你的线上非 SQL 写入,只会给 SQL 查询用,您可以放心的设置。点击➕图标,可以选择一个表,我们会帮您生成一个默认的创建定义SQL Schema的模板,语法和创建物理表是一样的。你可以根据需求去添加,删除 Create Table 中的列,注意数据类型的映射关系可以参考数据类型映射。很快我们绑定好两张表后,就可以开始体验查询了。2. 查看绑定信息这里可以查看到您之前的绑定信息,同时在OTS中的主键也会提示出来。主键会和后续我们的查询性能有很大的关系。3. 查看索引索引也会对查询性能有很大影响,后面随着我们的查询会在详细解释。4. 体验查询点查熟悉表格存储的同学应该知道,ots 中有一个查询接口叫 GetRow,那什么样的SQL会类似点查的效果呢。select * from ordertable where `order` = '0000004bf78d' and orderid = 'o132042352'当我们的 SQL where 条件中,包含了所有的pk,并且用 = 确定具体 pk 的值,这时候就会触发我们的点查。以上面的 SQL 为例。针对这类点查,我们的 SQL 引擎可以承载高并发低延时的访问,这个和我们的存储引擎有着同样的特性。范围查询select * from ordertable where `order` >= '0000004bf78d' limit 100当你的查询指定的是部分主键,或者主键有返回,那么就会变成表格存储的RangeScan进行查询。范围查询和过滤select * from ordertable where `order` >= '0000004bf78d' and salespersonid = 's0065' limit 100指定了属性列的一些条件,就会添加过滤条件。注意这里,可能会有多次数据和存储进行交互,因为可能需要持续扫描,以这个为例,我们扫描出100条满足条件得到数据花费了1.1s。扫描数据超限SQL 查询相对表格存储之前的固定pk方式的扫描要灵活许多,所以可能会遇到一些问题,例如您的查询因为没有合理指定条件,导致了全表大范围的扫描,这里我们对您的存储和消耗资源做了保护,对于一些大的查询我们进行了限制。例如:select * from ordertable where `order` >= '0000004bf78d' and salespersonid = 's0065' order by ordertime limit 100这样的请求可能会扫很多数据过滤然后在进行排序才能返回结果,此时就会触发我们的超限。控制台会看到如下错误提示:这里我们只是做了保护,具体的限制项可以参考我们的官网文档,SQL使用限制,如果希望更大规模的数据计算需求,可以联系我们。5. 索引加速上面最后一个例子,我们会发现有些场景跑起来会比较慢因为要扫描很多数据,又或者直接超过了我们的扫描数据上限。那是不是就束手无策了呢?那既然您看到这里,我们肯定会给您一个优化的方案就是:创建多元索引。统计表行数约10亿行数据,进行一次全表count,1.7秒。基于属性列的过滤可以看到查询还是非常快的,这是因为有了多元索引后,我们的 SQL 查询过滤条件会被下推到存储层,减少数据的扫描和传输。那这时候你可能会问如果我带一些聚合查询会快么,那我们来尝试一发:统计聚合一些聚合函数也都可以在秒级别返回。这是因为我们的聚合函数,groupby 也下推进入存储索引层,加速了整个过程中的数据传输损耗等。小结如果你的查询多基于 pk,可以直接通过绑定表进行数据操作了。如果你的条件相对比较灵活,又或者有一些聚合计算需求,推荐你创建我们的多元索引,然后可以体验到毫秒到秒级别的 SQL 查询体验啦!最后欢迎扫码加群和我们一起讨论你的使用反馈!更详细的功能介绍,欢迎参考表格存储官网文档,可以查看具体的 SQL 语法,用例,限制项等。想了解更多表格存储的用法或者咨询欢迎加群讨论:我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。
本文介绍使用 Data Lake Formation (DLF)服务,实时订阅 Tablestore(原 OTS) 的数据,并以 Delta Lake 的格式投递进入 OSS,构建实时数据湖。 架构介绍 表格存储是一种全托管的云原生数据库,使用表格存储您无需担心软硬件预置、配置、故障、集群扩展、安全等问题。提供高服务可用性的同时极大地减少了管理成本。 表格存储支持多种数据库模型,可以广泛应用于时序数据、时空数据、消息数据、元数据以及大数据等核心数据场景。当海量的数据存储在表格存储中您希望把数据实时汇聚在 OSS 构建内部的大数据数据湖时,可以使用 Data Lake Formation 提供的托管的数据投递功能,把写入表格存储的数据实时 ETL 到 OSS 中,比以 Delta Lake 的格式存储在 OSS 之上。进入 Delta Lake 后的数据可以再做进一步的流计算或者批计算,大体架构如下图所示 本文会重点介绍如何操作快速构建上述的架构图。这里我们假设你已经购买并使用 Tablestore 做为你的存储选型,如果还没有使用,可以参考这里。购买并开通表格存储服务,使用我们的 SDK,或者各类数据管道,导入工具例如 Datax,数据集成服务,Datahub,DTS,又或者计算引擎 Spark,Flink,把数据写入表格存储。下面我们重点介绍如何把这些数据进行 Delta Lake 的数据构建。 数据湖构建环境准备 1.登录数据湖构建 2.创建新的入湖模板 选择实时 OTS,这里 DLF 会使用 Spark Streaming 的方式去订阅 Tablestore 中的数据。Tablestore 的 CDC 因为支持灵活的数据订阅能力,包括全量,全加增,以及增量三种模式,所以可以使用同一的流式入口。 3.注意如果是第一次使用,需要创建配置一下数据湖位置,即上图中的目标数据库,选择一个你的 OSS 路径。配置后点击下一步,如图中所示我们创建了一个 BaseAndStream 类型的通道,使得数据湖中可以包含表格存储的全量数据以及未来的新增实时数据。CU 可以根据你的数据写入量和大小进行动态配置。这里我们设置了10cu。 4.创建好入湖模板后,点击运行即可开始数据的实时入湖工作流。 5.点击运行,投递任务开始进行,这时候等一小段时间就可以在 OSS 中看到你的 delta 数据,分为 log 路径和data 路径,如下图所示: 数据文件使用的也是 parquet。这时候当数据实时汇聚在 delta 后,就可以开始我们的基于 deltalake 的数据分析处理了。 6.除了通过 dlf 的监控查看消费情况,还可以在 Tablestore 控制台查看数据投递进度: 数据湖数据分析 这一节来简单介绍下,数据通过 DLF 投递入湖后,如何进行数据的分析。这里我们使用 EMR Spark 来进行数据分析。使用了 DatalakeFormation 进行数据投递后,创建 emr 集群选择数据湖元数据,做为 emr 元数据。此时我们之前投递的 OSS 数据湖可以自动关联外表。 集群创建好后,我们登陆集群启动 Spark SQL, spark-sql --master yarn --num-executors 4 --executor-memory 8g --executor-cores 4 执行showdatabse,dlf create oss 数据湖会被list出来: 20/12/29 11:26:29 INFO [main] SparkSQLCLIDriver: Spark master: yarn, Application Id: application_1609144808351_0011 spark-sql> show databases; 20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: command is called 20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212411579 with spark sql successfully. 20/12/29 11:26:51 INFO [main] CodeGenerator: Code generated in 168.938794 ms 20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: execution is called 20/12/29 11:26:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212411922 with spark sql successfully. default dlftpch1 testlakeformation1 Time taken: 1.344 seconds, Fetched 3 row(s) 20/12/29 11:26:51 INFO [main] SparkSQLCLIDriver: Time taken: 1.344 seconds, Fetched 3 row(s) 进入dlf的数据库dlftpch1,执行show tables; deltalake的数据湖也会被自动关联外表,schema和我们的DLF schema一致。 spark-sql> show tables; 20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: command is called 20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212498535 with spark sql successfully. 20/12/29 11:28:18 INFO [main] CodeGenerator: Code generated in 9.697932 ms 20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: execution is called 20/12/29 11:28:18 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212498560 with spark sql successfully. dlftpch1 tpchdata1 false Time taken: 0.121 seconds, Fetched 1 row(s) 20/12/29 11:28:18 INFO [main] SparkSQLCLIDriver: Time taken: 0.121 seconds, Fetched 1 row(s) 然后我们就可以在这张deltalake表上进行adhoc的sql计算 spark-sql> select count(*) from tpchdata1; 20/12/29 11:29:11 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212551026 with spark sql successfully. 528026067 Time taken: 21.712 seconds, Fetched 1 row(s) 20/12/29 11:29:11 INFO [main] SparkSQLCLIDriver: Time taken: 21.712 seconds, Fetched 1 row(s) 我们可以查看下这张外表的create语句: spark-sql> show create table tpchdata1; 20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: command is called 20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212681298 with spark sql successfully. 20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: execution is called 20/12/29 11:31:21 INFO [main] SparkSQLQueryListener: Spark user root executed on 1609212681306 with spark sql successfully. CREATE TABLE `tpchdata1` (`l_orderkey` INT, `l_linenumber` INT, `l_comment` STRING, `l_commitdate` STRING, `l_discount` DOUBLE, `l_extendedprice` DOUBLE, `l_linestatus` STRING, `l_partkey` INT, `l_quantity` DOUBLE) USING delta OPTIONS ( `serialization.format` '1', path 'oss://lakeformation1/DLF-tpchdata1' ) 总结 本文介绍了使用 DLF (Data Lake Formation)的实时数据湖构建能力,订阅 Tablestore 的全增数据,构建流批一体的数据湖存储格式 Delta Lake。并在实例中使用 EMR Spark 进行构建数据的交互分析。整套架构可以帮助你基于 Tablestore + OSS 两套 Serverless 存储,低成本的构建实时的数据读写和分析。对细节架构感兴趣的同学欢迎加群交流(群号:23307953)。
背景 Tablestore 启发自 Google 的 Bigtable 论文,从2009年开始,在阿里云的飞天团队内,开始萌发。经过10年的锤炼,如今在集团内,云上积累了各式各样的客户和场景。这篇文章我们就来介绍下 Tablestore 大数据场景下的架构和用户场景案例。看看自研的分布式存储平台如何通过贴身理解客户场景,带来大数据架构的简化和升级,并助力业务方最终快速落地他们的需求。 大数据系统建设中的痛 数据的产生是多式多样,我们根据需求会选择不同的存储产品来存放这些数据,例如订单数据,用户信息数据我们会选用一款 OLTP 数据库,日志数据会选择 SLS 日志服务,或者自建 ELK 的方案。又有一些数据我们希望做实时的 计算,并且在 ETL 后在进行长期存储,这时会把数据双写到 Kafka,利用 Kafka 流式对接计算引擎的能力实现上面的需求。一些非结构化数据又会使用 HDFS/OSS 来进行长期存储。这些数据从存储到可以给计算引擎分析需要经过很长的链路。例如如果你选用了一款 OLTP 数据库,你希望进行实时计算或者定期批量计算。考虑到避免影响 TP 在线业务,通常我们会把数据实时或者定期投递出来,进入 Kafka/AP 数据库再分别实现实时计算和批计算。投递的过程可能会涉及到很多模块,例如 DTS,Kafka,Spark 批计算,Spark Streaming,AP 数据仓库。搭建一整套大数据架构(例如下图的典型大数据 Lambda 架构),不论是对数据库的研发同学,还是运维同学都带来了较高的门槛。模块多了以后,不论是维护成本还是开发成本都会相应提高。同时各个模块的存储,计算资源可能未必能很好的共享,比如写入峰值到来的时候,可能我们的批计算需求并不大,但是此时批计算的集群资源很难直接让实时写入层共享。反过来也一样,批计算的峰值,Kafka的资源,ETL的资源又未必可以给批计算共享。那有没有办法解决这些问题?降低大数据架构的接入门槛,使用成本,提升开发效率,资源利用率呢?我们下一章来说。典型的大数据 Lambda 架构 云上 Tablestore 的大数据方案 在开始介绍云上大数据架构之前,我们再看下,双十一大促下需要的大数据架构会对存储提出哪些具体的要求呢: 高吞吐,高可用的大数据架构同时满足弹性和低成本 做为大数据架构,分析的数据通常是海量的,吞吐决定了大数据分析的时延,如果不能及时处理数据会导致业务的决策滞后,进而让大数据方案大打折扣。 高可用同样是一个很重要的指标,往往业务上云就是希望可以告别线下自建系统的不稳定性。 做为双十一大促的主旋律,业务峰值是不可避免的,一套弹性的架构是帮助我们真正实现业务在大促下依然可以高可用,高吞吐的的利器。 资源成本 抛开成本谈弹性都是没有意义的,既然选择了云上的方案,我们自然希望可以获得较好弹性的同时,控制成本。 Serverless 云原生的产品形态通常可以比较好的在具备弹性的同时控制我们的成本开销。架构无需在业务低峰期预留很多不必要的资源来防御业务峰值的到来。 生态完善 大数据架构中涉及到的概念和需求非常多,这些需求也会对应不同的计算引擎,例如 Spark,Flink,Presto 或者云上的自研组建。存储需要和这些组建,开源产品进行无缝打通,这样可以降低使用的门槛。 除了计算引擎,数据通道链路的完善也很重要,例如日志类的数据和 OLTP 的数据能方便汇总,数据打宽等。 Tablestore 是一款 Serverless 云原生存储引擎,Serverless 相比实例售卖类型的产品,在业务有波峰波谷时天生就有较大的优势,基于 bigTable 的主存储采用行的方式进行存储,可以支撑单表亿级别的QPS。下面列了一些 Tablestore的核心特性:Tablestore 除了有强大的主存储满足海量业务的实时读写外,基于主存储的分布式日志提供了完整的数据派生能力(详情参考),海量实时写入 Tablestore 的数据,可以实时订阅进行消费。这样就满足了我们的实时计算需求。Lambda 架构中除了实时数据写入,实时计算之前,全量数据需要提供高性能扫描能力,Tablestore 采用行列混合,双引擎的架构,在主存储之外内部通过通道服务实时构建一个列存储,支撑 PB 级别数据的高吞吐扫描。同时在海量的数据场景下,我们相信数据是需要分层存储,所以在构建自身列存的同时,我们会帮助用户构建推送云上数据湖的链路,通过全托管的数据湖投递,降低用户的存储成本。基于 Tablestore 的 Lambda 架构Tablestore 在专注于打造一款极致性能和成本的存储引擎同时,更加关注完整的计算生态,伴随产品核心功能迭代的过程中,我们和阿里云的几大核心计算引擎做了完善的对接具体包括: MaxCompute 的对接,支持 MaxCompute 计算引擎通过外表的方式直读写 Tablestore EMR Spark 对接,支持流批源表读,流批结果表写,集团内第一款全 Connector 支持的 kv 存储引擎 Blink 对接,支持流批源表读,流批结果表写,维表读,集团内第一款全 Connector 支持的 kv 存储引擎 DLA 对接,支持 SQL 直接读写 Tablestore 的数据 FC 对接,支持流式增量触发器 计算的结果集合需要提供丰富灵活的高并发查询,支撑用户生成实时大屏,报表的场景,Tablestore 通过主存储结合丰富的索引能力以及 MPP 类型计算引擎来实现。Tablestore Lambda架构结果集合存储和报表展示对 Tablestore 大数据架构感兴趣的同学还可以参考之前的一些架构文章: 结构化大数据分析平台设计 数据中台之结构化大数据存储设计 Lambda plus: 云上大数据解决方案 基于大数据的舆情分析系统架构 - 架构篇 通过EMR Spark Streaming实时读取Tablestore数据 Tablestore 助力双十一集团业务的落地 介绍完 Tablestore 结构化大数据存储引擎的基础能力后,来看看具体的一些双十一集团业务架构: 实时酒店推荐 场景描述: 商家会实时的更新酒店房型库存和报价,酒店的属性等字段。实时选品过程针对同一个酒店不同供应商的库存和报价进行计算和筛选,最终展示出有库存且按照报价质量排序的列表给平台用户,进行酒店搜索的结果返回 架构优势: 在线和分析业务流量隔离 基于 Tablestore 对接流计算的便利性实现源表和维表的存储统一 全链路延时要求高,商品价格变动后,5秒内可以反馈至搜索库 基于 Tablestore 的酒店选品实时推荐架构 双十一监控作战大屏 场景描述: 海量的机器,业务监控秒级别数据,实时千万级别每秒写入流量,写入 Tablestore 数据实时查询和展示。跨时间的实时聚合,形成分钟级别数据。关键指标的实时聚合和展示,离线批计算提供报表分析能力。 架构优势: 核心单表数据规模达 10 PB,可自定义数据生命周期 核心单表持续每秒写入进 5000 万个数据点 数据实时写入,大大提升数据可见时效性 毫秒级实时查询展示趋势图和报表,查询性能不受单表规模约束 基于 Tablestore 时序监控大屏场景 全网爬虫系统 场景描述: 海量爬虫高并发高吞吐数据写入,全量数据存储,数据量达 PB 级,多类数据存储,包含半结构化原始数据、结构化标签数据以及分析结果数据,数据写入后需要即时触发数据实时处理,后续需要对接离线处理。 架构优势: 分布式 LSM 引擎数据存储,提供高并发高吞吐写入,PB 级数据存储 通过数据更新捕获,实时触发后续对数据的自定义处理逻辑 与大数据平台实时数据同步,分析结果写入结果表,供应用层实时查询 基于 Tablestore 的爬虫大数据架构 总结 随着处理数据量,处理实时性的变化,物理硬件处理能力的升级,大数据架构演进是非常迅速的。随着业务数据逐步上云后,大数据处理平台也必然会迁移至云上,相比 TP 类业务,AP 类场景的业务峰值,处理数据带宽可能会更大。我们希望合理的云上大数据架构可以借助存储计算分离,在可控的成本下提供更弹性的数据处理能力,解决各业务在大促到来所遇到的挑战。也让大数据平台的研发人员彻底从基础运维中释放出来,可以全身心的投入满足业务对数据分析平台的需求。如果有兴趣的同学欢迎与我们交流探讨(钉钉交流群:23307953)。
背景 Tablestore 启发自 Google 的 Bigtable 论文,从2009年开始,在阿里云的飞天团队内,开始萌发。经过10年的锤炼,如今在集团内,云上积累了各式各样的客户和场景。这篇文章我们就来介绍下 Tablestore 大数据场景下的架构和用户场景案例。看看自研的分布式存储平台如何通过贴身理解客户场景,带来大数据架构的简化和升级,并助力业务方最终快速落地他们的需求。 大数据系统建设中的痛 数据的产生是多式多样,我们根据需求会选择不同的存储产品来存放这些数据,例如订单数据,用户信息数据我们会选用一款 OLTP 数据库,日志数据会选择 SLS 日志服务,或者自建 ELK 的方案。又有一些数据我们希望做实时的 计算,并且在 ETL 后在进行长期存储,这时会把数据双写到 Kafka,利用 Kafka 流式对接计算引擎的能力实现上面的需求。一些非结构化数据又会使用 HDFS/OSS 来进行长期存储。这些数据从存储到可以给计算引擎分析需要经过很长的链路。例如如果你选用了一款 OLTP 数据库,你希望进行实时计算或者定期批量计算。考虑到避免影响 TP 在线业务,通常我们会把数据实时或者定期投递出来,进入 Kafka/AP 数据库再分别实现实时计算和批计算。投递的过程可能会涉及到很多模块,例如 DTS,Kafka,Spark 批计算,Spark Streaming,AP 数据仓库。搭建一整套大数据架构(例如下图的典型大数据 Lambda 架构),不论是对数据库的研发同学,还是运维同学都带来了较高的门槛。模块多了以后,不论是维护成本还是开发成本都会相应提高。同时各个模块的存储,计算资源可能未必能很好的共享,比如写入峰值到来的时候,可能我们的批计算需求并不大,但是此时批计算的集群资源很难直接让实时写入层共享。反过来也一样,批计算的峰值,Kafka的资源,ETL的资源又未必可以给批计算共享。那有没有办法解决这些问题?降低大数据架构的接入门槛,使用成本,提升开发效率,资源利用率呢?我们下一章来说。 典型的大数据 Lambda 架构 云上 Tablestore 的大数据方案 在开始介绍云上大数据架构之前,我们再看下,双十一大促下需要的大数据架构会对存储提出哪些具体的要求呢: 高吞吐,高可用的大数据架构同时满足弹性和低成本 做为大数据架构,分析的数据通常是海量的,吞吐决定了大数据分析的时延,如果不能及时处理数据会导致业务的决策滞后,进而让大数据方案大打折扣。高可用同样是一个很重要的指标,往往业务上云就是希望可以告别线下自建系统的不稳定性。做为双十一大促的主旋律,业务峰值是不可避免的,一套弹性的架构是帮助我们真正实现业务在大促下依然可以高可用,高吞吐的的利器。 资源成本 抛开成本谈弹性都是没有意义的,既然选择了云上的方案,我们自然希望可以获得较好弹性的同时,控制成本。Serverless 云原生的产品形态通常可以比较好的在具备弹性的同时控制我们的成本开销。架构无需在业务低峰期预留很多不必要的资源来防御业务峰值的到来。 生态完善 大数据架构中涉及到的概念和需求非常多,这些需求也会对应不同的计算引擎,例如 Spark,Flink,Presto 或者云上的自研组建。存储需要和这些组建,开源产品进行无缝打通,这样可以降低使用的门槛。除了计算引擎,数据通道链路的完善也很重要,例如日志类的数据和 OLTP 的数据能方便汇总,数据打宽等。 Tablestore 是一款 Serverless 云原生存储引擎,Serverless 相比实例售卖类型的产品,在业务有波峰波谷时天生就有较大的优势,基于 bigTable 的主存储采用行的方式进行存储,可以支撑单表亿级别的QPS。下面列了一些 Tablestore的核心特性: Tablestore 除了有强大的主存储满足海量业务的实时读写外,基于主存储的分布式日志提供了完整的数据派生能力(详情参考),海量实时写入 Tablestore 的数据,可以实时订阅进行消费。这样就满足了我们的实时计算需求。 Lambda 架构中除了实时数据写入,实时计算之前,全量数据需要提供高性能扫描能力,Tablestore 采用行列混合,双引擎的架构,在主存储之外内部通过通道服务实时构建一个列存储,支撑 PB 级别数据的高吞吐扫描。同时在海量的数据场景下,我们相信数据是需要分层存储,所以在构建自身列存的同时,我们会帮助用户构建推送云上数据湖的链路,通过全托管的数据湖投递,降低用户的存储成本。 基于 Tablestore 的 Lambda 架构 Tablestore 在专注于打造一款极致性能和成本的存储引擎同时,更加关注完整的计算生态,伴随产品核心功能迭代的过程中,我们和阿里云的几大核心计算引擎做了完善的对接具体包括: MaxCompute 的对接,支持 MaxCompute 计算引擎通过外表的方式直读写 TablestoreEMR Spark 对接,支持流批源表读,流批结果表写,集团内第一款全 Connector 支持的 kv 存储引擎Blink 对接,支持流批源表读,流批结果表写,维表读,集团内第一款全 Connector 支持的 kv 存储引擎DLA 对接,支持 SQL 直接读写 Tablestore 的数据FC 对接,支持流式增量触发器 计算的结果集合需要提供丰富灵活的高并发查询,支撑用户生成实时大屏,报表的场景,Tablestore 通过主存储结合丰富的索引能力以及 MPP 类型计算引擎来实现。 Tablestore Lambda架构结果集合存储和报表展示 对 Tablestore 大数据架构感兴趣的同学还可以参考之前的一些架构文章: 结构化大数据分析平台设计数据中台之结构化大数据存储设计Lambda plus: 云上大数据解决方案基于大数据的舆情分析系统架构 - 架构篇通过EMR Spark Streaming实时读取Tablestore数据 Tablestore 助力双十一集团业务的落地 介绍完 Tablestore 结构化大数据存储引擎的基础能力后,来看看具体的一些双十一集团业务架构: 实时酒店推荐 场景描述: 商家会实时的更新酒店房型库存和报价,酒店的属性等字段。实时选品过程针对同一个酒店不同供应商的库存和报价进行计算和筛选,最终展示出有库存且按照报价质量排序的列表给平台用户,进行酒店搜索的结果返回 架构优势: 在线和分析业务流量隔离基于 Tablestore 对接流计算的便利性实现源表和维表的存储统一全链路延时要求高,商品价格变动后,5秒内可以反馈至搜索库 基于 Tablestore 的酒店选品实时推荐架构 双十一监控作战大屏 场景描述: 海量的机器,业务监控秒级别数据,实时千万级别每秒写入流量,写入 Tablestore 数据实时查询和展示。跨时间的实时聚合,形成分钟级别数据。关键指标的实时聚合和展示,离线批计算提供报表分析能力。 架构优势: 核心单表数据规模达 10 PB,可自定义数据生命周期核心单表持续每秒写入进 5000 万个数据点数据实时写入,大大提升数据可见时效性毫秒级实时查询展示趋势图和报表,查询性能不受单表规模约束 基于 Tablestore 时序监控大屏场景 全网爬虫系统 场景描述: 海量爬虫高并发高吞吐数据写入,全量数据存储,数据量达 PB 级,多类数据存储,包含半结构化原始数据、结构化标签数据以及分析结果数据,数据写入后需要即时触发数据实时处理,后续需要对接离线处理。 架构优势: 分布式 LSM 引擎数据存储,提供高并发高吞吐写入,PB 级数据存储通过数据更新捕获,实时触发后续对数据的自定义处理逻辑与大数据平台实时数据同步,分析结果写入结果表,供应用层实时查询 基于 Tablestore 的爬虫大数据架构 总结 随着处理数据量,处理实时性的变化,物理硬件处理能力的升级,大数据架构演进是非常迅速的。随着业务数据逐步上云后,大数据处理平台也必然会迁移至云上,相比 TP 类业务,AP 类场景的业务峰值,处理数据带宽可能会更大。我们希望合理的云上大数据架构可以借助存储计算分离,在可控的成本下提供更弹性的数据处理能力,解决各业务在大促到来所遇到的挑战。也让大数据平台的研发人员彻底从基础运维中释放出来,可以全身心的投入满足业务对数据分析平台的需求。如果有兴趣的同学欢迎与我们交流探讨(钉钉交流群:23307953)。 
背景 Spark 中国社区联合阿里云 EMR 技术交流群,Tablestore 技术交流群举办了一场联合技术直播。直播的话题是“海量结构化数据的实时计算和处理”,主要介绍基于 Tablestore 的数据变更实时捕获订阅能力,实现云上Lambda 架构的轻量化实现。在直播中有一个demo环节,本篇文章会提供demo环节的简单操作步骤,方便大家后续在阿里云上搭建和demo场景类似的一整套架构,实现数据的实时和离线处理。 演示场景介绍 演示模拟了一个电商订单场景,通过流计算实现订单大屏的场景,做到海量订单实时注入的同时,进行10s的订单统计聚合以及交易金额统计并做实时的大屏幕展示。整个订单的大屏幕样例如下: 大屏我们使用阿里云的 DATAV 对接 Tablestore数据源来实现,那么下面我们就具体看看从订单的原始数据到结果大屏数据的产生过程以及操作步骤。 整套后台的架构大体如下: 在ecs,或者本地模拟一个订单生成器,实时的注入订单数据到 Tablestore 中。 在 Tablestore 控制台创建通道 在 EMR 控制台购买 Spark 集群 下载最新的 EMR SDK 执行下面提供的建表语句和SQL命令实现实时计算,结果表会写回 Tablestore中。 通过 DATAV 进行实时大屏展示结果表数据 操作步骤一:登陆阿里云官网 Tablestore 控制台进行实例和表创建 创建实例后,可以创建一张表,表主键schema如下: 启动客户端注入程序随机写入数据,样例数据如下: Tablestore 产品是 Serverless的形态,用户使用无需购买大小或者规格,产品回根据业务做自动水平扩展。 操作步骤二:登陆阿里云官网 EMR 控制台购买Spark集群 Spark的集群规模可以根据业务需求灵活选取,我们实测三节点,可以轻松的实时消费100w/s的数据做聚合计算哟! 操作步骤三:登陆EMR集群执行作业脚本 登陆EMR的master节点,执行下面命令启动流任务: 1.启动stream sql交互在EMR 官网获取最新版本EMR sdk(1.8)streaming-sql --driver-class-path emr-datasources_shaded_2.11-1.8.0.jar --jars emr-datasources_shaded_2.11-1.8.0.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2 2.创建streaming source 表DROP TABLE IF EXISTS ots_order_test;CREATE TABLE ots_order_testUSING tablestoreOPTIONS(endpoint="填写Tablestore VPC的地址",access.key.id="",access.key.secret="",instance.name="",table.name="",tunnel.id="在Tablestore控制台查找对应想消费通道ID",catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "long"}, "timestamp": {"cols": "timestamp", "type": "long"}}}'); 3.创建streaming sink表DROP TABLE IF EXISTS ots_order_sink_test;CREATE TABLE ots_order_sink_testUSING tablestoreOPTIONS(endpoint="",access.key.id="",access.key.secret="",instance.name="",table.name="",tunnel.id="",catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "long"}}}'); 4.创建Streaming作业CREATE SCAN ots_table_stream on ots_order_test USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");CREATE STREAM job1options(checkpointLocation='/tmp/spark/cp/test1',outputMode='update')insert into ots_order_sink_testSELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, sum(price) AS totalPrice FROM ots_table_stream GROUP BY window(to_timestamp(timestamp / 1000000000), "10 seconds"); 最后实验有任何问题,或者希望做技术交流的同学欢迎加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
前言 任何线上系统都离不开数据,有些数据是业务系统自身需要的,例如系统的账号,密码,页面展示的内容等。有些数据是业务系统或者用户实时产生的,例如业务系统的日志,用户浏览访问的记录,系统的购买订单,支付信息,会员的个人资料等。大多数企业对内,对外有很多这样的线上系统,这些数据是驱动业务发展,决策和创新最核心的东西。让这些数据更好的支撑线上系统的是数据库和数据分析平台。数据库主要承担的是线上系统的实时数据写入和根据预定好的需求进行查询,严格说就是数据库中的OLTP类型数据库。这类数据库中最为大家所熟知的就是Oracle和MySQL。业务系统对数据库的要求可能是多样的,近些年也由传统的关系型数据库融入了NoSQL数据库和NewSQL。业务系统中除了和业务直接相关的数据存储在数据库中并累积起来外,还有海量的系统监控数据,系统业务日志产生。如果我们希望这些数据可以更持久的存储并且做一些实时或者离线的分析,辅助我们的业务做决策,提供业务大盘和报表,很多公司会构建自己的数据分析平台。也就是时下『大数据』平台的由来。这类平台主要解决以下几个问题: 1. 丰富的数据源支持和数据格式延迟绑定 丰富的数据源是因为这样一个数据分析平台是汇总我们各类业务数据的地方,数据源可能来自各类数据库例如MySQL,MongoDB,日志源等等。这个平台需要能够方便各类数据源便捷的入库,例如通常大家会发现大数据架构中有一个Kafka,各类数据源会先进入Kafka,再由Kafka推送到大数据的存储系统中。这里Kafka就承担了解耦大数据平台的存储接口和上游数据源的作用。数据格式延时绑定是一个很重要的概念,TP类数据库往往需要根据业务需求预先定义Schema,也就是通常说的写入型Schema,数据在写入时即会做严格的数据字段类型检验。但是分析系统并不希望因为Schema约束或者限制的数据入库,通常会采用读取型Schema,也就是这里的延时绑定,数据在分析时才会根据数据类型做对应的处理。 2. 存储和计算弹性扩展 存储和计算弹性扩展是指大数据系统需要能支撑海量数据和保持高吞吐的读写。数据分析平台会汇总接纳各类线上系统中的各类数据,同时数据会随着时间进行累积。大数据分析平台能够支撑海量数据的存储是必须的,而且这个规模并不是预先定义好的,而是随着数据的累积弹性增加的,这里的存储量可能从TB级到PB级别,甚至数百PB。同时整套架构的计算能力也一样具备弹性,举个直观的例子,可能我们在TB级别做一次全量处理需要20分钟,是不是到了百PB级别,处理时间也翻了好几个数量级从而导致每天的分析结果不能及时产生,从而让大数据平台的价值大打折扣,限制了业务的飞速发展。 3. 大规模低成本 很多大数据平台设计之初未必会意识到成本,主要依据自身对开源方案的熟悉度,业务方对数据规模和分析实效性进行方案的选取。但当业务量真的起来后,不得不面临一个挑战就是大数据平台的成本问题。这里甚至会导致不得不进行平台的架构改造或者数据迁移。所以对于企业的大数据平台设计之初,我们就需要把整套架构的成本考虑进来。这对应的就是数据的分层存储和存储计算引擎的选取。时下云上的大数据平台往往最终会选择一个可扩展,低成本的存储平台落地最终的数据,例如阿里云上的OSS或者AWS的S3,这些存储平台本身也支持进一步的分层存储。这类存储之上的计算平台可以选取Elastic MapReduce方案。整套架构就组成了时下火热的『数据湖』方案。在线下用户可能会自建一个Hadoop集群,并使用HDFS来存储这些汇总的数据,进而构建自己的大数据数据仓库。 4. 在线业务和分析业务隔离 隔离是因为分析业务往往需要扫描较多的数据进行分析,这类大流量的扫描如果是发生在在线库,可能会影响线上服务的SLA。同时分析流量的访问模式和在线模式未必相同,在线库数据的存储分布和格式也未必适合分析系统。所以一般典型的大数据平台会有自己的一份存储,数据分布,格式和索引会面向分析需求而做相应的优化。例如典型的TP类引擎的存储格式往往是行存,分析的时候会转变成列存。 介绍到这里,希望引导大家来思考这样一个问题,不论是传统的数据仓库,还是云上的数据湖,我们最终还是希望可以有效的解决业务中数据存储和分析的问题。那究竟业务需求是什么,尤其是当我们希望分析数据源是数据库,日志监控这类结构化或者半结构化数据时,对大数据平台的需求是什么呢?我想这里大家可以先思考一下,后面我们会和大家一起看看时下一些主流的开源方案和云上的构建方案,然后再来总结下结构化大数据存储和分析的需求。 开源大数据存储分析平台架构 前面我们提及线上业务的实现离不开OLTP数据库的支持,来实现实时的数据读写。这一章我们一起看看,开源和云上一些主流的组合数据库和大数据分析平台的架构。 Hadoop大数据方案 方案一:Uber Hadoop大数据架构我们以Uber的一套大数据架构为例,图中展示了各类数据库通过Kafka推送到Hadoop集群中进行全量批计算,结果集合会再写入几类存储引擎中进行结果查询展示。在传统的Hadoop架构中,各类结构化数据例如日志数据通过采集管道进入Kafka,Spark 可以实时的消费Kafka的数据写入集群内的HDFS中。数据库例如RDS中的数据会使用Spark定期全量扫表同步到HDFS,通常周期是一天一次,在业务低峰期进行同步。这样使用HDFS存储汇总了用户的数据,对数据库数据而言其实是一个定期的snapshot。例如每天的凌晨会把行为日志与数据库中用户的信息进行联合的分析,产生当天的分析报告比如包含当天访问量汇总,用户的消费倾向等报表数据,给业务负责人决策使用。架构中之所以说RDS的数据是全量入库,主要原因是HDFS本身只是一个分布式文件存储,对Record级别的更新删除并不友好。所以为了简化这些数据库中的合并修改删除逻辑,在数据规模不大的情况下会选择全量扫描。当数据库数据较大时,例如Uber的架构中,基于HDFS开发了一套存储引擎来支持修改和删除。这套方案的特点是,分析时数据已经是静态,借助于Hadoop集群的高并发能力,可以较为容易的实现百TB到PB量级行为数据的离线计算和处理,同时数据大块的存储在HDFS上,综合存储成本也相对较低。美中不足的是数据是定期入库,数据计算的时效性通常是T+1。如果业务方有近实时推荐的需求,这时架构会从离线计算升级到『Lambda架构』。架构如下图: Lambda架构具体细节可以参考Lambda介绍。通过HDFS全量存储和Kafka存储增量来实现离线和实时两类计算需求。本质上HDFS存储的全量仍然是T+1式的。但是通过Kafka对接流计算弥补实时计算的需求。也就是多了一份存储和计算逻辑实现业务实时性的需求。不论是传统离线分析架构还是Lambda架构,结果集合可能仍然比较大,需要持久化在一个结构化存储系统中。此时的存储主要做为结果集合进行查询,例如实时大盘,报表,BI业务决策人员的即席查询等。所以主流的做法是把结果写入RDS然后同步至Elasticsearch或者直接写入Elasticsearch,这里主要希望借助于ES强大的全文检索和多字段组合查询能力。 分布式NoSQL数据库方案 方案二:基于分布式NoSQL数据库Hbase的大数据架构之前的架构我们不难发现,RDS在做批计算的时候需要同步至HDFS形成静态数据做批计算。这样的架构可能会遇到一个场景,全量数据很大,每天全量同步,时效性很差甚至如果资源不够会同步不完,如何优化这个问题呢?我们不难想到如果数据仓库本身就是一个数据库,直接支持CRUD操作,那岂不是不需要同步全量!甚至部分在线数据可以直接写入这个海量数据库中,没错业界很多开源方案会基于分布式的NoSQL数据库例如Hbase来打造这个架构。上图就是一个简单的实例。Hbase schema free以及支持实时的CRUD操作,大大简化了数据源数据的实时写入,同步问题。同时可以跨数据源打造大宽表,大宽表会大大降低计算时通过join构建完整数据的复杂度。同时Hbase组合Kafka也可以实现Lambda支持批和流两类需求。那这种架构是完美的么?可以完全替换方案一么?答案肯定不是,一方面Hbase为了支持好实时的数据写入,是采用了LSM存储引擎,新数据通过追加的方式入库,数据更新和合并依赖后台的合并优化减少读操作。这类支持数据引擎的数据读写成本是要高于直接读写HDFS静态文件。另一方面Hbase数据落盘的存储格式是按行进行组织,也就是我们通常说的行存储。行存储在数据的压缩和支持批量扫描计算上的能力远不如列存,方案一中的HDFS往往会选择Parquet或者Orc这类列存。所以当数据量增长到PB甚至数百PB时,全量使用Hbase存储进行批量分析,在性能和成本上有可能会遇到瓶颈。所以主流的Hbase方案也会结合方案一,使用HDFS加速Hbase的方式来存储各类结构化数据,从而来控制整套架构的成本和提升扩展能力。但这样的组合也同时带来一个问题,组件增多运维难度会加大。同时Hbase和HDFS中的数据数冷热分层,还是按照业务需求来划分。如果是分层场景,Hbase中的数据如何方便的流入HDFS,这些都是很实际的挑战。 数据库结合AP分析引擎方案 前面说的NoSQL方案本质上并没有解决数据结果集合的即席查询问题,Hbase本身可以支撑基于Rowkey查询,但是对于多字段的即席查询支持较为费力。一些高级玩家,大厂会基于Hbase对接Solr或者自己二次开发定制各类索引来加速查询,再对接Phoenix实现分布式的计算能力。这一套复杂的开发,多组件整合后本质上是希望赋予一个TP数据库AP的能力。这也自然的把我们的架构引入TP引擎结合AP引擎实现完整的分析架构。 方案三:基于ClickHouse的实时分析平台例如上图所示,通过构建一套基于ClickHouse分析引擎的集群,各类结构化数据同步到分析引擎后可以很便捷的进行交互分析。这套架构相比之前的架构看上去简化了一些步骤,主要原因是这类引擎自身提供了类似数据库的读写能力的同时也自带一套完善的分析引擎。业界主流的分布式AP引擎有很多,例如Druid,ClickHouse,Piont,Elasticsearch或者列存版本hbase--Kudu。这类系统也各有侧重,有擅长Append场景支持数据的预聚合再分析的例如Druid,也有以实现各类索引,通过索引的强大filter能力减少IO次数来加速分析的Elasticsearch,像Kudu直接是为了优化Hbase批量扫描能力同时保留了它的单行操作能力,把持久化的格式转成了列存。这些系统的共同点是数据都基于列存,部分引擎引入倒排索引,Bitmap索引等进一步加速查询。这套架构的好处是直接抛开了传统离线大数据架构,希望借助存储引擎本身良好的存储格式和计算下推的支持实现实时批量计算,实时展现计算结果。这套架构在GB到100TB级别,相比之前的架构有了很大的提升,此时实时计算甚至和批量离线计算的界限都变得模糊起来,TB级别的数据aggregation在秒到分钟级就可以响应,BI人员无需再像传统大数据架构下等待一个T+1的数据同步时延后再进行分钟级甚至小时级的离线计算才能拿到最终的结果,大幅加快了数据为商业带来价值的步伐。那这套架构会是结构化大数据处理的终结者么?当然短时间内看未必,原因是这套架构虽然具备良好的扩展能力,但是相比Hadoop方案离线处理百PB来说,在扩展能力,复杂计算场景和存储成本上还是相对弱一些。例如全索引的Elasticsearch,索引本身通常会带来三倍的存储空间膨胀,通常还需要依赖SSD这样的存储介质。其他方面这类架构会把计算需要的所有数据加载进内存做实时计算,很难支持两个大表的Join场景,如果有较重的计算逻辑也可能会影响计算的时效性。TB级以上级别数据的ETL场景也不是这类引擎所擅长的。 云上的数据湖Datalake方案 方案四:AWS 基于S3的数据湖方案AWS的这套数据湖方案可以理解为是传统Hadoop方案的云上落地和升级,同时借助于云原生存储引擎S3,在保留了自建HDFS集群的分布式存储可靠性和高吞吐能力外,借助于自身强大的管道能力例如Kinesis Firehose和Glue来实现各类数据快速便捷的入数据湖,进一步降低了传统方案的运维和存储成本。这套架构示例还对大数据平台的使用者做了区分和定义,针对不同的使用场景,数据的使用方式,分析复杂度和时效性也会有不同,这也和我们前面提到方案一和二互补是相同情况。当然这套数据湖方案本身并没有解决传统方案的所有痛点,例如如何保证数据湖中的数据质量做到数据入库原子性,或者如何高效支持数据更新和删除。 Delta Lake 云上希望通过数据湖概念的引入,把数据进行汇总和分析。同时借助于云上分布式存储的技术红利,在保证数据的可靠性前提下大幅降低汇总数据持久化存储的成本。同时这样一个集中式的存储也使得我们的大数据分析框架自然演进到了存储计算分离的架构。存储计算分离对分析领域的影响要远大于OLTP数据库,这个也很好理解,数据随着时间不断累积,而计算是根据业务需求弹性变化,谷歌三驾马车中的GFS也是为了解决这个问题。数据湖同时很好的满足了计算需要访问不同的数据源的需求。但是数据湖中的数据源毕竟有不同,有日志类数据,静态的非结构化数据,数据库的历史归档和在线库的实时数据等等。当我们的数据源是数据库这类动态数据时,数据湖面临了新的挑战,数据更新如何和原始的数据合并呢?当用户的账号删除,我们希望把数据湖中这个用户的数据全部清除,如何处理呢?如何在批量入库的同时保证数据一致性呢。Spark商业化公司Databricks近期提出了基于数据湖之上的新方案『Delta Lake』。Delta Lake本身的存储介质还是各类数据湖,例如自建HDFS或者S3,但是通过定义新的格式,使用列存来存base数据,行的格式存储新增delta数据,进而做到支持数据操作的ACID和CRUD。并且完全兼容Spark的大数据生态,从这个角度看Databricks希望引入Delta Lake的理念,让传统Hadoop擅长分析静态文件进入分析动态数据库源的数据,离线的数据湖逐步演进到实时数据湖。也就是方案二和三想解决的问题。 介绍了这些结构化数据平台的架构后,我们再来做一下总结,其实每套架构都有自己擅长的方案和能力: 适合场景 数据规模 存储格式 数据导入模式 成本 计算方式 方案运维复杂度 数据变更性 传统Hadoop 海量数据离线处理Append为主的场景 大 列存 批量离线 低 MapReduce 较高 不可更新静态文件 分布式NoSQL数据库 海量数据,支持实时CRUD批量离线处理,可以部分做方案一的结果存储集 中上 行存 实时在线 中 MapReduce 中 可更新 分布式分析型数据库 实时/近实时入库,即席查询分析,经常做为方案一的结果存储集 中 行列混合 实时/近实时 高 MPP 中 可更新 数据湖/DeltaLake 海量数据离线处理,实时流计算具备ACID和CRUD能力 大 行列混合 批量离线/近实时 低 MapReduce 中 可更新 通过上面对比我们不难看出,每套方案都有自己擅长和不足的地方。各方案的计算模式或者计算引擎甚至可以是一个,例如Spark,但是它们的场景和效率确相差很大,原因是什么呢?区别在于存储引擎。这里我们不难看出大数据的架构抛开计算引擎本身的性能外,比拼的根本其实是存储引擎,现在我们可以总结一下大数据分析平台的需求是什么:在线和分析库的隔离,数据平台需要具备自己的存储引擎,不依赖于在线库的数据,避免对线上库产生影响。有灵活的schema支持,数据可以在这里进行打宽合并,支持数据的CRUD,全量数据支持高效批量计算,分析结果集可以支持即席查询,实时写入支持实时流计算。 综上所述,架构的区别源自于存储引擎,那是否有一些解决方案可以融合上面的各类存储引擎的优点,进一步整合出一套更加全面,可以胜任各类业务场景,也能平衡存储成本的方案呢? 下面我们就来一起看看构建在阿里云上的一款云原生结构化大数据存储引擎:Tablestore如何解决这些场景和需求。 Tablestore的存储分析架构 Tablestore是阿里云自研的结构化大数据存储产品,具体产品介绍可以参考官网以及权威指南。Tablestore的设计理念很大程度上顾及了数据系统内对结构化大数据存储的需求,并且基于派生数据体系这个设计理念专门设计和实现了一些特色的功能,也通过派生数据能力打通融合了各类存储引擎。Tablestore的基本设计理念可以参考这篇文章的剖析。 大数据设计理念 存储计算分离架构:采用存储计算分离架构,底层基于飞天盘古分布式文件系统,这是实现存储计算成本分离的基础。 CDC技术:CDC即数据变更捕获,Tablestore的CDC技术名为Tunnel Service,支持全量和增量的实时数据订阅,并且能无缝对接Flink流计算引擎来实现表内数据的实时流计算。基于CDC技术可以很便捷的打通Tablestore内的各类引擎以及云上的其他存储引擎。 多存储引擎支持:理想的数据平台希望可以拥有数据库类的行存,列存引擎,倒排引擎,也能支持数据湖方案下的HDFS或者DeltaLake,热数据采用数据库的存储引擎,冷全量数据采用更低成本数据湖方案。整套数据的热到冷可以做到全托管,根据业务场景定制数据在各引擎的生命周期。Tablestore上游基于Free Schema的行存,下游通过CDC技术派生支持列存,倒排索引,空间索引,二级索引以及云上DeltaLake和OSS,实现同时具备上述四套开源架构方案的能力。 数据最终的落地归档必然是数据湖OSS:这里很好理解,当我们的热数据随着时间推移变成冷数据,数据必然会逐渐归档进入OSS,甚至是归档OSS存储中。这样可以让我们的PB级别数据实现最低成本的高可用存储。同时面对极为偶尔的全量分析场景,也可以以一个相对稳定高效的速率吞吐出想要的文件。所以在Tablestore平台上的大数据最终我们会推荐归档进入OSS。 说了这些理念基于Tablestore我们可以较为轻松的构建下面四套架构,具体的架构选型可以结合业务场景,同时可以很方便的做到动态方案切换: 附加值较高的数据,希望具备高并发点查询,即席查询分析能力(9月已发布): 组合Tablestore的宽表,Tablestore Tunnel的CDC技术,索引分析引擎,这套架构类似方案2和3的融合,在具备宽表合并高吞吐低成本存储的同时,可以提供TB级别数据即席查询和分析的能力。这套架构的最大优势就是无需过度依赖额外的计算引擎,实现高效实时分析能力。 Tablestore 分析引擎方案 海量数据,非高频率更新的数据,拥有云上EMR集群(即将支持敬请期待): 组合Tablestore的宽表,Tablestore Tunnel的数据派生CDC技术,Spark Streaming和DeltaLake,构建类似开源方案1或者4的架构。通过CDC技术,EMR集群中的Spark Streaming实时订阅Tablestore Tunnel中的增量数据写入EMR集群中的DeltaLake,借助于DeltaLake对数据CRUD的合并能力,实现数据湖支持数据修改和删除。借助于Spark集群的分析能力进行高吞吐的批量计算。 Tablestore DeltaLake 方案 海量数据,更新较少的数据,有明显分区维度属性的数据(例如可用属性中的时间戳做数据分层): 组合Tablestore的宽表,Tablestore Tunnel的CDC技术,OSS和DLA,低成本全托管的构建方案1的架构。数据实时写入Tablestore,通过CDC技术,Tablestore会全托管的把数据定期或者同步的推送到OSS中,OSS中的数据可以借助于Spark来实现高吞吐的批量计算处理。这套方案的最大优势是存储和运维的成本都相对较低。 Table数据湖方案 全引擎融合方案: 组合Tablestore的宽表,CDC技术,多元分析引擎,同时冷数据自动归档DeltaLake/OSS。这套架构热数据实现宽表合并,秒级别即席查询和分析能力,冷数据提供离线高吞吐批量计算能力。这样的架构可以在冷热数据的存储成本和计算延时上有一个很好的平衡。 Tablestore大数据架构 总结一下,基于Tablestore的大数据架构,数据写入都是Tablestore的宽表行存引擎,通过统一写来简化整个写入链路的一致性和写入逻辑,降低写入延时。大数据的分析查询的需求是多样化的,通过数据派生驱动打通不同引擎,业务可以根据需求灵活组合派生引擎是势不可挡的趋势。同时强调数据的冷热分层,让热数据尽可能的具备最丰富的查询和分析能力,冷数据在不失基本批量计算能力的同时尽可能的减少存储成本和运维成本。这里说的大数据架构主要说批计算和交互分析这部分,如果是实时流计算需求,可以参考我们的云上Lambda Plus架构。存储引擎方面Tablestore,基于分布式NoSQL数据库也就是行存做为主存储,利用数据派生CDC技术整合了分布式分析型数据库支持列存和倒排,并结合Spark生态打造Delta Lake以及基于OSS数据湖。在计算查询方面,Tablestore自身通过多维分析引擎或者DLA支持MPP,借助于Spark实现传统MapReduce大数据分析。未来我们也会规划在查询侧打通计算引擎的读取,可以做到基于查询语句选取最佳的计算引擎,例如点查命中主键索引则请求访问行存,批量load分析热数据则访问数据库列存,复杂字段组合查询和分析访问数据库列存和倒排,历史数据定期大批量扫描走DeltaLake或者OSS。我们相信一套可以基于CDC技术统一读写的融合存储引擎会成为未来云上大数据方案的发展趋势。 总结和展望 本篇文章我们谈了典型的开源结构化大数据架构,并重点分析了各套架构的特点。通过总结和沉淀现有的分析架构,我们引出云上结构化存储平台Tablestore在大数据分析方面具备和即将支持的能力。希望通过这套CDC驱动的大数据平台可以把TP类数据和各类AP需求做到最好的全托管融合,整套Serverless的架构让我们的计算和存储资源可以得到充分利用,让数据驱动业务发展走的更远。如果对基于Tablestore的大数据存储分析架构感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
前言 任何线上系统都离不开数据,有些数据是业务系统自身需要的,例如系统的账号,密码,页面展示的内容等。有些数据是业务系统或者用户实时产生的,例如业务系统的日志,用户浏览访问的记录,系统的购买订单,支付信息,会员的个人资料等。大多数企业对内,对外有很多这样的线上系统,这些数据是驱动业务发展,决策和创新最核心的东西。让这些数据更好的支撑线上系统的是数据库和数据分析平台。数据库主要承担的是线上系统的实时数据写入和根据预定好的需求进行查询,严格说就是数据库中的OLTP类型数据库。这类数据库中最为大家所熟知的就是Oracle和MySQL。业务系统对数据库的要求可能是多样的,近些年也由传统的关系型数据库融入了NoSQL数据库和NewSQL。业务系统中除了和业务直接相关的数据存储在数据库中并累积起来外,还有海量的系统监控数据,系统业务日志产生。如果我们希望这些数据可以更持久的存储并且做一些实时或者离线的分析,辅助我们的业务做决策,提供业务大盘和报表,很多公司会构建自己的数据分析平台。也就是时下『大数据』平台的由来。这类平台主要解决以下几个问题: 1. 丰富的数据源支持和数据格式延迟绑定 丰富的数据源是因为这样一个数据分析平台是汇总我们各类业务数据的地方,数据源可能来自各类数据库例如MySQL,MongoDB,日志源等等。这个平台需要能够方便各类数据源便捷的入库,例如通常大家会发现大数据架构中有一个Kafka,各类数据源会先进入Kafka,再由Kafka推送到大数据的存储系统中。这里Kafka就承担了解耦大数据平台的存储接口和上游数据源的作用。数据格式延时绑定是一个很重要的概念,TP类数据库往往需要根据业务需求预先定义Schema,也就是通常说的写入型Schema,数据在写入时即会做严格的数据字段类型检验。但是分析系统并不希望因为Schema约束或者限制的数据入库,通常会采用读取型Schema,也就是这里的延时绑定,数据在分析时才会根据数据类型做对应的处理。 2. 存储和计算弹性扩展 存储和计算弹性扩展是指大数据系统需要能支撑海量数据和保持高吞吐的读写。数据分析平台会汇总接纳各类线上系统中的各类数据,同时数据会随着时间进行累积。大数据分析平台能够支撑海量数据的存储是必须的,而且这个规模并不是预先定义好的,而是随着数据的累积弹性增加的,这里的存储量可能从TB级到PB级别,甚至数百PB。同时整套架构的计算能力也一样具备弹性,举个直观的例子,可能我们在TB级别做一次全量处理需要20分钟,是不是到了百PB级别,处理时间也翻了好几个数量级从而导致每天的分析结果不能及时产生,从而让大数据平台的价值大打折扣,限制了业务的飞速发展。 3. 大规模低成本 很多大数据平台设计之初未必会意识到成本,主要依据自身对开源方案的熟悉度,业务方对数据规模和分析实效性进行方案的选取。但当业务量真的起来后,不得不面临一个挑战就是大数据平台的成本问题。这里甚至会导致不得不进行平台的架构改造或者数据迁移。所以对于企业的大数据平台设计之初,我们就需要把整套架构的成本考虑进来。这对应的就是数据的分层存储和存储计算引擎的选取。时下云上的大数据平台往往最终会选择一个可扩展,低成本的存储平台落地最终的数据,例如阿里云上的OSS或者AWS的S3,这些存储平台本身也支持进一步的分层存储。这类存储之上的计算平台可以选取Elastic MapReduce方案。整套架构就组成了时下火热的『数据湖』方案。在线下用户可能会自建一个Hadoop集群,并使用HDFS来存储这些汇总的数据,进而构建自己的大数据数据仓库。 4. 在线业务和分析业务隔离 隔离是因为分析业务往往需要扫描较多的数据进行分析,这类大流量的扫描如果是发生在在线库,可能会影响线上服务的SLA。同时分析流量的访问模式和在线模式未必相同,在线库数据的存储分布和格式也未必适合分析系统。所以一般典型的大数据平台会有自己的一份存储,数据分布,格式和索引会面向分析需求而做相应的优化。例如典型的TP类引擎的存储格式往往是行存,分析的时候会转变成列存。 介绍到这里,希望引导大家来思考这样一个问题,不论是传统的数据仓库,还是云上的数据湖,我们最终还是希望可以有效的解决业务中数据存储和分析的问题。那究竟业务需求是什么,尤其是当我们希望分析数据源是数据库,日志监控这类结构化或者半结构化数据时,对大数据平台的需求是什么呢?我想这里大家可以先思考一下,后面我们会和大家一起看看时下一些主流的开源方案和云上的构建方案,然后再来总结下结构化大数据存储和分析的需求。 开源大数据存储分析平台架构 前面我们提及线上业务的实现离不开OLTP数据库的支持,来实现实时的数据读写。这一章我们一起看看,开源和云上一些主流的组合数据库和大数据分析平台的架构。 Hadoop大数据方案 方案一:Uber Hadoop大数据架构我们以Uber的一套大数据架构为例,图中展示了各类数据库通过Kafka推送到Hadoop集群中进行全量批计算,结果集合会再写入几类存储引擎中进行结果查询展示。在传统的Hadoop架构中,各类结构化数据例如日志数据通过采集管道进入Kafka,Spark 可以实时的消费Kafka的数据写入集群内的HDFS中。数据库例如RDS中的数据会使用Spark定期全量扫表同步到HDFS,通常周期是一天一次,在业务低峰期进行同步。这样使用HDFS存储汇总了用户的数据,对数据库数据而言其实是一个定期的snapshot。例如每天的凌晨会把行为日志与数据库中用户的信息进行联合的分析,产生当天的分析报告比如包含当天访问量汇总,用户的消费倾向等报表数据,给业务负责人决策使用。架构中之所以说RDS的数据是全量入库,主要原因是HDFS本身只是一个分布式文件存储,对Record级别的更新删除并不友好。所以为了简化这些数据库中的合并修改删除逻辑,在数据规模不大的情况下会选择全量扫描。当数据库数据较大时,例如Uber的架构中,基于HDFS开发了一套存储引擎来支持修改和删除。这套方案的特点是,分析时数据已经是静态,借助于Hadoop集群的高并发能力,可以较为容易的实现百TB到PB量级行为数据的离线计算和处理,同时数据大块的存储在HDFS上,综合存储成本也相对较低。美中不足的是数据是定期入库,数据计算的时效性通常是T+1。如果业务方有近实时推荐的需求,这时架构会从离线计算升级到『Lambda架构』。架构如下图:Lambda架构具体细节可以参考Lambda介绍。通过HDFS全量存储和Kafka存储增量来实现离线和实时两类计算需求。本质上HDFS存储的全量仍然是T+1式的。但是通过Kafka对接流计算弥补实时计算的需求。也就是多了一份存储和计算逻辑实现业务实时性的需求。不论是传统离线分析架构还是Lambda架构,结果集合可能仍然比较大,需要持久化在一个结构化存储系统中。此时的存储主要做为结果集合进行查询,例如实时大盘,报表,BI业务决策人员的即席查询等。所以主流的做法是把结果写入RDS然后同步至Elasticsearch或者直接写入Elasticsearch,这里主要希望借助于ES强大的全文检索和多字段组合查询能力。 分布式NoSQL数据库方案 方案二:基于分布式NoSQL数据库Hbase的大数据架构之前的架构我们不难发现,RDS在做批计算的时候需要同步至HDFS形成静态数据做批计算。这样的架构可能会遇到一个场景,全量数据很大,每天全量同步,时效性很差甚至如果资源不够会同步不完,如何优化这个问题呢?我们不难想到如果数据仓库本身就是一个数据库,直接支持CRUD操作,那岂不是不需要同步全量!甚至部分在线数据可以直接写入这个海量数据库中,没错业界很多开源方案会基于分布式的NoSQL数据库例如Hbase来打造这个架构。上图就是一个简单的实例。Hbase schema free以及支持实时的CRUD操作,大大简化了数据源数据的实时写入,同步问题。同时可以跨数据源打造大宽表,大宽表会大大降低计算时通过join构建完整数据的复杂度。同时Hbase组合Kafka也可以实现Lambda支持批和流两类需求。那这种架构是完美的么?可以完全替换方案一么?答案肯定不是,一方面Hbase为了支持好实时的数据写入,是采用了LSM存储引擎,新数据通过追加的方式入库,数据更新和合并依赖后台的合并优化减少读操作。这类支持数据引擎的数据读写成本是要高于直接读写HDFS静态文件。另一方面Hbase数据落盘的存储格式是按行进行组织,也就是我们通常说的行存储。行存储在数据的压缩和支持批量扫描计算上的能力远不如列存,方案一中的HDFS往往会选择Parquet或者Orc这类列存。所以当数据量增长到PB甚至数百PB时,全量使用Hbase存储进行批量分析,在性能和成本上有可能会遇到瓶颈。所以主流的Hbase方案也会结合方案一,使用HDFS加速Hbase的方式来存储各类结构化数据,从而来控制整套架构的成本和提升扩展能力。但这样的组合也同时带来一个问题,组件增多运维难度会加大。同时Hbase和HDFS中的数据数冷热分层,还是按照业务需求来划分。如果是分层场景,Hbase中的数据如何方便的流入HDFS,这些都是很实际的挑战。 数据库结合AP分析引擎方案 前面说的NoSQL方案本质上并没有解决数据结果集合的即席查询问题,Hbase本身可以支撑基于Rowkey查询,但是对于多字段的即席查询支持较为费力。一些高级玩家,大厂会基于Hbase对接Solr或者自己二次开发定制各类索引来加速查询,再对接Phoenix实现分布式的计算能力。这一套复杂的开发,多组件整合后本质上是希望赋予一个TP数据库AP的能力。这也自然的把我们的架构引入TP引擎结合AP引擎实现完整的分析架构。方案三:基于ClickHouse的实时分析平台例如上图所示,通过构建一套基于ClickHouse分析引擎的集群,各类结构化数据同步到分析引擎后可以很便捷的进行交互分析。这套架构相比之前的架构看上去简化了一些步骤,主要原因是这类引擎自身提供了类似数据库的读写能力的同时也自带一套完善的分析引擎。业界主流的分布式AP引擎有很多,例如Druid,ClickHouse,Piont,Elasticsearch或者列存版本hbase--Kudu。这类系统也各有侧重,有擅长Append场景支持数据的预聚合再分析的例如Druid,也有以实现各类索引,通过索引的强大filter能力减少IO次数来加速分析的Elasticsearch,像Kudu直接是为了优化Hbase批量扫描能力同时保留了它的单行操作能力,把持久化的格式转成了列存。这些系统的共同点是数据都基于列存,部分引擎引入倒排索引,Bitmap索引等进一步加速查询。这套架构的好处是直接抛开了传统离线大数据架构,希望借助存储引擎本身良好的存储格式和计算下推的支持实现实时批量计算,实时展现计算结果。这套架构在GB到100TB级别,相比之前的架构有了很大的提升,此时实时计算甚至和批量离线计算的界限都变得模糊起来,TB级别的数据aggregation在秒到分钟级就可以响应,BI人员无需再像传统大数据架构下等待一个T+1的数据同步时延后再进行分钟级甚至小时级的离线计算才能拿到最终的结果,大幅加快了数据为商业带来价值的步伐。那这套架构会是结构化大数据处理的终结者么?当然短时间内看未必,原因是这套架构虽然具备良好的扩展能力,但是相比Hadoop方案离线处理百PB来说,在扩展能力,复杂计算场景和存储成本上还是相对弱一些。例如全索引的Elasticsearch,索引本身通常会带来三倍的存储空间膨胀,通常还需要依赖SSD这样的存储介质。其他方面这类架构会把计算需要的所有数据加载进内存做实时计算,很难支持两个大表的Join场景,如果有较重的计算逻辑也可能会影响计算的时效性。TB级以上级别数据的ETL场景也不是这类引擎所擅长的。 云上的数据湖Datalake方案 方案四:AWS 基于S3的数据湖方案AWS的这套数据湖方案可以理解为是传统Hadoop方案的云上落地和升级,同时借助于云原生存储引擎S3,在保留了自建HDFS集群的分布式存储可靠性和高吞吐能力外,借助于自身强大的管道能力例如Kinesis Firehose和Glue来实现各类数据快速便捷的入数据湖,进一步降低了传统方案的运维和存储成本。这套架构示例还对大数据平台的使用者做了区分和定义,针对不同的使用场景,数据的使用方式,分析复杂度和时效性也会有不同,这也和我们前面提到方案一和二互补是相同情况。当然这套数据湖方案本身并没有解决传统方案的所有痛点,例如如何保证数据湖中的数据质量做到数据入库原子性,或者如何高效支持数据更新和删除。 Delta Lake 云上希望通过数据湖概念的引入,把数据进行汇总和分析。同时借助于云上分布式存储的技术红利,在保证数据的可靠性前提下大幅降低汇总数据持久化存储的成本。同时这样一个集中式的存储也使得我们的大数据分析框架自然演进到了存储计算分离的架构。存储计算分离对分析领域的影响要远大于OLTP数据库,这个也很好理解,数据随着时间不断累积,而计算是根据业务需求弹性变化,谷歌三驾马车中的GFS也是为了解决这个问题。数据湖同时很好的满足了计算需要访问不同的数据源的需求。但是数据湖中的数据源毕竟有不同,有日志类数据,静态的非结构化数据,数据库的历史归档和在线库的实时数据等等。当我们的数据源是数据库这类动态数据时,数据湖面临了新的挑战,数据更新如何和原始的数据合并呢?当用户的账号删除,我们希望把数据湖中这个用户的数据全部清除,如何处理呢?如何在批量入库的同时保证数据一致性呢。Spark商业化公司Databricks近期提出了基于数据湖之上的新方案『Delta Lake』。Delta Lake本身的存储介质还是各类数据湖,例如自建HDFS或者S3,但是通过定义新的格式,使用列存来存base数据,行的格式存储新增delta数据,进而做到支持数据操作的ACID和CRUD。并且完全兼容Spark的大数据生态,从这个角度看Databricks希望引入Delta Lake的理念,让传统Hadoop擅长分析静态文件进入分析动态数据库源的数据,离线的数据湖逐步演进到实时数据湖。也就是方案二和三想解决的问题。 介绍了这些结构化数据平台的架构后,我们再来做一下总结,其实每套架构都有自己擅长的方案和能力: 适合场景 数据规模 存储格式 数据导入模式 成本 计算方式 方案运维复杂度 数据变更性 传统Hadoop 海量数据离线处理Append为主的场景 大 列存 批量离线 低 MapReduce 较高 不可更新静态文件 分布式NoSQL数据库 海量数据,支持实时CRUD批量离线处理,可以部分做方案一的结果存储集 中上 行存 实时在线 中 MapReduce 中 可更新 分布式分析型数据库 实时/近实时入库,即席查询分析,经常做为方案一的结果存储集 中 行列混合 实时/近实时 高 MPP 中 可更新 数据湖/DeltaLake 海量数据离线处理,实时流计算具备ACID和CRUD能力 大 行列混合 批量离线/近实时 低 MapReduce 中 可更新 通过上面对比我们不难看出,每套方案都有自己擅长和不足的地方。各方案的计算模式或者计算引擎甚至可以是一个,例如Spark,但是它们的场景和效率确相差很大,原因是什么呢?区别在于存储引擎。这里我们不难看出大数据的架构抛开计算引擎本身的性能外,比拼的根本其实是存储引擎,现在我们可以总结一下大数据分析平台的需求是什么:在线和分析库的隔离,数据平台需要具备自己的存储引擎,不依赖于在线库的数据,避免对线上库产生影响。有灵活的schema支持,数据可以在这里进行打宽合并,支持数据的CRUD,全量数据支持高效批量计算,分析结果集可以支持即席查询,实时写入支持实时流计算。 综上所述,架构的区别源自于存储引擎,那是否有一些解决方案可以融合上面的各类存储引擎的优点,进一步整合出一套更加全面,可以胜任各类业务场景,也能平衡存储成本的方案呢? 下面我们就来一起看看构建在阿里云上的一款云原生结构化大数据存储引擎:Tablestore如何解决这些场景和需求。 Tablestore的存储分析架构 Tablestore是阿里云自研的结构化大数据存储产品,具体产品介绍可以参考官网以及权威指南。Tablestore的设计理念很大程度上顾及了数据系统内对结构化大数据存储的需求,并且基于派生数据体系这个设计理念专门设计和实现了一些特色的功能,也通过派生数据能力打通融合了各类存储引擎。Tablestore的基本设计理念可以参考这篇文章的剖析。 大数据设计理念 存储计算分离架构:采用存储计算分离架构,底层基于飞天盘古分布式文件系统,这是实现存储计算成本分离的基础。 CDC技术:CDC即数据变更捕获,Tablestore的CDC技术名为Tunnel Service,支持全量和增量的实时数据订阅,并且能无缝对接Flink流计算引擎来实现表内数据的实时流计算。基于CDC技术可以很便捷的打通Tablestore内的各类引擎以及云上的其他存储引擎。 多存储引擎支持:理想的数据平台希望可以拥有数据库类的行存,列存引擎,倒排引擎,也能支持数据湖方案下的HDFS或者DeltaLake,热数据采用数据库的存储引擎,冷全量数据采用更低成本数据湖方案。整套数据的热到冷可以做到全托管,根据业务场景定制数据在各引擎的生命周期。Tablestore上游基于Free Schema的行存,下游通过CDC技术派生支持列存,倒排索引,空间索引,二级索引以及云上DeltaLake和OSS,实现同时具备上述四套开源架构方案的能力。 数据最终的落地归档必然是数据湖OSS:这里很好理解,当我们的热数据随着时间推移变成冷数据,数据必然会逐渐归档进入OSS,甚至是归档OSS存储中。这样可以让我们的PB级别数据实现最低成本的高可用存储。同时面对极为偶尔的全量分析场景,也可以以一个相对稳定高效的速率吞吐出想要的文件。所以在Tablestore平台上的大数据最终我们会推荐归档进入OSS。 说了这些理念基于Tablestore我们可以较为轻松的构建下面四套架构,具体的架构选型可以结合业务场景,同时可以很方便的做到动态方案切换: 附加值较高的数据,希望具备高并发点查询,即席查询分析能力(9月已发布): 组合Tablestore的宽表,Tablestore Tunnel的CDC技术,索引分析引擎,这套架构类似方案2和3的融合,在具备宽表合并高吞吐低成本存储的同时,可以提供TB级别数据即席查询和分析的能力。这套架构的最大优势就是无需过度依赖额外的计算引擎,实现高效实时分析能力。Tablestore 分析引擎方案海量数据,非高频率更新的数据,拥有云上EMR集群(即将支持敬请期待): 组合Tablestore的宽表,Tablestore Tunnel的数据派生CDC技术,Spark Streaming和DeltaLake,构建类似开源方案1或者4的架构。通过CDC技术,EMR集群中的Spark Streaming实时订阅Tablestore Tunnel中的增量数据写入EMR集群中的DeltaLake,借助于DeltaLake对数据CRUD的合并能力,实现数据湖支持数据修改和删除。借助于Spark集群的分析能力进行高吞吐的批量计算。Tablestore DeltaLake 方案海量数据,更新较少的数据,有明显分区维度属性的数据(例如可用属性中的时间戳做数据分层): 组合Tablestore的宽表,Tablestore Tunnel的CDC技术,OSS和DLA,低成本全托管的构建方案1的架构。数据实时写入Tablestore,通过CDC技术,Tablestore会全托管的把数据定期或者同步的推送到OSS中,OSS中的数据可以借助于Spark来实现高吞吐的批量计算处理。这套方案的最大优势是存储和运维的成本都相对较低。Table数据湖方案全引擎融合方案: 组合Tablestore的宽表,CDC技术,多元分析引擎,同时冷数据自动归档DeltaLake/OSS。这套架构热数据实现宽表合并,秒级别即席查询和分析能力,冷数据提供离线高吞吐批量计算能力。这样的架构可以在冷热数据的存储成本和计算延时上有一个很好的平衡。Tablestore大数据架构总结一下,基于Tablestore的大数据架构,数据写入都是Tablestore的宽表行存引擎,通过统一写来简化整个写入链路的一致性和写入逻辑,降低写入延时。大数据的分析查询的需求是多样化的,通过数据派生驱动打通不同引擎,业务可以根据需求灵活组合派生引擎是势不可挡的趋势。同时强调数据的冷热分层,让热数据尽可能的具备最丰富的查询和分析能力,冷数据在不失基本批量计算能力的同时尽可能的减少存储成本和运维成本。这里说的大数据架构主要说批计算和交互分析这部分,如果是实时流计算需求,可以参考我们的云上Lambda Plus架构。存储引擎方面Tablestore,基于分布式NoSQL数据库也就是行存做为主存储,利用数据派生CDC技术整合了分布式分析型数据库支持列存和倒排,并结合Spark生态打造Delta Lake以及基于OSS数据湖。在计算查询方面,Tablestore自身通过多维分析引擎或者DLA支持MPP,借助于Spark实现传统MapReduce大数据分析。未来我们也会规划在查询侧打通计算引擎的读取,可以做到基于查询语句选取最佳的计算引擎,例如点查命中主键索引则请求访问行存,批量load分析热数据则访问数据库列存,复杂字段组合查询和分析访问数据库列存和倒排,历史数据定期大批量扫描走DeltaLake或者OSS。我们相信一套可以基于CDC技术统一读写的融合存储引擎会成为未来云上大数据方案的发展趋势。 总结和展望 本篇文章我们谈了典型的开源结构化大数据架构,并重点分析了各套架构的特点。通过总结和沉淀现有的分析架构,我们引出云上结构化存储平台Tablestore在大数据分析方面具备和即将支持的能力。希望通过这套CDC驱动的大数据平台可以把TP类数据和各类AP需求做到最好的全托管融合,整套Serverless的架构让我们的计算和存储资源可以得到充分利用,让数据驱动业务发展走的更远。如果对基于Tablestore的大数据存储分析架构感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
前言 表格存储是阿里云自研分布式存储系统,可以用来存储海量结构化、半结构化的数据。表格存储支持高性能和容量型两种实例类型。高性能使用SSD的存储介质,针对读多写多的场景都有较好的访问延时。容量型使用的是SSD和SATA混合的存储介质。对写多的场景,性能接近高性能,读方面,如果遇到冷数据产生读SATA盘的话,延时会比高性能上涨一个量级。在海量数据存储场景下,例如时序场景,我们会希望最新的数据可以支持高性能查询,较早的数据的读写频次都会低很多。这时候一个基于表格存储高性能和容量型存储分层的需求就产生了。 方案细节 表格存储近期对外正式发布的全增量一体的通道服务(参考文档),通道服务基于表格存储数据接口之上的全增量一体化服务。通道服务为用户提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。有了通道服务,我们可以很方便的构建从高性能实例下的表到容量型表之间的实时数据同步,进而可以在高性能表上使用表格存储的特性数据生命周期(参考文档),根据业务需求设置一个合理的TTL。总体来说就可以构建一个如下图所示的架构:整个数据的流动过程如下: 业务写入端直接写入高性能实例 高性能实例中的数据通过通道服务同步至容量型 高性能实例中的老数据自动过期,减少存储量占用 用户查询请求根据时序查询条件,判断是否是近期数据 近期数据查询进入高性能,毫秒级别返回 较早数据查询进入容量型,几十毫秒后返回 代码和操作流程: 在高性能实例上根据业务主键需求创建数据表,并设置合理的数据TTL,然后在容量型下创建相同的schema的表用来持久化存储所有数据。 然后在通道页面创建一个全增量类型的通道: 通过控制台可以简单清晰的查看到同步的状态,并发,进度等信息:下面贴一下通过Tunnel进行复制同样schema表TableStore表的Sample代码: func main () { //高性能实例的信息 tunnelClient := tunnel.NewTunnelClient("", "", "", "") //容量型实例的信息 client := tablestore.NewClient("", "", "", "") //配置callback到SimpleProcessFactory,配置消费端TunnelWorkerConfig workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ ProcessFunc: replicateDataFunc, CustomValue: client, }, } //使用TunnelDaemon持续消费指定tunnel daemon := tunnel.NewTunnelDaemon(tunnelClient, "", workConfig) err := daemon.Run() if err != nil { fmt.Println("failed to start tunnel daemon with error:", err) } } func replicateDataFunc(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { client := ctx.CustomValue.(*tablestore.TableStoreClient) fmt.Println(client) for _, rec := range records { fmt.Println("tunnel record detail:", rec.String()) updateRowRequest := new(tablestore.UpdateRowRequest) updateRowRequest.UpdateRowChange = new(tablestore.UpdateRowChange) updateRowRequest.UpdateRowChange.TableName = "coldtable" updateRowRequest.UpdateRowChange.PrimaryKey = new(tablestore.PrimaryKey) updateRowRequest.UpdateRowChange.SetCondition(tablestore.RowExistenceExpectation_IGNORE) for _, pk := range rec.PrimaryKey.PrimaryKeys { updateRowRequest.UpdateRowChange.PrimaryKey.AddPrimaryKeyColumn(pk.ColumnName, pk.Value) } for _, col := range rec.Columns { if col.Type == tunnel.RCT_Put { updateRowRequest.UpdateRowChange.PutColumn(*col.Name, col.Value) } else if col.Type == tunnel.RCT_DeleteOneVersion { updateRowRequest.UpdateRowChange.DeleteColumnWithTimestamp(*col.Name, *col.Timestamp) } else { updateRowRequest.UpdateRowChange.DeleteColumn(*col.Name) } } _, err := client.UpdateRow(updateRowRequest) if err != nil { fmt.Println("hit error when put record to cold data", err) } } fmt.Println("a round of records consumption finished") return nil } 总结 通过通道服务,存储在表格存储中的结构化,半结构化数据可以实时流出,进行加工,萃取,计算或进行同步。如果是想进一步降低冷数据的存储成本,可以参考这篇文章把表格存储的数据备份到OSS归档存储。
CloudLab场景介绍 随着信息化的发展,企业每天会产生各式各样的结构化,半结构化的数据。如何高效低成本的存储和处理这些数据,如何充分发挥数据的价值是企业普遍面临的挑战。今天的CloudLab,会带大家手把手部署一个简易的消息系统,让大家体验一下,基于表格存储(TableStore)的Timeline模型可以快速开发一款企业内部的消息系统。同时本次云栖大会,表格存储(TableStore)会发布GlobalIndex和SearchIndex的功能,基于强大的检索赋能,表格存储(TableStore)可以在高效存储海量数据的同时提供便利的查询检索功能。下面就让我们一起动手体验下CloudLab的完整步骤吧。 CloudLab 环境准备 携带自己的开发笔记本,或者使用阿里云账号购买一台ECS,准备好java8环境。(备注:如果使用ECS的话打开8081端口) 创建ECS可以在控制台如下操作 Java8 可以在这里下载。 Mac Linux 64位 Windows 64位 在表格存储控制台创建实例。(如果使用ecs建议使用同region的表格存储实例和VPC地址访问) 输入自己的实例名,这里主要需要region唯一,建议选一个个性化的实例名避免和已有的实例冲突。我们的程序会自动创建表,这里可以不用手动建表,当然你也可以体验下在控制台自己创建数据表,并进行数据的读写操作。 查看阿里云账号的AK。 下载demo使用binary。 binary地址 http://danieldoc.oss-cn-hangzhou.aliyuncs.com/im-demo.jar 一键部署启动消息系统。 替换下面的实例信息为上面创建的实例和AK。 java -jar im-demo.jar \ --aliyun.tablestore.endpoint="xxxxx" \ --aliyun.tablestore.AccessKeyID="xxxxx" \ --aliyun.tablestore.AccessKeySecret="xxxxx" \ --aliyun.tablestore.instanceName="xxxxx" 通过网页。http://localhost:8081 或者 http://ecs公网地址:8081 访问我们的IM聊天页面。通过网页进行简单的收发消息后,我们可以回到表格存储的官网控制台来做一些数据的查询操作。 进入实例所在region的控制台,点击进入实例详情 进入实例中表的数据管理页面可以进行数据查询,根据Lab中内容的介绍我们的demo中会主要有两张数据表一张是im_demo_timeline_SyncTable,这张是做消息分发同步用,另一张是im_demo_timeline_StoreTable,这张是一张全量消息表。我们可以做一个数据查询看看Timeline模型下数据会如何组织,点击数据管理。 im_demo_timeline_StoreTable,我们可以看下主键有两列,第一列是timelineid,在这里就是一个人的发件箱,即他的所有发出去的消息。第二列是一个自增列,我们的客户端可以通过这个自增的id做到消息的不丢顺序拉取。因为新来的消息在TableStore服务端对应生成更大的id号。消息内容我们的模型是自由的用户可以根据需要加密存储在我们这里或者序列化存储或者直接存储消息的原文字符串。这里为了演示方便,我们使用了明文string存放。im_demo_timeline_SyncTable 也一样是timeline模型的表,只是这张表是消息写扩散后的表,每个timeline代表了一个用户的消息收件箱,同样使用自增列,客户端可以做到不丢顺序拉取新消息。 除了上面基础的数据查询功能,我们演示一个本次云栖大会我们新发布的SearchIndex功能,也就是通过索引的构建灵活的查询数据。具体新功能的介绍可以参考这里。 除了解决了灵活属性的检索,消息内容的全文检索以外,索引也可以帮助我们做一些快速的统计,例如消息数目的统计:以上这些功能已经在这次云栖正式发布,大家可以申请邀测。申请邀测后你就可以在自己的控制台体验完整的上面的功能啦。 如果你没有及时走通整个流程,可以使用下面的地址直接体验:http://101.132.45.75:8081/ 后记 如果你有兴趣可以在这里下载demo的源码 进一步了解如何基于表格存储开发消息系统。 当然一切表格存储的问题欢迎扫码加群,我们会有研发进行专家服务。
前言 Lucene 是一个基于 Java 的全文信息检索工具包,目前主流的搜索系统Elasticsearch和solr都是基于lucene的索引和搜索能力进行。想要理解搜索系统的实现原理,就需要深入lucene这一层,看看lucene是如何存储需要检索的数据,以及如何完成高效的数据检索。 在数据库中因为有索引的存在,也可以支持很多高效的查询操作。不过对比lucene,数据库的查询能力还是会弱很多,本文就将探索下lucene支持哪些查询,并会重点选取几类查询分析lucene内部是如何实现的。为了方便大家理解,我们会先简单介绍下lucene里面的一些基本概念,然后展开lucene中的几种数据存储结构,理解了他们的存储原理后就可以方便知道如何基于这些存储结构来实现高效的搜索。本文重点关注是lucene如何做到传统数据库较难做到的查询,对于分词,打分等功能不会展开介绍。 本文具体会分以下几部分: 介绍lucene的数据模型,细节可以参阅lucene数据模型一文。 介绍lucene中如何存储需要搜索的term。 介绍lucene的倒排链的如何存储以及如何实现docid的快速查找。 介绍lucene如何实现倒排链合并。 介绍lucene如何做范围查询和前缀匹配。 介绍lucene如何优化数值类范围查询。 Lucene数据模型 Lucene中包含了四种基本数据类型,分别是: Index:索引,由很多的Document组成。Document:由很多的Field组成,是Index和Search的最小单位。Field:由很多的Term组成,包括Field Name和Field Value。Term:由很多的字节组成。一般将Text类型的Field Value分词之后的每个最小单元叫做Term。 在lucene中,读写路径是分离的。写入的时候创建一个IndexWriter,而读的时候会创建一个IndexSearcher,下面是一个简单的代码示例,如何使用lucene的IndexWriter建索引以及如何使用indexSearch进行搜索查询。 Analyzer analyzer = new StandardAnalyzer(); // Store the index in memory: Directory directory = new RAMDirectory(); // To store an index on disk, use this instead: //Directory directory = FSDirectory.open("/tmp/testindex"); IndexWriterConfig config = new IndexWriterConfig(analyzer); IndexWriter iwriter = new IndexWriter(directory, config); Document doc = new Document(); String text = "This is the text to be indexed."; doc.add(new Field("fieldname", text, TextField.TYPE_STORED)); iwriter.addDocument(doc); iwriter.close(); // Now search the index: DirectoryReader ireader = DirectoryReader.open(directory); IndexSearcher isearcher = new IndexSearcher(ireader); // Parse a simple query that searches for "text": QueryParser parser = new QueryParser("fieldname", analyzer); Query query = parser.parse("text"); ScoreDoc[] hits = isearcher.search(query, 1000).scoreDocs; //assertEquals(1, hits.length); // Iterate through the results: for (int i = 0; i < hits.length; i++) { Document hitDoc = isearcher.doc(hits[i].doc); System.out.println(hitDoc.get("fieldname")); } ireader.close(); directory.close(); 从这个示例中可以看出,lucene的读写有各自的操作类。本文重点关注读逻辑,在使用IndexSearcher类的时候,需要一个DirectoryReader和QueryParser,其中DirectoryReader需要对应写入时候的Directory实现。QueryParser主要用来解析你的查询语句,例如你想查 “A and B",lucene内部会有机制解析出是term A和term B的交集查询。在具体执行Search的时候指定一个最大返回的文档数目,因为可能会有过多命中,我们可以限制单词返回的最大文档数,以及做分页返回。 下面会详细介绍一个索引查询会经过几步,每一步lucene分别做了哪些优化实现。 Lucene 查询过程 在lucene中查询是基于segment。每个segment可以看做是一个独立的subindex,在建立索引的过程中,lucene会不断的flush内存中的数据持久化形成新的segment。多个segment也会不断的被merge成一个大的segment,在老的segment还有查询在读取的时候,不会被删除,没有被读取且被merge的segement会被删除。这个过程类似于LSM数据库的merge过程。下面我们主要看在一个segment内部如何实现高效的查询。 为了方便大家理解,我们以人名字,年龄,学号为例,如何实现查某个名字(有重名)的列表。 docid name age id 1 Alice 18 101 2 Alice 20 102 3 Alice 21 103 4 Alan 21 104 5 Alan 18 105 在lucene中为了查询name=XXX的这样一个条件,会建立基于name的倒排链。以上面的数据为例,倒排链如下:姓名 Alice | [1,2,3]---- | --- | Alan | [4,5]如果我们还希望按照年龄查询,例如想查年龄=18的列表,我们还可以建立另一个倒排链: 18 | [1,5]---| --- | 20 | [2]21 | [3,4] 在这里,Alice,Alan,18,这些都是term。所以倒排本质上就是基于term的反向列表,方便进行属性查找。到这里我们有个很自然的问题,如果term非常多,如何快速拿到这个倒排链呢?在lucene里面就引入了term dictonary的概念,也就是term的字典。term字典里我们可以按照term进行排序,那么用一个二分查找就可以定为这个term所在的地址。这样的复杂度是logN,在term很多,内存放不下的时候,效率还是需要进一步提升。可以用一个hashmap,当有一个term进入,hash继续查找倒排链。这里hashmap的方式可以看做是term dictionary的一个index。 从lucene4开始,为了方便实现rangequery或者前缀,后缀等复杂的查询语句,lucene使用FST数据结构来存储term字典,下面就详细介绍下FST的存储结构。 FST 我们就用Alice和Alan这两个单词为例,来看下FST的构造过程。首先对所有的单词做一下排序为“Alice”,“Alan”。 插入“Alan” 插入“Alice” 这样你就得到了一个有向无环图,有这样一个数据结构,就可以很快查找某个人名是否存在。FST在单term查询上可能相比hashmap并没有明显优势,甚至会慢一些。但是在范围,前缀搜索以及压缩率上都有明显的优势。 在通过FST定位到倒排链后,有一件事情需要做,就是倒排链的合并。因为查询条件可能不止一个,例如上面我们想找name="alan" and age="18"的列表。lucene是如何实现倒排链的合并呢。这里就需要看一下倒排链存储的数据结构 SkipList 为了能够快速查找docid,lucene采用了SkipList这一数据结构。SkipList有以下几个特征: 元素排序的,对应到我们的倒排链,lucene是按照docid进行排序,从小到大。 跳跃有一个固定的间隔,这个是需要建立SkipList的时候指定好,例如下图以间隔是3 SkipList的层次,这个是指整个SkipList有几层 有了这个SkipList以后比如我们要查找docid=12,原来可能需要一个个扫原始链表,1,2,3,5,7,8,10,12。有了SkipList以后先访问第一层看到是然后大于12,进入第0层走到3,8,发现15大于12,然后进入原链表的8继续向下经过10和12。有了FST和SkipList的介绍以后,我们大体上可以画一个下面的图来说明lucene是如何实现整个倒排结构的: 有了这张图,我们可以理解为什么基于lucene可以快速进行倒排链的查找和docid查找,下面就来看一下有了这些后如何进行倒排链合并返回最后的结果。 倒排合并 假如我们的查询条件是name = “Alice”,那么按照之前的介绍,首先在term字典中定位是否存在这个term,如果存在的话进入这个term的倒排链,并根据参数设定返回分页返回结果即可。这类查询,在数据库中使用二级索引也是可以满足,那lucene的优势在哪呢。假如我们有多个条件,例如我们需要按名字或者年龄单独查询,也需要进行组合 name = "Alice" and age = "18"的查询,那么使用传统二级索引方案,你可能需要建立两张索引表,然后分别查询结果后进行合并,这样如果age = 18的结果过多的话,查询合并会很耗时。那么在lucene这两个倒排链是怎么合并呢。假如我们有下面三个倒排链需要进行合并。 在lucene中会采用下列顺序进行合并: 在termA开始遍历,得到第一个元素docId=1 Set currentDocId=1 在termB中 search(currentDocId) = 1 (返回大于等于currentDocId的一个doc), 因为currentDocId ==1,继续 如果currentDocId 和返回的不相等,执行2,然后继续 到termC后依然符合,返回结果 currentDocId = termC的nextItem 然后继续步骤3 依次循环。直到某个倒排链到末尾。 整个合并步骤我可以发现,如果某个链很短,会大幅减少比对次数,并且由于SkipList结构的存在,在某个倒排中定位某个docid的速度会比较快不需要一个个遍历。可以很快的返回最终的结果。从倒排的定位,查询,合并整个流程组成了lucene的查询过程,和传统数据库的索引相比,lucene合并过程中的优化减少了读取数据的IO,倒排合并的灵活性也解决了传统索引较难支持多条件查询的问题。 BKDTree 在lucene中如果想做范围查找,根据上面的FST模型可以看出来,需要遍历FST找到包含这个range的一个点然后进入对应的倒排链,然后进行求并集操作。但是如果是数值类型,比如是浮点数,那么潜在的term可能会非常多,这样查询起来效率会很低。所以为了支持高效的数值类或者多维度查询,lucene引入类BKDTree。BKDTree是基于KDTree,对数据进行按照维度划分建立一棵二叉树确保树两边节点数目平衡。在一维的场景下,KDTree就会退化成一个二叉搜索树,在二叉搜索树中如果我们想查找一个区间,logN的复杂度就会访问到叶子结点得到对应的倒排链。如下图所示: 如果是多维,kdtree的建立流程会发生一些变化。比如我们以二维为例,建立过程如下: 确定切分维度,这里维度的选取顺序是数据在这个维度方法最大的维度优先。一个直接的理解就是,数据分散越开的维度,我们优先切分。 切分点的选这个维度最中间的点。 递归进行步骤1,2,我们可以设置一个阈值,点的数目少于多少后就不再切分,直到所有的点都切分好停止。 下图是一个建立例子: BKDTree是KDTree的变种,因为可以看出来,KDTree如果有新的节点加入,或者节点修改起来,消耗还是比较大。类似于LSM的merge思路,BKD也是多个KDTREE,然后持续merge最终合并成一个。不过我们可以看到如果你某个term类型使用了BKDTree的索引类型,那么在和普通倒排链merge的时候就没那么高效了所以这里要做一个平衡,一种思路是把另一类term也作为一个维度加入BKDTree索引中。 如何实现返回结果进行排序聚合 通过之前介绍可以看出lucene通过倒排的存储模型实现term的搜索,那对于有时候我们需要拿到另一个属性的值进行聚合,或者希望返回结果按照另一个属性进行排序。在lucene4之前需要把结果全部拿到再读取原文进行排序,这样效率较低,还比较占用内存,为了加速lucene实现了fieldcache,把读过的field放进内存中。这样可以减少重复的IO,但是也会带来新的问题,就是占用较多内存。新版本的lucene中引入了DocValues,DocValues是一个基于docid的列式存储。当我们拿到一系列的docid后,进行排序就可以使用这个列式存储,结合一个堆排序进行。当然额外的列式存储会占用额外的空间,lucene在建索引的时候可以自行选择是否需要DocValue存储和哪些字段需要存储。 Lucene的代码目录结构 介绍了lucene中几个主要的数据结构和查找原理后,我们在来看下lucene的代码结构,后续可以深入代码理解细节。lucene的主要有下面几个目录: analysis模块主要负责词法分析及语言处理而形成Term。 codecs模块主要负责之前提到的一些数据结构的实现,和一些编码压缩算法。包括skiplist,docvalue等。 document模块主要包括了lucene各类数据类型的定义实现。 index模块主要负责索引的创建,里面有IndexWriter。 store模块主要负责索引的读写。 search模块主要负责对索引的搜索。 geo模块主要为geo查询相关的类实现 util模块是bkd,fst等数据结构实现。 最后 本文介绍了lucene中的一些主要数据结构,以及如何利用这些数据结构实现高效的查找。我们希望通过这些介绍可以加深理解倒排索引和传统数据库索引的区别,数据库有时候也可以借助于搜索引擎实现更丰富的查询语意。除此之外,做为一个搜索库,如何进行打分,query语句如何进行parse这些我们没有展开介绍,有兴趣的同学可以深入lucene的源码进一步了解。
前言 消息队列,通常有两种场景,一种是发布者订阅模式,一种是生产者消费者模式。发布者订阅模式,即发布者生产消息放入队列,多个监听的消费者都会收到同一份消息,也就是每个消费者收到的消息是一样的。生产者消费者模式,生产者生产消息放入队列,多个消费者同时监听队列,谁先抢到消息就会从队列中取走消息,最终每个消息只会有一个消费者拥有。 在大数据时代,传统的生产者消费者队列模式中的Topic数目可能从少量的几个变为海量topic。例如要实现一个全网爬虫抓取任务调度系统,每个大型的门户,SNS都会成为一个topic。在topic内部也会有海量的子网页需要抓取。在实现这样的一个任务分发调度系统时可能会遇到以下一些问题: 海量的topic,意味着我们可能会有海量的队列。针对爬虫场景,根据网页类型,一类网站对应到一个任务队列,不同的任务队列会有自己的生产者和消费者。 生产者和消费者会有多个,在业务峰值期间,产生较大并发访问,消息总量也是海量。针对爬虫任务消息总量可能就是全网的网页地址数量。 任务可能会有优先级,为了实现优先级高的任务优先调度,我们可能会在一个topic下再细分子队列。 消息消费不能丢失,如果是作为任务的调度消息,我们的消息丢失失零容忍的。 消费者模式中如果消费者因为种种原因处理失败或者超时,需要支持消息被重新调度。 在保证消息一定会被处理的前提下,我们也要避免少量消息因为各种原因处理堆积,而影响整个系统的吞吐。因为消息读区往往是轻量级,消息的处理是资源密集型。我们不希望因为消息读区堆积导致处理资源闲置。 解决方案 基于TableStore(表格存储)的跨分区高并发,主键自增列这个特性又很好的适配到我们的队列特性。支持海量,不同分区键下使用各自的自增主键,可以很好的实现海量队列。具体我们给出如下方案: 需要设计以下表: 任务消息表 消息消费checkpoint表 全量消息表 在介绍表设计之前,先做一些名词解释。 每个任务消息,我们假设已有一个唯一的id。 任务优先级,我们假设优先级范围是固定并且已经知道,如果任务优先级过多,可以分层,例如优先级1~100的映射到层级1。这里如果我们的任务没有优先级,那可以根据任务数据量级做一个简单的分桶,然后轮训抓取每个分桶中的任务。 两个游标,对应到每个topic的每个优先层级,我们需要记录2个游标位移点。一个是抓取扫描游标,一个是完成游标。扫描游标的定义是指当前任务当前优先层级下,被扫描到的最大位移位置。完成位移点表示改任务当前优先层级下,最大的抓取完成位移点,之前的任务都已经完成抓取。 表设计 任务消息表 主键顺序 名称 类型 值 备注 1 partition_key string md5(topicid)_topicid 为了负载均衡 2 priority int 优先级或优先级的映射或简单分桶 这里如果要做分桶或者映射,一个基本的原则是希望数据可以打散 3 id int 自增Id 主键自增列 这里,每一个子任务都会被插入这张表,任务可能由不同的爬虫端抓取后产生子任务,在子任务产生的同时,任务的访问地址,访问优先级已经被固定。我们根据一个分层算法进行映射。所以主键前两列已经确定,插入TableStore(表格存储)后,id会自增生成,用于后续消费者读任务用。 消息消费checkpoint表 这张表用于消息消费的checkpoint。下面会结合schema具体说下checkpoint的内容。 主键顺序 名称 类型 值 备注 1 partition_key string md5(taskid)_taskid 为了负载均衡 2 priority int 优先级层级 这张表属性列上会有两列,一列用来表示抓取扫描位移点,一列记录完成位移点。这里checkpoint的记录需要使用条件更新,即我们只会确保原来值小于待更新的值才会更新。 全量消息表 我们用全量消息表存放我们的消息id以及对应属性,一个消息任务是否重复处理也通过这张表做判断。 主键顺序 名称 类型 值 备注 1 partition_key string md5(taskid) 为了负载均衡 2 taskid string 任务id 在全网信息表中,有一列属性用来表示任务处理状态,消费者在拿到任务id时需要条件更新这张表对应的这个key,对应行不存在可以直接插入。如果已经存在,需要先读状态为非结束状态,版本为读到版本情况下再做更新。更新成功者意味着当前id的任务被这个消费者抢占。其中行不存在表示第一次爬取,如果存在非结束状态,表示之前的任务可能已经失败。 任务消费处理流程 下面我们用爬虫抓取全网网页做为例子来看下具体如何基于TableStore(表格存储)做消息队列并最终实现任务的分发: 这张图展现了我们的整个爬虫框架,爬虫具体流程如下 不同的爬虫端会根据自身爬取进度定时从TableStore的爬虫任务表进行拉取爬虫任务,这里一般我们单线程GetRange访问TableStore,我们认为这里的任务读区速率会远大于抓取消费者的速度,从TableStore读区到的任务数据进入爬虫内存队列,然后进行下一轮任务消息读区。直到当前内存队列满后等待下轮唤醒继续抓取,如果有特殊需求可以并发拉取不同优先级。 初始对于每个任务的各个priority,他们的默认checkpoint都对应于TableStore的一个flag即Inf_Min,也就是第一行。 GetRange拉取到当前任务各优先级抓取任务后(例如我们可以设置从优先级高到低,一次最多200条,抓够200条进行一次任务抢占),爬虫会先根据具体优先级排序,然后按照优先级从高到低尝试更新网页信息表,进行爬取任务抢占,抢占成功后,该任务会被放进爬虫的内存任务队列给抓取线程使用。抢占成功同时我们也会更新一下爬虫任务表中的状态,和当前的时间,表示任务最新的更新时间,后续的任务状态检验线程会查看任务是否已经过期需要重新处理。注意这里假如有一个爬虫线程比较lag,是上一轮抢占任务后卡了很久才尝试更新这个时间,也没有问题。这种小概率的lag可能会带来重复抓取,但是不会影响数据的一致性。并且我们可以在内存中记录下每一步的时间,如果我们发现每一步内存中的时间超时也可以结束当前任务,进一步减少小概率的重复抓取。 当一轮的任务全部填充后,我们会根据当前拿到的最大任务表id+1(即爬虫任务表第三个主键,也就是自增主键)进行尝试当前任务对应优先级checkpoint表的更新(这里更新频率可以根据业务自由决定),更新的原则是新的id要大于等于当前id。如果更新成功后,可以使用当前更新值继续拉取,如果更新失败,意味着有另一个爬虫已经取得更新的任务,需要重新读一下checkpoint表获得最新的checkpoint id值,从该id继续拉取。 除了任务抓取线程以为,每个爬虫端可以有一个频率更低的任务进行任务完成扫描,这个任务用来最新的完成任务游标。扫描中getrange的最大值为当前拉取的起始位置,扫描的逻辑分以下几种: 扫描到该行已经更新为完成,此时游标可以直接下移 扫描到任务还是initial状态,一个任务没有被任何人设置为running,切被拉去过,原因是这个任务是一个重复抓取的任务,此时可以去url表中检查这个url是否存在,存在直接跳过。 扫描到任务是running,不超时认为任务还在执行,结束当轮扫描。如果检查时间戳超时,检查url表,如果内容已经存在,则有可能是更新状态回任务表失败,游标继续下移。如果内容也不存在,一种简单做法是直接在表对应优先级中put一个新任务,唯一的问题是如果是并发检查可能会产生重复的任务(重复任务通过url去重也可以解决)。另一种做法也是通过抢任务一样更新url表,更新成功者可以新建任务下移坐标。其余的人停止扫描,更新checkpoint为当前位置。更新成功者可以继续下移扫描直至尾部或者任务正常进行位置,然后更新checkpoint。 爬虫抓取每个任务完成后,会更新全网url表中的状态以及对应爬虫任务表中的状态,其中全网url的状态用来给后续抓取任务去重使用,爬虫任务表中的状态给上面步骤5的完成游标扫描线程使用判断一个任务是否已经完成。 整个写入子任务和读取我们可以抽象出下面这张图 新任务会根据优先级并发写入不同的队列,其中图中编号就对应表格存储中的自增列,用户按照上面设计表结构的话,不需要自己处理并发写入的编号,表格存储服务端会保证唯一且自增,即新任务在对应队列末尾。爬虫读取任务的游标就是图中红色,蓝色对应完成的任务列表。两个游标在响应优先级下独立维护。 下面我们举个例子,如果一个爬虫任务拉取线程假如设置一次拉2个任务为例, 我们的爬虫任务表会从上面切换成下图,task1 priority=3的扫描游标更新到了10011,priority=2的扫描游标更新到10006。也就意味着扫描优先级3的下次会从10011开始,优先级2的会从10006开始。 并发处理 多爬虫拉取任务有重复,这部分我们通过条件更新大表决定了同一个网页不会同时被抓取。 多爬虫条件更新checkpoint表决定了我们整个拉取任务不会漏过当前拉到的一批任务,如果checkpoint更新如果条件失败任务继续进行,其他类型可重试错误会继续重试(例如服务短时间不可用,lag等。)这里只有可能导致其他爬虫唤醒后拉到重复数据,但是抓取因为抢占失败也不会重复拉取,并且新唤醒的客户端也会更新更大的游标,保证系统不会因为一个客户端lag而任务扫描游标滞后。 任务判定完成逻辑我们可以做分布式互斥,同时只有一个进程在判断。也可以在判断任务失败的时候进行条件更新原表,更新成功后再新插入一条新任务。 总结 最后我们再来看下整个设计中几个关键的问题是否满足 海量topic,TableStore(表格存储)天然的以一个分区键做为一个队列的能力使得我们可以很容易的实现海量的队列,数量级可以在亿级别甚至更多。 优先级,优先级对应一个主键列,依照优先级进行分层优先级高的会被优先getrange获得。 系统吞吐,整个系统中两个游标的设计,使得我们任务扫描游标每轮扫描后都会快速向下走,长尾任务不会阻碍对新任务的扫描。另一方面我们任务会在url大表上做抢占,避免不必要的重复抓取。 子任务不丢失,自增列的保证了新任务会用更大的值即排在当前队列末尾。另外有一个完成扫描线程,会确保新任务全部完成后才会更新,这个游标代表了最后整个任务是否完成。这个游标也保证了任务不会丢失。这个任务会对长尾的任务重新建一个任务并插入队列,新任务会被新爬虫端重新触发,也避免了因为一个客户端卡住而饿死的问题。 其他场景使用可以参考TableStore进阶之路。也欢迎大家加入我们的钉钉群进行交流
前言 在时下互联网信息的浪潮下,信息的传播速度远超我们的想象。微博里一条大V的帖子,朋友圈的一个状态更新,热门论坛的一条新闻,购物平台的购物评价,可能会产生数以万计的转发,关注,点赞。如果是一些非理性负面的评论会激发人们的负面感,甚至影响到消费者对企业品牌的认同,如果不能及时的采取正确的应对措施,会造成难以估计的损失。所以我们需要一个高效的全网舆情分析系统,帮助我们实时的观测舆情。 这个全网舆情分析系统,可以实现百亿条网页数据的存储、实时新增网页的抓取和存储并能对新增网页做实时的元数据提取。有了提取结果,我们还需要进行进一步的挖掘分析,这些分析包括但不限于 舆情的影响力诊断,从传播量级和扩散趋势来做预测,确定是否最终形成舆情。 传播路径分析,分析舆情传播的关键路径。 用户画像,对舆情的参与者提供共性特征勾勒,如性别,年龄,地域和感兴趣话题。 情感分析,分析新闻或者评价是正面还是负面。情感分类后进行统计聚合。 预警设置,我们支持舆情讨论量阈值设置,达到阈值后通知推送业务方,避免错过舆情的黄金参与时间。 这些挖掘后的舆情结果会被推送至需求方,同时也提供接口给各业务方搜索,查询使用。下面我们就展开讨论系统设计中可能会遇到的问题,我们会重点关注系统设计中存储相关的话题,针对这些问题找到一个最优化的方案。 系统设计 对于一个舆情系统,首先需要一个爬虫引擎,去采集各大主流门户,购物网站,社区论坛原始页面内容,微博,朋友圈的各类消息信息。采集到的海量网页,消息数据(百亿级别)需要实时存储下来。再根据网站url获取网页之前还需要判断一下是否是之前爬过的页面,避免不必要的重复爬取。采集网页后我们需要对网页进行萃取,去除不必要的标签,提取标题,摘要,正文内容,评论等。萃取后的内容进入存储系统方便后续查询,同时还需要把新增的抽取结果推送至计算平台进行统计分析,出报表,或者后续提供舆情检索等功能。计算的内容根据算法不同可能需要新增数据,也可能需要全量数据。而舆情本身的时效敏感性决定了我们系统一定要能高效处理这些新增内容,最好是秒级别延时后就可以检索到新热搜。 我们可以总结下整个数据流如下: 根据上图我们不难发现,设计一个全网舆情的存储分析平台,我们需要处理好抓取,存储,分析,搜索和展示。具体我们需要解决如下问题: 如何高效存储百亿级别的网页原始信息,为了提高舆情分析的全面性,准确性,我们往往希望可以尽可能多的爬取网页信息,再根据我们设置的权重聚合。所以网页的历史全库会比较大,积累数百亿的网页信息,数据量可以达到百TB甚至数PB。在数据量如此之大的情况下,我们还需要做到读写毫秒级别的低延时,这使得传统数据库难以满足需求。 如何在爬虫爬取网页之前判断是否之前已经爬取过,针对普通网页,舆情在意他们的时效性,可能我们对同一个网页只希望爬取一次,那我们就可以利用网页地址做爬取前去重,减少不必要的网页资源浪费。所以我们需要分布式存储提供基于网页的高效随机查询。 如何新增原始网页存储完成后进行实时的结构化提取,并存储提取结果。这里我们原始的网页可能是包括各种html的标签,我们需要去除这些html的标签,提取出文章的标题,作者,发布时间等。这些内容为后续舆情情感分析提供必要的结构化数据。 如何高效的对接计算平台,流式新增提取后的结构化数据进行实时的计算。这里我们需要根据网页,消息描述的内容做分类,进行情感识别,识别后的结果统计分析。由于全量分析时效性差,加上舆情往往关注最新的新闻,评论,所以我们必须做增量分析。 如何提供高效的舆情搜索,用户除了订阅固定关键词的舆情以外,做一些关键词搜索。例如希望了解竞争公司新产品的一些舆情分析。 如何实现新增舆情的实时推送,为了保证舆情的时效性,我们不仅需要持久化舆情分析结果,同时也要支持推送舆情结果。推送的内容通常是我们实时分析出来的新增舆情。 系统架构 针对上面介绍这些问题,我们下面来介绍下如何基于阿里云上的各类云产品来打造全网百亿级别的舆情分析平台,我们会重点关注存储产品的选型和如何高效的对接各类计算,搜索平台。 爬虫引擎我们选用ECS,可以根据爬取量决定使用ECS的机器资源数,在每天波峰的时候也可以临时扩容资源进行网页爬取。原始网页爬取下来后,原始网页地址,网页内容写入存储系统。同时如果想避免重复爬取,爬虫引擎抓取之前要根据url列表进行去重。存储引擎需要支持低延时的随机访问查询,确定当前url是否已经存在,如果存在则无需重复抓取。 为了实现网页原始内容的实时抽取,我们需要把新增页面推送至计算平台。之前的架构往往需要做应用层的双写,即原始网页数据入库同时,我们重复写入一份数据进入计算平台。这样的架构会需要我们维护两套写入逻辑。同样的在结构化增量进入舆情分析平台中,也有类似的问题,抽取后的结构化元数据也需要双写进入舆情分析平台。舆情的分析结果也需要一份写入分布式存储,一份推送至搜索平台。到这里我们可以发现,图中的三根红线会带来我们对三个数据源的双写要求。这会加大代码开发工作量,也会导致系统实现,维护变的复杂。每一个数据源的双写需要感知到下游的存在,或者使用消息服务,通过双写消息来做解耦。传统数据库例如mysql支持订阅增量日志binlog,如果分布式存储产品在可以支撑较大访问,存储量的同时也可以提供增量订阅就可以很好的简化我们的架构。 网页数据采集入库后,增量流入我们的计算平台做实时的元数据抽取,这里我们可以选用函数计算,当有新增页面需要提取时触发函数计算的托管函数进行网页元数据抽取。抽取后的结果进入存储系统持久化后,同时推送至MaxCompute进行舆情分析,例如情感分析,文本聚类等。这里可能会产生一些舆情报表数据,用户情感数据统计等结果。舆情结果会写入存储系统和搜索引擎,部分报表,阈值报警会被推送给订阅方。搜索引擎的数据提供给在线舆情检索系统使用。 在介绍完整体架构后,下面我们看下在阿里云上如何做存储选型。 存储选型 通过架构介绍我们再总结一下对存储选型的要求: 可以支撑海量数据存储(TB/PB级别),高并发访问(十万TPS~千万TPS),访问延时低。 业务随着采集订阅的网页源调整,采集量会动态调整。同时一天内,不同时间段爬虫爬下来的网页数也会有明显波峰波谷,所以数据库需要可以弹性扩展,缩容。 自由的表属性结构,普通网页和社交类平台页面的信息我们需要关注的属性可能会有较大区别。灵活的schema会方便我们做扩展。 对老数据可以选择自动过期或者分层存储。因为舆情数据往往关注近期热点,老的数据访问频率较低。 需要有较好的增量通道,可以定期把新增的数据导出至计算平台。上面的图中有三段红色虚线,这三部分都有个共同的特点需要可以实时的把增量导至对应的计算平台做计算,计算后的结果再写入对应的存储引擎。如果数据库引擎本身就支持增量,则可以很大程度简化架构,减少之前需要全量读区筛选增量,或者客户端双写来实现得到增量的逻辑。 需要可以有较好的搜索解决方案(本身支持或者可以数据无缝对接搜索引擎)。 有了这些需求后,我们需要使用一款分布式的NoSQL数据来解决海量数据的存储,访问。多个环节的增量数据访问的需求,业务的峰值访问波动进一步确定弹性计费的表格存储是我们在这套架构中的最佳选择。表格存储的架构介绍可以参考表格存储数据模型 TableStore(表格存储)相比同类数据库一个很大的功能优势就是TableStore(表格存储)有较完善的增量接口,即Stream增量API,Stream的介绍可以参考表格存储Stream概述。场景介绍可以参考Stream应用场景介绍,具体API使用可以参考JAVA SDK Stream。有了Stream接口,我们可以很方便的订阅TableStore(表格存储)的所有修改操作,也就是新增的各类数据。同时我们基于Stream打造了很多数据通道,对接各类下游计算产品,用户甚至不需要直接调用Stream API,使用我们的通道直接在下游订阅增量数据,自然的接入了整个阿里云的计算生态。针对上面架构中提到的函数计算,MaxCompute,ElasticSearch和DataV,TableStore(表格存储)都已经支持,具体使用可以参考: Stream和函数计算对接 Stream和MaxCompute Stream和Elasticsearch 通过DataV展示表格存储的数据 TableStore(表格存储)在属性列上,是自由的表结构。针对舆情分析这个场景,随着舆情分析算法的升级我们可能会新增属性字段,同时针对普通网页和微博这类社交页面的属性也可能不尽相同。所以自由表结构相比传统数据库可以很好的匹配我们这个需求。 在架构中,我们有三个存储库需求。分别是原始页面库,结构化元数据库和舆情结果库。前两者一般是一个离线存储分析库,最后一个是一个在线数据库。他们对访问性能,存储成本有着不同的需求。表格存储有两种类型的实例类型支持存储分层,即高性能和容量型。高性能适用于写多读多的场景也就是做为在线业务存储使用。容量型适合写多读少的场景,也就是离线业务存储用。他们的写入单行延时都可以控制在10毫秒内,读取高性能可以保持在毫秒级别。TableStore(表格存储)同时支持TTL,设置表级别数据过期时间。根据需求,舆情结果我们可以设置TTL,只提供近期数据的查询,较老的舆情自动过期删除。 有了TableStore(表格存储)的这些功能特性,系统对存储选型的六项要求就可以得到很好的满足,基于TableStore(表格存储)可以完美的设计和实现全网舆情存储分析系统。 后记 本文对实现海量数据舆情分析这一场景中会遇到的存储和分析问题进行了总结,介绍了如何通过使用阿里云自研的TableStore(表格存储)在满足业务基本数据量的前提下,通过Stream接口和计算平台的对接实现架构简化。TableStore(表格存储)是阿里云自主研发的专业级分布式NoSQL数据库,是基于共享存储的高性能、低成本、易扩展、全托管的半结构化数据存储平台,舆情数据存储分析是TableStore在大数据处理领域的重要应用之一。其他场景使用可以参考TableStore进阶之路。 更多的应用场景和技术探讨,欢迎加入我们的钉钉交流群(群号:11789671)。
上面一篇我们介绍了表格存储新功能Stream, 下面我们展开说一些场景,看看有了Stream后,哪些我们常见的应用场景可以更高效的设计和实现。 直播用户行为分析和存储 场景描述 现在视频直播非常火热,假如我们使用TableStore记录用户的每一次进入房间和离开房间,房间内的操作记录等,并希望根据用户的最近的观看记录,更新直播推荐列表。给主播提供近期收看其直播的用户的属性特征,帮助主播优化直播内容迎合观众。 表结构设计 主键顺序 名称 类型 值 备注 1 partition_key string md5(user_id)前四位 为了负载均衡 2 user_id string/int 用户id 可以是字符串也可以是长整型数字 3 room_id string/int 房间Id 可以使字符串也可以是长整型数字 4 timestamp int 时间戳 使用长整型,64位,足够保存毫秒级别的时间戳 数据存储示例 设计好表结构后,我们看下具体如何存储:比如原始数据是: 2017/5/20 10:10:10的时候小王在进入房间001,主播5此时在房间1做直播 2017/5/20 10:12:30的时候小王在房间001点了赞 2017/5/20 10:15:06的时候小王在房间001送给主播鲜花 2017/5/20 10:15:16的时候小王在房间001关注了主播 2017/5/20 10:25:41的时候小王离开了房间001 part_key user_id room_id timestamp operation actor_id device network 01f3 000001 001 1495246210 进入房间 005 Iphone7 4G 01f3 000001 001 1495246810 点赞 005 Iphone7 4G 01f3 000001 001 1495256810 鲜花 005 Iphone7 4G 01f3 000001 001 1495259810 关注主播 005 Iphone7 4G 01f3 000001 001 1495266810 退出房间 005 Iphone7 4G 主键 part_key:第一个主键,分区建,主要是为了负载均衡,保证数据可以均匀分布在所有机器上,提高并发度和性能。如果业务主键user_id可以保证均匀分布,那么可以不需要这个主键。 user_id:第二个主键,用户ID,可以是字符串也可以是数字,唯一标识一个用户。 room_id:第三个主键,房间ID,每个直播房间我们可以认为有一个唯一的标识,可以是字符串也可以是数字。 timestamp:第四个主键,时间戳,表示某一个时刻,单位可以是秒或者毫秒,用来表示用户产生操作的时间戳,记录了操作的时间戳,我们可以用来分析用户操作频率,或者和直播内容进行关联分析。 至此,上述四个主键可以唯一确定某一个用户在某一个时间点在某一个房间的操作数据。 属性列 operation:操作类型,例如进入房间,离开房间,关注,购买,打赏等等。 actor_id:直播人的id。也就是主播的id,一些特殊活动下,可能会变成一个主播列表。 device:用户访问的设备类型。 除了上面提到的一些基本属性以外,我们也可以根据需求添加关注的属性,例如用户的访问设备mac地址,ip地址等。 数据分析需求 如果我们现在想做一些运营分析,例如: 最近10分钟有多少用户在房间内做了支付操作。 最近用户支付较多的房间主播有什么共同属性。 过去一天什么时间段,用户房间内操作最活跃。 对于某一个用户,如何根据他最近的房间操作,例如离开了什么样的房间,在什么样的房间会滞留,推荐后续的直播内容。 从上面的这些分析需求我们大体可以分为两类: 离线分析过去一段时间用户操作行为,例如上面的场景3 实时分析最近用户的行为,例如上面的场景1,2,4 如何获取增量数据 假设我们直接使用API根据时间来获取增量数据,那么我们需要先要得到所有的用户id以及房间id,然后根据时间进行读取。用户数乘以房间数可能会是一个非常大的量,那么我们的分析就难以保证实效性。有了增量通道,我们可以使用Stream Client,订阅实时的增量数据。在Stream Client实现代码把增量数据推送到流计算平台或者ODPS中,做定期的分析。 结构图如下: 商品订单系统 场景描述 假设,我们的系统已经使用表格存储记录每个用户的订单信息,现在我们希望根据订单信息进行分析,近期的热点商品,根据订单更新采购库存。 表结构设计 主键顺序 名称 类型 值 备注 1 partition_key string md5(user_id)前四位 为了负载均衡 2 user_id string/int 用户id 可以是字符串也可以是长整型数字 3 timestamp int 时间戳 使用长整型,64位,足够保存毫秒级别的时间戳 4 order_id string/int 订单Id 可以使字符串也可以是长整型数字 数据存储示例 设计好表结构后,我们看下具体如何存储:比如原始数据是: 2017/5/20 10:12:20小王下了订单,订单号10005,购买了商品5,数量2,单价15,使用支付宝支付30元。 part_key user_id timestamp order_id commodity_id price count total payment_type status 01f3 000001 1495246210 10005 5 15 2 30 alipay finished 主键 part_key:第一个主键,分区建,主要是为了负载均衡,保证数据可以均匀分布在所有机器上,提高并发度和性能。如果业务主键user_id可以保证均匀分布,那么可以不需要这个主键。 user_id:第二个主键,用户ID,可以是字符串也可以是数字,唯一标识一个用户。 timestamp:第三个主键,时间戳,表示某一个时刻,单位可以是秒或者毫秒,用来表示用户订单的时间戳。在这里放置时间,是因为系统往往需要查询某个用户一段时间内的所有订单信息。 order_id:第四个主键,订单号。 至此,上述四个主键可以唯一确定某一个用户在某一个时间点下的一个订单。 属性列 commodity_id:购买商品的id。 price:购买商品的单价。 count:购买商品的总价。 total:订单的总价。 payment_type:用户支付类型。 status:订单的状态。 数据消费需求 针对订单系统,我们需要一下功能: 用户可以快速查询过去一段时间的所有订单 当有用户下单后,我们需要更新我们的仓储信息,当库存少于一定数量后需要发起采购 分析用户的近期购买兴趣,做购买推荐 检测异常订单,例如某个用户短时间内大量都买一个产品 针对需求1,我们的表设计再结合表格存储的GetRange可以很方便的实现。需求2,基于我们的增量可以很方便获取近期的订单,定期更新库存。表格存储即将发布的Stream对接FC(敬请期待),可以做到完全的无服务器触发整个流程,实现订单,库存的自动化管理。需求3和4,如果希望依赖ODPS做离线分析,可以使用DATAX结合我们的Stream Reader插件将数据导入opds进行分析。如果希望接入其他流计算平台,可以使用Stream Client订阅增量数据。 基于Stream实现属性高效查询 场景描述 我们用表格存储存放商家,以及商品的信息,商品有较多的属性例如价格,产地,适合人群等等,我们希望可以对这些商品的属性做各种灵活的查询。例如,产地是杭州的价格50到100之间的商品,或者我们希望对属性中某个做模糊搜索,例如“茶”。 表结构设计 主键顺序 名称 类型 值 备注 1 partition_key string md5(user_id)前四位 为了负载均衡 2 merchant_id string/int 商户id 可以是字符串也可以是长整型数字 3 commodity_id string/int 商品Id 可以使字符串也可以是长整型数字 数据存储示例 设计好表结构后,我们看下具体如何存储:比如原始数据是: 2017/5/20 商家1上架商品10005,商品产地杭州,名称西湖龙井茶叶,价格300,适合人群12岁以上, 属性描述如下:XXXXX |part_key | merchant_id | commodity_id | location | price | age | property|------- | ------- | ------- | ------- | ------- | ------- | ------- | |01f3 | 000001 | 10005 | 杭州 | 300| above 12 | XXXXX| 为了实现灵活的查询我们可能需要借助一个搜索系统例如Elasticsearch,那我们在插入一条新的商品的时候需要双写表格存储和Elasticsearch。那我们架构是: 基于这样的架构,我们需要引入一个MQ,自己实现表格存储和ES的双写。现在有了Stream功能后,我们可以直接写入表格存储,把增量同步进入Elasticsearch。架构修改为如下: 一个更让人期待的功能是阿里云数据同步通道DTS正在集成表格存储到ES,届时用户只需要做一些配置就可以打通表格存储和ES,灵活的实现属性的查询搜索。我们也可以参考表格存储和ES场景分析和实践 总结 最后我们来总结一下有了Stream带来的好处和适用的场景,Stream可以很方便的在以下场景中使用: 增量数据复制 DataX StreamReader 对接流计算,实时计算平台 对接函数计算 对接搜索 订阅增量数据 在使用表格存储这类水平扩展的分布式数据库的时候,我们需要让我们数据的分区键尽量分布均匀,避免写入尾部热点。所以我们无法使用时间做为分区键,但是如果我们的业务需要基于时间去读取消费数据,例如下图中,pk1,pk20和pk95等一些键值产生了新数据,我们需要跳着读区这些新数据,可能这样的key非常多,我们也很难得知哪些key产生的新数据。为了获得这些增量数据,我们得依赖一个队列,对一个更新操作执行双写,这样会增加很多额外的系统依赖和成本。而Stream的天生基于Commitlog的特性彻底改变了这一点,数据库的内容是根据主键进行排序组织,而数据库日志是根据修改顺序排序(如下图所示)。所以我们可以很方便的连续读区到这些在数据库文件中跳跃的键值。 谢谢使用表格存储,欢迎扫码加群讨论
阿里云自研PB级nosql数据库TableStore近期发布了新功能Stream,也就是增量通道,可以让用户实时的获取数据库中的增删改操作。很多使用TableStore的用户会定期把数据导入各类计算平台做数据的离线分析,以前的做法是使用DATAX或者使用TableStore的SDK定期拉取数据。之前我们只能采用全量拉取的办法,定期的全量拉取势必会带来很多不必要的开销,并且也失去了新增数据实时处理的可能。那有了Stream增量通道后,之前的这些痛点都会被迎刃而解。这个功能究竟怎么使用,又可以用在哪里呢?下面我就带大家初探TableStore的Stream功能。大家也可以先阅读下Stream的原理 产品功能 Stream功能和其他表格存储的很多功能一样,是用户表的一个属性。用户在创建表的时候可以指定是否开启Stream功能。用户也可以通过UpdateTable操作在后续需要使用Stream的时候开启。当用户开启Stream后,用户的修改记录在生命周期内(周期长短由用户开启Stream的时候指定,目前默认最大是一天,如有更长周期需要可以在官网提工单)会被一直保留。除了表操作以外,Stream的API具体有以下: ListStreams 获取当前表的 Stream 信息,例如 StreamID。具体请参见ListStreams DescribeStream 获取当前表增量数据的分区信息,熟悉表格存储特性的同学会知道表格存储会自动根据用户指定的分片键做分区来实现负载均衡。而我们的增量数据也是通过分区来进行组织,所以消费增量数据之前需要了解当前的分区信息,也就是Stream中的Shard信息。具体参见DescribeStream GetShardIterator 获取具体某个分区的读取iterator。这个iterator可以简单理解为一个偏移量标记我们可以从哪里开始消费增量数据。具体参见GetShardIterator GetStreamRecord 拉取增量数据,每次拉取结束后,会更新iterator用来下次拉取。如果返回数据为空表示当前尚未读取到新数据。如果返回null说明这个分区已经不存在没有后续的增量数据了。GetStreamRecord 如何理解TableStore的增量数据 介绍了Stream API,可能还不是很直观理解TableStore的Stream数据是如何组织的,下面就以用户轨迹为例来介绍如何使用。如何基于表格存储实现用户轨迹数据的存储,可以参考如何高效存储海量GPS数据。在这里,我们假设你使用如下的表结构存储你的海量轨迹数据, 主键顺序 名称 类型 值 备注 1 partition_key string md5(user_id)前四位 为了负载均衡 2 user_id string/int 用户id 可以是字符串也可以是长整型数字 3 task_id string/int 此次轨迹图的id 可以使字符串也可以是长整型数字 4 timestamp int 时间戳 使用长整型,64位,足够保存毫秒级别的时间戳 假如你有原始数据如下: 2017/5/20 10:10:10的时候小王在杭州虎跑路,开着私家宝马车,速度25km/h,当时风速2m/s,温度20度,已经开了8公里。 在表格存储中存储的是(11列); part_key user_id task_id timestamp longitude latitude brand speed wind_speed temperature distance 04fc 000001 001 1495246210 120.1516525097 30.2583277934 BMW 25 2 20 8000 当用户的位置不断发生变化,我们会产生一系列类似上面的轨迹数据,例如我们的粒度可以是10秒一个轨迹点。这样在一段时间内,我们可以积累海量的轨迹数据。那对于业务方,往往要做一些运营分析。 分析话题1:统计过去10分钟内是否有一个区域有驾驶热点,会带来交通拥堵。发现潜在拥堵点后,提前做一些车流疏散。 分析话题2:又或者我们希望在晚饭时间点,统计一下来某个商圈吃饭的客户都是从哪些地方开车过来的,日后可以在辐射区域内做一些精准推广。 这类问题的共同点是需要在这张轨迹表中获取一个时间段内新写入的数据,针对我们的表结构设计,如果没有增量通道的时候,我们能做的就是拿到所有的用户id和taskid进行时间段内的getrange读,这样如果同时的轨迹用户较多,会带来大量的getrange并发访问,而且我们还需要一张额外的表记录用户和轨迹id的关系。如果我们修改表结构,把时间作为第一主键,又会带来严重的数据写入尾部热点,数据分布不均匀等问题。 那么我们的架构就由以前的 加上大量的range读变为了下图的基于增量获取: Stream 的数据返回格式 当你使用我们的Stream APi读取增量数据的时候上面的数据会以下面的形式返回,我们以Go Sdk为例返回如下格式的Stream record。 record 0: {"Type":"PutRow", "PrimaryKey":{[{"Name": "pk1", "Value": "04fc"} {"Name": "pk2", "Value": "000001"} {"Name": "pk3", "Value": "001"} {"Name": "pk4", "Value": "%!s(int64=1495246210)"}]}, "Info":{"Epoch":0, "Timestamp": 1503555067832234, "RowIndex": 1}, "Columns":[{"Name":"longitude", "Type":"Put", "Timestamp":1503555067833, "Value":1e+02} {"Name":"latitude", "Type":"Put", "Timestamp":1503555067833, "Value":30.2583277934} {"Name":"brand", "Type":"Put", "Timestamp":1503555067833, "Value":BMW} {"Name":"speed", "Type":"Put", "Timestamp":1503555067833, "Value":25} {"Name":"wind_speed", "Type":"Put", "Timestamp":1503555067833, "Value":2} {"Name":"temperature", "Type":"Put", "Timestamp":1503555067833, "Value":20} {"Name":"distance", "Type":"Put", "Timestamp":1503555067833, "Value":8000}]} record 1: {"Type":"PutRow", "PrimaryKey":{[{"Name": "pk1", "Value": "04fc"} {"Name": "pk2", "Value": "000001"} {"Name": "pk3", "Value": "001"} {"Name": "pk4", "Value": "%!s(int64=1495246310)"}]}, "Info":{"Epoch":0, "Timestamp": 1503555068082609, "RowIndex": 1}, "Columns":[{"Name":"longitude", "Type":"Put", "Timestamp":1503555068083, "Value":1e+02} {"Name":"latitude", "Type":"Put", "Timestamp":1503555068083, "Value":30.2583277934} {"Name":"brand", "Type":"Put", "Timestamp":1503555068083, "Value":BMW} {"Name":"speed", "Type":"Put", "Timestamp":1503555068083, "Value":25} {"Name":"wind_speed", "Type":"Put", "Timestamp":1503555068083, "Value":2} {"Name":"temperature", "Type":"Put", "Timestamp":1503555068083, "Value":20} {"Name":"distance", "Type":"Put", "Timestamp":1503555068083, "Value":8001}]} 我们可以发现,表格的一次操作对应Stream的一条记录,记录中会涵盖这次操作的类型,操作的主键以及修改列的内容。有了这些数据我们可以方便做以下事情: 将数据做清洗写入另一张TableStore表 将数据写入流计算平台,做实时计算分析 将数据写入MaxCompute做进行分析 下面罗列下我们目前有如下几种方式可以读区表格存储的增量数据: SDK直接访问,目前我们的Java SDK和Go SDK已经支持Stream的Api,具体的使用可以参考Java Stream 示例和Go Stream 示例 DATAX 离线读取stream数据到odps,具体使用参考[DATAX 访问TableStore增量]() 基于Stream Client,用户自己开发实时数据通道将数据导出至不同的数据源。使用可以参考Stream Client使用 Stream对接FC,通过FC触发数据处理逻辑。 [即将发布,敬请期待]() 下面我们就用外卖订单系统为例再说明下Stream如何可以方便我们简化,高效的实现我们的应用。 外卖订单系统 场景描述 现在外卖行业非常火热,几家大厂都在角逐这个领域。而外卖也确实给我们的日常生活带来的很多便利,那如何基于表格存储打造一款高效的外卖应用呢,下面我们来详细介绍下。 系统特点 很多外卖会在不同时间有明显的波峰波谷,例如食品外卖,三餐点和夜宵时间点会有明显的波峰。那么表格存储这类海量高性能弹性计费的数据库产品就非常适合。除此之外,外卖系统还要基于一个区域内所有用户的下单情况做一个做优化的配送,实现效率最优,那么这样的系统我们如何设计表结构呢。 表结构设计 表1 订单表 主键顺序 名称 类型 值 备注 1 partition_key string md5(user_id)前四位 为了负载均衡 2 user_id string/int 用户id 可以是字符串也可以是长整型数字 3 timestamp int 时间戳 使用长整型,64位,足够保存毫秒级别的时间戳 4 order_id string/int 订单Id 可以使字符串也可以是长整型数字 表2 配送表 主键顺序 名称 类型 值 备注 1 partition_key string md5(user_id)前四位 为了负载均衡 2 user_id string/int 配送员id 可以是字符串也可以是长整型数字 3 delivery_id int 配送序列 使用长整型,基于表格存储主键自增列 数据存储示例 设计好表结构后,我们看下具体如何存储:订单表原始数据是: 2017/5/20 10:12:20小王下了订单,订单号10005,购买了两串烤肉和一杯咖啡,总共支付来51元,收获地址是西湖区XXX路XX号。 配送表原始数据是: 2017/5/20 10:12:20配送员小李收到配送订单信息,订单号10005,购买了两串烤肉和一杯咖啡,收获地址是西湖区XXX路XX号。 part_key user_id timestamp order_id merchant_id commodity price address payment_type status 01f3 000001 1495246210 10005 黑暗料理 2烤肉,1咖啡 51 西湖区XXX路XX号 alipay 等待配送 part_key user_id delivery_id order_id merchant_id commodity price address payment_type status 01f3 000001 1495249230 10005 黑暗料理 2烤肉,1咖啡 51 西湖区XXX路XX号 alipay 配送中 主键 订单表 part_key:第一个主键,分区建,主要是为了负载均衡,保证数据可以均匀分布在所有机器上,提高并发度和性能。如果业务主键user_id可以保证均匀分布,那么可以不需要这个主键。 user_id:第二个主键,用户ID,可以是字符串也可以是数字,唯一标识一个用户。 timestamp:第三个主键,时间戳,表示某一个时刻,单位可以是秒或者毫秒,用来表示用户订单的时间戳。在这里放置时间,是因为系统往往需要查询某个用户一段时间内的所有订单信息。 order_id:第四个主键,订单号。 至此,上述四个主键可以唯一确定某一个用户在某一个时间点下的一个订单。 配送表 part_key:第一个主键,分区建,主要是为了负载均衡,保证数据可以均匀分布在所有机器上,提高并发度和性能。如果业务主键user_id可以保证均匀分布,那么可以不需要这个主键。 user_id:第二个主键,配送员ID,可以是字符串也可以是数字,唯一标识一个用户。 delivery_id:第三个主键,配送号,注意不是用户的订单号,这一列使用自增列,配送员的客户端可以根据这个id拉去更新的配送信息。 至此,上述三个主键可以获取一个配送订单的详细信息。 属性列 订单表 merchant_id :商家id commodity:商品内容。 price:订单价格。 address: 配送地址 payment_type:用户支付类型。 status:订单的状态。 配送表 order_id :订单id commodity:商品内容。 price:订单价格。 address: 配送地址 payment_type:用户支付类型。 status:订单的状态。 由于表格存储的分区键可以在数据访问增加时进行分裂,当我们有百万用户同时在高峰期下单时我们可以分裂出较多的分区轻松应对每秒数十万甚至数百万的新增订单。有了这样的一个订单存储系统后,如何衔接我们的派单系统呢,这时候我们就可以使用增量功能,把近期的订单信息导入排单系统进行线路优化计算。前面我们也提到了外卖订单的伸缩特性,所以我们推荐使用函数计算进行订单的派送计算,我们表格存储Stream对接函数计算的功能也即将上线,届时我们是需要一些配置就可以打通表格存储和函数计算这两款全托管完全弹性计费的存储,计算产品。让我们的外卖订单飞的再快一点吧。 谢谢使用表格存储,欢迎扫码加群讨论
Workshop 环境准备 通过阿里云官网控制台创建一台ECS机器 下载实验需要的程序 修改相应配置文件 我们点击ECS图标进入ECS控制台,创建ECS实例。 注意需要选取64位的linux系统。 实例创建完成后,通过网页版登录机器 网页跳转至ECS机器操作界面,记住网页登录秘密 输入刚才保存下来的登录密码 进入ECS后输入系统管理员账号默认为root和密码为控制台中指定的密码 下载GameServer至Demo目录 mkdir demo cd demo wget http://workshop-img.oss-cn-shanghai.aliyuncs.com/demopackage.tar tar xvf demopackage.tar 进入demo目录,替换gameserver的配置文件中阿里云账号AK信息。 4. 安装日志服务客户端Logtail 在Logtail安装页面 安装Logtail - ECS 经典网络 - 选择华东2 安装脚本wget http://logtail-release-sh.oss-cn-shanghai-internal.aliyuncs.com/linux64/logtail.sh; chmod 755 logtail.sh; sh logtail.sh install cn_shanghai 登陆ECS 运行安装命令 安装完成
海量用户数据管理及分析 场景介绍 X游戏公司有多款手游,页游在线上运营。近期也有一批新款游戏设计出炉准备开发,公司希望根据游戏热度决定未来资源投入的方向。与此同时,近期频发的盗号现象,也让公司倍感苦恼。开发一个登录风控模块,迫在眉睫。架构师小吴接到这个任务,平时热爱了解,使用阿里云的他认为表格存储的多版本功能可以很方便的实现用户元数据的管理,并为风控模块服务。 实验概述 实验中我们会开发一个建议的游戏服务器,服务器提供用户登录功能,并嵌入风控模块。实验可以通过portal模拟用户的登录行为,并且系统会根据登录信息判断登录风险。整个实验的执行流程如下: 具体步骤 我们已经在账号下准备好了一个ECS实例,假设我们的游戏服务器会跑在这台ECS中。 进入阿里云主页,使用预先准备好的账号登录管控 点击表格存储图标进入表格存储控制台 选择华东2地区,创建表格存储实例 实例名字确保全局唯一,实例类型可以根据业务需求选取,这里我们使用高性能 创建好实例后,我们建一张存放用户meta的表,注意红色框出的数据不要填错 表格创建好后我们可以看到表格的基本信息,包括endpoint,表的meta 我们点击ECS图标进入ECS控制台,通过网页版登录机器 网页跳转至ECS机器操作界面,记住网页登录秘密 输入刚才保存下来的登录密码 输入系统管理员账号和密码,我们预先为您建立的用户名root,密码User@123 我们在Demo目录下已经准备好了GameServer的binary 进入demo目录,可以检查下gameserver的配置文件中阿里云账号AK信息是否正确,如果没有填写可以按照如下查找AK信息并更新配置文件 将表格存储实例的相应信息填入并保存 启动游戏服务器。并替换之前建立的表格存储实例名和连接地址 ./demo instanceName endpoint 在本地机器打开浏览器访问ECS的公网地址. http://ecsvmipaddress 输入你的登录信息,点击登录,得到风险监测结果,通过表格显示用户的登录历史记录 谢谢使用表格存储,欢迎扫码加群讨论
2019年10月
2019年09月