关闭防火墙
报错信息: 2020-09-14 03:28:23,562 [myid:0] - WARN [WorkerSender[myid=0]:QuorumCnxManager@588] - Cannot open channel to 3 at election address h6/192.168.1.16:3888 java.net.ConnectException: 拒绝连接 (Connection refused)
此时需要关闭防火墙
sudo systemctl stop firewalld #临时关闭 sudo systemctl disable firewalld #然后reboot 永久关闭 sudo systemctl status firewalld #查看防火墙状态
分别启动zk
mkdir /opt/kafka/run mkdir /opt/kafka/run/kafka mkdir /opt/kafka/run/zk cd /opt/kafka/run/zk nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/zookeeper.properties &
分别启动kafka
cd /opt/kafka/run/kafka nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/server.properties &
查看kafka和zk进程是否启动
ps -ef | grep kafka
验证kafka、zk环境是否可用
创建消息主题
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create \ --replication-factor 3 \ --partition 3 \ --topic user-behavior-topic \ --zookeeper 192.168.84.128:2181,192.168.84.129:2181,192.168.84.130:2181
通过console producer生产消息
启动console producer /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic user-behavior-topic
通过console consumer消费消息
在另一台机器打开consumer /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 192.168.84.129:2181 --topic user-behavior-topic --from-beginning 如果在producer console输入一条消息 能从consumer console看到这条消息就代表安装是成功的
centos-1 生产消息
说明kafka和zk集群环境是可用的
Spark
安装包下载
https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz tar xvf spark-3.0.1-bin-hadoop2.7.tgz
依赖条件
jdk1.8 python 2.7.5
部署方式
1、 底层资源调度可以依赖外部的资源调度框架:相对稳定的Mesos、Hadoop YARN模式
2、使用spark内建的standalone模式
- Local[N]本地模式 使用N个线程
- Local Cluster[Worker,core,Memory]
伪分布式 可以配置所需要启动的虚拟工作节点的数量 以及每个工作节点所管理的CPU数量和内存尺寸 Spark://hostname:port:Standalone模式
- Yarn client
主程序运行在本地 具体任务运行在yarn集群
- YARN standalone/Yarn cluster
主程序逻辑和任务都运行在YARN集群中
- 需要部署Spark到相关节点
URL为Spark Master主机地址和端口
Mesos://hostname:port:Mesos 模式
- 需要部署Spark和Mesos到相关节点
URL为Mesos主机地址和端口
上面的部署方式:实际应用中spark应用程序的运行模式取决于传递给 sparkcontext的
master环境变量的值
个别模式还需要依赖辅助程序接口来配合使用
示例代码
examples/src/main
运行脚本
bin/run-example[params]
计算PI
spark-3.0.1-bin-hadoop2.7/bin/run-example SparkPi 10 > Sparkpilog.txt 日志包含两部分 一部分是通用日志信息由一系列脚本及程序产生(计算机信息、spark信息) 另一部分是运行程序的输出结果
计算词数
假设有一个数据文件wordcountdata.txt 统计该文件单词出现的个数 spark-3.0.1-bin-hadoop2.7/bin/run-example JavaWordCount ./wordcountdata.txt
RDD
一个spark的任务对应一个RDD RDD是弹性分布式数据集即一个RDD代表一个被分区的只读数据集 一个RDD生成 可以来自于内存集合和外部系统 也可通过转换操作来自于其他RDD map filter join
脚本的调用过程
Run-example.sh->load-spark-env.sh->lib 目录下的 jar 包文件->spark-submit.sh->spark-class
Scala
1、 scala最终启动的是jvm线程 所以它可以访问java的库文件 例如java.io.File 2、 通过Main函数的方式启动了一个JVM进程 随后针对该进程又托管了一系列线程级别的操作 3、 scala 简单 轻巧 相对java 非常适合并行计算框架的编写
运行过程
函数
- map
根据现有数据集返回一个新的分布式数据集
由于每个原元素经过func函数转换后组成
- flatMap
每一个输入函数 会被影射为0到多个输出函数
返回值是一个Seq 而不是单一元素
- reduceByKey
在一个(K,V)对的数据集上使用返回一个(K,V)对的数据集
Key相同都会被指定的reduce聚合在一起
总体工作流程
无论本地模式 还是分布式模式 内部程序逻辑结构都是类似的 只是其中部分模块有所简化 本地模式中 集群管理模块被简化为进程内部的线程池
spark环境部署
使用docker部署 spark集群
结尾
下篇文章通过一个实际案例来介绍下如何使用 spark streaming 案例描述: 假设某论坛需要根据用户对站内网页的 点击量,停留时间,以及是否点赞, 来近实时的计算网页热度, 进而动态的更新网站的今日热点模块, 把最热话题的链接显示其中。