第二阶段总结|学习笔记

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 快速学习第二阶段总结

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第二阶段总结】学习笔记与课程紧密联系,让用户快速学习知识

课程地址https://developer.aliyun.com/learning/course/670/detail/11639


第二阶段总结

 

内容介绍

一、前情回顾

二、第二阶段总体目标

 

一、前情回顾

数据收集代码,链路统计的调用以及代码,数据同步mysql代码实际上指的是redis同步到mysql的过程,以上学习完以后进入第三阶段,现在开始进行总体的总结。


二、第二阶段的总体目标

首先回顾kafkaAPI,通过lua脚本收集数据,收集数据以后打入到kafka里面,又介绍了爬虫,是反爬虫所以就需要有一个爬虫,给大家提供了一个爬虫,做了测试以及介绍,接下来介绍了spark和hadoop相应的环境就是开发任务相应的环境,下一个是SparkStreaming消费kafka的两种方式,再向后是搭建streaming项目以及实现kafka数据的读取,最后是实现链路统计

1. 读取kafka的API

先看第一部分,读取kafka的API,实际上就是kafka生产者写入数据的过程

首先创建一个kafka的生产者然后创建消息的载体,利用生产者将数据的载体发送数据,就发送到了kafka里面,在关闭kafka的生产者,这个是kafkaAPI的回顾,通过这样的四步

创建 kafka 生产者 KafkaProducer < String . String > kafkaProducer = new KafkaProducercString . Stringprops )

创建数据的载体 ProducerRecord < String , String record = new ProducerRecord < String : String >(" test ","123456");

发送数据 kafkaProducer . send ( record );

关闭生产者 kafkaProducer .close()

2.lua脚本收集数据

后来又学习了通过lua脚本收集数据,这些数据写入到kafka脚本里面,写入的数据是前面我们整理的数据,request等。

配置文件在执行任务前数据采集时涉及到数据分区,而在数据分区时需要用到共享内存变量而lua采集到kafka掌握的相关配置指的是开启一个共享字典,这个共享字典能够让每一个Nginx访问到,类似于spark里面的广播变量,在内存中设计一个变量进行接收比如用count记录数量,然后用来均衡数据,下一步是开启nginx监控和加载lua配置文件,需要用到kafka第三方的包重新加载后将lua生成的上传至集群,加载进去以后传感集群就可以识别到了,nginx集群也可以识别到

开启了一个共享字典,供毎个 nginx 进行消费,这个变量是用来记录消息数( count ),然后用来均衡数据

开启了 nginx 的监控和加载 lua 的配置文件

将 Lua 集成 kafka 的第三方依赖包上传至集群

接下来是书写lua脚本

总的流程分为四步

1导入依赖包

2创建 kafka 生产者

3创建数据的载体

4发送数据

3.自定义分区

这是总的目标,下面要自定义分区,自定义分区重新定义了一下分区的格式,在nginx拿到内存共享空间用count做一个标记用数据的条数和分区的数量进行取余,余几就是哪个分区,这是算法,算法讲完以后count需要进行自增操作,自增第一二三条就能算出相应分区数取余,知道数据应该落在哪个区域

1在 nginx 中设置共享内存2、在 lua 的脚本中取出共享内存,然后在内存中获取 count

2获取到 count 对 topicpartitions 进行取余操作,确定分区

3将 count 进行自增操作

4.数据收集到kafka

收集到kafka实际是收集到request,time_local这些数据,收集到以后使用#CS#进行拼接,拼接后把数据发送到kafka

1采集 request 、 time _ local 等支撑反爬虫项目的数据

2将数据使用”# CS #”进行拼接

3发送数据到 kafka

5.Streaming消费kafka

下一个是Streaming消费kafka的两种方式

这里面讲的主要是createDirectStream和createStream的区别,区别有一下几点,createDirectStream是原始的API是低层次的,createStream是高层次的API经过了二次封装。

createDirectStream由程序维护或者程序员自己维护,而createStream是由zookeeper维护。

第三个区别是createStream有数据丢失的风险所以需要写日志。写日志就涉及到两次写数据,而createDirectStream是写一次数据,createStream创建的是rrd数量与kafka分区没有关系,而createDirectStream是有关系的 

方式:

KafkaUtils .createStream KafkaUtils .createDirectStream()

KafkaUtils . createDirectStream

调用的是低层次(最原始的)的 APl

kafka 的 offset 是由程序员自己或者 DirectStream 维护的只写数据,写一次数据

创建的 rdd 数 kafka topic 的 partition 数量相同(有关系)

KafkaUtils . createStream

调用的是高层(经过二次封装)次的 API

kafka 的 offset 是由z0okeeper维护的

需要写 WAL 日志,再写数据,实际上是写了两次数据

创建的 rdd 数量与 kafka topic 的 partition 没有关系

6. createDirectStream消费数据

createDirectStream消费数据的步骤

第一步创建spark conf第二步创建sparkContext第三部创建StreamingContext第四步读取kafka数据第五部消费数据第六步开启streaming任务+开启循环

1创建 Spark conf

val conf = new SparkConfO . setMaster (" local [2]"). setAppName (" TestCreat eDirect Stream "')

2创建 SparkContext

val sc = new SparkCont ext ( conf )

3创建

Streaming Context val ssc = new StreamingContext ( sc , Seconds (2))

4读取 kafka 内的数据

SsC , kafkaParams , topics )Va7 kafkaDatas = Kafkautils . creat eDirect Stream [ String , String , StringDecoder , StringDecoder ]( ssC , kafkaPar ams , topics )

5消费数据

kafkavalue . foreachRDD ( rdd => rdd . for each ( print ln ))

6开启 Streaming 任务+开启循环

ssc . start()

ssc.awaitTermination()

7. 数据预处理

预处理首先创建项目然后引入pom文件,引入pom文件需要用到bean题目的工具类,爬虫数据预处理以及爬虫框架等依赖。

第三部引入工具类bean枚举类,第四步实现了数据预处理的驱动程序,这个程度第一步按照createDirectStream消费数据步骤创建数据预处理的主程序,将数据读取过来进行了一个升级,创建了一个setupSsc的方法,把读取数据以及实例化streamingcontext的方法全部都加载到setup方法里面去。创建streamingcontext,创建kafka消费的directStream,打印消息以及返回ssc.

1创建项目

2引入 pom 的文件

3引入工具类、 bean 、枚举类

4写 dataprocess 的驱动类( DataProcessLauncher ),在 dataprocess 中步骤如下

 ①定义日志级别和批处理完成后任务结束的设置

②在 main 方法中定义 conf 、 SparkContext

读取 kafka 的参数,定义方法,设置 sparkStreaming 的业务处理

5创建 setupSsC 方法

创建 streamingcontext

创建 kafka 消费的 DirectStream

打印消息返回 ssc

实现链路统计功能,链路统计功能的主要目的是让大家能够知道企业需要实时了解每个链路的运行情况数据采集量,并且把他作为前端展现

8.前端展现操作:

第一步统计每个批次每台服务器访问的总量,第二步是统计每个批次每台服务器当前活跃连接数的量,第三步将两批数据写入 redis。

以下是每个操作的具体步骤,第一步是数据是由#CS#进行拼接的,#CS#进行拆分,拆分后第十个值是服务器ip,第十二个值是当前活跃链接数分别对应的角标是9和11。要拿到服务器的ip先计算出ip和1,调用reducebykey求取ip和对应的总数。

先获取第十个值对应的角标是9这个是服务器的ip,再将ip和1进行返回,调用上划线加下划线求去除这个批次每台服务器的放总量,实际的代码就是如下的代码。

第二步计算每个ip当前活跃用户的连接数,先用#CS#进行拆分,当前活跃用户连接数的下角标是11,先截取第十二个数据及服务器名称,截取出来以后将他们输出,输出小角标为九和十一的,输出以后调用reducebykey,这就拿到了数据当中的最后一个数据,相同的key对应有很多的key value对,拿到了key value对中最后一个,也就是当前活跃用户连接数。

第三步是将数据写入到redis,首先在两个数据不为空的前提下,将两个数据转换成两个小的 map,调用collectAsMap,将两个小的 MAP 封装成一个大的 MAP,这个map里面有一点需要注意的是key要和前端工程师进行约定好的,数据准备好就要准备redis前缀,

前缀在配置文件中已经配置好,从配置文件中调用

propertiesUtil,getStringByKey,传两个参数将数据前缀读取过来,加一个时间戳,时间戳加好在进行初始化配置文件,还定义了数据在redis中存取的时间存储周期,周期默认为二十四个小时,同步周期为两个小时,然后再写入redis就可以了。

1、统计每个批次台服务器访问的总量

获取到一条数据,使用“# CS #”对数据进行切割,

获取切分后的第十个数据(角标是9),将第十个数据和1,进行输出调用 reducebykey (下划线+下划线)求去除这个批次毎台服务器的放总量

 val serverCount = rdd map ( message => f

//2抽取出服务器的 IP

var ip =""

if ( message . split (" CS #",-1). length >9) E ip = message . split (" CS #”,-1)(9)

//3将 p 和1返回

( ip ,1)

). reduceByKey ( _ +_)//4调用 reducebykey 计算出 ip 和总数

2、统计每个批次每台服务器当前活跃链接数的量

获取到一条数据,使用“# CS #”对数据进行切割,

获取切分后的第十二个数据(角标是111)和第十个数据(角标是9),

将第十个数据和第十二个数据,进行输出

调用 reducebykey (( kv )=> v )求去毎个服务器多个数据中的最后一个数据

//1获取到一条数据,使用“ CS #”对数据进行切割

val activeUs erCount = rdd , map ( message =>{

var ip =””

var activeUserCount ="

//切分数据

 if ( message . split (" CS #”").1ength>11){

//2获取切分后的第十二个数据(角标是11)和第十个数据(角标是9)

//截取当前活跃连接数

activeUserCount = message . split (" CS #”)(11)

//截取 IP

ip = message . split ぐ" CS #”)(9)

//3将第十个数据和第十二个数据,进行输出( ip , activeUs erCount )

3). reduceByKey (( k , v = v )//4调用 reducebykey (( k ,)= v )求去每个服务器多个数据中的最后一个数据

3、将两批数据写入 redis

在两个数据不为空的前提下,将两个数据转换成两个小的 map

if (! serverCount . isEmptyO 88 lactiveUserCount . isEmptyO ) f //将两个数据转换成两个小的 map

val serverCountMap = ser verCount .collectAsMap()

val activeUser CountMap = activeUserCount .collectAsMap())

封装最终要写入 redis 的数据(将两个小的 MAP 封装成一个大的 MAP )

val Maps = Map (

" serversCountMap "-> server CountMap ,

" activeNumMap "-> activeUserCountMap

在配置文件中读取出数据 key 的前缀,+时间戳( redis 中对数据的 key )

val

key = PropertiesUtil . get StringByKey (" cluster . key . monitor . linkProcess "," jedisConfig . proper

ties ')+ System .currentTimeMi11isO. toString

在配置文件中读取出数据的有效存储时间

Val

time = PropertiesUtil . getStringByKeyC " cluster . exptime . monitor "," jedisConfig . properties ")

tOInt

将数据写入 redis

 redis . setex ( key , time , Json ( DefaultFormats ). write ( Maps ))

mysql如何展现到前端看一下web代码,这个不是重点,实际上在写简历时不写这里也可以,但是如果能接受最好,面试的时候可以把他写进去

@ Autowired

private IRealTimecomputDataservice realTimeComputDataservice ;

/**

*每隔两个小时定时获取 redis 上的链路数据,存到 mysqL 对应的表中( datacollect ,

*然后删掉 redis 上己经备份 mysqL 的数据。

*

*/

//@ Scheduled ( cron =”**0/2**?”)

@Scheduled ( cron ="0/5****?")

@ Transactional

public void BackupRedisLinkData () {

realTimecomputDataservice . saveDataCollectData ();

相关文章
|
前端开发 Java 应用服务中间件
SpringMVC学习
SpringMVC学习
89 0
|
Shell Go 区块链
阿里云多机部署Fabric 1order节点多个peer节点
阿里云多机部署Fabric 1order节点多个peer节点
293 1
|
Android开发 容器
Flutter教程 - 基础组件(上)
Flutter的几个基础组件解析(上)
Flutter教程 - 基础组件(上)
|
网络安全 Apache Ruby
puppet连载三:服务端安装http、passenger
服务端安装插件: yum install -y ruby-devel ruby-libs rubygems libcurl-devel httpd httpd-devel apr-util-devel apr-devel mod_ssl gcc-c++...
1111 0
|
JavaScript
你不知道的js中关于this绑定机制的解析[看完还不懂算我输]
最近正在看《你不知道的JavaScript》,里面关于this绑定机制的部分讲的特别好,很清晰,这部分对我们js的使用也是相当关键的,并且这也是一个面试的高频考点,所以整理一篇文章分享一下这部分的内容,相信看本文的解析,你一定会有所收获的,如果喜欢的话可以点波赞/关注,支持一下。
1248 0
|
4天前
|
人工智能 运维 安全
|
2天前
|
人工智能 异构计算
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
|
9天前
|
人工智能 JavaScript 测试技术
Qwen3-Coder入门教程|10分钟搞定安装配置
Qwen3-Coder 挑战赛简介:无论你是编程小白还是办公达人,都能通过本教程快速上手 Qwen-Code CLI,利用 AI 轻松实现代码编写、文档处理等任务。内容涵盖 API 配置、CLI 安装及多种实用案例,助你提升效率,体验智能编码的乐趣。
807 109