{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": “xxxx:9092,xxxx:9092", -----kafka的ip地址+服务端口号
"kafkaConfig": {
"group.id": "onaliyun_consumer_group01" -------kafka的高级扩展参数,根据业务情况配置来控制消费数据的行为。
},
"valueType": "ByteArray", ----Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
"column": [
"key", ---表示读取kafka消息的key值同步。
"value", ---表示读取kafka消息的完整内容 如果配置了这个参数,那么kafka的整个value信息都会被作为一个字段同步到目的端。
"partition", -----表示当前消息所在分区。
"offset", -----表示当前消息的偏移量。
"timestamp", -----表示当前消息的时间戳。
"'age'", -----------常量列用''包裹,目的端的对应列数据值就是age
"employee.age", -----------获取kafka value的json数据的值并将其同步到目的端,比value为 { "employee":{ "name":"Bill Gates", "age":62, "city":"Seattle" } } 则此配置会将 62取出并同步。目前仅支持读取json嵌套的最外层和一层数据,多层嵌套数据无法获取。
"event_id", -----------如果您的数据是一个JSON,支持获取JSON的属性,例如["event_id"]。如果不需要就不填不配置
],
"topic": "topic名", ------kafka topic
"beginDateTime": "'unknownunknown'", -------kafka数据抽取的开始时间,该值会被转化为unixtime后从kafka侧记时取数(闭区间) 参数配置中可填写:bizdate=$[yyyymmdd] hh=$[hh24miss] 结合调度周期使用
"endDateTime": "'unknownunknown'", -------kafka数据抽取的结束时间,该值会被转化为unixtime后从kafka侧记时终止(闭区间) 参数配置中可填写:bizdate=$[yyyymmdd] hh=$[hh24miss] 结合调度周期使用
"keyType": "ByteArray", -------指定key数据类型
"waitTime": "10"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "dt='unknown'", ----odps表分区配置,可用参数替换
"truncate": true,
"datasource": "odps_first", -----odps数据源名
"envType": 1,
"column": [
"*" ------------odps表列信息配置
],
"emptyAsNull": true, ----------来源端空值作为null写入
"table": "xxxx" -----odps表名
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"executeMode": null, --------是否开启分布式运行模式(独享集成资源组两个及以上可配置)
"errorLimit": {
"record": "" ----------空值表示允许脏数据,脏数据会默认被丢弃
},
"speed": {
"concurrent": 2, ------并发数
"throttle": false
}
}
}
Kafka数据迁移MaxCompute最佳实践,此回答整理自钉群“DataWorks交流群(答疑@机器人)”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。