
Spark Streaming job hangs intermittently

Problem

After the Spark Streaming job has been running in the virtual machines for a while, it occasionally hangs and then recovers after some time. I need to locate the root cause and fix it.

Background

1. Spark Streaming consumes data from Kafka with backpressure enabled. Each Kafka message (a JSON string) is converted to an object and then written to HBase via Phoenix.
2. Three virtual machines host three worker nodes; the application runs in standalone mode.
3. Three executors with 1 core each; Spark default parallelism is 6. Kafka ingests the MySQL binlog via Debezium, with one topic per table and one partition per topic by default.
4. Versions: Spark 2.2.0, HBase 1.4.7, Phoenix 4.14.

Observed symptoms:

1. The job hangs after running for a while, then recovers (a normal job finishes within 2 s) (job 121):

[screenshot: Spark UI job list]

2. The map operation in the stage runs far too long (Stage 242):

[screenshot: Spark UI stage view]

3. Task 2 (task ID 1453) runs far too long (40 s):

[screenshot: Spark UI task detail]

Code excerpt:

    val spark = SparkSession
      .builder()
      .appName("ProcessMysqlData")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .config("spark.dynamicAllocation.enabled", "false")
      .config("spark.streaming.kafka.maxRatePerPartition", 150)
      .config("spark.streaming.backpressure.enabled", "true")
      .config("spark.streaming.blockInterval", "3s")
      .config("spark.defalut.parallelism", "6")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // config kafka
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> CommonUtil.getKafkaServers,
      "key.deserializer" -> classOf[JsonDeserializer],
      "value.deserializer" -> classOf[JsonDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
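
    // The excerpt omits how `stream` is built. A minimal sketch of the usual
    // spark-streaming-kafka-0-10 direct-stream setup follows; the topic list
    // `topics` and the ObjectNode type parameters are assumptions inferred
    // from the JsonDeserializer config above, not from the original post
    // (the 6 s batch interval matches the driver log below):
    val ssc = new StreamingContext(spark.sparkContext, Seconds(6))
    val stream = KafkaUtils.createDirectStream[ObjectNode, ObjectNode](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[ObjectNode, ObjectNode](topics, kafkaParams)
    )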
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val tableValue = rdd.filter(r => r.value != null)
          // the map operation that the Spark UI shows as stuck
          .map(record => convertTableJsonStr(record.value))
          .reduceByKey(_ ++ _)
        // write each partition to HBase through one Phoenix connection
        tableValue.foreachPartition { part =>
          val con: Connection = CommonUtil.getPhoenixConnection
          part.foreach { info =>
            info._1 match { case "" => save(info, con) ... }
          }
          MysqlUtil.colseConnection(con)
        }
      }
      rdd.count
    }
    // Convert one Debezium JSON message into (tableName, payload) so that
    // records for the same table can be grouped by reduceByKey. The Debezium
    // schema name has the form "<server>.<database>.<table>.Envelope", so the
    // second-to-last segment is the table name.
    def convertTableJsonStr(json: ObjectNode): (String, ArrayBuffer[String]) = {
      val jsonObject = JSON.parseObject(json.toString)
      val data = jsonObject.getString("payload")
      val schema = JSON.parseObject(jsonObject.getString("schema"))
      val topicName = schema.getString("name").split("\\.")
      val tableName = topicName(topicName.size - 2)
      (tableName, ArrayBuffer(data))
    }
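
One thing worth flagging in the excerpt: offsetRange is captured but never committed, and enable.auto.commit is false. A minimal sketch of committing offsets once the writes succeed, assuming `stream` is the direct stream shown above (which implements CanCommitOffsets):

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // ... filter / map / reduceByKey / foreachPartition as above ...
        // commit the consumed offsets back to Kafka only after the writes ran
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange)
      }
    }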

Logs:

GC logs

GC logs were printed separately for the master, the workers, and the executors. From the extracted analysis, the longest GC pause was only about 0.5 s.

Below is the GC log of the stuck executor 1 (192.168.0.107) at the corresponding time. GC is frequent, but each pause is short:

2018-12-10T01:13:00.700+0800: 698.800: [GC (Allocation Failure) [PSYoungGen: 189829K->12080K(199936K)] 356678K->178932K(412928K), 0.0252717 secs] [Times: user=0.03 sys=0.00, real=0.03 secs] 
Heap after GC invocations=133 (full 7):
 PSYoungGen      total 199936K, used 12080K [0x92cc0000, 0xa0200000, 0xa8200000)
  eden space 182016K, 0% used [0x92cc0000,0x92cc0000,0x9de80000)
  from space 17920K, 67% used [0x9f080000,0x9fc4c010,0xa0200000)
  to   space 18176K, 0% used [0x9de80000,0x9de80000,0x9f040000)
 ParOldGen       total 212992K, used 166852K [0x68200000, 0x75200000, 0x92cc0000)
  object space 212992K, 78% used [0x68200000,0x724f11b8,0x75200000)
 Metaspace       used 43119K, capacity 43442K, committed 43544K, reserved 44336K
}
{Heap before GC invocations=134 (full 7):
 PSYoungGen      total 199936K, used 194096K [0x92cc0000, 0xa0200000, 0xa8200000)
  eden space 182016K, 100% used [0x92cc0000,0x9de80000,0x9de80000)
  from space 17920K, 67% used [0x9f080000,0x9fc4c010,0xa0200000)
  to   space 18176K, 0% used [0x9de80000,0x9de80000,0x9f040000)
 ParOldGen       total 212992K, used 166852K [0x68200000, 0x75200000, 0x92cc0000)
  object space 212992K, 78% used [0x68200000,0x724f11b8,0x75200000)
 Metaspace       used 43120K, capacity 43442K, committed 43544K, reserved 44336K
2018-12-10T01:13:13.070+0800: 711.170: [GC (Allocation Failure) [PSYoungGen: 194096K->12272K(200192K)] 360948K->179128K(413184K), 0.0148226 secs] [Times: user=0.03 sys=0.00, real=0.02 secs] 
Heap after GC invocations=134 (full 7):
 PSYoungGen      total 200192K, used 12272K [0x92cc0000, 0xa0700000, 0xa8200000)
  eden space 182016K, 0% used [0x92cc0000,0x92cc0000,0x9de80000)
  from space 18176K, 67% used [0x9de80000,0x9ea7c010,0x9f040000)
  to   space 18176K, 0% used [0x9f540000,0x9f540000,0xa0700000)
 ParOldGen       total 212992K, used 166856K [0x68200000, 0x75200000, 0x92cc0000)
  object space 212992K, 78% used [0x68200000,0x724f21b8,0x75200000)
 Metaspace       used 43120K, capacity 43442K, committed 43544K, reserved 44336K
}
{Heap before GC invocations=135 (full 7):
 PSYoungGen      total 200192K, used 194288K [0x92cc0000, 0xa0700000, 0xa8200000)
  eden space 182016K, 100% used [0x92cc0000,0x9de80000,0x9de80000)
  from space 18176K, 67% used [0x9de80000,0x9ea7c010,0x9f040000)
  to   space 18176K, 0% used [0x9f540000,0x9f540000,0xa0700000)
 ParOldGen       total 212992K, used 166856K [0x68200000, 0x75200000, 0x92cc0000)
  object space 212992K, 78% used [0x68200000,0x724f21b8,0x75200000)
 Metaspace       used 43129K, capacity 43442K, committed 43544K, reserved 44336K

Executor log (TID 1453)

18/12/10 01:13:24 INFO Executor: Running task 5.0 in stage 241.0 (TID 1450)
18/12/10 01:13:24 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 6 blocks
18/12/10 01:13:24 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/12/10 01:13:24 INFO ProcessMysqlData: insert  size:570
18/12/10 01:13:25 INFO Executor: Finished task 5.0 in stage 241.0 (TID 1450). 1053 bytes result sent to driver
18/12/10 01:13:26 INFO BlockManager: Removing RDD 476
18/12/10 01:13:30 INFO CoarseGrainedExecutorBackend: Got assigned task 1453

 //--------------------------------start---------------------------------------------
18/12/10 01:13:30 INFO Executor: Running task 2.0 in stage 242.0 (TID 1453) 
18/12/10 01:13:30 INFO TorrentBroadcast: Started reading broadcast variable 242
18/12/10 01:13:30 INFO MemoryStore: Block broadcast_242_piece0 stored as bytes in memory (estimated size 2.9 KB, free 366.1 MB)
18/12/10 01:13:30 INFO TorrentBroadcast: Reading broadcast variable 242 took 37 ms
18/12/10 01:13:30 INFO MemoryStore: Block broadcast_242 stored as values in memory (estimated size 5.0 KB, free 366.1 MB)
18/12/10 01:13:30 INFO KafkaRDD: Computing topic mysql-clusterd.bigdata.movie_base_info, partition 0 offsets 28752 -> 28848
18/12/10 01:14:10 INFO Executor: Finished task 2.0 in stage 242.0 (TID 1453). 1746 bytes result sent to driver 
//--------------------------------end---------------------------------------------

18/12/10 01:14:10 INFO CoarseGrainedExecutorBackend: Got assigned task 1460
18/12/10 01:14:10 INFO Executor: Running task 1.0 in stage 243.0 (TID 1460)
18/12/10 01:14:10 INFO MapOutputTrackerWorker: Updating epoch to 122 and clearing cache
18/12/10 01:14:10 INFO TorrentBroadcast: Started reading broadcast variable 243
18/12/10 01:14:10 INFO MemoryStore: Block broadcast_243_piece0 stored as bytes in memory (estimated size 1957.0 B, free 366.1 MB)
18/12/10 01:14:10 INFO TorrentBroadcast: Reading broadcast variable 243 took 34 ms

Driver log

18/12/10 01:13:30 INFO dag-scheduler-event-loop DAGScheduler 54: Submitting 6 missing tasks from ShuffleMapStage 242 (MapPartitionsRDD[486] at map at ProcessMysqlData.scala:109) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5))
18/12/10 01:13:30 INFO dag-scheduler-event-loop TaskSchedulerImpl 54: Adding task set 242.0 with 6 tasks
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 0.0 in stage 242.0 (TID 1452, 192.168.0.108, executor 2, partition 0, PROCESS_LOCAL, 4721 bytes)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 2.0 in stage 242.0 (TID 1453, 192.168.0.107, executor 1, partition 2, PROCESS_LOCAL, 4729 bytes)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 BlockManagerInfo 54: Added broadcast_242_piece0 in memory on 192.168.0.108:34106 (size: 2.9 KB, free: 366.1 MB)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 1.0 in stage 242.0 (TID 1454, 192.168.0.108, executor 2, partition 1, PROCESS_LOCAL, 4726 bytes)
18/12/10 01:13:30 INFO task-result-getter-3 TaskSetManager 54: Finished task 0.0 in stage 242.0 (TID 1452) in 24 ms on 192.168.0.108 (executor 2) (1/6)
18/12/10 01:13:30 INFO dispatcher-event-loop-0 BlockManagerInfo 54: Added broadcast_242_piece0 in memory on 192.168.0.107:56373 (size: 2.9 KB, free: 366.1 MB)
18/12/10 01:13:30 INFO task-result-getter-0 TaskSetManager 54: Finished task 1.0 in stage 242.0 (TID 1454) in 30 ms on 192.168.0.108 (executor 2) (2/6)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 3.0 in stage 242.0 (TID 1455, 192.168.0.108, executor 2, partition 3, PROCESS_LOCAL, 4726 bytes)
18/12/10 01:13:30 INFO dispatcher-event-loop-0 TaskSetManager 54: Starting task 5.0 in stage 242.0 (TID 1456, 192.168.0.108, executor 2, partition 5, PROCESS_LOCAL, 4725 bytes)
18/12/10 01:13:30 INFO task-result-getter-2 TaskSetManager 54: Finished task 3.0 in stage 242.0 (TID 1455) in 75 ms on 192.168.0.108 (executor 2) (3/6)
18/12/10 01:13:30 INFO task-result-getter-1 TaskSetManager 54: Finished task 5.0 in stage 242.0 (TID 1456) in 55 ms on 192.168.0.108 (executor 2) (4/6)
18/12/10 01:13:36 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375616000 ms
18/12/10 01:13:36 INFO dispatcher-event-loop-0 TaskSetManager 54: Starting task 4.0 in stage 242.0 (TID 1457, 192.168.0.101, executor 0, partition 4, ANY, 4734 bytes)
18/12/10 01:13:36 INFO dispatcher-event-loop-0 BlockManagerInfo 54: Added broadcast_242_piece0 in memory on 192.168.0.101:43782 (size: 2.9 KB, free: 366.1 MB)
18/12/10 01:13:36 INFO task-result-getter-3 TaskSetManager 54: Finished task 4.0 in stage 242.0 (TID 1457) in 194 ms on 192.168.0.101 (executor 0) (5/6)
18/12/10 01:13:42 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375622000 ms
18/12/10 01:13:48 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375628000 ms
18/12/10 01:13:54 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375634000 ms
18/12/10 01:14:00 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375640000 ms
18/12/10 01:14:06 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375646000 ms

//-------------------------stage 242.0 (TID 1453) 40142 ms----------------------------------
18/12/10 01:14:10 INFO task-result-getter-0 TaskSetManager 54: Finished task 2.0 in stage 242.0 (TID 1453) in 40142 ms on 192.168.0.107 (executor 1) (6/6)
//-----------------------------stage 242.0 (TID 1453)----------------------------

18/12/10 01:14:10 INFO task-result-getter-0 TaskSchedulerImpl 54: Removed TaskSet 242.0, whose tasks have all completed, from pool 
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: ShuffleMapStage 242 (map at ProcessMysqlData.scala:109) finished in 40.143 s
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: looking for newly runnable stages
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: running: Set()
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: waiting: Set(ResultStage 243)
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: failed: Set()
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: Submitting ResultStage 243 

System

The virtual machines are running short of memory:

[screenshot: system memory usage, captured 2018-12-11 01:40:21]

Restating the problem

I need to locate and analyze why some job tasks sit pending, and fix it, or find other ways to optimize.

Thanks!

Update:
It turned out to be a Kafka client version issue; changing the Kafka version fixes it. See SPARK-20780.
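
For reference, one way to pick up a fixed client on Spark 2.2.0 is to override the kafka-clients version that spark-streaming-kafka-0-10 pulls in transitively. A sketch assuming an sbt build; the pinned version is an assumption and should be matched to your brokers:

    // build.sbt (sketch, not from the original post)
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
      "org.apache.kafka"  % "kafka-clients"               % "0.10.2.1" // assumed newer client
    )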

jfengye 2018-12-11 01:52:01
1 answer
  • It is a Kafka version issue; changing the Kafka version fixes it. See SPARK-20780.

    2019-07-17 23:19:43