在 Apache Spark 中,任务的切分(Task Division)是 Spark 将应用程序逻辑划分为多个并行任务的核心机制。任务切分的主要原则是基于数据分区和操作算子。以下是任务切分的核心原则和关键影响因素:
1. Spark 任务切分的基本概念
- Task:Spark 的最小计算单元,一个 Task 处理一个分区的数据。
- Stage:一组可以并行执行的任务,每个 Stage 包含多个 Task。
- Job:由一个 Action(如
count()
、save()
)触发的计算任务,是 Stage 的集合。
2. 任务切分的原则
2.1 基于分区(Partition)的切分
- Spark 的任务划分以 分区(Partition) 为基础,每个分区的数据由一个 Task 处理。
- 分区数决定了 Task 的数量,通常由以下几个因素确定:
- 初始 RDD 分区数:
- 数据读取时分区数的默认值:
- HDFS 文件:由 HDFS 块大小决定,默认 128MB 或 64MB。
- Local 文件:由
sparkContext.textFile(path, numPartitions)
中的numPartitions
参数决定。
- 数据读取时分区数的默认值:
- 后续操作对分区的影响:
- 转换算子(如
repartition()
、coalesce()
)会重新定义分区数。 - 数据 Shuffle 也会重新分区,默认的分区数可以通过
spark.sql.shuffle.partitions
配置。
- 转换算子(如
- 初始 RDD 分区数:
2.2 基于依赖关系(Dependency)的切分
- 根据 RDD 的依赖关系,划分计算阶段(Stage):
- 宽依赖(Wide Dependency):
- 一次计算需要多个分区的数据(如
groupByKey
、reduceByKey
)。 - 会引发 Shuffle,需重新划分 Stage。
- 一次计算需要多个分区的数据(如
- 窄依赖(Narrow Dependency):
- 一次计算仅依赖一个分区的数据(如
map
、filter
)。 - 任务可以在同一 Stage 中完成。
- 一次计算仅依赖一个分区的数据(如
- 宽依赖(Wide Dependency):
2.3 基于算子的切分
- Action 操作会触发一个 Job,每个 Job 会切分成多个 Stage:
- Stage 划分依据是 算子类型 和 依赖关系。
- 例如:
rdd.map(...).filter(...).reduceByKey(...).count()
map
和 filter
为窄依赖,在同一 Stage。
reduceByKey
引发 Shuffle,产生新 Stage。
3. 任务切分的影响因素
3.1 数据源
- HDFS:分区数受 HDFS 块大小影响。
- Kafka:分区数与 Kafka Topic 分区数一致。
- 本地文件:受文件的大小和读取方式影响。
3.2 算子
- 窄依赖算子:如
map
、flatMap
、filter
,不会触发 Stage 划分。 - 宽依赖算子:如
reduceByKey
、join
、groupByKey
,会触发 Shuffle 和 Stage 切分。
3.3 分区数
- 分区数的大小直接决定 Task 的数量:
- 分区数太少,不能充分利用集群资源(任务并行度低)。
- 分区数太多,可能导致任务调度开销增加。
3.4 配置参数
- spark.default.parallelism:默认的 RDD 分区数(推荐设置为 2-3 倍的 Executor 核心数)。
- spark.sql.shuffle.partitions:Shuffle 操作的默认分区数,适用于 SQL 操作。
4. Spark 任务切分优化
4.1 数据分区优化
- 使用
repartition()
或coalesce()
调整分区数:- 增加分区:
repartition()
会触发全量 Shuffle,适合大任务。 - 减少分区:
coalesce()
会尽量避免 Shuffle,适合减少小任务。
- 增加分区:
- 示例:python复制代码
rdd = rdd.repartition(100) # 将分区数调整为 100
4.2 算子优化
- 优先使用聚合算子:如
reduceByKey
优于groupByKey
,可减少 Shuffle 数据量。 - 本地合并:如
mapPartitions
,在分区内先进行局部计算。
4.3 分区策略优化
- 自定义分区器:对
key-value
数据可以使用partitionBy
自定义分区规则。- 示例:python复制代码
rdd = rdd.partitionBy(10) # 自定义为 10 个分区
4.4 配置调整
- 并行度设置:
- 增加
spark.default.parallelism
和spark.sql.shuffle.partitions
的值,提升任务并行度。
- 增加
- 优化资源分配:
- 确保每个 Executor 有足够的内存和 CPU。
5. 实例分析:任务划分示例
以下示例展示 Spark 如何根据分区和依赖划分任务:
代码示例
from pyspark import SparkContext
sc = SparkContext("local", "Task Division Example")
data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)]
rdd = sc.parallelize(data, 2)
# 1. Map 操作(窄依赖,不引发 Shuffle)
mapped_rdd = rdd.map(lambda x: (x[0], x[1] * 2))
# 2. ReduceByKey 操作(宽依赖,引发 Shuffle)
reduced_rdd = mapped_rdd.reduceByKey(lambda x, y: x + y)
# 3. Collect 动作(触发 Job)
result = reduced_rdd.collect()
print(result)
任务划分
- 初始分区数:
rdd
分为 2 个分区。 - Stage 1:
- 执行
map
操作,生成 2 个 Task(每个分区一个)。
- 执行
- Stage 2:
reduceByKey
引发 Shuffle,生成新的 2 个 Task。
- Stage 3:
collect
操作触发结果收集任务。
6. 总结
- Spark 的任务切分主要基于数据分区和算子依赖关系。
- 窄依赖 算子通常在一个 Stage 内完成,而 宽依赖 算子会引发 Shuffle 和 Stage 切分。
- 任务切分影响集群资源利用效率,合理配置分区数、选择高效算子是优化的关键。
发布者:myrgd,转载请注明出处:https://www.object-c.cn/4521