Spark代码调优(一)

简介: 环境极其恶劣情况下:import org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.

环境极其恶劣情况下:

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import org.apache.spark.sql.hive.HiveContext

 

val sqlContext = new HiveContext(sc)

val sql = sqlContext.sql("select * from ysylbs9 ").collect

 

中间发生报错:

cluster.YarnScheduler: Lost executor 2 on zdbdsps025.iccc.com: Container marked as failed: container_e55_1478671093534_0624_01_000003 on host: zdbdsps025.iccc.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

Killed by external signal

 

是因为yarn管理的某个节点掉了,所以spark将任务移至其他节点执行:

 

16/11/15 14:24:28 WARN scheduler.TaskSetManager: Lost task 224.0 in stage 0.0 (TID 224, zdbdsps025.iccc.com): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_e55_1478671093534_0624_01_000003 on host: zdbdsps025.iccc.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

Killed by external signal

 

16/11/15 14:24:28 INFO cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 2

中间又报错:

16/11/15 14:30:43 WARN spark.HeartbeatReceiver: Removing executor 6 with no recent heartbeats: 133569 ms exceeds timeout 120000 ms

16/11/15 14:30:43 ERROR cluster.YarnScheduler: Lost executor 6 on zdbdsps027.iccc.com: Executor heartbeat timed out after 133569 ms

每个task 都超时了

16/11/15 14:30:43 WARN scheduler.TaskSetManager: Lost task 329.0 in stage 0.0 (TID 382, zdbdsps027.iccc.com): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 133569 ms

 

 DAGScheduler发现Executor 6 也挂了,于是将executor移除

16/11/15 14:30:43 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 1)

16/11/15 14:30:43 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster.

16/11/15 14:30:43 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, zdbdsps027.iccc.com, 38641)

16/11/15 14:30:43 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor

16/11/15 14:30:43 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6

 

 

 

然后移至其他节点,随后又发现RPC出现问题

16/11/15 14:32:58 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=4735002570883429008, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=47]}} to zdbdsps027.iccc.com/172.19.189.53:51057; closing connection

 

java.io.IOException: 断开的管道

        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)

        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)

        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)

 

Spark是移动计算而不是移动数据的,所以由于其他节点挂了,所以任务在数据不在的节点,再进行拉取,由于极端情况下,环境恶劣,通过namenode知道数据所在节点位置,spark依旧会去有问题的节点fetch数据,所以还会报错 再次kill掉,由于hadoop是备份三份数据的,spark通过会去其他节点拉取数据。随之一直发现只在一个节点完成task. 最终问题查找,yarn的节点挂了,

 

 

 

 

 

下面是部分代码调试:

 

import org.slf4j.{Logger, LoggerFactory}

import java.util.{Calendar, Date, GregorianCalendar}

 

import algorithm.DistanceCalculator

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}

import org.apache.hadoop.hbase.client.{HTable, Scan}

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.hadoop.hbase.protobuf.ProtobufUtil

import org.apache.hadoop.hbase.util.{Base64, Bytes}

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.hive.HiveContext

import org.apache.spark.{SparkConf, SparkContext}

import org.slf4j.{Logger, LoggerFactory}

 

import scala.collection.mutable.ArrayBuffer

 

 

  case class LBS_STATIC_TABLE(LS_certifier_no: String,LS_location: String,LS_phone_no: String,time: String)

该case class 作为最终注册转换为hive表

  val logger: Logger = LoggerFactory.getLogger(LbsCalculator.getClass)

 

//从hbase获取数据转换为RDD

def hbaseInit() = {

    val tableName = "EVENT_LOG_LBS_HIS"

    val conf = HBaseConfiguration.create()

   // conf.addResource("hbase-site.xml ")

    val HTable = new HTable(conf, tableName)

    HTable

  }

 

 

def tableInitByTime(sc : SparkContext,tablename:String,columns :String,fromdate: Date,todate:Date):RDD[(ImmutableBytesWritable,Result)] = {

val configuration = HBaseConfiguration.create()

//这里上生产注释掉,调试时可打开,因为提交yarn会自动加载yarn管理的hbase配置文件

    configuration.addResource("hbase-site.xml")

    configuration.set(TableInputFormat.INPUT_TABLE, tablename)

val scan = new Scan

//这里按timestrap进行过滤,比用scan过滤器要高效,因为用hbase的过滤器其实也是先scan全表再进行过滤的,效率很低。

    scan.setTimeRange(fromdate.getTime,todate.getTime)

    val column = columns.split(",")

    for(columnName <- column){

      scan.addColumn("f1".getBytes, columnName.getBytes)

    }

    val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

    System.out.println(hbaseRDD.count())

    hbaseRDD

  }

 

//这里写了一种过滤器方法,后续将所有hbase过滤器方法写成公共类

  val filter: Filter = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new SubstringComparator("20160830"))

  scan.setFilter(filter)

 

 

   

 

//这里要注意,拿到的数据在1个partition中,在拿到后需要进行repartition,因为如果一个task能够承载比如1G的数据,那么将只有1个patition,所以要重新repatition加大后续计算的并行度。这里repatition的个数需要根据具体多少数据量,进行调整,后续测试完毕写成公共方法。通过Rdd map 转换为(身份证号,经纬度坐标,手机号码,时间)这里就将获取的数据repatition了

    val transRDD = hbRDD.repartition(200).map{ p => {

      val id =Bytes.toString(p._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))

      val loc = Bytes.toString(p._2.getValue("f1".getBytes, "LS_location".getBytes))

      val phone = Bytes.toString(p._2.getValue("f1".getBytes, "LS_phone_no".getBytes))

      val rowkey = Bytes.toString(p._2.getValue("f1".getBytes, "rowkey".getBytes))

      val hour = rowkey.split("-")(2).substring(8,10)

      (id,loc,phone,hour)

    }

}

 

//这里进行了字段过滤,因为很多时候数据具有不完整性,会导致后续计算错误

val calculateRDD = transRDD.repartition(200).filter(_._1 != null).filter(_._2 != null).filter(_._3 != null).filter(_._4 !=null)

需要注意的是reduceByKey并不会在监控页面单独为其创建监控stage,所以你会发现与之前的map(filer)的stage中,同时监控中会发现已经进行了repartition

.reduceByKey(_ + _)

//进行hiveContext对象的创建,为后续进行表操作做准备。

val hiveSqlContext = HiveTableHelper.hiveTableInit(sc)       

def hiveTableInit(sc:SparkContext): HiveContext ={

    val sqlContext = new HiveContext(sc)

    sqlContext

  }

//传入之前数据分析过的结果,生成表

val hiveRDD = hRDD.map(p => LBS_STATIC_TABLE(p._1,p._2,p._3,p._4,p._5)

//创建DataFrame并以parquet格式保存为表。这里需要注意的是,尽量少的直接用hiveSqlContext.sql()直接输入sql的形式,因为这样还会走spark自己的解析器。需要调用RDD的DataFrame API会加快数据处理速度。后续整理所有算子。

val hiveRDDSchema = hiveSqlContext.createDataFrame(hiveRDD)

      val aaa = hiveRDDSchema.show(10)

       hiveSqlContext.sql("drop table if exists " + hivetablename)

       hiveRDDSchema.registerTempTable("LBS_STATIC_TABLE")

       hiveRDDSchema.write.format("parquet").saveAsTable(hivetablename)

 

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
133 11
|
2月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
50 4
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
61 0
|
8月前
|
分布式计算 Java 数据库连接
回答粉丝疑问:Spark为什么调优需要降低过多小任务,降低单条记录的资源开销?
回答粉丝疑问:Spark为什么调优需要降低过多小任务,降低单条记录的资源开销?
68 1
|
8月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
265 0
|
8月前
|
分布式计算 资源调度 大数据
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
114 0
|
SQL 分布式计算 Java
Spark入门以及wordcount案例代码
Spark入门以及wordcount案例代码
|
SQL 数据采集 存储
工作经验分享:Spark调优【优化后性能提升1200%】
工作经验分享:Spark调优【优化后性能提升1200%】
1056 1
工作经验分享:Spark调优【优化后性能提升1200%】
|
SQL 存储 缓存
工作常用之Spark调优【二】资源调优
使用 kryo 序列化并且使用 rdd 序列化缓存级别。使用 kryo 序列化需要修改 spark 的序列化模式,并且需要进程注册类操作。
203 1
工作常用之Spark调优【二】资源调优