DataLakeAnalysis: 使用DataX同步Kafka数据到OSS进行分析

简介:

平常业务开发中我们经常有流式数据保存在Kafka里面,这部分数据很多场景也是需要分析的,今天给大家介绍下如果使用DataX把数据从Kafka同步到OSS,保存成对分析友好的Parquet格式,然后利用DLA进行分析的全流程。
为了后续叙述的方便,我们假设Kafka里面保存的是订单的数据,它包含如下字段:

  • id, int类型
  • name, string类型
  • gmt_create, bigint类型, 时间戳字段
  • map_col: MAP 类型
  • array_col: ARRAY 类型
  • struct_col: STRUCT 类型

我们最终希望把Kafka上的这个数据最终保存到OSS上,并且映射到DLA里面的一个分区表,表结构如下:

CREATE EXTERNAL TABLE orders_p (
    id int,
    name string,
    gmt_create timestamp,
    map_col MAP<string, string>,
    array_col ARRAY<string>,
    struct_col STRUCT<id:bigint,name:string>
)
PARTITIONED BY (dt string)
STORED AS PARQUET 
LOCATION 'oss://test-bucket/datasets/oss_demo/orders_p/';

注意我们最终表结构里面有一个分区字段 dt, 因为分析场景下数据量都很大,进行分区才能提高分析的效率。而这个分区字段在原始数据里面是没有直接对应的。
因为从DataX过来的数据无法自动根据目录分区,因此我们建议从Kafka过来的数据放到中间表 orders 里面去, 在数据进入orders之后我们再跟一个任务做一个“数据拆分”的工作,把数据拆分到具体的分区里面去。总结一下:要把Kafka的数据正确的落到DLA里面的 orders_p 需要经过如下步骤:

  • kafka -> oss: DataX定时把数据同步到中间表: orders。
  • orders -> orders_p: DLA任务定时把数据从中间表orders同步到orders_p

这两个任务串行执行,第二个任务依赖第一个任务,每5分钟调度一次。这样就可以Kafka里面的数据以5分钟延时的粒度不断地写入到OSS里面去,然后使用DLA进行高效的分析。

kafka -> oss: DataX定时把数据同步到中间表: orders

因为Kafka上的数据量很大,在DLA中一般会进行分区处理以获得更好的分析性能,但是DataX目前还无法支持直接把数据写入到分区表,因此我们要搞一个中间表: orders 过度一下,它的表结构跟最终表orders_p几乎一样,只是没有分区

CREATE EXTERNAL TABLE orders (
    id int,
    name string,
    gmt_create timestamp,
    map_col MAP<string, string>,
    array_col ARRAY<string>,
    struct_col STRUCT<id:bigint,name:string>
)
STORED AS PARQUET 
LOCATION 'oss://test-bucket/datasets/oss_demo/orders/';

那么我们第一步要做的事情就是要通过DataX把数据写到这个 orders 表对应的LOCATION: oss://test-bucket/datasets/oss_demo/orders/。
整个DataX的任务的JSON配置蛮复杂的,我们直接贴在这里:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 5
            }
        },
        "content": [
            {
                "reader": {
                    "name": "kafkareader",
                    "parameter": {
                        "server": "127.0.0.2:9093",
                        "column": [
                            "id",
                            "name",
                            "gmt_create",
                            "map_col",
                            "array_col",
                            "struct_col"
                        ],
                        "kafkaConfig": {
                            "group.id": "demo_test",
                            "java.security.auth.login.config": "/the-path/kafka/kafka_client_jaas.conf",
                            "ssl.truststore.location": "/the-path/kafka.client.truststore.jks",
                            "ssl.truststore.password": "KafkaOnsClient",
                            "security.protocol": "SASL_SSL",
                            "sasl.mechanism": "PLAIN",
                            "ssl.endpoint.identification.algorithm": ""
                        },
                        "topic": "yucha",
                        "waitTime": "10",
                        "partition_": "0",
                        "keyType": "ByteArray",
                        "valueType": "ByteArray",
                        "seekToBeginning_": "true",
                        "seekToLast_": "true",
                        "beginDateTime": "20190501010000",
                        "endDateTime":   "20190501010500"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "oss://test-bucket",
                        "fileType": "parquet",
                        "path": "/datasets/oss_demo/kpt",
                        "fileName": "test",
                        "writeMode": "truncate",
                        "compress":"SNAPPY",
                        "encoding":"UTF-8",
                        "hadoopConfig": {
                            "fs.oss.accessKeyId": "the-access-id",
                            "fs.oss.accessKeySecret": "the-access-key",
                            "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com"
                        },
                        "parquetSchema": "message test {\n    required int64 id;\n    optional binary name (UTF8);\n    optional int64 gmt_create;\n    required group map_col (MAP) {\n        repeated group key_value {\n            required binary key (UTF8);\n            required binary value (UTF8);\n        }\n    }\n    required group array_col (LIST) {\n        repeated group list {\n            required binary element (UTF8);\n        }\n    }\n    required group struct_col {\n        required int64 id;\n        required binary name (UTF8);\n    }    \n}",
                        "dataxParquetMode": "fields"
                    }
                }
            }
        ]
    }
}

这个配置分为两段: kafkareader 和 hdfswriter, 分别负责读写数据。我们分别详细介绍一下。
kafkareader 里面大多数参数都比较好理解,比较重要的参数是 beginDateTime, endDateTime, 指定这个任务要消费的kafka数据的范围,比如我们任务每5分钟跑一次,那么这里指定的可能就是当前时间往前推5分钟的时间范围,比如我们示例代码里面的是 20190501010000 到 20190501010500, 时间精确到秒。关于KafkaReader更详细的信息可以参考KafkaReader文档。
hdfswriter, 这里我们使用hdfswriter来写oss数据是因为OSS实现了Hadoop File System的接口,我们可以通过HDFS Writer来向OSS导数据,因为倒过来的数据后面要通过DLA来分析,推荐使用Parquet这种列存格式来保存,目前HDFS Writer支持PARQUET的绝大部分类型,包括基本类型以及复杂类型如array, map, struct, 要以Parquet格式同步数据,我们首先要描述一下这个Parquet的格式, 我们示例数据对应的Parquet的Schema如下:

message test {
    required int64 id;
    optional binary name (UTF8);
    optional int64 gmt_create;
    required group map_col (MAP) {
        repeated group key_value {
            required binary key (UTF8);
            required binary value (UTF8);
        }
    }
    required group array_col (LIST) {
        repeated group list {
            required binary element (UTF8);
        }
    }
    required group struct_col {
        required int64 id;
        required binary name (UTF8);
    }    
}

上面DataX任务描述文件里面的parquetSchema字段里面的内容就是上面这段,只不过缩成了一行以保证整个DataX描述文件符合JSON格式。 关于Parquet Schema更多的信息可以查看Parquet Logical Type Definitions。
另外一个注意的配置点是 writeMode, 在我们的这个方案里面,我们推荐使用 truncate, 因为这个任务是每5分钟调度一次,下一次执行的时候需要把前一次执行的数据清空掉(truncate)。

orders -> orders_p: DLA任务定时把数据从中间表orders同步到orders_p

拆分的SQL每个具体的业务会不一样,我们这个示例里面比较简单,主要干了两件事:
把原始的 bigint 类型的 gmt_create转成了timestamp类型。
从gmt_create里面生成新的dt字段。

INSERT INTO orders_p
SELECT 
id, 
name, 
from_unixtime(gmt_create),  -- bigint -> timestamp
map_col, array_col, struct_col, 
cast(date(from_unixtime(gmt_create)) as string) -- 添加分区字段 
FROM orders

这两个任务的串行操作可以通过任务调度服务比如阿里云上DataWorks来进行串联,在 kafka -> oss 的任务完成后,运行这个 数据拆分 的任务。

总结

这篇文章介绍了如何把Kafka里面的数据实时地流入OSS,利用DLA进行高效的数据分析。借助于DataX对于Parquet复杂类型的支持,我们已经可以帮助用户把各种复杂数据搬进OSS,希望对有类似场景的客户有所帮助。

欢迎关注数据湖技术社区

数据湖开发者社区由 阿里云开发者社区 与 阿里云Data Lake Analytics团队 共同发起,致力于推广数据湖相关技术,包括hudi、delta、spark、presto、oss、元数据、存储加速、格式发现等,学习如何构建数据湖分析系统,打造适合业务的数据架构。

x

目录
相关文章
|
12月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
688 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
12月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
326 12
|
9月前
|
机器学习/深度学习 SQL 大数据
什么是数据集成?和数据融合有什么区别?
在大数据领域,“数据集成”与“数据融合”常被混淆。数据集成关注数据的物理集中,解决“数据从哪来”的问题;数据融合则侧重逻辑协同,解决“数据怎么用”的问题。两者相辅相成,集成是基础,融合是价值提升的关键。理解其差异,有助于企业释放数据潜力,避免“数据堆积”或“盲目融合”的误区,实现数据从成本到生产力的转变。
什么是数据集成?和数据融合有什么区别?
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
1054 5
|
12月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
人工智能 安全 DataX
【瓴羊数据荟】 Data x AI :大模型时代的数据治理创新实践 | 瓴羊数据Meet Up城市行第三期
第三期瓴羊数据Meetup 将于2025年1月3日在线上与大家见面,共同探讨AI时代的数据治理实践。
1464 10
【瓴羊数据荟】 Data x  AI :大模型时代的数据治理创新实践 | 瓴羊数据Meet Up城市行第三期
|
DataWorks 关系型数据库 Serverless
DataWorks数据集成同步至Hologres能力介绍
本次分享的主题是DataWorks数据集成同步至Hologres能力,由计算平台的产品经理喆别(王喆)分享。介绍DataWorks将数据集成并同步到Hologres的能力。DataWorks数据集成是一款低成本、高效率、全场景覆盖的产品。当我们面向数据库级别,向Hologres进行同步时,能够实现简单且快速的同步设置。目前仅需配置一个任务,就能迅速地将一个数据库实例内的所有库表一并传输到Hologres中。
343 12
|
JSON 分布式计算 DataX
【YashanDB知识库】使用DataX工具迁移yashan数据到maxcompute
本文介绍使用崖山适配的DataX工具进行数据库迁移的方法,包括单表迁移和批量表迁移。单表迁移需配置json文件并执行同步命令;批量迁移则通过脚本自动化生成json配置文件并完成数据迁移,最后提供数据比对功能验证迁移结果。具体步骤涵盖连接信息配置、表清单获取、json文件生成、数据迁移执行及日志记录,确保数据一致性。相关工具和脚本简化了复杂迁移过程,提升效率。
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
DataWorks 关系型数据库 Serverless
DataWorks数据集成同步至Hologres能力介绍
本文由DataWorks PD王喆分享,介绍DataWorks数据集成同步至Hologres的能力。DataWorks提供低成本、高效率的全场景数据同步方案,支持离线与实时同步。通过Serverless资源组,实现灵活付费与动态扩缩容,提升隔离性和安全性。文章还详细演示了MySQL和ClickHouse整库同步至Hologres的过程。