二、 如何避免数据倾斜
2.1 避免数据源倾斜-HDFS
Spark通过 textFile(path, minPartitions) 方法读取文件时,使用 TextInputFormat。对于不可切分的文件,每个文件对应一个 Split 从而对应一个 Partition。此时各文件大小是否一致,很大程度上决定了是否存在数据源侧的数据倾斜。另外,对于不可切分的压缩文件,即使压缩后的文件大 小一致,它所包含的实际数据量也可能差别很多,因为源文件数据重复度越高,压缩比越高。反过来, 即使压缩文件大小接近,但由于压缩比可能差距很大,所需处理的数据量差距也可能很大。此时可通过在数据生成端将不可切分文件存储为可切分文件,或者保证各文件包含数据量相同的方式避免数据倾斜。
# 对于不可切分文件可能出现数据倾斜,对于可切分文件,一般来说,不存在数据倾斜问题。
1. 可切分: 基本上不会! 默认数据块大小:128M
2. 不可切分: 源文件不均匀,最终导致 分布式引用程序计算产生数据倾斜 日志:每一个小时生成一个日志文件
2.2 避免数据源倾斜-Kaka
Topic 主题: 分布式的组织形式: 分区, 既然要进行数据分区,那就有可能产生数据分布不均匀
以 Spark Stream 通过 DirectStream 方式读取 Kafka 数据为例。由于 Kafka 的每一个 Partition 对应 Spark 的一个 Task(Partition),所以 Kafka 内相关 Topic 的各 Partition 之间数据是否平衡,直接决 定 Spark 处理该数据时是否会产生数据倾斜。
Kafka 某一 Topic 内消息在不同 Partition 之间的分布,主要由 Producer 端所使用的 Partitioner 实现 类决定。如果使用随机 Partitioner,则每条消息会随机发送到一个 Partition 中,从而从概率上来讲, 各 Partition 间的数据会达到平衡。此时源 Stage(直接读取 Kafka 数据的 Stage)不会产生数据倾斜。
但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费,此时就需要将具有相同特征的数据 放于同一个 Partition 中。一个典型的场景是,需要将同一个用户相关的PV信息置于同一个 Partition 中。此时,如果产生了数据倾斜,则需要通过其它方式处理。
* 以 Spark Stream 通过 DirectStream 方式读取 Kafka 数据为例。由于 Kafka 的每一个 Partition 对应 Spark 的一个 Task(Partition),所以 Kafka 内相关 Topic 的各 Partition 之间数据是否平衡,直接决 定 Spark 处理该数据时是否会产生数据倾斜。
* Kafka 某一 Topic 内消息在不同 Partition 之间的分布,主要由 Producer 端所使用的 Partitioner 实现 类决定。如果使用随机 Partitioner,则每条消息会随机发送到一个 Partition 中,从而从概率上来讲, 各 Partition 间的数据会达到平衡。此时源 Stage(直接读取 Kafka 数据的 Stage)不会产生数据倾斜。
* 但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费,此时就需要将具有相同特征的数据 放于同一个 Partition 中。一个典型的场景是,需要将同一个用户相关的PV信息置于同一个 Partition 中。此时,如果产生了数据倾斜,则需要通过其它方式处理。
2.3 定位处理逻辑 - Stage 和 Task
归根结底,数据倾斜产生的原因,就是两个 stage 中的 shuffle 过程导致的。所以我们只需要研究Shuffle 算子即可。我们知道了导致数据倾斜的问题就是 shuffle 算子,所以我们先去找到代码中的 shuffle 的算子,比如 distinct、groupByKey、reduceByKey、aggergateByKey、join、cogroup、repartition 等,那么问 题一定就出现在这里。spark的执行,按照hsuffle算子分成多个stage来执行。
* 如果 Spark Application 运行过程中,出现数据倾斜,可以通过 web 管理监控界面,查看 各stage 的运行情况,如果某一个 stage 的运行很长,并且这个 stage 的大部分Task都运行很快,则
2.4 查看导致倾斜的key的数据分布情况
知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的 RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同 的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:
1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的key 分布情况。
2. 如果是对 Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看 key 分布 的代码,比如 RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印 一下,就可以看到key的分布情况。
举例来说,对于上面所说的单词计数程序,如果确定了是 stage1 的 reduceByKey 算子导致了数据倾 斜,那么就应该看看进行 reduceByKey 操作的 RDD 中的 key 分布情况,在这个例子中指的就是 pairs RDD。如下示例,我们可以先对 pairs 采样 10% 的样本数据,然后使用 countByKey 算子统计出每个 key 出现的次数,最后在客户端遍历和打印样本数据中各个 key 的出现次数。
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
采样!(离线处理:无放回采样, 流式处理:鱼塘采样)