阿里云Dataworks离线数据同步写入Kafka

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
大数据开发治理平台DataWorks,资源组抵扣包 750CU*H
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 之前介绍过Dataworks实时数据同步(Kafka -> maxcompute),这里介绍如何使用离线同步的方法,将maxcompute数据同步到阿里云Kafka Topic。

Step By Step

1、kafka实例的创建&独享数据集成资源组的创建参考博客(资源创建部分):
Dataworks实时数据同步(Kafka -> maxcompute)

2、数据集成配置Kafka数据源&测试连通性
image.png

3、maxcompute创建测试数据表

CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING,value1 STRING);

INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");

SELECT * FROM odps_to_kafka1;

image.png

4、配置离线同步脚本(注意目前Kafka仅支持脚本模式,不支持想到模式)

{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "envType": 1,
                "column": [
                    "key1",
                    "value1"
                ],
                "table": "odps_to_kafka1"  // maxcompute中表的名称
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",
            "parameter": {
                "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // 注意配置kafka内网地址
                "keyIndex": 0,   // key值对应maxcompute读取column的第一列
                "valueIndex": 1,  // value值对应maxcompute读取column的第二列
                "valueType": "BYTEARRAY",
                "topic": "from_odps1",  // kafka 中表的名称
                "batchSize": 1024,
                "keyType": "BYTEARRAY"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "2"
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}
注意: 保存脚本的时候如果提示不满足json格式规范,将注释部分删除即可。

5、执行同步任务
image.png

6、Kafka控制台查看数据同步情况
image.png

更多参考

Kafka Writer
Dataworks实时数据同步(Kafka -> maxcompute)
新增和使用独享数据集成资源组

相关文章
|
3月前
|
消息中间件 关系型数据库 调度
离线数据同步变迁
本文介绍了从第一代基于Hadoop体系的离线数据同步,到第二代基于DolphinScheduler和StarRocks的改进方案,再到第三代基于Python自定义的离线数据同步的演变过程。每一代方案都在不断优化,以适应日益增长的数据量和复杂的业务需求。
离线数据同步变迁
|
8月前
|
监控 数据挖掘 大数据
阿里云开源利器:DataX3.0——高效稳定的离线数据同步解决方案
对于需要集成多个数据源进行大数据分析的场景,DataX3.0同样提供了有力的支持。企业可以使用DataX将多个数据源的数据集成到一个统一的数据存储系统中,以便进行后续的数据分析和挖掘工作。这种集成能力有助于提升数据分析的效率和准确性,为企业决策提供有力支持。
|
8月前
|
消息中间件 SQL 分布式计算
DataWorks产品使用合集之如何离线增量同步Kafka数据,并指定时间范围进行同步
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
9月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如果设置了从Kafka数据源同步到MaxCompute(mc)的任务,任务一直在执行中,是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
77 10
|
9月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
9月前
|
消息中间件 DataWorks 安全
DataWorks产品使用合集之如何处理Kafka数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之如何处理Kafka数据
|
9月前
|
消息中间件 JSON 分布式计算
离线数仓(四)【数仓数据同步策略】(3)
离线数仓(四)【数仓数据同步策略】
|
9月前
|
消息中间件 JSON Java
离线数仓(四)【数仓数据同步策略】(4)
离线数仓(四)【数仓数据同步策略】
|
2月前
|
DataWorks 监控 数据建模
DataWorks产品体验评测
DataWorks产品体验评测
|
2月前
|
分布式计算 DataWorks 搜索推荐
DataWorks 产品评测与最佳实践探索!
DataWorks 是阿里巴巴推出的一站式智能大数据开发治理平台,内置15年实践经验,集成多种大数据与AI服务。本文通过实际使用角度,探讨其优势、潜力及改进建议。评测涵盖用户画像分析、数据治理、功能表现等方面,适合数字化转型企业参考。
51 1