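Spark Streaming job consuming Kafka data into HBase stalls

# Problem
A Spark Streaming job running on virtual machines occasionally stalls after running for a while, then recovers on its own some time later. I need to locate the cause and fix it.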
 
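# Background
1. Spark Streaming consumes Kafka data with backpressure enabled. Each Kafka message (a JSON string) is converted into an object and then written to HBase through Phoenix.
2. There are three VMs running three worker nodes; the application runs in standalone mode.
3. 3 executors with 1 core each; Spark default parallelism is set to 6. Kafka is populated by Debezium reading the MySQL binlog; each table maps to one topic, and each topic has the default single partition.
4. Versions: Spark 2.2.0, HBase 1.4.7, Phoenix 4.14.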

# Symptoms
1. After the application has run for a while, a job stalls and then recovers (job 121); a normal job finishes within 2 s.
job.png


2. The map stage runs far too long (Stage 242).

stage.png


3. Task 2 (task ID 1453) runs far too long (40 s).
task.png


# Code excerpt

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("ProcessMysqlData")
  .config("spark.streaming.stopGracefullyOnShutdown", "true")
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.streaming.kafka.maxRatePerPartition", 150)
  .config("spark.streaming.backpressure.enabled", "true")
  // only affects receiver-based streams; the direct Kafka stream ignores it
  .config("spark.streaming.blockInterval", "3s")
  // was misspelled as "spark.defalut.parallelism", which Spark never reads
  .config("spark.default.parallelism", "6")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
```
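With the direct Kafka stream, `spark.streaming.kafka.maxRatePerPartition` caps how many records each Kafka partition can contribute to one batch, and backpressure can only lower the rate below that cap. The sketch below works out the ceiling under two assumptions read off the logs rather than the code: a 6 s batch interval (the driver adds a job every 6 s) and 6 Kafka partitions (stage 242 has 6 tasks). `RateCap` and its methods are hypothetical helpers.

```scala
object RateCap {
  // Upper bound on records one Kafka partition can contribute to a single batch
  // when spark.streaming.kafka.maxRatePerPartition (records/second) is set.
  def maxRecordsPerPartition(maxRatePerPartition: Long, batchIntervalMs: Long): Long =
    maxRatePerPartition * batchIntervalMs / 1000

  // Upper bound for the whole batch across all Kafka partitions.
  def maxRecordsPerBatch(maxRatePerPartition: Long, batchIntervalMs: Long, partitions: Int): Long =
    maxRecordsPerPartition(maxRatePerPartition, batchIntervalMs) * partitions

  def main(args: Array[String]): Unit = {
    // 150 records/s, 6 s batches (inferred from the driver log), 6 partitions (assumed)
    println(maxRecordsPerPartition(150, 6000)) // 900
    println(maxRecordsPerBatch(150, 6000, 6))  // 5400
  }
}
```

The stalled task read offsets 28752 -> 28848, i.e. 96 records, well under the 900-record per-partition cap, which suggests the 40 s is not explained by batch size alone.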

```scala
// config kafka
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> CommonUtil.getKafkaServers,
  "key.deserializer" -> classOf[JsonDeserializer],
  "value.deserializer" -> classOf[JsonDeserializer],
  "group.id" -> groupId,
  "auto.offset.reset" -> "earliest",
  // offsets are not committed automatically; the application must commit them
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```
 
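```scala
import java.sql.Connection
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.kafka010.HasOffsetRanges
// assumed from the types used below: Jackson's ObjectNode and fastjson's JSON
import com.fasterxml.jackson.databind.node.ObjectNode
import com.alibaba.fastjson.JSON

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val tableValue = rdd.filter(r => r.value != null)
      // the map operation that the stage page shows as stalled
      .map(record => convertTableJsonStr(record.value))
      .reduceByKey(_ ++ _)
    tableValue.foreachPartition { part =>
      val con: Connection = CommonUtil.getPhoenixConnection
      try {
        part.foreach { info =>
          // remaining cases elided in the original excerpt
          info._1 match { case "" => save(info, con) ... }
        }
      } finally {
        // close the connection even if a save throws
        MysqlUtil.colseConnection(con)
      }
    }
    // offsetRange is never committed in this excerpt; with enable.auto.commit = false
    // offsets are normally committed after the writes, e.g.
    // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange)
  }
  rdd.count
}

def convertTableJsonStr(json: ObjectNode): (String, ArrayBuffer[String]) = {
  val jsonObject = JSON.parseObject(json.toString)
  val data = jsonObject.getString("payload")
  val schema = JSON.parseObject(jsonObject.getString("schema"))
  val topicName = schema.getString("name").split("\\.")
  val tableName = topicName(topicName.size - 2)
  (tableName, ArrayBuffer(data))
}
```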
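`convertTableJsonStr` takes the second-to-last dot-separated segment of the Debezium schema name as the table name, and `reduceByKey(_ ++ _)` then merges every row of a table into one `ArrayBuffer`. The local sketch below mimics both steps on plain Scala collections. It assumes Debezium's default envelope schema name `<server>.<database>.<table>.Envelope`, consistent with the topic `mysql-clusterd.bigdata.movie_base_info` in the executor log; `TableGrouping` and its methods are hypothetical. Since each topic here is one table, it also shows reading the table name from the topic name instead of re-parsing the JSON.

```scala
import scala.collection.mutable.ArrayBuffer

object TableGrouping {
  // Same rule as convertTableJsonStr: second-to-last segment of the schema name.
  def tableFromSchemaName(schemaName: String): String = {
    val parts = schemaName.split("\\.")
    parts(parts.length - 2)
  }

  // Alternative when each topic is one table: last segment of the topic name.
  def tableFromTopic(topic: String): String = topic.split("\\.").last

  // Local analogue of .map(...).reduceByKey(_ ++ _): every row of one table ends up
  // in a single ArrayBuffer, i.e. one record that a single task has to write.
  def groupByTable(rows: Seq[(String, ArrayBuffer[String])]): Map[String, ArrayBuffer[String]] =
    rows.groupBy(_._1).map { case (table, pairs) => table -> pairs.map(_._2).reduce(_ ++ _) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      (tableFromSchemaName("mysql-clusterd.bigdata.movie_base_info.Envelope"), ArrayBuffer("""{"id":1}""")),
      (tableFromTopic("mysql-clusterd.bigdata.movie_base_info"), ArrayBuffer("""{"id":2}"""))
    )
    println(groupByTable(rows))
  }
}
```

In Spark the record value would come from `ConsumerRecord.topic`, which avoids the Jackson-to-string-to-fastjson round trip for the table name.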


# Logs

## GC

GC logs were printed for the **master, worker, and executors**.
Analyzing them, the longest GC pause was only 0.5 s.

Below is the GC log of the stalled executor 1 (192.168.0.107) around the time of the stall: **GCs are frequent, but each one is short**.

```text
2018-12-10T01:13:00.700+0800: 698.800: [GC (Allocation Failure) [PSYoungGen: 189829K->12080K(199936K)] 356678K->178932K(412928K), 0.0252717 secs] [Times: user=0.03 sys=0.00, real=0.03 secs]
Heap after GC invocations=133 (full 7):
PSYoungGen total 199936K, used 12080K [0x92cc0000, 0xa0200000, 0xa8200000)
eden space 182016K, 0% used [0x92cc0000,0x92cc0000,0x9de80000)
from space 17920K, 67% used [0x9f080000,0x9fc4c010,0xa0200000)
to space 18176K, 0% used [0x9de80000,0x9de80000,0x9f040000)
ParOldGen total 212992K, used 166852K [0x68200000, 0x75200000, 0x92cc0000)
object space 212992K, 78% used [0x68200000,0x724f11b8,0x75200000)
Metaspace used 43119K, capacity 43442K, committed 43544K, reserved 44336K
}
{Heap before GC invocations=134 (full 7):
PSYoungGen total 199936K, used 194096K [0x92cc0000, 0xa0200000, 0xa8200000)
eden space 182016K, 100% used [0x92cc0000,0x9de80000,0x9de80000)
from space 17920K, 67% used [0x9f080000,0x9fc4c010,0xa0200000)
to space 18176K, 0% used [0x9de80000,0x9de80000,0x9f040000)
ParOldGen total 212992K, used 166852K [0x68200000, 0x75200000, 0x92cc0000)
object space 212992K, 78% used [0x68200000,0x724f11b8,0x75200000)
Metaspace used 43120K, capacity 43442K, committed 43544K, reserved 44336K
2018-12-10T01:13:13.070+0800: 711.170: [GC (Allocation Failure) [PSYoungGen: 194096K->12272K(200192K)] 360948K->179128K(413184K), 0.0148226 secs] [Times: user=0.03 sys=0.00, real=0.02 secs]
Heap after GC invocations=134 (full 7):
PSYoungGen total 200192K, used 12272K [0x92cc0000, 0xa0700000, 0xa8200000)
eden space 182016K, 0% used [0x92cc0000,0x92cc0000,0x9de80000)
from space 18176K, 67% used [0x9de80000,0x9ea7c010,0x9f040000)
to space 18176K, 0% used [0x9f540000,0x9f540000,0xa0700000)
ParOldGen total 212992K, used 166856K [0x68200000, 0x75200000, 0x92cc0000)
object space 212992K, 78% used [0x68200000,0x724f21b8,0x75200000)
Metaspace used 43120K, capacity 43442K, committed 43544K, reserved 44336K
}
{Heap before GC invocations=135 (full 7):
PSYoungGen total 200192K, used 194288K [0x92cc0000, 0xa0700000, 0xa8200000)
eden space 182016K, 100% used [0x92cc0000,0x9de80000,0x9de80000)
from space 18176K, 67% used [0x9de80000,0x9ea7c010,0x9f040000)
to space 18176K, 0% used [0x9f540000,0x9f540000,0xa0700000)
ParOldGen total 212992K, used 166856K [0x68200000, 0x75200000, 0x92cc0000)
object space 212992K, 78% used [0x68200000,0x724f21b8,0x75200000)
Metaspace used 43129K, capacity 43442K, committed 43544K, reserved 44336K
```
executor.png
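To put "frequent but short" into numbers, the cost of young GC can be estimated as pause time divided by the interval between collections. The sketch below pulls the JVM uptime (the `698.800:` field) and the pause (`0.0252717 secs`) out of ParallelGC log lines with a regex. `GcOverhead` is a hypothetical helper, and the regex targets only the `[GC (Allocation Failure) ...]` line format shown above; heap-dump lines are skipped.

```scala
object GcOverhead {
  // Matches lines such as
  // "2018-12-10T01:13:00.700+0800: 698.800: [GC (Allocation Failure) ..., 0.0252717 secs] ..."
  // group 1 = JVM uptime in seconds, group 2 = pause in seconds.
  private val GcLine = """^\S+: (\d+\.\d+): \[GC .*?, (\d+\.\d+) secs\].*""".r

  final case class GcEvent(uptimeSec: Double, pauseSec: Double)

  def parse(lines: Seq[String]): Seq[GcEvent] = lines.collect {
    case GcLine(uptime, pause) => GcEvent(uptime.toDouble, pause.toDouble)
  }

  // Fraction of elapsed time spent paused between the first and last event:
  // pauses of every event after the first, divided by the elapsed uptime.
  def overhead(events: Seq[GcEvent]): Double = {
    require(events.size >= 2, "need at least two GC events")
    val elapsed = events.last.uptimeSec - events.head.uptimeSec
    events.tail.map(_.pauseSec).sum / elapsed
  }

  def main(args: Array[String]): Unit = {
    val events = parse(Seq(
      "2018-12-10T01:13:00.700+0800: 698.800: [GC (Allocation Failure) [PSYoungGen: 189829K->12080K(199936K)] 356678K->178932K(412928K), 0.0252717 secs] [Times: user=0.03 sys=0.00, real=0.03 secs]",
      "2018-12-10T01:13:13.070+0800: 711.170: [GC (Allocation Failure) [PSYoungGen: 194096K->12272K(200192K)] 360948K->179128K(413184K), 0.0148226 secs] [Times: user=0.03 sys=0.00, real=0.02 secs]"
    ))
    println(f"interval: ${events(1).uptimeSec - events(0).uptimeSec}%.2f s, overhead: ${overhead(events) * 100}%.2f%%")
  }
}
```

On the two collections above this gives an interval of about 12.4 s and a pause overhead of roughly 0.12%, consistent with the observation that GC pauses alone do not account for a 40 s stall.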


## Executor log (TID 1453)

```text
18/12/10 01:13:24 INFO Executor: Running task 5.0 in stage 241.0 (TID 1450)
18/12/10 01:13:24 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 6 blocks
18/12/10 01:13:24 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/12/10 01:13:24 INFO ProcessMysqlData: insert size:570
18/12/10 01:13:25 INFO Executor: Finished task 5.0 in stage 241.0 (TID 1450). 1053 bytes result sent to driver
18/12/10 01:13:26 INFO BlockManager: Removing RDD 476
18/12/10 01:13:30 INFO CoarseGrainedExecutorBackend: Got assigned task 1453

//--------------------------------start---------------------------------------------
18/12/10 01:13:30 INFO Executor: Running task 2.0 in stage 242.0 (TID 1453)
18/12/10 01:13:30 INFO TorrentBroadcast: Started reading broadcast variable 242
18/12/10 01:13:30 INFO MemoryStore: Block broadcast_242_piece0 stored as bytes in memory (estimated size 2.9 KB, free 366.1 MB)
18/12/10 01:13:30 INFO TorrentBroadcast: Reading broadcast variable 242 took 37 ms
18/12/10 01:13:30 INFO MemoryStore: Block broadcast_242 stored as values in memory (estimated size 5.0 KB, free 366.1 MB)
18/12/10 01:13:30 INFO KafkaRDD: Computing topic mysql-clusterd.bigdata.movie_base_info, partition 0 offsets 28752 -> 28848
18/12/10 01:14:10 INFO Executor: Finished task 2.0 in stage 242.0 (TID 1453). 1746 bytes result sent to driver
//--------------------------------end---------------------------------------------

18/12/10 01:14:10 INFO CoarseGrainedExecutorBackend: Got assigned task 1460
18/12/10 01:14:10 INFO Executor: Running task 1.0 in stage 243.0 (TID 1460)
18/12/10 01:14:10 INFO MapOutputTrackerWorker: Updating epoch to 122 and clearing cache
18/12/10 01:14:10 INFO TorrentBroadcast: Started reading broadcast variable 243
18/12/10 01:14:10 INFO MemoryStore: Block broadcast_243_piece0 stored as bytes in memory (estimated size 1957.0 B, free 366.1 MB)
18/12/10 01:14:10 INFO TorrentBroadcast: Reading broadcast variable 243 took 34 ms
```
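A stall like the one between `KafkaRDD: Computing topic ...` (01:13:30) and `Finished task 2.0` (01:14:10) can be located mechanically by scanning for large gaps between consecutive timestamps. A minimal sketch, assuming every log line starts with the `yy/MM/dd HH:mm:ss` timestamp seen in the excerpt; `LogGaps` is a hypothetical helper, and lines without a timestamp (such as the `//---start---` markers) are skipped.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

object LogGaps {
  // Timestamp prefix used by the Spark logs above, e.g. "18/12/10 01:13:30".
  private val TsFormat = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")
  private val TsLength = 17

  // Parses the leading timestamp; lines without one yield None.
  private def timestamp(line: String): Option[LocalDateTime] =
    if (line.length < TsLength) None
    else Try(LocalDateTime.parse(line.substring(0, TsLength), TsFormat)).toOption

  // Returns (line before the gap, line after the gap, gap in seconds) for gaps >= thresholdSec.
  def gaps(lines: Seq[String], thresholdSec: Long): Seq[(String, String, Long)] = {
    val stamped = lines.flatMap(line => timestamp(line).map(ts => (ts, line)))
    stamped.zip(stamped.drop(1)).flatMap { case ((t1, l1), (t2, l2)) =>
      val secs = Duration.between(t1, t2).getSeconds
      if (secs >= thresholdSec) Some((l1, l2, secs)) else None
    }
  }

  def main(args: Array[String]): Unit = {
    val log = Seq(
      "18/12/10 01:13:30 INFO KafkaRDD: Computing topic mysql-clusterd.bigdata.movie_base_info, partition 0 offsets 28752 -> 28848",
      "18/12/10 01:14:10 INFO Executor: Finished task 2.0 in stage 242.0 (TID 1453). 1746 bytes result sent to driver"
    )
    gaps(log, thresholdSec = 5).foreach { case (before, after, secs) =>
      println(s"$secs s gap between:\n  $before\n  $after")
    }
  }
}
```

Run over the executor logs of all three nodes, this would show whether long gaps always follow a `KafkaRDD: Computing topic` line; such a gap covers both fetching that partition from Kafka and running the filter/map over it.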

## Driver log

```text
18/12/10 01:13:30 INFO dag-scheduler-event-loop DAGScheduler 54: Submitting 6 missing tasks from ShuffleMapStage 242 (MapPartitionsRDD[486] at map at ProcessMysqlData.scala:109) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5))
18/12/10 01:13:30 INFO dag-scheduler-event-loop TaskSchedulerImpl 54: Adding task set 242.0 with 6 tasks
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 0.0 in stage 242.0 (TID 1452, 192.168.0.108, executor 2, partition 0, PROCESS_LOCAL, 4721 bytes)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 2.0 in stage 242.0 (TID 1453, 192.168.0.107, executor 1, partition 2, PROCESS_LOCAL, 4729 bytes)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 BlockManagerInfo 54: Added broadcast_242_piece0 in memory on 192.168.0.108:34106 (size: 2.9 KB, free: 366.1 MB)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 1.0 in stage 242.0 (TID 1454, 192.168.0.108, executor 2, partition 1, PROCESS_LOCAL, 4726 bytes)
18/12/10 01:13:30 INFO task-result-getter-3 TaskSetManager 54: Finished task 0.0 in stage 242.0 (TID 1452) in 24 ms on 192.168.0.108 (executor 2) (1/6)
18/12/10 01:13:30 INFO dispatcher-event-loop-0 BlockManagerInfo 54: Added broadcast_242_piece0 in memory on 192.168.0.107:56373 (size: 2.9 KB, free: 366.1 MB)
18/12/10 01:13:30 INFO task-result-getter-0 TaskSetManager 54: Finished task 1.0 in stage 242.0 (TID 1454) in 30 ms on 192.168.0.108 (executor 2) (2/6)
18/12/10 01:13:30 INFO dispatcher-event-loop-2 TaskSetManager 54: Starting task 3.0 in stage 242.0 (TID 1455, 192.168.0.108, executor 2, partition 3, PROCESS_LOCAL, 4726 bytes)
18/12/10 01:13:30 INFO dispatcher-event-loop-0 TaskSetManager 54: Starting task 5.0 in stage 242.0 (TID 1456, 192.168.0.108, executor 2, partition 5, PROCESS_LOCAL, 4725 bytes)
18/12/10 01:13:30 INFO task-result-getter-2 TaskSetManager 54: Finished task 3.0 in stage 242.0 (TID 1455) in 75 ms on 192.168.0.108 (executor 2) (3/6)
18/12/10 01:13:30 INFO task-result-getter-1 TaskSetManager 54: Finished task 5.0 in stage 242.0 (TID 1456) in 55 ms on 192.168.0.108 (executor 2) (4/6)
18/12/10 01:13:36 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375616000 ms
18/12/10 01:13:36 INFO dispatcher-event-loop-0 TaskSetManager 54: Starting task 4.0 in stage 242.0 (TID 1457, 192.168.0.101, executor 0, partition 4, ANY, 4734 bytes)
18/12/10 01:13:36 INFO dispatcher-event-loop-0 BlockManagerInfo 54: Added broadcast_242_piece0 in memory on 192.168.0.101:43782 (size: 2.9 KB, free: 366.1 MB)
18/12/10 01:13:36 INFO task-result-getter-3 TaskSetManager 54: Finished task 4.0 in stage 242.0 (TID 1457) in 194 ms on 192.168.0.101 (executor 0) (5/6)
18/12/10 01:13:42 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375622000 ms
18/12/10 01:13:48 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375628000 ms
18/12/10 01:13:54 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375634000 ms
18/12/10 01:14:00 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375640000 ms
18/12/10 01:14:06 INFO JobGenerator JobScheduler 54: Added jobs for time 1544375646000 ms

//-------------------------stage 242.0 (TID 1453) 40142 ms----------------------------------
18/12/10 01:14:10 INFO task-result-getter-0 TaskSetManager 54: Finished task 2.0 in stage 242.0 (TID 1453) in 40142 ms on 192.168.0.107 (executor 1) (6/6)
//-----------------------------stage 242.0 (TID 1453)----------------------------

18/12/10 01:14:10 INFO task-result-getter-0 TaskSchedulerImpl 54: Removed TaskSet 242.0, whose tasks have all completed, from pool
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: ShuffleMapStage 242 (map at ProcessMysqlData.scala:109) finished in 40.143 s
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: looking for newly runnable stages
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: running: Set()
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: waiting: Set(ResultStage 243)
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: failed: Set()
18/12/10 01:14:10 INFO dag-scheduler-event-loop DAGScheduler 54: Submitting ResultStage 243
```
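The driver log shows five tasks of stage 242 finishing in 24 to 194 ms and one (TID 1453 on 192.168.0.107, executor 1) taking 40142 ms. To check whether stalls always hit the same executor or host, the `Finished task` lines can be pulled out of the driver log and compared with the stage median. A minimal sketch: `Stragglers` is a hypothetical helper, the regex targets only the `TaskSetManager` line format shown here, and the 10x factor is an arbitrary illustration.

```scala
object Stragglers {
  // Matches driver lines such as
  // "... Finished task 2.0 in stage 242.0 (TID 1453) in 40142 ms on 192.168.0.107 (executor 1) (6/6)"
  private val Finished =
    """.*Finished task \S+ in stage \S+ \(TID (\d+)\) in (\d+) ms on (\S+) \(executor (\S+)\).*""".r

  final case class TaskTime(tid: Long, ms: Long, host: String, executor: String)

  def parse(lines: Seq[String]): Seq[TaskTime] = lines.collect {
    case Finished(tid, ms, host, exec) => TaskTime(tid.toLong, ms.toLong, host, exec)
  }

  // Tasks whose duration exceeds `factor` times the (upper) median duration.
  def stragglers(tasks: Seq[TaskTime], factor: Double): Seq[TaskTime] = {
    require(tasks.nonEmpty, "no tasks parsed")
    val sorted = tasks.map(_.ms).sorted
    val median = sorted(sorted.size / 2)
    tasks.filter(_.ms > factor * median)
  }

  def main(args: Array[String]): Unit = {
    val lines = scala.io.Source.stdin.getLines().toSeq
    stragglers(parse(lines), factor = 10).foreach(println)
  }
}
```

Applied across many stalled batches, a host that keeps reappearing in the output would point toward that VM (for example the memory shortage noted below) rather than toward the job logic.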

## System
The VMs do not have enough memory.

top.png



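# Restating the question
**I need to locate, analyze, and fix the cause of some job tasks hanging (pending), or find other ways to optimize the job.**

Thanks!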