Structred_Source_Kafka_连接 | 学习笔记

简介: 快速学习 Structred_Source_Kafka_连接

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structred_Source_Kafka_连接】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12154


Structred_Source_Kafka_连接

内容介绍

一.数据源目标

二.步骤

三.问题解决

 

一.数据源目标

使用 Spark 流计算连接 Kafka 数据源日标和步骤

通过本章节的数据,能够掌握如何使用 Structured Streaming 对接 Kafka, 从其中获取数据

 

二.步骤

structures dreaming 卡夫卡消息消费者,在小窗口产生消息

创建 Topic 并输入数据到 Topic

Spark 整合 kafka

读取到的 DataFrame 的数据结构

1. 创建 topic 并输入到 Topic

使用命令创建 topic

bin/kafka-topics.sh--create--topicstreaming-test--replication-factor 1 --partitions3 -- zookeeper node01:2181

Cd/export/service/kafka/ ,进入 kafka目录,粘贴指令

创建出 streaming_test_1

指定只有一个副本,3 个分区

image.png

2. 开启 Producer

产生数据放入 kafka

bin/kafka-console-producer.sh--broker-list node01:9092,node02:9092,node03:9092 -topicstreaming-test

image.png

已经进入 kafka

3.把 json 转为单行输入。

{"devices": {" qameras":f"device_ id":"awJo6rH", "last_ event":{"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-29T00:00:00.000Z","end_time":"2016-12-29T18:42:00. 000Z"}}}}

4.使用 Spark 读取 Kafka 的 Topic

1. 编写 Spark 代码读取 Kafka Topic

val source =spark.readstream

. format("kafka")

.option("kafka.bootstrap.servers","node01:9092 ,node01 :9092 ,node03:9092"). option( "subscribe", "streaming test")

. option("sfartingOffsets", "earliest")

. load( )

package cn. itcast. structured

import org. apache . spark.sql .(Dataframe, Sparksession)

object KafkaSource {

def main(args: Array[String]): Unit = {

//创建 sparksession

val spark = SparkSession. builder()

.appName( name = "hdfs_ source")

.master( master a "local[6]")

. getorCreate()

//读取 kafka 数据

val source: DataFrame = spark. readstream

.format( source 5 "kafka" )

. opt ion("kafka. bootstrap . servers", " node01: 9092, node02 : 9092, node03:9092"). option("subscribe", "streaming_ test 1")

. option( "startingOffsets", "earliest")

. load()

}

}

kafka. bootstrap. servers : 指定 Kafka 的 Server 地址

subscribe : 要监听 Topic, 可以传入多个 Topic, 也可以使用 topic-* 这样的通配符写法

startingOffsets : 从什么位置开始获取数据,可选值有 earliest, assign, latest

format 设置为 Kafka 指定数据 KafkaSource 读取数据

5.思考:从 Kafka 中应该获取什么?

业务系统有很多种类型,有可能是 Web 程序,有可能是物联网。

image.png

前端大多数情况下使用 json 做数据交互

使用 json 会比较多

 

三.问题解决

1.问题一,业务系统如何把数据给卡夫卡?

image.png

Flume/Sqoop 收集

Log4j 日志工具把消息打入 kafka

可以主动或者被动的把数据交给 Kafka,但是无论使用什么方式,都在使用 Kafka的 Client 客服端完成这件事,Kafka 的类库调用方式如下

Producer  kafka 生产者

Producer<String, String> producer new Kafka Producer<string, String>(properties);producer.send(newProducerRecord<string, str ing>("Helloworld", msg));

注意:

其中发给 Kafka 的消息是 KV 类型的

2.问题2

可以指定多个 topic

Structured Streaming 访问 Kafka 获取数据时需要什么东西呢?

需求1.储存当前处理过的卡夫卡的 offset

需求2.对接多个卡 topic 的时候,要知道这条数据属于哪个 topic

3. 总结

Kafka 中收到的消息是 KV 类型的, 有 Key ,有 Valu.

Structured Streaming 对接 Kafka 时, 每一条 Kafka 的消息不能只是 KV ,必须要有 Topic, Partition 之类的信息

4.从 Kafka 获取的 DataFrame 格式

source .printSchema( )

root

key: binary (nullable F true)

value: binary ( nullabletrue )

topic: string (nullabletrue )

partition: integer (nullable = true)offset: long (nullable = true)

timestamp:times tamp (nullable = true)

timestampType: integer (nullable = true)

从 kafka 中读取到的并不是直接数据,而是一个包含各种信息的表格,其中每个字段的含义如下

Key

类型

解释

Key

binary

Kafka 消息的 Key

Value

binary

Kafka 消息的 Value

Topic

string

本条消息所在的 topic,因为整合的时候一个 dateaset 可以对接多个 topic,所以有这样一个信息

 

Partition

integer

消息的分区号

Offset

long

消息在分区的偏移量

timestamp

timestamp

消息进入 Kafka 的时间戳

 

timestampType

integer

时间戳类型

4. 总结

一定将 json 转为 1 行,再使用 producer 发送

使用 Structured Streaming 连接 kafka 需要配置的 3 个参数

kafka. bootstrap. servers : 指定 Kafka 的 Server 地址

subscribe : 要监听 Topic, 可以传入多个 Topic, 也可以使用 topic-* 这样的通配符写法

startingOffsets :从什么位置开始获取数据,可选值有 earliest, assign, latest

从 Kafka 获取的 DataFrame 的 Scheam 如下

root

key: binary (nullable F true)

value: binary ( nullabletrue )

topic: string (nullabletrue )

partition: integer (nullable = true)offset: long (nullable = true)

timestamp:times tamp (nullable = true)

timestampType: integer (nullable = true)

相关文章
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
5月前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
7月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
57 0
|
5月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
6月前
|
消息中间件 监控 Java
查询Kafka生产者是否连接到Kafka服务
查询Kafka生产者是否连接到Kafka服务
301 2
|
8月前
|
消息中间件 存储 分布式计算
Hadoop学习笔记(HDP)-Part.19 安装Kafka
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
196 0
Hadoop学习笔记(HDP)-Part.19 安装Kafka
|
7月前
|
消息中间件 算法 Java
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
150 0
|
8月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错之连接外部kafka本地执行测试代码报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 网络协议 安全
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接

热门文章

最新文章