开发者社区 > 大数据与机器学习 > 开源大数据平台 E-MapReduce > 正文

如何使用DDI+Confluent进行实时数据采集入湖和分析?

如何使用DDI+Confluent进行实时数据采集入湖和分析?

展开
收起
游客lmkkns5ck6auu 2022-07-28 16:30:07 753 0
1 条回答
写回答
取消 提交回答
  • 本示例中模拟实时数据的数据集为纽约市出租车数据,存储在csv文件内。数据中第一列是key,为交易产生的时间,而后每一列分别为交易价格、乘客上车时间、上车的经纬度、下车的经纬度以及乘客的数量。

    1)进入Cnfluent管控台,Jinxi-DDI-test为本次使用的confluent集群。 2)登录Control Center,可以查看本集群的详细信息,比如broker数量、 broker产生和消费的吞吐量、topic数量、partition数量等信息。
    3)创建一个用于接收数据的topic ,名为nyc_ taxi-data , partition数量设置为3,与broker数量一致。

    4)Python脚本里需要提供集群ID,Control Center的用户名、密码以及topic名称。创建一个名为conf的dict,指定bootstrap server的地址,此地址需要根据ClusterlD拼接而成,其他都为样板代码。生成一个producer,其来源于confluent Kafka库,因此用户需要安装此库。打开train文件,读取csv文件里的数据并发送到confluent集群。

    5)进入ControL Center,可以看到已经开始生产消息。

    6)进入topic,点击message,可以直接跳转到offset0,即最开始的消息,可以查看消息的生产情况以及具体产生了哪些消息。此外,企业版Kafka能提供一些高级特性,比如定义消息的schema , connect, ksqLDB等功能。至此,数据的采集和发布链路已经打通,下一步需要到DDI Notebook连接到Kafka Brocker并对其中的实时数据进行消费、进行ETL处理,最终对这些实时数据进行入湖操作。

    7)准备好Notebook,注意,此处需要再增加一个Kafka connector jar包。创建数据库,使用数据库存储数据表。 按照nyc_taxi_ data的schema创建表,字段与train.csv文件里一致,并使用delta来存储。

    8)此外还设置了表的属性,主要与DDI引擎的特性相关。第一个特性为对OSS的写入操作做合并,从而防止小文件的产生,避免元数据不可扩展以及查询速度下降;第二个特性为定期将小文件进行合并,以避免查询性能下降以及元数据的扩展出现问题。

    9)连接到confluent集群进行消费。消费逻辑与发送逻辑较为相近,需要给定confluent的Cluster ID , server, username, password以及topic。为了安全起见,需要将密码存储至环境变量中,它会随着作业运行结束而消失。Checkpoint_location会记录消费Kafka数据时的offset,如果作业中断或异常停止,重启后可以从对应的offset继续消费。 使用Spark到Kafka的connector读取Kafka中数据。数据为key-value的形式,存储 在value里,因此需要创建一列data,负责从value里取值。va lue为二进制形式,需要先将其转换成string,形成JSON串,而后通过select将里面的数据读取出来,并写入到data表。

    10)运行代码,启动程序。Select若干数据进行查看。在OSS的browser里可以看到注入的数据以及过程中产生checkpoint信息。

    11)刷新后可以看到,数据已经注入到datalake中。

    以上内容摘自《Databricks数据洞悉》电子书,点击https://developer.aliyun.com/topic/download?id=8545可下载完整版

    2022-07-29 10:45:26
    赞同 展开评论 打赏

阿里云EMR是云原生开源大数据平台,为客户提供简单易集成的Hadoop、Hive、Spark、Flink、Presto、ClickHouse、StarRocks、Delta、Hudi等开源大数据计算和存储引擎,计算资源可以根据业务的需要调整。EMR可以部署在阿里云公有云的ECS和ACK平台。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载