开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据处理-链路统计Redis到Mysql1】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11636
数据处理-链路统计Redis到Mysql1
内容介绍
一、回顾
二、Redis 是如何同步到 mysql 中去的
三、从 redis 读取数据同步到 mysql 的具体操作
一.回顾
数据都写到 redis 里面,然后这个数据界面里的数据的结果自动的就展示出来,这里面还差几步,redis 数据有一部分同步到 mysql 里面去,那为什么要同步 musql里面去?是怎么同步过去的?
这是第一步,还有一部分就是Mysql里面的数据是怎么展现到前端的?
以及redis链路当前活跃的用户连接数是怎么从redis到我的界面的,这个代码属于前端在数据库中读取数据展现的一个过程,是前端工程师的工作。
二、Redis 是如何同步到 mysql 中去的
Redis 同步到 mysql 主要是由 web 界面触发的,如何触发的?找到web工程,找到以后找到项目代码,项目代码中有一个定时的任务,双击定时任务
*每隔两个小时定时获取 redis 上的链路数据,存到 mysqL 对应的表中( datacoLLect ),*然后删掉redis 上已经备份到 mysqL 的数据。
默认的情况下系统自带的是按照上述代码每隔两小时执行的
但是我们不能等两小时,所以让它五秒钟发一次
三、从 redis 读取数据同步到 mysql 的具体操作
1. 具体代码
第一个代码是整个数据同步任务,cron用的是links中的crontime定时触发,每隔五秒钟运行一下代码。代码是如何运行的呢,进入到代码里面去看一下,选中redis读取数据同步到 mysql 的代码块 saveDataCollectData ();点击此代码块按住 Ctrl alt,这样就进入到了实际实现从 redis 读取数据同步到 mysql 的代码块,
@ Autowired
private IRealTimecomputDataservice realTimeComputDataservice ;
/**
*每隔两个小时定时获取 redis 上的链路数据,存到 mysqL 对应的表中( datacollect ,
*然后删掉 redis 上己经备份 mysqL 的数据。
*
*/
//@ Scheduled ( cron =”**0/2**?”)
@Scheduled ( cron ="0/5****?")
@ Transactional
public void BackupRedisLinkData () {
realTimecomputDataservice . saveDataCollectData ();
}
2. 下面看一下是如何实现的
首先创建了一个redis集群,前面我们在运行时有一段代码未显示出来,调整了环境配置文件之后就能读取数据了,此时读取的数据就是通过配置文件读取获得到我的19268100.160这个redis的集群。redis 集群拿到后继续向后走
@ Override
public void saveDatacollectData ()
JedisCluster jedisCluster = JedisConnctionUtil . getJedisCLusterO );
//记录当天的链璐总数
Map <192.168.2.141,100>
Map < String , Integer > linkCount = Trafficutil . trafficLinkInfo ( Constants . cSANTI _ MOITOR _ LP );
//循环存储所有的 IP
Set < String > keySet = linkCount . keySet ();
//遍历每一个 IP serverAddr 其中的一个 IP
for ( String serverAddr : keySet ){
//根据 serveraddr 查而链路历史数据
String hql =“ from Datacollect where server _ name =: serverName ";
Map く String ,0bject> params = new HashMap く~> O );
params . put (" serverName ", serverAddr );//192.168.2.111
//读取出数据库中 server _ name 与 serverAddr 相同的数据
Datacollect datacollect = datacollectDao . get ( hql , params );
// mysql 历史数据不存在,新存入数据
if ( datacollect ==null){
datacollect = new Datacollect ();
datacollect . setId ( UUID . randomUUID (). tostring ();
datacollect . setserverName ( serverAddr );
datacollect . setLastThreeDay sNum ( linkCount . get ( serverAddr ));
datacollect . setBeforeYesterdayNum (0);
Trafficutil . trafficLinkInfo 里面传入的是一个参数,Constants . cSANTI _ MOITOR _ LP );这个参数是流量数据的键值表示,这个参数在数据写入时redis就是key,这个key以此参数为前缀,再加一个时间戳,就是key的一个格式。
写入数据时 redis 里面的 key 就是写入的链路统计link前缀,按照这个作为前缀加一个时间戳向里面写,在读取时将 Constants . cSANTI _ MOITOR _ LP );作为前缀塞到 trafficLinkInfo 方法中。
前缀传递过去以后又弄了一个 redis 集群,
redis 由 cluster 传递进来,strflage 是刚传递过来的前缀 strflage+“*”拼成了一个字符串传递到
jedisConnectionUtil.keys 里面去
public static Map く String , Integer > trafficLinkInfo ( string strFlage ) {// strFlage
就 CSANTI MONITOR LR
//实例化redis集群
JedisCluster jedsCluster = JedisConnectionUtil . getJedisCluster ();
//从 redis 中拿出所有以 CSANTI_MONITOR_LP 作为前缀的 key
set < String > keySet = JedisConnectionUtil . keys ( jedisCluster , pattern : strFlage +"*”);
3. keys
keys 是干什么的,点击进去
public static Treeset < String > keys ( Jediscluster jc , string pattern ){
TreeSet < String > keys = new TreeSet く>();
Map く String , JedisPool > clusterNodes = jc · getclusterNodes (); for ( String k : clusterNodes . keySet (){
JedisPool jp = clusterNodes · get ( k );
Jedis jedis = jp .getResourceO3
try {
keys .addA1l( jedis . keys ( pattern ));
catch ( Exception e ){
e . printstackTrace ();
finally{
//用完一定要 close 这个链接
jedis . close ();
一定要 cLose .这个链接!!!
}
}
return keys
4.JC
Jc是一个 redis 集群,而 pattern 是一个前缀通过 getclusterNodes 获得了 redis 里集群的一个节点,通过节点遍历节点找到 key,key 再往后初始化 clusterNodes 获得一个节点,然后再根据getResource属性来实践化一个 redis ,这个 redis 也就是 jedis,jedis.keys 就是拿到了以 pattern 为前缀的所有的 key,keys 是 string 中的一个 keyset,获得的所有数据写入到 treeset 中。
写入进去以后关闭集群,然后在返回keys通过这步就拿到了redis里面所有的以CSANTI_MONITOR_LP为前缀的属性,遍历所有的key拿到其中的某一个key,拿到key
以后,key通过定义的redis集群.getkey拿到的结果就是key对应的所有value值,这里面的value就是redis里选中的所有结果
try {
//循坏所有 key
for ( String str : keySet )// str CSANTIMONITOR _LP1553088805099
LinkJsonVO j5onVO= neW LinkJ5onvo()3
//获得某一个个 key 对应 value (" serversCountNap ":【"192.168.2.141“:30)," activeNumMap ":【"192.168.2.141":
String value = jedisCluster . get ( str );
将{" serversCountMap ":"192.168.2.141”;30)," activeNumNap ":{"192.168.2.141”:"5"}
转换 inkJsonvo
jsonVO =JsonResolveUtil.resolveLinkJ5on( value );
ServersCountN
Map < String , Integer > serversCountMap = jsonVO . getserversCountMap ();
//获取每个serversCountMap中的 IP
Set < String >keySet2= serversCountMap keySet ();
//遍历每个 IP string 就是每个 IP
for ( String string :keySet2){
//先从 LnkCount 中我取这个 IP 的值,
// Integer orDefault = linkCount . get ( string );
//若没有获得到,就赋值0 ,若能获得到,将其取出(第一次是空的,后面每次都有值)
orDefault = orDefault == null ?0:orDefault;
//获取{"192.168.2.141":30)中虹 P 对应的值一﹣>30 Integer orDefault2= serversCountMap . get ( string );
//将总统z计 inkCount 内的值和这个 IP 的值求和,并推送到总统计变量中
4. jsonVO
JsonVO是一个bean,bean的主要用途是从redis缓存数据库中读取数据,读取过来以后转化成bean,这就是前面将计算出来的数据写入到redis时初始化两个小的map,小map将它转化为大map,大map的key就是这里来的,redis里的数据转化为JsonVO
通过JsonResolveUtil.resolveLinkJ5on( value )返回JsonVO,
5. LinkJsonvo
JonResolveUtil.resolveLinkJ5on( value 是将string类型的结果转化成bean,传入的参数为字符串如果等于null或者等于空就返回return null,如果不等于就将这个string类型的值转化成LinkJsonvo的class,也就转化成了bean,这个bean就是LinkJsonvo jsonV0,传入过来的字符串已经通过这几行代码转化成了LinkJsonvo jsonV0,字符串返回的是一个jsonvo的bean,所以前面恰好是用LinkJsonvo接收的
public class JsonResolveUtil {
public static Jsonvo resolveJson ( String value ){
if ( null == valuell value =="")
return null ;
}
JsonVo jsonvO= new Jsonvo ();
jsonvo = JSON . parseobject ( value , Jsonvo . class );
return jsonVO ;
public static LinkJsonvo resolveLinkJson ( String value ){
if (nul1= valuell value ==""){
return null ;
LinkJsonvo jsonV0= new LinkJsonvo ();
jsonVO = JSON . parseobject ( value , LinkJsonvo . class );
return jsonVO ;
6. 如何获取结果
JsonVo的结果是从 bean 中 getserverscountMap,这个获取到的结果就是获取到了 redis 中的 serverscountMap,
192.168.100.160:12IP+数值,返回的结果是一个String , Integer,就变成了Ip+数值,转化完以后调用map中的keyset,拿到的是一个集合,keyset是string类型的key有多个值,有可能是多个,在我们的代码中只有一个服务器可能只有一个,如果是多台服务器就是多个值。keyset中拿到的有可能是多个值,多个值再遍历每个值,遍历每个值做什么?
string就是其中的一个IP,这个IP先到linkcount中去获取,刚刚我们看到了linkcount,但是并没有仔细研究,linkcount就是用来记录当前链路的总数,现在第一次运行程序的时候他是空的,空的时候从空的map里面去获取key肯定获取不到,获取不到就是空的。
在向下看如果Default等于null?0orDefault这是一个三元运算符,如果这个值等于null就返回0,如果不等于null那是什么就返回什么。
第一次肯定是0,那orDefault就是0在从serversCountMap . get ( string ),serversCountMap指的是String ,和 Integer,是一个IP➕值,serversCountMap 在. get key就获取了map中的key对应的值value,这个值是12,所以这里面get的值拿到的Default2就是12,就是0➕12,前面的string是一个ip,ip拿到以后一个循环就做完了。
前面在redis中拿到了所有的key,在这里面第一个key刚遍历完,在遍历第二个key,是一模一样的流程,拿到key转化成jsonvo在获取到下一个serverscountMap,拿到服务器ip以后,再去linkcount里面将ip和数值得到,循环的目的是为了得到所有的数据的serverscountMap的ip对应的总和,对应的总和计算出来以后,把linkcount进行返回,最终将一个ip对应的这批数据的所有总和算出来返回了
try
//循环所有的key
for ( String str : keySet )// str CSANTIMONITOR _LP1553088805099
LinkJsonvo jsonV0= new LinkJsonVo ();
//获得某一个 key 对应 vaLue ({" serverscountMap ":{"192.168.2.141”:30)," activeNunap ":{"192.168.2.141”:
String value = jediscluster . get ( str );
//将【" serversCountMap ":{"192.168.2.141”:30)," activeNumMap ":{"192.168.2.141":"5"
转换成L inkJsonVO类的
jsonVo = JsonResolveUtil . resolveLinkJson ( value );
//获取jsonVO中的包含serveraddress ->"192.168.2.141":35)
Map < String , Integer > serversCountMap = jsonVo . getserversCountMap (); Set < String >keySet2= serversCountMap . keySet く);
//获取每个 serversCountMap 中的 IP
for ( String string :keySet2){
//先从 LinkCount 中获取这个 IP 的值,
Integer orDefault = linkCount . get ( string );
//若没有获得到,就赋值0,若能获得到,将其取出(第一次是空的,后面每次都有值)
orDefault = orDefault ==null?0:orDefault;
//获取 {"192.168.2.141":30}中IP对应的值﹣->30
Integer orDefault2= serversCountMap . get ( string );
//}
将总统讯 linkCount 内的值和这个 IP 的值求和,并推送到总统计变量中
linkCount . put ( string , orDefault +orDefault2);
}
}