使用外部数据源Kafka

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 使用外部数据源Kafka

一、实验目的

掌握Kafka作为Spark结构化流的外部数据源。

  掌握kafka作为Spark的Sink。

二、实验内容

1、在Spark结构化流中连接Kafka,将Kafka作为外部数据源。

  2、将Spark结构化流程序计算后的结果写入Kafka。

三、实验原理

Kafka通常用于构建实时流数据管道,以可靠地在系统之间移动数据,还用于转换和响应数据流。Kafka作为集群在一个或多个服务器上运行。

 Kafka的一些关键概念在这里描述:

Topic:消息发布到的类别或流名称的高级抽象。主题可以有0、1或多个消费者,这些消费者订阅发布到该主题的消息。用户为每个新的消息类别定义一个新主题;

• Producers:向主题发布消息的客户端;

• Consumers:使用来自主题的消息的客户端;

• Brokers:复制和持久化消息数据的一个或多个服务器。

  默认情况下,Kafka的数据源并不是Spark的内置数据源。如果我们要从spark shell使用Kafka数据源,那么需要在启动spark shell时将依赖的JAR包添加到classpath中。可手动将依赖包添加到classpath。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

以下操作均在”VNC界面”下操作。请首先切换到VNC界面。

5.1 启动zookeeper服务和kafka服务

1、启动zookeeper服务

  Kafka依赖于Apache ZooKeeper,所以在启动Kafka之前,要先启动它。打开一个终端窗口,执行如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/zookeeper-server-start.sh config/zookeeper.properties

这将在2181端口启动ZooKeeper进程

  2、启动kafaka服务

  接下来,启动Kafka服务器。另打开一个终端窗口,执行如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-server-start.sh config/server.properties

3、创建用于发送数据的主题”topic1”和接收计算结果数据的主题”topic2”。另外打开第三个终端窗口,执行如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
3.  $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2

4、浏览所创建的主题。继续在第三个终端窗口下,执行如下命令:

1.  $ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

应该可以看到刚才创建的主题”topic1”和”topic2”。

5.2 启动HDFS和Spark集群

在终端窗口下,输入以下命令,启动HDFS和Spark集群:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

5.3 编写Spark结构化流程序

按下来,编写Spark结构化流程序,读取Kafa的”topic1”主题中的内容,经过简单处理,将计算结果写出到Kafka的”topic2”主题。请按以下步骤操作:

1、启动spark-shell。在终端窗口下,执行如下命令:(注意,请将命令中的localhost替换为当前实际的机器名)

1.  $ spark-shell --master spark://localhost:7077 --jars /data/software/spark-sql-kafka-0-10_2.11-2.3.2.jar,/data/software/kafka-clients-2.4.1.jar

2、读取Kafa的”topic1”主题中的内容。在spark-shell窗口,输入如下代码:

1.  val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic1").load()

3、将消息内容转换为String类型,再从DataFrame转为DataSet[String]。在spark-shell窗口,输入如下代码:

1.  import spark.implicits._
2.  val content = df.selectExpr("CAST(value AS STRING)").as[String]

4、对从”topic1”抓取的消息进行转换。在spark-shell窗口,输入如下代码:

1.  import org.apache.spark.sql.functions.upper
2.  val resultDf = content.withColumn("value", upper(col("value")));

5、将转换后的DataFrame再写入到Kafka的”topic2”主题中。在spark-shell窗口,进入到paste模式,然后输入如下代码:

1.  val sinkQuery = resultDf.writeStream
2.                          .format("kafka")
3.                          .outputMode("append")
4.                          .option("kafka.bootstrap.servers", "localhost:9092")
5.                          .option("topic","topic2")
6.                          .option("checkpointLocation", "/ck/streaming_kafka")
7.                          .start()

5.4 执行流处理程序

请按以下步骤,执行流处理程序。

  1. 另外再打开一个终端窗口(第4个终端窗口),在这个终端窗口中,启动Kakfa自带的生产者客户端脚本程序,向Kafka的”topic1”主题写入一些消息。在终端窗口下,键入如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

2. 另外再打开一个终端窗口(第5个终端窗口),在这个终端窗口中,启动Kakfa自带的消费者客户端脚本程序,它会消费来自”topic2”主题的消息,查看我们的流程序的输出内容。在终端窗口下,键入如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2

3. 切换到第4个终端窗口,输入以下内容,这些内容会被写入到Kafka的”topic1”主题,里面被我们的Spark流程序实时处理,并把处理结果写出到Kafka的”topic2”主题中:

1.  hello spark

4. 切换到第5个终端窗口,会看到如下所示的输出:

HELLO SPARK

5.等待所有的文件都被处理完后,从shell停止运算流。切换到第3个终端窗口,执行以下代码:

1.  sinkQuery.stop()

— END —

相关文章
|
1月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
50 5
|
4月前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
53 0
|
8月前
|
消息中间件 存储 分布式计算
Flink之DataStream API(执行环境、数据源、读取kafka)
Flink之DataStream API(执行环境、数据源、读取kafka)
895 0
|
8月前
|
消息中间件 存储 分布式计算
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
|
8月前
|
消息中间件 分布式计算 Kafka
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
消息中间件 存储 分布式计算
Kafka 数据源、Receiver 和 Direct 方式接收数据_3|学习笔记
快速学习 Kafka 数据源、Receiver 和 Direct 方式接收数据_3
266 0
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
429 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
43 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
242 0
|
3月前
|
消息中间件 Kafka Linux
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
64 0