大数据技术之DataX

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 异构数据源集成,它就是神器。

大家好,我是脚丫先生 (o^^o)

在日常大数据生产环境中,经常会有关系型数据库和关系型数据库,以及关系型和非关系型数据库数据之间的互相转换的需求,在需求选择的初期解决问题的方法----离线数据同步工具/平台,小伙伴们可先收藏后慢慢研究。

小伙伴们如果觉得文章不错,点赞、收藏、评论,分享走一起呀,记得给俺来个一键三连~~

好了,我们开始今天的正文。
在这里插入图片描述

一、Datax概述

1.1 Datax介绍

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
在这里插入图片描述

1.2 DataX的设计

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
在这里插入图片描述

1.3 框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
在这里插入图片描述
DataX在设计之初就将同步理念抽象成框架+插件的形式.框架负责内部的序列化传输,缓冲,并发,转换等而核心技术问题,数据的采集(Reader)和落地(Writer)完全交给插件执行。

  • Read 数据采集模块,负责采集数据源的数据,将数据发送至FrameWork。
  • Writer 数据写入模块,负责不断的向FrameWork取数据,并将数据写入目的端。
  • FrameWork 用于连接reader和write,作为两者的数据传输通道,处理缓冲,流控,并发,转换等核心技术问题。

1.4 DataX插件体系

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南

1.5 运行原理

在这里插入图片描述

  • Job 完成单个数据同步的作业称之为job。DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。负责数据清理,子任务划分,TaskGroup监控管理。
  • Task 由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
  • Schedule 将Task组成TaskGroup,默认单个任务组的并发数量为5。
  • TaskGroup 负责启动Task。

详细解说:DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务.

DataX调度流程
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  • 1 DataXJob根据分库分表切分成了100个Task。
  • 2 根据20个并发,默认单个任务组的并发数量为5,DataX计算共需要分配4个TaskGroup。
  • 3 这里4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

二、快速入门

2.1 官方地址

下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
源码地址:https://github.com/alibaba/DataX

2.2 前置要求

  • Linux
  • JDK(1.8以上,推荐1.8)
  • Python(推荐Python2.6.X)

2.3 安装

(1) 将下载好的datax.tar.gz上传到服务器的/home/soft文件夹下

[root@xxx soft]$ ls
datax.tar.gz

(2) 解压datax.tar.gz到/opt/module

[root@xxx soft]$ tar -zxvf datax.tar.gz -C /opt/module/

(3) 运行脚本检测

[root@xxx bin]$ cd /opt/module/datax/bin/
[root@xxxbin]$ python datax.py /opt/module/datax/job/job.json

在这里插入图片描述

(4) json配置文件注释

/*一个json就是一个job,一个job主要包含:
content,setting 两个属性*/
{
  "job": {
    /*content是job的核心,主要放reader和writer插件*/
    "content": {
      /*raader插件*/
      "reader": {},
      /*writer插件*/
      "writer": {}
    },
    /*setting主要用来设置job的基本设置*/
    "setting": {
      /*speed流量控制*/
      "speed": {
        "channel": 1, /*同步时候的并发数*/
        "byte": 1024 /*同步时候的字节大小,影响速度,可选*/
      },
      /*脏数据控制,配置的意思是当脏数据大于10条,或者脏数据比例达到0.05%,任务就会报错*/
      "errorLimit": {
        "record": 10,/*脏数据最大记录数阈值*/
        "percentage": 0.05 /*脏数据占比阈值*/
      }
    }
  }
}

json的reader和writer内容根据插件不同而变化,具体查询官网

三、 DataX类图

整个流程大致如下
在这里插入图片描述
启动步骤解析:
1、解析配置,包括job.json、core.json、plugin.json三个配置

2、设置jobId到configuration当中

3、启动Engine,通过Engine.start()进入启动程序

4、设置RUNTIME_MODEconfiguration当中

5、通过JobContainer的start()方法启动
6、依次执行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。

7、init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息

8、prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来

9、split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型

10、channel的计数主要是根据byte和record的限速来实现的,在split()的函数中第一步就是计算channel的大小

11、split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回

12、split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置

13、schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量

14、schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。scheduler的具体实现类为ProcessInnerScheduler。

15、taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
3月前
|
人工智能 搜索推荐 算法
数据平台演进问题之数据库技术面临挑战如何解决
数据平台演进问题之数据库技术面临挑战如何解决
|
11天前
|
机器学习/深度学习 运维 分布式计算
大数据技术专业就业前景
大数据技术专业就业前景广阔,广泛应用于互联网、金融、医疗等众多行业,助力企业数字化转型。岗位涵盖大数据开发、分析、运维及管理,如大数据工程师、分析师和系统运维工程师等。这些岗位因专业性和稀缺性而享有优厚薪资,尤其在一线城市可达20万至50万年薪。随着技术进步和经验积累,从业者可晋升为高级职位或投身数据咨询、创业等领域,发展空间巨大。
23 5
|
15天前
|
人工智能 编解码 搜索推荐
大模型、大数据与显示技术深度融合 加速智慧医疗多元化场景落地
大模型、大数据与显示技术深度融合 加速智慧医疗多元化场景落地
|
2月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
2月前
|
存储 人工智能 算法
AI与大数据的结合:案例分析与技术探讨
【8月更文挑战第22天】AI与大数据的结合为各行各业带来了前所未有的机遇和挑战。通过具体案例分析可以看出,AI与大数据在电商、智能驾驶、医疗等领域的应用已经取得了显著成效。未来,随着技术的不断进步和应用场景的不断拓展,AI与大数据的结合将继续推动各行业的创新与变革。
|
2月前
|
消息中间件 监控 大数据
"探索Streaming技术:如何重塑大数据未来,实时处理引领数据价值即时转化新纪元"
【8月更文挑战第10天】信息技术高速发展,数据成为推动社会进步的关键。面对数据爆炸,高效实时处理成挑战。流处理(Streaming)技术应运而生,即时处理数据流,无需积累。应用于实时监控、日志分析等场景。例如,电商平台利用流处理分析用户行为,推送个性化推荐;智能交通系统预测拥堵。结合Apache Kafka和Flink,实现从数据收集到复杂流处理的全过程。流处理技术促进数据即时价值挖掘,与AI、云计算融合,引领大数据未来发展。
96 5
|
2月前
|
大数据 数据处理 分布式计算
JSF 逆袭大数据江湖!看前端框架如何挑战数据处理极限?揭秘这场技术与勇气的较量!
【8月更文挑战第31天】在信息爆炸时代,大数据已成为企业和政府决策的关键。JavaServer Faces(JSF)作为标准的 Java Web 框架,如何与大数据技术结合,高效处理大规模数据集?本文探讨大数据的挑战与机遇,介绍 JSF 与 Hadoop、Apache Spark 等技术的融合,展示其实现高效数据存储和处理的潜力,并提供示例代码,助您构建强大的大数据系统。
35 0
|
3月前
|
传感器 大数据 数据处理
大数据处理中的流计算技术:实现实时数据处理与分析
【7月更文挑战第30天】随着分布式系统、云原生技术、数据安全与隐私保护技术的不断发展,流计算技术将在更多领域得到应用和推广,为大数据处理和分析提供更加高效、智能的解决方案。
|
3月前
|
机器学习/深度学习 存储 分布式计算
驾驭数据洪流:大数据处理的技术与应用
大数据处理不仅是信息技术领域的一个热门话题,也是推动各行各业创新和发展的重要力量。随着技术的进步和社会需求的变化,大数据处理将继续发挥其核心作用,为企业创造更多的商业价值和社会贡献。未来,大数据处理将更加注重智能化、实时性和安全性,以应对不断增长的数据挑战。
|
2月前
|
SQL 存储 分布式计算
神龙大数据加速引擎MRACC问题之RDMA技术帮助大数据分布式计算优化如何解决
神龙大数据加速引擎MRACC问题之RDMA技术帮助大数据分布式计算优化如何解决
35 0

热门文章

最新文章