Kafka 数据如何同步到 MaxCompute | 学习笔记

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
DataWorks Serverless资源组免费试用套餐,300CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 快速学习 Kafka 数据如何同步到 MaxCompute,介绍了 Kafka 数据如何同步到 MaxCompute系统机制, 以及在实际应用过程中如何使用。

开发者学堂课程【SaaS 模式云数据仓库实战Kafka 数据如何同步到 MaxCompute】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/332/detail/3718


Kafka 数据如何同步到 MaxCompute


内容介绍:

一、Kafka 消息队列使用以及原理

二、资源组介绍以及配置

三、同步过程及其注意事项

四、开发测试以及生产部署


实验目的:

日常工作中,企业需要将 APP 或网站产生的行为日志和业务数据,通过 Kafka 消息队列统一收集后,投递到数据仓库 MaxCompute 中,再通过大数据分析后将指标数据在报表中展示,如用户特征、销售排名、订单地区分布等。

通过本次实验,我们可以学习了解 Kafka 数据如何通过 Dataworks 数据集成同步到 MaxCompute。

image.png

接下来我们所要展示的是 Kafka 通过 Dataworks 上传到 MaxCompute,其中是通过两种方案进行上传的。

方案一,使用自定义资源组的背景一般为网络环境复杂适用于数据上云的场景,该实验将展示使用 ECS 作为自定义资源组的操作过程。

方案二,使用独享集成资源组背景一般为集成资源不足影响数据同步步过程,该实验将展示使用独享资源组的操作方式(重点关注 VPC 的绑定)


一、Kafka 消息队列使用以及原理

1、Kafka 产品概述

消息队列 for Apache Kafka 是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。

消息队列 for Apache Kafka 广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。

消息队列 for Apache Kafka 针对开源的 Apache Kafka 提供全托管服务,彻底解决开源产品长期以来的痛点。

有了消息队列 for Apache Kafka,您只需专注于业务开发,无需部署运维,具有低成本、更弹性、更可靠的优势。

2、Kafka 架构介绍

image.png

一个典型的消息队列 for Apache Kafka 集群包括四个部分

Producer通过 push 模式向消息队列 for Apache Kafka 的 Kafka Broker 发送消息。发送的消息可以是网站的页面访问、服务器日志,也可以是 CPU 和内存相关的系统资源信息。

Kafka Broker用于存储消息的服务器。Kafka Broker 支持水平扩展。Kafka Broker 节点的数量越多,Kafka 集群的吞吐率越高。

Consumer Group通过 pull 模式从消息队列 for Apache Kafka Broker 订阅并消费消息。

Zookeeper管理集群的配置、选举 leader 分区,并且在 Consumer Group 发生变化时,进行负载均衡。

3、Kafka 消息队列购买以及部署

(1) Kafka 消息队列产品页面点击购买,选择对应消费方式,地区,实例类型,磁盘,流量以及消息存放时间。

(2)开通完成之后点击部署,选择合适的 VPC 以及交换机(注意可用区的位置)

(3)进入 Topic 管理页面,点击创建 Topic 按钮,创建个人的 Topic。 

(4)进入 Consumer Group 管理,点击创建 Consumer Group创建自己所需的 Consumer Group。

4、Kafka 白名单配置

确认需要访问 Kafka 的网段信息。


二、资源组介绍以及配置

1、自定义资源组的使用背景

DataWorks 可以通过免费传输能力(默认任务资源组)进行海量数据上云,默认资源组无法实现传输速度存在较高要求或复杂环境中的数据源同步上云的需求。

可以新增自定义的任务资源运行数据同步任务,解决 DataWorks 默认资源组与您的数据源不通的问题,或实现更高速度的传输能力。

当默认任务资源无法与您的复杂的网络环境连通时,可以通过数据集成自定义资源的部署,打通任意网络环境之间的数据传输同步。

2、自定义资源组的配置

(1)进入 Dataworks 控制台,点击需要数据同步的项目空间,点击数据集成。

(2)进入数据源界面,点击新增自定义资源组。

(3)确认 Kafka 与需要添加自定义资源组属于同一个 VPC 下。

(4)登录 ECS,执行命令 dmidecode|grep UUID 得到 ECS的 UUID

(5) ECS  UUID 以及 IP所占用资源的 CPU 与内存填写进来。

image.png

(6) ECS 上执行安装 Agent 的命令,添加完成测试连通性。

3、独享资源组的使用背景

独享资源模式下,机器的物理资源(网络、磁盘、CPU 和内存等)完全独享。

不仅可以隔离用户间的资源使用,也可以隔离不同工作空间任务的资源使用。此外,独享资源也支持灵活的扩容、缩容功能,可以满足资源独享、灵活配置等需求。

独享资源组可以访问同一地域的 VPC 数据源,也可以访问跨地域的公网 RDS 地址。

4、独享资源组的配置

(1)进入 DataWorks 控制台的资源组列表,点击新增独享集成资源组,点击购买选择对应的地区,CPU 以及内存。

(2)点击专有网路绑定,选择与 Kafka 对应 VPC 以及交换机(明显的区别是可用区),安全组。


三、同步过程及其注意事项

1、DataWorks 数据集成操作

(1)进入 DataWorks 操作界面,点击创建业务流程,在新建的业务流程里添加数据同步节点。

(2)进入数据同步节点,点击数据源为 Kafka,点击转化为脚本模式。

2、Kafka Reader 的主要参数讲解

serverKafka  broker server 地址,格式为 ip:poxt,是必填的。 

topicKafka  topic,是 Kafka 处理资源的消息源(feeds of messages)的,是必填的。

column需要取的 Kafka 数据,支持常量列数据列和属性列。

常量列使用单引号包裹的列为常量列,例如["abc""123"]。

数据列

如果您的数据是一个 JSON,支持获取 JSON 的属性,例如["event_id"].

如果您的数据是一个 JSON,支持获取 JSON 的嵌套子属性,例如["tag.desc"]。

属性列

_key_表示息的 key

_value_表示消息的完整内容。

_partition_表示当前消息所在分区。

_headers_表示当前消息 headers 信息

_offset_表示当前消息的偏移量 

_timestamp_表示当前消息的时间戮。

完整示例如下

"column"[

"_key_",

"_value_"

"_partition_"

"_offset_"

"_timestamp_"

"123",

"event_id"

"tag. desc"

]

keyType Kafka  key 的类型,包括 BYTEARRAY、DOUBLE、FLOATINTEGER、LONG  SHORT,是必填的。

valueTypeKafka  value 的类型,包括 BYTEARRAY、DOUBLEFLOAT、INTEGERLONG  SHORT必填的。 

beginDateTime数据消费的开始时间位点,为时间范围(左闭右开)的左边界。yyyymmddhhmmss 格式的时间字符串,可以和时间属性配合使用。Kafka 0.10.2以上的版本支持此功能。需要和 beginOffset 二选一。(说明beginDateTime 和 endDateTime 使用。)

endDateTime数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yyyymmddhhmmss 格式的时间字符串,可以和时间属性配合使用。Kafka 0.10.2以上的版本支持此功能。需要和 endOffset 二选一。(说明endDateTime 和 beginDateTime 配合使用。) 

beginOffset数据消费的开始时间位点,您可以配置以下形式。

例如15553274的数字形式,表示开始消费的点位。

seekToBeginning表示从开始点位消费数据 

seekToLast表示从上次的位置读取数据。

seekToEnd表示从最后点位消费数据,会读取到空数据。

需要和 beginDateTime 二选一。

endOffset数据消费的结束位点,用来控制什么时候应该结束数据消费任务退出需要和 endDateTime 二选一。

skipExceedRecord

Kafka 使用 public ConsumerRecords<K,V>poll (final Duration timeout)消费数据一次 poll 调用获取的数据可能在 endOffset 或者 endDateTime 之外。skipExceedRecord 用来控制这些多余的数据是否写出到目的端。由于消费数据使用了自动点位提交

建议

Kafka0.10.2之前版本:建议 skipExceedRecord 配置为 false

Kafka0.10.2及以上版本:建议 sKipExceedRecord 配置为 true

不必填写,默认值为 false

partitionKafka 的一个 topic 有多个分区(parttion)正常情况下数据同步任务是读取 topic(多个分区)一个点位区间的数据。您也可以指定 partition仅读取一个分区点位区间的数据。不必填写,无默认值。

kafkaConfig创建 Kafka 数据消费套户端 KafkaConsumer 可以指定扩展参数bootstrap-serversauto.commit.interval.msSession.timeout.ms等,您可以基于 kafkaConfig控KafkaConsumer 消费数据的行为,不必填写。

3、MaxCompute Writer 的主要参数讲解

datasource数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加数据源名称保持一致必填的。

table写入的数据表的表名称(大小写不敏感),不支持填写多张表必填的。

partition需要写入数据表的分区信息,必须指定到最后一级分区。例如把数据写入一个三级分区表,必须配置到最后一级分区,例如 pt=20150101,type=1biz=2

·对于非分区表,该值务必不要填写,表示直接导入至目标表。  

·MaxCompute Writer 不支持数据路由写入,对于分区表请务必保证写入数据到最后一级分区。

如果表为分区表,则必填。如果表为非分区表则不能填写。

column需要导入的字段列表当导入全部字段时,可以配置为“column”:[“*”]。当需要入部分 MaxCompute 列,则填写部分列,例如“column:["id","name"]

·MaxCompute Writer 支持列筛选、列换序。例如一张表中有 a、b  c 三个字段,您只同步 c  b 两个字段,则可以配置为“column”:[“c”,“b”],在导入过程中,字段 a 自动补空,设置为 null

·column 必须显示指定同步的列集台,不允许为空。

是必填的。

truncate通过配置“truncate”:“true”保证写入的等性即当出现写入失败再次运行时,MaxCompute Writer 将清理前述数据,并导入新数据,可以保证每次重跑之后的数据都保持一致。

因为利用 MaxCompute SQL 进行数据清理工作,SQL 无法做到原子性,所以 truncate 选项不是原子操作因此当多个任务同时向一个 Table/Partition 清理分区时,可能出现并发时序问题,请务必注意。

针对这类问题,建议您尽量不要多个作业 DDL 同时操作同一个分区,或者在多个井发作业启动前,提前创建分区。

是必填的。

4、Kafka2MaxCompute 脚本模式编写

{

"type""job"

"steps"[{

"stepType""kafka"

"parameter"{

"server"“******"

"endOffset""1000000"

"kafkaConfig"{

"group.id"

"FinancesGroup"

},

"valueType""ByteArray"

"column”[

"__value__"

"__timestamp__"

"__partition__"

"__offset__"

],

"topic" "finances_topic"

"keyType""ByteArray"

"beginOffset""seekToLast"

},

"name":"Reader"

"category""reader"

},

{

"stepType" "odps"

"parameter"{

"truncate"false

"compress"false

"datasource""odps_first"

"column"[

"value"

"timestamp1"

"partition"

"offset"

],

"emptyAsNull"false

"table""testkafka3"

},

"name""Writer"

"category""writer"

}

],

"version""2.0"

"order"{

"hops"[{

"from""Reader"

"to""Writer"

}]

},

"setting”{

"errorLimit"{

"record""4"

},

"speed”{

"throttle"false

"concurrent"1

}

}

}

5、Kafka 同步数据到 MaxCompute

(1)Kafka  Reader

{

"type""job"

"steps"[{

"stepType""kafka"

"parameter"{

"server"“*******"

"endOffset""1000000"

"kafkaConfig"{

"group.id"

"FinancesGroup"

},

"valueType""ByteArray"

"column”[

"__value__"

"__timestamp__"

"__partition__"

"__offset__"

],

"topic" "finances_topic"

"keyType""ByteArray"

"beginOffset""seekToLast"

},

"name":"Reader"

"category""reader"

}, 

(2) MaxCompute 的 Writer

{

"stepType" "odps"

"parameter"{

"truncate"false

"compress"false

"datasource""odps_first"

"column"[

"value"

"timestamp1"

"partition"

"offset"

],

"emptyAsNull"false

"table""testkafka3"

},

"name""Writer"

"category""writer"

}

],

(3)限制参数

"version""2.0"

"order"{

"hops"[{

"from""Reader"

"to""Writer"

}]

},

"setting”{

"errorLimit"{

"record""4"

},

"speed”{

"throttle"false

"concurrent"1

}

}

}

6、参考 Kafka 生产者 SDK 编写代码

示例代码如下:

//加kafka.properties

Properties kafkaProperties = JavaKafkaConfigurer.getKafka Properties(): Properties props = new Properties():

//设接入点,即控制台的实例详情页显示的默认接入点

props.put(PreducerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaProperties.getProperty("bootatrap. servers”)):

//接入协议

props.put(CommonClientConfig =.SECURITY_PROTOCOL_CONFIG,“PLAINTEXT”):

// Kafka 消息的序列化方式

props.put(ProducerConfig.KHY_SERIALIZER_CLASS_CONFIG, "org, spache, kafka. common, serialination. StringSerializen”

props.put(ProducerConfig.VALUE_SERIAL.IZHR_CLASS_COMFIG,"ore.apache.kafka.common. gerislization.StringSerializen

//请求的最长等待时间

props,put (ProducerConfig.MAX_DLOCK_MS_CONFIG,30  *  1000):

//构造 Producer 对象,注意,该对象是线程安全的,一般来说,一个进程内一个 Producer对象即可

//如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个

KafkaProducer<String, String>producer = nev KafkaProducer <String,String>(props):

//构造一个 Kafka 消息

String topic = kafkaProperties.getProperty(“topic”)://消息所属的 Topic,请在控制台申请之后填写在这里

String value  = "this is the message's value”://消息的内容

ProducerRecord<String, String> kafkaMessage = new ProducerR ecord<String, String>(topic, value);

Try {

//发送息,并获得一个 Future 对象

Future<RecordMetadata> metadataFuture = producer.send(kafka Message):

//同步获得 Future 对象的结果

RecordMetadata recordMetadata = netadataFuture.get():

Syztem.out.println(“Produce ok:" + recordMetadata. toString ()):

]  catch(Exceptien e)  [

//要考虑重试

Syztem.out.println("error accurred”):

e.printStackTrace():

]

详细代码参考文档涉及到配置文件,消息来源,生产者消费者的代码模板

https://help.aliyun.com/document detail/99957.html?spm =a2c4g.11186623.6.566.45fc54eayX69 b0

7、代码打包运行在在 ECS 上(与 Kafka 同一个可用区)

(1)执行 crontab-e 执行定时任务发送消息

017***java -jar/home/export/upload/javaCode3.jar >>/home/export/upload/logfile.log

(2)查看发送消息的定时任务日志

image.png

8、在 MaxCompute 上创建表

(1)创建目标表界面

(2)DDL 语句

CREATE TABLE `testkafka3`(`value` string,`timestamp1` string,` partition` string, `offset string)

 

四、开发测试以及生产部署

1、选择自定义资源组(或独享集成资源组)进行同步操作

(1)选择可使用的独享资源组与自定义资源组进行同步。

(2)同步任务成功会显示,同步数据记录以及结果标志。

2、查询同步的数据结果

在 DataWorks 的临时界面查看同步数据结果。

3、设置调度参数

(1)点击右侧调度配置,输入调入时间。

(2)参考 DataWork 官方文档完善业务处理流程。

4、提交业务流程节点,并打包发布

(1)点击业务流程,提交业务流程节点。

(2)进入任务发布界面,将节点添加到待发布进行任务的部署。

5、确认业务流程发布成功

在运维中心页面,确认发布是否在生产环境中存在,至此 Kafka 同步数据到 MaxCompute 过程结束。

image.png

相关文章
|
13天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
58 11
|
19天前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
50 1
|
24天前
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
|
26天前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
27天前
|
存储 监控 安全
大数据架构设计原则:构建高效、可扩展与安全的数据生态系统
【8月更文挑战第23天】大数据架构设计是一个复杂而系统的工程,需要综合考虑业务需求、技术选型、安全合规等多个方面。遵循上述设计原则,可以帮助企业构建出既高效又安全的大数据生态系统,为业务创新和决策支持提供强有力的支撑。随着技术的不断发展和业务需求的不断变化,持续优化和调整大数据架构也将成为一项持续的工作。
|
30天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
18天前
|
分布式计算 安全 大数据
MaxCompute 的安全性和数据隐私保护
【8月更文第31天】在当今数字化转型的时代背景下,企业越来越依赖于大数据分析来推动业务增长。与此同时,数据安全和隐私保护成为了不容忽视的关键问题。作为阿里巴巴集团推出的大数据处理平台,MaxCompute(原名 ODPS)致力于为企业提供高效、安全的数据处理解决方案。本文将探讨 MaxCompute 在数据安全方面的实践,包括数据加密、访问控制及合规性考虑等方面。
41 0
|
27天前
|
消息中间件 Java Kafka
Kafka生产者同步和异步的JavaAPI代码演示
Kafka生产者同步和异步的JavaAPI代码演示
25 0
|
30天前
|
SQL 分布式计算 大数据
"揭秘MaxCompute大数据秘术:如何用切片技术在数据海洋中精准打捞?"
【8月更文挑战第20天】在大数据领域,MaxCompute(曾名ODPS)作为阿里集团自主研发的服务,提供强大、可靠且易用的大数据处理平台。数据切片是其提升处理效率的关键技术之一,它通过将数据集分割为小块来优化处理流程。使用MaxCompute进行切片可显著提高查询性能、支持并行处理、简化数据管理并增强灵活性。例如,可通过SQL按时间或其他维度对数据进行切片。此外,MaxCompute还支持高级切片技术如分区表和分桶表等,进一步加速数据处理速度。掌握这些技术有助于高效应对大数据挑战。
52 0
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之如何解决datax同步任务时报错ODPS-0410042:Invalid signature value
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。

热门文章

最新文章