陌路茶色/

关于Spark中并行化的一些实践

这里记录一下自己在写Spark任务时遇到的一些并行化问题以及自己的解决方法,真实场景中遇到的问题

TASK1

RDD中的一条数据是(phrase,label,rel),label表示phrase所属的类别,为数字型字符串,且两个phrase的label值越近则两个phrase所属的cell越近,任务是计算phrase对cell而言的区分度,需要用到邻居cell(即context),公式如下图,问如何在spark中求解公式中的分母 ???
屏幕快照 2019-11-05 下午9.17.22.png

假设context大小等于k,那么对于当前label聚合的是[label-k,label+k]的数据,最开始想到的是有一个函数能够将这个范围的数据映射为一个固定的值,然后在聚合,后面想到将每一条数据映射为2k+1条数据再聚合,如下:
屏幕快照 2019-11-05 下午9.33.11.png
整个计算为将分子的RDD和分母的RDD union操作再在reduceByKey中做除法,除法规则是当x大于y时为y/x,反之为x/y。

TASK2

大量评论,要提取评论中的短语,并统计该短语的上下文信息,比如统计短语左邻词的分布,如何解决 ???
屏幕快照 2019-11-17 下午9.48.51.png
解决方案如上图,首先使用ngram得到评论的一系列候选短语,使用flatMap拍扁,左边这条线直接统计短语对应的评论特征,因为左边这条线特征中会用到短语的频次,因此右边解决的就是将拍扁的短语reduceByKey统计频次,使用broadcast传到左边的RDD中。
注意该过程会存储问题:broadcast传入的变量是有大小限制的,我测试的时候超过400M左右估计就不行了,需要为每个executor分配更多的内存空间,这显然不是一个很好的解决方案。
目前的解决方案是使用hash函数将短语映射为32位的整形,这样(phrase,freq)的RDD就变成(hash,freq),一个短语长度至少在两个中文字符以上,一个中文字符至少为2个字节,而32为整形只有4个字节大小。
见TASK3,对该问题有进一步分析。

TASK3

两个RDD,一个RDD是短语评论数据(无限大),即对应的每一个元素是(phrase,comment),一条短语可能对应多条评论,也就是说RDD中可能有两个元素的phrase相同但comment不同,另外一个RDD是短语频次表(百兆级别),即每一个元素是(phrase,freq),那么如何得到一个RDD,其元素为(phrase,comment,freq)【注意:本任务只需要将短语频次表中短语对应的freq添加到短语评论数据的后面,不需要对RDD(phrase,comment)进行聚合】 ???

该场景下是不能简单的使用union RDD(phrase,comment)和RDD(phrase,freq)后再reduceByKey的,因为reduceByKey时,相同的key(phrase)对应的comment会聚集到一个part中,而一个phrase可能映射上千万级的comment,一个part显然会被撑爆。
目前使用的是将短语频次表中的短语使用hash压缩,不是长久之计。另外一种方法就是使用random降低key对应的comment条数:
屏幕快照 2019-11-20 下午9.29.06.png
上图已经非常清楚了,主要保证大表RDD random后,两个RDD的key仍相同。

TASK4

一个大表(uid,...),一个小表(uid,...),现在想取出小表对应的uid值在大表中的行 ???
如下几种都是在真实场景中尝试过的方法

join
直接使用join可能会存在数据倾斜的问题,大表10T,小表10G左右,hive join跑了一个晚上没出结果。
简单提一下Join (Common Join)的机制:
屏幕快照 2019-11-20 下午8.39.27.png
Map阶段以join on条件中的列作为key(多列组合),value为提取的字段(注意标识该value来自哪个表),生成(key,value)对。
shuffle阶段按key将pair对推到不同的分区中(这里shuffle有固定分区,即不管多少个RDD,其中的元素的key相同,将被分到相同的part中,或者是直接union两个表)。
reduce阶段按key进行聚合(可以从上图中看到并不是简单reduceByKey的方式聚合)。

Boardcast+filter+reduceByKey
将小表collect出来,然后在大表的filter阶段中传入来过滤不匹配的行。
注意:collect()后一定要转为set或者map,这样使用contains时就是O(1)的时间复杂度,而array是O(n)的时间复杂度

val accountsDidRDDBc=ss.sparkContext.broadcast(accountsDidRDD.collect().toSet)
val didsRDD=ss.sql(didSql)
  .repartitionByRange(10000, col("user_id"))
  .rdd
  .filter{row=>
    val accountsDidRDDBcV=accountsDidRDDBc.value
    val did=row.getLong(1)
    accountsDidRDDBcV.contains(did)
  }

union+reduceByKey+filter
因为boardcast中传入的变量是有限的(好像和executor上的内存有关),因此可能会出现java.lang.ArrayIndexOutOfBoundsExceptionjava.lang.NegativeArraySizeException的错误,此时除了增加executor的内存外,另外一种方法就是使用union将两个table连起来,此处需要注意value要区分化,即将每个表对应的value加上前后缀,在reduceByKey后,filter阶段中以前后缀来过滤是否保留该行。
我后面查看下面这样的错误:

java.lang.NegativeArraySizeException
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:447)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:245)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)

的解决方法中提到说broadcast中能够传入2G以内的数据,否则出现问题要么是size超过10亿,要么是java的bug,然后给出了解决方案,在spark_submit的时候添加

--conf spark.executor.extraJavaOptions="-XX:hashCode=0" \
--conf spark.driver.extraJavaOptions="-XX:hashCode=0" \

或者在spark代码中添加

import com.esotericsoftware.kryo.Kryo
val kryo = new Kryo
kryo.setReferences(false)

scala广播500M数据kryo

参考

[1]一起学Hive之十-Hive中Join的原理和机制

留下一条评论

暂无评论