3 - Website Log Analysis Case Study - Log Cleaning with MapReduce
Environment preparation:
Linux environment
Windows environment
Both environments were set up and tested.
The code in this article was developed on Windows: when the data volume is large, a local run is smoother than a virtual machine (and, well, there is no budget for a server...).
1. Data Introduction
1.1 Recap of the Data
Reference: https://www.cnblogs.com/edisonchou/p/4449082.html
The forum data comes in two parts:
(1) Historical data, about 56 GB, covering everything up to 2013-05-29. In other words, before 2013-05-30 all logs were appended to a single file.
(2) From 2013-05-30 onward, one data file is generated per day, each around 150 MB. In other words, from 2013-05-30 the logs are no longer kept in a single file.
Figure 2 shows the record format of the log data. Each line consists of five parts: visitor IP, access time, requested resource, response status (HTTP status code), and traffic for the request.
Figure 2: Log record format
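For example, the sample line used in the LogParser test in section 3 below breaks down as follows:

27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127

Here the IP is 27.19.74.143, the time is 30/May/2013:17:38:20 +0800, the requested resource is GET /static/image/common/faq.gif HTTP/1.1, the status is 200, and the traffic is 1127.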
1.2 Data to Clean
(1) From the earlier analysis of the key metrics, none of the statistics we want involve the response status (HTTP status code) or the per-request traffic, so we can drop those two fields first.
(2) Given the log record format, we need to convert the date into an ordinary plain format such as 20150426 (the code below actually keeps the full timestamp, yyyyMMddHHmmss), so we write a class that converts the date in each log record; see the sketch after this list.
(3) Requests for static resources are meaningless for our analysis, so we filter out records whose resource starts with "GET /static/" (matching the mapper code below); the leading GET and POST strings carry no information for us either, so we strip them as well.
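To make point (2) concrete, here is a minimal, self-contained sketch of the date conversion (DateConvertDemo is a hypothetical class name; the two format patterns are the same ones used by LogParser in section 3):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class DateConvertDemo {
    public static void main(String[] args) throws Exception {
        // Source format as it appears in the log: 30/May/2013:17:38:20
        SimpleDateFormat in = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        // Target format: a plain numeric timestamp
        SimpleDateFormat out = new SimpleDateFormat("yyyyMMddHHmmss");
        Date d = in.parse("30/May/2013:17:38:20");
        System.out.println(out.format(d)); // prints 20130530173820
    }
}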
2. Creating a Maven Project in IDEA
The project lives at G:\ideaproject\etl. The Maven GAV coordinates are:

<groupId>edu.sx</groupId>
<artifactId>etl</artifactId>
<version>1.0-SNAPSHOT</version>
Under the <dependencies> tag in the project's pom.xml, add the Hadoop dependencies below; note the difference between the <dependencies> and <dependency> tags.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.2</version>
    <!-- <version>1.2.3</version> -->
</dependency>
Add the packaging plugins:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.mystudy.hadoopPro.APP</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.1</version>
            <!-- can build both fat and thin jars -->
            <configuration>
                <!-- <archive> -->
                <!--     <manifest> -->
                <!--         <mainClass>org.example.HttpClientTest</mainClass> -->
                <!--     </manifest> -->
                <!-- </archive> -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
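With this setup, running mvn clean package from the project root should produce both the thin jar and a fat jar named target/etl-1.0-SNAPSHOT-jar-with-dependencies.jar (the assembly plugin's jar-with-dependencies descriptor appends that suffix by default). One caveat: the <mainClass>com.mystudy.hadoopPro.APP</mainClass> entry appears to be carried over from another project; if you want the jar to be directly runnable, point it at this project's entry point, edu.sx.etl.LogCleanJob.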
Right-click the pom.xml file, then choose Maven - Reload Project.
The newly imported dependencies will appear under External Libraries.
3. Log Cleaning
Create the log parsing class:
edu.sx.etl.LogParser
package edu.sx.etl;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class LogParser {

    public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
            "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
            "yyyyMMddHHmmss");

    public static void main(String[] args) throws ParseException {
        final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
        LogParser parser = new LogParser();
        final String[] array = parser.parse(S1);
        System.out.println("Sample data: " + S1);
        System.out.format(
                "Parse result: ip=%s, time=%s, url=%s, status=%s, traffic=%s",
                array[0], array[1], array[2], array[3], array[4]);
    }

    /**
     * Parse the English-format time string.
     */
    private Date parseDateFormat(String string) {
        Date parse = null;
        try {
            parse = FORMAT.parse(string);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return parse;
    }

    /**
     * Parse one log record line.
     *
     * @return an array of 5 elements: ip, time, url, status, traffic
     */
    public String[] parse(String line) {
        String ip = parseIP(line);
        String time = parseTime(line);
        String url = parseURL(line);
        String status = parseStatus(line);
        String traffic = parseTraffic(line);
        return new String[] { ip, time, url, status, traffic };
    }

    // Traffic is the second field after the closing quote of the request.
    private String parseTraffic(String line) {
        final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
        return trim.split(" ")[1];
    }

    // The status code is the first field after the closing quote of the request.
    private String parseStatus(String line) {
        final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
        return trim.split(" ")[0];
    }

    // The request (method + URL + protocol) sits between the two quotes.
    private String parseURL(String line) {
        final int first = line.indexOf("\"");
        final int last = line.lastIndexOf("\"");
        return line.substring(first + 1, last);
    }

    // The time sits between '[' and '+0800]'; reformat it to yyyyMMddHHmmss.
    private String parseTime(String line) {
        final int first = line.indexOf("[");
        final int last = line.indexOf("+0800]");
        String time = line.substring(first + 1, last).trim();
        Date date = parseDateFormat(time);
        return dateformat1.format(date);
    }

    // The IP is everything before the "- -" separator.
    private String parseIP(String line) {
        return line.split("- -")[0].trim();
    }
}
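Running main should print roughly:

Sample data: 27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
Parse result: ip=27.19.74.143, time=20130530173820, url=GET /static/image/common/faq.gif HTTP/1.1, status=200, traffic=1127

One design note: SimpleDateFormat is not thread-safe, so sharing the static FORMAT instance would be risky in multithreaded code; it is acceptable here because each map task runs in its own JVM with its own LogParser.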
Create the MapReduce job:
edu.sx.etl.LogCleanJob
package edu.sx.etl;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogCleanJob {

    // Recursively delete a local directory and its contents.
    private static boolean deleteDir(File dir) {
        if (dir.isDirectory()) {
            String[] children = dir.list();
            // Recurse into subdirectories first.
            for (int i = 0; i < children.length; i++) {
                boolean success = deleteDir(new File(dir, children[i]));
                if (!success) {
                    return false;
                }
            }
        }
        // The directory is empty now and can be deleted.
        return dir.delete();
    }

    public static void main(String[] args) throws IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, LogCleanJob.class.getSimpleName());

        // To run on a Hadoop cluster:
        // String frameWork = "yarn";
        // String inputPath = "/sx/access.log";
        // String outputPath = "/sx/output/";

        // To run locally on your own machine:
        String frameWork = "local";
        String inputPath = "D:\\Hadoop\\input\\*";
        String outputPath = "D:\\Hadoop\\output";

        // Allow the job to be packaged into a jar and run.
        job.setJarByClass(LogCleanJob.class);

        // FileInputFormat.setInputPaths(job, args[0]);
        FileInputFormat.setInputPaths(job, inputPath);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // To delete the output directory on HDFS instead:
        // FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
        // Path outPath = new Path(outputPath);
        // if (fs.exists(outPath)) {
        //     fs.delete(outPath, true);
        // }

        // Delete the output directory on Windows.
        File file = new File(outputPath);
        boolean successdeleteDir = deleteDir(file);
        if (successdeleteDir) {
            System.out.println("Successfully deleted populated directory: " + outputPath);
        } else {
            System.out.println("Failed to delete populated directory: " + outputPath);
        }

        boolean success = job.waitForCompletion(true);
        if (success) {
            System.out.println("Clean process success!");
        } else {
            System.out.println("Clean process failed!");
        }
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text outputValue = new Text();

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            final String[] parsed = logParser.parse(value.toString());

            // Step 1: filter out requests for static resources.
            if (parsed[2].startsWith("GET /static/")
                    || parsed[2].startsWith("GET /uc_server")) {
                return;
            }
            // Step 2: strip the leading method strings.
            if (parsed[2].startsWith("GET /")) {
                parsed[2] = parsed[2].substring("GET /".length());
            } else if (parsed[2].startsWith("POST /")) {
                parsed[2] = parsed[2].substring("POST /".length());
            }
            // Step 3: strip the trailing protocol string.
            if (parsed[2].endsWith(" HTTP/1.1")) {
                parsed[2] = parsed[2].substring(0,
                        parsed[2].length() - " HTTP/1.1".length());
            }
            // Step 4: write only the first three fields.
            outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
            context.write(key, outputValue);
        }
    }

    static class MyReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
        protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s,
                Context context) throws java.io.IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        }
    }
}
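After a successful local run, the cleaned records land in D:\Hadoop\output: the usual part-r-00000 file plus a _SUCCESS marker. Each output line holds the three retained fields separated by tabs: visitor IP, reformatted time, and the trimmed URL. Because the map output key is just the file byte offset, the reducer's only job is to drop that key and emit one cleaned line per record.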
Importing into HDFS
The cleaned data sits in d:\hadoop\output. Upload it to HDFS on the Linux machine under /sx/cleandlog:
hadoop fs -mkdir /sx/cleandlog
hadoop fs -put part-r-00000 /sx/cleandlog
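As a quick sanity check (assuming the paths above), hadoop fs -ls /sx/cleandlog should now list part-r-00000.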
4. Troubleshooting
Problem 1:
(null) entry in command string: null chmod 0700 G:\
Solution:
Copy hadoop.dll and winutils.exe from hadoop/bin into C:\Windows\System32, then reload the IDEA project, reopen it, and run again; this resolves the exception.
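If copying files into C:\Windows\System32 is undesirable, a commonly used alternative is to point Hadoop at a local installation directory instead (the G:\hadoop path below is an assumption; use whatever directory holds bin\winutils.exe and bin\hadoop.dll), either via the HADOOP_HOME environment variable or programmatically at the start of main:

// Hypothetical local path; its bin\ subdirectory must contain winutils.exe and hadoop.dll
System.setProperty("hadoop.home.dir", "G:\\hadoop");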
Problem 2:
When running the MapReduce program locally from IDEA, no log4j output appears in the console.
Cause:
The project contains no default log4j configuration file such as log4j.properties or log4j.xml.
Solution:
Add a log4j.properties file (under src/main/resources in a Maven project), copying the example from the log4j site:
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
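DEBUG is very verbose for a Hadoop job; if the console gets too noisy, changing the first setting to log4j.rootLogger=INFO, A1 keeps the job progress messages while dropping the framework's debug output.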
Summary
This is part 3 of the website log analysis case study. It implements the log cleaning step: starting from roughly 540,000 raw log records, cleaning leaves about 170,000 records.