大数据处理工具Kafka、Zk、Spark(下)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 大数据处理工具Kafka、Zk、Spark(下)

关闭防火墙


报错信息:
2020-09-14 03:28:23,562 [myid:0] - WARN  [WorkerSender[myid=0]:QuorumCnxManager@588] - Cannot open channel to 3 at election address h6/192.168.1.16:3888
java.net.ConnectException: 拒绝连接 (Connection refused)


此时需要关闭防火墙


sudo systemctl stop firewalld #临时关闭
sudo systemctl disable firewalld #然后reboot 永久关闭
sudo systemctl status  firewalld #查看防火墙状态


分别启动zk


mkdir /opt/kafka/run
mkdir /opt/kafka/run/kafka
mkdir /opt/kafka/run/zk
cd /opt/kafka/run/zk
nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/zookeeper.properties &


分别启动kafka


cd /opt/kafka/run/kafka
nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/server.properties &


查看kafka和zk进程是否启动


ps -ef | grep kafka


验证kafka、zk环境是否可用


创建消息主题


/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create \
--replication-factor 3 \
--partition 3 \
--topic user-behavior-topic \
--zookeeper 192.168.84.128:2181,192.168.84.129:2181,192.168.84.130:2181


通过console producer生产消息


启动console producer
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic user-behavior-topic


通过console consumer消费消息


在另一台机器打开consumer
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 192.168.84.129:2181 --topic user-behavior-topic --from-beginning
如果在producer console输入一条消息 能从consumer console看到这条消息就代表安装是成功的


centos-1 生产消息


image.png


说明kafka和zk集群环境是可用的


Spark


安装包下载


https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
tar xvf spark-3.0.1-bin-hadoop2.7.tgz


依赖条件


jdk1.8
python 2.7.5


部署方式


1、 底层资源调度可以依赖外部的资源调度框架:相对稳定的Mesos、Hadoop YARN模式


2、使用spark内建的standalone模式


  • Local[N]本地模式 使用N个线程


  • Local Cluster[Worker,core,Memory]


伪分布式 可以配置所需要启动的虚拟工作节点的数量
以及每个工作节点所管理的CPU数量和内存尺寸
Spark://hostname:port:Standalone模式


  • Yarn client

主程序运行在本地 具体任务运行在yarn集群


  • YARN standalone/Yarn cluster

主程序逻辑和任务都运行在YARN集群中


  • 需要部署Spark到相关节点

URL为Spark Master主机地址和端口


Mesos://hostname:port:Mesos 模式


  • 需要部署Spark和Mesos到相关节点


URL为Mesos主机地址和端口


上面的部署方式:实际应用中spark应用程序的运行模式取决于传递给 sparkcontext的

master环境变量的值


个别模式还需要依赖辅助程序接口来配合使用


示例代码


examples/src/main


运行脚本


bin/run-example[params]


计算PI


spark-3.0.1-bin-hadoop2.7/bin/run-example SparkPi 10 > Sparkpilog.txt
日志包含两部分
一部分是通用日志信息由一系列脚本及程序产生(计算机信息、spark信息)
另一部分是运行程序的输出结果


计算词数


假设有一个数据文件wordcountdata.txt
统计该文件单词出现的个数
spark-3.0.1-bin-hadoop2.7/bin/run-example JavaWordCount ./wordcountdata.txt


RDD


一个spark的任务对应一个RDD
RDD是弹性分布式数据集即一个RDD代表一个被分区的只读数据集
一个RDD生成
可以来自于内存集合和外部系统
也可通过转换操作来自于其他RDD map filter join


脚本的调用过程


Run-example.sh->load-spark-env.sh->lib 目录下的 jar 包文件->spark-submit.sh->spark-class


image.png


Scala


1、
scala最终启动的是jvm线程
所以它可以访问java的库文件 例如java.io.File
2、
通过Main函数的方式启动了一个JVM进程
随后针对该进程又托管了一系列线程级别的操作
3、
scala 简单 轻巧 相对java 非常适合并行计算框架的编写


运行过程


image.png


函数


  • map


根据现有数据集返回一个新的分布式数据集


由于每个原元素经过func函数转换后组成


  • flatMap


每一个输入函数 会被影射为0到多个输出函数


返回值是一个Seq 而不是单一元素


  • reduceByKey


在一个(K,V)对的数据集上使用返回一个(K,V)对的数据集


Key相同都会被指定的reduce聚合在一起


总体工作流程


无论本地模式
还是分布式模式
内部程序逻辑结构都是类似的
只是其中部分模块有所简化
本地模式中 集群管理模块被简化为进程内部的线程池


image.png


spark环境部署


使用docker部署 spark集群


飞机票安装SBT环境运行Scala项目


结尾


下篇文章通过一个实际案例来介绍下如何使用 spark streaming
案例描述:
假设某论坛需要根据用户对站内网页的
点击量,停留时间,以及是否点赞,
来近实时的计算网页热度,
进而动态的更新网站的今日热点模块,
把最热话题的链接显示其中。
相关文章
|
3天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
19 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
25天前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
54 0
|
25天前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
34 0
|
5天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
18 6
|
3天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
20 2
|
3天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
18 1
|
4天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
25 1
|
15天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
28 1
|
23天前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
|
25天前
|
SQL 消息中间件 分布式计算
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
46 0