记一次排查Spark thrift server OOM错误的经历

平时spark thrift server多用来做探索性的查询,比如验证下数据格式或count一下某天数据数量,都是轻量级的查询,查询效率也比较满意。没想到最近要真正用起来的时候,却遭遇各种瓶颈。

前阵子突然有需求需要使用thrift server提供的jdbc接口进行提数,一开始我有点怀疑它能不能撑住,因为Spark基于内存计算,吞吐量肯定比不上MR。所以我先是查了一下表的大小约300M,并不算大。我给Spark thrift server分配了12个executor,每个有2G内存,Driver也有12G内存,感觉完全没有问题。而实际查的时候却出乎意料。查询迟迟不返回结果,看后台log,driver不断在报 java.lang.OutOfMemoryError:GC overhead limit exceeded。再jmap查 Driver的内存,各个分区都在95%以上。卡了一会,整个thrift server就崩溃了。

一开始我以为是负载太高,但反复提交都是同样情况,而且集群也没有其他用户提交查询。于是我加上 limit 100000,查询就可以正常返回,thrift server 的heap也在合理范围。查询的整个结果集是要汇总到Driver一个节点是没错,但是查300M的数据就撑爆了12G内存未免夸张了吧?

google搜下,大部分人都建议使用increamental方式,分批返回result set,但并没有解决这个内存消耗问题。我在stack overflow上看到一个question Spark Thrift Server for exposing big size file,和我的情况基本一致,可惜并没有回答。

Spark从效率上考虑,是先将结果集缓存到Driver内存,再一并返回给客户端。从stage0的进度可以看到,645个task已经完成将近一半,成功task的结果会发到Driver缓存起来。仔细分析log,第一个OOM错误基本上都是Driver的result-getter线程反序列化task结果时报出来的,于是Driver怎么去缓存这些数据就至关重要了。顺着栈信息找到 org.apache.spark.scheduler.TaskResultGetter,其中的 enqueueSuccessfulTask 函数将逐个收集成功tasks的结果数据,缓存到 org.apache.spark.scheduler.TaskResult 对象中。读取单个task结果数据的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
valueBytes = ByteBuffer.wrap(byteVal)
val numUpdates = in.readInt
if (numUpdates == 0) {
accumUpdates = null
} else {
val _accumUpdates = mutable.Map[Long, Any]()
for (i <- 0 until numUpdates) {
_accumUpdates(in.readLong()) = in.readObject()
}
accumUpdates = _accumUpdates
}
metrics = in.readObject().asInstanceOf[TaskMetrics]
valueObjectDeserialized = false
}

从上面可以看出executor往Driver发送数据是一个对象流,协议如下:

Alt spark task result传输协议

前4个字节以整形方式存放结果集的长度,Driver先读取之后按照这个长度申请byte数组,读取整个结果集放到该数组。接着以同样的方式保存累加器数据(如果有的话)。最后读取剩余的字节,也就是监控指标数据(metric)。可以看出spark内存存储还是很紧凑的,并没有浪费内存。

于是我决定祭出必杀技,将Driver的内存dump出来分析。因为我开发机只有8G内存,所以我先将Driver调整为2G,再执行查询使其OOM。之前并没有试过分析这么大内存的进程,光是dump就花了40分钟,MAT加载dump文件更是花了1个小时以上,内存吃掉了7.5G,MAT疯狂GC,我吃完饭回来还在跑。不过幸好结果是好的。如下是MAT分析结果的dominator tree:

Alt Driver dominator_tree

可以看到 org.apache.spark.sql.catalyst.InternalRow[][] 这个二维数组吃了1.2G的内存,其中行是succeeded tasks的index,列是每个task结果集中的一行。我初略算了下,每行占2字节,这个分析结果的内存占用是比较合理的。我count一下要查的表,其实有900W行,那么就是要占18G内存,算上反序列化的空间膨胀,应该也只要6G左右的数据,那么为什么我算hive表大小的时候只有300M呢?突然想起平时在HDFS的文件都是lzo压缩的,我看到hive表没有.lzo后缀就以为没有压缩,这肯定是不对的。查一下,hive表果然默认使用了deflate压缩,由于这个表空值比较多,数据比较稀疏,所以压缩率很高,把6G的数据压到了300M。

到此为止,OOM问题算是找到原因了。

本文是原创文章,转载请注明:时间与精神的小屋 - 记一次排查Spark thrift server OOM错误的经历