开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据处理-链路统计redis到mysql2】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11637
数据处理-链路统计 redis 到 mysql2
上节数据从 redis 里面读取数据,读取过来以后统计了某一个 redis 某一个 IP 的总和计算出来,并且写入到 LinkCount 里面。
写完以后,把它进行了返回。返回到了 linkCount,linkcount 里面记录的就是某一个IP对应的所有的总和。总和拿到了以后,得到的结果是一个 IP String 和一个和Integer。
如果有多个的时候,这里面就有可能多个值,Linkcount这里面就会有多个,项目代码是LinkCount.keySet。keySet拿到了所有的key。拿到了所有的key,就变成了keySet。
而这个keySet又去遍历,如果这个是多个,就遍历其中的每一个,拿到了某一个IP serveAddr,拿到了这个IP。
后面来了一条hql语句,叫“from Datacollect where serve _name= :serveName”;。
这个 serveName 就是 params。这里面的 params 是个 map,是一个 string 和object。string 就叫做 serveName,实际上就是这个值,必须一模一样。
serverAddr中string key有了而value就是刚刚计算出来的 ip serveAddr。拿到了 ip以后,传入到params里面。Hql和参数有了,接下来就执行这个hql。
“from Datacollect where serve _name= :serveName”;。
找到爬虫中的datacollect。这里面的数据已经被导入进去,有一部分数据了,条件是 serve _name= :serveName,这个 serveName 指的就是 serveAddr,也就是当前的ip。
目前的ip就是100.160.这条datacollect数据就是返回的结果。假如现在的数据库是空的,一行数据都没有,在没有的情况下,第一次获取出来是空的,把数据写进去,先实例化 datacollect,初始化一个UUID().toString())。
这里面的key随便给一个值 UUID, serveName指的就是 serveAddr,也就是当前的ip。
datacollect也是一个bean,bean里面有id,serveName,beforeYesterdayNum和lastThreeDaysNum,就是数据表中datacollect所对应的bean12345这五个字段。
bean就是映射的表里面的数据和表里面的结构。实际上就是serverName,serveName 映射的是数据库里的 serve _name,这个就是 ip,服务器的ip就处理完了。
设置 lastThreeDaysNum 近三天的数据,近三天的数据等于 linkCount.get.( serveAddr).linkCount就是前面计算出来的ip和对应的总和,也就是map。get.(serveAddr)就是这个ip。
从这个ip中获取某一个key,对应的值给lastday,把beforeYesterdayNum设置成0,把setYesterdayNum设置成linkCount.get.( serveAddr)。
当第一次读取没有数据的时候,设置成这样一个值。初始化以后,调用datacollectDAO.save(datacollect),把数据保存到数据库里面,这是第一次读取没有数据的情况。现在读取是有数据的。有数据的时候就用到else里面。就可以计算昨天今天前天近三天的量,近三天的数据=昨天的数量+前天的数量+今天的数量。
Datacollect.setlastThreeDaysNum(inkCount.get.( serveAddr))
就是刚刚统计出来的那个结果+datacollect.getYesterdayNum()+datacollect.getBeforeYesterdayNum())。
将昨天的数据赋值给前天,将今天的数据赋值给昨天,将最新的数据更新到数据库。近三天的数据=昨天的数量+前天的数量+今天的数量。
把数据库中的serve-name,before-yesterday-num 和 last-threeDays-num三个值进行更新,计算出了近三天的数据。把 servename 和 save 保存到了 mysql 里面。就实现了从 redis 读取数据写入到 mysql 的过程。
总结一下,trafficLinkInfo 传递的是前缀,前缀读取了所有的 key,遍历所有的 key中的每一个,计算出某一个 ip 的总和,全部都是在redis里面读取key,再去redis当中通过key取值,读完以后返回 linkCount,这里就是所有的 key 对应的总和,再去数据库里面查。如果数据库里对应的 ip没有,就设置一个值。
设置的值就是刚刚计算出来的结果,如果有,就将昨天的数据赋值给前天,将今天的数据赋值给昨天,将最新的数据更新到数据库,近三天的数据=昨天的数量+前天的数量+今天的数量。这就实现了 redis 读取数据并写入 mysql 过程
代码
public void SaveDataCollectData(){
Jediscluster jediscluster = Jedisconnectionutil . getJediscluster ();
Map < String , Integer > linkCount = TrafficUtil . trafficLinkInfo ( Constants . CSANTI _ MONITOR _ LP );
/循环存储所有的 IP
Set < String > keySet = linkCount . keySet ();
/遍历每一个 serverAddr 其中的一个 IP
for ( String serverAddr : keySet ){
/根据 serveraddr 查询链路历史数据
String hql =" from Datacollect where server _ name =: serverName "; Map く String ,0bject> params - new HashMap く~>0);
params . put (" serverName ", serverAddr )3//192.168.2.111//
读取出数据库中 server _ name 与 serverAddr 相同的数据
Datacollect datacollect - datacollectDao , get ( hql , params );/ mysql
历史数据不存在,新存入数据
if ( datacollect ==nul1){
datacollect = new Datacollect ();
datacollect . setId ( UUID . randomUUID (). tostring() );
datacollect . setserverName ( serverAddr );
datacollect . setLastThreeDaysNum ( linkCount . get ( serverAddr ));
datacollect . setBeforeYesterdayNum ( 0 );
datacollect . setYesterdayNum ( linkCount . get ( serverAddr )); dataCollectDao . save ( datacollect );
} else {
//历史数据存在,累加
//近三天的数据=昨天的数量+前天的数量+今天的数量
datacollect . setLastThreeDaysNum ( linkCount . get ( serverAddr )
+ datacollect . getYesterdayNum ()
+ datacollect . getBeforeYesterdayNum ());
// 将昨天的数据赋值给 datacollect
// 将今天的数据赋值给昨天
datacollect . setYesterdayNum ( linkCount . get ( serverAddr ));
//将最新的数据更新到数去
dataCollectDao . update ( datacOllect )3
setBeforeYesterdayNum ( datacollect . getYesterdayNum ());
3.bean 字段
//与数据库中的字段对应
private string id ;
private String serverName ;
private Integer beforeYesterdayNum ; private Integer yesterdayNum ;
private Integer lastThreeDaysNum ;