[spark][python] Spark map processing

Introduction:

map applies a processing function to each element of an RDD, producing a new RDD.
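A minimal sketch of the idea, assuming a running SparkContext named sc (as in the PySpark shell used below):

# Hypothetical example: double every element of an RDD.
nums = sc.parallelize([1, 2, 3, 4])   # build a source RDD
doubled = nums.map(lambda x: x * 2)   # map is a lazy transformation
doubled.collect()                     # the action triggers the job: [2, 4, 6, 8]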

[training@localhost ~]$ cat names.txt
Year,First Name,County,Sex,Count
2012,DOMINIC,CAYUGA,M,6
2012,ADDISON,ONONDAGA,F,14
2012,ADDISON,ONONDAGA,F,14
2012,JULIA,ONONDAGA,F,15
[training@localhost ~]$ hdfs dfs -put names.txt
[training@localhost ~]$ hdfs dfs -cat names.txt
Year,First Name,County,Sex,Count
2012,DOMINIC,CAYUGA,M,6
2012,ADDISON,ONONDAGA,F,14
2012,ADDISON,ONONDAGA,F,14
2012,JULIA,ONONDAGA,F,15
[training@localhost ~]$
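hdfs dfs -put copies names.txt from the local filesystem into the current user's HDFS home directory (/user/training here, as the job log below confirms), and a relative path passed to sc.textFile resolves against that same directory.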


In [98]: t_names = sc.textFile("names.txt")
17/09/24 06:24:22 INFO storage.MemoryStore: Block broadcast_27 stored as values in memory (estimated size 230.5 KB, free 2.3 MB)
17/09/24 06:24:23 INFO storage.MemoryStore: Block broadcast_27_piece0 stored as bytes in memory (estimated size 21.5 KB, free 2.3 MB)
17/09/24 06:24:23 INFO storage.BlockManagerInfo: Added broadcast_27_piece0 in memory on localhost:33950 (size: 21.5 KB, free: 208.6 MB)
17/09/24 06:24:23 INFO spark.SparkContext: Created broadcast 27 from textFile at NativeMethodAccessorImpl.java:-2

In [99]: rows=t_names.map(lambda line: line.split(","))

In [100]: rows.take(1)


17/09/24 06:25:23 INFO mapred.FileInputFormat: Total input paths to process : 1
17/09/24 06:25:23 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:393
17/09/24 06:25:23 INFO scheduler.DAGScheduler: Got job 15 (runJob at PythonRDD.scala:393) with 1 output partitions
17/09/24 06:25:23 INFO scheduler.DAGScheduler: Final stage: ResultStage 15 (runJob at PythonRDD.scala:393)
17/09/24 06:25:23 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/09/24 06:25:23 INFO scheduler.DAGScheduler: Missing parents: List()
17/09/24 06:25:23 INFO scheduler.DAGScheduler: Submitting ResultStage 15 (PythonRDD[46] at RDD at PythonRDD.scala:43), which has no missing parents
17/09/24 06:25:23 INFO storage.MemoryStore: Block broadcast_28 stored as values in memory (estimated size 5.2 KB, free 2.3 MB)
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_26_piece0 on localhost:33950 in memory (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 8
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_18_piece0 on localhost:33950 in memory (size: 3.7 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO storage.MemoryStore: Block broadcast_28_piece0 stored as bytes in memory (estimated size 3.3 KB, free 2.3 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 9
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_19_piece0 on localhost:33950 in memory (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 10
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in memory on localhost:33950 (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.SparkContext: Created broadcast 28 from broadcast at DAGScheduler.scala:1006
17/09/24 06:25:24 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 15 (PythonRDD[46] at RDD at PythonRDD.scala:43)
17/09/24 06:25:24 INFO scheduler.TaskSchedulerImpl: Adding task set 15.0 with 1 tasks
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_20_piece0 on localhost:33950 in memory (size: 3.7 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 11
17/09/24 06:25:24 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 15.0 (TID 15, localhost, partition 0,PROCESS_LOCAL, 2147 bytes)
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_21_piece0 on localhost:33950 in memory (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 12
17/09/24 06:25:24 INFO executor.Executor: Running task 0.0 in stage 15.0 (TID 15)
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_22_piece0 on localhost:33950 in memory (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 13
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_23_piece0 on localhost:33950 in memory (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 14
17/09/24 06:25:24 INFO rdd.HadoopRDD: Input split: hdfs://localhost:8020/user/training/names.txt:0+136
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_24_piece0 on localhost:33950 in memory (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 15
17/09/24 06:25:24 INFO storage.BlockManagerInfo: Removed broadcast_25_piece0 on localhost:33950 in memory (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:24 INFO spark.ContextCleaner: Cleaned accumulator 16
17/09/24 06:25:24 INFO python.PythonRunner: Times: total = 78, boot = 49, init = 25, finish = 4
17/09/24 06:25:24 INFO executor.Executor: Finished task 0.0 in stage 15.0 (TID 15). 2203 bytes result sent to driver
17/09/24 06:25:24 INFO scheduler.DAGScheduler: ResultStage 15 (runJob at PythonRDD.scala:393) finished in 0.438 s
17/09/24 06:25:24 INFO scheduler.DAGScheduler: Job 15 finished: runJob at PythonRDD.scala:393, took 1.160085 s
17/09/24 06:25:24 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 15.0 (TID 15) in 429 ms on localhost (1/1)
17/09/24 06:25:24 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 15.0, whose tasks have all completed, from pool 
Out[100]: [[u'Year', u'First Name', u'County', u'Sex', u'Count']]
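Note that the map call in In [99] returned immediately without running anything: map is a lazy transformation, and the Spark job in the log above was triggered only by the action take(1).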

 

In [101]: rows.take(2)
17/09/24 06:25:29 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:393
17/09/24 06:25:29 INFO scheduler.DAGScheduler: Got job 16 (runJob at PythonRDD.scala:393) with 1 output partitions
17/09/24 06:25:29 INFO scheduler.DAGScheduler: Final stage: ResultStage 16 (runJob at PythonRDD.scala:393)
17/09/24 06:25:29 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/09/24 06:25:29 INFO scheduler.DAGScheduler: Missing parents: List()
17/09/24 06:25:29 INFO scheduler.DAGScheduler: Submitting ResultStage 16 (PythonRDD[47] at RDD at PythonRDD.scala:43), which has no missing parents
17/09/24 06:25:29 INFO storage.MemoryStore: Block broadcast_29 stored as values in memory (estimated size 5.2 KB, free 2.2 MB)
17/09/24 06:25:29 INFO storage.MemoryStore: Block broadcast_29_piece0 stored as bytes in memory (estimated size 3.3 KB, free 2.2 MB)
17/09/24 06:25:29 INFO storage.BlockManagerInfo: Added broadcast_29_piece0 in memory on localhost:33950 (size: 3.3 KB, free: 208.6 MB)
17/09/24 06:25:29 INFO spark.SparkContext: Created broadcast 29 from broadcast at DAGScheduler.scala:1006
17/09/24 06:25:29 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 16 (PythonRDD[47] at RDD at PythonRDD.scala:43)
17/09/24 06:25:29 INFO scheduler.TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
17/09/24 06:25:29 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 16.0 (TID 16, localhost, partition 0,PROCESS_LOCAL, 2147 bytes)
17/09/24 06:25:29 INFO executor.Executor: Running task 0.0 in stage 16.0 (TID 16)
17/09/24 06:25:29 INFO rdd.HadoopRDD: Input split: hdfs://localhost:8020/user/training/names.txt:0+136
17/09/24 06:25:29 INFO python.PythonRunner: Times: total = 71, boot = 25, init = 45, finish = 1
17/09/24 06:25:29 INFO executor.Executor: Finished task 0.0 in stage 16.0 (TID 16). 2267 bytes result sent to driver
17/09/24 06:25:30 INFO scheduler.DAGScheduler: ResultStage 16 (runJob at PythonRDD.scala:393) finished in 0.196 s
17/09/24 06:25:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 16.0 (TID 16) in 202 ms on localhost (1/1)
17/09/24 06:25:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 16.0, whose tasks have all completed, from pool 
17/09/24 06:25:30 INFO scheduler.DAGScheduler: Job 16 finished: runJob at PythonRDD.scala:393, took 0.408908 s
Out[101]: 
[[u'Year', u'First Name', u'County', u'Sex', u'Count'],
[u'2012', u'DOMINIC', u'CAYUGA', u'M', u'6']]

In [102]:
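As the outputs above show, the CSV header passes through map like any other line. A minimal sketch for dropping it with filter before splitting (hypothetical variable name data_rows; same names.txt data):

# Drop the CSV header, then split each remaining line into fields.
header = t_names.first()   # u'Year,First Name,County,Sex,Count'
data_rows = t_names.filter(lambda line: line != header) \
                   .map(lambda line: line.split(","))
data_rows.take(1)          # [[u'2012', u'DOMINIC', u'CAYUGA', u'M', u'6']]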

 

Source:

https://www.supergloo.com/fieldnotes/apache-spark-transformations-python-examples/





This article was reposted from the cnblogs blog 健哥的数据花园; original link: http://www.cnblogs.com/gaojian/p/7588619.html. For reprinting, please contact the original author.
