Step By Step
1. For creating the Kafka instance and the exclusive resource group for Data Integration, see the resource-creation section of the companion post:
DataWorks real-time data synchronization (Kafka -> MaxCompute)
2. Configure the Kafka data source in Data Integration and test connectivity (a standalone reachability check is sketched below).
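Because the brokers expose only VPC-internal addresses, connectivity has to be verified from inside the VPC, which is what the exclusive resource group's connectivity test does. For an independent sanity check from an ECS instance in the same VPC, here is a minimal sketch, assuming the kafka-python package and the broker addresses used in the script in step 4:

# Sanity check that the brokers are reachable; run from a machine inside the
# same VPC. kafka-python is an assumed dependency (pip install kafka-python).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers="192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092"
)
print(consumer.topics())  # prints the set of topics if the brokers respond
consumer.close()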
3. Create a test table in MaxCompute
CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING,value1 STRING);
INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");
SELECT * FROM odps_to_kafka1;
4. Configure the offline sync script (note: the Kafka plugin currently supports script mode only, not the codeless wizard mode)
{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "envType": 1,
                "column": [
                    "key1",
                    "value1"
                ],
                "table": "odps_to_kafka1" // name of the source table in MaxCompute
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",
            "parameter": {
                "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // use the Kafka brokers' internal (VPC) addresses
                "keyIndex": 0, // record key taken from the first column read from MaxCompute
                "valueIndex": 1, // record value taken from the second column read from MaxCompute
                "valueType": "BYTEARRAY",
                "topic": "from_odps1", // name of the destination topic in Kafka
                "batchSize": 1024,
                "keyType": "BYTEARRAY"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "2"
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}
Note: if saving the script fails with a JSON format error, delete the // comments above (standard JSON does not allow comments).
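Conceptually, the writer turns each MaxCompute row into one Kafka record: keyIndex 0 picks the first read column (key1) as the record key, valueIndex 1 picks the second column (value1) as the record value, and both are serialized as raw byte arrays per keyType/valueType. A rough equivalent of that mapping, sketched with the assumed kafka-python package:

# Rough equivalent of the writer step: column 0 -> record key, column 1 ->
# record value, both as raw bytes (keyType/valueType = BYTEARRAY).
from kafka import KafkaProducer

rows = [("key_key1", "value_value1"), ("key_key2", "value_value2")]  # sample rows

producer = KafkaProducer(
    bootstrap_servers="192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092"
)
for key1, value1 in rows:
    producer.send("from_odps1", key=key1.encode(), value=value1.encode())
producer.flush()
producer.close()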
5. Run the sync task
6. Check the synced data in the Kafka console
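Besides the console's message-query page, the topic can also be read back directly from any machine in the VPC. A minimal verification sketch, again assuming kafka-python:

# Read the topic from the beginning and confirm the five rows arrived.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "from_odps1",
    bootstrap_servers="192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092",
    auto_offset_reset="earliest",  # start from the oldest messages
    consumer_timeout_ms=10000,     # stop iterating after 10s with no messages
)
for record in consumer:
    print(record.key.decode(), "->", record.value.decode())
consumer.close()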
Further reading
Kafka Writer
DataWorks real-time data synchronization (Kafka -> MaxCompute)
Create and use an exclusive resource group for Data Integration