大数据实战:用户流量分析系统

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: --------------------------------------------------------------------------------------------------------------- [版权申明:本文系作者原创,转载请注明出处] 文章出处:http://blog.csdn.net/sdksdk0/article/details/5

---------------------------------------------------------------------------------------------------------------

[版权申明:本文系作者原创,转载请注明出处]

文章出处:http://blog.csdn.net/sdksdk0/article/details/51628874

作者:朱培

---------------------------------------------------------------------------------------------------------------


本文是结合hadoop中的mapreduce来对用户数据进行分析,统计用户的手机号码、上行流量、下行流量、总流量的信息,同时可以按照总流量大小对用户进行分组排序等。是一个非常简洁易用的hadoop项目,主要用户进一步加强对MapReduce的理解及实际应用。文末提供源数据采集文件和系统源码。

本案例非常适合hadoop初级人员学习以及想入门大数据、云计算、数据分析等领域的朋友进行学习。

一、待分析的数据源

以下是一个待分析的文本文件,里面有非常多的用户浏览信息,保扩用户手机号码,上网时间,机器序列号,访问的IP,访问的网站,上行流量,下行流量,总流量等信息。这里只截取一小段,具体文件在文末提供下载链接。


二、基本功能实现

想要统计出用户的上行流量、下行流量、总流量信息,我们需要建立一个bean类来对数据进行封装。于是新建应该Java工程,导包,或者直接建立一个MapReduce工程。在这里面建立一个FlowBean.java文件。

        private long upFlow;
	private long dFlow;
	private long sumFlow;
然后就是各种右键生成get,set方法,还要toString(),以及生成构造函数,(千万记得要生成一个空的构造函数,不然后面进行分析的时候会报错)。
完整代码如下:
package cn.tf.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean  implements WritableComparable<FlowBean>{
	
	private long upFlow;
	private long dFlow;
	private long sumFlow;
	public long getUpFlow() {
		return upFlow;
	}
	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}
	public long getdFlow() {
		return dFlow;
	}
	public void setdFlow(long dFlow) {
		this.dFlow = dFlow;
	}
	public long getSumFlow() {
		return sumFlow;
	}
	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
	public FlowBean(long upFlow, long dFlow) {
		super();
		this.upFlow = upFlow;
		this.dFlow = dFlow;
		this.sumFlow = upFlow+dFlow;
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow=in.readLong();
		dFlow=in.readLong();
		sumFlow=in.readLong();
		
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(dFlow);
		out.writeLong(sumFlow);
	}
	public FlowBean() {
		super();
	}

	@Override
	public String toString() {
		 
		return  upFlow + "\t" + dFlow + "\t" + sumFlow;
	}
	@Override
	public int compareTo(FlowBean o) {
		
		return this.sumFlow>o.getSumFlow() ? -1:1;
	}
	
	

}

然后就是这个统计的代码了,新建一个FlowCount.java.在这个类里面,我直接把Mapper和Reduce写在同一个类里面了,如果按规范的要求应该是要分开写的。
在mapper中,获取后面三段数据的值,所以我的这里length-2,length-3.
       public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			// 拿到这行的内容转成string
			String line = value.toString();

			String[] fields = StringUtils.split(line, "\t");
			try {
				if (fields.length > 3) {
					// 获得手机号及上下行流量字段值
					String phone = fields[1];
					long upFlow = Long.parseLong(fields[fields.length - 3]);
					long dFlow = Long.parseLong(fields[fields.length - 2]);

					// 输出这一行的处理结果,key为手机号,value为流量信息bean
					context.write(new Text(phone), new FlowBean(upFlow, dFlow));
				} else {
					return;
				}
			} catch (Exception e) {

			}

		}

	}

 
 
在reduce中队数据进行整理,统计
public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

			long upSum = 0;
			long dSum = 0;

			for (FlowBean bean : values) {

				upSum += bean.getUpFlow();
				dSum += bean.getdFlow();
			}

			FlowBean resultBean = new FlowBean(upSum, dSum);
			context.write(key, resultBean);

		}

	}


最后在main方法中调用执行。
public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FlowCount.class);

		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);

	}
当然啦,还需要先在你的hdfs根目录中建立/flow/data数据,然后我那个用户的数据源上传上去。
 bin/hadoop fs -mkdir -p /flow/data
 bin/hadoop fs -put HTTP_20130313143750.dat /flow/data
 bin/hadoop jar  ../lx/flow.jar

把上面这个MapReduce工程打包成一个jar文件,然后用hadoop来执行这个jar文件。例如我放在~/hadoop/lx/flow.jar,然后再hadoop安装目录中执行
bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount  /flow/data  /flow/output

最后执行结果如下:



在这整过过程中,我们是有yarnchild的进程在执行的,如下图所示:当整个过程执行完毕之后yarnchild也会自动退出。

三、按总流量从大到小排序

如果你上面这个基本操作以及完成了的话,按总流量排序就非常简单了。我们新建一个FlowCountSort.java.

全部代码如下:

package cn.tf.flow;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSort {

public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
		
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			
			String line=value.toString();
			String[] fields=StringUtils.split(line,"\t");
			
			String phone=fields[0];
			long upSum=Long.parseLong(fields[1]);
			long dSum=Long.parseLong(fields[2]);
			
			FlowBean sumBean=new FlowBean(upSum,dSum);
			
			context.write(sumBean, new Text(phone));
		
		}	
}

	public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
		
		//进来的“一组”数据就是一个手机的流量bean和手机号
		@Override
		protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
	
			context.write(values.iterator().next(), key);
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FlowCountSort.class);

		job.setMapperClass(FlowCountSortMapper.class);
		job.setReducerClass(FlowCountSortReducer.class);

		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);

	}
	
}

这个主要就是使用了FlowBean.java中的代码来实现的,主要是继承了WritableComparable<FlowBean>接口来实现,然后重写了compareTo()方法。

@Override
	public int compareTo(FlowBean o) {
		
		return this.sumFlow>o.getSumFlow() ? -1:1;
	}
	
按照同样的方法对这个文件打成jar包,然后使用hadoop的相关语句进行执行就可以了。

bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort  /flow/output  /flow/sortoutput
结果图:




四、按用户号码区域进行分类

流量汇总之后的结果需要按照省份输出到不同的结果文件中,需要解决两个问题:

 1、如何让mr的最终结果产生多个文件: 原理:MR中的结果文件数量由reduce
  task的数量绝对,是一一对应的 做法:在代码中指定reduce task的数量
 
 
  2、如何让手机号进入正确的文件 原理:让不同手机号数据发给正确的reduce task,就进入了正确的结果文件
  要自定义MR中的分区partition的机制(默认的机制是按照kv中k的hashcode%reducetask数)
  做法:自定义一个类来干预MR的分区策略——Partitioner的自定义实现类

主要代码与前面的排序是非常类似的,只要在main方法中添加如下两行代码就可以了。

          //指定自定义的partitioner
		job.setPartitionerClass(ProvincePartioner.class);
		
		job.setNumReduceTasks(5);
		

这里我们需要新建一个ProvincePartioner.java来处理号码分类的逻辑。

public class ProvincePartioner extends Partitioner<Text, FlowBean>{
	
	
private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
	
	static {
		
		provinceMap.put("135", 0);
		provinceMap.put("136", 1);
		provinceMap.put("137", 2);
		provinceMap.put("138", 3);		
	}
	
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {

		String prefix = key.toString().substring(0, 3);
		Integer partNum = provinceMap.get(prefix);
		if(partNum == null) partNum=4;
		
		return partNum;
	}

}

执行方法和前面也是一样的。从执行的流程中我们可以看到这里启动了5个reduce task,因为我这里数据量比较小,所以只启动了一个map task。



到这里,整个用户流量分析系统就全部结束了。关于大数据的更多内容,欢迎关注。点击左上角头像下方“点击关注".感谢您的支持!



数据源下载地址:http://download.csdn.net/detail/sdksdk0/9545935

源码项目地址:https://github.com/sdksdk0/HDFS_MapReduce




相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
6天前
|
传感器 人工智能 大数据
高科技生命体征探测器、情绪感受器以及传感器背后的大数据平台在健康监测、生命体征检测领域的设想与系统构建
本系统由健康传感器、大数据云平台和脑机接口设备组成。传感器内置生命体征感应器、全球无线定位、人脸识别摄像头等,搜集超出现有科学认知的生命体征信息。云平台整合大数据、云计算与AI,处理并传输数据至接收者大脑芯片,实现实时健康监测。脑机接口设备通过先进通讯技术,实现对健康信息的实时感知与反馈,确保身份验证与数据安全。
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
204 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
184 2
|
30天前
|
机器学习/深度学习 数据可视化 大数据
机器学习与大数据分析的结合:智能决策的新引擎
机器学习与大数据分析的结合:智能决策的新引擎
156 15
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks产品测评|基于DataWorks和MaxCompute产品组合实现用户画像分析
本文介绍了如何使用DataWorks和MaxCompute产品组合实现用户画像分析。首先,通过阿里云官网开通DataWorks服务并创建资源组,接着创建MaxCompute项目和数据源。随后,利用DataWorks的数据集成和数据开发模块,将业务数据同步至MaxCompute,并通过ODPS SQL完成用户画像的数据加工,最终将结果写入`ads_user_info_1d`表。文章详细记录了每一步的操作过程,包括任务开发、运行、运维操作和资源释放,帮助读者顺利完成用户画像分析。此外,还指出了文档中的一些不一致之处,并提供了相应的解决方法。
|
1月前
|
分布式计算 DataWorks 搜索推荐
用户画像分析(MaxCompute简化版)
通过本教程,您可以了解如何使用DataWorks和MaxCompute产品组合进行数仓开发与分析,并通过案例体验DataWorks数据集成、数据开发和运维中心模块的相关能力。
|
2月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
138 4
|
2月前
|
关系型数据库 分布式数据库 数据库
PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具
在数字化时代,企业面对海量数据的挑战,PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具。它不仅支持高速数据读写,还通过数据分区、索引优化等策略提升分析效率,适用于电商、金融等多个行业,助力企业精准决策。
47 4
|
2月前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
344 5
|
2月前
|
存储 监控 数据挖掘
【Clikhouse 探秘】ClickHouse 物化视图:加速大数据分析的新利器
ClickHouse 的物化视图是一种特殊表,通过预先计算并存储查询结果,显著提高查询性能,减少资源消耗,适用于实时报表、日志分析、用户行为分析、金融数据分析和物联网数据分析等场景。物化视图的创建、数据插入、更新和一致性保证通过事务机制实现。
303 14

热门文章

最新文章