第16题:Python数据清洗转换
需求列表
1. Python数据清洗转换
数据分析师小 Q 收到一份微信用户行为 log 文件,文件中每一行包含三列 (ftime, event_code, event_value) 数据样例如下:
2020-01-18 19:02:48 | view_article | A1=wxid123&A3=post123&A4=1282&A5=
2020-01-18 19:02:48 | comment_article | A1=wxid123&A3=post123&A4=1282&A5=
说明:
第一列 ftime 记录了用户行行为时间例如 2020-01-18 19:02:48
第二列 event_code 记录了用户事件id, 例如 ‘view_article’ = 阅读文章,
‘comment_article’ = 评论文章
第三列 event_value 记录了事件详情,例如 ‘A1=wxid123&A3=post123&A4=1&A5=’ A1 表示用户 ID
A3 表 示 文 章 ID
A4 表示文章类型
请帮助小 Q 实现一个函数,从文件中提取出用户阅读文章的行为字段(时间,用户ID,文章 ID)
思路分析
使用PySpark读取微信日志数据,并对数据进行转换和清洗,最终将清洗后的结果RDD保存到HDFS中。在if name == ‘main’:代码块中,先创建了一个SparkConf配置对象和一个SparkContext上下文对象,然后使用SparkContext上下文对象从本地文件系统中读取微信日志数据,并将每行数据作为RDD的一个元素。接着定义了一个函数data_wash用于将每行日志数据进行转换清洗,并返回清洗后的结果。最后,使用RDD的map函数将原始日志数据RDD中的每个元素都应用data_wash函数进行清洗和转换,得到清洗后的结果RDD,并将结果RDD保存到HDFS中。最后,使用SparkContext的stop函数关闭SparkContext上下文对象,释放资源。
答案获取
建议你先动脑思考,动手写一写再对照看下答案,如果实在不懂可以点击下方卡片,回复:大厂sql
即可。
参考答案适用HQL,SparkSQL,FlinkSQL,即大数据组件,其他SQL需自行修改。
加技术群讨论
点击下方卡片关注 联系我进群
或者直接私信我进群
文末SQL小技巧
提高SQL功底的思路。
1、造数据。因为有数据支撑,会方便我们根据数据结果去不断调整SQL的写法。
造数据语法既可以create table再insert into,也可以用下面的create temporary view xx as values语句,更简单。
其中create temporary view xx as values语句,SparkSQL语法支持,hive不支持。
2、先将结果表画出来,包括结果字段名有哪些,数据量也画几条。这是分析他要什么。
从源表到结果表,一路可能要走多个步骤,其实就是可能需要多个子查询,过程多就用with as来重构提高可读性。
3、要由简单过度到复杂,不要一下子就写一个很复杂的。
先写简单的select from table…,每个中间步骤都执行打印结果,看是否符合预期, 根据中间结果,进一步调整修饰SQL语句,再执行,直到接近结果表。
4、数据量要小,工具要快,如果用hive,就设置set hive.exec.mode.local.auto=true;如果是SparkSQL,就设置合适的shuffle并行度,set spark.sql.shuffle.partitions=4;