DataWorks中kafka_odps样例是什么?

DataWorks中kafka_odps样例是什么?

展开
收起
真的很搞笑 2023-06-27 10:47:39 56 分享 版权
1 条回答
写回答
取消 提交回答
  • {
    "type": "job",
    "steps": [
    {
    "stepType": "kafka",
    "parameter": {
    "server": “xxxx:9092,xxxx:9092",        -----kafka的ip地址+服务端口号
    "kafkaConfig": {
    "group.id": "onaliyun_consumer_group01" -------kafka的高级扩展参数,根据业务情况配置来控制消费数据的行为。
    },
    "valueType": "ByteArray",              ----Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
    "column": [
    "key",                             ---表示读取kafka消息的key值同步。
    "value",                           ---表示读取kafka消息的完整内容 如果配置了这个参数,那么kafka的整个value信息都会被作为一个字段同步到目的端。
    "partition",                       -----表示当前消息所在分区。
    "offset",                          -----表示当前消息的偏移量。
    "timestamp",                       -----表示当前消息的时间戳。
    "'age'",                               -----------常量列用''包裹,目的端的对应列数据值就是age
    "employee.age",                        -----------获取kafka value的json数据的值并将其同步到目的端,比value为 { "employee":{ "name":"Bill Gates", "age":62, "city":"Seattle" } } 则此配置会将 62取出并同步。目前仅支持读取json嵌套的最外层和一层数据,多层嵌套数据无法获取。
    "event_id",                            -----------如果您的数据是一个JSON,支持获取JSON的属性,例如["event_id"]。如果不需要就不填不配置
    ],
    "topic": "topic名",           ------kafka topic
    "beginDateTime": "'unknownunknown'",     -------kafka数据抽取的开始时间,该值会被转化为unixtime后从kafka侧记时取数(闭区间) 参数配置中可填写:bizdate=$[yyyymmdd] hh=$[hh24miss] 结合调度周期使用
    "endDateTime": "'unknownunknown'",       -------kafka数据抽取的结束时间,该值会被转化为unixtime后从kafka侧记时终止(闭区间) 参数配置中可填写:bizdate=$[yyyymmdd] hh=$[hh24miss] 结合调度周期使用
    "keyType": "ByteArray",                -------指定key数据类型
    "waitTime": "10"
    },
    "name": "Reader",
    "category": "reader"
    },
    {
    "stepType": "odps",
    "parameter": {
    "partition": "dt='unknown'",    ----odps表分区配置,可用参数替换
    "truncate": true,
    "datasource": "odps_first",     -----odps数据源名
    "envType": 1,
    "column": [
    "*"                                ------------odps表列信息配置
    ],
    "emptyAsNull": true,  ----------来源端空值作为null写入
    "table": "xxxx"     -----odps表名
    },
    "name": "Writer",
    "category": "writer"
    }
    ],
    "version": "2.0",
    "order": {
    "hops": [
    {
    "from": "Reader",
    "to": "Writer"
    }
    ]
    },
    "setting": {
    "executeMode": null,         --------是否开启分布式运行模式(独享集成资源组两个及以上可配置)
    "errorLimit": {
    "record": ""             ----------空值表示允许脏数据,脏数据会默认被丢弃
    },
    "speed": {
    "concurrent": 2,        ------并发数
    "throttle": false
    }
    }
    }
    ,此回答整理自钉群“DataWorks交流群(答疑@机器人)”
    2023-06-27 10:52:18
    赞同 展开评论

DataWorks基于MaxCompute/Hologres/EMR/CDP等大数据引擎,为数据仓库/数据湖/湖仓一体等解决方案提供统一的全链路大数据开发治理平台。

还有其他疑问?
咨询AI助理