一、实验目的
掌握Kafka作为Spark结构化流的外部数据源。
掌握kafka作为Spark的Sink。
二、实验内容
1、在Spark结构化流中连接Kafka,将Kafka作为外部数据源。
2、将Spark结构化流程序计算后的结果写入Kafka。
三、实验原理
Kafka通常用于构建实时流数据管道,以可靠地在系统之间移动数据,还用于转换和响应数据流。Kafka作为集群在一个或多个服务器上运行。
Kafka的一些关键概念在这里描述:
• Topic:消息发布到的类别或流名称的高级抽象。主题可以有0、1或多个消费者,这些消费者订阅发布到该主题的消息。用户为每个新的消息类别定义一个新主题;
• Producers:向主题发布消息的客户端;
• Consumers:使用来自主题的消息的客户端;
• Brokers:复制和持久化消息数据的一个或多个服务器。
默认情况下,Kafka的数据源并不是Spark的内置数据源。如果我们要从spark shell使用Kafka数据源,那么需要在启动spark shell时将依赖的JAR包添加到classpath中。可手动将依赖包添加到classpath。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3
五、实验步骤
以下操作均在”VNC界面”下操作。请首先切换到VNC界面。
5.1 启动zookeeper服务和kafka服务
1、启动zookeeper服务
Kafka依赖于Apache ZooKeeper,所以在启动Kafka之前,要先启动它。打开一个终端窗口,执行如下命令:
1. $ cd /opt/kafka 2. $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
这将在2181端口启动ZooKeeper进程
2、启动kafaka服务
接下来,启动Kafka服务器。另打开一个终端窗口,执行如下命令:
1. $ cd /opt/kafka 2. $ ./bin/kafka-server-start.sh config/server.properties
3、创建用于发送数据的主题”topic1”和接收计算结果数据的主题”topic2”。另外打开第三个终端窗口,执行如下命令:
1. $ cd /opt/kafka 2. $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1 3. $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
4、浏览所创建的主题。继续在第三个终端窗口下,执行如下命令:
1. $ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
应该可以看到刚才创建的主题”topic1”和”topic2”。
5.2 启动HDFS和Spark集群
在终端窗口下,输入以下命令,启动HDFS和Spark集群:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh
5.3 编写Spark结构化流程序
按下来,编写Spark结构化流程序,读取Kafa的”topic1”主题中的内容,经过简单处理,将计算结果写出到Kafka的”topic2”主题。请按以下步骤操作:
1、启动spark-shell。在终端窗口下,执行如下命令:(注意,请将命令中的localhost替换为当前实际的机器名)
1. $ spark-shell --master spark://localhost:7077 --jars /data/software/spark-sql-kafka-0-10_2.11-2.3.2.jar,/data/software/kafka-clients-2.4.1.jar
2、读取Kafa的”topic1”主题中的内容。在spark-shell窗口,输入如下代码:
1. val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic1").load()
3、将消息内容转换为String类型,再从DataFrame转为DataSet[String]。在spark-shell窗口,输入如下代码:
1. import spark.implicits._ 2. val content = df.selectExpr("CAST(value AS STRING)").as[String]
4、对从”topic1”抓取的消息进行转换。在spark-shell窗口,输入如下代码:
1. import org.apache.spark.sql.functions.upper 2. val resultDf = content.withColumn("value", upper(col("value")));
5、将转换后的DataFrame再写入到Kafka的”topic2”主题中。在spark-shell窗口,进入到paste模式,然后输入如下代码:
1. val sinkQuery = resultDf.writeStream 2. .format("kafka") 3. .outputMode("append") 4. .option("kafka.bootstrap.servers", "localhost:9092") 5. .option("topic","topic2") 6. .option("checkpointLocation", "/ck/streaming_kafka") 7. .start()
5.4 执行流处理程序
请按以下步骤,执行流处理程序。
1. 另外再打开一个终端窗口(第4个终端窗口),在这个终端窗口中,启动Kakfa自带的生产者客户端脚本程序,向Kafka的”topic1”主题写入一些消息。在终端窗口下,键入如下命令:
1. $ cd /opt/kafka 2. $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
2. 另外再打开一个终端窗口(第5个终端窗口),在这个终端窗口中,启动Kakfa自带的消费者客户端脚本程序,它会消费来自”topic2”主题的消息,查看我们的流程序的输出内容。在终端窗口下,键入如下命令:
1. $ cd /opt/kafka 2. $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2
3. 切换到第4个终端窗口,输入以下内容,这些内容会被写入到Kafka的”topic1”主题,里面被我们的Spark流程序实时处理,并把处理结果写出到Kafka的”topic2”主题中:
1. hello spark
4. 切换到第5个终端窗口,会看到如下所示的输出:
HELLO SPARK
5.等待所有的文件都被处理完后,从shell停止运算流。切换到第3个终端窗口,执行以下代码:
1. sinkQuery.stop()
— END —