开发者社区 > 大数据与机器学习 > 大数据计算 MaxCompute > 正文

DataWorks中kafka数据迁移maxcomput?

DataWorks中kafka数据迁移maxcomput?

展开
收起
真的很搞笑 2023-09-06 14:10:57 49 0
1 条回答
写回答
取消 提交回答
  • {
    "type": "job",
    "steps": [
    {
    "stepType": "kafka",
    "parameter": {
    "server": “xxxx:9092,xxxx:9092", -----kafka的ip地址+服务端口号
    "kafkaConfig": {
    "group.id": "onaliyun_consumer_group01" -------kafka的高级扩展参数,根据业务情况配置来控制消费数据的行为。
    },
    "valueType": "ByteArray", ----Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
    "column": [
    "key", ---表示读取kafka消息的key值同步。
    "value", ---表示读取kafka消息的完整内容 如果配置了这个参数,那么kafka的整个value信息都会被作为一个字段同步到目的端。
    "partition", -----表示当前消息所在分区。
    "offset", -----表示当前消息的偏移量。
    "timestamp", -----表示当前消息的时间戳。
    "'age'", -----------常量列用''包裹,目的端的对应列数据值就是age
    "employee.age", -----------获取kafka value的json数据的值并将其同步到目的端,比value为 { "employee":{ "name":"Bill Gates", "age":62, "city":"Seattle" } } 则此配置会将 62取出并同步。目前仅支持读取json嵌套的最外层和一层数据,多层嵌套数据无法获取。
    "event_id", -----------如果您的数据是一个JSON,支持获取JSON的属性,例如["event_id"]。如果不需要就不填不配置
    ],
    "topic": "topic名", ------kafka topic
    "beginDateTime": "'unknownunknown'", -------kafka数据抽取的开始时间,该值会被转化为unixtime后从kafka侧记时取数(闭区间) 参数配置中可填写:bizdate=$[yyyymmdd] hh=$[hh24miss] 结合调度周期使用
    "endDateTime": "'unknownunknown'", -------kafka数据抽取的结束时间,该值会被转化为unixtime后从kafka侧记时终止(闭区间) 参数配置中可填写:bizdate=$[yyyymmdd] hh=$[hh24miss] 结合调度周期使用
    "keyType": "ByteArray", -------指定key数据类型
    "waitTime": "10"
    },
    "name": "Reader",
    "category": "reader"
    },
    {
    "stepType": "odps",
    "parameter": {
    "partition": "dt='unknown'", ----odps表分区配置,可用参数替换
    "truncate": true,
    "datasource": "odps_first", -----odps数据源名
    "envType": 1,
    "column": [
    "*" ------------odps表列信息配置
    ],
    "emptyAsNull": true, ----------来源端空值作为null写入
    "table": "xxxx" -----odps表名
    },
    "name": "Writer",
    "category": "writer"
    }
    ],
    "version": "2.0",
    "order": {
    "hops": [
    {
    "from": "Reader",
    "to": "Writer"
    }
    ]
    },
    "setting": {
    "executeMode": null, --------是否开启分布式运行模式(独享集成资源组两个及以上可配置)
    "errorLimit": {
    "record": "" ----------空值表示允许脏数据,脏数据会默认被丢弃
    },
    "speed": {
    "concurrent": 2, ------并发数
    "throttle": false
    }
    }
    }

    Kafka数据迁移MaxCompute最佳实践,此回答整理自钉群“DataWorks交流群(答疑@机器人)”

    2023-09-06 15:09:17
    赞同 展开评论 打赏

MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。

相关电子书

更多
DataWorks调度任务迁移最佳实践-2020飞天大数据平台实战应用第一季 立即下载
DataWorks商业化资源组省钱秘籍-2020飞天大数据平台实战应用第一季 立即下载
基于DataWorks数据服务构建疫情大屏-2020飞天大数据平台实战应用第一季 立即下载

相关实验场景

更多