数据入门必看丨彻底搞定数据倾斜的12 种办法原创
金蝶云社区-彭文华身份
彭文华
7人赞赏了该文章 3631次浏览 未经作者许可,禁止转载编辑于2022年03月21日 16:26:11

数据倾斜是上帝对某个服务器的过于偏爱。


造成数据倾斜的原因

上帝太过于偏爱某个服务器,因此给他分配了太多的任务,导致 数据都倾斜到这台服务器了。 在大数据场景中,无论是 Map Reduce 还是 Spark,都会因两阶 段之间的 shuffle 导致各个服务器接受到的数据导致处理量失衡的问 题。情况严重,就变成数据倾斜了。 我们之所以创造分布式环境,就是因为我们将一个巨大的任务拆 解成若干个小任务,给不同的服务器执行,这样总执行时间就会减小 至 1/n。理想状态的任务处理情况应该如下图所示:


6.png


原本单机环境需要执行 100s 的任务,由 5 台服务器共同执行, 每台服务器执行 20s,最后总时间会远远大于单机环境的执行时间。 而且我们还可以通过不断增加服务器,来不断减少总运行时间。但是 往往会出现这种情况:


7.png


某 4 台服务器很快就执行完了任务,但是其中有一台服务器的迟 迟不能完成,严重的时候甚至会 OOM(Out Of Memory)。究其原 因,其实如同上面说过的,在分布式处理的不同阶段之间会有一个混洗(shuffle)的过程:


8.png


在 Map 或者 Spark 的 Stage1 阶段,由于每个数据块的大小都是 一致的(默认 128M),所以在这个阶段是不会出现数据倾斜的。但 是一旦我们对数据进行 Shuffle,比如按照商品品类进行分组之后, 在 Reduce 或 Stage2 阶段,数据将会出现严重的倾斜:原本每台服 务器都只需要处理 3 条数据,Shuffle 之后,其中两台服务器各只需 处理 1 条,而剩余的那台服

务器则需要执行 8 条数据。三台服务器 处理的数据量比为 7:1:1。数据倾斜至第一台服务器。任务延迟, 甚至OOM。


如何解决数据倾斜呢:

三个层面: 

1、预判-原始数据预防,保证原始数据不倾斜; 

2、躲闪-规避数据倾斜,尽量规避 Shuffle; 

3、硬刚-处理数据倾斜,无法规避 Shuffle,用各种办法优化 Shuffle 过程


9.png


预判:

虽说 HDFS 的数据都是 128M,不会一开始就出现数据倾斜,但是仍然有以下几种情况:


1、数据压缩后,128M 文件大小一样,但是数据量不一样;

2、存在不可切分的大文件; 

3、流式数据。 


这几种情况还是可能会导致程度不一的数据倾斜的。


我们需要做 一些简单的处理: 

1、数据压缩后,128M 文件大小一样,但是数据量不一样; 解决办法:压缩前,保证每个文件中的数据量基本一致; 

2、存在不可切分的大文件; 解决办法:生成数据时,尽量减少不可切分的文件,尽量按照 HDFS 的逻辑,存成可切分的文件;或者保证这些大文件中的数据量 基本一致,且单机可处理。 

3、流式数据; 解决办法:Kafka 的 partition 实现建议使用随机、轮询等方法, 尽量使各 topic 的各 partition 的数据尽量平衡


闪避

既然我们知道数据倾斜的主要原因的 shuffle 导致的,那么我们首要的优化方向就是 shuffle,能不用尽量不要用。有以下几种方法我们可以规避: 


4、ETL 预处理

在面对无法避免的原始数据倾斜(Hive 表中 key 分布不均匀、 kafka 中某 topic 的 partititoner 含有业务属性,天然不均匀等),我 们可以通过前置 ETL 过程,进行预处理。 注意:这个方法只是将成本转嫁,并没有解决问题。适合削峰填 谷类的操作,比如我们将数据预处理好,避免凌晨集中计算的时候处 理时间过长,影响其他任务。 


5、过滤不必要的 key 

很多数据分析师在单体数据库的时候,就有一个不好的习惯:总 喜欢 select *。在 hive、spark 等分布式环境中,就吃苦头了,经常 遇到数据倾斜甚至 OOM。有经验的数据分析师在写 sql 的时候,通 常会先 group by 一下,看看数据的分布情况,然后再处理。 咱在分布式环境中也可以做类似的事情,就是采样。 离线环境可以用随机采样,实时环境可以用鱼塘采样。采样能够 快速摸清楚各个 key 的大致分布。扫一眼数据量大的 key,如果跟你 的计算没啥关系,直接过滤就行。 比如上面举的例子,母婴品类占绝大多数,但是运营的要求是分 析 3C 产品,那你过滤掉母婴产品,一则减少计算量,二则规避了数 据倾斜的问题。 


6、Reduce join 改为 Map join 

如果是大小表的 join,比如订单表和订单类型、订单状态的 join, 如果使用 reduce join 的话,就非常容易在 shuflle 之后出现数据倾 斜。建议的原则:只要一台服务器的内存能吃下这张小表(主要看服 务器内存大小,建议 2g 以内,再大就影响服务器性能了),就建议 用 map join。这样 join 完之后,每份数据依然是基本均衡的,而且规避了 shuffle


硬刚 

上述几步,能做的都做了,还是不行,那就只能硬刚了。这时就 只能八仙过海各显神通了。基本的逻辑还是一样的,就是能拆的尽量拆,不能拆的用空间换时间,或者自定义。 


7、通用优化:

shuffle 并行度 spark 的 shuffle 并行度默认值是 200,建议根据服务器的情况进 行调整。一般是集群 cpu 总和的 2-3 倍。当发生数据倾斜的时候, 适当增大并行度,可以让任务和数据更均匀的分布在整个集群中。但 是这个调优方法有些玄学成分在,因为你不知道他是咋分过去的。 


并行度调整有三个方法:

●操作函数内设置 testRDD.groupByKey(200) 

●代码中设置“spark.default.parallelism” conf.set("spark.default.parallelism", 200) 

●配置文件中设置“$SPARK_HOME/conf/spark-defaults.conf” 文件 spark.default.parallelism 200 


8、拆分超大 key 

前面说过采样后过滤。如果采样之后发现这个 key 还是你需要的, 无法怎么办?那就把超大数据量的 key 拆分出来,单独做成一个任 务,这样超大数据量的 key 一个任务,其他中小数据量的 key 一个 任务,两个任务分别做 join 啊什么的处理,最后把结果合并一下就 行了。 为了避免超大数据量的 key 单独 join 的时候还是一个 key 一个任 务,可以在 key 上加上随机数取模的前缀,这样就把数据分成了 N 份,然后再 join。 


9、阶段拆分-两阶段聚合 

对于聚合类的操作,这种方式可以说是数据倾斜的大杀器。简单 来说就是在需要聚合的 key 前加一个随机数取模的前缀,这样就能得 到非常均匀的key,然后按这个加工之后的key进行第一次聚合之后, 再对聚合的结果,按照原始 key 进行二次聚合,这样基本就不可能出 现数据倾斜了。示意图如下:


10.png

对比之前的例子中,处理母婴的服务器和处理 3c、图书的服务器 任务量是 7:1:1,这个方案的数据就非常均匀了。


10、任务拆分

很多时候数据情况会非常复杂,有 null 值、有超大数据量的 key、 还有各种需要过滤的数据,还有各种聚合和 join。那这个时候就需要 把任务再拆分。一部分用上面的 key 值过滤,一部分用 Map Join, 一部分用超大 key 单独处理


11、随机前缀 

前面说过小表 join 的时候可以用 Map join。但是遇到大表 join 大 表 咋 办 ? 三 个 方 法 : 1 、 大 表 拆 成 小 表 , 多 次 join ; 2 、 SortMergeJoin;3、位图法(详见《位图法搞定 10 亿用户量用户 标签处理》)。 那大表+中表,该咋处理?可以考虑用随机前缀+RDD 扩容的方 法解决 join 的问题。 如果你将要 join 的表不大不小,又不适合用上面大大表的处理方 法,那就可以用这个通用的 join 方法。简单来说,就是对 A 表中需 要 join 的字段加上 n 以内的随机数前缀,然后再把 B 表中的数据复 制 N 份,join 的字段加上 1-N 的前缀,然后量表再 join,就能解决 数据倾斜的问题了。示意如下:


原始数据如下:

11.png


不经处理直接 join 是这样的,part1 很明显比 part2 要多好几倍的数据:


12.png


我们对 A 表和 B 表进行随机前缀和 RDD 扩容处理之后


13.png


然后再 join,这样每个 part 的数据就非常均匀了

66.png


这个方法比较坑的是 B 表这个 RDD 需要扩容,要复制 N 份,对 内存要求比较高。但是这个方法可以说是通杀 Join 的数据倾斜问题。 


12、自定义 partitioner 

上面说改 spark 的并行数也可以改善数据倾斜,但是有点玄学的 意思在里面。其根本原因就是不管你怎么调优,计算引擎的分区都是 按照固定的方法进行的,根本不会,也没办法考虑数据真实情况。 无论是二阶段聚合解决聚合的问题,还是随机前缀+RDD 扩容解 决 join 的问题,都是通用解决办法,而且还麻烦。其实最好的解决 办法就是根据现在处理的这份数据,单独写一个适合的 partitioner。 比如现在是按省份进行汇总数据,如果只是简单的按省份去分(这并 没有错),那么数据肯定会倾斜,因为各省的数据天然不一样。我们 可以通过历史数据、抽样数据或者一些常识,对数据进行人工分区, 让数据按照我们自定义的分区规则比较均匀的分配到不同的 task 中。


常见的分区方式: 

随机分区:每个区域的数据基本均衡,简单易用,偶尔出现倾斜, 但是特征同样也会随机打散。 

轮询分区:绝对不会倾斜,但是需要提前预知分成若干份,进行轮询。 

hash 散列:可以针对某个特征进行 hash 散列,保证相同特征的数据在一个区,但是极容易出现数据倾斜。 

范围分区:需要排序,临近的数据会被分在同一个区,可以控制分区数据均匀。 


数据倾斜并不可怕,咱可以糙一些,也可以精致一些。但是建议 还是糙一些,这样简单粗暴,多节省一些时间干(xue)点(dong) 别(xi)的


发布于 数据智能 社群

赞 7