陌路茶色/

Spark任务遇到的一些问题以及解决方案总结

因为是我自己的一些任务对应的错误,因此可能需要针对不同的任务做不同的解决,仅便于自己查阅。

Spark java.lang.OutOfMemoryError: Java heap space

这个错误指向的是我PhraseMining.FeatureExtraction代码中reduceByKey那一行(227)
参考Spark java.lang.OutOfMemoryError: Java heap space的解决方案中提到的几点解决方案:
1.提到executor的内存大小:spark.executor.memory
2.增加更多的partition,程序中使用repartition来修改partition个数。【后面说对于大数据而言,每个cpu至少超过4个,有点不理解,是说一个executor对应的core的个数吗,但是core多了,executor对应的memory不变,那每个core分到的memory不就变小了吗,那不是更容易OutOfMemory???】
3.如果变量比较大,使用broadcast,因为broadcast后,一个executor对应的多个core共享broadcast变量
4.cache RDD到disk中
5.避免使用String和严重嵌套的结构(比如Map或嵌套类),尽可能在嵌套结构上使用WrappedArray
6.使用Dataset来cache你的结构,序列化更有效。

但是我将executor的内存由8G变为16G,而且设置每个executor只有两个core,仍旧报错,输入语料大概20G,此时我将reduceByKey前面一行repartition到10000个partition,并削减输入语料继续跑。还是报同样的错误,后面又参考了Spark Heap OOM(堆内存溢出)这篇blog,提到说这个是Executor heap,有可能是key对应的value太多,于是我看了一下我的代码,发现reduceByKey中有字符串相加的过程,我在想是不是字符串变量有最大的限制,于是将其变成array,再相加,然后问题就解决了。

java.lang.NegativeArraySizeException

这个错误在我代码sc.broadcast中传入比较大的变量时报错,我以为是变量太大,broadcast中传入的大小有限(实际上我传入的只有4百多M),参考scala广播 sc.broadcast 500M数据 以及 kryo的解决方案,在main中添加如下代码解决:

import com.esotericsoftware.kryo.Kryo
val kryo = new Kryo
kryo.setReferences(false)

spark读hive表太慢

在读取hive表的时候为了均匀化,repartition使用的是repartitionByRange函数,导致读取非常慢,我以为是因为集群资源太少了,后来我将repartitionByRange函数改为简单的repartition函数,然后同样资源下跑的速度就快了很多。

spark中读取了一个全局变量,然后一直取不到该值

比如我定义了一个全局变量vocabHash词典,然后在driver端给与其赋值,然后对RDD如下处理:

data
  .map(arr=> arr.filter(w=>words.contains(w)))
  .filter(arr=> arr.length>=2)
  .map(arr=>arr.map(w=>vocabHash.getOrElse(w,-1)))

然后返回的一直是-1,想了半天都不知道是什么原因,后来突然想到是因为RDD是在集群上处理的,于是我将vocabHash broadcast后的结果就正确了。但是上述的vocabHash难道不能作为变量传入吗,非得使用broadcast的方式才能吗...

java.lang.UnsupportedOperationException: empty.reduceLeft

empty Array 不能进行reduce操作,应该使用reduceOption,参考:avoid use reduce

repartation过慢问题发现

我发现有一个stage跑了10多个小时还没有跑完,看了一下该stage对应的操作,包括repartition,filter,map等,确认filter和map不会导致stage执行缓慢,看了一下repartition操作,是对读取的hive表做repartition:

val itemInfoRDD=ss.sql(itemInfoSql)
  .rdd
  .repartition(100)

从hive表路劲中可以看到repartation前的part个数为20000个(每个分区会有若干个文件,这些文件的个数就是part的个数),现在要repartition到200个,怀疑是这个导致的,于是把该句去掉了。
【repartition操作为什么会如此慢,以及repartition前part的个数和repartition后的part的个数对repartition的操作的影响,我理解repartition前的part大于repartition后的part这种场景的操作时间应该大于repartition前的part小于repartition后的part这种场景,因为我觉的前者不需要重新hash,不过按理说应该都需要重新hash的】

留下一条评论

暂无评论