Kafka 数据如何同步到 MaxCompute | 学习笔记(二)

简介: 快速学习 Kafka 数据如何同步到 MaxCompute

开发者学堂课程【SaaS 模式云数据仓库实战Kafka 数据如何MaxComput】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/760/detail/13341


Kafka 数据如何同步到 MaxCompute


六、同步过程及其注意事项

1.DataWorks数据集成操作

(1)进入DataWorks操作界面,点击创建业务流程,在新建的业务流程里添加数据同步节点。

(2)数据源选择kafka和modeps,进入数据同步节点,点击数据源为Kafka,点击转化为脚本模式。右上角有一个帮助面板,同步的参数就近可以点击一下,更加方便操作。

2.Kafka Reader的主要参数讲解

(1)server

Kafka的broker server地址(默认接入点),格式为ip:poxt.这个是必填的。

(2)topic

Kafka的topic,是Kafka处理资源的消息源(feeds of messages)的聚合。

(3)column

需要读取的Kafka数据,支持常量列、数据列和属性列。

常量列:使用单引号包裹的列为常量列,例如["abc"、“123”]。

数据列

如果您的数据是一个JSON,支持获取JSON的属性,例如["event_ jd].

如果您的数据是一个JSON, 支持获取JSON的嵌套子属性,例如[tag desc"].

属性列

__key__表示消息的key。

__value___表示消息的完整内容。

__partition__ 表示当前消息所在分区。

__headers__表示当前消息headers信息。

__offset__表示当前消息的偏移量。

__timestamp__表示当前消息的时间戳。

同步的消息一般存放在value中,column是必填的。

(4)keyType

Kafka的key的类型,包括BYTEARRAY. DOUBLE、 FLOAT、INTEGER、LONG和SHORT.它是必填的。根据资源同步数据选择信息,相应同步一个类型。

(5)valueType

Kafka的value的类型,包括BYTEARRAY、 DOUBLE、FLOAT、INTEGER.、LONG和SHORT.它是必填的。

需要选择是按照消息时间同步还是按照消费点位同步,有以下四种类型

(6)beginDateTime

数据消费的开始时间位点,为时间范围(左闭右开)的左边界。ysymmddhhmmss格式的时间字符串,可以和时间属性配合使用。Kafka 0.10.2以上的版本支持此功能。beginDateTime和beginOffset是二选其一的,beginDateTime和enddatetime配合使用。注意Kafka 0.10.2以上的版本才支持消费时间方式。

(7)enddatetime

数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yryymnddhhmmss格式的时间字符串,可以和时间属性配合使用。Kafka 0.10 2以上的版本支持此功能。enddatetime需要和endoffset二选一,enddatetime和begindatetime配合使用。

(8)beginoffset

数据消费的开始时间位点,您可以配置以下形式。

●例如15553274的数字形式,表示开始消费的点位。

●seekToBeginning: 表示从开始点位消费数据。

●seekToLast: 表示从上次的偏移位置读取数据。按照时间消费的话,可以多次进行消费,取决于消费存放时间的多少。

●seekToEnd: 表示从最后点位消费数据,会读取到空数据。

需要和begindatetime二选一。

(9)endOffset

数据消费的结束位点,用来控制什么时候应该结束数据消费任务退出。

需要和enddatetime二选一。

(10)skipExceedRecord

Kafka使用public ConsumerRecords poll (final Duration timeout) 消费数据, 一次poll调用获取的数据可能在endOffset或者endDateTime之外。skipExceedRecord用来控制这些多余的数据是否写出到目的端。由于消费数据使用了自动点位提交,建议:

●Kafka 0.10.2之前版本:建议skipExceedRecord配置为false.

●Kafka 0.10.2及以上版本:建议skipExceedRecord配置为true.

一般为否,默认值为false

(11)partition

Kafka的一个topic有多个分区 (partition) ,正常情况下数据同步任务是读取topic (多个分区)一个 点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。一般为否,无默认值。

(12)kafkaConfig

创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap. servers. auto. commit. interval. ms.session. timeout. ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。为否

2.MaxCompute Writer的主要参数讲解

(1)datasource

数据源名称,脚本模式支持深加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

(2)table

写入的数据表的表称(大小写不敏感),不支持填写多张表。

(3)partition

需要写入数据表的分区信息,必须指定到最后一级分区。例如把数据写入一个三级分区表,必须配置到最后一级分区,例如pt=20150101, type=1,biz=2。

●对于非分区表,该值务必不要填写,表示直接导入至目标表。

●MaxCompute wiriter不支持数据路由写入,对于分区表请务必保证写入

数据到最后一级分区。

此处在操作时可能出现错误,如果表为分区表,则必填。如果表为非分区表,则不能填写。

(4)column

与kafka中的column相关值一一对应。

需要导入的字段列表。当导入全部字段时,可以配置为"eoluan" :["*"]。 当需要插入部分MaxCompute列,则填写部分列,例如"column"; ["id", "name"]。

●MaxCompute Witer支持列强选。列换序。例如-张表中有a. bIOc三个

字段,您只同步cICb两个字段,则可以配置为"eolun" :["c","b"] 。在导入过程中,字段a自动补空。设置为null.

●column必须显示指定同步的列重台,不允许为空。

(5)truncate

通过配置“truncate": true" 保证写入的幕等性。 即当出现写入失败再次运行时,MaxCompute Witer将清理前述数据,并导入新数据,可以保证每次重跑之后的数据都保持一致.

因为利用MaxCompute SQL进行数据清理工作,SQL无法做到原子性,所以runcate选项不是原子操作。因此当多个任务同时向一个Table/Partion清理分区时,可能出现并发时序问题,请务心注息。

针对这类问题,建议您尽量不要多个作业DDL同时操作同一个分区,或者在多个并发作业启动前,提前创建分区。

3.Kafka2MaxCompute脚本模式编写

4.kafka同步数据到MaxCompute

针对于上面的图主要拆分为三部分

(1)kafka的Reader

运行代码如下

”type" :“ job" ,

"steps" : [{

stepType" :' kafka' ,

'parameter" : {

"server" :“******".

end0ffset":"000000",

"kafkaConfig" : {

"group. id" :

"FinancesGroup"

"valuerpe": "Byterray",

" column": [

"__value__",

"__timestamp__",

"__partition__"

"__offset__"

],

” topic" :"finances_ topic" ,

"keyType" :" ByteArray',

"beginOffset" :" seekToLast'

"name" :“Reader",

” category" :" reader"

}

Server、endoffset、kafkaconfig、groupid、valuvetype类型、column字段等等

(2)MaxCompute的Weriter

代码运行如下

{

'stepType" :”odps" ,

"parameter" :{

"truncate" : false,

"compress" : false,  

"datasource" :"odps_ first" ,

"column" : [

"value",

"timestamp1" , .

"partition" ,

"offset"

]

"emptyAsNull" : false,

" table" :“testkafka3"

},

"name" :"Writer",

"category" :”writer"

},

]

Truncate覆盖、compress压缩,同步哪个表以及相关字段

(3)限制参数

主要是errorlimit发布报错,speed流速是多少等等都可以做一些限制

"version": “2.0",

"order": {

"hops": [{

"from":" Reader" ,

"to":'" Writer"

}]

"setting" : {

"errorLimit": {

" record" : "4"

},

"speed": {

"throttle" : false,

"concurrent" : 1

}

}

5.参考Kafka生产者SDK编写代码

代码主要是配置数据的读取,默认接入点以及协议、区别化方式、请求等待时间,发送的topic是哪个topic,里面涉及到发送的消息是什么样的消息,消息这块是需要去编写代码的,发送完以后会回传信息

详细代码参考文档涉及到配置文件,消息来源,生产者消费者的代码模板:

https:/help. aliyun. com/document detai1/99957. htm1?spm=a2c4g.11186623. 6. 566. 45fc54eayX69b0

6.代码打包运行在ECS上(与kafka同一个可用区)

(1)执行crontab - e执行定时任务发送消息

017***java -jar home/export/upload/javaCode3.jar > >

/home/export/uploadlogfile. log

(2)查看发送消息的定时任务日志

7.在MaxCompute上创建表

(1)创建目标表界面

(2)DDL语句

CREATE TABLE ^testkafka3^ (

^Value^ string,

^times tampl^ string,

^partition^ string,

^offset^string

);

可以根据不同的业务去创建不同的表和字段


七、开发测试以及生产部署

1.选择自定义资源组(或独享集成资源组)进行同步操作

右上角有一个配置,资源组,选择使用自定义资源组还是独享集成资源组,点击执行,执行完之后会有标识有多少条记录,还有是否成功,同步过程就结束了。

(1)选择可使用的独享资源组与自定义资源组进行同步。

(2)同步任务成功会显示,同步数据记录以及结果标志。

2.查询同步的数据结果

在DataWorks的临时界面查看同步数据结果。数据同步过来说明测试成功了。

3.设置调度参数

(1)点击右侧调度配置,输入调入时间。

(2)参考DataWork官方文档完善业务处理流程。

开发业务流程跟开发数据同步过来,根据相关模型进行业务处理,最后会设计一些SQL节点,会做一些部署。

4.提交业务流程节点,并打包发布

(1)点击业务流程,提交业务流程节点。

(2)进入任务发布界面,将节点添加到待发布进行任务的部署。

5.确认业务流程发布成功

在运维中心页面,确认发布是否在生产环境中存在,至此Kafka同步数据MaxCompute 过程结束。

相关文章
|
13天前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用合集之大数据计算MaxCompute如何实现通过离线同步脚本模式
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
13天前
|
分布式计算 DataWorks Oracle
DataWorks操作报错合集之DataWorks ODPS数据同步后,timesramp遇到时区问题,解决方法是什么
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
26 0
|
13天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
24 0
|
13天前
|
SQL JSON 分布式计算
DataWorks产品使用合集之DataWorks一键maxcompute数据同步的操作步骤是什么
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
23 0
|
13天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在 DataWorks 中,使用Oracle作为数据源进行数据映射和查询,如何更改数据源为MaxCompute或其他类型
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
27 1
|
13天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
28 1
|
13天前
|
分布式计算 DataWorks MaxCompute
DataWorks产品使用合集之在DataWorks中,将数据集成功能将AnalyticDB for MySQL中的数据实时同步到MaxCompute中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
24 0
|
13天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在DataWorks中,MaxCompute创建外部表,MaxCompute和DataWorks的数据一直保持一致如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
20 0
DataWorks产品使用合集之在DataWorks中,MaxCompute创建外部表,MaxCompute和DataWorks的数据一直保持一致如何解决
|
13天前
|
分布式计算 DataWorks 安全
DataWorks产品使用合集之在DataWorks中,从Elasticsearch同步数据到ODPS时同步_id字段的如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
25 0
|
13天前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之dataworks 同步es数据到maxcompute 遇到报错:获取表列信息失败如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
23 0

热门文章

最新文章