开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structred_Source_Kafka_连接】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/692/detail/12154
Structred_Source_Kafka_连接
内容介绍
一.数据源目标
二.步骤
三.问题解决
一.数据源目标
使用 Spark 流计算连接 Kafka 数据源日标和步骤
通过本章节的数据,能够掌握如何使用 Structured Streaming 对接 Kafka, 从其中获取数据
二.步骤
structures dreaming 卡夫卡消息消费者,在小窗口产生消息
创建 Topic 并输入数据到 Topic
Spark 整合 kafka
读取到的 DataFrame 的数据结构
1. 创建 topic 并输入到 Topic
使用命令创建 topic
bin/kafka-topics.sh--create--topicstreaming-test--replication-factor 1 --partitions3 -- zookeeper node01:2181
Cd/export/service/kafka/ ,进入 kafka目录,粘贴指令
创建出 streaming_test_1
指定只有一个副本,3 个分区
2. 开启 Producer
产生数据放入 kafka
bin/kafka-console-producer.sh--broker-list node01:9092,node02:9092,node03:9092 -topicstreaming-test
已经进入 kafka
3.把 json 转为单行输入。
{"devices": {" qameras":f"device_ id":"awJo6rH", "last_ event":
{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00. 000Z"}}}}
4.使用 Spark 读取 Kafka 的 Topic
1. 编写 Spark 代码读取 Kafka Topic
val source =spark.readstream
. format("kafka")
.option("kafka.bootstrap.servers","node01:9092 ,node01 :9092 ,node03:9092"). option( "subscribe", "streaming test")
. option("sfartingOffsets", "earliest")
. load( )
package cn. itcast. structured
import org. apache . spark.sql .(Dataframe, Sparksession)
object KafkaSource {
def main(args: Array[String]): Unit = {
//创建 sparksession
val spark = SparkSession. builder()
.appName( name = "hdfs_ source")
.master( master a "local[6]")
. getorCreate()
//读取 kafka 数据
val source: DataFrame = spark. readstream
.format( source 5 "kafka" )
. opt ion("kafka. bootstrap . servers", " node01: 9092, node02 : 9092, node03:9092"). option("subscribe", "streaming_ test 1")
. option( "startingOffsets", "earliest")
. load()
}
}
kafka. bootstrap. servers : 指定 Kafka 的 Server 地址
subscribe : 要监听 Topic, 可以传入多个 Topic, 也可以使用 topic-* 这样的通配符写法
startingOffsets : 从什么位置开始获取数据,可选值有 earliest, assign, latest
format 设置为 Kafka 指定数据 KafkaSource 读取数据
5.思考:从 Kafka 中应该获取什么?
业务系统有很多种类型,有可能是 Web 程序,有可能是物联网。
前端大多数情况下使用 json 做数据交互
使用 json 会比较多
三.问题解决
1.问题一,业务系统如何把数据给卡夫卡?
Flume/Sqoop 收集
Log4j 日志工具把消息打入 kafka
可以主动或者被动的把数据交给 Kafka,但是无论使用什么方式,都在使用 Kafka的 Client 客服端完成这件事,Kafka 的类库调用方式如下
Producer kafka 生产者
Producer<String, String> producer new Kafka Producer<string, String>(properties);producer.send(newProducerRecord<string, str ing>("Helloworld", msg));
注意:
其中发给 Kafka 的消息是 KV 类型的
2.问题2
可以指定多个 topic
Structured Streaming 访问 Kafka 获取数据时需要什么东西呢?
需求1.储存当前处理过的卡夫卡的 offset
需求2.对接多个卡 topic 的时候,要知道这条数据属于哪个 topic
3. 总结
Kafka 中收到的消息是 KV 类型的, 有 Key ,有 Valu.
Structured Streaming 对接 Kafka 时, 每一条 Kafka 的消息不能只是 KV ,必须要有 Topic, Partition 之类的信息
4.从 Kafka 获取的 DataFrame 格式
source .printSchema( )
root
key: binary (nullable F true)
value: binary ( nullabletrue )
topic: string (nullabletrue )
partition: integer (nullable = true)offset: long (nullable = true)
timestamp:times tamp (nullable = true)
timestampType: integer (nullable = true)
从 kafka 中读取到的并不是直接数据,而是一个包含各种信息的表格,其中每个字段的含义如下
Key |
类型 |
解释 |
Key |
binary |
Kafka 消息的 Key |
Value |
binary |
Kafka 消息的 Value |
Topic |
string |
本条消息所在的 topic,因为整合的时候一个 dateaset 可以对接多个 topic,所以有这样一个信息
|
Partition |
integer |
消息的分区号 |
Offset |
long |
消息在分区的偏移量 |
timestamp |
timestamp |
消息进入 Kafka 的时间戳
|
timestampType |
integer |
时间戳类型 |
4. 总结
一定将 json 转为 1 行,再使用 producer 发送
使用 Structured Streaming 连接 kafka 需要配置的 3 个参数
kafka. bootstrap. servers
: 指定 Kafka 的 Server 地址
subscribe : 要监听 Topic, 可以传入多个 Topic, 也可以使用 topic-* 这样的通配符写法
startingOffsets :从什么位置开始获取数据,可选值有 earliest, assign, latest
从 Kafka 获取的 DataFrame 的 Scheam 如下
root
key: binary (nullable F true)
value: binary ( nullabletrue )
topic: string (nullabletrue )
partition: integer (nullable = true)offset: long (nullable = true)
timestamp:times tamp (nullable = true)
timestampType: integer (nullable = true)