开发者学堂课程【SaaS 模式云数据仓库实战:Kafka 数据如何MaxComput】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/760/detail/13341
Kafka 数据如何同步到 MaxCompute
六、同步过程及其注意事项
1.DataWorks数据集成操作
(1)进入DataWorks操作界面,点击创建业务流程,在新建的业务流程里添加数据同步节点。
(2)数据源选择kafka和modeps,进入数据同步节点,点击数据源为Kafka,点击转化为脚本模式。右上角有一个帮助面板,同步的参数就近可以点击一下,更加方便操作。
2.Kafka Reader的主要参数讲解
(1)server
Kafka的broker server地址(默认接入点),格式为ip:poxt.这个是必填的。
(2)topic
Kafka的topic,是Kafka处理资源的消息源(feeds of messages)的聚合。
(3)column
需要读取的Kafka数据,支持常量列、数据列和属性列。
常量列:使用单引号包裹的列为常量列,例如["abc"、“123”]。
数据列
如果您的数据是一个JSON,支持获取JSON的属性,例如["event_ jd].
如果您的数据是一个JSON, 支持获取JSON的嵌套子属性,例如[tag desc"].
属性列
__key__表示消息的key。
__value___表示消息的完整内容。
__partition__ 表示当前消息所在分区。
__headers__表示当前消息headers信息。
__offset__表示当前消息的偏移量。
__timestamp__表示当前消息的时间戳。
同步的消息一般存放在value中,column是必填的。
(4)keyType
Kafka的key的类型,包括BYTEARRAY. DOUBLE、 FLOAT、INTEGER、LONG和SHORT.它是必填的。根据资源同步数据选择信息,相应同步一个类型。
(5)valueType
Kafka的value的类型,包括BYTEARRAY、 DOUBLE、FLOAT、INTEGER.、LONG和SHORT.它是必填的。
需要选择是按照消息时间同步还是按照消费点位同步,有以下四种类型
(6)beginDateTime
数据消费的开始时间位点,为时间范围(左闭右开)的左边界。ysymmddhhmmss格式的时间字符串,可以和时间属性配合使用。Kafka 0.10.2以上的版本支持此功能。beginDateTime和beginOffset是二选其一的,beginDateTime和enddatetime配合使用。注意Kafka 0.10.2以上的版本才支持消费时间方式。
(7)enddatetime
数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yryymnddhhmmss格式的时间字符串,可以和时间属性配合使用。Kafka 0.10 2以上的版本支持此功能。enddatetime需要和endoffset二选一,enddatetime和begindatetime配合使用。
(8)beginoffset
数据消费的开始时间位点,您可以配置以下形式。
●例如15553274的数字形式,表示开始消费的点位。
●seekToBeginning: 表示从开始点位消费数据。
●seekToLast: 表示从上次的偏移位置读取数据。按照时间消费的话,可以多次进行消费,取决于消费存放时间的多少。
●seekToEnd: 表示从最后点位消费数据,会读取到空数据。
需要和begindatetime二选一。
(9)endOffset
数据消费的结束位点,用来控制什么时候应该结束数据消费任务退出。
需要和enddatetime二选一。
(10)skipExceedRecord
Kafka使用public ConsumerRecords poll (final Duration timeout) 消费数据, 一次poll调用获取的数据可能在endOffset或者endDateTime之外。skipExceedRecord用来控制这些多余的数据是否写出到目的端。由于消费数据使用了自动点位提交,建议:
●Kafka 0.10.2之前版本:建议skipExceedRecord配置为false.
●Kafka 0.10.2及以上版本:建议skipExceedRecord配置为true.
一般为否,默认值为false
(11)partition
Kafka的一个topic有多个分区 (partition) ,正常情况下数据同步任务是读取topic (多个分区)一个 点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。一般为否,无默认值。
(12)kafkaConfig
创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap. servers. auto. commit. interval. ms.session. timeout. ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。为否
2.MaxCompute Writer的主要参数讲解
(1)datasource
数据源名称,脚本模式支持深加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。
(2)table
写入的数据表的表称(大小写不敏感),不支持填写多张表。
(3)partition
需要写入数据表的分区信息,必须指定到最后一级分区。例如把数据写入一个三级分区表,必须配置到最后一级分区,例如pt=20150101, type=1,biz=2。
●对于非分区表,该值务必不要填写,表示直接导入至目标表。
●MaxCompute wiriter不支持数据路由写入,对于分区表请务必保证写入
数据到最后一级分区。
此处在操作时可能出现错误,如果表为分区表,则必填。如果表为非分区表,则不能填写。
(4)column
与kafka中的column相关值一一对应。
需要导入的字段列表。当导入全部字段时,可以配置为"eoluan" :["*"]。 当需要插入部分MaxCompute列,则填写部分列,例如"column"; ["id", "name"]。
●MaxCompute Witer支持列强选。列换序。例如-张表中有a. bIOc三个
字段,您只同步cICb两个字段,则可以配置为"eolun" :["c","b"] 。在导入过程中,字段a自动补空。设置为null.
●column必须显示指定同步的列重台,不允许为空。
(5)truncate
通过配置“truncate": true" 保证写入的幕等性。 即当出现写入失败再次运行时,MaxCompute Witer将清理前述数据,并导入新数据,可以保证每次重跑之后的数据都保持一致.
因为利用MaxCompute SQL进行数据清理工作,SQL无法做到原子性,所以runcate选项不是原子操作。因此当多个任务同时向一个Table/Partion清理分区时,可能出现并发时序问题,请务心注息。
针对这类问题,建议您尽量不要多个作业DDL同时操作同一个分区,或者在多个并发作业启动前,提前创建分区。
3.Kafka2MaxCompute脚本模式编写
4.kafka同步数据到MaxCompute
针对于上面的图主要拆分为三部分
(1)kafka的Reader
运行代码如下
”type" :“ job" ,
"steps" : [{
stepType" :' kafka' ,
'parameter" : {
"server" :“******".
end0ffset":"000000",
"kafkaConfig" : {
"group. id" :
"FinancesGroup"
"valuerpe": "Byterray",
" column": [
"__value__",
"__timestamp__",
"__partition__"
"__offset__"
],
” topic" :"finances_ topic" ,
"keyType" :" ByteArray',
"beginOffset" :" seekToLast'
"name" :“Reader",
” category" :" reader"
}
Server、endoffset、kafkaconfig、groupid、valuvetype类型、column字段等等
(2)MaxCompute的Weriter
代码运行如下
{
'stepType" :”odps" ,
"parameter" :{
"truncate" : false,
"compress" : false,
"datasource" :"odps_ first" ,
"column" : [
"value",
"timestamp1" , .
"partition" ,
"offset"
]
"emptyAsNull" : false,
" table" :“testkafka3"
},
"name" :"Writer",
"category" :”writer"
},
]
Truncate覆盖、compress压缩,同步哪个表以及相关字段
(3)限制参数
主要是errorlimit发布报错,speed流速是多少等等都可以做一些限制
"version": “2.0",
"order": {
"hops": [{
"from":" Reader" ,
"to":'" Writer"
}]
"setting" : {
"errorLimit": {
" record" : "4"
},
"speed": {
"throttle" : false,
"concurrent" : 1
}
}
5.参考Kafka生产者SDK编写代码
代码主要是配置数据的读取,默认接入点以及协议、区别化方式、请求等待时间,发送的topic是哪个topic,里面涉及到发送的消息是什么样的消息,消息这块是需要去编写代码的,发送完以后会回传信息
详细代码参考文档涉及到配置文件,消息来源,生产者消费者的代码模板:
https:/help. aliyun. com/document detai1/99957. htm1?spm=a2c4g.11186623. 6. 566. 45fc54eayX69b0
6.代码打包运行在ECS上(与kafka同一个可用区)
(1)执行crontab - e执行定时任务发送消息
017***java -jar home/export/upload/javaCode3.jar > >
/home/export/uploadlogfile. log
(2)查看发送消息的定时任务日志
7.在MaxCompute上创建表
(1)创建目标表界面
(2)DDL语句
CREATE TABLE ^testkafka3^ (
^Value^ string,
^times tampl^ string,
^partition^ string,
^offset^string
);
可以根据不同的业务去创建不同的表和字段
七、开发测试以及生产部署
1.选择自定义资源组(或独享集成资源组)进行同步操作
右上角有一个配置,资源组,选择使用自定义资源组还是独享集成资源组,点击执行,执行完之后会有标识有多少条记录,还有是否成功,同步过程就结束了。
(1)选择可使用的独享资源组与自定义资源组进行同步。
(2)同步任务成功会显示,同步数据记录以及结果标志。
2.查询同步的数据结果
在DataWorks的临时界面查看同步数据结果。数据同步过来说明测试成功了。
3.设置调度参数
(1)点击右侧调度配置,输入调入时间。
(2)参考DataWork官方文档完善业务处理流程。
开发业务流程跟开发数据同步过来,根据相关模型进行业务处理,最后会设计一些SQL节点,会做一些部署。
4.提交业务流程节点,并打包发布
(1)点击业务流程,提交业务流程节点。
(2)进入任务发布界面,将节点添加到待发布进行任务的部署。
5.确认业务流程发布成功
在运维中心页面,确认发布是否在生产环境中存在,至此Kafka同步数据MaxCompute 过程结束。