【大数据学习篇8】 热门品类Top10分析(下)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据学习篇8】 热门品类Top10分析

创建项目目录


在项目SparkProject中新建Package包。


4479045f745c49aabe9daa4f19b7337f.png


       在“New Package”窗口的文本输入框“Enter new package name”中输入“cn.itcast.top10”设置Package名称,用于存放实现热门品类Top10分析的类文件。


4758cb5e6b5c4774b98691d7a3099d12.png


在Package包“cn.itcast.top10”新建类。


ee06472e9d4f4ce1b95cb8d1eb68c462.png


       在“Create New Class”窗口的文本输入框“Name”中输入“CategoryTop10”设置类名称,在类中实现热门品类Top10分析。


c3ce975303df4bb6be234319fc577161.png


3.2创建Spark连接并读取数据集

       在类CategoryTop10中定义main()方法,该方法是Java程序执行的入口,在main()方法中实现Spark Core程序。


public class CategoryTop10 {     public static void main(String[] arg){     //实现热门品类Top10分析     } }


       在main()方法中,创建JavaSparkContext和SparkConf对象,JavaSparkContext对象用于实现Spark程序,SparkConf对象用于配置Spark程序相关参数。


SparkConf conf = new SparkConf(); //设置Application名称为top3_area_product conf.setAppName("top10_category"); JavaSparkContext sc = new JavaSparkContext(conf);


        在main()方法中,调用JavaSparkContext对象的textFile()方法读取外部文件,将文件中的数据加载到textFileRDD。


JavaRDD<String> textFileRDD = sc.textFile(arg[0]);


3.3 获取业务数据

       在main()方法中,使用mapToPair()算子转换textFileRDD的每一行数据,用于获取每一行数据中的行为类型和品类ID数据,将转换结果加载到transProductRDD。


JavaPairRDD<Tuple2<String,String>,Integer> transformRDD =     textFileRDD.mapToPair(new PairFunction<String,Tuple2<String, String>, Integer>() {     @Override     public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {         JSONObject json = JSONObject.parseObject(s);         String category_id = json.getString("category_id");         String event_type = json.getString("event_type");         return new Tuple2<>(new Tuple2<>(category_id,event_type), new Integer(1));     } });


3.4 统计品类的行为类型

       在main()方法中,使用reduceByKey()算子对transformRDD进行聚合操作,用于统计每个品类中商品被查看、加入购物车和购买的次数,将统计结果加载到aggregationRDD。


JavaPairRDD<Tuple2<String, String>, Integer> aggregationRDD =         transformRDD.reduceByKey(                 new Function2<Integer, Integer, Integer>() {     @Override     public Integer call(Integer integer1, Integer integer2)             throws Exception {         return integer1 + integer2;     } });


3.5 过滤品类的行为类型

在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为加入购物车和购买的数据,只保留行为类型为查看的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被查看次数和品类ID数据,最终将转换结果加载到getViewCategoryRDD。


JavaPairRDD<String,Integer> getViewCategoryRDD =aggregationRDD .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("view");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {                 @Override                 public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2)throws Exception {                     return new Tuple2<>(tuple2._1._1,tuple2._2);                 }             });


       在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和购买的数据,只保留行为类型为加入购物车的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被加入购物车次数和品类ID数据,最终将转换结果加载到getCartCategoryRDD。


JavaPairRDD<String,Integer> getCartCategoryRDD = aggregationRDD         .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {             @Override             public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 String action = tuple2._1._2;                 return action.equals("cart");             }         }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {             @Override             public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 return new Tuple2<>(tuple2._1._1,tuple2._2);             }         });


       在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和加入购物车的数据,只保留行为类型为购买的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被购买次数和品类ID数据,最终将转换结果加载到getPurchaseCategoryRDD。


JavaPairRDD<String,Integer> getPurchaseCategoryRDD = aggregationRDD     .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("purchase");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {         @Override         public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             return new Tuple2<>(tuple2._1._1,tuple2._2);         }     });


3.6  合并相同品类的行为类型

       在main()方法中,使用leftOuterJoin(左外连接)算子合并getViewCategoryRDD、getCartCategoryRDD和getPurchaseCategoryRDD,用于合并同一品类的查看次数、加入购物车次数和购买次数,将合并结果加载到joinCategoryRDD。


JavaPairRDD<String,Tuple2<Integer, Optional<Integer>>> tmpJoinCategoryRDD         =getViewCategoryRDD.leftOuterJoin(getCartCategoryRDD); JavaPairRDD<String,Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>> joinCategoryRDD         = tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD);


Optional类是一个包含有可选值的包装类,它既可以含有对象也可以为空,主要为了解决空指针异常的问题,因为某些品类中的商品可能被查看但并未被购买或加入购物车。


3.7  根据品类的行为类型进行排序

       在包“cn.itcast.top10”中创建文件CategorySortKey.java,用于实现自定义排序。在类CategorySortKey中继承比较器接口Comparable和序列化接口Serializable,并实现Comparable接口的compareTo()方法。


import java.io.Serializable; public class CategorySortKey implements Comparable<CategorySortKey>,Serializable{      ......     @Override     public int compareTo(CategorySortKey other) {         if(viewCount - other.getViewCount() != 0) {             return (int) (viewCount - other.getViewCount());         } else if(cartCount - other.getCartCount() != 0) {             return (int) (cartCount - other.getCartCount());         } else if(purchaseCount - other.getPurchaseCount() != 0) {             return (int) (purchaseCount - other.getPurchaseCount());         }         return 0;     } }


       在main()方法中,使用mapTopair()算子转换joinCategoryRDD,将joinCategoryRDD中品类被查看次数、加入购物车次数和购买次数映射到自定义排序类CategorySortKey,通过transCategoryRDD加载转换结果。


JavaPairRDD<CategorySortKey,String> transCategoryRDD = joinCategoryRDD .mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>>,CategorySortKey,String>() {      @Override      public Tuple2<CategorySortKey,String> call(Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>, Optional<Integer>>> tuple2) throws Exception {                    String category_id = tuple2._1;                    int viewcount = tuple2._2._1._1;                    int cartcount = 0;                    int purchasecount = 0;                    if (tuple2._2._1._2.isPresent()){  cartcount = tuple2._2._1._2.get().intValue();}                    if (tuple2._2._2.isPresent()){ purchasecount = tuple2._2._2.get().intValue(); }                    CategorySortKey sortKey = new CategorySortKey(viewcount, cartcount, purchasecount);                    return new Tuple2<>(sortKey,category_id);                }            });


       在main()方法中,通过sortByKey()算子对transCategoryRDD进行排序操作,使transCategoryRDD中品类被查看次数、加入购物车次数和购买次数根据自定义排序类CategorySortKey指定的排序规则进行排序,将排序结果加载到sortedCategoryRDD。


JavaPairRDD<CategorySortKey,String> sortedCategoryRDD = transCategoryRDD.sortByKey(false);


       在main()方法中,使用take()算子获取sortedCategoryRDD前10个元素,即热门品类Top10分析结果,将分析结果加载到top10CategoryList。


List<Tuple2<CategorySortKey, String>> top10CategoryList = sortedCategoryRDD.take(10);


3.3.8  数据持久化

封装工具类:


(1)在项目SparkProject的 java目录新建Package包“cn.itcast.hbase”,用于存放实现数据持久化的Java文件。在包“cn.itcast.hbase”下创建文件HbaseConnect.java,用于实现封装HBase数据库连接工具类,在类中实现连接HBase数据库的操作。


(2)在项目SparkProject的包“cn.itcast.hbase”中创建文件HbaseUtils.java,用于实现封装HBase数据库操作工具类,在类中实现创建HBase数据表和向HBase数据表中插入数据的操作。


持久化热门品类Top10分析结果


       在类CategoryTop10中添加方法top10ToHbase(),用于将热门品类Top10分析结果持久化到HBase数据库中,该方法包含参数top10CategoryList,表示热门品类Top10分析结果数据。

public static void top10ToHbase(List<Tuple2<CategorySortKey, String>> top10CategoryList) throws Exception{     HbaseUtils.createTable("top10","top10_category");     String[] column = {"category_id","viewcount","cartcount","purchasecount"};     String viewcount = "" , cartcount = "", purchasecount = "", category_id = "";       int count = 0;     for (Tuple2<CategorySortKey, String> top10: top10CategoryList) {         count++;         viewcount = String.valueOf(top10._1.getViewCount());         cartcount = String.valueOf(top10._1.getCartCount());         purchasecount = String.valueOf(top10._1.getPurchaseCount());         category_id = top10._2;         String[] value = {category_id,viewcount,cartcount,purchasecount};         HbaseUtils.putsToHBase("top10","rowkey_top"+count,"top10_category",column,value);     } }


       在类CategoryTop10的main()方法中,调用方法top10ToHbase()并传入参数top10CategoryList,用于在Spark程序中实现top10ToHbase()方法,将热门品类Top10分析结果持久化到HBase数据库中的数据表top10。


try {     top10ToHbase(top10CategoryList); } catch (Exception e) {     e.printStackTrace(); } HbaseConnect.closeConnection(); sc.close();


4. 运行程序

       在IntelliJ IDEA中将热门品类Top10分析程序封装成jar包,并上传到集群环境中,通过spark-submit将程序提交到YARN中运行。


封装jar包:


在IntelliJ IDEA主界面单击右侧“Maven”选项卡打开Maven窗口。


11896cde0858437cb9dff901ce46f677.png




       在Maven窗口单击展开Lifecycle折叠框,双击Lifecycle折叠框中的“package”选项,IntelliJ IDEA会自动将程序封装成jar包,封装完成后,若出现“BUILD SUCCESS”内容,则证明成功封装热门品类Top10分析程序为jar包。


ab1f02c6bf164b55907f50ffb174dfd1.png

6c536be7b203488e96512a80b85419d0.png



       在项目SparkProject中的target目录下会生成SparkProject-1.0-SNAPSHOT.jar文件,为了便于后续与其它程序区分,这里将默认文件名称修改为CategoryTop10.jar。


将jar包上传到集群:


       使用远程连接工具SecureCRT连接虚拟机Spark01,在存放jar文件的目录/export/SparkJar/(该目录需提前创建)下执行“rz”命令,上传热门品类Top10分析程序的jar包CategoryTop10.jar。


将数据集上传到本地文件系统:


       使用远程连接工具SecureCRT连接虚拟机Spark01,在存放数据文件的目录/export/data/SparkData/(该目录需提前创建)下执行“rz”命令,将数据集user_session.txt上传至本地文件系统。


在HDFS创建存放数据集的目录:


       将数据集上传到HDFS前,需要在HDFS的根目录创建目录spark_data,用于存放数据集user_session.txt。


hdfs dfs -mkdir /spark_data


上传数据集到HDFS:


       将本地文件系统目录/export/data/SparkData/下的数据集user_session.txt上传到HDFS的spark_data目录下。


hdfs dfs -put /export/data/SparkData/user_session.txt /spark_data


提交热门品类Top10分析程序到YARN集群:


       通过Spark安装目录中bin目录下的shell脚本文件spark-submit提交热门品类Top10分析程序 到Hadoop集群的YARN运行。


spark-submit \


--master yarn \


--deploy-mode cluster \


--num-executors 3 \


--executor-memory 2G \


--class cn.itcast.top10.CategoryTop10 \


/export/SparkJar/CategoryTop10.jar /spark_data/user_session.txt


查看程序运行状态:


       程序运行时在控制台会生成“Application ID”(程序运行时的唯一ID),在浏览器输入“192.168.121.132:8088”,进入YARN的Web UI界面,通过对应“Application ID”查看程序的运行状态,当程序运行完成后State为FINISHED,并且FinalStatus为SUCCEES。


d0151ace57b24e06a0f75175a6ea017f.png


查看程序运行结果:


在虚拟机Spark01执行“hbase shell”命令,进入HBase命令行工具。


42469d2d4f014e3a8dc1b48582b74d91.png


在HBase命令行工具中执行“list”命令,查看HBase数据库中的所有数据表。


> list TAB


test  


top10      


2 row(s) in 0.1810 seconds


在HBase命令行工具执行“scan 'top10'”命令,查询数据表top10中的数据。


ceff6d756ce54b35bfaf39b467342307.png


       本文主要讲解了如何通过用户行为数据实现热门品类Top10分析,首先我们对数据集进行分析,使读者了解用户行为数据的数据结构。接着通过实现思路分析,使读者了解热门品类Top10分析的实现流程。然后通过IntelliJ IDEA开发工具实现热门品类Top10分析程序并将分析结果存储到HBase数据库,使读者掌握运用Java语言编写Spark Core和HBase程序的能力。最后封装热门品类Top10分析程序并提交到集群运行,使读者掌握运用IntelliJ IDEA开发工具封装Spark Core程序以及Spark ON YARN模式运行Spark Core程序的方法。  


相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
1月前
|
数据可视化 搜索推荐 大数据
基于python大数据的北京旅游可视化及分析系统
本文深入探讨智慧旅游系统的背景、意义及研究现状,分析其在旅游业中的作用与发展潜力,介绍平台架构、技术创新、数据挖掘与服务优化等核心内容,并展示系统实现界面。
|
2月前
|
数据采集 人工智能 分布式计算
ODPS在AI时代的发展战略与技术演进分析报告
ODPS(现MaxCompute)历经十五年发展,从分布式计算平台演进为AI时代的数据基础设施,以超大规模处理、多模态融合与Data+AI协同为核心竞争力,支撑大模型训练与实时分析等前沿场景,助力企业实现数据驱动与智能化转型。
263 4
|
2月前
|
JSON 大数据 API
巧用苏宁易购 API,精准分析苏宁易购家电销售大数据
在数据驱动的电商时代,精准分析销售数据能助力企业优化库存、提升营销效果。本文详解如何利用苏宁易购API获取家电销售数据,结合Python进行数据清洗与统计分析,实现销量预测与洞察提取,帮助企业降本增效。
59 0
|
17天前
|
存储 SQL 分布式计算
终于!大数据分析不用再“又要快又要省钱”二选一了!Dataphin新功能太香了!
Dataphin推出查询加速新功能,支持用StarRocks等引擎直连MaxCompute或Hadoop查原始数据,无需同步、秒级响应。数据只存一份,省成本、提效率,权限统一管理,打破“又要快又要省”的不可能三角,助力企业实现分析自由。
126 49
|
22天前
|
数据采集 数据可视化 关系型数据库
基于python大数据的电影数据可视化分析系统
电影分析与可视化平台顺应电影产业数字化趋势,整合大数据处理、人工智能与Web技术,实现电影数据的采集、分析与可视化展示。平台支持票房、评分、观众行为等多维度分析,助力行业洞察与决策,同时提供互动界面,增强观众对电影文化的理解。技术上依托Python、MySQL、Flask、HTML等构建,融合数据采集与AI分析,提升电影行业的数据应用能力。
|
1月前
|
数据可视化 大数据 数据挖掘
基于python大数据的招聘数据可视化分析系统
本系统基于Python开发,整合多渠道招聘数据,利用数据分析与可视化技术,助力企业高效决策。核心功能包括数据采集、智能分析、可视化展示及权限管理,提升招聘效率与人才管理水平,推动人力资源管理数字化转型。
|
1月前
|
机器学习/深度学习 搜索推荐 算法
基于python大数据的口红商品分析与推荐系统
本研究基于Python大数据技术,构建口红商品分析与推荐系统,旨在解决口红市场产品同质化与消费者选择困难问题。通过分析颜色、质地、价格等多维度数据及用户行为,实现个性化推荐,提升购物体验与品牌营销效率,推动美妆行业数字化转型,具有重要现实意义与市场价值。
|
12天前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
79 14
|
2月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
82 0
|
3月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
98 4

热门文章

最新文章