陌路茶色/

pyspark使用记录

一些知识点

cache和persist

cache内部调用的是persist(StorageLevel.MEMORY_ONLY),因此如果想要指定缓存的地方,应该使用persist。
在跑pyspark的时候,如果df会被后面重复使用是需要cache的,我加了cache后就明显快了很多。

spark sql

1.关于dataframe中某列转数据类型的问题

比较常见的几个数据类型如下:

BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string

大写的对应的是类型,小写的对应的字符串,提一下数组类型,比如类型为ArrayType(IntegerType())对应的是array<int>
我在网上发现了四种写法:
我以string转bigint为例

df=df.withColumn('uid',col('uid').cast('bigint'))
df=df.withColumn('uid',col('uid').cast(LongType()))
df=df.withColumn('uid',df['uid'].cast('bigint'))
df=df.withColumn('uid',df['uid'].cast(LongType()))

2.pyspark sql 样本拼接

join(other, on=None, how=None)
join的写法很多,一般的写法是这样的:

df = ta.join(tb, ta.name == tb.name,how='left').drop(tb.name)

但是如果是通过传入列名字符串来实现拼接呢,或者是多列拼接呢,见 on字段说明:
a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner').drop("df_as2.name")
joined_df.select("df_as1.name", "df_as2.age")

3.dataframe中使用某张表,sql中使用某张表

sql中使用:

df_dict[table].createOrReplaceTempView(table)

这样sql中就能直接使用table了。
dataframe中使用:

df=df.alias(label_table)
df=df.join(broadcast(df_dict[table]),df[label_column] == df_dict[table][on_column]).drop(df_dict[table][on_column])
df_dict[table]=df_dict[table].alias(table)
df=df.select(*columns_reserved)

在columns_reserved中使用到了label_table和table

错误记录

本地跑任务报错org.apache.spark.memory.SparkOutOfMemoryError,错误指向的是reduceByKey那一行/指向df.describe()

Spark runs out of memory when grouping by key
上面给出了解释,是因为并行度太低,应该增加并行度,不知道为啥,假设是在分布式的场景下,如果并行度小于key的个数会发生什么变化呢,如果并行度大于key的个数又会有什么影响,我查了一下该参数记录一下:
spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效
spark.sql.shuffle.partitions则是对sparks SQL专用的设置
解释一下上面的问题,本地跑任务设置的partition个数意义在于每个partition是串行的,这样本地内存开销就会以一个partition为基准,如果partition=1,那么就会把数据全部塞进去,显然当数据量比较大时就会报outofmemory。
【后面我在reduceByKey前做了repartition就好了,那个df.describe()报同样的错误是因为我使用join后的df没有做repartition,我在df.describe()前做了repartition后错误就消失了】

org.apache.spark.shuffle.FetchFailedException

错误指向的是下面这句:

table.repartition(partition).write.option('header','true').option('sep',',').mode('overwrite').csv(save_to)

于是将spark.sql.shuffle.partitions设置加大了一倍(然后将repartition中的partition减少到1000),跑成功了,我理解这个问题是因为shuffle操作时被设置的shuffle个数太小导致的,和repartition没啥关系,因为我之前设置到500时也报这个错误我才设置到2000的

Reason: Container killed by YARN for exceeding memory limits.

17.2 GB of 17 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

留下一条评论

暂无评论