Spark Streaming架构原理详解!(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 笔记

五、Spark Streaming基于HDFS的实时计算开发


基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。

相当于处理实时的文件流。

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory) 
streamingContext.streamingContext.textFileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) 

Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入 HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦 处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此 不会占用一个cpu core。


创建输入流数据源目录:

bin/hdfs dfs -mkdir -p /user/caizhengjie/datas/sparkstreaming

写入数据:

bin/hdfs dfs -put /opt/datas/11.txt /user/caizhengjie/datas/sparkstreaming 
bin/hdfs dfs -copyFromLocal /opt/datas/11.txt /user/caizhengjie/datas/sparkstreaming1.txt

源数据节点时间必须与计算节点时间保持同步(重点)

Java语言实现:

package com.kfk.spark.stream;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 2:12 下午
 */
public class HDFSWordCountJava {
    public static void main(String[] args) throws InterruptedException {
        // Create a local StreamingContext with two working thread and batch interval of 5 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        String path = "hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkstreaming/";
        // hdfs数据源
        JavaDStream<String> lines = jssc.textFileStream(path);
        // flatmap
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // map
        JavaPairDStream<String,Integer> pair =  words.mapToPair(word -> new Tuple2<>(word,1));
        // reduceByKey
        JavaPairDStream<String,Integer> wordcount = pair.reduceByKey((x,y) -> x+y);
        wordcount.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

Scala语言实现:

package com.kfk.spark.stream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:11 下午
 */
object HDFSWordCountScala {
    def main(args: Array[String]): Unit = {
        // Create a local StreamingContext with two working thread and batch interval of 5 second
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val jssc = new StreamingContext(conf, Durations.seconds(1))
        val path = "hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkstreaming/"
        val lines = jssc.textFileStream(path)
        // flatmap
        val words = lines.flatMap(word => word.split(" "))
        // map
        val pair = words.map(x => (x,1))
        // reduceByKey
        val wordcount = pair.reduceByKey((x,y) => x+y)
        wordcount.print()
        jssc.start()
        jssc.awaitTermination()
    }
}


六、Spark Streaming读取并处理Socket流数据


当基于Spark -shell运行Streaming程序时,需要注意要不线程数大于1,要么基于集群

bin/spark-shell --master local[2]
bin/spark-shell --master spark:node1:7077

传递给spark的master URL可以有如下几种:


local本地单线程

local[K]本地多线程(指定K个内核)

1ocal[*]本地多线程(指定所有可用内核)

spark://HOST:PORT连接到指定的 Spark standalone clustermaster, 需要指定端口。

mesos://HOST:PORT 连接到指定的Mesos集群,需要指定端口。

yarn-client客户端模式连接到YARN集群。需要配置HADOOP_ CONF_ DIR。

yarn-cluster集群模式连接到YARN集群。需要配置HADOOP_ CONF_ DIR。

NC服务安装并运行Spark Streaming

NetCat 下载地址: http://rpm.pbone.net/index.php3/stat/4/idpl/15991371/dir/scientific_linux_6/com/nc1.84-22.el6.x86_64.rpm.html


这里提供百度云下载

链接: https://pan.baidu.com/s/1pFDTnLihK3ODELhDGkQHRQ 密码: u10t

下载完成之后,将它上传到/opt/Hadoop目录下

然后开始安装:

sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm 

首先需要通过使用以下命令将Netcat作为数据服务器运行

nc -lk 9999

下面通过idea工具来编写Spark Streaming程序

Java语言实现

package com.kfk.spark.stream;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
/**
 * lambda表达式写法
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/13
 * @time : 9:25 下午
 */
public class WordCountJava {
    public static void main(String[] args) throws InterruptedException {
        // Create a local StreamingContext with two working thread and batch interval of 5 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("bigdata-pro-m04",9999);
        // flatmap
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // map
        JavaPairDStream<String,Integer> pair =  words.mapToPair(word -> new Tuple2<>(word,1));
        // reduceByKey
        JavaPairDStream<String,Integer> wordcount = pair.reduceByKey((x,y) -> x+y);
        wordcount.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

scala语言实现

package com.kfk.spark.stream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 12:54 下午
 */
object WordCountScala {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        val lines = ssc.socketTextStream("bigdata-pro-m04", 9999)
        // flatmap
        val words = lines.flatMap(word => word.split(" "))
        // map
        val pair = words.map(x => (x,1))
        // reduceByKey
        val wordcount = pair.reduceByKey((x,y) => x+y)
        wordcount.print()
        ssc.start()
        ssc.awaitTermination()
    }
}


七、Spark Streaming结果数据保存到MySQL数据库


下面我们测试一下Spark Streaming将结果保存到MySQL数据库

还是上面的案例,通过sparkstreaming将单词和次数写入到数据库中

package com.spark.test
import java.sql.DriverManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/27
 * @time : 3:43 下午
 */
object TestStreaming {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        val sc = spark.sparkContext
        val ssc = new StreamingContext(sc,Seconds(5))
        val lines = ssc.socketTextStream("10.211.55.59",9999)
        val words = lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_ + _)
        // 将rdd展开
        words.foreachRDD(rdd => rdd.foreachPartition(line =>{
            // 加载驱动
            Class.forName("com.mysql.jdbc.Driver")
            // 过去connection
            val conn = DriverManager.getConnection("jdbc:mysql://node1:3306/test","root","199911")
            try{
                // 遍历每一行数据写入数据库
                for (row <- line){
                    val sql = "insert into wordCount(titleName,count) values('"+row._1+"',"+row._2+")"
                    conn.prepareStatement(sql).executeUpdate()
                }
            }finally {
                conn.close()
            }
        }))
        words.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

首先启动nc服务

nc -lk 9999

这里我是使用spark-shell运行的,运行的方式:

15.png

通过使用:paste可以复制多行代码。

查看测试数据:

16.png


八、Spark Streaming与Kafka集成进行数据处理


Kafka项目在版本0.8和0.10之间引入了新的使用者API,因此有2个单独的相应Spark Streaming包可用。请为您的经纪人和所需功能选择正确的软件包;请注意,0.8集成与以后的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。

注意:自Spark 2.3.0起已弃用Kafka 0.8支持。

这里我使用的版本是kafka_2.11-2.1.1

相关操作可以看一下官网的解释:

http://spark.apache.org/docs/2.4.6/streaming-kafka-0-10-integration.html

首先加载pom.xml的配置文件

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
  <version>${saprk.version}</version>
</dependency>

Creating a Direct Stream

请注意,导入的名称空间包括版本org.apache.spark.streaming.kafka010


Direct方式案例

第一步:启动服务

首先要启动zookeeper,再启动kafka,三台要同时启动

启动zookeeper:

zkServer.sh start

启动kafka

在前台启动kafka,注意查看打印在桌面的日志,有无报错信息

bin/kafka-server-start.sh config/server.properties

如果没有报错信息,启动正常,那么就可以在后台启动了

bin/kafka-server-start.sh -daemon config/server.properties

第二步:创建topic

创建一个分区和一个副本的“spark”的topic

bin/kafka-topics.sh --create --zookeeper bigdata-pro-m04:2181 --replication-factor 1 --partitions 1 --topic spark

第三步:启动sparkstreaming与kafka连接

Java语言实现

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 下午
 */
public class CommStreamingContext {
    public static JavaStreamingContext getJssc(){
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }
}
package com.kfk.spark.stream;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:20 下午
 */
public class StreamingKafkaJava {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext jssc = CommStreamingContext.getJssc();
        // sparkstreaming与kafka连接
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "streaming_kafka_1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        // 设置topic
        Collection<String> topics = Arrays.asList("spark");
        // kafka数据源
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );
        // flatmap
        JavaDStream<String> words = stream.flatMap(record -> Arrays.asList(record.value().trim().split(" ")).iterator());
        // map
        JavaPairDStream<String,Integer> pair = words.mapToPair(word -> new Tuple2<>(word,1));
        // reduceByKey
        JavaPairDStream<String,Integer> wordcount = pair.reduceByKey((x,y) -> x+y);
        wordcount.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

scala语言实现:

package com.kfk.spark.stream
import com.kfk.spark.common.CommStreamingContextScala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 9:56 下午
 */
object StreamingKafkaScala {
    def main(args: Array[String]): Unit = {
        val jssc = CommStreamingContextScala.getJssc;
        // sparkstreaming与kafka连接
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "bigdata-pro-m04:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "streaming_kafka_1",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        // 设置topic
        val topics = Array("spark")
        // kafka数据源
        val stream = KafkaUtils.createDirectStream[String, String](
            jssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
        // flatmap
        val words = stream.flatMap(record => record.value().trim.split(" "))
        // map
        val pair = words.map(x => (x,1))
        // reduceByKey
        val wordcount = pair.reduceByKey((x,y) => x+y)
        wordcount.print()
        jssc.start()
        jssc.awaitTermination()
    }
}

运行上面代码,出现下图所示表示连接成功

20.png

第四步:启动生产者

bin/kafka-console-producer.sh --broker-list bigdata-pro-m04:9092 --topic spark

测试示例:

>java java hive hive 
>hadoop hbase java

运行结果:

21.png

到这里就表示SparkStreaming与Kafka集成成功!



九、Spark Streaming 集成Kafka开发- 基于Direct的方式


在Spark 1.3之后通过createDirectStream替代掉原来使用Receiver来接收数据,这种方式会周期性地查询 Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的 job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。


这种方式有如下优点:

1、简化并行读取: 如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。 Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。


2、高性能: 如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低 下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复 制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复 制,那么就可以通过Kafka的副本进行恢复。


3、一次且仅一次的事务机制:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费 Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被 处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。


基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在 checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。


相关文章
|
2月前
|
存储 SQL 关系型数据库
MySQL进阶突击系列(03) MySQL架构原理solo九魂17环连问 | 给大厂面试官的一封信
本文介绍了MySQL架构原理、存储引擎和索引的相关知识点,涵盖查询和更新SQL的执行过程、MySQL各组件的作用、存储引擎的类型及特性、索引的建立和使用原则,以及二叉树、平衡二叉树和B树的区别。通过这些内容,帮助读者深入了解MySQL的工作机制,提高数据库管理和优化能力。
|
2月前
|
人工智能 前端开发 编译器
【AI系统】LLVM 架构设计和原理
本文介绍了LLVM的诞生背景及其与GCC的区别,重点阐述了LLVM的架构特点,包括其组件独立性、中间表示(IR)的优势及整体架构。通过Clang+LLVM的实际编译案例,展示了从C代码到可执行文件的全过程,突显了LLVM在编译器领域的创新与优势。
122 3
|
3月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
121 3
|
27天前
|
Java Linux C语言
《docker基础篇:2.Docker安装》包括前提说明、Docker的基本组成、Docker平台架构图解(架构版)、安装步骤、阿里云镜像加速、永远的HelloWorld、底层原理
《docker基础篇:2.Docker安装》包括前提说明、Docker的基本组成、Docker平台架构图解(架构版)、安装步骤、阿里云镜像加速、永远的HelloWorld、底层原理
313 89
|
1天前
|
存储 SQL 缓存
MySQL原理简介—2.InnoDB架构原理和执行流程
本文介绍了MySQL中更新语句的执行流程及其背后的机制,主要包括: 1. **更新语句的执行流程**:从SQL解析到执行器调用InnoDB存储引擎接口。 2. **Buffer Pool缓冲池**:缓存磁盘数据,减少磁盘I/O。 3. **Undo日志**:记录更新前的数据,支持事务回滚。 4. **Redo日志**:确保事务持久性,防止宕机导致的数据丢失。 5. **Binlog日志**:记录逻辑操作,用于数据恢复和主从复制。 6. **事务提交机制**:包括redo日志和binlog日志的刷盘策略,确保数据一致性。 7. **后台IO线程**:将内存中的脏数据异步刷入磁盘。
|
19天前
|
存储 缓存 监控
ClickHouse 架构原理及核心特性详解
ClickHouse 是由 Yandex 开发的开源列式数据库,专为 OLAP 场景设计,支持高效的大数据分析。其核心特性包括列式存储、字段压缩、丰富的数据类型、向量化执行和分布式查询。ClickHouse 通过多种表引擎(如 MergeTree、ReplacingMergeTree、SummingMergeTree)优化了数据写入和查询性能,适用于电商数据分析、日志分析等场景。然而,它在事务处理、单条数据更新删除及内存占用方面存在不足。
169 21
|
19天前
|
存储 消息中间件 druid
Druid 架构原理及核心特性详解
Druid 是一个分布式、支持实时多维OLAP分析的列式存储数据处理系统,适用于高速实时数据读取和灵活的多维数据分析。它通过Segment、Datasource等元数据概念管理数据,并依赖Zookeeper、Hadoop和Kafka等组件实现高可用性和扩展性。Druid采用列式存储、并行计算和预计算等技术优化查询性能,支持离线和实时数据分析。尽管其存储成本较高且查询语言功能有限,但在大数据实时分析领域表现出色。
72 19
|
19天前
|
存储 SQL NoSQL
Doris 架构原理及核心特性详解
Doris 是百度内部孵化的OLAP项目,现已开源并广泛应用。它采用MPP架构、向量化执行引擎和列存储技术,提供高性能、易用性和实时数据处理能力。系统由FE(管理节点)和BE(计算与存储节点)组成,支持水平扩展和高可用性。Doris 适用于海量数据分析,尤其在电商、游戏等行业表现出色,但资源消耗较大,复杂查询优化有局限性,生态集成度有待提高。
61 15
|
16天前
|
Java 网络安全 开发工具
Git进阶笔记系列(01)Git核心架构原理 | 常用命令实战集合
通过本文,读者可以深入了解Git的核心概念和实际操作技巧,提升版本管理能力。
|
1月前
|
机器学习/深度学习 算法 PyTorch
深度强化学习中SAC算法:数学原理、网络架构及其PyTorch实现
软演员-评论家算法(Soft Actor-Critic, SAC)是深度强化学习领域的重要进展,基于最大熵框架优化策略,在探索与利用之间实现动态平衡。SAC通过双Q网络设计和自适应温度参数,提升了训练稳定性和样本效率。本文详细解析了SAC的数学原理、网络架构及PyTorch实现,涵盖演员网络的动作采样与对数概率计算、评论家网络的Q值估计及其损失函数,并介绍了完整的SAC智能体实现流程。SAC在连续动作空间中表现出色,具有高样本效率和稳定的训练过程,适合实际应用场景。
151 7
深度强化学习中SAC算法:数学原理、网络架构及其PyTorch实现

热门文章

最新文章