[Spark][Python] Example: getting a DataFrame from an Avro file in Spark

Download the sample file from the following address:
https://github.com/databricks/spark-avro/raw/master/src/test/resources/episodes.avro
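
If you prefer to script the download rather than use a browser, a minimal Python 3 sketch could look like the following. The helper names `local_name` and `download` are hypothetical and not part of the original post; only the URL comes from it.

```python
import posixpath
from urllib.parse import urlparse
from urllib.request import urlretrieve

# URL taken verbatim from the post.
EPISODES_URL = (
    "https://github.com/databricks/spark-avro/raw/master/"
    "src/test/resources/episodes.avro"
)


def local_name(url):
    """Return the last path component of a URL, used as the local file name."""
    return posixpath.basename(urlparse(url).path)


def download(url, dest=None):
    """Fetch url to dest (default: the URL's file name) and return the path."""
    dest = dest or local_name(url)
    urlretrieve(url, dest)
    return dest
```

Calling `download(EPISODES_URL)` would save `episodes.avro` in the current directory, ready for the upload step.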

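Upload it to HDFS (no destination is given, so the file goes to the user's HDFS home directory, /user/training in the session shown later):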
hdfs dfs -put episodes.avro

Read it into a DataFrame:
mydata001=sqlContext.read.format("com.databricks.spark.avro").load("episodes.avro")
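
The format name `com.databricks.spark.avro` comes from the external spark-avro package, so the read above presumably only works when that package is available to the shell; the post does not show how the shell was started. As a sketch, the same call can be wrapped in a small helper. `load_avro` and its `fmt` parameter are hypothetical names introduced here for illustration. On Spark 2.4 and later, where Avro support ships as an Apache Spark module, the short name `"avro"` can be passed as `fmt` instead.

```python
def load_avro(sql_context, path, fmt="com.databricks.spark.avro"):
    """Load an Avro file into a DataFrame through the DataFrameReader API.

    sql_context: the SQLContext (or SparkSession) provided by the PySpark shell.
    path: file path; in the post, a relative path resolved to the HDFS home dir.
    fmt: data source name; the default is the one used in this post.
    """
    return sql_context.read.format(fmt).load(path)


# Usage inside the PySpark shell (assumes `sqlContext` already exists):
# mydata001 = load_avro(sqlContext, "episodes.avro")
```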

Output of the interactive session:

In [7]: mydata001=sqlContext.read.format("com.databricks.spark.avro").load("episodes.avro")
17/10/03 07:00:47 INFO avro.AvroRelation: Listing hdfs://localhost:8020/user/training/episodes.avro on driver

In [8]: type(mydata001)
Out[8]: pyspark.sql.dataframe.DataFrame

In [9]: mydata001.count()
17/10/03 07:01:05 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 65.5 KB, free 65.5 KB)
17/10/03 07:01:05 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.4 KB, free 86.9 KB)
17/10/03 07:01:05 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:40075 (size: 21.4 KB, free: 208.8 MB)
17/10/03 07:01:05 INFO spark.SparkContext: Created broadcast 3 from count at NativeMethodAccessorImpl.java:-2
17/10/03 07:01:05 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 230.4 KB, free 317.3 KB)
17/10/03 07:01:06 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 21.5 KB, free 338.8 KB)
17/10/03 07:01:06 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:40075 (size: 21.5 KB, free: 208.8 MB)
17/10/03 07:01:06 INFO spark.SparkContext: Created broadcast 4 from hadoopFile at AvroRelation.scala:121
17/10/03 07:01:06 INFO mapred.FileInputFormat: Total input paths to process : 1
17/10/03 07:01:07 INFO spark.SparkContext: Starting job: count at NativeMethodAccessorImpl.java:-2
17/10/03 07:01:07 INFO scheduler.DAGScheduler: Registering RDD 16 (count at NativeMethodAccessorImpl.java:-2)
17/10/03 07:01:07 INFO scheduler.DAGScheduler: Got job 1 (count at NativeMethodAccessorImpl.java:-2) with 1 output partitions
17/10/03 07:01:07 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (count at NativeMethodAccessorImpl.java:-2)
17/10/03 07:01:07 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
17/10/03 07:01:07 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 2)
17/10/03 07:01:07 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[16] at count at NativeMethodAccessorImpl.java:-2), which has no missing parents
17/10/03 07:01:07 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 11.5 KB, free 350.3 KB)
17/10/03 07:01:07 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 5.7 KB, free 356.0 KB)
17/10/03 07:01:07 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:40075 (size: 5.7 KB, free: 208.8 MB)
17/10/03 07:01:07 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
17/10/03 07:01:07 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[16] at count at NativeMethodAccessorImpl.java:-2)
17/10/03 07:01:07 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/10/03 07:01:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2249 bytes)
17/10/03 07:01:07 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
17/10/03 07:01:07 INFO rdd.HadoopRDD: Input split: hdfs://localhost:8020/user/training/episodes.avro:0+597
17/10/03 07:01:08 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 2). 2484 bytes result sent to driver
17/10/03 07:01:08 INFO scheduler.DAGScheduler: ShuffleMapStage 2 (count at NativeMethodAccessorImpl.java:-2) finished in 0.691 s
17/10/03 07:01:08 INFO scheduler.DAGScheduler: looking for newly runnable stages
17/10/03 07:01:08 INFO scheduler.DAGScheduler: running: Set()
17/10/03 07:01:08 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 3)
17/10/03 07:01:08 INFO scheduler.DAGScheduler: failed: Set()
17/10/03 07:01:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 693 ms on localhost (1/1)
17/10/03 07:01:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
17/10/03 07:01:08 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[19] at count at NativeMethodAccessorImpl.java:-2), which has no missing parents
17/10/03 07:01:08 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 12.6 KB, free 368.5 KB)
17/10/03 07:01:08 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 6.1 KB, free 374.7 KB)
17/10/03 07:01:08 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:40075 (size: 6.1 KB, free: 208.8 MB)
17/10/03 07:01:08 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
17/10/03 07:01:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[19] at count at NativeMethodAccessorImpl.java:-2)
17/10/03 07:01:08 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
17/10/03 07:01:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, partition 0,NODE_LOCAL, 1999 bytes)
17/10/03 07:01:08 INFO executor.Executor: Running task 0.0 in stage 3.0 (TID 3)
17/10/03 07:01:08 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
17/10/03 07:01:08 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/10/03 07:01:08 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 3). 1666 bytes result sent to driver
17/10/03 07:01:08 INFO scheduler.DAGScheduler: ResultStage 3 (count at NativeMethodAccessorImpl.java:-2) finished in 0.344 s
17/10/03 07:01:08 INFO scheduler.DAGScheduler: Job 1 finished: count at NativeMethodAccessorImpl.java:-2, took 1.480495 s
17/10/03 07:01:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 345 ms on localhost (1/1)
17/10/03 07:01:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
Out[9]: 8

In [10]: mydata001.take(1)
17/10/03 07:01:18 INFO storage.MemoryStore: Block broadcast_7 stored as values in memory (estimated size 230.1 KB, free 604.8 KB)
17/10/03 07:01:18 INFO storage.MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 21.4 KB, free 626.2 KB)
17/10/03 07:01:18 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:40075 (size: 21.4 KB, free: 208.7 MB)
17/10/03 07:01:18 INFO spark.SparkContext: Created broadcast 7 from take at <ipython-input-10-35862abbc114>:1
17/10/03 07:01:18 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 230.5 KB, free 856.7 KB)
17/10/03 07:01:18 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 21.5 KB, free 878.2 KB)
17/10/03 07:01:18 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:40075 (size: 21.5 KB, free: 208.7 MB)
17/10/03 07:01:18 INFO spark.SparkContext: Created broadcast 8 from take at <ipython-input-10-35862abbc114>:1
17/10/03 07:01:18 INFO mapred.FileInputFormat: Total input paths to process : 1
17/10/03 07:01:18 INFO spark.SparkContext: Starting job: take at <ipython-input-10-35862abbc114>:1
17/10/03 07:01:18 INFO scheduler.DAGScheduler: Got job 2 (take at <ipython-input-10-35862abbc114>:1) with 1 output partitions
17/10/03 07:01:18 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (take at <ipython-input-10-35862abbc114>:1)
17/10/03 07:01:18 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/10/03 07:01:18 INFO scheduler.DAGScheduler: Missing parents: List()
17/10/03 07:01:18 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[27] at take at <ipython-input-10-35862abbc114>:1), which has no missing parents
17/10/03 07:01:19 INFO storage.MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.6 KB, free 883.8 KB)
17/10/03 07:01:19 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.0 KB, free 886.9 KB)
17/10/03 07:01:19 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:40075 (size: 3.0 KB, free: 208.7 MB)
17/10/03 07:01:19 INFO spark.SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1006
17/10/03 07:01:19 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[27] at take at <ipython-input-10-35862abbc114>:1)
17/10/03 07:01:19 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
17/10/03 07:01:19 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2260 bytes)
17/10/03 07:01:19 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 4)
17/10/03 07:01:19 INFO rdd.HadoopRDD: Input split: hdfs://localhost:8020/user/training/episodes.avro:0+597
17/10/03 07:01:19 INFO codegen.GenerateUnsafeProjection: Code generated in 124.624053 ms
17/10/03 07:01:19 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 4). 2237 bytes result sent to driver
17/10/03 07:01:19 INFO scheduler.DAGScheduler: ResultStage 4 (take at <ipython-input-10-35862abbc114>:1) finished in 0.415 s
17/10/03 07:01:19 INFO scheduler.DAGScheduler: Job 2 finished: take at <ipython-input-10-35862abbc114>:1, took 0.565858 s
17/10/03 07:01:19 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 415 ms on localhost (1/1)
17/10/03 07:01:19 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
Out[10]: [Row(title=u'The Eleventh Hour', air_date=u'3 April 2010', doctor=11)]
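
Most of the session above is INFO-level logging from Spark, which buries the two actual results (`8` rows, and the first `Row`). A possible way to repeat the same two actions with less noise is sketched below. `summarize_quietly` is a hypothetical helper, not something from the original post; it assumes `sc` is the SparkContext that the PySpark shell provides and relies on `SparkContext.setLogLevel` and `Row.asDict`.

```python
def summarize_quietly(sc, df, n=1, level="WARN"):
    """Lower the log level, then return (row count, first n rows as dicts).

    sc: the SparkContext the PySpark shell provides as `sc`.
    df: a DataFrame such as mydata001.
    """
    sc.setLogLevel(level)  # affects log output from this point on
    return df.count(), [row.asDict() for row in df.take(n)]


# Usage inside the PySpark shell:
# total, first_rows = summarize_quietly(sc, mydata001)
```

Returning plain dicts instead of `Row` objects is only a convenience for printing or further processing in ordinary Python code.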

This article is reposted from the cnblogs blog 健哥的数据花园. Original link: http://www.cnblogs.com/gaojian/p/7624631.html (to repost it, please contact the original author).