企业版Spark Databricks + 企业版Kafka Confluent 联合高效挖掘数据价值

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍了如何使用阿里云的Confluent Cloud和Databricks构建数据流和LakeHouse,并介绍了如何使用Databricks提供的能力来挖掘数据价值,使用Spark MLlib构建您的机器学习模型。

前提条件

  • 已注册阿里云账号,详情请参见阿里云账号注册流程
  • 已开通 Databricks 数据洞察服务
  • 已开通 OSS 对象存储服务
  • 已开通 Confluent 流数据服务


创建Databricks集群 & Confluent集群

  1. 登录Confluent管理控制台,创建Confluent集群,并开启公网服务
  2. 登录Databricks管理控制台,创建Databricks集群


Databricks Worker节点公网访问

Databricks的worker节点暂时不支持公网访问,为了能访问Confluent的公网地址,请联系Databricks的开发人员添加NAT网关。


案例:出租车数据入湖及分析

出租车和网约车在每天的运行中持续产生行驶轨迹和交易数据,这些数据对于车辆调度,流量预测,安全监控等场景有着极大的价值。

本案例中我们使用纽约市的出租车数据来模拟网约车数据从产生,发布到流数据服务Confluent,通过Databricks Structured Streaming进行实时数据处理,并存储到LakeHouse的整个流程。数据存储到LakeHouse后,我们使用spark和spark sql对数据进行分析,并使用Spark的MLlib进行机器学习训练。


前置准备:

  1. 创建topic:
    登录Confluent的control center,在左侧选中Topics,点击Add a topic按钮,创建一个名为nyc_taxi_data的topic,将partition设置为3,其他配置保持默认。


  1. 创建OSS bucket:
    在和Databricks同一Region的OSS中,创建bucket,bucket命名为:databricks-confluent-integration
    进入到Bucket列表页,点击创建bucket按钮

    创建好bucket之后,在该bucket创建目录:checkpoint_dir和data/nyc_taxi_data两个目录
  2. 收集url,用户名,密码,路径等以便后续使用
  1. confluent集群ID:在csp的管控界面,集群详情页获取
  2. Confluent Control Center的用户名和密码
  3. 路径:
  • Databricks Structured Streaming的checkpoint存储目录
  • 采集的数据的存储目录

下面是我们后续会使用到的一些变量:

# 集群管控界面获取
confluent_cluster_id = "your_confluent_cluster_id"    
# 使用confluent集群ID拼接得到
confluent_server = "rb-{confluent_cluster_id}.csp.aliyuncs.com:9092" 
control_center_username = "your_confluent_control_center_username"
control_center_password = "your_confluent_control_center_password"
topic = "nyc_taxi_data"
checkpoint_location = "oss://databricks-confluent-integration/checkpoint_dir"
taxi_data_delta_lake = "oss://databricks-confluent-integration/data/nyc_taxi_data"

数据的产生

在本案例中,我们使用Kaggle上的NYC出租车数据集来模拟数据产生。

  • 我们先安装confluent的python客户端,其他语言的客户端参考confluent官网
pip install confluent_kafka
  • 构造用于创建Kafka Producer的基础信息,如:bootstrap-server,control center的username,password等
conf = {
    'bootstrap.servers': confluent_server,
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': control_center_username,
    'sasl.password': control_center_password
}
  • 创建Producer:
producer = Producer(conf)
  • 向Kafka中发送消息(模拟数据的产生):
with open("/Path/To/train.csv", "rt") as f:
    float_field = ['fare_amount', 'pickup_longitude', 'pickup_latitude', 
                   'dropoff_longitude', 'dropoff_latitude']
    for row in reader:
        i += 1
        try:
            for field in float_field:
                row[field] = float(row[field])
            row['passenger_count'] = int(row['passenger_count'])
            producer.produce(topic=topic, value=json.dumps(row))
            if i % 1000 == 0:
                producer.flush()
                if i == 200000:
                    break
        except ValueError: # discard null/NAN data
            continue 

Kafka中的partition和offset

在使用spark读取Kafka中的数据之前,我们回顾一下Kafka中的概念:partition和offset

  • partition:kafka为了能并行进行数据的写入,将每个topic的数据分为多个partition,每个partition由一个Broker负责,向partition写入数据时,负责该partition的Broker将消息复制给它的follower
  • offset:Kafka会为每条写入partition里的消息进行编号,消息的编号即为offset

我们在读取Kafka中的数据时,需要指定我们想要读取的数据,该指定需要从两个维度:partition的维度 + offset的维度。

  • Earliest:从每个partition的offset 0开始读取和加载
  • Latest:从每个partition最新的数据开始读取
  • 自定义:指定每个partition的开始offset和结束offset
  • 读取topic1 partition 0 offset 23和partition 0 offset -2之后的数据:"""{"topic1":{"0":23,"1":-2}}"""

除了指定start offset,我们还可以通过endingOffsets参数指定读取到什么位置为止。

将数据存储到LakeHouse:Spark集成Confluent

理解上述概念后,Databricks和Confluent的集成非常简单,只需要对spark session的readStream参数进行简单的设置就可以将Kafka中的实时流数据转换为Spark中的Dataframe:

lines = (spark.readStream
         # 指定数据源: kafka
         .format("kafka")
         # 指定kafka bootstrap server的URL
         .option("kafka.bootstrap.servers", confluent_server)
         # 指定订阅的topic
         .option("subscribe", topic)
         # 指定想要读取的数据的offset,earliest表示从每个partition的起始点开始读取
         .option("startingOffsets", "earliest")
         # 指定认证协议
         .option("kafka.security.protocol", "SASL_SSL")
         .option("kafka.sasl.mechanism", "PLAIN")
         # 指定confluent的用户名和密码
         .option("kafka.sasl.jaas.config",
                 f"""org.apache.kafka.common.security.plain.PlainLoginModule 
                 required username="{control_center_username}" password="{control_center_password}";""")
         .load())

从kafka中读取的数据格式如下:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

由于key和value都是binary格式的,我们需要将value(json)由binary转换为string格式,并定义schema,提取出Json中的数据,并转换为对应的格式:

schema = (StructType().add('key', TimestampType())
          .add('fare_amount', FloatType())
          .add('pickup_datetime', TimestampType())
          .add('pickup_longitude', FloatType())
          .add('pickup_latitude', FloatType())
          .add('dropoff_longitude', FloatType())
          .add('dropoff_latitude', FloatType())
          .add('passenger_count', IntegerType())
          )
# 将json中的列提取出来
lines = (lines.withColumn('data', 
                          from_json(
                              col('value').cast('string'), # binary 转 string
                              schema))                     # 解析为schema
         .select(col('data.*')))                           # select value中的所有列

过滤掉错误,为空,NaN的数据:

lines = (lines.filter(col('pickup_longitude') != 0)
         .filter(col('pickup_latitude') != 0)
         .filter(col('dropoff_longitude') != 0)
         .filter(col('dropoff_latitude') != 0)
         .filter(col('fare_amount') != 0)
         .filter(col('passenger_count') != 0))

最后,我们将解析出来的数据输出到LakeHouse中,以进行后续的分析和机器学习模型训练:

# lakehouse 的存储格式为 delta
query = (lines.writeStream.format('delta')
         .option('checkpointLocation', checkpoint_location)
         .option('path', taxi_data_delta_lake).start())
# 执行job,直到出现异常(如果只想执行该Job一段时间,可以指定timeout参数)
query.awaitTermination()

数据分析

我们先将LakeHouse中的数据使用Spark加载进来:

然后,我们对该Dataframe创建一个Table View,并探索fare_amount的分布:

可以看到fare_amount的最小值是负数,这显然是一条错误的数据,我们将这些错误的数据过滤,并探索fare_amount的分布:

然后我们探索价格和年份,月份,星期,打车时间的关系:

从上面可以看出两点:

  • 出租车的价格和年份有很大关系,从09年到15年呈不断增长的态势
  • 在中午和凌晨打车比上午和下午打车更贵一些。

我们再进一步探索价格和乘客数量的关系:

此外,出租车价格的另一个影响因素就是距离,这里我们借助python的geopy包和Spark的UDF来计算给定两个位置的距离,然后再分析费用和距离的关系。

经纬度的范围为[-90, 90],因此,我们第一步是清除错误的数据:

然后,我们增加一列数据:出租车行驶的距离,并将距离离散化,进行后续的分析:

统计打车距离的分布:

从上图可以看出:打车距离分布在区间[0, 15]miles内,我们继续统计在该区间内,打车价格和打车距离的关系:

如上图所示:打车价格和打车距离呈现出线性增长的趋势。

机器学习建模

在上一小节的数据分析中,我们已经提取了和出租车相关联的一些特征,根据这些特征,我们建立一个简单的线性回归模型:

打车费用 ~ (年份,打车时间,乘客数,距离)

先将特征和目标值提取出来:

对特征做归一化:

分割训练集和测试集:

建立线性回归模型进行训练:

训练结果统计:

使用Evaluator对模型进行评价:

总结

我们在本文中介绍了如何使用阿里云的Confluent Cloud和Databricks来构建您的数据流和LakeHouse,并介绍了如何使用Databricks提供的能力来挖掘数据价值,使用Spark MLlib构建您的机器学习模型。有了Confluent Cloud和Databricks,您可以轻松实现数据入湖,及时在最新的数据上进行探索,挖掘您的数据价值。欢迎您试用阿里云ConfluentDatabricks



产品技术咨询

https://survey.aliyun.com/apps/zhiliao/VArMPrZOR  

加入技术交流群

image.png


相关文章
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
87 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
56 0
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
279 9
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
137 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
下一篇
无影云桌面