Spark考核要点

Spark 3.x AQE 与 DPP

题目一:AQE 的三大能力与适用场景

场景:你们的离线数仓从 Spark 2.x 升级到 Spark 3.x 之后,发现同一份 TPC-DS 报表整体性能提升了不少,但有些 SQL 在数据倾斜场景下表现差异更明显。Leader 让你梳理并落地 Spark 3.x 的 AQE(Adaptive Query Execution,自适应查询执行)能力,用来统一优化 SQL。

要求

  1. 说明 Spark 3.x AQE 相比 Spark 2.x 的核心目标是什么,解决了 Spark 2.x 哪些「静态优化」的痛点?
  2. 结合实际查询场景,分别举例说明 AQE 的 三大能力: 动态合并 Shuffle 分区(Coalesce Shuffle Partitions)动态调整 Join 策略(Demote SortMergeJoin to BroadcastHashJoin 等)动态倾斜 Join 优化(Skew Join)
  3. 对于每一项能力,分别回答: 需要开启或依赖的 关键配置 是什么?触发原理:在运行时依赖哪些统计信息?是在什么阶段做决策?带来的 收益 和 可能的风险/限制 是什么?

参考答案要点(题目一)

1. AQE 的核心目标与痛点

  • 核心目标:在 运行时 基于真实数据分布动态调整执行计划,弥补纯规则优化(RBO)和静态代价优化(CBO)的不足。
  • <font style="color:#DF2A3F;">解决的痛点</font><font style="color:#DF2A3F;">(Spark 2.x):</font> <font style="color:#DF2A3F;">计划一旦确定就无法调整</font><font style="color:#DF2A3F;">:CBO 仅依赖元数据统计,无法根据实际中间结果修正计划。</font><font style="color:#DF2A3F;">Shuffle 分区数固定</font><font style="color:#DF2A3F;">:</font><font style="color:#DF2A3F;">spark.sql.shuffle.partitions</font><font style="color:#DF2A3F;"> 是静态常数,数据量变动或分布不均时要么小 Task 过多、要么单 Task 过大。</font><font style="color:#DF2A3F;">Join 策略不够灵活</font><font style="color:#DF2A3F;">:在编译时决定 Broadcast Join / SortMergeJoin 等策略,一旦执行中某一侧「意外变小」也不会自动改用 Broadcast。</font><font style="color:#DF2A3F;">倾斜 Join 需要手工优化</font><font style="color:#DF2A3F;">:2.x 对 skew join 几乎都是「手工加盐、拆表」等方案,运维/开发成本高。</font>

2. AQE 的三大能力与场景

能力一:动态合并 Shuffle 分区

  • 用途:解决小分区/小文件过多的问题,减少 Task 数量和调度开销,同时减轻 NameNode 小文件压力。
  • 典型场景: 动态过滤、where 条件筛掉了大量数据,实际 Shuffle 输出远小于预估。业务高低峰明显,淡季仍使用旺季设定的较大 spark.sql.shuffle.partitions。
  • 关键配置(常用): spark.sql.adaptive.enabled=truespark.sql.adaptive.coalescePartitions.enabled=truespark.sql.adaptive.advisoryPartitionSizeInBytes=64MB(目标分区大小)
  • 触发原理: AQE 将物理计划拆成多个 QueryStage。每个 Shuffle Map 阶段结束后,收集 MapOutputStatistics(每个 reduce 分区的数据量、空分区数量等)。对明显偏小的分区进行合并,使得合并后分区大小尽量接近 advisoryPartitionSizeInBytes。
  • 收益: 减少小 Task、小文件;降低调度开销、NameNode 压力。更合理利用资源,避免「过度并行」带来的额外开销。
  • 风险/限制: 若数据本身分布不均,部分大分区仍然会存在,无法完全解决倾斜问题。调得过大可能导致某些分区过大,引起单 Task OOM,需要结合业务数据量调参。

能力二:动态调整 Join 策略

  • 用途:在运行时根据某侧表的实际大小调整 Join 策略,例如将原本计划的 SortMergeJoin 降级/升级为 BroadcastHashJoin,从而减少 Shuffle 和排序。
  • 典型场景: 编译时估算小表略大于 autoBroadcastJoinThreshold,CBO 选择 SortMergeJoin。实际运行中,经过 filter/group 等算子后,小表变得很小,完全可以广播。
  • 关键配置(示例): spark.sql.adaptive.enabled=truespark.sql.autoBroadcastJoinThreshold(广播阈值,默认值10M)与 AQE join 策略相关的内部规则(如 DemoteBroadcastHashJoin 等)。
  • 触发原理: 在对应 QueryStage 的 Map 阶段完成之后,AQE 拿到该侧 Shuffle 输出的 真实大小。若某侧的中间结果大小 < autoBroadcastJoinThreshold,且满足空分区比例等附加条件,则将 Join 物理算子从 SortMergeJoin 改写为 BroadcastHashJoin。
  • 收益: 避免不必要的双侧 shuffle + sort,Query 延迟显著降低(特别是典型星型模型中,小维表 Join 大事实表的场景)。减少网络 IO 和磁盘 IO。
  • 风险/限制: 广播表仍然会占用 executor 内存,若阈值配置过大可能引起 OOM。仅在包含 Shuffle 的查询,且开启 AQE 时生效。

能力三:动态倾斜 Join 优化(Skew Join)

  • 用途:针对少数 超大 reduce 分区 做自动拆分,降低单 Task 处理时间,缓解数据倾斜。
  • 典型场景: 事实表与维表 Join,某些热点 key(如大 V 用户、主类目)数据量远高于其他 key,导致某些 reduce 分区特别大。
  • 关键配置(示例): spark.sql.adaptive.enabled=truespark.sql.adaptive.skewJoin.enabled=true与 skew 检测阈值相关的配置(如「分区大小 > 若干倍中位数」等)。
  • 触发原理: Shuffle Map 阶段结束后,根据 reduce 分区的大小分布,检测「异常大」的分区。对这些 skewed 分区进行拆分(如 1 个大分区拆成多个子分区),并在 Join 时与对侧表的对应分区分别做多次局部 Join,最后再 union 结果。
  • 收益: 自动化处理数据倾斜,对业务逻辑透明,减少「手动加盐、拆 key」的工作。提升整体 job 的尾部延迟(p95/p99),避免某个倾斜 Task 拖慢整个 Stage。
  • 风险/限制: 需要额外的 shuffle 和任务调度开销,极端情况下可能收益有限。阈值设置不合理,可能误判或者漏判倾斜。

题目二:DPP(动态分区裁剪)在分区表 Join 中的作用

场景:数仓中有一张按 dt 分区的大事实表 fact_orders,以及一张维度表 dim_users。某个报表 SQL 大致如下:

SELECT o.order_id, o.dt, u.province
FROM fact_orders o
JOIN dim_users u
  ON o.user_id = u.user_id
WHERE u.province = 'Zhejiang'
  AND o.dt BETWEEN '2024-01-01' AND '2024-01-31';

业务方反馈:在开启 Spark 3.x 的 DPP 后,这个 SQL 的执行时间显著下降。请你向团队同学讲清楚 DPP 的价值和原理。

要求

  1. 说明 静态分区裁剪动态分区裁剪(DPP) 的区别,各自适用于什么场景。
  2. 以上述 SQL 为例,画出/描述: 没有 DPP 时,fact_orders 在执行时大致会扫描哪些分区?开启 DPP 后,Spark 如何利用 dim_users 这一侧的过滤条件,进一步减少 fact_orders 的扫描分区数?
  3. 说明 DPP 的 触发条件限制,至少包括: 被裁剪的一侧表在物理上需要满足什么条件?哪些 Join 类型支持 DPP?DPP 何时会因为「收益不够大或成本过高」而被放弃?

参考答案要点(题目二)

1. 静态分区裁剪 vs 动态分区裁剪

  • 静态分区裁剪(Static Partition Pruning): 在 编译阶段,直接从 SQL 语句中解析出对分区列(如 dt)的过滤条件。例:WHERE o.dt BETWEEN '2024-01-01' AND '2024-01-31',编译时即可确定只需扫描 1 月份分区。适用场景:分区列上的常量过滤、表达式在编译期可求值。
  • 动态分区裁剪(Dynamic Partition Pruning, DPP): 在 运行阶段,基于 Join 另一侧的实际过滤结果,动态推导出需要扫描的分区。例如:o.dt 与另一侧 t2.dt Join,而 t2 上有 WHERE t2.id < 5 等条件,则只有满足该条件的 t2.dt 才有意义,从而动态裁剪 t1 的分区。适用场景:分区列值取决于另外一张表的过滤结果,编译时无法得出具体分区集合。

2. 示例 SQL 中 DPP 的行为

无 DPP 时

  • 静态裁剪可以利用 o.dt BETWEEN '2024-01-01' AND '2024-01-31': fact_orders 只扫描 2024-01-01 ~ 2024-01-31 这些 dt 分区。对于每个 dt 分区,都会参与与 dim_users 的 Join。
  • u.province = 'Zhejiang' 只在 dim_users 这一侧生效,与 fact_orders 的分区列 dt 没有直接静态关系,无法再进一步裁剪。

开启 DPP 后

  • 优化思想(简化理解): 将原查询在逻辑上重写为:o.dt IN (SELECT DISTINCT dt FROM fact_orders 与 dim_users 以 province 过滤后的 Join 结果) 或等价的半连接逻辑。更常见的形式是:利用 dim_users 的过滤结果生成一个针对 fact_orders 分区键的「动态过滤器」。
  • 运行过程(抽象): 对 dim_users 应用 province='Zhejiang' 的过滤,得到相关用户集合。这些用户在一定条件下可以与 fact_orders 的分区键(如 dt 或 (dt, user_id))建立映射。Spark 在真正扫描 fact_orders 对应分区之前,根据这批用户/键,裁剪掉不会命中 Join 结果的 dt 分区。
  • 结果: 实际扫描的 fact_orders 分区数 ≤ 静态裁剪后的分区数;在很多 TPC-DS 场景中,能从几十个分区缩减到个位数,大幅降低 IO。

说明:真实实现中,Spark 会根据 Join 方向、Join 类型、是否广播、收益估计等采用不同的实现策略(如基于子查询或 Reused Broadcast Exchange 等),但核心思想都是「用运行时 Join 一侧的结果去裁剪另一侧的分区」。

3. DPP 的触发条件与限制

  • 触发条件(部分): 被裁剪的一侧表必须是 按 Join key 分区 的分区表(例如 PARTITIONED BY (dt) 且 Join key 中包含 dt)。Join 类型通常为: INNER JOINLEFT SEMI(被裁剪表在左侧)LEFT / RIGHT OUTER(有特定要求,被裁剪方需与 Join 方向匹配)(实现细节)对于 Broadcast Hash Join,DPP 会尝试复用广播结果;在非广播场景下可能通过子查询形式实现。
  • 限制与放弃场景: 若评估运行动态子查询的代价 大于 裁剪分区带来的收益(例如很少分区可被裁剪),优化器可能放弃 DPP。若 Join key 与分区列并不匹配(例如按 dt 分区,却在 city 上 Join),DPP 无法生效。某些复杂 Join(多表 Join、嵌套子查询)中,DPP 可能因为规划复杂度或不满足启发式规则而不触发。

以上两道题覆盖了 Spark 3.x 中 AQEDPP 的核心概念、使用场景和原理,既可以考察候选人对新版本特性的理解深度,也能引导其思考如何在真实数仓/报表场景中落地这些能力。

Spark Join 倾斜

题目一:Join 倾斜产生原因

场景:大表 A(如用户行为日志)与大表 B(如用户维度表)按 userId 做 Join。线上发现某个 Task 运行时间远超其他 Task,甚至 OOM,而其他 Task 很快完成。

要求

  1. 说明 Spark Join 数据倾斜 产生的原因。
  2. 为什么 Sort-Merge Join 和 Hash Join 都会出现倾斜?
  3. 如何快速定位哪个 key 发生了倾斜?

参考答案要点

1. Join 倾斜产生的原因

  • Shuffle 分区规则:Spark 的 Join 一般需要 shuffle,按 hash(key) % numPartitions 将相同 key 的数据发往同一分区。(关于numPartitions的值,Join 一般需要 shuffle 时,若写的是 SQL 或 DataFrame,分区数就来自 spark.sql.shuffle.partitions,默认值200;若写的是 RDD 的 join,则来自你传入的分区数或 spark.default.parallelism,如果是本地运行默认值是local的CPU核数,集群模式是分配的executor的总核数和2比较的大者。)
  • 热点 key 存在:若某个 key(如某大 V 用户 ID)在大表 A 或 B 中对应的数据量极大,则承载该 key 的分区会收到远超其他分区的数据量。
  • 表现:该分区所在的 Task 成为瓶颈,运行时间过长、可能 OOM;整体 Job 时间被拖慢,资源利用率低。

2. 为什么 Sort-Merge Join 和 Hash Join 都会出现倾斜?

Sort-Merge Join

两表按 key shuffle 后各自排序,再按序归并

相同 key 必然进入同一 reduce 分区,热点 key 所在分区数据量过大

Hash Join(Shuffle Hash Join)

一侧构建 Hash 表,另一侧按 key 匹配

同样依赖按 key 的 shuffle,热点 key 所在分区数据量大,且 build 端 Hash 表可能 OOM

Broadcast Join

小表广播到各 executor,无 shuffle

不产生倾斜

(每个 Task 本地 join),但只适用于小表

只要涉及按 key 的 shuffle,热点 key 就会导致倾斜。

3. 如何快速定位倾斜的 key?

  • 看 Spark UI:在 Stages 页面查看各 Task 的输入数据量、处理时间,找出明显偏大的 Task,对应的是倾斜分区。
  • 采样统计:对 Join key 做 groupByKeycountByKey,找出 count 最大的 top N 个 key。
  • 自定义指标:在算子中统计每个 key 的数量,写入监控或日志,识别热点 key。

题目二:Join 倾斜的常见解决方案

要求:列举并说明 3~5 种 Spark Join 倾斜的解决思路,并说明各自适用场景。

参考答案要点

1. 广播 Join(Broadcast Join)

  • 做法:把小表 broadcast 到各 executor,与大表做 map 侧 join,无 shuffle。
  • 适用:小表足够小(通常 < 10MB~100MB,视 executor 内存而定)。
  • 优点:彻底避免 shuffle,无倾斜;性能最好。
  • 局限:大表无法 broadcast,只适用于小表。

2. 加盐打散(Salting / 两阶段 Join)

  • 做法: 对倾斜 key 加随机前缀或者后缀,如 hotKey → hotKey_0、hotKey_1、…、hotKey_(N-1)大表:倾斜 key 加 N 个随机前缀,非倾斜 key 加 1 个固定前缀(如 _0)小表:数据膨胀 N 倍,每条记录复制 N 份并分别加上 _0~_N-1 后缀按「带前缀或者后缀的 key」做 Join,得到局部结果再去掉前缀或者后缀,按原始 key 做一次聚合(如 groupBy + sum)得到最终结果
  • 适用:已知或可识别的热点 key,且小表数据量可接受膨胀。
  • 优点:有效分散热点 key 的压力。
  • 局限:小表膨胀会增加内存和网络开销;需要事先识别倾斜 key。

3. 拆分热点 key 单独处理

  • 做法:将倾斜 key 过滤出来,单独做 Join(可用 broadcast join 或单机处理),非倾斜 key 做普通 Join,最后 union 结果。
  • 适用:倾斜 key 数量较少,且可单独处理。
  • 优点:思路清晰,避免影响正常 key 的 Join 性能。
  • 局限:需要两次 Join + union,代码复杂度增加。

4. 增加分区数

  • 做法:通过 spark.sql.shuffle.partitionsrepartition 增加 Join 的分区数。
  • 适用:轻度倾斜,或配合其他方案使用。
  • 局限:若某个 key 的数据量本身就远大于其他 key,增加分区无法让该 key 分散到多个分区,治标不治本。

5. 使用 Spark 3.x 的 AQE skew join 优化

  • 做法:开启 spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled,AQE 会自动检测倾斜分区并拆分为多个子分区做 join。
  • 适用:Spark 3.x,无需手动识别热点 key。
  • 优点:自动化,对业务透明。
  • 局限:依赖 AQE,某些极端倾斜场景可能仍需手动优化。

题目三:加盐打散 Join 的详细流程

要求:以「大表 A Join 小表 B,key hotUser 倾斜」为例,说明加盐打散 Join 的完整步骤,并解释为什么小表需要膨胀。

参考答案要点

流程概览

1

倾斜 key

hotUser

加随机前缀 →

hotUser_0

hotUser_9

;非倾斜 key 加

_0

每条记录复制 N 份,分别加

_0

_9

后缀

保证 Join 时 key 能一一对应

2

(key_prefix)

Join

同上

热点 key 被分散到 10 个分区

3

去掉前缀,按原始 key 聚合

-

同一逻辑 key 的多个前缀桶结果合并

为什么小表需要膨胀?

  • 大表中 hotUser 被拆成 hotUser_0hotUser_9,会发往 10 个不同分区。
  • 若小表中 hotUser 对应的记录只有 1 条且不加后缀,Join 时该条记录只会进入 1 个分区,无法与另外 9 个分区中的 hotUser_1hotUser_9 匹配。
  • 因此需要将小表中 hotUser 对应的记录复制 N 份,每份加上 _0_N-1 后缀,这样每个分区都能拿到对应前缀的小表数据,Join 结果才能正确。

伪代码示例(Scala)

val numSalt = 10
val skewKeys = Set("hotUser")

// 大表加盐
val saltedA = bigTable.map { row =>
  val key = row.getString(0)
  val newKey = if (skewKeys.contains(key)) {
    key + "_" + (Random.nextInt(numSalt))
  } else {
    key + "_0"
  }
  (newKey, row)
}

// 小表膨胀
val saltedB = smallTable.flatMap { row =>
  val key = row.getString(0)
  (0 until numSalt).map { i =>
    (key + "_" + i, row)
  }
}

// Join 后去盐聚合
saltedA.join(saltedB)
  .map { case (saltedKey, (a, b)) => (saltedKey.dropRight(2), (a, b)) }  // 去掉 _i
  .groupByKey()
  .mapValues(_.head)  // 或按业务做聚合

题目四:Broadcast Join 与 Shuffle Join 的选择

要求

  1. Spark 何时自动选择 Broadcast Join?
  2. 什么情况下 Broadcast Join 反而会变慢或 OOM?
  3. 如何手动指定 Broadcast Join?

参考答案要点

1. 自动选择 Broadcast Join 的条件

  • 小表大小小于 spark.sql.autoBroadcastJoinThreshold(默认 10MB)。
  • 等值 Join。
  • 非 full outer join(Broadcast 不支持 full outer)。

2. Broadcast Join 变慢或 OOM 的原因

  • 小表过大:超过 executor 内存或 broadcast 阈值,导致广播耗时、OOM。
  • 大表分区过多:每个 Task 都要接收一份小表副本,网络和内存压力大。
  • 倾斜在 build 端:若 broadcast 的是大表(不常见),会导致 driver 或 executor OOM。

3. 手动指定 Broadcast Join

-- Spark SQL
SELECT /*+ BROADCAST(small_table) */ *
FROM big_table
JOIN small_table ON big_table.id = small_table.id

// DataFrame API
import org.apache.spark.sql.functions.broadcast
bigDF.join(broadcast(smallDF), "id")

题目五:综合场景题

场景:表 A 1 亿行,表 B 1000 万行,按 user_id Join。发现 user_id = 12345 在表 A 中有 5000 万行,其他 user_id 分布均匀。请问你会如何优化?

参考答案要点

  1. 识别问题user_id = 12345 是热点 key,导致按 user_id shuffle 时该分区严重倾斜。
  2. 方案选择: 若表 B 可 broadcast(< 百兆级):优先考虑 Broadcast Join,可规避 shuffle 倾斜。若表 B 无法 broadcast:采用拆分热点 key 或加盐打散: 将 user_id = 12345 从两表分别过滤出来,单独做 Join(可用 broadcast 或小规模处理)其余数据做普通 Join最后 union 两部分结果或对 user_id = 12345 加盐打散,小表相应膨胀,按「盐化 key」Join 后再聚合。
  3. Spark 3.x:可开启 AQE skew join,先观察是否自动缓解,若不足再叠加上述方案。

Spark Catalyst 与 Tungsten

题目

1. Catalyst 和 Tungsten 分别解决什么问题?一句话概括各自职责,并说明它们分别作用于 Spark 的哪个阶段(逻辑计划 / 物理计划 / 执行)。

2. Catalyst 的优化流程是怎样的?请说出至少 4 种常见的逻辑优化规则,并各举一个简单例子说明「优化前 / 优化后」的差异。

3. 什么是谓词下推(Predicate Pushdown)?结合「读 Parquet + WHERE + SELECT 部分列」的场景,说明 Catalyst 如何减少 I/O 和计算量。

4. Tungsten 的三大改进是什么?分别解决 JVM/传统执行引擎的什么问题?

5. 什么是 Whole-Stage Codegen(全阶段代码生成)?它如何提升 CPU 执行效率?和 RDD 的逐算子执行有什么本质区别?

6. 为什么 DataFrame/SQL 通常比等价的 RDD 实现更快?请从 Catalyst 和 Tungsten 两方面各举一点说明。

参考答案要点

1. Catalyst vs Tungsten:职责与作用阶段

Catalyst

逻辑计划

做等价变换,少读少算、优化 join/聚合顺序等

逻辑计划 → 物理计划

(优化「要算什么」)

Tungsten

物理执行

时用二进制存储和代码生成,提升 CPU/内存效率

物理计划执行

(优化「怎么算」)

  • Catalyst:决定「算哪些数据、用哪种算子、顺序如何」。
  • Tungsten:决定「数据在内存里怎么存、循环怎么跑、如何减少 GC 和虚调用」。

2. Catalyst 优化流程与常见规则

优化流程(简述):

  1. 解析:SQL / DataFrame API → 逻辑计划(Logical Plan)树。
  2. 逻辑优化:对逻辑计划应用一系列规则(Rule),如谓词下推、列剪裁等,得到优化后的逻辑计划。
  3. 物理计划:将逻辑计划转换为物理计划(Physical Plan),选择具体算子(如 BroadcastHashJoin、SortMergeJoin)。
  4. 成本模型 / 规则:在多个物理计划中选一个(或按规则选),生成可执行计划。

常见逻辑优化规则与示例:

谓词下推

把过滤条件尽量推到数据源或 join 前

Scan(t) → Filter(age>18)

变为

Filter(age>18) → Scan(t)

,数据源只读满足条件的行/块

列剪裁

只保留后续用到的列,不读无用列

SELECT name, age FROM t

时,Parquet 只读

name

age

列,不读其它列

常量折叠

在编译期计算常量表达式

WHERE 1=1 AND age > 18

→ 只保留

age > 18

谓词/投影合并

合并多个 Filter、多个 Project

Filter(a) → Filter(b)

合并为一次

Filter(a AND b)

3. 谓词下推 + 列剪裁示例

场景SELECT name, age FROM users_parquet WHERE age > 18 AND city = 'Beijing'

  • 不优化:全表扫描 Parquet,读所有列,再在 Spark 里做 age > 18city = 'Beijing'
  • Catalyst 优化后: 谓词下推:把 age > 18、city = 'Beijing' 推到 Parquet Reader,利用 row group 统计信息跳过不满足的块,或只扫描满足条件的 row group。列剪裁:只读 name、age、city 三列(甚至只读过滤需要的列 + 最终要的列),不读其它列。

结果:I/O 和内存中的计算量都显著减少,逻辑等价。

4. Tungsten 三大改进

Unsafe Row / 二进制存储

JVM 对象多、GC 压力大、缓存不友好

将多列压成连续二进制(类似 C struct),减少小对象和指针,可 off-heap

Whole-Stage Codegen

虚函数调用多、分支多、CPU 流水线利用率低

把多算子融合成一段手写风格的紧凑循环,减少调用与分支

Cache-friendly 聚合/Join

传统 Hash 表随机访问多、cache miss 高

用连续内存/数组做 hash 表,列式访问,提高 cache 命中率

5. Whole-Stage Codegen 与 RDD 的区别

Whole-Stage Codegen:

  • 多个算子(如 Filter + Project + HashAggregate)在物理计划中合并为一个「代码生成节点」。
  • 运行时动态生成一段类似手写的 Java 字节码(或 JVM 可执行的代码),在一个大循环里完成:读二进制 → 过滤 → 投影 → 聚合,中间少用 Row 对象和虚调用。
  • 效果:CPU 流水线更满、分支更少、cache 更友好,吞吐提升。

与 RDD 的本质区别:

  • RDD:每个算子(map、filter、reduceByKey)是独立的 Stage/迭代,数据以「对象」形式在算子间传递,每层都有虚调用和可能的新对象。
  • Tungsten + Codegen:多个算子在同一段生成代码里完成,数据以二进制在寄存器/缓存中流动,几乎没有中间对象和跨算子调用。

<font style="color:#DF2A3F;">6. 为什么 DataFrame/SQL 比等价 RDD 更快?</font>

Catalyst 方面:

  • 有 schema 和逻辑计划,可以做谓词下推、列剪裁等,少读少算;RDD 是「黑盒」元素,引擎无法做这类优化。
  • 可以优化 join 顺序、选择 broadcast join 等,RDD 需要手写。

Tungsten 方面:

  • DataFrame/SQL 的物理执行走 Unsafe Row + Codegen,内存和 CPU 效率高;RDD 默认是 JVM 对象 + 逐算子执行,GC 和虚调用开销大。
  • 聚合/join 使用 Tungsten 的列式、cache-friendly 实现,而 RDD 的 groupByKey/reduceByKey 是通用对象流。

总结:DataFrame/SQL 在「逻辑层」被 Catalyst 少算少读,在「执行层」被 Tungsten 算得更快,所以同样业务逻辑下通常比 RDD 更快。

reduceByKey 数据倾斜

题目

场景:有一份用户行为 RDD,每条记录是 (userId, 行为次数),需要按 userIdreduceByKey 汇总每个用户的总次数。线上发现某个热点用户(如 "hotUser")的数据量极大,导致 reduceByKey 时该 key 所在分区处理时间远高于其他分区,出现明显数据倾斜

要求

  1. 说明 reduceByKey 数据倾斜 产生的原因(为什么某个 key 会集中到少数分区、带来什么问题)。
  2. 给出一种经典的解决思路:给倾斜的 key 加随机前缀打散 → 第一次 reduceByKey → 去掉前缀 → 第二次 reduceByKey,并说明每一步的作用。
  3. 为什么去掉前缀后不能只做 mapToPair,而必须再做一次 reduceByKey?请用简单例子说明。
  4. 除「加前缀打散」外,还能举出 1~2 种常见的数据倾斜处理思路吗?

参考答案要点

1. reduceByKey 数据倾斜产生的原因

  • 分区方式:reduceByKey 按 key 的 hash(key) % numPartitions 决定该 key 去哪个分区。同一个 key 一定落在同一个分区
  • 倾斜表现:若某个 key(如 "hotUser")对应的 (key, value) 条数或数据量远大于其他 key,则承载该 key 的分区会收到远多于其他分区的数据,该分区成为瓶颈。
  • 带来的问题:该分区所在 Task 运行时间过长、可能 OOM;整体作业时间被拖慢,资源利用不均。

2. 经典解法:加前缀打散 + 两次 reduceByKey

思路:让「热点 key」不再以单一 key 出现,而是拆成多个「带随机前缀的 key」,把压力分散到多个分区;先对带前缀的 key 做一次聚合,再去掉前缀做第二次聚合,得到与原始 reduceByKey 等价的结果。

1. 打散

对已知的倾斜 key,在 map 阶段加随机前缀,如

hotUser

hotUser_0

hotUser_1

、…、

hotUser_(N-1)

同一逻辑 key 被拆成 N 个不同 key,分散到不同分区,减轻单分区压力

2. 第一次 reduceByKey

对「带前缀的 key」做聚合

在各分区内先做局部聚合,数据量已大幅下降

3. 去掉前缀

mapToPair:把

hotUser_i

还原为

hotUser

,其他 key 不变

恢复「逻辑 key」,为最终汇总做准备

4. 第二次 reduceByKey

对「原始 key」再做一次聚合

把同一逻辑 key 的多个前缀桶的部分结果合并成最终一个值,等价于原始语义

伪代码示例

// 假设 pairRdd 是 JavaPairRDD<String, Integer>,skewKey = "hotUser",numSalt = 10
JavaPairRDD<String, Integer> salted = pairRdd.mapToPair(kv -> {
    if (skewKey.equals(kv._1)) {
        int prefix = new Random().nextInt(numSalt);
        return new Tuple2<>(kv._1 + "_" + prefix, kv._2);
    }
    return kv;
});
JavaPairRDD<String, Integer> firstAgg = salted.reduceByKey(Integer::sum);
JavaPairRDD<String, Integer> finalResult = firstAgg
    .mapToPair(kv -> {
        if (kv._1.startsWith(skewKey + "_"))
            return new Tuple2<>(skewKey, kv._2);
        return kv;
    })
    .reduceByKey(Integer::sum);

3. 为什么去掉前缀后必须再做一次 reduceByKey?

  • 第一次 reduceByKey 之后,同一个逻辑 key(如 hotUser)会对应多条记录:(hotUser_0, sum0)(hotUser_1, sum1)、…。
  • 去掉前缀后变成:(hotUser, sum0)(hotUser, sum1)、…,即同一个 key 对应多个 value,还没有聚合成「一个总结果」。
  • 若只做 mapToPair 去掉后缀而不做第二次 reduceByKey,结果集中会有多个 (hotUser, 部分和)不符合「按 key 聚合成一个值」的语义
  • 因此必须对「去掉前缀后的 RDD」再做一次 reduceByKey,把同一 key 的多个部分和合并成最终的一个和,才与「不做打散时的 reduceByKey」结果等价。

4. 其他常见的数据倾斜处理思路

  • 过滤或单独处理热点 key:若业务允许,可先把倾斜 key 过滤出来单独算(如单机或小 RDD),再与正常 key 的结果合并。
  • 增加分区数:适当增加 reduceByKey 的分区数(如 reduceByKey(func, numPartitions)),可减轻单分区数据量,但若倾斜 key 仍远多于其他 key,单 key 仍会集中在一个分区,治标不治本;配合「打散」更有效。
  • 两阶段聚合(加前缀打散):即本题中的经典方案,适合已知或可识别的热点 key。
  • 使用广播 + map 侧 join:若倾斜发生在 join 上,可把小表广播,避免大表按倾斜 key shuffle;或对倾斜 key 单独用 broadcast join,非倾斜 key 用普通 join 再 union。

mapPartitions

5.1. mapPartitions 用法总结

1. 是什么

  • mapPartitions 是 Spark RDD 的 Transformation 算子,对 RDD 的每个分区施加一个函数,输入是分区内元素的迭代器 Iterator[T],输出是新的迭代器 Iterator[U]
  • map 的区别:map 是逐元素处理(一个输入 → 一个输出);mapPartitions 是按分区处理(一个分区 → 一个迭代器),在分区级别做批量逻辑。

2. 为什么用

减少调用次数

分区内 N 条只调用 1 次函数,而不是 N 次,降低序列化/反序列化和任务调度开销。

分区级共享

可在分区内建连接池、打开文件、初始化资源,整分区复用,避免每条数据都创建/关闭连接。

批量 I/O

适合按分区批量写库、批量写文件,比逐条写更高效。

与分区数对应

分区数 = 并行度;若 1 个分区对应 1 个数据源分片(如 1 个 dt),则 mapPartitions 自然实现“按分片并行”。

3. 怎么用(Java 示例)

// 签名:mapPartitions(f: Iterator[T] => Iterator[U])
JavaRDD<String> result = rdd.mapPartitions((Iterator<Row> partition) -> {
    List<String> out = new ArrayList<>();
    while (partition.hasNext()) {
        Row row = partition.next();
        out.add(process(row));  // 或做批量 DB 写入等
    }
    return out.iterator();
});

  • 入参:当前分区内元素的 Iterator<T>
  • 返回值:新元素的 Iterator<U>(可为空迭代器)。
  • 注意:不要在处理中缓存整个分区到内存(如 iter.toList() 再慢慢处理),数据量大时易 OOM;应流式处理迭代器。

4. 与分区数的关系

  • mapPartitions 的并行度 = RDD 的分区数:有几个分区,该算子就会起几个 Task,每个 Task 处理一个分区。
  • 分区数由上游决定(如 sc.parallelize(..., 4)df.repartition(4)、JDBC 读时的 numPartitions、按 dt 分次读再 union 等),不是由 mapPartitions 自己决定。
  • 若希望“4 个 dt 分区 → 4 个 Spark 分区并行”,需要在读数据时保证 1 个 dt 对应 1 个 RDD 分区(例如按 dt 分次读再 union),mapPartitions 就会在 4 个分区上并行。

5. 典型用法小结

  • 逐行转换:分区内遍历迭代器,每条转成新类型,返回新迭代器(等价于 map,但调用次数少)。
  • 分区内建连接:在迭代器处理前创建 DB 连接/连接池,分区内每条复用,处理完关闭,避免每条数据都建连。
  • 按分区批量写:分区内攒一批再写 HBase/MySQL/文件,减少 I/O 次数。
  • 与 repartition/coalesce 配合:先调整分区数再 mapPartitions,控制并行度与数据倾斜。

5.2. 大数据面试题:mapPartitions

题目:

在 Spark 中,从一张按天分区的 Hive 表(分区字段为 dt)里读取最近 7 天的数据,要求:

  1. 天(dt)并行读取,即 7 个分区并行;
  2. 使用 mapPartitions 对每个分区的数据做处理(例如清洗、过滤或聚合);
  3. 最终每个分区只输出前 100 条记录。

请简要说明实现思路(读数据、分区设计、mapPartitions 里做什么),并指出若改用 map 实现“每个分区只输出前 100 条”会有什么问题。

参考答案要点:

  1. 读数据与分区设计先拿到最近 7 天的 dt 列表(如通过 Hive 元数据或一次小查询)。按 每个 dt 分别读(例如每个 dt 一个 spark.read.table(...).filter($"dt" === dt) 或 JDBC 按 dt 分次查),再 union 成一个大 DataFrame/RDD,这样自然得到 7 个 RDD 分区,与 7 个 dt 一一对应,实现“按天并行”。
  2. mapPartitions 里做什么输入:当前分区内该 dt 的所有记录(可能很多)。逻辑:在分区内只取前 100 条(例如用计数器,或转成列表后 take(100)),再做业务处理(清洗/过滤等),返回这最多 100 条结果的迭代器。这样每个分区最多输出 100 条,总数据量可控,且 7 个分区并行执行。
  3. 若用 map 实现“每个分区只输出前 100 条”会有什么问题map 是逐元素无状态的,单条记录无法知道“自己是不是分区内前 100 条”,无法在 map 内做“分区内前 100”的截断。要实现“每分区前 100 条”,要么在 map 之后用 take(100) 等行动算子(但那是全分区或全局的语义,不是“每个分区各 100 条”),要么在 mapPartitions 里按分区维度处理,在分区内计数/截断。因此“每个分区只输出前 N 条”这类分区级逻辑,用 mapPartitions 更合适;用 map 难以正确且高效地实现。

加分点:

  • 能区分 RDD 分区数与业务上的“天(dt)分区”,并说明如何通过读数据方式让两者一致。
  • 能提到 mapPartitions 的注意点:分区内不要一次性把迭代器全拉入内存,否则大分区易 OOM;应流式处理或限制条数(如前 100 条)。

reduceByKey 与 groupByKey

题目

场景:有一份日志 RDD,每条记录是 (userId, 点击次数),例如:

(user1, 3), (user2, 1), (user1, 2), (user3, 5), (user2, 4), ...

请用 Spark RDD 实现:按 userId 汇总每个用户的总点击次数

要求

  1. 分别用 groupByKeyreduceByKey 两种方式实现。
  2. Shuffle 数据量、内存、适用场景 三方面比较这两种写法,并说明在大数据场景下更推荐哪一种及原因。
  3. 若坚持用 groupByKey 做「按 key 求和」,如何改进能减少 Shuffle?(提示:可考虑先做一次「本地聚合」再 groupByKey,或换用其他算子。)

参考答案要点

1. 两种实现

reduceByKey:

pairRDD.reduceByKey(Integer::sum);

  • 先在各分区内按 key 做本地聚合(map 端 combine),再 shuffle,最后再按 key 做最终聚合。

groupByKey:

pairRDD.groupByKey().mapValues(iter -> {
    int sum = 0;
    for (Integer v : iter) sum += v;
    return sum;
});

  • 先把同一 key 的所有 value 都 shuffle 到同一分区,再在分区内对每个 key 的 value 列表求和。

2. 对比(为什么说 reduceByKey 更优)

Shuffle 数据量

分区内先聚合,每个 key 只传一个「部分和」,数据量小

同一 key 的每条 (userId, 点击次数) 都参与 shuffle,数据量大

内存

reduce 端只对「部分和」再聚合,单 key 占用小

reduce 端要先收齐同一 key 的

全部 value

再处理,易 OOM

适用场景

按 key 做聚合(sum、max、count 等)

必须保留「每个 key 下的全部 value 列表」时(如去重、排序、复杂逻辑)

结论:做「按 key 求和」这类聚合时,应优先用 reduceByKey(或 aggregateByKey),避免用 groupByKey。

3. 若用 groupByKey 做「按 key 求和」的改进

  • 不推荐继续用 groupByKey 做求和;应改为 reduceByKeyaggregateByKey,本质上就是「先本地聚合再 shuffle」。
  • 若题目坚持 groupByKey:可说明「先 map 端用 HashMap 做一次本地 sum(类似 combine),再按 key 发出去」,但这等价于 reduceByKey 的 combiner 思路,标准答案仍是:这类聚合场景用 reduceByKey,避免 groupByKey

一句话总结

按 key 做聚合(如求和、求最大)时,用 reduceByKey 会先做本地聚合再 shuffle,Shuffle 量和内存都更小;groupByKey 会把同一 key 的所有 value 都 shuffle 过去,不做合并,只适合「必须保留全量 value」的场景。

// 数据: (部门, 员工名)
JavaPairRDD<String, String> deptEmployee = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("技术", "张三"),
    new Tuple2<>("产品", "李四"),
    new Tuple2<>("技术", "王五"),
    new Tuple2<>("技术", "赵六")
));

// groupByKey -> 每个 key 对应一个 Iterable<value>
JavaPairRDD<String, Iterable<String>> byDept = deptEmployee.groupByKey();

byDept.collect().forEach(t -> {
    System.out.print(t._1 + ": ");
    t._2.forEach(name -> System.out.print(name + " "));
    System.out.println();
});
// 技术: 张三 王五 赵六
// 产品: 李四

aggregateByKey

题目

场景**:有一份学生成绩 RDD,每条记录是 **(学生姓名, 单科分数)**,例如:**

(张三, 80), (李四, 90), (张三, 70), (王五, 85), (李四, 88), (张三, 90), ...

请用 Spark RDD 实现:按学生姓名汇总,得到每个学生的平均分(结果类型为 **(姓名, 平均分)**)。

要求**:**

  1. aggregateByKey 实现「按 key 求平均」,并写出 **zeroValue****seqOp****combOp**** 的含义。**
  2. 为什么这里不能用 reduceByKey 直接求平均?reduceByKey 和 aggregateByKey 在「输入/输出类型」上有什么本质区别?
  3. 若用 groupByKey 再在 value 上求平均,和 aggregateByKey 相比有什么劣势?

参考答案要点

1. 用 aggregateByKey 实现「按 key 求平均」

思路:先按 key 聚合成 **(总分, 个数)**,再算 **总分/个数**

Java 示例:

// 假设 scores 是 JavaPairRDD<String, Integer>
JavaPairRDD<String, Tuple2<Integer, Integer>> sumCount = scores.aggregateByKey(
    new Tuple2<>(0, 0),                                    // zeroValue: 初始 (总分=0, 个数=0)
    (acc, v) -> new Tuple2<>(acc._1 + v, acc._2 + 1),      // seqOp: 分区内,累加分数和次数
    (a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2)       // combOp: 分区间,合并两个 (sum, count)
);
JavaPairRDD<String, Double> avgByKey = sumCount.mapValues(t -> (double) t._1 / t._2);

参数含义:

zeroValue

每个 key 的初始聚合值,这里用

**(0, 0)**

表示 (总分, 个数),类型可与 value 不同

seqOp

**分区内:当前聚合值

**(sum, count)**

遇到一个分数

**v**

,更新为 **

**(sum+v, count+1)**

combOp

**shuffle 后分区间:把两个

**(sum1, count1)**

**(sum2, count2)**

合并成 **

**(sum1+sum2, count1+count2)**

2. 为什么不能用 reduceByKey 直接求平均?类型上的区别?

  • reduceByKey**:聚合函数是 **(V, V) => V**,输入和输出类型都是 V。若 value 是分数 **Int**,只能不断「两个分数合并成一个分数」(如相加),最终得到的是总分,无法同时维护「个数」来算平均。**
  • aggregateByKey**:可以指定聚合结果类型 U ≠ V。这里 value 是 **Int**,聚合中间结果是 **(Int, Int)** 即 (总分, 个数),最后再 **mapValues** 算平均。所以「按 key 求平均」这类结果类型与 value 不同的聚合,要用 aggregateByKey(或先 map 成 (key, (v,1)) 再用 reduceByKey,但写法更啰嗦)。**

本质区别**:reduceByKey 是「同类型聚合」;aggregateByKey 支持「异类型聚合」和自定义初始值、分区内/分区间不同逻辑。**

3. groupByKey 再求平均 vs aggregateByKey

Shuffle 量

分区内先聚成 (sum, count),只 shuffle 少量元组

同一 key 的

每条分数

都 shuffle,数据量大

内存

reduce 端只维护 (sum, count)

reduce 端要收齐该 key 的

全部分数列表

,易 OOM

适用

聚合场景首选

需要「全量 value 列表」时才用

结论:做「按 key 求平均」应用 aggregateByKey,避免 groupByKey。

一句话总结

aggregateByKey 适合「按 key 聚合且结果类型与 value 不同」的场景(如按 key 求平均、按 key 构造 (sum, count) 或集合);理解 zeroValue、seqOp(分区内)、combOp(分区间)三个参数,并能与 reduceByKey、groupByKey 区分即可。

// 数据: (学生名, 分数)
JavaPairRDD<String, Integer> scores = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("张三", 80),
    new Tuple2<>("李四", 90),
    new Tuple2<>("张三", 70),
    new Tuple2<>("李四", 85),
    new Tuple2<>("张三", 90)
));

// aggregateByKey(初始值)(分区内合并, 分区间合并)
// 初始值: (0, 0) 表示 (总分, 个数)
// seqOp: 分区内,把当前分数合并进 (sum, count)
// combOp: shuffle 后,把两个 (sum, count) 合并
JavaPairRDD<String, Tuple2<Integer, Integer>> sumCount = scores.aggregateByKey(
    new Tuple2<>(0, 0),           // zeroValue: (sum=0, count=0)
    (acc, v) -> new Tuple2<>(acc._1 + v, acc._2 + 1),   // seqOp: 分区内
    (a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2)    // combOp: 分区间
);

// 再算平均
JavaPairRDD<String, Double> avgByKey = sumCount.mapValues(t -> (double) t._1 / t._2);
avgByKey.collect().forEach(t -> System.out.println(t._1 + " -> " + t._2));
// 张三 -> 80.0, 李四 -> 87.5

repartition 与 coalesce

题目

场景:某 Spark 任务从 1000 个分区读取数据,经过 filter 后大量分区变空或数据很少,最后要写出到 HDFS,希望只生成 10 个文件。

要求

  1. 应使用 repartition(10) 还是 coalesce(10)?说明理由。
  2. repartition 和 coalesce 在「是否 shuffle」「分区数增减」「典型用途」上的区别是什么?
  3. 什么情况下必须用 repartition、什么情况下优先用 coalesce?

参考答案要点

1. 本例应选 coalesce(10)

  • 目标:从很多分区减少到 10 个分区,且主要是合并分区、写出少量文件。
  • coalesce(10):只做分区合并,默认不触发全量 shuffle,性能更好,适合「只减不增」。
  • repartition(10):会触发全量 shuffle,数据会重新均匀分布,本例不需要重分布,用 repartition 会多一次不必要的 shuffle。

结论:减少分区且不要求重新打散数据时,优先用 coalesce(10)

2. repartition 与 coalesce 对比

是否 shuffle

会,全量重分布

默认不 shuffle,只合并现有分区

分区数

可增可减

一般只减不增(不 shuffle 时无法真正增加)

典型用途

增加分区、均匀重分布、主动 shuffle

减少分区、合并小分区、写少量文件

3. 何时用 repartition / coalesce

  • 必须或适合用 repartition: 需要增加分区数(如从 2 个分区扩到 100 个);需要均匀重分布(如 filter 后数据倾斜,想重新打散);宽依赖、需要一次按新分区规则的 shuffle。
  • 优先用 coalesce: 只需要减少分区数(如 1000 → 10);filter 后分区过多、过小,想合并成较少分区再计算或写文件;不要求分区内数据绝对均匀,能接受「相邻分区合并」带来的略不均匀。

补充coalesce(n, shuffle = true) 会触发 shuffle,效果接近 repartition(n),在「减分区但希望更均匀」时可用。

一句话总结

减少分区、且不想全量 shuffle 时用 coalesce;增加分区或需要均匀重分布时用 repartition。

Spark Action 算子

题目一:Transformation 和 Action 的区别

问题**:Transformation 和 Action 的区别是什么?各举 3 个例子,并说明「真正的计算」是在哪一类算子执行时触发的?**

参考答案要点

Transformation

定义 RDD 的转换关系,返回新 RDD

否,懒执行

map、filter、reduceByKey、groupByKey

Action

需要得到结果或产生副作用,触发 Job

collect、count、take、reduce、foreach、saveAsTextFile

触发计算**:只有遇到 Action 时,Spark 才会根据前面的 Transformation 链生成 DAG、划分 Stage、提交 Job 并执行。Transformation 只是记录依赖,不执行。**

题目二:collect() 与 OOM、替代方案

问题**:**collect()** 为什么在大数据场景下容易导致 Driver OOM?在什么场景下可以用哪些 Action 替代?**

参考答案要点

  • collect()** 会把 整个 RDD 的数据拉到 Driver 内存。数据量大时,Driver 内存装不下就会 OOM。**
  • 替代思路**:** 只要前几条:用 take(n) 或 first()。要个数:用 count()。要写出去:用 saveAsTextFile / saveAsSequenceFile 等,不要先 collect 再写。要抽样:takeSample 或先 sample 再 take/collect。
  • 原则**:大数据量下尽量避免对全量 RDD 做 collect(),优先用 take、count、写文件、抽样等 Action。**

题目三:reduce 与 fold

问题**:**reduce****fold** 的区别是什么?使用 **fold** 时对 zeroValue 有什么要求?**

参考答案要点

  • reduce**:**(T, T) => T**,无初始值,从第一个元素开始聚合;空 RDD 会报错。**
  • fold**:有 zeroValue,每个分区先以 zeroValue 为起点聚合,最后分区间合并时还会再用一次 zeroValue。**
  • 对 fold 的 zeroValue 要求**:必须是该聚合运算的 单位元(和任何元素运算不改变结果),例如:求和用 0,求积用 1,列表拼接用空列表。否则会多算多次 zeroValue,结果错误。**

一句话总结

Action 才会触发计算;collect 易 OOM,大数据量用 take/count/save 等替代;fold 的 zeroValue 必须是单位元。

coalesce 减少分区为什么不产生 shuffle

题目

问题**:Spark 的 **coalesce** 减少分区时,为什么说不产生 shuffle?相邻的多个分区会合并为一个分区,数据不是从多个父 RDD 传到子 RDD 吗,这样难道不产生 shuffle 吗?**

参考答案要点

1. Spark 里「shuffle」指什么?

  • Shuffle** = Map 端按 Partitioner(如 **hash(key) % numPartitions**)把数据重新分区并写出(shuffle write),Reduce 端再按新分区拉取(shuffle read)。**
  • 特点:每条数据都会按新的分区规则重新决定去哪个分区,通常伴随大量跨节点传输。

2. coalesce 减少分区时在做什么?

  • 不引入新的 Partitioner**,不做「按 key 重新分区」。**
  • 只是做分区索引的合并:例如父 RDD 的 partition 0、1 → 子 RDD 的 partition 0;partition 2、3 → partition 1……
  • 子分区和父分区的对应关系是固定的、按索引连续合并,不是按 key 重分布。
  • 实现上:子 RDD 的一个分区对应一个 task,该 task 按顺序读那几个父分区的迭代器(或拉取块),在内存里拼成一个大迭代器,不经过「按 key 写 + 按分区读」的 shuffle 流程

3. 为什么说「不产生 shuffle」?

是否按新 Partitioner 重算每条记录的去向

否,只按分区索引合并

是否有 shuffle write + shuffle read

有(ShuffleDependency)

数据移动方式

全量重分布,通常跨节点

子分区直接读/拉取那几个父分区;同节点则本地合并

  • 同一 Executor 上的多个父分区****合并时:直接在本地拼数据,无网络、无 shuffle
  • 跨 Executor** 时:需要拉取远程分区数据,走的是块拉取,不是「先按 Partitioner 写、再 shuffle read」的机制,在 API/依赖类型上仍不算一次 shuffle。**
  • 常见场景(如 1000 分区 → 10 分区):很多合并发生在同节点内,因此说「默认不触发全量 shuffle」。

4. 一句话总结

coalesce 减分区是「按索引合并分区 + 本地合并或按块拉取」,不做按 key 的重新分区,不经过 ShuffleDependency,因此不产生(我们平时说的)shuffle;数据确实从多个父分区进到一个子分区,但传的方式不是 shuffle 的写+读两阶段。

Spark 宽依赖与窄依赖

题目

1. 什么是窄依赖、什么是宽依赖?各举 3 个算子例子,并说明两者在「分区对应关系」和「是否 shuffle」上的区别。

2. Spark 为什么要区分宽依赖和窄依赖?Stage 是如何根据依赖类型划分的?

3. 给定一段 RDD 链:**textFile → map → filter → reduceByKey → collect**,请标出宽依赖、窄依赖,并说明会划分成几个 Stage、为什么。

参考答案要点

1. 窄依赖 vs 宽依赖

定义

父 RDD 的每个分区最多被

一个

子 RDD 分区使用

父 RDD 的一个分区可能被

多个

子 RDD 分区使用

分区对应

一对一或多对一

一对多(需按 key 重分布)

是否 shuffle

典型算子

map、filter、flatMap、mapPartitions、union

groupByKey、reduceByKey、repartition、join(按 key 时)

2. 为什么要区分?Stage 如何划分?

  • Stage 划分**:从 Action 往前回溯 DAG,遇到宽依赖就切一刀,形成 Stage 边界。每个 Stage 内部是一串窄依赖,可以合并成一批 Task 流水线执行;宽依赖处必须等前一 Stage 写出 shuffle 结果,再启动下一 Stage。**
  • 容错**:窄依赖只需重算丢失的父分区;宽依赖可能要从多个父分区重算并重新 shuffle。**
  • 调度与并行**:窄依赖可 pipeline,宽依赖是 shuffle 边界,决定 Task 划分和网络传输。**

3. 示例 RDD 链的依赖与 Stage

textFile → map → filter → reduceByKey → collect
           [窄]    [窄]      [宽]          Action

  • 窄依赖**:**textFile → map****map → filter**(无 shuffle,分区一一对应或合并)。**
  • 宽依赖**:**filter → reduceByKey**(reduceByKey 按 key 重分区,需要 shuffle)。**
  • Stage 数量**:2 个。 ** **Stage1:textFile → map → filter(一串窄依赖,可 pipeline)。 **Stage2:reduceByKey 之后到 collect(以 reduceByKey 的 shuffle 为边界)。

一句话总结

窄依赖 = 无 shuffle、父分区至多对应一个子分区;宽依赖 = 有 shuffle、一对多。Stage 在宽依赖处切分,窄依赖在 Stage 内流水线执行。

#牛客AI配图神器#

#大数据面试##大数据##spark#
全部评论

相关推荐

评论
1
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务