【大数据学习篇8】 热门品类Top10分析(下)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 【大数据学习篇8】 热门品类Top10分析

创建项目目录


在项目SparkProject中新建Package包。


4479045f745c49aabe9daa4f19b7337f.png


       在“New Package”窗口的文本输入框“Enter new package name”中输入“cn.itcast.top10”设置Package名称,用于存放实现热门品类Top10分析的类文件。


4758cb5e6b5c4774b98691d7a3099d12.png


在Package包“cn.itcast.top10”新建类。


ee06472e9d4f4ce1b95cb8d1eb68c462.png


       在“Create New Class”窗口的文本输入框“Name”中输入“CategoryTop10”设置类名称,在类中实现热门品类Top10分析。


c3ce975303df4bb6be234319fc577161.png


3.2创建Spark连接并读取数据集

       在类CategoryTop10中定义main()方法,该方法是Java程序执行的入口,在main()方法中实现Spark Core程序。


public class CategoryTop10 {     public static void main(String[] arg){     //实现热门品类Top10分析     } }


       在main()方法中,创建JavaSparkContext和SparkConf对象,JavaSparkContext对象用于实现Spark程序,SparkConf对象用于配置Spark程序相关参数。


SparkConf conf = new SparkConf(); //设置Application名称为top3_area_product conf.setAppName("top10_category"); JavaSparkContext sc = new JavaSparkContext(conf);


        在main()方法中,调用JavaSparkContext对象的textFile()方法读取外部文件,将文件中的数据加载到textFileRDD。


JavaRDD<String> textFileRDD = sc.textFile(arg[0]);


3.3 获取业务数据

       在main()方法中,使用mapToPair()算子转换textFileRDD的每一行数据,用于获取每一行数据中的行为类型和品类ID数据,将转换结果加载到transProductRDD。


JavaPairRDD<Tuple2<String,String>,Integer> transformRDD =     textFileRDD.mapToPair(new PairFunction<String,Tuple2<String, String>, Integer>() {     @Override     public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {         JSONObject json = JSONObject.parseObject(s);         String category_id = json.getString("category_id");         String event_type = json.getString("event_type");         return new Tuple2<>(new Tuple2<>(category_id,event_type), new Integer(1));     } });


3.4 统计品类的行为类型

       在main()方法中,使用reduceByKey()算子对transformRDD进行聚合操作,用于统计每个品类中商品被查看、加入购物车和购买的次数,将统计结果加载到aggregationRDD。


JavaPairRDD<Tuple2<String, String>, Integer> aggregationRDD =         transformRDD.reduceByKey(                 new Function2<Integer, Integer, Integer>() {     @Override     public Integer call(Integer integer1, Integer integer2)             throws Exception {         return integer1 + integer2;     } });


3.5 过滤品类的行为类型

在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为加入购物车和购买的数据,只保留行为类型为查看的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被查看次数和品类ID数据,最终将转换结果加载到getViewCategoryRDD。


JavaPairRDD<String,Integer> getViewCategoryRDD =aggregationRDD .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("view");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {                 @Override                 public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2)throws Exception {                     return new Tuple2<>(tuple2._1._1,tuple2._2);                 }             });


       在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和购买的数据,只保留行为类型为加入购物车的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被加入购物车次数和品类ID数据,最终将转换结果加载到getCartCategoryRDD。


JavaPairRDD<String,Integer> getCartCategoryRDD = aggregationRDD         .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {             @Override             public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 String action = tuple2._1._2;                 return action.equals("cart");             }         }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {             @Override             public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {                 return new Tuple2<>(tuple2._1._1,tuple2._2);             }         });


       在main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据中行为类型为查看和加入购物车的数据,只保留行为类型为购买的数据,然后使用mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被购买次数和品类ID数据,最终将转换结果加载到getPurchaseCategoryRDD。


JavaPairRDD<String,Integer> getPurchaseCategoryRDD = aggregationRDD     .filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {         @Override         public Boolean call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             String action = tuple2._1._2;             return action.equals("purchase");         }     }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {         @Override         public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> tuple2) throws Exception {             return new Tuple2<>(tuple2._1._1,tuple2._2);         }     });


3.6  合并相同品类的行为类型

       在main()方法中,使用leftOuterJoin(左外连接)算子合并getViewCategoryRDD、getCartCategoryRDD和getPurchaseCategoryRDD,用于合并同一品类的查看次数、加入购物车次数和购买次数,将合并结果加载到joinCategoryRDD。


JavaPairRDD<String,Tuple2<Integer, Optional<Integer>>> tmpJoinCategoryRDD         =getViewCategoryRDD.leftOuterJoin(getCartCategoryRDD); JavaPairRDD<String,Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>> joinCategoryRDD         = tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD);


Optional类是一个包含有可选值的包装类,它既可以含有对象也可以为空,主要为了解决空指针异常的问题,因为某些品类中的商品可能被查看但并未被购买或加入购物车。


3.7  根据品类的行为类型进行排序

       在包“cn.itcast.top10”中创建文件CategorySortKey.java,用于实现自定义排序。在类CategorySortKey中继承比较器接口Comparable和序列化接口Serializable,并实现Comparable接口的compareTo()方法。


import java.io.Serializable; public class CategorySortKey implements Comparable<CategorySortKey>,Serializable{      ......     @Override     public int compareTo(CategorySortKey other) {         if(viewCount - other.getViewCount() != 0) {             return (int) (viewCount - other.getViewCount());         } else if(cartCount - other.getCartCount() != 0) {             return (int) (cartCount - other.getCartCount());         } else if(purchaseCount - other.getPurchaseCount() != 0) {             return (int) (purchaseCount - other.getPurchaseCount());         }         return 0;     } }


       在main()方法中,使用mapTopair()算子转换joinCategoryRDD,将joinCategoryRDD中品类被查看次数、加入购物车次数和购买次数映射到自定义排序类CategorySortKey,通过transCategoryRDD加载转换结果。


JavaPairRDD<CategorySortKey,String> transCategoryRDD = joinCategoryRDD .mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>>,CategorySortKey,String>() {      @Override      public Tuple2<CategorySortKey,String> call(Tuple2<String, Tuple2<Tuple2<Integer, Optional<Integer>>, Optional<Integer>>> tuple2) throws Exception {                    String category_id = tuple2._1;                    int viewcount = tuple2._2._1._1;                    int cartcount = 0;                    int purchasecount = 0;                    if (tuple2._2._1._2.isPresent()){  cartcount = tuple2._2._1._2.get().intValue();}                    if (tuple2._2._2.isPresent()){ purchasecount = tuple2._2._2.get().intValue(); }                    CategorySortKey sortKey = new CategorySortKey(viewcount, cartcount, purchasecount);                    return new Tuple2<>(sortKey,category_id);                }            });


       在main()方法中,通过sortByKey()算子对transCategoryRDD进行排序操作,使transCategoryRDD中品类被查看次数、加入购物车次数和购买次数根据自定义排序类CategorySortKey指定的排序规则进行排序,将排序结果加载到sortedCategoryRDD。


JavaPairRDD<CategorySortKey,String> sortedCategoryRDD = transCategoryRDD.sortByKey(false);


       在main()方法中,使用take()算子获取sortedCategoryRDD前10个元素,即热门品类Top10分析结果,将分析结果加载到top10CategoryList。


List<Tuple2<CategorySortKey, String>> top10CategoryList = sortedCategoryRDD.take(10);


3.3.8  数据持久化

封装工具类:


(1)在项目SparkProject的 java目录新建Package包“cn.itcast.hbase”,用于存放实现数据持久化的Java文件。在包“cn.itcast.hbase”下创建文件HbaseConnect.java,用于实现封装HBase数据库连接工具类,在类中实现连接HBase数据库的操作。


(2)在项目SparkProject的包“cn.itcast.hbase”中创建文件HbaseUtils.java,用于实现封装HBase数据库操作工具类,在类中实现创建HBase数据表和向HBase数据表中插入数据的操作。


持久化热门品类Top10分析结果


       在类CategoryTop10中添加方法top10ToHbase(),用于将热门品类Top10分析结果持久化到HBase数据库中,该方法包含参数top10CategoryList,表示热门品类Top10分析结果数据。

public static void top10ToHbase(List<Tuple2<CategorySortKey, String>> top10CategoryList) throws Exception{     HbaseUtils.createTable("top10","top10_category");     String[] column = {"category_id","viewcount","cartcount","purchasecount"};     String viewcount = "" , cartcount = "", purchasecount = "", category_id = "";       int count = 0;     for (Tuple2<CategorySortKey, String> top10: top10CategoryList) {         count++;         viewcount = String.valueOf(top10._1.getViewCount());         cartcount = String.valueOf(top10._1.getCartCount());         purchasecount = String.valueOf(top10._1.getPurchaseCount());         category_id = top10._2;         String[] value = {category_id,viewcount,cartcount,purchasecount};         HbaseUtils.putsToHBase("top10","rowkey_top"+count,"top10_category",column,value);     } }


       在类CategoryTop10的main()方法中,调用方法top10ToHbase()并传入参数top10CategoryList,用于在Spark程序中实现top10ToHbase()方法,将热门品类Top10分析结果持久化到HBase数据库中的数据表top10。


try {     top10ToHbase(top10CategoryList); } catch (Exception e) {     e.printStackTrace(); } HbaseConnect.closeConnection(); sc.close();


4. 运行程序

       在IntelliJ IDEA中将热门品类Top10分析程序封装成jar包,并上传到集群环境中,通过spark-submit将程序提交到YARN中运行。


封装jar包:


在IntelliJ IDEA主界面单击右侧“Maven”选项卡打开Maven窗口。


11896cde0858437cb9dff901ce46f677.png




       在Maven窗口单击展开Lifecycle折叠框,双击Lifecycle折叠框中的“package”选项,IntelliJ IDEA会自动将程序封装成jar包,封装完成后,若出现“BUILD SUCCESS”内容,则证明成功封装热门品类Top10分析程序为jar包。


ab1f02c6bf164b55907f50ffb174dfd1.png

6c536be7b203488e96512a80b85419d0.png



       在项目SparkProject中的target目录下会生成SparkProject-1.0-SNAPSHOT.jar文件,为了便于后续与其它程序区分,这里将默认文件名称修改为CategoryTop10.jar。


将jar包上传到集群:


       使用远程连接工具SecureCRT连接虚拟机Spark01,在存放jar文件的目录/export/SparkJar/(该目录需提前创建)下执行“rz”命令,上传热门品类Top10分析程序的jar包CategoryTop10.jar。


将数据集上传到本地文件系统:


       使用远程连接工具SecureCRT连接虚拟机Spark01,在存放数据文件的目录/export/data/SparkData/(该目录需提前创建)下执行“rz”命令,将数据集user_session.txt上传至本地文件系统。


在HDFS创建存放数据集的目录:


       将数据集上传到HDFS前,需要在HDFS的根目录创建目录spark_data,用于存放数据集user_session.txt。


hdfs dfs -mkdir /spark_data


上传数据集到HDFS:


       将本地文件系统目录/export/data/SparkData/下的数据集user_session.txt上传到HDFS的spark_data目录下。


hdfs dfs -put /export/data/SparkData/user_session.txt /spark_data


提交热门品类Top10分析程序到YARN集群:


       通过Spark安装目录中bin目录下的shell脚本文件spark-submit提交热门品类Top10分析程序 到Hadoop集群的YARN运行。


spark-submit \


--master yarn \


--deploy-mode cluster \


--num-executors 3 \


--executor-memory 2G \


--class cn.itcast.top10.CategoryTop10 \


/export/SparkJar/CategoryTop10.jar /spark_data/user_session.txt


查看程序运行状态:


       程序运行时在控制台会生成“Application ID”(程序运行时的唯一ID),在浏览器输入“192.168.121.132:8088”,进入YARN的Web UI界面,通过对应“Application ID”查看程序的运行状态,当程序运行完成后State为FINISHED,并且FinalStatus为SUCCEES。


d0151ace57b24e06a0f75175a6ea017f.png


查看程序运行结果:


在虚拟机Spark01执行“hbase shell”命令,进入HBase命令行工具。


42469d2d4f014e3a8dc1b48582b74d91.png


在HBase命令行工具中执行“list”命令,查看HBase数据库中的所有数据表。


> list TAB


test  


top10      


2 row(s) in 0.1810 seconds


在HBase命令行工具执行“scan 'top10'”命令,查询数据表top10中的数据。


ceff6d756ce54b35bfaf39b467342307.png


       本文主要讲解了如何通过用户行为数据实现热门品类Top10分析,首先我们对数据集进行分析,使读者了解用户行为数据的数据结构。接着通过实现思路分析,使读者了解热门品类Top10分析的实现流程。然后通过IntelliJ IDEA开发工具实现热门品类Top10分析程序并将分析结果存储到HBase数据库,使读者掌握运用Java语言编写Spark Core和HBase程序的能力。最后封装热门品类Top10分析程序并提交到集群运行,使读者掌握运用IntelliJ IDEA开发工具封装Spark Core程序以及Spark ON YARN模式运行Spark Core程序的方法。  


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
14天前
|
分布式计算 Hadoop 大数据
Jupyter 在大数据分析中的角色
【8月更文第29天】Jupyter Notebook 提供了一个交互式的开发环境,它不仅适用于 Python 编程语言,还能够支持其他语言,包括 Scala 和 R 等。这种多语言的支持使得 Jupyter 成为大数据分析领域中非常有价值的工具,特别是在与 Apache Spark 和 Hadoop 等大数据框架集成方面。本文将探讨 Jupyter 如何支持这些大数据框架进行高效的数据处理和分析,并提供具体的代码示例。
23 0
|
6天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
47 11
|
15天前
|
分布式计算 数据可视化 大数据
Vaex :突破pandas,快速分析100GB大数据集
Vaex :突破pandas,快速分析100GB大数据集
|
13天前
|
大数据 机器人 数据挖掘
这个云ETL工具配合Python轻松实现大数据集分析,附案例
这个云ETL工具配合Python轻松实现大数据集分析,附案例
|
14天前
|
数据采集 人工智能 安全
AI大数据处理与分析实战--体育问卷分析
本文是关于使用AI进行大数据处理与分析的实战案例,详细记录了对深圳市义务教育阶段学校“每天一节体育课”网络问卷的分析过程,包括数据概览、交互Prompt、代码处理、年级和学校维度的深入分析,以及通过AI工具辅助得出的分析结果和结论。
|
16天前
|
消息中间件 前端开发 安全
第三方数据平台技术选型分析
这篇文章分析了第三方数据平台的技术选型,涵盖了移动统计平台、自助分析平台和BI平台的不同代表厂商,讨论了它们的数据源、使用要求和适用场景。
30 2
|
11天前
|
存储 分布式计算 数据处理
MaxCompute 的成本效益分析与优化策略
【8月更文第31天】随着云计算技术的发展,越来越多的企业选择将数据处理和分析任务迁移到云端。阿里云的 MaxCompute 是一款专为海量数据设计的大规模数据仓库平台,它不仅提供了强大的数据处理能力,还简化了数据管理的工作流程。然而,在享受这些便利的同时,企业也需要考虑如何有效地控制成本,确保资源得到最优利用。本文将探讨如何评估 MaxCompute 的使用成本,并提出一些优化策略以降低费用,提高资源利用率。
12 0
|
11天前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
38 1
|
17天前
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
|
20天前
|
存储 监控 安全
大数据架构设计原则:构建高效、可扩展与安全的数据生态系统
【8月更文挑战第23天】大数据架构设计是一个复杂而系统的工程,需要综合考虑业务需求、技术选型、安全合规等多个方面。遵循上述设计原则,可以帮助企业构建出既高效又安全的大数据生态系统,为业务创新和决策支持提供强有力的支撑。随着技术的不断发展和业务需求的不断变化,持续优化和调整大数据架构也将成为一项持续的工作。

热门文章

最新文章