使用外部数据源Kafka

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 使用外部数据源Kafka

一、实验目的

掌握Kafka作为Spark结构化流的外部数据源。

  掌握kafka作为Spark的Sink。

二、实验内容

1、在Spark结构化流中连接Kafka,将Kafka作为外部数据源。

  2、将Spark结构化流程序计算后的结果写入Kafka。

三、实验原理

Kafka通常用于构建实时流数据管道,以可靠地在系统之间移动数据,还用于转换和响应数据流。Kafka作为集群在一个或多个服务器上运行。

 Kafka的一些关键概念在这里描述:

Topic:消息发布到的类别或流名称的高级抽象。主题可以有0、1或多个消费者,这些消费者订阅发布到该主题的消息。用户为每个新的消息类别定义一个新主题;

• Producers:向主题发布消息的客户端;

• Consumers:使用来自主题的消息的客户端;

• Brokers:复制和持久化消息数据的一个或多个服务器。

  默认情况下,Kafka的数据源并不是Spark的内置数据源。如果我们要从spark shell使用Kafka数据源,那么需要在启动spark shell时将依赖的JAR包添加到classpath中。可手动将依赖包添加到classpath。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

以下操作均在”VNC界面”下操作。请首先切换到VNC界面。

5.1 启动zookeeper服务和kafka服务

1、启动zookeeper服务

  Kafka依赖于Apache ZooKeeper,所以在启动Kafka之前,要先启动它。打开一个终端窗口,执行如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/zookeeper-server-start.sh config/zookeeper.properties

这将在2181端口启动ZooKeeper进程

  2、启动kafaka服务

  接下来,启动Kafka服务器。另打开一个终端窗口,执行如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-server-start.sh config/server.properties

3、创建用于发送数据的主题”topic1”和接收计算结果数据的主题”topic2”。另外打开第三个终端窗口,执行如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
3.  $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2

4、浏览所创建的主题。继续在第三个终端窗口下,执行如下命令:

1.  $ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

应该可以看到刚才创建的主题”topic1”和”topic2”。

5.2 启动HDFS和Spark集群

在终端窗口下,输入以下命令,启动HDFS和Spark集群:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

5.3 编写Spark结构化流程序

按下来,编写Spark结构化流程序,读取Kafa的”topic1”主题中的内容,经过简单处理,将计算结果写出到Kafka的”topic2”主题。请按以下步骤操作:

1、启动spark-shell。在终端窗口下,执行如下命令:(注意,请将命令中的localhost替换为当前实际的机器名)

1.  $ spark-shell --master spark://localhost:7077 --jars /data/software/spark-sql-kafka-0-10_2.11-2.3.2.jar,/data/software/kafka-clients-2.4.1.jar

2、读取Kafa的”topic1”主题中的内容。在spark-shell窗口,输入如下代码:

1.  val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic1").load()

3、将消息内容转换为String类型,再从DataFrame转为DataSet[String]。在spark-shell窗口,输入如下代码:

1.  import spark.implicits._
2.  val content = df.selectExpr("CAST(value AS STRING)").as[String]

4、对从”topic1”抓取的消息进行转换。在spark-shell窗口,输入如下代码:

1.  import org.apache.spark.sql.functions.upper
2.  val resultDf = content.withColumn("value", upper(col("value")));

5、将转换后的DataFrame再写入到Kafka的”topic2”主题中。在spark-shell窗口,进入到paste模式,然后输入如下代码:

1.  val sinkQuery = resultDf.writeStream
2.                          .format("kafka")
3.                          .outputMode("append")
4.                          .option("kafka.bootstrap.servers", "localhost:9092")
5.                          .option("topic","topic2")
6.                          .option("checkpointLocation", "/ck/streaming_kafka")
7.                          .start()

5.4 执行流处理程序

请按以下步骤,执行流处理程序。

  1. 另外再打开一个终端窗口(第4个终端窗口),在这个终端窗口中,启动Kakfa自带的生产者客户端脚本程序,向Kafka的”topic1”主题写入一些消息。在终端窗口下,键入如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

2. 另外再打开一个终端窗口(第5个终端窗口),在这个终端窗口中,启动Kakfa自带的消费者客户端脚本程序,它会消费来自”topic2”主题的消息,查看我们的流程序的输出内容。在终端窗口下,键入如下命令:

1.  $ cd /opt/kafka
2.  $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2

3. 切换到第4个终端窗口,输入以下内容,这些内容会被写入到Kafka的”topic1”主题,里面被我们的Spark流程序实时处理,并把处理结果写出到Kafka的”topic2”主题中:

1.  hello spark

4. 切换到第5个终端窗口,会看到如下所示的输出:

HELLO SPARK

5.等待所有的文件都被处理完后,从shell停止运算流。切换到第3个终端窗口,执行以下代码:

1.  sinkQuery.stop()

— END —

相关文章
|
6月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
102 5
|
5月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如果设置了从Kafka数据源同步到MaxCompute(mc)的任务,任务一直在执行中,是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
57 10
|
6月前
|
消息中间件 Java Kafka
MQ产品使用合集之对于Kafka作为数据源的情况,官方比较推荐哪种使用方式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
105 0
|
消息中间件 存储 分布式计算
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
|
消息中间件 存储 分布式计算
Flink之DataStream API(执行环境、数据源、读取kafka)
Flink之DataStream API(执行环境、数据源、读取kafka)
1825 0
|
消息中间件 分布式计算 Kafka
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
消息中间件 存储 分布式计算
Kafka 数据源、Receiver 和 Direct 方式接收数据_3|学习笔记
快速学习 Kafka 数据源、Receiver 和 Direct 方式接收数据_3
295 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
49 1
下一篇
无影云桌面