Apache doris Datax DorisWriter扩展使用方法

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: ataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能


Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。


Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!


为了更好的扩展Apache doris生态,为doris用户提供更方便的数据导入,社区开发扩展支持了Datax DorisWriter,使大家更方便Datax进行数据进入



1.场景


这里演示介绍的使用 Doris 的 Datax 扩展 DorisWriter实现从Mysql数据定时抽取数据导入到Doris数仓表里


2.编译 DorisWriter


这个的扩展的编译可以不在 doris 的 docker 编译环境下进行,本文是在 windows 下的 WLS 下进行编译的


首先从github上拉取源码

git clone https://github.com/apache/incubator-doris.git

进入到incubator-doris/extension/DataX/ 执行编译


首先执行:

sh init_env.sh

这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:


  1. 将 DataX 代码库 clone 到本地。


  1. doriswriter/ 目录软链到 DataX/doriswriter 目录。


  1. DataX/pom.xml 文件中添加 <module>doriswriter</module> 模块。


  1. DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13.


httpclient v4.5 在处理 307 转发时有bug。

这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中,方便开发者提交代码


2.1 开始编译


这里我为了加快编译速度去掉了很多无用的插件:这里直接在Datax目录下的pom.xml里注释掉就行

hbase11xreader
hbase094xreader
tsdbreader
oceanbasev10reader
odpswriter
hdfswriter
adswriter
ocswriter
oscarwriter
oceanbasev10writer

然后进入到incubator-doris/extension/DataX/ 目录下的 Datax 目录,执行编译

这里我是执行的将 Datax 编译成 tar 包,和官方的编译命令不太一样。


mvn -U clean package assembly:assembly -Dmaven.test.skip=true

image.png

image-20210903132250723

image.png

image-20210903132539511


编译完成以后,tar 包在 Datax/target 目录下,你可以将这tar包拷贝到你需要的地方,这里我是直接在 datax 执行测试,这里因为的 python 版本是 3.x版本,需要将 bin 目录下的三个文件换成 python 3能之别的版本,这个你可以去下面的地址下载:


https://github.com/WeiYe-Jing/datax-web/tree/master/doc/datax-web/datax-python3

将下载的三个文件替换 bin 目录下的文件以后,整个编译,安装就完成了


如果你编译不成功也可以从我的百度网盘上下载编译好的包,注意我上边编译去掉的那些插件


链接:https://pan.baidu.com/s/1hXYkpkrUE2qW4j98k2Wu7A 
提取码:3azi

3.数据接入


这个时候我们就可以开始使用 Datax 的doriswriter扩展开始从 Mysql(或者其他数据源)直接将数据抽取出来导入到 Doris 表中了。


3.1 Mysql 数据库准备


下面是我数据库的建表脚本(mysql 8):

CREATE TABLE `order_analysis` (
  `date` varchar(19) DEFAULT NULL,
  `user_src` varchar(9) DEFAULT NULL,
  `order_src` varchar(11) DEFAULT NULL,
  `order_location` varchar(2) DEFAULT NULL,
  `new_order` int DEFAULT NULL,
  `payed_order` int DEFAULT NULL,
  `pending_order` int DEFAULT NULL,
  `cancel_order` int DEFAULT NULL,
  `reject_order` int DEFAULT NULL,
  `good_order` int DEFAULT NULL,
  `report_order` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT

示例数据


INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-12 00:00:00', '广告二维码', 'Android APP', '上海', 15253, 13210, 684, 1247, 1000, 10824, 862);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-14 00:00:00', '微信朋友圈H5页面', 'iOS APP', '广州', 17134, 11270, 549, 204, 224, 10234, 773);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '地推二维码扫描', 'iOS APP', '北京', 16061, 9418, 1220, 1247, 458, 13877, 749);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '微信朋友圈H5页面', '微信公众号', '武汉', 12749, 11127, 1773, 6, 5, 9874, 678);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '地推二维码扫描', 'iOS APP', '上海', 13086, 15882, 1727, 1764, 1429, 12501, 625);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '微信朋友圈H5页面', 'iOS APP', '武汉', 15129, 15598, 1204, 1295, 1831, 11500, 320);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '地推二维码扫描', 'Android APP', '杭州', 20687, 18526, 1398, 550, 213, 12911, 185);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '应用商店', '微信公众号', '武汉', 12388, 11422, 702, 106, 158, 5820, 474);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-20 00:00:00', '微信朋友圈H5页面', '微信公众号', '上海', 14298, 11682, 1880, 582, 154, 7348, 354);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-21 00:00:00', '地推二维码扫描', 'Android APP', '深圳', 22079, 14333, 5565, 1742, 439, 8246, 211);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-22 00:00:00', 'UC浏览器引流', 'iOS APP', '上海', 28968, 18151, 7212, 2373, 1232, 10739, 578);

3.2 doris数据库准备


下面是我上面数据表在doris对应的建表脚本


CREATE TABLE `order_analysis` (
  `date` datetime DEFAULT NULL,
  `user_src` varchar(30) DEFAULT NULL,
  `order_src` varchar(50) DEFAULT NULL,
  `order_location` varchar(10) DEFAULT NULL,
  `new_order` int DEFAULT NULL,
  `payed_order` int DEFAULT NULL,
  `pending_order` int DEFAULT NULL,
  `cancel_order` int DEFAULT NULL,
  `reject_order` int DEFAULT NULL,
  `good_order` int DEFAULT NULL,
  `report_order` int DEFAULT NULL
) ENGINE=OLAP
DUPLICATE KEY(`date`,user_src)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_src`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);

3.3 Datax Job JSON文件


创建并编辑datax job任务json文件,并保存到指定目录

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "zhangfeng",
                        "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order" ],
                        "connection": [ { "table": [ "order_analysis" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/demo" ] } ] }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "feLoadUrl": ["fe:8030"],
                        "beLoadUrl": ["be1:8040","be1:8040","be1:8040","be1:8040","be1:8040","be1:8040"],
                        "jdbcUrl": "jdbc:mysql://fe:9030/",
                        "database": "test_2",
                        "table": "order_analysis",
                        "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order"],
                        "username": "root",
                        "password": "",
                        "postSql": [],
                        "preSql": [],
                        "loadProps": {
                        },
                        "maxBatchRows" : 10000,
                        "maxBatchByteSize" : 104857600,
                        "labelPrefix": "datax_doris_writer_demo_",
                        "lineDelimiter": "\n"
                    }
                }
            }
        ]
    }
}

这块 Mysql reader 使用方式参照:


https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

doriswriter的使用及参数说明:


https://github.com/apache/incubator-doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md

4.执行Datax数据导入任务


python bin/datax.py doris.json

然后就可以看到执行结果:

image.png


image-20210903134043421


再去 Doris 数据库中查看你的表,数据就已经导入进去了,任务执行结束


因为 Datax 的任务是要靠外部触发才能执行,这里你可以使用Linux的crontab或者海豚调度之类的来控制任务运行




目录
相关文章
|
6天前
|
SQL 缓存 数据处理
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)
Apache Doris 提出“数据无界”和“湖仓无界”理念,提供高效的数据管理方案。本文聚焦三个典型应用场景:湖仓分析加速、多源联邦分析、湖仓数据处理,深入介绍 Apache Doris 的最佳实践,帮助企业快速响应业务需求,提升数据处理和分析效率
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)
|
8天前
|
存储 SQL 数据挖掘
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
湖仓一体架构融合了数据湖的低成本、高扩展性,以及数据仓库的高性能、强数据治理能力,高效应对大数据时代的挑战。为助力企业实现湖仓一体的建设,Apache Doris 提出了数据无界和湖仓无界核心理念,并结合自身特性,助力企业加速从 0 到 1 构建湖仓体系,降低转型过程中的风险和成本。本文将对湖仓一体演进及 Apache Doris 湖仓一体方案进行介绍。
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
|
13天前
|
存储 运维 监控
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
日志数据已成为企业洞察系统状态、监控网络安全及分析业务动态的宝贵资源。网易云音乐引入 Apache Doris 作为日志库新方案,替换了 ClickHouse。解决了 ClickHouse 运维复杂、不支持倒排索引的问题。目前已经稳定运行 3 个季度,规模达到 50 台服务器, 倒排索引将全文检索性能提升7倍,2PB 数据,每天新增日志量超过万亿条,峰值写入吞吐 6GB/s 。
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
|
1月前
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
1月前
|
SQL 存储 分布式计算
Apache Doris 2.1.8 版本正式发布
该版本持续在湖仓一体、异步物化视图、查询优化器与执行引擎、存储管理等方面进行改进提升与问题修复,进一步加强系统的性能和稳定性,欢迎大家下载体验。
|
2月前
|
存储 SQL Apache
Apache Doris 创始人:何为“现代化”的数据仓库?
3.0 版本是 Apache Doris 研发路程中的重要里程碑,他将这一进展总结为“实时之路”、“统一之路”和“弹性之路”,详细介绍了所对应的核心特性的设计思考与应用价值,揭晓了 2025 年社区发展蓝图
Apache Doris 创始人:何为“现代化”的数据仓库?
|
2月前
|
SQL 存储 数据处理
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
151 1
别让你的CPU打盹儿:Apache Doris并行执行原理大揭秘!
|
1月前
|
存储 SQL 监控
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
灵犀科技早期基于 Hadoop 构建大数据平台,在战略调整和需求的持续扩增下,数据处理效率、查询性能、资源成本问题随之出现。为此,引入 [Apache Doris](https://doris.apache.org/) 替换了复杂技术栈,升级为集存储、加工、服务为一体的统一架构,实现存储成本下降 60%,计算效率提升超 10 倍的显著成效。
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
|
3月前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
2月前
|
SQL 存储 Apache
Apache Doris 3.0.3 版本正式发布
亲爱的社区小伙伴们,Apache Doris 3.0.3 版本已于 2024 年 12 月 02 日正式发布。该版本进一步提升了系统的性能及稳定性,欢迎大家下载体验。

热门文章

最新文章

推荐镜像

更多