Kafka 数据如何同步到 MaxCompute | 学习笔记(二)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 快速学习 Kafka 数据如何同步到 MaxCompute

开发者学堂课程【SaaS 模式云数据仓库实战Kafka 数据如何MaxComput】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/760/detail/13341


Kafka 数据如何同步到 MaxCompute


六、同步过程及其注意事项

1.DataWorks数据集成操作

(1)进入DataWorks操作界面,点击创建业务流程,在新建的业务流程里添加数据同步节点。

(2)数据源选择kafka和modeps,进入数据同步节点,点击数据源为Kafka,点击转化为脚本模式。右上角有一个帮助面板,同步的参数就近可以点击一下,更加方便操作。

2.Kafka Reader的主要参数讲解

(1)server

Kafka的broker server地址(默认接入点),格式为ip:poxt.这个是必填的。

(2)topic

Kafka的topic,是Kafka处理资源的消息源(feeds of messages)的聚合。

(3)column

需要读取的Kafka数据,支持常量列、数据列和属性列。

常量列:使用单引号包裹的列为常量列,例如["abc"、“123”]。

数据列

如果您的数据是一个JSON,支持获取JSON的属性,例如["event_ jd].

如果您的数据是一个JSON, 支持获取JSON的嵌套子属性,例如[tag desc"].

属性列

__key__表示消息的key。

__value___表示消息的完整内容。

__partition__ 表示当前消息所在分区。

__headers__表示当前消息headers信息。

__offset__表示当前消息的偏移量。

__timestamp__表示当前消息的时间戳。

同步的消息一般存放在value中,column是必填的。

(4)keyType

Kafka的key的类型,包括BYTEARRAY. DOUBLE、 FLOAT、INTEGER、LONG和SHORT.它是必填的。根据资源同步数据选择信息,相应同步一个类型。

(5)valueType

Kafka的value的类型,包括BYTEARRAY、 DOUBLE、FLOAT、INTEGER.、LONG和SHORT.它是必填的。

需要选择是按照消息时间同步还是按照消费点位同步,有以下四种类型

(6)beginDateTime

数据消费的开始时间位点,为时间范围(左闭右开)的左边界。ysymmddhhmmss格式的时间字符串,可以和时间属性配合使用。Kafka 0.10.2以上的版本支持此功能。beginDateTime和beginOffset是二选其一的,beginDateTime和enddatetime配合使用。注意Kafka 0.10.2以上的版本才支持消费时间方式。

(7)enddatetime

数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yryymnddhhmmss格式的时间字符串,可以和时间属性配合使用。Kafka 0.10 2以上的版本支持此功能。enddatetime需要和endoffset二选一,enddatetime和begindatetime配合使用。

(8)beginoffset

数据消费的开始时间位点,您可以配置以下形式。

●例如15553274的数字形式,表示开始消费的点位。

●seekToBeginning: 表示从开始点位消费数据。

●seekToLast: 表示从上次的偏移位置读取数据。按照时间消费的话,可以多次进行消费,取决于消费存放时间的多少。

●seekToEnd: 表示从最后点位消费数据,会读取到空数据。

需要和begindatetime二选一。

(9)endOffset

数据消费的结束位点,用来控制什么时候应该结束数据消费任务退出。

需要和enddatetime二选一。

(10)skipExceedRecord

Kafka使用public ConsumerRecords poll (final Duration timeout) 消费数据, 一次poll调用获取的数据可能在endOffset或者endDateTime之外。skipExceedRecord用来控制这些多余的数据是否写出到目的端。由于消费数据使用了自动点位提交,建议:

●Kafka 0.10.2之前版本:建议skipExceedRecord配置为false.

●Kafka 0.10.2及以上版本:建议skipExceedRecord配置为true.

一般为否,默认值为false

(11)partition

Kafka的一个topic有多个分区 (partition) ,正常情况下数据同步任务是读取topic (多个分区)一个 点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。一般为否,无默认值。

(12)kafkaConfig

创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap. servers. auto. commit. interval. ms.session. timeout. ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。为否

2.MaxCompute Writer的主要参数讲解

(1)datasource

数据源名称,脚本模式支持深加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

(2)table

写入的数据表的表称(大小写不敏感),不支持填写多张表。

(3)partition

需要写入数据表的分区信息,必须指定到最后一级分区。例如把数据写入一个三级分区表,必须配置到最后一级分区,例如pt=20150101, type=1,biz=2。

●对于非分区表,该值务必不要填写,表示直接导入至目标表。

●MaxCompute wiriter不支持数据路由写入,对于分区表请务必保证写入

数据到最后一级分区。

此处在操作时可能出现错误,如果表为分区表,则必填。如果表为非分区表,则不能填写。

(4)column

与kafka中的column相关值一一对应。

需要导入的字段列表。当导入全部字段时,可以配置为"eoluan" :["*"]。 当需要插入部分MaxCompute列,则填写部分列,例如"column"; ["id", "name"]。

●MaxCompute Witer支持列强选。列换序。例如-张表中有a. bIOc三个

字段,您只同步cICb两个字段,则可以配置为"eolun" :["c","b"] 。在导入过程中,字段a自动补空。设置为null.

●column必须显示指定同步的列重台,不允许为空。

(5)truncate

通过配置“truncate": true" 保证写入的幕等性。 即当出现写入失败再次运行时,MaxCompute Witer将清理前述数据,并导入新数据,可以保证每次重跑之后的数据都保持一致.

因为利用MaxCompute SQL进行数据清理工作,SQL无法做到原子性,所以runcate选项不是原子操作。因此当多个任务同时向一个Table/Partion清理分区时,可能出现并发时序问题,请务心注息。

针对这类问题,建议您尽量不要多个作业DDL同时操作同一个分区,或者在多个并发作业启动前,提前创建分区。

3.Kafka2MaxCompute脚本模式编写

4.kafka同步数据到MaxCompute

针对于上面的图主要拆分为三部分

(1)kafka的Reader

运行代码如下

”type" :“ job" ,

"steps" : [{

stepType" :' kafka' ,

'parameter" : {

"server" :“******".

end0ffset":"000000",

"kafkaConfig" : {

"group. id" :

"FinancesGroup"

"valuerpe": "Byterray",

" column": [

"__value__",

"__timestamp__",

"__partition__"

"__offset__"

],

” topic" :"finances_ topic" ,

"keyType" :" ByteArray',

"beginOffset" :" seekToLast'

"name" :“Reader",

” category" :" reader"

}

Server、endoffset、kafkaconfig、groupid、valuvetype类型、column字段等等

(2)MaxCompute的Weriter

代码运行如下

{

'stepType" :”odps" ,

"parameter" :{

"truncate" : false,

"compress" : false,  

"datasource" :"odps_ first" ,

"column" : [

"value",

"timestamp1" , .

"partition" ,

"offset"

]

"emptyAsNull" : false,

" table" :“testkafka3"

},

"name" :"Writer",

"category" :”writer"

},

]

Truncate覆盖、compress压缩,同步哪个表以及相关字段

(3)限制参数

主要是errorlimit发布报错,speed流速是多少等等都可以做一些限制

"version": “2.0",

"order": {

"hops": [{

"from":" Reader" ,

"to":'" Writer"

}]

"setting" : {

"errorLimit": {

" record" : "4"

},

"speed": {

"throttle" : false,

"concurrent" : 1

}

}

5.参考Kafka生产者SDK编写代码

代码主要是配置数据的读取,默认接入点以及协议、区别化方式、请求等待时间,发送的topic是哪个topic,里面涉及到发送的消息是什么样的消息,消息这块是需要去编写代码的,发送完以后会回传信息

详细代码参考文档涉及到配置文件,消息来源,生产者消费者的代码模板:

https:/help. aliyun. com/document detai1/99957. htm1?spm=a2c4g.11186623. 6. 566. 45fc54eayX69b0

6.代码打包运行在ECS上(与kafka同一个可用区)

(1)执行crontab - e执行定时任务发送消息

017***java -jar home/export/upload/javaCode3.jar > >

/home/export/uploadlogfile. log

(2)查看发送消息的定时任务日志

7.在MaxCompute上创建表

(1)创建目标表界面

(2)DDL语句

CREATE TABLE ^testkafka3^ (

^Value^ string,

^times tampl^ string,

^partition^ string,

^offset^string

);

可以根据不同的业务去创建不同的表和字段


七、开发测试以及生产部署

1.选择自定义资源组(或独享集成资源组)进行同步操作

右上角有一个配置,资源组,选择使用自定义资源组还是独享集成资源组,点击执行,执行完之后会有标识有多少条记录,还有是否成功,同步过程就结束了。

(1)选择可使用的独享资源组与自定义资源组进行同步。

(2)同步任务成功会显示,同步数据记录以及结果标志。

2.查询同步的数据结果

在DataWorks的临时界面查看同步数据结果。数据同步过来说明测试成功了。

3.设置调度参数

(1)点击右侧调度配置,输入调入时间。

(2)参考DataWork官方文档完善业务处理流程。

开发业务流程跟开发数据同步过来,根据相关模型进行业务处理,最后会设计一些SQL节点,会做一些部署。

4.提交业务流程节点,并打包发布

(1)点击业务流程,提交业务流程节点。

(2)进入任务发布界面,将节点添加到待发布进行任务的部署。

5.确认业务流程发布成功

在运维中心页面,确认发布是否在生产环境中存在,至此Kafka同步数据MaxCompute 过程结束。

相关文章
|
12天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
100 7
|
12天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
27 2
|
24天前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
68 1
|
9天前
|
存储 大数据 数据管理
大数据分区简化数据维护
大数据分区简化数据维护
16 4
|
19天前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
41 3
|
19天前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
52 2
|
22天前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
66 2
|
23天前
|
数据采集 分布式计算 大数据
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第27天】在数字化时代,数据治理对于确保数据资产的保值增值至关重要。本文探讨了大数据平台的搭建和数据质量管理的重要性及实践方法。大数据平台应包括数据存储、处理、分析和展示等功能,常用工具如Hadoop、Apache Spark和Flink。数据质量管理则涉及数据的准确性、一致性和完整性,通过建立数据质量评估和监控体系,确保数据分析结果的可靠性。企业应设立数据治理委员会,投资相关工具和技术,提升数据治理的效率和效果。
55 2
|
26天前
|
存储 安全 大数据
大数据隐私保护:用户数据的安全之道
【10月更文挑战第31天】在大数据时代,数据的价值日益凸显,但用户隐私保护问题也愈发严峻。本文探讨了大数据隐私保护的重要性、面临的挑战及有效解决方案,旨在为企业和社会提供用户数据安全的指导。通过加强透明度、采用加密技术、实施数据最小化原则、加强访问控制、采用隐私保护技术和提升用户意识,共同推动大数据隐私保护的发展。
|
30天前
|
SQL 存储 大数据
大数据中数据提取
【10月更文挑战第19天】
54 2
下一篇
无影云桌面