【大数据学习篇8】 热门品类Top10分析(下)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据学习篇8】 热门品类Top10分析

创建项目目录


在项目SparkProject中新建Package包。


4479045f745c49aabe9daa4f19b7337f.png


       在“New Package”窗口的文本输入框“Enter new package name”中输入“cn.itcast.top10”设置Package名称,用于存放实现热门品类Top10分析的类文件。


4758cb5e6b5c4774b98691d7a3099d12.png


在Package包“cn.itcast.top10”新建类。


ee06472e9d4f4ce1b95cb8d1eb68c462.png


       在“Create New Class”窗口的文本输入框“Name”中输入“CategoryTop10”设置类名称,在类中实现热门品类Top10分析。


c3ce975303df4bb6be234319fc577161.png


3.2创建Spark连接并读取数据集

       在类CategoryTop10中定义main()方法,该方法是Java程序执行的入口,在main()方法中实现Spark Core程序。


public class CategoryTop10 {     public static void main(String[] arg){     //实现热门品类Top10分析     } }


       在main()方法中,创建JavaSparkContext和SparkConf对象,JavaSparkContext对象用于实现Spark程序,SparkConf对象用于配置Spark程序相关参数。


SparkConf conf = new SparkConf(); //设置Application名称为top3_area_product conf.setAppName("top10_category"); JavaSparkContext sc = new JavaSparkContext(conf);


        在main()方法中,调用JavaSparkContext对象的textFile()方法读取外部文件,将文件中的数据加载到textFileRDD。


JavaRDD<String> textFileRDD = sc.textFile(arg[0]);


3.3 获取业务数据

       在main()方法中,使用mapToPair()算子转换textFileRDD的每一行数据,用于获取每一行数据中的行为类型和品类ID数据,将转换结果加载到transProductRDD。


JavaPairRDD<Tuple2<String,String>,Integer> transformRDD =     textFileRDD.mapToPair(new PairFunction<String,Tuple2<String, String>, Integer>() {     @Override     public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {         JSONObject json = JSONObject.parseObject(s);         String category_id = json.getString("category_id");         String event_type = json.getString("event_type");         return new Tuple2<>(new Tuple2<>(category_id,event_type), new Integer(1));     } });


3.4 统计品类的行为类型

       在main()方法中,使用reduceByKey()算子对transformRDD进行聚合操作,用于统计每个品类中商品被查看、加入购物车和购买的次数,将统计结果加载到aggregationRDD。


JavaPairRDD<Tuple2<String, String>, Integer> aggregationRDD =         transformRDD.reduceByKey(                 new Function2<Integer, Integer, Integer>() {     @Override     public Integer call(Integer integer1, Integer integer2)             throws Exception {         return integer1 + integer2;     } });


3.5 过滤品类的行为类型

在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为加入购物车和购买的数据,只保留行为类型为查看的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被查看次数和品类ID数据,最终将转换结果加载到getViewCategoryRDD。


JavaPairRDD<String,Integer> getViewCategoryRDD =aggregationRDD .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("view");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {                 @Override                 public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2)throws Exception {                     return new Tuple2<>(tuple2._1._1,tuple2._2);                 }             });


       在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和购买的数据,只保留行为类型为加入购物车的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被加入购物车次数和品类ID数据,最终将转换结果加载到getCartCategoryRDD。


JavaPairRDD<String,Integer> getCartCategoryRDD = aggregationRDD         .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {             @Override             public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 String action = tuple2._1._2;                 return action.equals("cart");             }         }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {             @Override             public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 return new Tuple2<>(tuple2._1._1,tuple2._2);             }         });


       在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和加入购物车的数据,只保留行为类型为购买的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被购买次数和品类ID数据,最终将转换结果加载到getPurchaseCategoryRDD。


JavaPairRDD<String,Integer> getPurchaseCategoryRDD = aggregationRDD     .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("purchase");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {         @Override         public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             return new Tuple2<>(tuple2._1._1,tuple2._2);         }     });


3.6  合并相同品类的行为类型

       在main()方法中,使用leftOuterJoin(左外连接)算子合并getViewCategoryRDD、getCartCategoryRDD和getPurchaseCategoryRDD,用于合并同一品类的查看次数、加入购物车次数和购买次数,将合并结果加载到joinCategoryRDD。


JavaPairRDD<String,Tuple2<Integer, Optional<Integer>>> tmpJoinCategoryRDD         =getViewCategoryRDD.leftOuterJoin(getCartCategoryRDD); JavaPairRDD<String,Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>> joinCategoryRDD         = tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD);


Optional类是一个包含有可选值的包装类,它既可以含有对象也可以为空,主要为了解决空指针异常的问题,因为某些品类中的商品可能被查看但并未被购买或加入购物车。


3.7  根据品类的行为类型进行排序

       在包“cn.itcast.top10”中创建文件CategorySortKey.java,用于实现自定义排序。在类CategorySortKey中继承比较器接口Comparable和序列化接口Serializable,并实现Comparable接口的compareTo()方法。


import java.io.Serializable; public class CategorySortKey implements Comparable<CategorySortKey>,Serializable{      ......     @Override     public int compareTo(CategorySortKey other) {         if(viewCount - other.getViewCount() != 0) {             return (int) (viewCount - other.getViewCount());         } else if(cartCount - other.getCartCount() != 0) {             return (int) (cartCount - other.getCartCount());         } else if(purchaseCount - other.getPurchaseCount() != 0) {             return (int) (purchaseCount - other.getPurchaseCount());         }         return 0;     } }


       在main()方法中,使用mapTopair()算子转换joinCategoryRDD,将joinCategoryRDD中品类被查看次数、加入购物车次数和购买次数映射到自定义排序类CategorySortKey,通过transCategoryRDD加载转换结果。


JavaPairRDD<CategorySortKey,String> transCategoryRDD = joinCategoryRDD .mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>>,CategorySortKey,String>() {      @Override      public Tuple2<CategorySortKey,String> call(Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>, Optional<Integer>>> tuple2) throws Exception {                    String category_id = tuple2._1;                    int viewcount = tuple2._2._1._1;                    int cartcount = 0;                    int purchasecount = 0;                    if (tuple2._2._1._2.isPresent()){  cartcount = tuple2._2._1._2.get().intValue();}                    if (tuple2._2._2.isPresent()){ purchasecount = tuple2._2._2.get().intValue(); }                    CategorySortKey sortKey = new CategorySortKey(viewcount, cartcount, purchasecount);                    return new Tuple2<>(sortKey,category_id);                }            });


       在main()方法中,通过sortByKey()算子对transCategoryRDD进行排序操作,使transCategoryRDD中品类被查看次数、加入购物车次数和购买次数根据自定义排序类CategorySortKey指定的排序规则进行排序,将排序结果加载到sortedCategoryRDD。


JavaPairRDD<CategorySortKey,String> sortedCategoryRDD = transCategoryRDD.sortByKey(false);


       在main()方法中,使用take()算子获取sortedCategoryRDD前10个元素,即热门品类Top10分析结果,将分析结果加载到top10CategoryList。


List<Tuple2<CategorySortKey, String>> top10CategoryList = sortedCategoryRDD.take(10);


3.3.8  数据持久化

封装工具类:


(1)在项目SparkProject的 java目录新建Package包“cn.itcast.hbase”,用于存放实现数据持久化的Java文件。在包“cn.itcast.hbase”下创建文件HbaseConnect.java,用于实现封装HBase数据库连接工具类,在类中实现连接HBase数据库的操作。


(2)在项目SparkProject的包“cn.itcast.hbase”中创建文件HbaseUtils.java,用于实现封装HBase数据库操作工具类,在类中实现创建HBase数据表和向HBase数据表中插入数据的操作。


持久化热门品类Top10分析结果


       在类CategoryTop10中添加方法top10ToHbase(),用于将热门品类Top10分析结果持久化到HBase数据库中,该方法包含参数top10CategoryList,表示热门品类Top10分析结果数据。

public static void top10ToHbase(List<Tuple2<CategorySortKey, String>> top10CategoryList) throws Exception{     HbaseUtils.createTable("top10","top10_category");     String[] column = {"category_id","viewcount","cartcount","purchasecount"};     String viewcount = "" , cartcount = "", purchasecount = "", category_id = "";       int count = 0;     for (Tuple2<CategorySortKey, String> top10: top10CategoryList) {         count++;         viewcount = String.valueOf(top10._1.getViewCount());         cartcount = String.valueOf(top10._1.getCartCount());         purchasecount = String.valueOf(top10._1.getPurchaseCount());         category_id = top10._2;         String[] value = {category_id,viewcount,cartcount,purchasecount};         HbaseUtils.putsToHBase("top10","rowkey_top"+count,"top10_category",column,value);     } }


       在类CategoryTop10的main()方法中,调用方法top10ToHbase()并传入参数top10CategoryList,用于在Spark程序中实现top10ToHbase()方法,将热门品类Top10分析结果持久化到HBase数据库中的数据表top10。


try {     top10ToHbase(top10CategoryList); } catch (Exception e) {     e.printStackTrace(); } HbaseConnect.closeConnection(); sc.close();


4. 运行程序

       在IntelliJ IDEA中将热门品类Top10分析程序封装成jar包,并上传到集群环境中,通过spark-submit将程序提交到YARN中运行。


封装jar包:


在IntelliJ IDEA主界面单击右侧“Maven”选项卡打开Maven窗口。


11896cde0858437cb9dff901ce46f677.png




       在Maven窗口单击展开Lifecycle折叠框,双击Lifecycle折叠框中的“package”选项,IntelliJ IDEA会自动将程序封装成jar包,封装完成后,若出现“BUILD SUCCESS”内容,则证明成功封装热门品类Top10分析程序为jar包。


ab1f02c6bf164b55907f50ffb174dfd1.png

6c536be7b203488e96512a80b85419d0.png



       在项目SparkProject中的target目录下会生成SparkProject-1.0-SNAPSHOT.jar文件,为了便于后续与其它程序区分,这里将默认文件名称修改为CategoryTop10.jar。


将jar包上传到集群:


       使用远程连接工具SecureCRT连接虚拟机Spark01,在存放jar文件的目录/export/SparkJar/(该目录需提前创建)下执行“rz”命令,上传热门品类Top10分析程序的jar包CategoryTop10.jar。


将数据集上传到本地文件系统:


       使用远程连接工具SecureCRT连接虚拟机Spark01,在存放数据文件的目录/export/data/SparkData/(该目录需提前创建)下执行“rz”命令,将数据集user_session.txt上传至本地文件系统。


在HDFS创建存放数据集的目录:


       将数据集上传到HDFS前,需要在HDFS的根目录创建目录spark_data,用于存放数据集user_session.txt。


hdfs dfs -mkdir /spark_data


上传数据集到HDFS:


       将本地文件系统目录/export/data/SparkData/下的数据集user_session.txt上传到HDFS的spark_data目录下。


hdfs dfs -put /export/data/SparkData/user_session.txt /spark_data


提交热门品类Top10分析程序到YARN集群:


       通过Spark安装目录中bin目录下的shell脚本文件spark-submit提交热门品类Top10分析程序 到Hadoop集群的YARN运行。


spark-submit \


--master yarn \


--deploy-mode cluster \


--num-executors 3 \


--executor-memory 2G \


--class cn.itcast.top10.CategoryTop10 \


/export/SparkJar/CategoryTop10.jar /spark_data/user_session.txt


查看程序运行状态:


       程序运行时在控制台会生成“Application ID”(程序运行时的唯一ID),在浏览器输入“192.168.121.132:8088”,进入YARN的Web UI界面,通过对应“Application ID”查看程序的运行状态,当程序运行完成后State为FINISHED,并且FinalStatus为SUCCEES。


d0151ace57b24e06a0f75175a6ea017f.png


查看程序运行结果:


在虚拟机Spark01执行“hbase shell”命令,进入HBase命令行工具。


42469d2d4f014e3a8dc1b48582b74d91.png


在HBase命令行工具中执行“list”命令,查看HBase数据库中的所有数据表。


> list TAB


test  


top10      


2 row(s) in 0.1810 seconds


在HBase命令行工具执行“scan 'top10'”命令,查询数据表top10中的数据。


ceff6d756ce54b35bfaf39b467342307.png


       本文主要讲解了如何通过用户行为数据实现热门品类Top10分析,首先我们对数据集进行分析,使读者了解用户行为数据的数据结构。接着通过实现思路分析,使读者了解热门品类Top10分析的实现流程。然后通过IntelliJ IDEA开发工具实现热门品类Top10分析程序并将分析结果存储到HBase数据库,使读者掌握运用Java语言编写Spark Core和HBase程序的能力。最后封装热门品类Top10分析程序并提交到集群运行,使读者掌握运用IntelliJ IDEA开发工具封装Spark Core程序以及Spark ON YARN模式运行Spark Core程序的方法。  


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
103 2
|
2天前
|
机器学习/深度学习 数据可视化 大数据
机器学习与大数据分析的结合:智能决策的新引擎
机器学习与大数据分析的结合:智能决策的新引擎
38 15
|
8天前
|
SQL 分布式计算 DataWorks
DataWorks产品测评|基于DataWorks和MaxCompute产品组合实现用户画像分析
本文介绍了如何使用DataWorks和MaxCompute产品组合实现用户画像分析。首先,通过阿里云官网开通DataWorks服务并创建资源组,接着创建MaxCompute项目和数据源。随后,利用DataWorks的数据集成和数据开发模块,将业务数据同步至MaxCompute,并通过ODPS SQL完成用户画像的数据加工,最终将结果写入`ads_user_info_1d`表。文章详细记录了每一步的操作过程,包括任务开发、运行、运维操作和资源释放,帮助读者顺利完成用户画像分析。此外,还指出了文档中的一些不一致之处,并提供了相应的解决方法。
|
6天前
|
分布式计算 DataWorks 搜索推荐
用户画像分析(MaxCompute简化版)
通过本教程,您可以了解如何使用DataWorks和MaxCompute产品组合进行数仓开发与分析,并通过案例体验DataWorks数据集成、数据开发和运维中心模块的相关能力。
42 4
|
26天前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
63 4
|
27天前
|
关系型数据库 分布式数据库 数据库
PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具
在数字化时代,企业面对海量数据的挑战,PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具。它不仅支持高速数据读写,还通过数据分区、索引优化等策略提升分析效率,适用于电商、金融等多个行业,助力企业精准决策。
34 4
|
27天前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
180 5
|
2月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
301 7
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
46 2