五、Spark Streaming基于HDFS的实时计算开发
基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。
相当于处理实时的文件流。
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory) streamingContext.streamingContext.textFileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入 HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦 处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此 不会占用一个cpu core。
创建输入流数据源目录:
bin/hdfs dfs -mkdir -p /user/caizhengjie/datas/sparkstreaming
写入数据:
bin/hdfs dfs -put /opt/datas/11.txt /user/caizhengjie/datas/sparkstreaming bin/hdfs dfs -copyFromLocal /opt/datas/11.txt /user/caizhengjie/datas/sparkstreaming1.txt
源数据节点时间必须与计算节点时间保持同步(重点)
Java语言实现:
package com.kfk.spark.stream; import org.apache.spark.SparkConf; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 2:12 下午 */ public class HDFSWordCountJava { public static void main(String[] args) throws InterruptedException { // Create a local StreamingContext with two working thread and batch interval of 5 second SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); String path = "hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkstreaming/"; // hdfs数据源 JavaDStream<String> lines = jssc.textFileStream(path); // flatmap JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); // map JavaPairDStream<String,Integer> pair = words.mapToPair(word -> new Tuple2<>(word,1)); // reduceByKey JavaPairDStream<String,Integer> wordcount = pair.reduceByKey((x,y) -> x+y); wordcount.print(); jssc.start(); jssc.awaitTermination(); } }
Scala语言实现:
package com.kfk.spark.stream import org.apache.spark.SparkConf import org.apache.spark.streaming.{Durations, StreamingContext} /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 8:11 下午 */ object HDFSWordCountScala { def main(args: Array[String]): Unit = { // Create a local StreamingContext with two working thread and batch interval of 5 second val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val jssc = new StreamingContext(conf, Durations.seconds(1)) val path = "hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkstreaming/" val lines = jssc.textFileStream(path) // flatmap val words = lines.flatMap(word => word.split(" ")) // map val pair = words.map(x => (x,1)) // reduceByKey val wordcount = pair.reduceByKey((x,y) => x+y) wordcount.print() jssc.start() jssc.awaitTermination() } }
六、Spark Streaming读取并处理Socket流数据
当基于Spark -shell运行Streaming程序时,需要注意要不线程数大于1,要么基于集群
bin/spark-shell --master local[2] bin/spark-shell --master spark:node1:7077
传递给spark的master URL可以有如下几种:
local本地单线程
local[K]本地多线程(指定K个内核)
1ocal[*]本地多线程(指定所有可用内核)
spark://HOST:PORT连接到指定的 Spark standalone clustermaster, 需要指定端口。
mesos://HOST:PORT 连接到指定的Mesos集群,需要指定端口。
yarn-client客户端模式连接到YARN集群。需要配置HADOOP_ CONF_ DIR。
yarn-cluster集群模式连接到YARN集群。需要配置HADOOP_ CONF_ DIR。
NC服务安装并运行Spark Streaming
NetCat 下载地址: http://rpm.pbone.net/index.php3/stat/4/idpl/15991371/dir/scientific_linux_6/com/nc1.84-22.el6.x86_64.rpm.html
这里提供百度云下载
链接: https://pan.baidu.com/s/1pFDTnLihK3ODELhDGkQHRQ 密码: u10t
下载完成之后,将它上传到/opt/Hadoop目录下
然后开始安装:
sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm
首先需要通过使用以下命令将Netcat作为数据服务器运行
nc -lk 9999
下面通过idea工具来编写Spark Streaming程序
Java语言实现
package com.kfk.spark.stream; import org.apache.spark.SparkConf; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; /** * lambda表达式写法 * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/13 * @time : 9:25 下午 */ public class WordCountJava { public static void main(String[] args) throws InterruptedException { // Create a local StreamingContext with two working thread and batch interval of 5 second SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("bigdata-pro-m04",9999); // flatmap JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); // map JavaPairDStream<String,Integer> pair = words.mapToPair(word -> new Tuple2<>(word,1)); // reduceByKey JavaPairDStream<String,Integer> wordcount = pair.reduceByKey((x,y) -> x+y); wordcount.print(); jssc.start(); jssc.awaitTermination(); } }
scala语言实现
package com.kfk.spark.stream import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 12:54 下午 */ object WordCountScala { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("bigdata-pro-m04", 9999) // flatmap val words = lines.flatMap(word => word.split(" ")) // map val pair = words.map(x => (x,1)) // reduceByKey val wordcount = pair.reduceByKey((x,y) => x+y) wordcount.print() ssc.start() ssc.awaitTermination() } }
七、Spark Streaming结果数据保存到MySQL数据库
下面我们测试一下Spark Streaming将结果保存到MySQL数据库
还是上面的案例,通过sparkstreaming将单词和次数写入到数据库中
package com.spark.test import java.sql.DriverManager import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/9/27 * @time : 3:43 下午 */ object TestStreaming { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .master("local[2]") .appName("HdfsTest") .getOrCreate() val sc = spark.sparkContext val ssc = new StreamingContext(sc,Seconds(5)) val lines = ssc.socketTextStream("10.211.55.59",9999) val words = lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_ + _) // 将rdd展开 words.foreachRDD(rdd => rdd.foreachPartition(line =>{ // 加载驱动 Class.forName("com.mysql.jdbc.Driver") // 过去connection val conn = DriverManager.getConnection("jdbc:mysql://node1:3306/test","root","199911") try{ // 遍历每一行数据写入数据库 for (row <- line){ val sql = "insert into wordCount(titleName,count) values('"+row._1+"',"+row._2+")" conn.prepareStatement(sql).executeUpdate() } }finally { conn.close() } })) words.print() ssc.start() ssc.awaitTermination() } }
首先启动nc服务
nc -lk 9999
这里我是使用spark-shell运行的,运行的方式:
通过使用:paste可以复制多行代码。
查看测试数据:
八、Spark Streaming与Kafka集成进行数据处理
Kafka项目在版本0.8和0.10之间引入了新的使用者API,因此有2个单独的相应Spark Streaming包可用。请为您的经纪人和所需功能选择正确的软件包;请注意,0.8集成与以后的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。
注意:自Spark 2.3.0起已弃用Kafka 0.8支持。
这里我使用的版本是kafka_2.11-2.1.1
相关操作可以看一下官网的解释:
http://spark.apache.org/docs/2.4.6/streaming-kafka-0-10-integration.html
首先加载pom.xml的配置文件
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId> <version>${saprk.version}</version> </dependency>
Creating a Direct Stream
请注意,导入的名称空间包括版本org.apache.spark.streaming.kafka010
Direct方式案例
第一步:启动服务
首先要启动zookeeper,再启动kafka,三台要同时启动
启动zookeeper:
zkServer.sh start
启动kafka
在前台启动kafka,注意查看打印在桌面的日志,有无报错信息
bin/kafka-server-start.sh config/server.properties
如果没有报错信息,启动正常,那么就可以在后台启动了
bin/kafka-server-start.sh -daemon config/server.properties
第二步:创建topic
创建一个分区和一个副本的“spark”的topic
bin/kafka-topics.sh --create --zookeeper bigdata-pro-m04:2181 --replication-factor 1 --partitions 1 --topic spark
第三步:启动sparkstreaming与kafka连接
Java语言实现
package com.kfk.spark.common; import org.apache.spark.SparkConf; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaStreamingContext; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 8:23 下午 */ public class CommStreamingContext { public static JavaStreamingContext getJssc(){ SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext"); return new JavaStreamingContext(conf, Durations.seconds(2)); } }
package com.kfk.spark.stream; import com.kfk.spark.common.CommStreamingContext; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import scala.Tuple2; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 8:20 下午 */ public class StreamingKafkaJava { public static void main(String[] args) throws InterruptedException { JavaStreamingContext jssc = CommStreamingContext.getJssc(); // sparkstreaming与kafka连接 Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "streaming_kafka_1"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); // 设置topic Collection<String> topics = Arrays.asList("spark"); // kafka数据源 JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream( jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) ); // flatmap JavaDStream<String> words = stream.flatMap(record -> Arrays.asList(record.value().trim().split(" ")).iterator()); // map JavaPairDStream<String,Integer> pair = words.mapToPair(word -> new Tuple2<>(word,1)); // reduceByKey JavaPairDStream<String,Integer> wordcount = pair.reduceByKey((x,y) -> x+y); wordcount.print(); jssc.start(); jssc.awaitTermination(); } }
scala语言实现:
package com.kfk.spark.stream import com.kfk.spark.common.CommStreamingContextScala import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 9:56 下午 */ object StreamingKafkaScala { def main(args: Array[String]): Unit = { val jssc = CommStreamingContextScala.getJssc; // sparkstreaming与kafka连接 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "bigdata-pro-m04:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "streaming_kafka_1", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) // 设置topic val topics = Array("spark") // kafka数据源 val stream = KafkaUtils.createDirectStream[String, String]( jssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) // flatmap val words = stream.flatMap(record => record.value().trim.split(" ")) // map val pair = words.map(x => (x,1)) // reduceByKey val wordcount = pair.reduceByKey((x,y) => x+y) wordcount.print() jssc.start() jssc.awaitTermination() } }
运行上面代码,出现下图所示表示连接成功
第四步:启动生产者
bin/kafka-console-producer.sh --broker-list bigdata-pro-m04:9092 --topic spark
测试示例:
>java java hive hive >hadoop hbase java
运行结果:
到这里就表示SparkStreaming与Kafka集成成功!
九、Spark Streaming 集成Kafka开发- 基于Direct的方式
在Spark 1.3之后通过createDirectStream替代掉原来使用Receiver来接收数据,这种方式会周期性地查询 Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的 job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
这种方式有如下优点:
1、简化并行读取: 如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。 Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2、高性能: 如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低 下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复 制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复 制,那么就可以通过Kafka的副本进行恢复。
3、一次且仅一次的事务机制:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费 Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被 处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在 checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。