优点总线矩阵是数仓建设的纲领性文件,不论是从零开始一个项目,还是中途接手一个项目,总线矩阵总是最好的切入点。总线矩阵有利于主数据管理。核心维度由数据管理责任人定义,在多个业务过程中使用,而不是被单一业务过程或部门定义,提供一致性维度,实现跨业务过程钻取的需求。总线矩阵列表示整个企业的公共维度,有助于创建核心维度列表,解决主数据管理和数据集成的需求。总线矩阵有利于项目规划和排期。总线矩阵将业务过程按主题划分,每个主题下包含多个业务过程,各个主题中业务过程互不重叠。不同开发小组遵循该架构进行异步独立开发,每个小组承担不同数据域或同一数据域下不同业务过程,也可以进行一致性维度开发,从而实现增量式的开发。不同小组间分工更加清晰,每个人对自己在组织中承担的任务也更明确。总线矩阵是数据一致性的重要保障。总线矩阵提倡从初期规划一致性维度,各业务过程共享一致性维度。通过一致性维度,确保维度的有序建设,减少冗余的出现。总线矩阵提供一目了然的维度能力观察,让后面开发的同学了解现有数仓数据,减少烟囱式建设的可能性。总线矩阵可以避免面向需求开发。总线矩阵要求我们基于业务过程建设数仓,要求我们从全面的角度考虑维度建设。避免了拿到需求后盲目建设的情况,也避免后期不停维护扩展。总线矩阵是数仓建设过程中各种角色间的沟通桥梁。架构师通过总线矩阵描述项目概况,进行任务分工;建模人员通过总线矩阵了解项目中一致性维度与业务过程关系,开展建模工作;项目经理通过总线矩阵,了解项目规模,进行排期安排,进度跟进;BI同学通过总线矩阵,了解数仓包含的业务过程与支持钻取的维度。通过总线矩阵,简化不同角色人员间沟通,更好的实现不同组织的工作配合。基于上述优点,我认为总线矩阵是数仓建设中最重要的文档,应当由架构师在项目初期负责建设,并且长期维护和全局共享。干货直达👇数仓建设:数据域和主题域是什么关系?关于未来数据开发技术方向的观点数据架构建设方法及案例基于阿里(OneData)的数仓体系建设更多精彩👇
为什么会有域的概念呢?首先来看看数据仓库的定义吧,数据仓库是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理决策。主题域已经体现出来了主题域用于将数据集市按照分析视角进行划分,通常是联系较为紧密的数据主题的集合。可以根据业务的关注点,将这些数据主题划分至不同的主题域。当然,还有另外一种说法。数据仓库是面向主题(数据综合、归类并进行分析利用的抽象)的应用。数据仓库模型设计除横向的分层外,通常也需要根据业务情况进行纵向划分数据域。数据域是联系较为紧密的数据主题的集合,通常是根据业务类别、数据来源、数据用途等多个维度,对企业的业务数据进行的区域划分,将同类型数据存放在一起,便于快速查找需要的内容。不同使用目的数据,分类标准不同。例如,电商行业通常分为交易域、会员域、商品域等。数据域划分原则:全局性、可理解性和数量适中。主题域主题域通常是 联系较为紧密的数据 主题的集合。可以根据业务的关注点,将这些数据主题划分到不同的主题域。主题域的确定必须由最终用户和数据仓库的设计人员共同完成。主题域、主题、实体间关系主题设计是对主题域进一步分解,细化的过程。主题域下面可以有多个主题,主题还可以划分成更多的子主题,而实体则是不可划分的最小单位。主题域、主题、实体的关系如下图所示:可以显而易见的看出,主题域是一个更大的概念,主题是略次之,实体最小,这里的实体表示的是实体对象(对应企业中某一宏观分析领域所涉及的分析对象)。主题域划分主题域是对某个主题进行分析后确定的主题的边界。分析主题域,确定要装载到数据仓库的主题是信息打包技术的第一步。而在进行数据仓库设计时,一般是一次先建立一个主题或企业全部主题中的一部分,因此在大多数数据仓库的设计过程中都有一个主题域的选择过程。确定主题边界实际上需要进一步理解业务关系,因此在确定整个分析主题后,还需要对这些主题进行初步的细化才便于获取每一个主题应该具有的边界。对于四个主题及其在企业中的业务关系可以确定边界。主题的使用由于数据仓库的设计是一个螺旋发展的过程。在刚开始,没有必要在数据仓库的数据库中体现所有的主题,选择最重要的主题作为数据仓库设计的试金石是很有必要的。因此使用主题首先是找到需要分析的主题域。例如在 AdventureWorks DW 数据仓库的概念模型设计中,在对需求进行分析后,认识到“商品”主题既是一个销售型企业最基本的业务对象,又是进行决策分析的最主要领域,因而把“销售分析”主题域定义为要首先建立的主题。通过“商品”主题的建立,经营者就可以对整个企业的经营状况有较全面的了解。先实施“商品”主题可以尽快地满足企业管理人员建立数据仓库的最初要求,所以先选定“商品”主题进行实施。通过将主题边界的划分应用到已经得到的关系模型上还能形成原始的概念模型。这一模型是把主题域的划分和事务处理数据库中的表结合起来的模型,例如在上面的例子中,商品主题可能涵盖的关系表有商品表、供应关系表、购买关系表和仓储关系表;仓库主题可能涵盖的关系表有仓库关系表、仓库表、仓库管理关系表和管理员表。数据域数据域是指面向业务分析,将业务过程或者维度进行抽象的集合。为保障整个体系的生命力,数据域需要抽象提炼,并长期维护更新。在划分数据域时,既能涵盖当前所有的业务需求,又能让新业务在进入时可以被包含进已有的数据域或扩展新的数据域。数据域的划分工作可以在业务调研之后进行,需要分析各个业务模块中有哪些业务活动。数据域是 CDM 层的顶层划分,是对企业业务的抽象提炼,面向业务分析,包含当前所有业务需求,也便于未来扩展。数据域的使用数据域可以按照用户企业的部门划分,也可以按照业务过程或者业务板块中的功能模块进行划分。例如在线教育公司可以划分为如下数据域,数据域中每一部分都是实际业务过程经过归纳抽象之后得出的。数据域和主题域到底有什么区别呢?主题域是针对数据集市提出的概念,数据集市是面向主题,从业务驱动进行分析场景的建设。数据域是阿里数据中台 CDM 层建设中提出的,是数据驱动业务,是对数据的分类,更好的数据赋能业务。总结数据域是对数据的分类,主题域和业务域是对业务的分类。主题域和数据域最终都是对数据的分类,只是一个是数据视角,一个是业务视角。根本的目的是:统一规则,方便管理,容易理解,有利于开发效率,有利于快速服务业务场景就可以了。Tips个人建议,在 DWD 层可以按照数据域进行分类,DWS 层可以按照主题域划分,ADS 层可以按照分析主题域(业务场景)划分。数据域划分几点需要注意的地方1.不重不漏,确保每个表都在一个域里,且只在一个域里(精确定位)2.每个域下都可以根据需要再分子域,不限定层级(最自由方便)3.如果分子域就不能放表,表只放在最底层的域中(树状目录管理时更方便)4.最好保证每个域下的子域数量或表数量在20个左右(太多了不方便记忆管理,太少了没必要划分)5.【其他】很好用,不好划分的都放里面(减少域层级数量有理由理解记忆)6.数据团队分域可以作为分工的标准(数据不重、分工明确、界限清晰)7.数据团队分域后,可以决定域内表的中间命名(看到表名时可以理解更多信息)
一、前言大家好,我是云祁。好久不见,前面分享过 数据仓库常见建模方法与大数据领域建模实例,今天再和大家聊聊数据中台,聊聊中台具体的实施交付过程以及非常重要的OneData理论。很早前,就有多位数据圈的大佬,写过这数据中台以及OneData相关的理论,见解颇深。我这里就站在巨人的肩旁上,聊聊自己的体悟,如有不妥之处,还请各位看官留言斧正。二、什么是数据中台?带着对 “阿里中台到底拆没拆” 的疑问 ,我们不妨先来探讨一下中台到底是啥? 😮早期业界对于 “数据中台” 一直没有明确的定义,但是作为理工直男,我们可以把中台理解是一种中间层。而中间层,它便承担了一个非常重要的特性 解耦!从上图我们不难看出,2014年前阿里的每一块业务都会有对应的ETL开发团队为其提供数据支持,而每个ETL开发团队都会按照自己的思路建设自己的数据体系。这就造成了:数据流向会乱,缺乏方向性;数据管理式无序的,处于失控状态;浪费研发人力和计算存储资源,也不能很好业务的需求;通过这样烟囱式的开发,造成了集团内极大的业务困扰和技术浪费。而阿里早期提出“数据中台”的概念,目的就是为了应对多业务部门千变万化的数据需求。既要满足业务部门日常性的多个业务前台的数据需求,又要满足像双11、618这样的业务高峰。发展到现在,数据中台俨然通过 “方法论+组织+工具” 成为阿里云上实现数据智能的最佳实践。如下图所示:三、什么是 OneData 体系?官方的解释是:阿里云OneData数据中台解决方案基于大数据存储和计算平台为载体,以OneModel统一数据构建及管理方法论为主干,OneID核心商业要素资产化为核心,实现全域链接、标签萃取、立体画像,以数据资产管理为皮,数据应用服务为枝叶的松耦性整体解决方案。其数据服务理念根植于心,强调业务模式,在推进数字化转型中实现价值。数据中台到如今的建设成果主要体现在两方面:一个是数据的技术能力,另一个是数据的资产。今天阿里的各个业务都在共享同一套数据技术和资产。阿里内部为这个统一化的数据体系命名为 “OneData”。OneData又主要抽象成三个部分,分别是:OneID、OneModel、OneService。第一部分:OneModel 致力于实现数据的标准与统一;第二部分:OneID 致力于实现实体的统一,让数据融通而非以孤岛存在,为精准的用户画像提供基础;第三部分:OneService 致力于实现数据服务统一,让数据复用而非复制。在架构图中,我们看到最下面的内容主要是数据采集和接入,按照业态接入数据(比如淘宝、天猫、盒马等),把这些数据抽取到计算平台。通过OneData体系,以 **“业务板块+分析维度”**为架构去构建“公共数据中心”。基于公共数据中心在上层根据业务需求进行建设:消费者数据体系、企业数据体系、内容数据体系等。经过深度加工后,数据就可以发挥其价值被产品、业务所用;最后通过统一的数据服务中间件“OneService”提供统一数据服务。三、OneData 方法论切入点《大数据之路:阿里巴巴大数据实践》一书中详细讲了OneData方法论,具体实施起来还是需要从数据架构方法、数据模型的设计方法以及数据标准化几方面入手。数据架构方法(全局化规划数据体系):数据域划分 -> 数据总线矩阵构建 -> 数据分层规划实现企业数据的全局观规划设计数据模型设计方法(好用且复用):维度 -> 事实 -> 公共汇总面向数据分析场景构建数据模型,让通用计算沉淀,数据复用,提升效能。数据标准化方法(计算口径&表达统一):派生指标 = 原子指标 + 业务限定 + 统计周期 + 统计粒度实现数据标准化定义,计算口径统一,保障数据质量四、OneData 实施流程首先,在数据中台实施过程时,要进行充分的业务调研和需求分析。这是中台建设的基石,业务调研和需求分析做得是否充分直接决定了数据中台建设是否成功。其次,进行数据总体架构设计,主要是根据数据域对数据进行划分;按照维度建模理论,构建总线矩阵、抽象出业务过程和维度。再次,对报表、大屏需求进行抽象整理出相关指标体系,使用OneData数据智能智能构建与管理平台,完成指标规范定义和模型设计。最后,就是代码研发和运维。4.1 项目调研按照业务需求调研+数据调研+业务系统调研+环境调研的思路开展。这个阶段主要注意避免对用户需求的错误理解;对网络情况不了解,影响数据上云;对业务系统的摸排不全面,导致后期模型和效果无法达到。4.2 架构设计数据域的划分数据域是指面向业务分析,将业务过程或者维度进行抽象的集合。其中,业务过程可以概括为一个个不可拆分的行为事件,在业务过程之下,可以定义相关指标;维度则是指度量的环境,如买家下单事件,买家就是维度。为保障整个体系的生命力,数据域是需要抽象提炼,并且长期维护和更新的,但不轻易变动。在划分数据域时,既能涵盖当前所有的业务需求,又能在新业务进入时无影响地被包含进已有的数据域中和扩展新的数据域。构建总线矩阵在进行充分的业务调研和需求调研后,就要着手构建总线矩阵了。在这一步我们需要做两件事情:明确每个数据域下有哪些业务过程;业务过程与哪些维度相关,并定义每个数据域下的业务过程和维度。4.3 规范定义规范定义主要定义指标体系,包括原子指标、业务限定、统计周期、派生指标。时间周期用来明确数据统计的时间范用或者时间点,如最近 30 天、自然周、截至当日等。业务限定是对业务的一种抽象划分。业务限定从属于某个业务域,如日志域的访问终端类型涵盖无线端、 PC端等修饰词。度量 / 原子指标原子指标和度量含义相同,基于某一业务事件行为下的度量,是业务定义中不可再拆分的指标,具有明确业务的名词,如支付金额。维度维度是度量的环境,用来反映业务的一类属性,这类属性的集合构成一个维度,也可以称为实体对象。维度属于一个数据域,如地理维度(包括国家、地区、省以及城市等级的内容)、时间维度(其中包括年、季、月、周、日等级别的内容)派生指标派生指标 = 一个原子指标+多个业务限定(可选)+时间周期。 可以理解为对原子指标业务统计范围的圈定。如原子指标:支付金额,最近一天海外买家支付金额则为派生指标原子指标、业务限定及修饰词都是直接归属于业务过程下,其中修饰词继承修饰类型的数据域。派生指标的种类派生指标可以分为三类:事务型指标、存量型指标和复合型指标。按照其特性不同,有些必须新建原子指标,有些可以在其他类型原子指标的基础上增加修饰词形成派生指标。4.4 模型设计数据模型的维度设计主要还是以维度建模理论为基础,基于维度数据模型总线架构,构建一致性的维度和事实。操作数据层(ODS)把业务系统数据几乎无处理地存放在数据仓库中。同步:结构化数据增量或者全量同步到MaxCompute结构化:非结构化(日志)结构化处理并存储到MaxCompute累积历史、清洗:根据数据业务需求及稽核和审计要求保存历史数据、清洗数据。公共维度模型层(CDM)存放明细事实数据、维表数据及公共指标汇总数据,其中明细事实数据、维表数据一般根据ODS层数据加工生成;公共指标汇总数据一般根据维表数据和明细事实数据加工生成。CDM层又细分为DWD层和DWS层,分别是明细数据层和汇总数据层,采用维度模型方法作为理论基础,更多地采用一些维度退化手法,将维度退化至事实表中,减少事实表和维表的关联,提高明细数据表的易用性。同时在汇总数据层,加强指标的维度退化,采取更多的宽表化手段构建公共指标数据层,提升公共指标的复用性,减少重复加工。其主要功能如下:组合相关和相似数据:采用明细宽表,复用关联计算,减少数据扫描。公共指标统一加工:基于OneData体系构建命名规范、口径一致和算法统一的统计指标,为上层数据产品、应用和服务提供给公共指标;建立逻辑汇总宽表。建立一致性维度:建立一致的数据分析维表,降低数据计算口径、算法不统一的风险。应用数据层(ADS)存放数据产品个性化的统计指标数据,根据CDM层与ODS层加工生成。个性化指标加工:不公用性、复杂性(指数型、比值型、排名型指标)基于应用的数据组装:大宽表集市、横表转纵表、趋势指标串4.5 总结OneData的实施过程是一个高度迭代和动态的过程,一般采用螺旋式实施方法。在总体架构设计完成后,开始根据数据域进行迭代示模型设计和评审。在架构设计、规范定义和模型设计等模型实施过程中,都会引入评审机制,以确保模型实施过程的正确性。五、写在最后在今年五月底,2021阿里云峰会上,阿里云智能总裁张建锋表示 2021 年有四个词非常关键,分别是:做深基础、做厚中台、做强生态、做好服务。“中台包括中间件、数据库、操作系统、大数据平台等,中台是未来的“云”能不能得到进一步快速发展的核心”。“拆中台也好”,“做厚中台”也罢,经过这两年和数据中台打交道,我觉得中台本质上,就是“中心化的能力复用平台”,诚如开篇时提到的,它承担着“解耦”的使命。如果有人说:我有个数据中台,你用了之后可以提高你企业的信息孤岛,提高企业的效率,那肯定是个骗子。数据中台的构建不是不是一蹴而就的,不论大企业,还是小企业,都需要从小的业务场景开始不断累积,需要长时间的业务经验的沉淀,不断的进行优化创新,最终才能构建出具有自己企业业务特色的数据中台。
大家好,我是云祁,好久不见~今天来和大家聊聊数仓常见的一些建模方法和具体的实例演示,一起来看看吧。一、为什么需要数据建模?在开始今天的话题之前,我们不妨思考下,到底为什么需要进行数据建模?随着从IT时代到DT时代的跨越,数据开始出现爆发式的增长,这当中产生的价值也是不言而喻。如何将这些数据进行有序、有结构地分类组织存储,是我们所有数据从业者都要面临的一个挑战。如果把数据看作图书馆里的书,我们希望看到它们在书架上分门别类地放置,而不是乱糟糟的堆砌在一起。大数据的数仓建模正是通过建模的方法,更好的组织、存储数据,以便在性能、成本、效率和数据质量之间找到最佳平衡点,一般我们会从以下面四点考虑:性能:能够快速查询所需的数据,减少数据I/O的吞吐。成本:减少不必要的数据冗余,实现计算结果的复用,降低大数据系统中的存储成本和计算成本。效率:改善用使用数据的体验,提高使用效率。质量:改善数据统计口径的不一致性,减少数据计算错误的可能性,提供高质量的、一致的数据访问平台。因此,毋庸置疑,大数据系统、数据平台都需要数据模型方法来帮助更好的组织和存储数据,数据建模的工作,也正是围绕上述四个指标取得最佳的平衡而努力。二、从 OLTP 和 OLAP 系统的区别看模型方法论的选择OLTP系统通常面向的主要数据操作是随机读写,主要采用3NF的实体关系模型存储数据,从而在事务处理中解决数据的冗余和一致性问题。OLAP系统面向的主要数据操作是批量读写,事务处理中的一致性不是OLAP所关注的,其主要关注数据的整合,以及在一次性的复杂大数据查询和处理的性能,因此它需要采用不同的建模方法,例如维度建模。如果大家想进一步了解 OLAP系统,可以学习这篇文章: 关于OLAP数仓,这大概是史上最全面的总结!三、典型的数据仓库建模方法论数据仓库本质是从数据库衍生出来的,所以数据仓库的建模也是不断衍生发展的。从最早的借鉴关系型数据库理论的范式建模,到逐渐提出维度建模等等,越往后建模的要求越高,越需满足3NF、4NF等。但是对于数据仓库来说,目前主流还是维度建模,会夹杂着范式建模。数据仓库建模方法论可分为:E-R模型、维度模型、Data Vault模型、Anchor模型。3.1 E-R模型将事物抽象为“实体”、“属性”、“关系”来表示数据关联和事物描述,这种对数据的抽象建模通常被称为E-R实体关系模型。数据仓库之父 Bill Inmon 提出的建模方法,从全企业的高度设计一个3NF模型,用实体关系(Entity Relationship)模型来描述企业业务,满足3NF。数据仓库的3NF与OLTP系统中的3NF的区别在于,它是站在企业角度面向主题的抽象,而不是针对某个具体的业务流程。采用 E-R模型建设数据仓库模型的出发点是整合数据,对各个系统的数据以整个企业角度按主题进行相似的组合和合并,并进行一致性处理,为数据分析决策服务,但是并不能直接用于分析决策。作为一种标准的数据建模方案,它的实施周期非常长,一致性和扩展性比较好,能经得起时间的考验。但是随着企业数据的高速增长、复杂化,数仓如果全部使用E-R模型进行建模就显得越来越不适合现代化复杂、多变的业务组织,因此一般只有在数仓底层ODS、DWD会采用E-R关系模型进行设计。E-R建模步骤分为三个阶段:高层模型:一个高度抽象的模型,描述主要的主题以及主题间的关系,用于描述企业的业务总体概况。中层模型:在高层模型的基础上,细化主题的数据项。物理模型(底层模型):在中层模型的基础上,考虑物理存储,同时基于性能和平台特点进行物理属性的设计,也可能做一些表的合并、分区的设计等。E-R模型在实践中最典型的代表是 Teradata 公司基于金融业务发布的 FS-LDM (Financial Services Logical Data Model ),它通过对金融业务的高度抽象和总结,将金融业务划分为10大主题,企业基于此模型适当调整和扩展就能快速实施落地。3.2 维度模型维度模型是数据仓库领域 Ralph Kimball 大师倡导的,是数据仓库工程领域最流行的数仓建模经典。维度建模以分析决策的需求出发构建模型,构建的数据模型为分析需求服务,因此它重点解决用户如何更快速完成分析需求,同时还有较好的大规模复杂查询的响应性能。其中典型的代表就是使用星型模型,以及在一些特殊场景下使用的雪花模型。其设计主要分为以下几个步骤:1.选择需要进行分析决策的业务过程。业务过程可以是单个业务事件,比如交易的支付、退款等;也可以是某个事件的状态,比如当前账户的余额;还有就是一系列相关业务事件组成的业务流程,具体需要我们分析的是某些事件发生的情况,还是当前状态,或是事件流转效率。2.选择粒度。在事件分析中,我们要预判所有分析需要细分的程度,从而决定选择的粒度。粒度是维度的一个组合。3.识别维表。选择好粒度之后,就需要基于这个粒度来设计维表,包括维度属性,用于分析时进行分组和筛选。4.选择事实。确定分析需要衡量的指标。在 Ralph Kimball 提出对数据仓库维度建模,我们将数据仓库中的表划分为事实表、维度表两种类型。针对维度建模中事实表和维度表的设计,之前有详细介绍过,感兴趣的同学可以看:维度建模技术实践——深入事实表 、维度建模的灵魂所在——维度表设计。在这里,我就以常见的电商场景为例:在一次购买的事件中,涉及主体包括客户、商品、商家,产生的可度量值会包括商品数量、金额、件数等。事实表根据粒度的角色划分不同,可分为事务事实表、周期快照事实表、累积快照事实表等。1.事务事实表:用于承载事务数据,任何类型的事件都可以被理解为一种事务,比如商家在交易过程中的常见订单、买家付款,物流过程中的揽货、发货、签收,退款中的申请退款。2.周期快照事实表:快照事实表以预定的间隔采样状态度量,比如自然年至今或者历史至今的下单金额、支付金额、支付买家数、支付商品件数等等状态度量。3.累计快照事实表:数据不断更新,选取多业务过程日期。用来记录具有时间跨度的业务处理过程的整个过程的信息,每个生命周期一行,通常这类事实表比较少见。我们继续就上述的电商场景,聊聊在维表设计时需要关注的一些东西:1.缓慢变化维度:例如会员表的手机号、地址、生日等属性。2.退化维度 :订货单表的订单编号、物流表的物流编号等。3.雪花维度:满足第三范式的维度关系结构。4.非规范化扁平维度:商品维表众中产品、品牌、类目、品类等。5.多层次维度:地区维度的省、市、区县,商品的类目层级。6.角色维度:日期维度在物流中扮演发货日期、送货日期、收获日期等不同角色。接下来就是针对维度建模按照数据的组织类型,可以划分为星型模型、雪花模型、星座模型。1.星型模型:星型模型主要是维表和事实表,以事实表为中心,所有维度直接关联在事实表上,呈星型分布。2. 雪花模型,在星型模型的基础上,维度表上又关联了其他维度表。这种模型维护成本高,性能方面会差一些。3. 星座模型,是对星型模型的扩展延伸,多张事实表共享维度表。实际上数仓模型建设后期,大部分维度建模都是星座模型。简单总结下就是:1.星型模型和雪花模型主要区别就是对维度表的拆分。2.对于雪花模型,维度表的涉及更加规范,一般符合3NF,有效降低数据冗余,维度表之间不会相互关联。3.星型模型,一般采用降维的操作,反规范化,不符合3NF,通过利用冗余来避免模型过于复杂,提高易用性和分析效率,效率相对较高。3.3 DataVault 模型Data Vault 是 Dan Linstedt 发起创建的一种模型,它是 E-R 模型的衍生,其设计的出发点也是为了实现数据的整合,但不能直接用于数据分析决策。它强调建立一个可审计的基础数据层,也就是强调数据的历史性、可追溯性和原子性,而不要求对数据进行过度的一致性处理和整合。同时它基于主题概念将企业数据进行结构化组织,并引入了更进一步的范式处理来优化模型,以应对源系统变更的扩展性。 Data Vault 型由以下几部分组成:1.Hub - 中心表:是企业的核心业务实体,由实体 Key、数仓序列代理键、装载时间、数据来源组成,不包含非键值以外的业务数据属性本身。2.Link - 链接表:代表 Hub 之间的关系。这里与 ER 模型最大的区别是将关系作为一个独立的单元抽象,可以提升模型的扩展性。它可以直接描述 1:1、1:2和n:n的关系,而不需要做任何变更。它由 Hub的代理键、装载时间、数据来源组成。3.Satellite - 卫星表:数仓中数据的主要载体,包括对链接表、中心表的数据描述、数值度量等信息。Data Vault 模型比 E-R 模型更容易设计和产出,它的 ETL 加工可实现配置化。我们可以将 Hub 想象成人的骨架,那么 Link 就是连接骨架的韧带,而 SateIIite 就是骨架上面的血肉。3.4 Anchor 模型Anchor 对 Data Vault 模型做了进一步的规范化处理,它的核心思想是所有的扩展只是添加而不是修改,因此将模型规范到6NF,基本变成了 k-v 结构化模型。1.Anchors :类似于 Data Vault 的 Hub ,代表业务实体,且只有主键。2.Attributes :功能类似于 Data Vault 的 Satellite,但是它更加规范化,将其全部 k-v 结构化, 一个表只有一个 Anchors 的属性描述。3.Ties :就是 Anchors 之间的关系,单独用表来描述,类似于 Data Vault 的 Link ,可以提升整体模型关系的扩展能力。4.Knots :代表那些可能会在 Anchors 中公用的属性的提炼,比如性别、状态等这种枚举类型且被公用的属性。由于过度规范化,使用中牵涉到太多的Join操作,这里我们就仅作了解。四、总结以上为四种基本的建模方法,目前主流建模方法为: E-R模型、维度模型。E-R模型通常用于OLTP数据库建模,应用到构建数仓时就更偏向于数据整合,站在企业整体考虑,将各个系统的数据按相似性一致性、合并处理,为数据分析、决策服务,但并不便于直接用来支持分析。维度建模是面向分析场景而生,针对分析场景构建数仓模型;重点关注快速、灵活的解决分析需求,同时能够提供大规模数据的快速响应性能。针对性强,主要应用于数据仓库构建和OLAP引擎低层数据模型。数据仓库模型的设计是灵活的,不会局限于某一种模型,需要以实际的需求场景为导向,需要兼顾灵活性、可扩展性以及技术可靠性及实现成本。我是「云祁」,一枚热爱技术、会写诗的大数据开发猿,欢迎大家关注呀!Respect ~
文章目录前言什么是 Apache Flink?Flink vs. Blink学习建议End大家好,我是云祁!之前为团队里的小伙伴做了 Flink 与阿里云 Realtime Compute 的技术分享,今天有时间就把PPT的内容做了整理分享给大家 (多图预警)🙄。前言Flink 最早期起源于德国柏林工业大学的一个研究项目Stratosphere,直到 2014年4月 捐献给Apache软件基金会…要知道,在2015年的时候,Filnk几乎没有人知道,更没有人大规模使用它 😭。而阿里是全球第一批使用Flink做大数据计算引擎研发的公司,2015年就引入内部,但最早Flink只能支持小流量互联网场景的数据处理。阿里觉得Flink很有潜力,决定进行改造,并把这个内部版本取名Blink,是英文眨眼的意思:“一眨眼,所有东西都计算好了!在2017年双11,Blink就已成功支持全集团(阿里巴巴、阿里云、菜鸟)所有交易数据的实时计算任务,也验证了Flink可以通过改造支持企业大规模数据计算的场景 😍。目前,国内诸多互联网大厂都已经完全拥抱了Flink。本次的分享就是围绕实时计算Flink和Alibaba Cloud Realtime Compute相关的知识点(能力、限制、典型场景,区别)进行分析。什么是 Apache Flink?如果用一句话聊聊什么是 Apache Flink 的命脉?那我的答案可能是:Apache Flink 是以"批是流的特例"的认知进行系统设计的。就目前最热的两种流计算引擎 Apache Spark 和 Apache Flink 而言,谁最终会成为No1呢?单从 “低延时” 的角度看,Spark是Micro Batching(微批式)模式,延迟Spark能达到0.5~2秒左右,Flink是Native Streaming(纯流式)模式,延时能达到微秒。很显然是相对较晚出道的 Apache Flink 后来者居上。 那么为什么Apache Flink能做到如此之 "快"呢?根本原因是 Apache Flink 设计之初就认为 “批是流的特例”,整个系统是 Native Streaming 设计,每来一条数据都能够触发计算。相对于需要靠时间来积攒数据 Micro Batching 模式来说,在架构上就已经占据了绝对优势。那么为什么关于流计算会有两种计算模式呢?归其根本是因为对流计算的认知不同,是"流是批的特例" 和 “批是流的特例” 两种不同认知产物。首先,我觉得 Flink 应用开发需要先理解 Flink 的 Streams、State、Time 等基础处理语义以及 Flink 兼顾灵活性和方便性的多层次API。Streams:流,分为有限数据流与无限数据流,unbounded stream 是有始无终的数据流,即无限数据流;而bounded stream 是限定大小的有始有终的数据集合,即有限数据流,二者的区别在于无限数据流的数据会随时间的推演而持续增加,计算持续进行且不存在结束的状态,相对的有限数据流数据大小固定,计算最终会完成并处于结束的状态。在 Spark 的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。而在 Flink 的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。State:状态是计算过程中的数据信息,在容错恢复和 Checkpoint 中有重要的作用,流计算在本质上是Incremental Processing(增量处理),因此需要不断查询保持状态;另外,为了确保Exactly- once 语义,需要数据能够写入到状态中;而持久化存储,能够保证在整个分布式系统运行失败或者挂掉的情况下做到Exactly- once,这是状态的另外一个价值。流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。- 例如,流处理应用程序从传感器接收温度读数,并在温度超过 90 度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子:所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差 20 度以上的温度读数,则发出警告,这是有状态的计算流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算Time,分为Event time、Ingestion time、Processing time,Flink 的无限数据流是一个持续的过程,时间是我们判断业务状态是否滞后,数据处理是否及时的重要依据。EventTime,因为我们要根据日志的生成时间进行统计。在不同的语义时间有不同的应用场景我们往往更关心事件时间 EventTimeAPI 通常分为三层,由上而下可分为SQL / Table API、DataStream API、ProcessFunction 三层,API 的表达能力及业务抽象能力都非常强大,但越接近SQL 层,表达能力会逐步减弱,抽象能力会增强,反之,ProcessFunction 层API 的表达能力非常强,可以进行多种灵活方便的操作,但抽象能力也相对越小。实际上,大多数应用并不需要上述的底层抽象,而是针对核心 API(Core APIs) 进行编程,比如 DataStream API(有界或无界流数据)以及 DataSet API(有界数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些 API处理的数据类型以类(classes)的形式由各自的编程语言所表示。第一:Flink 具备统一的框架处理有界和无界两种数据流的能力。第二:部署灵活,Flink 底层支持多种资源调度器,包括 Yarn、Kubernetes 等。Flink 自身带的 Standalone 的调度器,在部署上也十分灵活。第三:极高的可伸缩性,可伸缩性对于分布式系统十分重要,阿里巴巴双 11 大屏采用 Flink 处理海量数据,使用过程中测得 Flink 峰值可达 17 亿 / 秒。第四:极致的流式处理性能。Flink 相对于 Storm 最大的特点是将状态语义完全抽象到框架中,支持本地状态读取,避免了大量网络 IO,可以极大提升状态存取的性能。接下来聊聊 Flink 常见的三种应用场景 :实时数仓当下游要构建实时数仓时,上游则可能需要实时的Stream ETL。这个过程会进行实时清洗或扩展数据,清洗完成后写入到下游的实时数仓的整个链路中,可保证数据查询的时效性,形成实时数据采集、实时数据处理以及下游的实时Query。搜索引擎推荐搜索引擎这块以淘宝为例,当卖家上线新商品时,后台会实时产生消息流,该消息流经过Flink 系统时会进行数据的处理、扩展。然后将处理及扩展后的数据生成实时索引,写入到搜索引擎中。这样当淘宝卖家上线新商品时,能在秒级或者分钟级实现搜索引擎的搜索。移动应用中的用户行为分析消费者技术中的实时数据即席查询在触发某些规则后,Data Driven 会进行处理或者是进行预警,这些预警会发到下游产生业务通知,这是Data Driven 的应用场景,Data Driven 在应用上更多应用于复杂事件的处理。实时推荐(例如在客户浏览商家页面的同时进行商品推荐)模式识别或复杂事件处理(例如根据信用卡交易记录进行欺诈识别)异常检测(例如计算机网络入侵检测)接下来就该讲讲 Apache Flink 的几点优势:Flink作为分布式的处理引擎,在分布式的场景下,进行多个本地状态的运算,只产生一个全域一致的快照,如需要在不中断运算值的前提下产生全域一致的快照,就涉及到分散式状态容错。
文章目录一、什么是Druid二、Druid 的三个设计原则2.1 快速查询(Fast Query)2.2 水平扩展能力(Horizontal Scalability)2.3 实时分析(Realtime Analytics)三、Druid 的主要特点四、Druid 的应用场景4.1 主要应用4.2 适合场景4.3 不合适的场景一、什么是DruidDruid 是一个分布式的支持实时分析的数据存储系统(Data Store)。美国广告技术公司MetaMarkets 于2011 年创建了Druid 项目,并且于2012 年晚期开源了Druid 项目。Druid设计之初的想法就是为分析而生,它在处理数据的规模、数据处理的实时性方面,比传统的OLAP 系统有了显著的性能改进,而且拥抱主流的开源生态,包括Hadoop 等。多年以来,Druid 一直是非常活跃的开源项目。Druid 的官方网站是 http://druid.io。另外,阿里巴巴也曾创建过一个开源项目叫作Druid(简称阿里Druid),它是一个数据库连接池的项目。阿里Druid 和本问讨论的Druid 没有任何关系,它们解决完全不同的问题,因此大家不要把两者混淆了。另外,我们还需要了解的一点:Druid是一个实时多维OLAP分析的数据处理系统。但是OLAP分析又分为关系型联机分析处理(ROLAP)、多维联机分析处理(MOLAP)两种。简单来说MOLAP需要数据预计算好为一个多维数组,典型的方式是Cube。而ROLAP就是数据本身什么样,就是什么样,通过MPP提高分布式计算能力。Druid 走的就是 MOLAP。可以快速(实时)访问大量的、很少变化的数据的系统。并被设计为,在面对代码部署、机器故障和生产系统的其他可能性问题时,依旧能 100% 地正常提供服务。二、Druid 的三个设计原则快速查询(Fast Query):部分数据的聚合(Partial Aggregate)+内存化(In-emory)+索引(Index)。水平扩展能力(Horizontal Scalability):分布式数据(Distributed Data)+ 并行化查询(Parallelizable Query)。实时分析(Realtime Analytics):不可变的过去,只追加的未来(Immutable Past,Append-Only Future)。2.1 快速查询(Fast Query)对于数据分析场景,大部分情况下,我们只关心一定粒度聚合的数据,而非每一行原始数据的细节情况。因此,数据聚合粒度可以是1 分钟、5 分钟、1 小时或1 天等。部分数据聚合(Partial Aggregate)给Druid 争取了很大的性能优化空间。数据内存化也是提高查询速度的杀手锏。内存和硬盘的访问速度相差近百倍,但内存的大小是非常有限的,因此在内存使用方面要精细设计,比如Druid 里面使用了Bitmap 和各种压缩技术。另外,为了支持Drill-Down 某些维度,Druid 维护了一些倒排索引。这种方式可以加快AND和OR 等计算操作。2.2 水平扩展能力(Horizontal Scalability)Druid 查询性能在很大程度上依赖于内存的优化使用。数据可以分布在多个节点的内存中,因此当数据增长的时候,可以通过简单增加机器的方式进行扩容。为了保持平衡,Druid按照时间范围把聚合数据进行分区处理。对于高基数的维度,只按照时间切分有时候是不够的(Druid 的每个Segment 不超过2000 万行),故Druid 还支持对Segment 进一步分区。历史Segment 数据可以保存在深度存储系统中,存储系统可以是本地磁盘、HDFS 或远程的云服务。如果某些节点出现故障,则可借助Zookeeper 协调其他节点重新构造数据。Druid 的查询模块能够感知和处理集群的状态变化,查询总是在有效的集群架构中进行。集群上的查询可以进行灵活的水平扩展。2.3 实时分析(Realtime Analytics)Druid 提供了包含基于时间维度数据的存储服务,并且任何一行数据都是历史真实发生的事件,因此在设计之初就约定事件一但进入系统,就不能再改变。对于历史数据Druid 以Segment 数据文件的方式组织,并且将它们存储到深度存储系统中,例如文件系统或亚马逊的S3 等。当需要查询这些数据的时候,Druid 再从深度存储系统中将它们装载到内存供查询使用。三、Druid 的主要特点列式存储格式。Druid 使用面向列的存储,这意味着,它只需要加载特定查询所需要的列。这为只查看几列的查询提供了巨大的速度提升。此外,每列都针对其特定的数据类型进行优化,支持快速扫描和聚合。可扩展的分布式系统。Druid 通常部署在数十到数百台服务器的集群中,并且提供数百万条/秒的摄取率,保留数百万条记录,以及亚秒级到几秒钟的查询延迟。大规模的并行处理。Druid 可以在整个集群中进行大规模的并行查询。实时或批量摄取。Druid 可以实时摄取数据(实时获取的数据可立即用于查询)或批量处理数据。自愈,自平衡,易操作。集群扩展和缩小,只需添加或删除服务器,集群将在后台自动重新平衡,无需任何停机时间。原生云、容错的架构,不会丢失数据。一旦Druid 吸收了您的数据,副本就安全地存储在深度存储中(通常是云存储、HDFS 或共享文件系统)。即使每个Druid 服务器都失败,也可以从深层存储恢复数据。对于仅影响少数Druid 服务器的更有限的故障,复制确保在系统恢复时仍然可以执行查询。用于快速过滤的索引。Druid 使用CONCISE 或Roaring 压缩位图索引来创建索引,这些索引可以快速过滤和跨多个列搜索。近似算法。Druid 包括用于近似计数、近似排序以及计算近似直方图和分位数的算法。这些算法提供了有限的内存使用,并且通常比精确计算快得多。对于准确度比速度更重要的情况,Druid 还提供精确的计数-明确和准确的排名。插入数据时自动聚合。Druid 可选地支持摄取时的数据自动汇总。预先汇总了您的数据,并且可以导致巨大的成本节约和性能提升。四、Druid 的应用场景Druid 应用最多的是 类似于广告分析创业公司MetaMarkets 中的应用场景,如广告分析、互联网广告系统监控以及网络监控等 。当业务中出现以下情况时,Druid 是一个很好的技术方案选择:需要交互式聚合和快速探究大量数据时;具有大量数据时,如每天数亿事件的新增、每天数10T 数据的增加;对数据尤其是大数据进行实时分析时;需要一个高可用、高容错、高性能数据库时。目前从 Apache Druid 官网看到有 100 多家企业都在使用,也包括了很多国内的公司,例如 BAT、字节跳动、知乎、优酷、小米、Oppo、有赞、作业帮等等,大概占到 10% 左右。4.1 主要应用点击流分析(Web 和 移动端分析)网络遥测分析(网络性能监控)服务器指标存储供应链分析(制造指标)应用程序性能指标数字营销 / 广告分析商业智能 / OLAP4.2 适合场景插入率很高,但更新并不常见大多数查询都是聚合查询和 groupBy 分组查询,包括搜索和扫描查询查询响应时间为百毫秒~几秒钟之间时序数据可能有多个表,但是每个查询仅命中其中某一个表具有高基数数据列(例如 URL,用户 ID 等),并且需要对其进行快速计数和排序要从 Kafka,HDFS,本地文件或 Amazon S3、AliyunOSS 之类的对象存储中加载数据Apache Druid 支持将 groupBy 查询的中间结果溢出存储至磁盘,以应对大查询导致的内存资源不足。通过 druid.query.groupBy.maxOnDiskStorage 配置项可以控制对应的磁盘空间大小,默认值为 0,表示不开启该特性。4.3 不合适的场景使用主键对现有记录进行低延迟更新。 Druid 支持流式插入,但不支持流式更新(使用后台批处理作业完成更新)正在构建脱机报告系统,此时查询延迟不是很重要需要进行大的联接查询(将一个大事实表连接到另一个大事实表),并且可以花很长时间来完成这些查询
文章目录一、前言二、中台的发展历程三、数据中台实践3.1 第一步:数据资源的盘点与规划3.2 第二步:数据应用规划与设计3.3 第三步:数据资产建设3.4 第四步:数据应用的详细设计与实现3.5 第五步:数据化组织规划四、总结一、前言数据中台可以说是当下非常火热的话题,在BATJ等互联网大厂大肆推广中台建设成果的当下,各个行业的企业似乎都想做数字化转型,建设业务中台,但是中台到底是啥,需要我们提前了解和学习。本文是我学习张旭老师《数据中台架构:企业数据化最佳实践》一书的总结,阅读之后初步地了解所谓的中台战略,但又还是停留在感性层次,有点浅尝辄止之感。当前的数据中台定义是宽泛的,这与数据中台目前所处的实际业务阶段相符合。书中作者张旭老师认为数据中台至少首先是一个分布式的数据仓库,同时包含相对应实施的方法论和方案,介于分布式数据仓库和企业全面数据化中间的任意一个点都可以被定义为数据中台。可以说,数据中台是实现企业全面数据化的一个解决方案,是一套支撑企业全面数据化的架构,会成为企业开展全面数据化的基础设施。但数据中台这个东西,现在业界并没有一个完整的标准定义,每个人的经验和视角也不同,因此可能一百个学习者心中会有一百个中台,这里我主要结合我关于数据中台的学习做个总结:(1)中台是什么?企业级能力复用平台!(2)如何构建中台?一句话概括:“以用户为中心,从战略入手,愿景为指引,用科学有效的方法,步步为营沉淀企业级能力,辅以必要的组织与系统架构调整,方得中台。”(3)中台的价值是啥?中台为前台而生,专注于为前台赋能,沉淀企业的能力与复用,提升企业的客户响应力。(4)如何成为数据中台的参与者?数据中台围绕数据技术开展。除了编程技术、应用开发技术与传统的IT应用技术具有部分的重合,数据中台还有自己的技术体系,比如大数据开发技术、数据仓库建模技术、数据分析体系、数据应用技术体系等。二、中台的发展历程了解一个东西,需要首先了解它的发展史,又或者说看看它的过去,这里我们就先看看中台的发展历程:2008~2015:孕育期2008年阿里巴巴开始战略调整,重复建设与烟囱架构问题出现阿里共享事业部诞生,前台系统中的公共部分开始平台化改造2015:中台战略诞生马云带领阿里高官走访芬兰游戏公司Supercell受到触动阿里巴巴正式启动中台战略“大中台、小前台”2017:横空出世互联网大厂集体发声,各自分享中台建设经验2018:全面爆发互联网大厂集体宣布组织架构调整,正式将中台推上舞台2019:迷雾仍存中台的热度越发高涨,跟进企业越来越多,但问题不降反增从2015年阿里提出了“大中台,小前台”的中台战略,提出之初阿里有近 4 亿用户,为超过 1000万各类企业提供服务,业务种类繁多,业务之间相互网状依赖。同时,阿里部门也越来越多,分工越来越细,沟通过多,相互依赖,创新成本非常高,对业务响应也越来越慢。阿里需要找到能够对外界变化快速反应,整合阿里各种基础能力,高效支撑业务创新的机制,于是“中台”的概念就出现了。如今,不管是身处浪潮一线的互联网大厂,还是传统行业的转型企业,似乎在2020年都有建设一个中台的需求(至少都在采取行动或开始学习),不管真的想进行能力沉淀复用 还是 追概念来个弯道超车,中台正在被越来越多的人熟知。三、数据中台实践以下是我阅读《数据中台架构:企业数据化最佳实践》一书的学习笔记,所有内容出自张旭老师的这本书。3.1 第一步:数据资源的盘点与规划数据化的基础是信息化或者信息化所产生的数据。这些数据本就有数据化的含义,同时这些数据又会进入数据化框架体系,继续通过计算产出更多的数据和更大的价值。所以,对企业数据资源的盘点是数据化建设的前提和基础。一份完整、准确的数据资源是后续数据化建设的有力保障。数据资源的盘点与规划需要达到以下目的:(1)对现有数据资源盘点和统计。(2)对企业可以拥有或者应该拥有的数据资源进行规划。(3)构建盘点体系并使用必要工具,保证盘点的成果能够始终与真实情况相符。3.2 第二步:数据应用规划与设计企业要基于现有的技术条件和方案,进行相对完整的数据应用规划。这个步骤可以回答如下问题。企业中有哪些数据需求我们要从业务线、业务层级到最细粒度的岗位,梳理数据需求。企业应该构建哪些数据应用我们要围绕数据需求进行数据应用的整体规划和设计。应该按照什么顺序实现这些数据应用我们要对数据应用建立评估模型,评估的维度包括数据应用是否可以实现、数据应用的业务价值、数据应用的实现成本这三个主要方面。通过评定结果,我们可以确定数据应用的实现路径。3.3 第三步:数据资产建设数据资产的建设要依托数据中台的核心产品完成。数据资产是企业数据化建设的关键基础。所有的数据化建设最后都以数据资产为基础,并且围绕这个基础展开。数据资产将是企业在全面数据化建设前期中投入最多、见效最慢的基础层模块。关于数据中台的种种探讨和争议以及妥协的很大一部分原因是这个基础建设庞大、复杂和投入高。数据资产建设的内容包括以下几个方面:技术建设(1)产品选型。产品选型包括如何选择数据中台产品、数据中台产品应该具备的功能以及技术参数指标。(2)技术架构设计。技术架构设计包括数据中台产品如何部署、如何替换传统的数据仓库或者与之并行、数据中台如何抽取当前的应用数据。标准和数据仓库模型构建(1)建模及开发规范。建模及开发规范包括数据仓库模型设计规范的制定,数据开发规范的制定,如何避免当前较为常见的数据开发混乱、难以运维的情况。(2)数据建模。数据建模包括进行数据仓库模型构建,并提交评审。数据抽取、数据开发、任务监控与运维(1)数据抽取。数据抽取包括从数据资源层抽取数据进入ODS层。(2)数据开发。数据开发包括进行数据任务开发,进行数据清洗、数据计算。(3)任务监控与运维。任务监控与运维包括监控所有数据任务,对异常和错误任务进行必要的人工干预和处理。数据质量校验数据质量校验包括对当前发现的数据质量问题进行校验和处理,推动数据治理工作开展和持续优化。数据应用支撑数据应用支撑包括为当前的数据应用开发提供支撑开发平台。3.4 第四步:数据应用的详细设计与实现不管是使用瀑布模型还是敏捷模型,数据应用的设计大体上都可以遵循传统信息化应用设计的过程和理念。数据应用中的数据开发一般在数据库或者数据仓库中完成。数据应用的内容展示可以采用BI分析工具展现,例如可视化大屏或者定制化开发应用。数据应用还可以通过API接口服务提供数据成果,让其他外部应用按需调用。数据应用的开发与传统信息化应用的开发有以下不同之处。数据应用关注数据源的内容和质量我们在数据应用实施前应该充分了解企业当前的数据源情况,包括数据种类、每种数据的具体属性、数据内容的质量等问题。大部分落地失败的数据应用,都是由数据源的各种问题引起的,比如数据缺失或者数据质量问题。复杂的数据开发需要不断调优和迭代随着机器学习、深度学习等算法的引入,数据模型的构建手段越来越丰富。但是在通常情况下,最终业务价值的产生是一个复杂的过程,不仅需要数据的支撑,还需要管理的配合。数据应用的结果数据的验证工作量占比高论证数据结果的正确与否或者评估数据应用的效果,是一项费时、费力的工作。即使相对简单的指标计算,最后也经常会占用全部过程中1/3以上的时间进行正确性验证。甚至很多算法类项目,需要提前构建成果评估模型,并首先获得甲方企业的认可,然后才能开始进行数据开发。数据应用的运维难度大因为数据中的各种异常情况往往是不可知或者意想不到的,所以数据运维需要有强大的人工保障,以保持任务的运转。数据应用的成果需要运营数据应用的开发完成只是数据发挥价值的第一步,如何让业务部门理解模型、用好数据才是后续的关键。尤其是在刚刚引入新的数据,且尚未显现业务价值的时候,企业更需要对数据进行深入运营。3.5 第五步:数据化组织规划企业数据化应该是在未来一个时期内具有企业战略高度的事情,数据化需要一个具有同等战略高度的组织负责推进。无论是从传统的IT部门转型还是由战略部门或者类似部门介入都是很好的选择。组织是保障数据中台顺利落地的一个核心,也是推动企业数据化进程的人员抓手。四、总结张旭老师在书中一个观点我是非常赞同的:“数据中台是实现企业全面数据化的一个解决方案,是一套支撑企业全面数据化的架构,会成为企业开展全面数据化的基础设施。”如果用技术语言总结就是:“前台聚合,中台解耦,数据融合,业务创新”。随着大数据和人工智能的进一步普及,几乎所有的传统企业都在拥抱并推动自身数字化转型。作为一本数据中台实践,内容基本上覆盖了企业数字化实战的方方面面,对方法论、实施路径、平台、数据应用等方面都有阐述,有着一定的借鉴价值。
文章目录一、DataWorks 简介1.1 DataWorks 的功能概述1.2 DataWorks 产品特点1.3 DataWorks 产品优势1.4 应用场景 (助力企业搭建大数据信息平台)二、DataWorks 基本概念2.1 组织与项目空间2.2 任务(Task)2.3 工作流、节点、依赖关系2.4 任务(Task)类别2.5 实例(Instance)2.6 资源与函数三、DataWorks 功能架构3.1 功能模块3.2 组织管理3.3 项目管理3.4 数据开发3.5 数据管理3.6 运维中心四、DataWorks 角色隔离4.1 DataWorks 中的角色五、DataWorks 开发流程5.1 新建项目空间5.2 添加组织成员+项目成员5.3 数据开发5.4 数据开发流程5.5 数据输入5.6 数据加工5.7 数据输出5.8 代码发布5.9 生产调度5.10 生产运维六、DataWorks 数据开发6.1 数据开发总览6.2 任务开发6.3 任务类型6.4 脚本开发6.5 函数管理6.6 发布管理6.7 导入本地文件七、DataWorks 调度配置7.1 调度周期配置7.2 调度参数配置7.3 DataWorks 中的参数功能7.4 调度依赖关系7.5 跨周期依赖八、数据管理8.1 数据管理8.2 全局概览8.3 数据表的管理操作8.4 数据权限九、DataWorks 运维管理9.1 运维管理9.2 运维有关的权限9.3 运维概览9.4 手动任务 & 周期任务9.5 监控报警十、DataWorks 项目管理10.1 项目管理综述10.2 项目配置10.3 项目成员管理10.4 调度资源管理一、DataWorks 简介DataWorks(数据工场,原大数据开发套件)是阿里云重要的PaaS平台产品,提供数据集成、数据开发、数据地图、数据质量和数据服务等全方位的产品服务,一站式开发管理的界面,帮助企业专注于数据价值的挖掘和探索。DataWorks支持多种计算和存储引擎服务,包括离线计算MaxCompute、开源大数据引擎E-MapReduce、实时计算(基于Flink)、机器学习PAI、图计算服务Graph Compute和交互式分析服务等,并且支持用户自定义接入计算和存储服务。DataWorks提供全链路智能大数据及AI开发和治理服务。1.1 DataWorks 的功能概述全面托管的调度DataWorks提供强大的调度功能,详情请参见调度配置。支持根据时间、依赖关系,进行任务触发的机制。详情请参见时间属性和依赖关系。支持每日千万级别的任务,根据DAG关系准确、准时地运行。支持分钟、小时、天、周和月多种调度周期配置。完全托管的服务,无需关心调度的服务器资源问题。提供隔离功能,确保不同租户之间的任务不会相互影响。DataWorks支持离线同步、Shell、ODPS SQL、ODPS MR等多种节点类型,通过节点之间的相互依赖,对复杂的数据进行分析处理。数据转化:依托MaxCompute强大的能力,保证了大数据的分析处理性能。数据同步:依托DataWorks中数据集成的强力支撑,支持超过20种数据源,为您提供稳定高效的数据传输功能。可视化开发DataWorks提供可视化的代码开发、工作流设计器页面,无需搭配任何开发工具,简单拖拽和开发,即可完成复杂的数据分析任务。只要有浏览器有网络,您即可随时随地进行开发工作。监控告警运维中心提供可视化的任务监控管理工具,支持以DAG图的形式展示任务运行时的全局情况,详情请参见运维中心。1.2 DataWorks 产品特点1.3 DataWorks 产品优势1.4 应用场景 (助力企业搭建大数据信息平台)二、DataWorks 基本概念2.1 组织与项目空间2.2 任务(Task)2.3 工作流、节点、依赖关系2.4 任务(Task)类别2.5 实例(Instance)说明:在阿里云大数据开发平台中,节点任务在执行时会被实例化,并以MaxCompute 实例的形式存在。实例会经历未运行、等待时间/等待资源、运行中,成功/失败几个状态。2.6 资源与函数说明:资源与函数都是 MaxCompute 的概念。三、DataWorks 功能架构3.1 功能模块3.2 组织管理3.3 项目管理3.4 数据开发3.5 数据管理3.6 运维中心四、DataWorks 角色隔离4.1 DataWorks 中的角色五、DataWorks 开发流程5.1 新建项目空间5.2 添加组织成员+项目成员5.3 数据开发5.4 数据开发流程5.5 数据输入5.6 数据加工5.7 数据输出5.8 代码发布5.9 生产调度5.10 生产运维
文章目录一、前言二、现代数据架构三、数据湖架构参考数据湖架构-流程数据湖架构-集成企业数据湖架构数据湖最核心的能力四、数据湖的作用1、数据集成能力(数据接入)2、数据存储3、数据搜索4、数据治理5、数据质量6、安全管控7、自助数据发现五、数据湖与数据仓库的区别六、数据湖成熟度的划分七、数据湖的优势一、前言数据湖的概念最早是2011年提出来的,最初数据湖是数据仓库的补充,是为了解决数据仓库漫长的开发周期,高昂的开发、维护成本,细节数据丢失等问题出现的。数据湖大多是相对于传统基于RDBMS的数据仓库,而从2011年前后,也就是数据湖概念出现的时候,很多数据仓库逐渐迁移到以Hadoop为基础的技术栈上,而且除了结构化数据,半结构化、非结构数据也逐渐的存储到数据仓库中,并提供此类服务。这样的数据仓库,已经具有了数据湖的部分功能。关于数据湖的定义,维基百科上是这样讲的:数据湖(Data Lake)是一个以原始格式存储数据的存储库或系统。它按原样存储数据,而无需事先对数据进行结构化处理。一个数据湖可以存储结构化数据(如关系型数据库中的表),半结构化数据(如CSV、日志、XML、JSON),非结构化数据(如电子邮件、文档、PDF)和二进制数据(如图形、音频、视频)。但是随着大数据技术的融合发展,数据湖不断演变,汇集了各种技术,包括数据仓库、实时和高速数据流技术、数据挖掘、深度学习、分布式存储和其他技术。逐渐发展成为一个可以存储所有结构化和非结构化任意规模数据,并可以运行不同类型的大数据工具,对数据进行大数据处理、实时分析和机器学习等操作的统一数据管理平台。二、现代数据架构三、数据湖架构参考结合目前开源的数据湖平台和组件,总结数据湖的基本参考架构如下:数据湖架构-流程数据湖架构-集成企业数据湖架构数据湖最核心的能力四、数据湖的作用1、数据集成能力(数据接入)1)接入不同数据源,包括数据库中的表(关系型或者非关系型)、各种格式的文件(csv、json、文档等)、数据流、ETL工具(Kafka、Logstash、DataX等)转换后的数据、应用API获取的数据(如日志等);2)自动生成元数据信息,确保进入数据湖的数据都有元数据;3)提供统一的接入方式,如统一的API或者接口;2、数据存储数据湖存储的数据量巨大且来源多样,数据湖应该支持异构和多样的存储,如HDFS、HBase、Hive等;3、数据搜索数据湖中拥有海量的数据,对于用户来说,明确知道数据湖中数据的位置,快速的查找到数据,是一个非常重要的功能。4、数据治理1)自动提取元数据信息,并统一存储;2)对元数据进标签和分类,建立统一的数据目录;3)建立数据血缘,梳理上下游的脉络关系,有助于数据问题定位分析、数据变更影响范围评估、数据价值评估;4)跟踪数据时间旅行,提供不同版本的数据,便于进行数据回溯和分析;5、数据质量1)对于接入的数据质量管控,提供数据字段校验、数据完整性分析等功能;2)监控数据处理任务,避免未执行完成任务生成不完备数据;6、安全管控1)对数据的使用权限进行监管;2)对敏感数据进行脱敏和加密;7、自助数据发现提供一系列数据分析工具,便于用户对数据湖的数据进行自助数据发现,包括:联合分析;交互式大数据SQL分析;机器学习BI报表…五、数据湖与数据仓库的区别数据仓库是一种具有正式架构的成熟的、安全的技术。它们存储经过全面处理的结构化数据,以便完成数据治理流程。数据仓库将数据组合为一种聚合、摘要形式,以在企业范围内使用,并在执行数据写入操作时写入元数据和模式定义。数据仓库通常拥有固定的配置;它们是高度结构化的,因此不太灵活和敏捷。数据仓库成本与在存储前处理所有数据相关,而且大容量存储的费用相对较高。相比较而言,数据湖是较新的技术,拥有不断演变的架构。它是一个集中式存储库,允许以任意规模存储所有结构化和非结构化数据。根据定义,数据湖不会接受数据治理,但专家们都认为良好的数据管理对预防数据湖转变为数据沼泽不可或缺。数据湖在数据读取期间创建模式。与数据仓库相比,数据湖缺乏结构性,而且更灵活;它们还提供了更高的敏捷性。在检索数据之前无需执行任何处理,而且数据湖特意使用了便宜的存储。根据要求,典型的组织将需要数据仓库和数据湖,因为它们可满足不同的需求和使用案例。数据仓库本质上其实就是一个优化的数据库,用于分析来自事务系统和业务线应用程序的关系数据。事先定义数据结构和 Schema 以优化快速 SQL 查询,其中结果通常用于操作报告和分析。数据经过了清理、丰富和转换,因此可以充当用户可信任的“单一信息源”。数据湖有所不同,因为它存储来自业务线应用程序的关系数据,以及来自移动应用程序、IoT 设备和社交媒体的非关系数据。捕获数据时,未定义数据结构或 Schema。这意味着您可以存储所有数据,而不需要精心设计也无需知道将来您可能需要哪些问题的答案。可以对数据使用不同类型的分析(如 SQL 查询、大数据分析、全文搜索、实时分析和机器学习)来获得见解。随着使用数据仓库的组织看到数据湖的优势,他们正在改进其仓库以包括数据湖,并启用各种查询功能、数据科学使用案例和用于发现新信息模型的高级功能。总结:1)数仓中保存的都是结构化处理后的数据,而数据湖中可以保存原始数据也可以保存结构化处理后的数据,保证用户能获取到各个阶段的数据。因为数据的价值跟不同的业务和用户强相关,有可能对于A用户没有意义的数据,但是对于B用户来说意义巨大,所以都需要保存在数据湖中。2)数据湖能够支持各种用户使用,包括数据科学家这类专业的数据人员。3) 数据仓库的数据进入这个池之前是预先分类的,这可以指导其后面如何进行数据的分析。但在大数据时代,这些都是素材而已,你根本不知道以后如何用它,而数据湖给后面的数据分析带来了更大的弹性。因此,这个放大数据的仓库,专家建议叫数据湖,以区别于数据仓库。六、数据湖成熟度的划分第一级是在没有用Hadoop之前,这个时候各个大型应用都有自己的数据库,也有自己的数据仓库来做数据分析。第二级是企业引入了Hadoop。企业的应用数据和Hadoop有交互。第三级是数据湖的成长期。新的系统直接支持Hadoop,Hadoop成为缺省配置,而数据仓库只在某些特定场景下使用,外部的数据也引入数据湖中。第四阶段就是数据湖和应用云阶段。Hadoop大量采用,并且加强其可靠性、安全性。看来,近两年数据湖是构造企业差异化竞争的很好思路,而Hadoop是目前流行的实现手段。但是,之前为什么用Hadoop呢?因为在当前,Hadoop是实现数据湖的最常用技术手段,但以后也许有更好的方式。也就是说,数据湖是一个概念,而Hadoop是实现这个概念的技术手段。七、数据湖的优势轻松地收集和摄入数据:企业中的所有数据源都可以送入数据湖中。因此,数据湖成为了存储在企业内部服务器或云服务器中的结构化和非结构化数据的无缝访问点。通过数据分析工具可以轻松地获得整个无孤岛的数据集合。此外,数据湖可以用多种文件格式存储多种格式的数据,比如文本、音频、视频和图像。这种灵活性简化了旧有数据存储的集成。支持实时数据源:数据湖支持对实时和高速数据流执行 ETL 功能,这有助于将来自 IoT 设备的传感器数据与其他数据源一起融合到数据湖中。更快地准备数据:分析师和数据科学家不需要花时间直接访问多个来源,可以更轻松地搜索、查找和访问数据,这加速了数据准备和重用流程。数据湖还会跟踪和确认数据血统,这有助于确保数据值得信任,还会快速生成可用于数据驱动的决策的 BI。更好的可扩展性和敏捷性:数据湖可以利用分布式文件系统来存储数据,因此具有很高的扩展能力。开源技术的使用还降低了存储成本。数据湖的结构没那么严格,因此天生具有更高的灵活性,从而提高了敏捷性。数据科学家可以在数据湖内创建沙箱来开发和测试新的分析模型。具有人工智能的高级分析:访问原始数据,创建沙箱的能力,以及重新配置的灵活性,这些使得数据湖成为了一个快速开发和使用高级分析模型的强大平台。数据湖非常适合使用机器学习和深度学习来执行各种任务,比如数据挖掘和数据分析,以及提取非结构化数据。
文章目录一、NiFi 安装二、NiFi 的简单使用一、NiFi 安装环境要求:a、需要Java 8或更高版本b、支持的操作系统:Linux、Unix、Windows、Mac OS X1、下载安装包命令:wget -b http://mirror.bit.edu.cn/apache/nifi/1.8.0/nifi-1.8.0-bin.tar.gz2、解压安装包、即可使用命令:tar -zxvf nifi-1.8.0-bin.tar.gz目录如下:3、配置文件( nifi-1.8.0/conf/nifi.properties )、可以使用默认配置,根据自己情况进行修改4、操作NIFI,启动的时候,比较慢,注意机器内存是否足够后台启动命令:./bin/nifi.sh start前端启动命令:./bin/nifi.sh run关闭命令:./bin/nifi.sh stop首次启动NiFi时,会创建以下文件和目录:content_repositorydatabase_repositoryflowfile_repositoryprovenance_repositorywork 目录logs 目录在conf目录中,将创建flow.xml.gz文件5、启动后,使用浏览器进行访问,地址:http://ip:8080/nifi二、NiFi 的简单使用不理解NIFI是做什么的,看一个简单的例子(同步文件夹)吧,帮助理解。1、从工具栏中拖入一个Processor,在弹出面板中搜索GetFIle,然后确认。2、配置GetFile,设置结束关系、输入目录、保留源文件,其他设置可以不动,输入目录中有文件:file.txt(内容为abc)。3、从工具栏中拖入一个Processor,在弹出面板中搜索PutFIle,然后确认,如第一步。4、配置PutFile,设置结束关系、输出目录,其他设置可以不动,输出目录为空文件夹。5、将GetFIle与PutFIle关联起来,从GetFIle中心点击,拖拉到PutFIle上。6、右键启动GetFIle与PutFIle,可以看到结果,输入目录中的文件同步到,输出目录中了。注意:操作过程中,注意错误排查1、Processor上的警告2、Processor上的错误
文章目录一、简介二、NiFi 核心概念三、设计模型四、NiFi 架构五、NiFi 的性能期望与特点六、NiFi 功能的高级概述一、简介Apache NiFi 是一个易于使用、功能强大而且可靠的数据拉取、数据处理和分发系统,用于自动化管理系统间的数据流。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。NiFi原来是NSA(National Security Agency [美国国家安全局])的一个项目,目前已经代码开源,是Apache基金会的顶级项目之一NiFi基于Web方式工作,后台在服务器上进行调度。用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。二、NiFi 核心概念Nifi 的设计理念接近于基于流的编程 Flow Based Programming。FlowFile:表示通过系统移动的每个对象,包含数据流的基本属性FlowFile Processor(处理器):负责实际对数据流执行工作Connection(连接线):负责不同处理器之间的连接,是数据的有界缓冲区Flow Controller(流量控制器):管理进程使用的线程及其分配Process Group(过程组):进程组是一组特定的进程及其连接,允许组合其他组件创建新组件三、设计模型什么是SEDA(Staged Event-Driven Architecture)?核心思想是把一个请求处理过程分成几个Stage,不同资源消耗的Stage使用不同数量的线程来处理,Stage间使用事件驱动的异步通信模式。Nifi的设计模型类似SEDA, 其提供了以下优势:适用于视觉创建和管理处理器的有向图本质上是异步的,即使在处理和流量波动时也允许非常高的吞吐量和自然缓冲提供高度并发的模型,而开发人员不必担心并发性的典型复杂性促进发展粘性和松散耦合的部件,然后可以在其他情况下重复使用,并促进可测试的部件资源受限的连接使关键功能(如背压和压力释放)非常自然和直观错误处理变得与基本逻辑一样自然,而不是粗粒度的一网打尽数据进出系统的点以及流程如何被很好的理解和易于跟踪四、NiFi 架构Nifi是在主机操作系统上的JVM内执行。JVM上的Nifi主要组件如下:网络服务器Web服务器的目的是托管NiFi的基于HTTP的命令和控制API。流控制器流控制器是操作的大脑。它提供用于扩展程序运行的线程,并管理扩展程序接收资源以执行的时间表。扩展有各种类型的NiFi扩展在其他文档中描述。这里的关键是扩展在JVM中运行和执行。FlowFile存储库FlowFile存储库是NiFi跟踪目前在流程中活动的给定FlowFile的知识状态。存储库的实现是可插拔的。默认方法是位于指定磁盘分区上的持久写入前端日志。内容存储库Content Repository是给定FlowFile的实际内容字节。存储库的实现是可插拔的。默认方法是一个相当简单的机制,它将数据块存储在文件系统中。可以指定多个文件系统存储位置,以便获得不同的物理分区,以减少任何单个卷上的争用。源头存储库Provenance Repository是存储所有来源的事件数据的地方。存储库构造是可插入的,默认实现是使用一个或多个物理磁盘卷。在每个位置内,事件数据被索引和可搜索。NiFi还能够在集群内运行。从NiFi 1.0版本开始,采用零主分类范例。NiFi集群中的每个节点对数据执行相同的任务,但是每个节点都在不同的数据集上进行操作。Apache ZooKeeper选择单个节点作为群集协调器,故障转移由ZooKeeper自动处理。所有群集节点向群集协调器报告心跳和状态信息。集群协调器负责断开连接节点。此外,每个集群都有一个主节点,也由ZooKeeper选择。作为DataFlow管理者,可以通过任何节点的用户界面(UI)与NiFi集群进行交互。操作者的任何更改都会被复制到群集中的所有节点,从而允许多个入口点。五、NiFi 的性能期望与特点NiFi旨在充分利用其正在运行的底层主机系统的功能。对于CPU和磁盘,资源最大化特别强。有关其他详细信息。对于IO可以看到的吞吐量或延迟可能会有很大的不同,这取决于系统的配置方式。考虑到大多数主要的NiFi子系统都有可插拔的方法,性能取决于实现。但是,对于具体和广泛适用的内容,请考虑开箱即用的默认实现。保守的,假设典型服务器中适度的磁盘或RAID卷上的读/写速率大约为每秒50 MB。对于大量数据流,NiFi应该能够有效地达到每秒100 MB或更多的吞吐量。由于物理分区和内容存储库都预期添加到NiFi的线性增长, 可能会导致FlowFile存储库和来源存储库的某个时间点出现瓶颈。Nifi 计划提供一个基准测试和性能测试模板,以包含在构建中,从而允许用户轻松测试他们的系统,并确定瓶颈在哪里,以及哪些可能成为一个因素。此模板还可使系统管理员轻松进行更改并验证其影响。对于CPU流控制器用作引擎,指定特定处理器何时被执行线程。写处理器一经执行任务就会立即返回线程。流控制器可以给出一个配置值,指示其维护的各种线程池的可用线程。使用的理想线程数取决于主机系统资源的核心数量,无论该系统是否运行其他服务,以及流程中的处理特性。对于典型的IO大流量,可以使数十条线程可用。对于RAMNiFi存在于JVM中,因此受限于由JVM提供的内存空间。JVM垃圾收集成为限制实际堆大小的一个非常重要的因素,可以通过调整GC优化应用程序运行时间。当定期阅读相同的内容时,NiFi工作可能是I / O密集型的。配置足够大的磁盘以优化性能。六、NiFi 功能的高级概述流量管理保证交货NiFi的核心理念是,即使在非常高的规模,必须保证交付。数据缓冲背压和压力释放NiFi支持对所有排队的数据进行缓冲,以及当队列达到指定限制时提供背压的能力,或者在数据达到指定年龄时使其老化(其值已经消失)的能力。优先排队NiFi允许设置一个或多个优先级排序方案来了解如何从队列中检索数据。默认值是最早的,但有时候数据应该被拉到最新,最大的第一个或其他一些自定义方案。流特定Qos(延迟 vs 吞吐量, 丢失容限等)数据流的一些点数据非常关键,并且是不容忍的。还有一段时间,它必须在几秒钟内被处理和交付成为任何价值。NiFi使得细粒度流特定配置这些问题。使用方便视觉指挥与控制数据流可能变得相当复杂。能够可视化这些流程并在视觉上表达它们可以大大减少复杂性并确定需要简化的领域。NiFi不仅可以直观地建立数据流,而且可以实时地实现。而不是设计和部署它更像是成型工具。如果对更改的数据流进行更改立即生效。更改是细粒度的,并且与受影响的组件隔离。不需要为了进行一些具体的修改而停止整个流程。流模板数据流往往是高度模式化的,而通常有许多不同的方式来解决问题。模板允许主题专家构建和发布他们的流程设计,并为其他人创造和合作。资料来源Nifi自动记录,索引并提供可用的来源数据,因为对象即使在扇入,扇出,转换等过程中也可以流经系统。该信息在支持合规性,故障排除,优化和其他场景方面变得非常重要。恢复/记录细粒历史的滚动缓冲区NiFi的内容存储库旨在作为历史的滚动缓冲区。只有当数据从内容存储库中老化或者需要空间时才会被删除。这与数据来源功能相结合,使得在对象的生命周期中甚至跨越世代的特定点上实现点击内容,内容下载和重放非常有用的基础。安全系统到系统数据流中每一点的NiFi都可以通过使用诸如双向SSL等加密协议提供安全交换。此外,NiFi使得流可以加密和解密内容,并使用发件人/收件人方程的任一侧上的共享密钥或其他机制。用户到系统NiFi支持双向SSL身份验证,并提供可插拔授权,从而可以正确控制用户的访问和特定级别(只读,数据流管理器,管理员)。如果用户在流程中输入密码等敏感属性,则立即加密服务器端,即使在加密形式下也不会再次暴露在客户端。多租户授权给定数据流的权限级别适用于每个组件,允许管理员用户具有细粒度的访问控制。这意味着每个NiFi集群都能够处理一个或多个组织的要求。与独立拓扑相比,多租户授权可实现数据流管理的自助服务模式,从而允许每个团队或组织对流程进行管理,同时充分了解流程的其他部分,无法访问。可扩展架构延期NiFi的核心是扩展的核心,因此它是数据流处理可以以可预测和可重复的方式执行和交互的平台。扩展点包括:处理器,控制器服务,报告任务,优先级和客户用户界面。分类器隔离对于任何基于组件的系统,可能会迅速发生依赖问题。NiFi通过提供自定义类加载器模型来解决这个问题,确保每个扩展捆绑包都暴露在非常有限的依赖关系中。因此,可以构建扩展,而不用担心它们是否可能与另一个扩展冲突。这些扩展束的概念称为NiFi Archives, 在开发人员指南中有更详细的讨论。站点到站点通信协议NiFi实例之间的首选通信协议是NiFi站点到站点(S2S)协议。S2S可以方便,高效,安全地将数据从一个NiFi实例传输到另一个。NiFi客户端库可以轻松构建并捆绑到其他应用程序或设备中,以通过S2S与NiFi通信。S2S中都支持基于套接字的协议和HTTP(S)协议作为底层传输协议,从而可以将代理服务器嵌入到S2S通信中。灵活的缩放模型横向扩展(聚类)NiFi旨在通过如上所述将多个节点聚类在一起使用来展开。如果单个节点被配置并配置为每秒处理数百MB,则可以配置适度的集群来处理每秒的GB数。这将带来NiFi与获取数据的系统之间的负载平衡和故障转移的挑战。可以使用基于异步的消息队列, 如Kafka来解决问题。使用NiFi的站点到站点功能也非常有效,因为它是允许NiFi和客户端(包括另一个NiFi集群)相互通话,共享关于加载的信息以及在特定授权端口上交换数据的协议。放大与缩小NiFi也被设计成以非常灵活的方式进行放大和缩小。在从NiFi框架的角度增加吞吐量方面,可以在配置时增加“计划”选项卡下的处理器上的并发任务数量。这允许更多的进程同时执行,提供更大的吞吐量。可以将NiFi完美地缩放到适合于在硬件资源有限的边缘设备上运行,因为需要较小的占用空间。
通常在大数据开发的过程中,我们会经常遇见小文件过多的情况,对查询和运算的性能都会有一定的影响,那么这篇文章将会帮助大家解决 hive 中小文件过多的问题 😎文章目录一、哪里会产生小文件 ?二、影响三、解决方法方法一:通过调整参数进行合并方法二:使用 distribute by rand() 将数据随机分配给 reduce方法三:使用 sequencefile 作为表存储格式,不要用 textfile,在一定程度上可以减少小文件方法四:使用hadoop的archive归档补充:hadoop自带的三种小文件处理方案一、哪里会产生小文件 ?源数据本身有很多小文件动态分区会产生大量小文件reduce个数越多, 小文件越多按分区插入数据的时候会产生大量的小文件, 文件个数 = maptask个数 * 分区数二、影响从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。HDFS存储太多小文件, 会导致namenode元数据特别大, 占用太多内存, 制约了集群的扩展。三、解决方法方法一:通过调整参数进行合并#每个Map最大输入大小(这个值决定了合并后文件的数量) set mapred.max.split.size=256000000; #一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并) set mapred.min.split.size.per.node=100000000; #一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) set mapred.min.split.size.per.rack=100000000; #执行Map前进行小文件合并 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; #===设置map输出和reduce输出进行合并的相关参数: #设置map端输出进行合并,默认为true set hive.merge.mapfiles = true #设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true #设置合并文件的大小 set hive.merge.size.per.task = 256*1000*1000 #当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。 set hive.merge.smallfiles.avgsize=16000000方法二:使用 distribute by rand() 将数据随机分配给 reduce针对按分区插入数据的时候产生大量的小文件的问题, 可以使用DISTRIBUTE BY rand() 将数据随机分配给Reduce,这样可以使得每个Reduce处理的数据大体一致。# 设置每个reducer处理的大小为5个G set hive.exec.reducers.bytes.per.reducer=5120000000; # 使用distribute by rand()将数据随机分配给reduce, 避免出现有的文件特别大, 有的文件特别小 insert overwrite table test partition(dt) select * from iteblog_tmp DISTRIBUTE BY rand();方法三:使用 sequencefile 作为表存储格式,不要用 textfile,在一定程度上可以减少小文件方法四:使用hadoop的archive归档#用来控制归档是否可用 set hive.archive.enabled=true; #通知Hive在创建归档时是否可以设置父目录 set hive.archive.har.parentdir.settable=true; #控制需要归档文件的大小 set har.partfile.size=1099511627776; #使用以下命令进行归档 ALTER TABLE srcpart ARCHIVE PARTITION(ds='2008-04-08', hr='12'); #对已归档的分区恢复为原文件 ALTER TABLE srcpart UNARCHIVE PARTITION(ds='2008-04-08', hr='12'); #::注意,归档的分区不能够INSERT OVERWRITE,必须先unarchive补充:hadoop自带的三种小文件处理方案Hadoop ArchiveHadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。Sequence filesequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。CombineFileInputFormat它是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。
在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。更新Offset的三个维度:Topic的作用域重置策略执行方案Topic的作用域--all-topics:为consumer group下所有topic的所有分区调整位移)--topic t1 --topic t2:为指定的若干个topic的所有分区调整位移--topic t1:0,1,2:为指定的topic分区调整位移重置策略--to-earliest:把位移调整到分区当前最小位移--to-latest:把位移调整到分区当前最新位移--to-current:把位移调整到分区当前位移--to-offset <offset>: 把位移调整到指定位移处--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S--from-file <file>:从CSV文件中读取调整策略执行方案什么参数都不加:只是打印出位移调整方案,不具体执行--execute:执行真正的位移调整--export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用注意事项1.consumer group状态必须是inactive的,即不能是处于正在工作中的状态2.不加执行方案,默认是只做打印操作常用示例1.更新到当前group最初的offset位置bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute2.更新到指定的offset位置bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute3.更新到当前offset位置(解决offset的异常)bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute4.offset位置按设置的值进行位移bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute5.offset设置到指定时刻开始bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --t
文章目录一、前言二、往hive表中导入数据三、通过查询insert ....select的形式往hive中导入数据四、使用create....as 语句往hive表里装载数据五、从hive表里导出数据到文件系统一、前言往hive数据仓库的表中导入数据,可以直接insert ,也可以选择load方式。当然也可以通过第三方工具如sqoop等将数据导入到hive当初。特别注意:hive虽然不会验证用户装载的数据和表的模式是否匹配,但是hive会验证文件的存储格式和hive定义的表结构的存储格式是否一致。比如将文本文件装载到sequencefile表中则报错。二、往hive表中导入数据load data 导入数据到hive中,这种情况适合提供了外部数据文件,然后将其导入hive仓库的表中。(这种方式其实使用的不是太多,大厂的数据源一般都是数据库中数据,直接定时任务抽取即可,除非外包数据以文件形式提供)1.将本地数据文件导入到hive非分区表中,如下文件可以是个目录,会导入目录中所有的文件 load data local inpath '/home/robot/' overwrite into table fdm_sor.personinfo 2.将本地数据文件导入到hive分区表中 load data local inpath '/home/robot/' overwrite into table fdm_sor.personinfo partition(country='china',city='nanjing') 注意: 1.inpath里只要填目录即可,不用具体到文件,会加载目录下所有问题,但该目录下不能再有子目录,否则报错。 2.overwrite 可以不加,加的话会将表中所有数据覆盖掉(分区表只覆盖当前分区数据),into talbe 将数据追加到表中。 3.into talbe 如果表里数据已经存在了,会再次到导入,底层文件存储会给同文件名加序列号然后存储。 3.将分布式文件系统上的数据导入的hive中,比如讲hdfs上数据导入到hive中 load data inpath '/user/robot/' overwrite into table fdm_sor.personinfo 注意:去掉local,则默认的路径是分布式文件系统上的路径,如hdfs上的路径。总结:overwrite覆盖的原理,是先删除数据,然后再写入数据。如果开了trash回收站功能,可以在回收站查看到回收的数据。三、通过查询insert …select的形式往hive中导入数据使用insert子句将查询结果插入表中,这是开发中往表里导入数据最常用的方式之一,主要用来项目开发中使用,多表关联计算等操作。1.通过查询将数据覆盖导入的分区表中(或者用into追加结果,往动态分区表中插入数据,请参考本系列其他博客。) insert overwrite table fdm_sor.personinfo partition(statis_date='${staits_date}' select a.id,a.name,b.address from person a left join address b on a.id = b.id 2.多次插入,从一张表中读数据,下面这种方式效率最高,只需要扫描一次表即可。注意中间没有分号; from T_DEDUCT_SIGN_D_external t insert into table t1 select 123 ,sign_no string,null insert into table t2 select 345 ,null ,bp_no string insert into table t3 select 678 ,sign_no string,bp_no string where t.statis_date = '20180101';注意:使用,insert…select 往表中导入数据时,查询的字段个数必须和目标的字段个数相同,不能多,也不能少,否则会报错。但是如果字段的类型不一致的话,则会使用null值填充,不会报错。而使用load data形式往hive表中装载数据时,则不会检查。如果字段多了则会丢弃,少了则会null值填充。同样如果字段类型不一致,也是使用null值填充。四、使用create…as 语句往hive表里装载数据hive (fdm_sor)> create table mytest_createas > as select id ,name from mytest_tmp2_p where country='china' and city='beijing'; 注意:使用create... as 创建的表,表的存储属性是默认的textfile,serde也是默认的lazyserde.同时表没有分区.如果对表的结构有要求, 比如我们公司sor要求使用rcfile存储,则不能使用create ..as创建表,并且加载数据。 2.如果多次操作需要取同一个表中数据,可以优化如下,将from放到最前面,这样只扫描一次表即可完成。 from tu_trade t insert overwrite table credit partition(statis_date='201805') select * where t.statis_date ='201805' insert overwrite table credit partition(statis_date='201804') select * where t.statis_date ='201804' ....... insert overwrite table credit partition(statis_date='201704') select * where t.statis_date ='201704'五、从hive表里导出数据到文件系统不管数据在hive中如何存储,hive会将所有内容以字符串的形式序列化到文件里。但是要注意的是hive将数据序列化成文件的时候,文件的编码格式和hive里的一致。比如,hive中存储格式为sequencefile,则序列化的数据文件编码也是二进制格式,如果hive中列分隔符是默认的,则序列化文件也是默认的^A(不可视)的分隔符。所以为了序列化后文件可读性,一般要将需要导出的数据在hive中的编码格式改成textfile,分隔符比如为逗号等等(可以通过使用临时表)。注意导出数据只要insert overwrite没有insert into 所以很容易造成数据覆盖丢失。1.使用insert …overwrite…directory方式导出数据到本地或者分布式文件系统上标准语法格式: INSERT OVERWRITE [LOCAL] DIRECTORY directory1 [ROW FORMAT row_format] [STORED AS file_format] (Note: Only available starting with Hive 0.11.0) SELECT ... FROM ... 多次多出语法格式: FROM from_statement INSERT OVERWRITE [LOCAL] DIRECTORY directory1 select_statement1 [INSERT OVERWRITE [LOCAL] DIRECTORY directory2 select_statement2] ... 案例演示: hive (fdm_sor)> insert overwrite local directory '/home/robot/mydata/111' > select * from mytest_tmp2_p where country='china'; 注意:1.不加local则将数据导出到分布式文件系统上,比如hdfs.加了local则默认为本地,如linux上。 2.overwrite会将目录下的内容覆盖掉,尤其是如果当前目录下有数据,会丢失。但是这里没有into的用法。 3.如果导出的目录,不存在,则会重新创建。 4.注意导出产生的文件个数取决于计算过程中reducers个数。2.如果对表里的数据全部需要的话,因为hive的数据存储在hdfs上,可以直接通过hadoop命令-cp从该表的存储位置上将数据文件下载下来。这是最快的方式。3.通过sqoop等工具导出数据,具体参考sqoop篇章。
前言数据类型转换这个在任何语言框架中都会涉及到,看起来非常简单,不过要把所有的数据类型都掌握还是需要一定的时间历练。SparkSQL数据类型数字类型ByteType:代表一个字节的整数。范围是-128到127ShortType:代表两个字节的整数。范围是-32768到32767IntegerType:代表4个字节的整数。范围是-2147483648到2147483647LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807FloatType:代表4字节的单精度浮点数DoubleType:代表8字节的双精度浮点数DecimalType:代表任意精度的10进制数据。通过内部的java.math.BigDecimal支持。BigDecimal由一个任意精度的整型非标度值和一个32位整数组成StringType:代表一个字符串值BinaryType:代表一个byte序列值BooleanType:代表boolean值Datetime类型TimestampType:代表包含字段年,月,日,时,分,秒的值DateType:代表包含字段年,月,日的值复杂类型ArrayType(elementType, containsNull):代表由elementType类型元素组成的序列值。containsNull用来指明ArrayType中的值是否有null值MapType(keyType, valueType, valueContainsNull):表示包括一组键 - 值对的值。通过keyType表示key数据的类型,通过valueType表示value数据的类型。valueContainsNull用来指明MapType中的值是否有null值StructType(fields):表示一个拥有StructFields (fields)序列结构的值StructField(name, dataType, nullable):代表StructType中的一个字段,字段的名字通过name指定,dataType指定field的数据类型,nullable表示字段的值是否有null值。Spark SQL数据类型和Scala数据类型对比Spark SQL数据类型转换案例一句话描述:调用Column类的cast方法如何获取Column类这个之前写过df("columnName") // On a specific `df` DataFrame. col("columnName") // A generic column not yet associated with a DataFrame. col("columnName.field") // Extracting a struct field col("`a.column.with.dots`") // Escape `.` in column names. $"columnName" // Scala short hand for a named column.测试数据准备1,tom,23 2,jack,24 3,lily,18 4,lucy,19spark入口代码val spark = SparkSession .builder() .appName("test") .master("local[*]") .getOrCreate()测试默认数据类型spark.read. textFile("./data/user") .map(_.split(",")) .map(x => (x(0), x(1), x(2))) .toDF("id", "name", "age") .dtypes .foreach(println)结果:(id,StringType) (name,StringType) (age,StringType)说明默认都是StringType类型把数值型的列转为IntegerTypeimport spark.implicits._ spark.read. textFile("./data/user") .map(_.split(",")) .map(x => (x(0), x(1), x(2))) .toDF("id", "name", "age") .select($"id".cast("int"), $"name", $"age".cast("int")) .dtypes .foreach(println)结果:(id,IntegerType) (name,StringType) (age,IntegerType)Column类cast方法的两种重载第一种def cast(to: String): ColumnCasts the column to a different data type, using the canonical string representation of the type. The supported types are:string, boolean, byte, short, int, long, float, double, decimal, date, timestamp.// Casts colA to integer. df.select(df("colA").cast("int")) Since 1.3.0第二种def cast(to: DataType): ColumnCasts the column to a different data type.// Casts colA to IntegerType. import org.apache.spark.sql.types.IntegerType df.select(df("colA").cast(IntegerType)) // equivalent to df.select(df("colA").cast("int"))
spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能。当然主要对类SQL的支持。在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选、合并,重新入库。首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数。而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中。不得不赞叹dataframe的强大。具体示例:对文件中某一列的空值进行统计过滤,并实现一行数据映射为多行scala> uf.filter(col("friends").isNotNull).select(col("user"),col("friends")).show(3) +----------+--------------------+ | user| friends| +----------+--------------------+ |3197468391|1346449342 387324...| |3537982273|1491560444 395798...| | 823183725|1484954627 195038...| +----------+--------------------+ only showing top 3 rows scala> uf.filter(col("friends").isNotNull).select(col("user"),col("friends")).withColumn("friends",explode(split(col("friends")," "))).show(5) +----------+----------+ | user| friends| +----------+----------+ |3197468391|1346449342| |3197468391|3873244116| |3197468391|4226080662| |3197468391|1222907620| |3197468391| 547730952| +----------+----------+ only showing top 5 rows最后附上dataframe的一些操作及用法:DataFrame 的函数Action 操作1、 collect() ,返回值是一个数组,返回dataframe集合所有的行2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行3、 count() 返回一个number类型的,返回dataframe集合的行数4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe(“age”, “height”).show()5、 first() 返回第一行 ,类型是row类型6、 head() 返回第一行 ,类型是row类型7、 head(n:Int)返回n行 ,类型是row 类型8、 show()返回dataframe集合的值 默认是20行,返回类型是unit9、 show(n:Int)返回n行,,返回值类型是unit10、 table(n:Int) 返回n行 ,类型是row 类型dataframe的基本操作1、 cache()同步数据的内存2、 columns 返回一个string类型的数组,返回值是所有列的名字3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型4、 explan()打印执行计划 物理的5、 explain(n:Boolean) 输入值为 false 或者true ,返回值是unit 默认是false ,如果输入true 将会打印 逻辑的和物理的6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型8、 printSchema() 打印出字段名称和类型 按照树状结构来打印9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回11、 toDF()返回一个新的dataframe类型的12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD集成查询:1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值df.agg(max(“age”), avg(“salary”))df.groupBy().agg(max(“age”), avg(“salary”))2、 agg(exprs: Map[String, String]) 返回dataframe类型 ,同数学计算求值 map类型的df.agg(Map(“age” -> “max”, “salary” -> “avg”))df.groupBy().agg(Map(“age” -> “max”, “salary” -> “avg”))3、 agg(aggExpr: (String, String), aggExprs: (String, String)) 返回dataframe类型 ,同数学计算求值df.agg(Map(“age” -> “max”, “salary” -> “avg”))df.groupBy().agg(Map(“age” -> “max”, “salary” -> “avg”))4、 apply(colName: String) 返回column类型,捕获输入进去列的对象5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名6、 col(colName: String) 返回column类型,捕获输入进去列的对象7、 cube(col1: String, cols: String) 返回一个GroupedData类型,根据某些字段来汇总8、 distinct 去重 返回一个dataframe类型9、 drop(col: Column) 删除某列 返回dataframe类型10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分df.explode(“name”,“names”) {name :String=> name.split(" ")}.show();将name字段根据空格来拆分,拆分的字段放在names里面13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型 df.filter(“age>10”).show(); df.filter(df(“age”)>10).show(); df.where(df(“age”)>10).show(); 都可以14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型 df.groupBy(“age”).agg(Map(“age” ->“count”)).show();df.groupBy(“age”).avg().show();都可以15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素16、 join(right: DataFrame, joinExprs: Column, joinType: String)一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemidf.join(ds,df(“name”)===ds(“name”) and df(“age”)===ds(“age”),“outer”).show();17、 limit(n: Int) 返回dataframe类型 去n 条数据出来18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤 df.na.drop().show(); 删除为空的行19、 orderBy(sortExprs: Column*) 做alise排序20、 select(cols:string*) dataframe 做字段的刷选 df.select($“colA”, $“colB” + 1)21、 selectExpr(exprs: String*) 做字段的刷选 df.selectExpr(“name”,“name as names”,“upper(name)”,“age+1”).show();22、 sort(sortExprs: Column*) 排序 df.sort(df(“age”).desc).show(); 默认是asc23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed(“name”,“names”).show();25、 withColumn(colName: String, col: Column) 增加一列 df.withColumn(“aa”,df(“name”)).show();
文章目录一、前言二、利用ImportTsv将csv文件导入到HBase三、利用completebulkload将数据导入到HBase四、利用Import将数据导入到HBase一、前言HBase作为Hadoop DataBase,除了使用put进行数据导入之外,还有以下几种导入数据的方式:(1)使用importTsv功能将csv文件导入HBase;(2)使用import功能,将数据导入HBase;(3)使用BulkLoad功能将数据导入HBase。二、利用ImportTsv将csv文件导入到HBase命令:格式:hbase [类] [分隔符] [行键,列族] [表] [导入文件] bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tb1-001 /simple.csvsimple.csv内容如下:1,"Tony" 2,"Ivy" 3,"Tom" 4,"Spark" 5,"Storm"创建文件 [root@hadoop1 datamove]# cat simple.csv 1,"Tony" 2,"Ivy" 3,"Tom" 4,"Spark" 5,"Storm" 上传文件 [root@hadoop1 datamove]# hdfs dfs -put simple.csv /liguodong [root@hadoop1 datamove]# hdfs dfs -ls /liguodong Found 5 items -rw-r--r-- 3 root supergroup 45 2015-07-06 11:13 /liguodong/simple.csv 创建表 hbase(main):001:0> create 'hbase-tb1-001','cf' 0 row(s) in 3.1120 seconds => Hbase::Table - hbase-tb1-001 执行mapreduce [root@hadoop1 datamove]# hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tb1-001 /liguodong/simple.csv 查看是否成功导入 hbase(main):003:0> scan 'hbase-tb1-001' ROW COLUMN+CELL 1 column=cf:, timestamp=1436152834178, value="Tony" 2 column=cf:, timestamp=1436152834178, value="Ivy" 3 column=cf:, timestamp=1436152834178, value="Tom" 4 column=cf:, timestamp=1436152834178, value="Spark" 5 column=cf:, timestamp=1436152834178, value="Storm" 5 row(s) in 0.1490 seconds三、利用completebulkload将数据导入到HBaseHBase支持bulkload的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配和mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时,能极大的提高写入效率,并降低对HBase节点的写入压力。通过使用先生成HFile,然后再BulkLoad到HBase的方式来替代之前直接调用HTableOutputFormat的方法有如下的好处:1、消除了对HBase集群的插入压力2、提高了Job的运行速度,降低了Job的执行时间利用completebulkload将数据导入到HBase1、先通过lmportTsv生成HFile命令:hbase [类] [分隔符] [输出存储路径] [行键,列族] [表] [导入原始数据文件] bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.bulk.output=/hfile_tmp -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tbl-002 /simple.csv2、通过completebulkload将数据导入表hbase-tbl-002命令:hadoop jar lib/hbase-server-0.96.0.jar completebulkload /hfile_tmp hbase-tbl-002[root@hadoop1 datamove]# hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.bulk.output=/liguodong/hfile_tmp -Dimporttsv.columns=HBASE_ROW_KEY,cf hbase-tbl-002 /liguodong/simple.csv 以上的指令,它会主动创建表hbase-tbl-002和文件夹hfile_tmp。 [root@hadoop1 datamove]# hdfs dfs -ls /liguodong drwxr-xr-x - root supergroup 0 2015-07-06 11:54 /liguodong/hfile_tmp [root@hadoop1 datamove]# hdfs dfs -ls -R /liguodong/hfile_tmp -rw-r--r-- 3 root supergroup 0 2015-07-06 11:54 /liguodong/hfile_tmp/_SUCCESS drwxr-xr-x - root supergroup 0 2015-07-06 11:54 /liguodong/hfile_tmp/cf -rw-r--r-- 3 root supergroup 1196 2015-07-06 11:54 /liguodong/hfile_tmp/cf/e20e3fe899de47a88ca476e05da2c9d7 hbase(main):008:0> scan 'hbase-tbl-002' ROW COLUMN+CELL 0 row(s) in 0.0310 seconds 将数据导入表hbase-tbl-002 [root@hadoop1 datamove]# hadoop jar /opt/cloudera/parcels/CDH/lib/hbase/hbase-server-0.98.6-cdh5.3.4.jar completebulkload /liguodong/hfile_tmp hbase-tbl-002四、利用Import将数据导入到HBase1、HBase export工具导出的数据的格式是sequence file。比如,在执行完命令bin/hbase org.apache.hadoop.hbase.mapreduce.Export hbase-tbl-002 /test-output后,hbase会启动一个MapReduce作业,作业完成后会在hdfs上面会生成sequence file格式的数据文件。2、对于这类Sequence file格式的数据文件,HBase是可以通过Import工具直接将它导入到HBase的表里面的。执行命令:bin/hbase org.apache.hadoop.hbase.mapreduce.Import hbase-tbl-003 /test-output,随后hbase会启动一个MapReduce作业,然后表test会成功入数据。导出到hdfs [root@hadoop1 lib]# hbase org.apache.hadoop.hbase.mapreduce.Export hbase-tb1-001 /liguodong/test-output 创建新表 hbase(main):010:0> create 'hbase-tb1-003','cf' 0 row(s) in 0.4290 seconds => Hbase::Table - hbase-tb1-003 导入到hbase [root@hadoop1 lib]# hbase org.apache.hadoop.hbase.mapreduce.Import hbase-tb1-003 /liguodong/test-output 验证 hbase(main):011:0> scan 'hbase-tb1-003' ROW COLUMN+CELL 1 column=cf:, timestamp=1436152834178, value="Tony" 2 column=cf:, timestamp=1436152834178, value="Ivy" 3 column=cf:, timestamp=1436152834178, value="Tom" 4 column=cf:, timestamp=1436152834178, value="Spark" 5 column=cf:, timestamp=1436152834178, value="Storm" 5 row(s) in 0.0580 seconds
文章目录搜索聚合高阶概念搜索即从一个索引下按照特定的字段或关键词搜索出符合用户预期的一个或者一堆cocument,然后根据文档的相关度得分,在返回的结果集里并根据得分对这些文档进行一定的排序。聚合根据业务需求,对文档中的某个或某几个字段进行数据的分组并做一些指标数据的统计分析,比如要计算一批文档中某个业务字段的总数,平均数,最大最小值等,都属于聚合的范畴。以上两个概念后是理解下面实验的基础,如果是传统关系数据库mysql、oracle等存储的数据,也可以搜索和聚合,但是在数据聚合分析一块,毕竟不是它们的强项,而且需要在程序中做大量的处理,耗时费力,尤其是大数据量的情况下就有些力不从心了。但在es中,由于内置了聚合统计的相关功能,只需要使用好它的语法即可达到几近实时的聚合统计,和搜索花费时间基本上没有太大差别,因此使用es很适合在数据量大的业务场景下做聚合统计与分析。高阶概念Buckets(桶/集合):满足特定条件的文档的集合Metrics(指标):对桶内的文档进行统计计算(例如最小值,求和,最大值等)在聚合统计分析中,使用很频繁的一个名词叫 aggs,它是聚合的关键词之一,下面就用实验来演示一下使用aggs进行数据聚合的多种场景。1、实验准备数据,首先往es整合批量插入一些实验数据,这里我们以一个家电卖场的电视为背景进行模拟设定文档中field的相关分词属性。PUT http://192.168.56.235:9201/demo2 { "setting":{ "index":{ "number_of_shards":5, "number_of_replicats":1 } }, "mappings":{ "sales":{ "properties":{ "price":{ "type":"long" }, "color":{ "type":"keyword" }, "brand":{ "type":"keyword" }, "sold_date":{ "type":"date" } } } } }2、批量插入数据POST http://192.168.56.235:9201/demo2/sales { "price" : 1000, "color" : "红色", "brand" : "长虹", "sold_date" : "2016-10-28" } { "price" : 2000, "color" : "红色", "brand" : "长虹", "sold_date" : "2016-11-05" } { "price" : 3000, "color" : "绿色", "brand" : "小米", "sold_date" : "2017-05-18" } { "price" : 1500, "color" : "蓝色", "brand" : "TCL", "sold_date" : "2017-07-02" } { "price" : 1200, "color" : "绿色", "brand" : "TCL", "sold_date" : "2018-08-19" } { "price" : 2000, "color" : "红色", "brand" : "长虹", "sold_date" : "2017-11-05" } { "price" : 8000, "color" : "红色", "brand" : "三星", "sold_date" : "2017-01-01" } { "price" : 2500, "color" : "蓝色", "brand" : "小米", "sold_date" : "2018-02-12" }数据准备完毕2、按照颜色分组统计各种颜色电视的数量查询语法如下:{ "size":0, "aggs":{ "group_color":{ "terms":{ "field":"color" }, "aggs":{ "avg_color_price":{ "avg":{ "field":"price" } } } } } }查询结果如下,这里简单对其中的几个参数和结果名称做一下说明。在查询语句中:size:0表示聚合查询的结果不需要返回中间的文档内容,group_color 我们自定义的分组名字,最好是见名知意的在返回结果中:hits:{},这部分存放的是返回结果的基本统计结果,如果上面的size制指定了不为0,文档内容则会放在这个里面buckets:存放聚合后的统计结果详细信息,以key-value的形式展现3、按照颜色分组统计各种颜色电视的数量,并在此基础上,统计出各种颜色电视的平均价格分析:按照color去分bucket,可以拿到每个color bucket中的数量,这个仅仅只是一个bucket操作, doc_count其实只是es的bucket操作默认执行的一个内置metric。在一个aggs执行的bucket操作(terms),平级的json结构下,再加一个aggs,这个第二个aggs内部,同样取个名字,执行一个metric操作,avg,对之前的每个bucket中的数据的指定的field、pricefield,求一个平均值就是一个metric,就是一个对一个bucket分组操作之后,对每个bucket都要执行的一个metric,也可以理解成功嵌套聚合,在es中获取到某个指标的数据后,继续对这个指标的数据进行其他聚合分析也被叫做下钻该需求查询语句如下:{ "size":0, "aggs":{ "group_color":{ "terms":{ "field":"color" }, "aggs":{ "avg_color_price":{ "avg":{ "field":"price" } } } } } }返回结果如下,通过结果可以很清晰的看出来,在颜色统计分析的基础上,每一个{}里面还增加了一个指标,即自定义的计算平均值的avg_color_price,这个查询几乎是毫秒级的,基本没有延迟,如果转化为sql查询应该是这样的:select avg(price) from tvs.sales group by color4、根据颜色分组,求出每种颜色的电视价格的最大值,最小值,平均值{ "size":0, "aggs":{ "group_by_color":{ "terms":{ "field":"color" }, "aggs":{ "max_price":{ "max":{ "field":"price" } }, "min_price":{ "min":{ "field":"price" } }, "avg_price":{ "avg":{ "field":"price" } } } } } }所得结果如下:{ "took": 4, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 7, "max_score": 0, "hits": [] }, "aggregations": { "group_by_color": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "红色", "doc_count": 4, "max_price": { "value": 8000 }, "min_price": { "value": 1000 }, "avg_price": { "value": 3250 } }, { "key": "蓝色", "doc_count": 2, "max_price": { "value": 2500 }, "min_price": { "value": 1500 }, "avg_price": { "value": 2000 } }, { "key": "绿色", "doc_count": 1, "max_price": { "value": 1200 }, "min_price": { "value": 1200 }, "avg_price": { "value": 1200 } } ] } } }/5、按照不同的价格区间对电视进行划分,并求出每个价格区间的电视的平均价格在es中根据区间间隔划分,有一个叫做 histogram的语法可以帮助我们执行,类似于terms,也是进行bucket分组操作,接收一个field,按照这个field的值的各个范围区间,进行bucket分组操作。“histogram”:{ “field”: “price”, “interval”: 2000 },interval:2000,划分范围,0 ~ 2000,2000 ~ 4000,4000 ~ 6000,6000 ~ 8000,8000 ~ 10000,buckets根据price的值,比如2500,看落在哪个区间内,比如2000 ~ 4000,此时就会将这条数据放入2000 ~ 4000对应的那个bucket中bucket划分的方法terms,将field值相同的数据划分到一个bucket中bucket有了之后,一样的,去对每个bucket执行avg,count,sum,max,min,等各种metric操作,聚合分析{ "size":0, "aggs":{ "interval_price":{ "histogram":{ "field":"price", "interval":2000 }, "aggs":{ "revenue":{ "avg":{ "field":"price" } } } } } }查询的结果如下:可以看到,按照2000一个等级将所有电视的价格划分在不同的区间了,并将每个区间的价格平均值统计了出来{ "took": 7, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 7, "max_score": 0, "hits": [] }, "aggregations": { "interval_price": { "buckets": [ { "key": 0, "doc_count": 3, "revenue": { "value": 1233.3333333333333 } }, { "key": 2000, "doc_count": 3, "revenue": { "value": 2166.6666666666665 } }, { "key": 4000, "doc_count": 0, "revenue": { "value": null } }, { "key": 6000, "doc_count": 0, "revenue": { "value": null } }, { "key": 8000, "doc_count": 1, "revenue": { "value": 8000 } } ] } } }6、按照不同的时间区间对电视进行划分,并求出每个价格区间的电视的平均价格date histogram,按照我们指定的某个date类型的日期field,以及日期interval,按照一定的日期间隔,去划分bucket,这个概念的理解和上一个有点类似,可以对照理解。date interval = 1 month2017-01-01~2017-01-31,就是一个bucket2017-02-01~2017-02-28,就是一个bucket然后会去扫描每个数据的date field,判断date落在哪个bucket中,就将其放入那个bucket2017-01-05,就将其放入2017-01-01~2017-01-31,就是一个bucketmin_doc_count:即使某个日期interval,2017-01-01~2017-01-31中,一条数据都没有,那么这个区间也是要返回的,不然默认是会过滤掉这个区间的extended_bounds,min,max:划分bucket的时候,会限定在这个起始日期,和截止日期内根据上述分析我们构建查询语句{ "size":0, "aggs":{ "sales":{ "date_histogram":{ "field":"sold_date", "interval":"month", "format":"yyyy-MM-dd", "min_doc_count":0, "extended_bounds":{ "min":"2017-01-01", "max":"2018-12-31" } } } } }返回结果如下,按照月份,将指定区间内各个月份的数量做了统计当然,如果我们觉得按照月份统计粒度太细,也可以根据季度对数据进行统计,只需要将month换成quarter即可,查询语法如下:{ "size":0, "aggs":{ "sales":{ "date_histogram":{ "field":"sold_date", "interval":"quarter", "format":"yyyy-MM-dd", "min_doc_count":0, "extended_bounds":{ "min":"2017-01-01", "max":"2018-12-31" } } } } }查询结果如下:
文章目录1、SpringBoot 整合 Redis2、Docker中启动MySQL、Redis3、Postman测试分页的接口4、JMeter压力测试1. 设置线程组2. 设置测试 HTTP请求3. 设置测试随机数4. 测试结果1、SpringBoot 整合 RedisRedis是一种高性能的非关系型数据库。Redis作用在内存,性能极高。Spring Boot同样可以把Redis整合到项目里。打开IDEA ,创建一个新的项目,勾选本次demo中用到的Redis、MySQL相关依赖。第一步就是为项目添加Redis依赖。在SpringBoot下有spring-boot-starter-data-redis,使用Redis就相当的简单,可以看到pom.xml文件中已经成功出现。 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 第二步添加上Redis配置信息。包括Redis服务器的IP、端口、密码等信息,前提是已经安装好Redis服务,密码等信息必须和服务器一致。我们这里配置的Redis都是在Docker容器中的,application.yml内容如下所示。server: port: 9000 spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.56.100:3306/shoppings username: root password: ok redis: database: 0 host: 192.168.56.100 port: 6379 password:第三步,接下来就要写例子来使用使用Redis,在这个例子中使用的是框架封装Redis的类StringRedisTemplate。为啥用这个呢?,因为例子设计的k-v都是String类型,官方建议符合这种格式的推荐使用 StringRedisTemplate。当k-v是非String类型的object时,推荐使用RedisTemplate。services 层 Goodservices.java 如下所示,实体类 Goods,dao 层 GoodsDAO 很简单就不附上了。通过saveRedis方法进行数据转化,将MySQL数据库中内容转到Redis数据库中,注意这里执行过一次之后通过findRedisData方法实现分页。package com.kgc.msrd.services; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.kgc.msrd.dao.GoodsDAO; import com.kgc.msrd.entity.Goods; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; @Service public class Goodsservice { @Autowired private GoodsDAO goodsDAO; @Autowired private StringRedisTemplate redisTemplate; public List<Goods> findAll(int pg){ return goodsDAO.findAll((pg-1)*20); } /*数据转化 mysql数据库中内容转到redis数据库中,当数据成功转出后这段代码便可以注去*/ public void saveRedis(){ // List<String> res=new ArrayList<String>(); // List<Goods> goods=findAll(); // ObjectMapper om=new ObjectMapper(); // for(Goods gd:goods){ // try { // res.add(om.writeValueAsString(gd)); // } catch (JsonProcessingException e) { // e.printStackTrace(); // } // } // redisTemplate.opsForList().leftPushAll("good",res); } public List<String> findRedisData(int pg){ return redisTemplate.opsForList().range("good",(pg-1)*20,pg*20-1); } }controller层 InitCtrlpackage com.kgc.msrd.controller; import com.kgc.msrd.entity.Goods; import com.kgc.msrd.services.Goodsservice; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; @RestController public class InitCtrl { @Autowired private Goodsservice goodsservice; @RequestMapping("/mysql") public List<Goods> init(int pg){ return goodsservice.findAll(pg); } /* @RequestMapping("/save") public String save(){ goodsservice.saveRedis(); return "ok"; }*/ @RequestMapping("/redis") public List<String> rd(int pg){ return goodsservice.findRedisData(pg); } }package com.kgc.msrd; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @MapperScan("com.kgc.msrd.dao") public class MsrdApplication { public static void main(String[] args) { SpringApplication.run(MsrdApplication.class, args); } }2、Docker中启动MySQL、Redis每一次登录,我们先要重启网络、重启容器systemctl restart network systemctl restart docker在Xshell 6 Docker上使用命令,进入Redis[root@zj ~]# docker start redis redis [root@zj ~]# docker exec -it redis /bin/bash root@4e6d4e0082e8:/data# redis-cli -h localhost3、Postman测试分页的接口接下来使用postman进行接口测试。Postman的使用可以参考这篇教程:接口测试工具Postman接口测试图文教程可以看到,我们对数据进行了分页,之前代码中,我们对数据分了20页,这时使用http://localhost:9000/redis?pg=4我们可以成功查询到第五页的数据。4、JMeter压力测试1. 设置线程组2. 设置测试 HTTP请求3. 设置测试随机数4. 测试结果通过压力测试,比较MySQL与Redis的性能,并进行分析。当数据量很庞大时,很明显看出Redis缓存服务器极高的性能。
今天在整合SSM框架时,遇到如下问题,尝试了网上很多方法,终于成功解决,相信以下方法可以解决大多数人的问题。排查方法如下:1、mapper接口和mapper.xml是否在同一个包(package)下?名字是否一样(仅后缀不同)?2、mapper.xml的命名空间(namespace)是否跟mapper接口的包名一致?3、接口的方法名,与xml中的一条sql标签的id一致 (我个人是由于这个原因,在mapper.xml的配置文件中id与接口id没能一致,导致自己的insertUser方法出现问题!)4、如果接口中的返回值List集合(不知道其他集合也是),那么xml里面的配置,尽量用resultMap(保证resultMap配置正确),不要用resultType5、如果你的项目是maven项目,请你在编译后,到接口所在target目录看一看,很有可能是没有mapper生产对应的xml文件,因为maven默认是不编译的,因此,你需要在你的pom.xml的里面,加这么一段:<build> <resources> <resource> <directory>src/main/java</directory> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>false</filtering> </resource> </resources> </build>
2022年06月