推广一下个人的微信公众号【数据与算法联盟】
转载请注明出处: 转载自 Thinkgamer的CSDN博客:blog.csdn.net/gamer_gyt
代码下载地址:点击查看
1:Web日志分析系统概述
2:需求分析:日志提取预处理,KPI指标设计,存储与展现
3:算法模型:Hadoop并行算法
4:架构设计:构建hadoop项目
5:程序实现:MR2V程序实现
6:结果可视化
一:Web日志分析系统概述
Web日志由Web服务器产生,可能是Nginx,Apache,Tomcat等,从Web日志中我们可以提取到很多有用的信息,比如说网站每类网页的浏览量(PV),独立IP数,稍微复杂一些包括用户检索的关键词排行,用户停留时间,是否遭遇黑客攻击等,更复杂的我们可以构建广告点击模型,分析用户行为特征等,从而为站方创造价值
当然现在已经存在一些做的很好的日志统计分析平台,比如说百度统计,谷歌统计.......
拿上面的百度统计举例,来源分析其实就是对网站浏览量的一个监控,提取的是日志中的pv,下面的列表是便是对浏览者信息记录的呈现,其中地域便是对ip地址的一个统计,当然我们不能和百度相比,但是我们可以做一个简化的“百度统计”,暂且称它为CyanS统计吧
由于我自己网站的数据量太小了,且几乎都是我自己访问的,所以就从朋友那取了一些进行web日志分析(我的服务器只是简单的部署了环境,作品展示用的,呵呵,欢迎访问)
这是一条访问记录:
31.3.245.106 - - [25/Apr/2016:06:55:21 +0800] "CONNECT www.marathonbet.com:443 HTTP/1.1" 405 575 "https://www.marathonbet.com/en/live/26418" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:16.0) Gecko/20100101 Firefox/16.0"remote_ip: 31.3.245.106 ,记录来源的ip地址,通过ip地址我们可以得到地域
remote_time:“[25/Apr/2016:06:55:21 +0800]”,记录访问的时间和时区,通过对时间的提取,我们可以得到,每小时的PV,也可以结合IP得到,PV最多的IP
request:"CONNECT www.marathonbet.com:443 HTTP/1.1",请求方式是CONNECT(常见的是post和get),http版本是1.1
status:405,状态码,200表示请求成功
body_byte_sent:575,反馈的字节是575b,可以理解为当前页面的大小
see_url:www.marathonbet.com:443 ,表示访问的网页
user_agent: "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:16.0) Gecko/20100101 Firefox/16.0" 记录用户浏览器的相关信息
注:1:若要得到更多的信息,则要通过其他的手段去获取,通过js代码单独发送请求,使用cookies记录用户的访问信息,利用这些信息深入挖掘网站
2:我们可以利用apache自带的rotatelogs实现日志轮播,apache提供了将不把之日直接写入文件,而是同管道发送给另外一个程序的能力,这样做有很大的好处,我们可以充分利用这个机制对日志数据进行预处理,这个管道可以是任意程序,如日志分析,压缩日志等(当然还有其他很多的日志轮播工具,感兴趣的大家可以自己谷歌)
少量数据的情况,即单机可以处理时,我们可以利用linux自带的工具,如awk,grep,sort,join等再配合perl,pytho,正则表达式,基本就可以解决所有的问题
eg:我们想从上面提取的日志文件得到访问量最高的10个IP
cat access.log | awk '{a[$1]++} END {for(b in a) print b"\t"a[b]}' | sort -k2 -r | head -n 10
79.50.131.136 99
173.208.168.74 99
103.210.16.61 96
115.171.36.133 96
87.230.17.128 92
118.161.64.225 91
38.99.252.150 90
95.211.172.4 9
95.105.94.79 9
94.198.2.8 9
海量数据的情况,我们就不能使用单机来处理了,这时我们就需要hadoop并行计算框架和分布式文件存储系统来解决问题了
二:需求分析
1:日志提取预处理
我们可以在部署apache阶段直接对其日志输出进行管道处理,传送给程序,从而进行数据的预处理,当然我们也可以使用python脚本对日志文件进行预处理,这里我采用的是使用python将weblog按照日期写入不同的文件
处理之后显示为:
预处理之后的文件我们上传至HDFS,进行存储
2:KPI指标设计
针对上边的网站日志我们可以设计出以下的指标
a:pv(pageview),日页面访问量统计
b:ip:日页面独立ip的访问量统计
c:request:日请求方式次数统计
d:time:用户每小时的IP访问量
e:source:用户访问的设备统计
3:存储与展现
处理后的数据在web前端展示,也可以使用python画图进行描绘,当然也可以使用R画图展示
三:算法模型:Hadoop并行算法
结合一中的变量
KPI_OneIP_Sum:日来访IP数量统计和地域分布
Map输出:{key:ip+文件名,value:1}
Reduce输出:{key:ip,value:求和} //将不同日期的数据分别写入不同的文件
KPI_OnePV_Sum:指定页面访问次数统计
Map输出:{key:访问的页面+文件名,value:1}
Reduce输出:{key:访问的页面,value:求和}
KPI_OneRequest_Sum:来访请求方式统计
Map输出:{key:请求方式+文件名,value:1}
Reduce输出:{key:请求方式, value:求和}
KPI_OneTime_Sum:每小时的访问量统计
Map输出:{key:时间+文件名,value:1}
Reduce输出:{key:时间,value:sum}
KPI_OneSource_Sum:日用户访问设备统计
Map输出:{key:设备名+文件名,value:1}
Reduce输出:{key:设备名,value:求和sum}
四:架构设计:构建hadoop项目
代码截图
hdfsGYT.java:是我使用java对hdfs的封装
KPI_OneIP_Sum.java:日独立IP统计
KPI_OnePV_Sum.java:日访问量统计
KPI_OneRequest_Sum.java:日请求方式统计
KPI_OneResource_Sum.java:日访问设备统计
KPI_OneTime_Sum.java:日每小时访问量统计
KPIfilter.java:对日志行的解析类
KPIJob.java:任务调度函数
运行结果截图
五:程序实现:MR2V程序实现
在这里只对部分代码进行展示,更多代码请前往github下载:下载地址
1:KPIfilter.java(解析日志行的类)
package WebKPI; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashSet; import java.util.Locale; import java.util.Set; public class KPIfilter { //自定义错误计数器,在执行完程序时显示相应的错误条数 //由于web日志并不是规格的,存在部分数据不完整或者格式有问题,故设计计数 private static int numUser_agent = 0; //用户代理 private static int numStatus = 0; //访问状态码 private String remote_ip; //记录来源的ip地址,通过ip地址我们可以得到地域 private String remote_time; //记录访问的时间和时区 private String request; //记录请求方式 private String status; //网站请求状态码 private String body_byte_sent; //请求网页时反馈的字节大小 private String see_url; //表示从哪个页面连接过来 private String user_agent; //记录用户浏览的相关信息 public int getNumUser_agent() { return numUser_agent; } public static int getNumStatus() { return numStatus; } private boolean valid = true; //判断数据是否合法 @Override public String toString() { // TODO Auto-generated method stub StringBuilder sb = new StringBuilder(); sb.append("valid:" + this.valid); sb.append("\nremote_ip:" + this.remote_ip); sb.append("\nremote_time:" + this.remote_time); sb.append("\nrequest:" + this.request); sb.append("\nstatus:" + this.status); sb.append("\nbody_byte_sent:" + this.body_byte_sent); sb.append("\nsee_url:" + this.see_url); sb.append("\nuser_agent:" + this.user_agent); return sb.toString(); } //get remote_ip public String getRemote_ip() { return remote_ip; } //set remote_ip public void setRemote_ip(String remote_ip) { this.remote_ip = remote_ip; } public Date getTime_local_Date() throws ParseException { SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US); return df.parse(this.remote_time); } public String getTime_local_Date_hour() throws ParseException{ SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); return df.format(this.getTime_local_Date()); } //get remote_time public String getRemote_time() { return remote_time; } //set remote_time,时间转化为Unix时间戳 public void setRemote_time(String remote_time) { this.remote_time = remote_time.substring(1); } //get request public String getRequest() { return request; } //set request public void setRequest(String request) { this.request = request.substring(1); } //get status public String getStatus() { return status; } //set status public void setStatus(String status) { this.status = status; } //get body_byte_sent public String getBody_byte_sent() { return body_byte_sent; } //set body_byte_sent public void setBody_byte_sent(String body_byte_sent) { this.body_byte_sent = body_byte_sent; } //get from_url public String getSee_url() { return see_url; } //set from_url public void setSee_url(String see_url) { this.see_url = see_url; } //get user_agent public String getUser_agent() { return user_agent; } //set user_agentl public void setUser_agent(String user_agent) { try{ this.user_agent = user_agent.substring(1); }catch(Exception e){ // e.printStackTrace(); System.out.println("user_agent is inlegal"); this.user_agent = "-"; this.numUser_agent ++; } } //get valid public boolean isValid() { return valid; } //set valid public void setValid(boolean valid) { this.valid = valid; } //解析每行日志 public static KPIfilter parser(String line) throws ParseException{ // System.out.println(line); KPIfilter kpi = new KPIfilter(); //声明一个KPIfilter的对象 String[] arr = line.split(" "); //日志数据并非是规则的,但最短长度为12,所以要大于11 if(arr.length>11){ try{ kpi.setRemote_ip(arr[0]); //设置IP kpi.setRemote_time(arr[3]); //设置时间 kpi.setRequest(arr[5]); //设置请求方式 kpi.setStatus(arr[8]); //设置返回的状态码 kpi.setBody_byte_sent(arr[9]); //设置返回的字节数 kpi.setSee_url(arr[6]); //设置来源页面 kpi.setUser_agent(arr[11]); //设置请求信息 // System.out.println(kpi); // SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss", Locale.US); // System.out.println(df.format(kpi.getTime_local_Date())); // System.out.println(kpi.getTime_local_Date_hour()); try{ if (Integer.parseInt(kpi.getStatus()) >= 400) {// 大于400,HTTP错误 kpi.setValid(false); } }catch(Exception e){ // e.printStackTrace(); System.out.println("Status is error"); kpi.setStatus(arr[9]); if (Integer.parseInt(kpi.getStatus()) >= 400) {// 大于400,HTTP错误 kpi.setValid(false); numStatus++; } } // }catch(Exception e){ // e.printStackTrace(); kpi.setValid(false); } }else{//如果长度小于12,则为不满足条件,设置valid为false kpi.setValid(false); } return kpi; } //按page的pv分类,过滤指定网页的浏览量 public static KPIfilter filterPVs(String line) throws ParseException { KPIfilter kpi = parser(line); Set pages =new HashSet(); pages.add("/213.238.172.248"); pages.add("/order-form/"); pages.add("/index.php"); pages.add("http://www.addamiele.it/"); pages.add("http://www.tianya.cn/ "); pages.add("http://www.google.com/"); if(pages.contains(kpi.getSee_url())) { kpi.setValid(true); }else{ kpi.setValid(false); } return kpi; } public static void main(String [] args) throws ParseException { String line = "31.3.245.106 - - [25/Apr/2016:06:55:21 +0800] \"CONNECT www.marathonbet.com:443 HTTP/1.1\" 405 575 \"https://www.marathonbet.com/en/live/26418\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:16.0) Gecko/20100101 Firefox/16.0"; KPIfilter kpi = new KPIfilter(); kpi = kpi.parser(line); System.out.println(kpi.toString()); System.out.println(kpi.getTime_local_Date_hour()); } }
2:KPIJob.java(任务调度函数)
package WebKPI; import java.io.IOException; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; public class KPIJob { //定义全局变量 hdfs地址url public static final String HDFS = "hdfs://127.0.0.1:9000"; public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException { //定义一个map集合,存放程序中所需要的路径 Map <String, String> path= new HashMap<String, String>(); // path.put("local_path", "webLogKPI/weblog/access.log"); //本地目录 path.put("input_log", HDFS+"/mr/webLogKPI/log_files"); //hdfs上存放log的目录 path.put("output_oneip", HDFS + "/mr/webLogKPI/KPI_OneIP_Sum"); //hdfs上KPI_OneIP_Sum对应的输出文件 path.put("output_pv", HDFS + "/mr/webLogKPI/KPI_OnePV_Sum"); //hdfs上KPI_OnePV_Sum对应的输出文件 path.put("output_request",HDFS+"/mr/webLogKPI/KPI_OneRequest_Sum"); //hdfs 上KPI_OneRequest_Sum对应的输出文件 path.put("output_time", HDFS+"/mr/webLogKPI/KPI_OneTime_Sum"); //hdfs上KPI_OneTime_Sum对应的输出文件 path.put("output_source", HDFS+"/mr/webLogKPI/KPI_OneResource_Sum"); //hdfs上KPI_OneResource_Sum对应的输出文件 KPI_OneIP_Sum.main(path); //计算独立IP访问量 KPI_OnePV_Sum.main(path); //计算PV访问量 KPI_OneRequest_Sum.main(path); //获得请求方式 KPI_OneTime_Sum.main(path); //每小时的PV KPI_OneSource_Sum.main(path); //日访问设备统计 System.exit(0); } }
3:KPI_OneIP_Sum.java(日独立ip访问量统计)
package WebKPI; import java.io.IOException; import java.net.URISyntaxException; import java.text.ParseException; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /* * 2016.04.17——2016.04.30期间的数据 * 分别统计每天的ip独立访问数目 */ public class KPI_OneIP_Sum { private static KPIfilter kpi = new KPIfilter(); private static class IPMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ private Text ipK = new Text(); private LongWritable ipV = new LongWritable(1); String filename; public void setup(Context context) throws IOException,InterruptedException { // TODO Auto-generated method stub //获取文件名 InputSplit inputSplit = context.getInputSplit(); filename = ((FileSplit) inputSplit).getPath().getName(); filename = (String) filename.subSequence(0,11); System.out.println(filename); } public void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{ // System.out.println(value.toString()); try { kpi = KPIfilter.parser(value.toString()); //在进行判断时 if(kpi.isValid()){ ipK.set(kpi.getRemote_ip() + "\t" + filename); // System.out.println(ipK + "===" + ipV); context.write(ipK, ipV); } } catch (ParseException e) { // TODO Auto-generated catch block // e.printStackTrace(); System.out.println("this some error"); } } } public static class IPReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ private Text ipK2 = new Text(); private LongWritable ipV2 = new LongWritable(); //声明mos变量,用来将来自不同文件的ip统计写入到不同的文件中 private MultipleOutputs<Text, LongWritable> mos; //setup函数 protected void setup(Context context) throws IOException,InterruptedException { // TODO Auto-generated method stub mos = new MultipleOutputs<Text, LongWritable>(context); } public void reduce(Text key, Iterable<LongWritable>values, Context context ) throws IOException, InterruptedException{ String[] arr = key.toString().split("\t"); String filename = arr[1].replace("-",""); //该数据来自哪个文件, 因为文件命名时不能出现-,所有都去掉 // System.out.println(filename); //统计每个文件下每个IP出现的次数 int num=0; for (LongWritable longWritable : values) { num += longWritable.get(); } ipK2.set(arr[0]); ipV2.set(num); // System.out.println(filename + "______________" + ipK2 + "===========" + ipV2); mos.write(filename, ipK2, ipV2); } //cleanup函数 关闭mos public void cleanup(Context context) throws IOException,InterruptedException { mos.close(); } } public static void main(Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { // String local_input = path.get("local_path"); //存放log的本地目录 String hdfs_input = path.get("input_log"); //上传weblog到hdfs的目录 String hdfs_output = path.get("output_oneip"); //运行该job的输出目录 hdfsGYT hdfs = new hdfsGYT(); // hdfs.rmr(hdfs_input); //删除hdfs上weblog的存放目录 hdfs.rmr(hdfs_output); //删除hdfs上任务的输出目录 // hdfs.put(local_input, hdfs_input); //将weblog从本地上传至hdfs Job job = new Job(new Configuration(),"OneIP_Sum"); job.setJarByClass(KPI_OneIP_Sum.class); job.setMapperClass(IPMapper.class); job.setReducerClass(IPReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); MultipleOutputs.addNamedOutput(job, "17Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "18Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "19Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "20Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "21Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "22Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "23Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "24Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "25Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "26Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "27Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "28Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "29Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "30Apr2016", TextOutputFormat.class, Text.class, LongWritable.class); FileInputFormat.addInputPath(job, new Path(hdfs_input)); FileOutputFormat.setOutputPath(job, new Path(hdfs_output)); //提交作业 job.waitForCompletion(true); // System.out.println("User_agent Error:" + kpi.getNumUser_agent()); System.out.println("Status Error:" + kpi.getStatus()); } }
运行结果截图:
六:结果可视化
可以采用Python的matplotlib做图展示,或者使用R,或者H5的canvas做图展示
下面只对日每小时访问量做图展示,其他的类似,感兴趣的朋友可以自己尝试着做图
In [21]: import numpy as np In [22]: import pylab as pl In [23]: x = [] In [24]: for i in range(24): x.append(str(2016041900 + i)) ....: In [25]: y=[2043,3293,1480,113,6841,5678,492,523,550,81,20,7,12,15,4,3,5,9,7,4,13,10,4,32] In [26]: pl.plot(x, y) Out[26]: [<matplotlib.lines.Line2D at 0x7f3ff8e85e10>] In [27]: pl.show()
结果图: