Spark-再接着上次的Lamda架构

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
注册配置 MSE Nacos/ZooKeeper,182元/月
云原生网关 MSE Higress,422元/月
简介: 日志分析单机日志分析,适用于小数据量的。(最大10G),awk/grep/sort/join等都是日志分析的利器。 例子: 1、shell得到Nginx日志中访问量最高的前十个IPcat access.

日志分析

单机日志分析,适用于小数据量的。(最大10G),awk/grep/sort/join等都是日志分析的利器。
例子:
1、shell得到Nginx日志中访问量最高的前十个IP

cat access.log.10 | awk '(a[$1]++) END (for(b in a) print b"\t"a[b])' | sort -k2 -r | head -n 10

2、python 统计每个IP的地址点击数

 import re
 import sys
 contents=sys.argv[1]
 def NginxIpHit(logfile_path):
     ipadd = r'\.'.join([r'\d{1,3}']*4)
     re_ip = re.compile(ipadd)
     iphitlisting = {}
     for line in open(contents):
     match = re_ip.match(line)
     if match:
        ip = match.group()
        iphitlisting[ip]=iphitlisting.get(ip,0)+1
     print iphitlisting
 NginxIpHit(contents)

**大规模的日志处理,日志分析指标:
PV、UV、PUPV、漏斗模型和准化率、留存率、用户属性
最终用UI展示各个指标的信息。**

架构

这里写图片描述

  • 1、实时日志处理流线

数据采集:采用Flume NG进行数据采集
数据汇总和转发:用Flume 将数据转发和汇总到实时消息系统Kafka
数据处理:采用spark streming 进行实时的数据处理
结果显示:flask作为可视化工具进行结果显示

  • 2、离线日志处理流线

数据采集:通过Flume将数据转存到HDFS
数据处理:使用spark sql进行数据的预处理
结果呈现:结果汇总到mysql上,最后使用flask进行结果的展现
Lamda架构:低响应延迟的组合数据传输环境。
查询过程:一次流处理、一次批处理。对应着实时和离线处理。

项目流程

安装flume
Flume进行日志采集,web端的日志一般Nginx、IIS、Tomcat等。Tomcat的日志在var/data/log
安装jdk
安装Flume

wget http://mirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
tar –zxvf  apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0 –bin  apache-flume-1.5.0
ln   -s  apache-flume-1.5.0   fiume 

环境变量配置

Vim  /etc/profile 
Export JAVA_HOME=/usr/local/jdk
Export CLASS_PATH = .:$ JAVA_HOME/lib/dt.jar: $ JAVA_HOME/lib/tools.jar
Export PATH=$ PATH:$ JAVA_HOME/bin
Export FlUME_HOME=/usr/local/flume
Export FlUME_CONF_DIR=$ FlUME_HOME/conf
Export PATH=$ PATH:$ FlUME_HOME /bin
Souce  /etc/profile 

创建agent配置文件将数据输出到hdfs上,修改flume.conf:

a1.sources = r1
a1.sinks = k1
a1.channels =c1
描述和配置sources
第一步:配置数据源
a1.sources.r1.type =exec
a1.sources.r1.channels =c1
配置需要监控的日志输出目录
a1.sources.r1.command=tail  –f  /va/log/data
第二步:配置数据输出
a1.sink.k1.type =hdfs
a1.sink.k1.channels =c1
a1.sink.k1.hdfs.useLocalTimeStamp=true
a1.sink.k1.hdfs.path =hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sink.k1.hdfs.filePrefix =cmcc
a1.sink.k1.hdfs.minBlockReplicas=1
a1.sink.k1.hdfs.fileType =DataStream
a1.sink.k1.hdfs.writeFormat=Text
a1.sink.k1.hdfs.rollInterval =60
a1.sink.k1.hdfs.rollSize =0
a1.sink.k1.hdfs.rollCount=0
a1.sink.k1.hdfs.idleTimeout =0
配置数据通道
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
第四步:将三者级联
a1.souces.r1.channels =c1
a1.sinks.k1.channel =c1

启动Flume Agent

cd  /usr/local/flume
nohup bin/flume-ng  agent  –n  conf  -f  conf/flume-conf.properties
&

已经将flume整合到了hdfs中

  • 整合Flume、kafka、hhdfs
#hdfs输出端
a1.sink.k1.type =hdfs
a1.sink.k1.channels =c1
a1.sink.k1.hdfs.useLocalTimeStamp=true
a1.sink.k1.hdfs.path =hdfs://192.168.11.174:9000/flume/events/%Y/%m/%d/%H/%M
a1.sink.k1.hdfs.filePrefix =cmcc-%H
a1.sink.k1.hdfs.minBlockReplicas=1
a1.sink.k1.hdfs.fileType =DataStream
a1.sink.k1.hdfs.rollInterval =3600
a1.sink.k1.hdfs.rollSize =0
a1.sink.k1.hdfs.rollCount=0
a1.sink.k1.hdfs.idleTimeout =0
#kafka输出端 为了提高性能使用内存通道
a1.sink.k2.type =com.cmcc.chiwei.Kafka.CmccKafkaSink
a1.sink.k2.channels =c2
a1.sink.k2.metadata.broker.List=192.168.11.174:9002;192.168.11.175:9092; 192.168.11.174:9092
a1.sink.k2.partion.key =0
a1.sink.k2.partioner.class= com.cmcc.chiwei.Kafka.Cmcc Partion
a1.sink.k2.serializer.class= kafka. Serializer.StringEncoder
a1.sink.k2.request.acks=0
a1.sink.k2.cmcc.encoding=UTF-8
a1.sink.k2.cmcc.topic.name=cmcc
a1.sink.k2.producer.type =async
a1.sink.k2.batchSize =100

a1.sources.r1.selector.type=replicating


a1.sources = r1
a1.sinks = k1 k2
a1.channels =c1 c2

#c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/home/flume/flumeCheckpoint
a1.channels.c1.dataDir=/home/flume/flumeData, /home/flume/flumeDataExt
a1.channels.c1.capacity=2000000
a1.channels.c1.transactionCapacity=100
#c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=2000000
a1.channels.c2.transactionCapacity=100

用Kafka将日志汇总

1.4 Tar –zxvf  kafka_2.10-0.8.1.1.tgz
1.5 配置kafka和zookeeper文件
配置zookeeper.properties
dataDir=/tmp/zookeeper
client.Port=2181
maxClientCnxns = 0
initLimit = 5
syncLimit = 2
##
server.43 = 10.190.182.43:2888:3888
server.38 = 10.190.182.38:2888:3888

server.33 = 10.190.182.33:2888:3888

配置zookeeper myid

在每个服务器dataDir 创建 myid文件 写入本机id
//server.43   myid  本机编号43
echo “43” >  /tmp/ zookeeper/myid
配置kafka文件, config/server.properties
每个节点根据不同主机名配置
broker.id43
host.name:10.190.172.43
zookeeper.connect=10.190.172.43:2181, 10.190.172.33:218110.190.172.38:2181

启动zookeeper
kafka通过zookeeper存储元数据,先启动它,提供kafka相应的连接地址
Kafka自带的zookeeper

在每个节点 bin/zookeeper-server-start.sh config/zookeeper. properties
启动Kafka

Bin/Kafka-server-start.sh

创建和查看topic
Topic和flume中的要一致,spark streming 也用的这个

Bin/ Kafka-topics.sh  --create  --zookeeper 10.190.172.43:2181
 --replication-factor  1  -- partions   1  --topic  KafkaTopic

查看下:

Bin/ Kafka-topics.sh   --describe   -- zookeeper  10.190.172.43:2181

整合kafka sparkstreming

Buid.sbt
Spark-core
Spark-streming
Spark-streamng-kafka
kafka
  • Spark streming 实时分析
    数据收集和中转已经好了,kafka给sparkstreming
  • Spark sql 离线分析
  • Flask可视化

代码

移步: github.com/jinhang

目录
相关文章
|
5天前
|
分布式计算 Kubernetes 调度
Kubeflow-Spark-Operator-架构学习指南
本指南系统解析 Spark Operator 架构,涵盖 Kubebuilder 开发、控制器设计与云原生集成。通过四阶段学习路径,助你从部署到贡献,掌握 Kubernetes Operator 核心原理与实战技能。
35 0
|
3月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
5月前
|
机器学习/深度学习 人工智能 自然语言处理
3 秒音频也能克隆?拆解 Spark-TTS 架构的极致小样本学习
本文深入解析了 Spark-TTS 模型的架构与原理,该模型仅需 3 秒语音样本即可实现高质量的零样本语音克隆。其核心创新在于 BiCodec 单流语音编码架构,将语音信号分解为语义 Token 和全局 Token,实现内容与音色解耦。结合大型语言模型(如 Qwen 2.5),Spark-TTS 能直接生成语义 Token 并还原波形,简化推理流程。实验表明,它不仅能克隆音色、语速和语调,还支持跨语言朗读及情感调整。尽管面临相似度提升、样本鲁棒性等挑战,但其技术突破为定制化 AI 声音提供了全新可能。
412 35
|
11月前
|
分布式计算 大数据 Apache
Apache Spark & Paimon Meetup · 北京站,助力 LakeHouse 架构生产落地
2024年11月15日13:30北京市朝阳区阿里中心-望京A座-05F,阿里云 EMR 技术团队联合 Apache Paimon 社区举办 Apache Spark & Paimon meetup,助力企业 LakeHouse 架构生产落地”线下 meetup,欢迎报名参加!
370 59
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
230 0
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
213 0
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
183 0
|
分布式计算 Spark 容器
Spark 架构和组件集的简要概述
Spark 架构和组件集的简要概述   Flex 4 提供的 Spark 组件 Flex 4 目前提供各种 Spark 组件。Flex 的后续版本将提供更多 Spark 控件,与 MX 组件集并驾齐驱。
|
4月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
220 0