大数据处理工具Kafka、Zk、Spark(下)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 大数据处理工具Kafka、Zk、Spark(下)

关闭防火墙


报错信息:
2020-09-14 03:28:23,562 [myid:0] - WARN  [WorkerSender[myid=0]:QuorumCnxManager@588] - Cannot open channel to 3 at election address h6/192.168.1.16:3888
java.net.ConnectException: 拒绝连接 (Connection refused)


此时需要关闭防火墙


sudo systemctl stop firewalld #临时关闭
sudo systemctl disable firewalld #然后reboot 永久关闭
sudo systemctl status  firewalld #查看防火墙状态


分别启动zk


mkdir /opt/kafka/run
mkdir /opt/kafka/run/kafka
mkdir /opt/kafka/run/zk
cd /opt/kafka/run/zk
nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/zookeeper.properties &


分别启动kafka


cd /opt/kafka/run/kafka
nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/server.properties &


查看kafka和zk进程是否启动


ps -ef | grep kafka


验证kafka、zk环境是否可用


创建消息主题


/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create \
--replication-factor 3 \
--partition 3 \
--topic user-behavior-topic \
--zookeeper 192.168.84.128:2181,192.168.84.129:2181,192.168.84.130:2181


通过console producer生产消息


启动console producer
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic user-behavior-topic


通过console consumer消费消息


在另一台机器打开consumer
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 192.168.84.129:2181 --topic user-behavior-topic --from-beginning
如果在producer console输入一条消息 能从consumer console看到这条消息就代表安装是成功的


centos-1 生产消息


image.png


说明kafka和zk集群环境是可用的


Spark


安装包下载


https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
tar xvf spark-3.0.1-bin-hadoop2.7.tgz


依赖条件


jdk1.8
python 2.7.5


部署方式


1、 底层资源调度可以依赖外部的资源调度框架:相对稳定的Mesos、Hadoop YARN模式


2、使用spark内建的standalone模式


  • Local[N]本地模式 使用N个线程


  • Local Cluster[Worker,core,Memory]


伪分布式 可以配置所需要启动的虚拟工作节点的数量
以及每个工作节点所管理的CPU数量和内存尺寸
Spark://hostname:port:Standalone模式


  • Yarn client

主程序运行在本地 具体任务运行在yarn集群


  • YARN standalone/Yarn cluster

主程序逻辑和任务都运行在YARN集群中


  • 需要部署Spark到相关节点

URL为Spark Master主机地址和端口


Mesos://hostname:port:Mesos 模式


  • 需要部署Spark和Mesos到相关节点


URL为Mesos主机地址和端口


上面的部署方式:实际应用中spark应用程序的运行模式取决于传递给 sparkcontext的

master环境变量的值


个别模式还需要依赖辅助程序接口来配合使用


示例代码


examples/src/main


运行脚本


bin/run-example[params]


计算PI


spark-3.0.1-bin-hadoop2.7/bin/run-example SparkPi 10 > Sparkpilog.txt
日志包含两部分
一部分是通用日志信息由一系列脚本及程序产生(计算机信息、spark信息)
另一部分是运行程序的输出结果


计算词数


假设有一个数据文件wordcountdata.txt
统计该文件单词出现的个数
spark-3.0.1-bin-hadoop2.7/bin/run-example JavaWordCount ./wordcountdata.txt


RDD


一个spark的任务对应一个RDD
RDD是弹性分布式数据集即一个RDD代表一个被分区的只读数据集
一个RDD生成
可以来自于内存集合和外部系统
也可通过转换操作来自于其他RDD map filter join


脚本的调用过程


Run-example.sh->load-spark-env.sh->lib 目录下的 jar 包文件->spark-submit.sh->spark-class


image.png


Scala


1、
scala最终启动的是jvm线程
所以它可以访问java的库文件 例如java.io.File
2、
通过Main函数的方式启动了一个JVM进程
随后针对该进程又托管了一系列线程级别的操作
3、
scala 简单 轻巧 相对java 非常适合并行计算框架的编写


运行过程


image.png


函数


  • map


根据现有数据集返回一个新的分布式数据集


由于每个原元素经过func函数转换后组成


  • flatMap


每一个输入函数 会被影射为0到多个输出函数


返回值是一个Seq 而不是单一元素


  • reduceByKey


在一个(K,V)对的数据集上使用返回一个(K,V)对的数据集


Key相同都会被指定的reduce聚合在一起


总体工作流程


无论本地模式
还是分布式模式
内部程序逻辑结构都是类似的
只是其中部分模块有所简化
本地模式中 集群管理模块被简化为进程内部的线程池


image.png


spark环境部署


使用docker部署 spark集群


飞机票安装SBT环境运行Scala项目


结尾


下篇文章通过一个实际案例来介绍下如何使用 spark streaming
案例描述:
假设某论坛需要根据用户对站内网页的
点击量,停留时间,以及是否点赞,
来近实时的计算网页热度,
进而动态的更新网站的今日热点模块,
把最热话题的链接显示其中。
相关文章
|
12天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
34 0
|
11天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
24 3
|
15天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
33 3
|
19天前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
20天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
19天前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
|
11天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
28 0
|
21天前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
26 0
|
19天前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
56 9
|
1月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
49 3

热门文章

最新文章