阿里云Dataworks离线数据同步写入Kafka

本文涉及的产品
大数据开发治理平台 DataWorks,不限时长
简介: 之前介绍过Dataworks实时数据同步(Kafka -> maxcompute),这里介绍如何使用离线同步的方法,将maxcompute数据同步到阿里云Kafka Topic。

Step By Step

1、kafka实例的创建&独享数据集成资源组的创建参考博客(资源创建部分):
Dataworks实时数据同步(Kafka -> maxcompute)

2、数据集成配置Kafka数据源&测试连通性
image.png

3、maxcompute创建测试数据表

CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING,value1 STRING);

INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");

SELECT * FROM odps_to_kafka1;

image.png

4、配置离线同步脚本(注意目前Kafka仅支持脚本模式,不支持想到模式)

{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "envType": 1,
                "column": [
                    "key1",
                    "value1"
                ],
                "table": "odps_to_kafka1"  // maxcompute中表的名称
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",
            "parameter": {
                "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // 注意配置kafka内网地址
                "keyIndex": 0,   // key值对应maxcompute读取column的第一列
                "valueIndex": 1,  // value值对应maxcompute读取column的第二列
                "valueType": "BYTEARRAY",
                "topic": "from_odps1",  // kafka 中表的名称
                "batchSize": 1024,
                "keyType": "BYTEARRAY"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "2"
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}
注意: 保存脚本的时候如果提示不满足json格式规范,将注释部分删除即可。

5、执行同步任务
image.png

6、Kafka控制台查看数据同步情况
image.png

更多参考

Kafka Writer
Dataworks实时数据同步(Kafka -> maxcompute)
新增和使用独享数据集成资源组

相关文章
|
1月前
|
SQL 数据采集 分布式计算
DataWorks常见问题之添加阿里云selectdb失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
2月前
|
消息中间件 监控 数据挖掘
NineData:从Kafka到ClickHouse的数据同步解决方案
NineData 提供了强大的数据转换和映射功能,以解决 Kafka 和 ClickHouse 之间的格式和结构差异,确保数据在同步过程中的一致性和准确性。
64 2
NineData:从Kafka到ClickHouse的数据同步解决方案
|
1月前
|
数据库
阿里云DTS数据迁移和数据同步的差异性分析
阿里云DTS作为一款常用的数据库表迁移工具,提供了功能非常类似的两个功能:数据迁移、数据同步。阿里云DTS产品官网对这两个功能模块进行了简单的区分: 场景1:存量数据批量迁移,建议使用数据迁移功能。 场景2:增量数据实时同步,建议使用数据同步功能。 实际上,无论是数据迁移还是数据同步,都可以做 “结构初始化”+“全量数据迁移”+“增量迁移”,因此两者功能差异并不明显。笔者在多个项目实践DTS数据迁移,在简单需求场景下,将DTS的数据迁移、数据同步进行对比和总结。
|
23天前
|
消息中间件 NoSQL Kafka
云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操作
该方案描述了一个大数据ETL流程,其中阿里云Kafka消息根据内容触发函数计算(FC)函数,执行针对MongoDB的增、删、改操作。
|
1月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之sap haha数据同步kafka如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
48 6
|
1月前
|
消息中间件 Cloud Native Kafka
活动报名|AutoMQ x 阿里云云原生创新论坛(2024.03.09)见证“新一代云原生 Kafka ”重磅发布!
新一年, AutoMQ 首场线下活动重磅来袭!2024年3月9日,由 AutoMQ 与阿里云联合举办的云原生创新论坛将于杭州与大家见面,双方联合重磅发布新一代云原生 Kafka ——AutoMQ On-Prem 版本 !现场将会分享如何通过云原生和存算分离架构实现 Kafka 产品的10倍成本优化,并保持秒级分区无损迁移。另外,活动现场还有来自得物的技术专家分享 AutoMQ 在生产场景中的应用实践,以及阿里云的资深专家为大家剖析多 AZ 块存储的原理。
124 0
活动报名|AutoMQ x 阿里云云原生创新论坛(2024.03.09)见证“新一代云原生 Kafka ”重磅发布!
|
1月前
|
人工智能 DataWorks 数据可视化
心动基于阿里云DataWorks构建游戏行业通用大数据模型
心动游戏在阿里云上构建云原生大数据平台,基于DataWorks构建行业通用大数据模型,如玩家、产品、SDK、事件、发行等,满足各种不同的分析型应用的要求,如AI场景、风控场景、数据分析场景等。
334 1
|
2月前
|
SQL 消息中间件 关系型数据库
Flink CDC数据同步问题之向kafka同步数据报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
3月前
|
消息中间件 SQL Java
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
|
4月前
|
数据采集 DataWorks 数据安全/隐私保护
有没有方式可以实现dataworks数据迁移(从阿里云一个账号迁移到另外一个账号)?
有没有方式可以实现dataworks数据迁移(从阿里云一个账号迁移到另外一个账号)?
78 0

热门文章

最新文章