⚡aidp-spark-optimization
- ソース
- GitHub で見る ↗
説明
次のような場合に使用: Sparkジョブ/ノートブックの実行が遅い、SLAを達成できていない、スピル(spill)が発生している、OOM(メモリ不足)が起きている、スモールファイルが大量に生成されている、またはシャッフルやデータスキューが著しい場合。 Spark/PySparkコードやSpark UIをパフォーマンス観点でレビューする場合。 大規模なSparkワークロードを実行する前。 オープンソースのApache Spark 3.5.0(+Delta Lake)のチューニングを対象としており、以下の領域をカバーする: パーティション、シャッフル、ジョイン、スキュー、ファイルレイアウト、メモリ、コード生成(codegen)、キャッシング、AQE(Adaptive Query Execution)、圧縮、および各種設定パラメータ。
原文を表示
Use when a Spark job/notebook is slow, missing an SLA, spilling, OOMing, generating too many small files, or shuffling/skewing heavily; when reviewing Spark/PySpark code or a Spark UI for performance; or before running a large Spark workload. Covers open-source Apache Spark 3.5.0 (+ Delta Lake) tuning -- partitions, shuffle, joins, skew, file layout, memory, codegen, caching, AQE, compression, and configuration.
ユースケース
- ✓Sparkジョブの実行が遅いとき
- ✓SLAを達成できていないとき
- ✓メモリ不足やスピルが発生しているとき
- ✓Spark UIをパフォーマンス観点でレビューするとき
- ✓大規模なSparkワークロード実行前
本文(日本語訳)
Spark 最適化
フィールドで実証されたオープンソース Apache Spark 3.5.0(およびオープンソース Delta Lake)ワークロード向けの最適化テクニック集です。 各テクニックには、処理内容・重要な理由・パターン(効果がある場面)・アンチパターン(効果がない、または悪影響を及ぼす場面)・正確な設定/コード・実際の再現検証による効果の根拠を記載しています。
OSS Spark 3.5.0 をそのまま使用する環境を対象とします。
ベンダー固有機能(Databricks Photon / predictive optimization / Delta disk cache、AWS Glue/EMR ランタイム固有の設定)は除外しているか、明示的にフラグを立てています。
(注: Delta Lake の liquid clustering、OPTIMIZE/ZORDER、deletion vectors、CDF はオープンソース — Delta 3.2.0 で GA — references/08-delta-lake.md を参照。)
一部のサンプルは Oracle AI Data Platform(AIDP)上で実行しています。AIDP 固有の実行ルールは references/aidp-notes.md を参照してください。
このプラグイン(
oracle-ai-data-platform-workbench-engineer-agent)では: AIDP 上での計測・適用は、バンドル済みのscripts/aidp_sql.pyを使用してください (クラスターごとに 1 つの SparkSession —spark.confは明示的に set/revert すること。references/aidp-notes.md参照)。 Spark UI のステージ/タスクメトリクスの取得はaidp-spark-debuggingskill 経由で行います。 コンピュートの適正化およびクラスター作成時のみ設定可能なコンフィグ(例:spark.memory.fraction)の設定はaidp-cluster-ops経由で行います。 Delta のOPTIMIZE/VACUUM/ZORDERDDL の実行はaidp-sql-ddl/aidp-table-management経由で行います。ai-data-engineer-agentのspark-optimizationskill(Oracle)から派生。
基本原則
最適化とは計測のループであり、推測ではない。 検出 → 診断 → 提案 → 適用 → 同一データで再実行 → 同一メトリクスで比較。
Spark の処理時間の大半は、データの移動(シャッフル)・ストラグラーの待機(スキュー)・スピル(メモリ不足)・冗長な処理(再読み取り、再マテリアライズ、繰り返しアクション)・行単位の実行オーバーヘッド(広い codegen)のいずれかで失われます。 以下に挙げるほぼすべての改善策は、これらのいずれかを削減するものです。
次のような場合に使用
- job/notebook が遅い、SLA を達成できていない、またはコストが高すぎる場合。
- Spark UI で次のいずれかが見られる場合:
- 特定の1ステージが著しく遅い
- タスクの
p100/p50 > 2xのスキュー - シャッフルの読み取り/書き込みが大きい
- メモリ/ディスクへのスピル
- GC が高い
- 大量の小さなタスク/ファイルが存在する
- 以下のようなコードの問題が見られる場合:
- 同じテーブルへの
union - driver ループで
collect()/head()/count()を繰り返し呼び出している - 巨大なテーブルへの
MERGE - フィールド数が多い
groupBy(100フィールド以上) - 100,000件以上の小さなファイルの読み取り
- フィルタなしで大きなディメンションテーブルを join している
- 同じテーブルへの
- Oracle データベース(ADW/ATP/Exadata)に対して読み書きしている場合:
- シングルタスクの JDBC 読み取り
- 低速またはREDOログを逼迫させる数TBの書き込み
- 非対応の array/map/struct カラム(
references/09-oracle-database.md)
- 大規模なデータインジェスト/変換を開始する前に、クラスターと設定を最初から正しく整えたい場合。
使用しない場合:
正確性のバグ(デバッグを使用してください)、またはチューニングのコストが効果を上回るサブ秒単位のジョブには使用しません。
Structured Streaming の エンジン チューニング(trigger interval、maxOffsetsPerTrigger、state store)はスコープ外ですが、各マイクロバッチはバッチと同じ構造をしているため、以下の join/skew/partition/ファイル/codegen テクニックはバッチ単位で適用できます。
改善機会の見つけ方 → 参照すべきリファレンス
digraph route {
rankdir=LR; node [shape=box];
sym [shape=diamond, label="主な症状は?"];
sym -> "references/diagnosis.md" [label="不明 / まず計測が必要"];
sym -> "references/02-joins.md" [label="join が遅い、スキュー、SortMergeJoin"];
sym -> "references/04-memory-and-spill.md" [label="スピル、OOM、executor ロスト、GC"];
sym -> "references/cluster-sizing.md" [label="コンピュートの適正化: workers/OCPU/RAM"];
sym -> "references/03-file-layout-io.md" [label="小さなファイルが多い、スキャンが遅い、リスティング"];
sym -> "references/01-partitioning.md" [label="パーティション数が多すぎ/少なすぎ、並列性が低い"];
sym -> "references/05-codegen.md" [label="CPUバウンドな広い集計"];
sym -> "references/06-caching-materialization.md" [label="再読み取り、union、driver ループ、MERGE"];
sym -> "references/07-aqe.md" [label="ランタイムの適応的最適化を使いたい"];
sym -> "references/08-delta-lake.md" [label="Delta テーブル: 小ファイル、OPTIMIZE/ZORDER/clustering、VACUUM、deletion vectors、CDF"];
sym -> "references/09-oracle-database.md" [label="Oracle DB / ADW / Exadata への読み書き(JDBC 並列化、redo、複合型)"];
}
その後、references/config-matrix.md で正確なキー・デフォルト値・影響・設定可能な場所(notebook / クラスター作成時のみ / 変更不可)を確認し、references/quick-reference.md で影響度順のチェックリストを参照してください。
リファレンス一覧
| ファイル | 内容 |
|---|---|
references/diagnosis.md |
Spark UI から改善機会を見つけ・計測する方法: 症状→原因の対応、スキュー比率、確認すべきメトリクス、クエリ→ステージのタイミング分析。 |
references/01-partitioning.md |
パーティション数、並列性、shuffle.partitions、AQE coalescing、小さな/空のパーティション、repartition vs coalesce。 |
references/02-joins.md |
Broadcast join、Shuffle-Hash vs Sort-Merge、スキュー処理、semi-join 事前フィルタ、salting。 |
references/03-file-layout-io.md |
小ファイル問題、openCostInBytes/maxPartitionBytes、compaction/OPTIMIZE、partitionOverwriteMode、gzip の分割可能性、圧縮(zstd)。 |
references/04-memory-and-spill.md |
Spark メモリモデル、memory.fraction、スピルの検出と修正、オフヒープ、gzip の4倍則。 |
references/cluster-sizing.md |
AIDP コンピュートの適正化: workers × OCPU × RAM、並列性 vs メモリ、実行結果 → 推奨設定。 |
references/05-codegen.md |
WholeStage codegen、codegen.maxFields、生成コードが 遅くなる ケース。 |
references/06-caching-materialization.md |
キャッシュとキャッシュレイアウト、不要なマテリアライズの回避、同一テーブルの union、driver ループ、MERGE の代わりに JOIN を使う手法。 |
references/07-aqe.md |
Adaptive Query Execution: coalesce、skew join、閾値の相互作用。 |
references/08-delta-lake.md |
Delta Lake 3.2.0: OPTIMIZE/compaction(+VACUUMとの依存関係)、optimized writes、auto-compaction、ZORDER、liquid clustering、deletion vectors、CDF、data skipping、ファイルサイジング。optimizeWrite+zstd+binSize のレシピ。 |
references/09-oracle-database.md |
Oracle DB(ADW/ATP/Exadata): 並列 JDBC 読み取り(partitionColumn、fetchsize)、バルク書き込み(COPY_DATA、ADW専用)vs 通常JDBC書き込み、REDOログ/NOLOGGING フィールドのケースとトレードオフ、複合型(JSON-as-VARCHAR2)、Storage Partition Join。 |
references/config-matrix.md |
設定のマスターテーブル: デフォルト値・影響・設定可能な場所の分類。 |
references/case-studies.md |
実際のビフォー/アフターの数値(フィールドエンゲージメントおよび AIDP 再現結果)。 |
references/quick-reference.md |
影響度順チェックリスト + 症状→テクニックの対応表。 |
references/aidp-notes.md |
AIDP の実行モデル + 設定の安全な set/revert 方法(クラスターごとに 1 つの SparkSession)+ 早期メトリクス収集。 |
80/20 の法則: 効果の高い施策から着手する
典型的な効果の大きさ順にランク付けしています(詳細と注意事項は各参照ファイルを確認してください):
-
join での不要なシャッフルを排除する —
spark.sql.autoBroadcastJoinThresholdを引き上げ、小〜中規模のテーブルを broadcast させる。または semi-join 事前フィルタ で大きなディメンションを broadcast 可能なサイズまで絞り込む。(02-joins.md) -
小ファイル問題を解決する — ソースのコンパクションと出力ファイルサイズの調整;
maxPartitionBytes+openCostInBytesをチューニング。100,000件以上のファイルのリストアップと開封処理はデータ処理本体よりコストがかかることがある。(03-file-layout-io.md) -
スピルを防ぐためにメモリを適正化する — ユーザーデータ構造がない場合は
spark.memory.fractionを引き上げる(AIDP ではクラスター作成時のみ設定可能)。ディスクへのスピルはパフォーマンスを静かに数倍悪化させる。(04-memory-and-spill.md、cluster-sizing.md) -
スキューに対処する — AQE の skew-join、または semi-join 事前フィルタ + broadcast を使用する。
p100/p50 > 2xの join ステージは、ストラグラー1タスクにステージ全体がブロックされる。(02-joins.md、07-aqe.md) -
冗長な処理を排除する — 同じテーブルを
unionしない、driver でcollect()/head()をループ呼び出ししない、中間結果はビューとして扱いマテリアライズしない、完全再ロードには MERGE より JOIN を使う。(06-caching-materialization.md) -
シャッフル/スピル/出力を zstd で圧縮する —
spark.io.compression.codec=zstd+spark.sql.parquet.compression.codec=zstd: gzip 並みのコンパクトさと snappy 並みの速度を両立し、スピルディスクと出力サイズを削減する。(03-file-layout-io.md) -
並列性をチューニングする — 大きなシャッフルに対しては
shuffle.partitions≈ 総コア数の2〜3倍に設定し、AQE coalescing と組み合わせて小さな出力ファイルを回避する。(01-partitioning.md) -
非常に広い集計での codegen を制限する — フィールド数の多い
groupByが CPU バウンドになっている場合はspark.sql.codegen.maxFieldsを下げる。生成されたコードがフォールバック処理より 遅くなる ことがある。(05-codegen.md)
ワークフロー
- 計測する — ワークロードを実行し、ステ
原文(English)を表示
Spark Optimization
Field-tested techniques for optimizing open-source Apache Spark 3.5.0 (and open-source Delta Lake) workloads. Every technique lists what it does, why it matters, patterns (when it helps), anti-patterns (when it does not help or hurts), the exact config/code, and impact evidence from real reproductions.
Applies to plain OSS Spark 3.5.0. Vendor-only features (Databricks Photon / predictive optimization / Delta disk cache, AWS Glue/EMR runtime tricks) are excluded or explicitly flagged. (Note: Delta Lake liquid clustering, OPTIMIZE/ZORDER, deletion vectors, CDF are open-source — GA in Delta 3.2.0 — see references/08-delta-lake.md.) Some examples run on the Oracle AI Data Platform (AIDP); AIDP-specific execution rules are in references/aidp-notes.md.
In this plugin (
oracle-ai-data-platform-workbench-engineer-agent): measure/apply on AIDP via the bundledscripts/aidp_sql.py(one SparkSession per cluster -- set/revertspark.confexplicitly; seereferences/aidp-notes.md); pull Spark-UI stage/task metrics via theaidp-spark-debuggingskill; right-size compute + set cluster-create-only configs (e.g.spark.memory.fraction) viaaidp-cluster-ops; run DeltaOPTIMIZE/VACUUM/ZORDERDDL viaaidp-sql-ddl/aidp-table-management. Adapted from theai-data-engineer-agentspark-optimizationskill (Oracle).
Core principle
Optimization is a measurement loop, not a guess. Detect → diagnose → propose → apply → re-run on the same data → compare the same metrics.
Most Spark time is lost to moving data (shuffle), waiting on a straggler (skew), spilling (under-memory), doing redundant work (re-reads, re-materialization, repeated actions), or per-row execution overhead (wide codegen). Almost every win below reduces one of those.
When to use
- A job/notebook is slow, missing an SLA, or its cost is too high.
- Spark UI shows: a single slow stage, task
p100/p50 > 2xskew, large shuffle read/write, memory/disk spill, high GC, or thousands of tiny tasks/files. - You see code smells:
unionof the same table, a driver loop callingcollect()/head()/count()repeatedly,MERGEon a huge table, widegroupBy(100+ fields), reading 100k+ small files, an un-filtered join with a big dimension. - You read from / write to an Oracle database (ADW/ATP/Exadata): a single-task JDBC read, a slow or redo-saturating multi-TB write, or unsupported array/map/struct columns. (
references/09-oracle-database.md) - Before launching a large ingest/transform so the cluster + configs are right the first time.
When NOT to use: correctness bugs (use debugging), or sub-second jobs where tuning effort exceeds the payoff. Structured Streaming engine tuning (trigger interval, maxOffsetsPerTrigger, state store) is out of scope here — but each micro-batch is batch-shaped, so the join/skew/partition/file/codegen techniques below apply per batch.
How to find the opportunity → which reference to read
digraph route {
rankdir=LR; node [shape=box];
sym [shape=diamond, label="Dominant symptom?"];
sym -> "references/diagnosis.md" [label="not sure / need to measure"];
sym -> "references/02-joins.md" [label="slow join, skew, SortMergeJoin"];
sym -> "references/04-memory-and-spill.md" [label="spill, OOM, lost executors, GC"];
sym -> "references/cluster-sizing.md" [label="right-size compute: workers/OCPU/RAM"];
sym -> "references/03-file-layout-io.md" [label="many small files, slow scan, listing"];
sym -> "references/01-partitioning.md" [label="too many/few partitions, weak parallelism"];
sym -> "references/05-codegen.md" [label="CPU-bound wide aggregation"];
sym -> "references/06-caching-materialization.md" [label="re-reads, union, driver loop, MERGE"];
sym -> "references/07-aqe.md" [label="want runtime adaptivity"];
sym -> "references/08-delta-lake.md" [label="Delta table: small files, OPTIMIZE/ZORDER/clustering, VACUUM, deletion vectors, CDF"];
sym -> "references/09-oracle-database.md" [label="read/write Oracle DB / ADW / Exadata (JDBC parallelism, redo, complex types)"];
}
Then consult references/config-matrix.md for the exact key, default, impact, and where it can be set (notebook / cluster-create-only / non-modifiable), and references/quick-reference.md for the impact-ranked checklist.
Reference index
| File | Covers |
|---|---|
references/diagnosis.md |
How to find & measure opportunities from the Spark UI: symptom→cause, skew ratio, which metrics, query→stage timing. |
references/01-partitioning.md |
Partition counts, parallelism, shuffle.partitions, AQE coalescing, tiny/empty partitions, repartition vs coalesce. |
references/02-joins.md |
Broadcast join, Shuffle-Hash vs Sort-Merge, skew handling, semi-join pre-filter, salting. |
references/03-file-layout-io.md |
Small-file problem, openCostInBytes/maxPartitionBytes, compaction/OPTIMIZE, partitionOverwriteMode, gzip splittability, compression (zstd). |
references/04-memory-and-spill.md |
Spark memory model, memory.fraction, spill detection/fix, off-heap, the gzip-4x rule. |
references/cluster-sizing.md |
Right-size AIDP compute: workers x OCPU x RAM, parallelism vs memory, observed-run → recommendation. |
references/05-codegen.md |
WholeStage codegen, codegen.maxFields, when generated code is slower. |
references/06-caching-materialization.md |
Caching & cache layout, avoid unnecessary materialization, union-of-same-table, driver-loop, JOIN-over-MERGE. |
references/07-aqe.md |
Adaptive Query Execution: coalesce, skew join, the threshold interplay. |
references/08-delta-lake.md |
Delta Lake 3.2.0: OPTIMIZE/compaction (+VACUUM dependency), optimized writes, auto-compaction, ZORDER, liquid clustering, deletion vectors, CDF, data skipping, file sizing; the optimizeWrite+zstd+binSize recipe. |
references/09-oracle-database.md |
Oracle DB (ADW/ATP/Exadata): parallel JDBC reads (partitionColumn, fetchsize), bulk (COPY_DATA, ADW-only) vs conventional-JDBC writes, the redo-log / NOLOGGING field case + tradeoffs, complex types (JSON-as-VARCHAR2), Storage Partition Join. |
references/config-matrix.md |
Master config table: default, impact, and where-settable classification. |
references/case-studies.md |
Real before/after numbers (field engagements + AIDP reproductions). |
references/quick-reference.md |
Impact-ranked checklist + symptom→technique lookup. |
references/aidp-notes.md |
AIDP execution model + how to set/revert configs safely (one SparkSession per cluster) + collect-metrics-early. |
The 80/20: highest-impact moves first
Ranked by typical payoff (full details + caveats in the referenced files):
- Eliminate avoidable shuffle on joins — raise
spark.sql.autoBroadcastJoinThresholdso a small/medium side broadcasts; or semi-join pre-filter the big dimension down to broadcastable size. (02-joins.md) - Fix the small-file problem — compact sources / size output files; tune
maxPartitionBytes+openCostInBytes. Listing+opening 100k+ files can cost more than the data. (03-file-layout-io.md) - Right-size memory to stop spill — raise
spark.memory.fraction(cluster-create-only on AIDP) when there are no user data structures; spill to disk is a silent multiplier. (04-memory-and-spill.md,cluster-sizing.md) - Address skew — AQE skew-join, or semi-join pre-filter + broadcast; a
p100/p50 > 2xjoin stage serializes the whole stage on a straggler. (02-joins.md,07-aqe.md) - Avoid redundant work — don't
unionthe same table, don't loopcollect()/head()on the driver, treat intermediates as views (don't materialize), prefer JOIN over MERGE for full reloads. (06-caching-materialization.md) - Compress shuffle/spill/output with zstd —
spark.io.compression.codec=zstd+spark.sql.parquet.compression.codec=zstd: tight like gzip, fast like snappy; cuts spill-disk and output size. (03-file-layout-io.md) - Tune parallelism —
shuffle.partitions≈ 2–3× total cores for large shuffles; pair with AQE coalescing to avoid small output files. (01-partitioning.md) - Constrain codegen on very wide aggregations — lower
spark.sql.codegen.maxFieldswhen a widegroupByis CPU-bound; generated code can be slower than the fallback. (05-codegen.md)
Workflow
- Measure — run the workload, collect stage metrics + task quantiles (
diagnosis.md). On AIDP, seeaidp-notes.mdfor collecting Spark UI data. - Diagnose — read
explain(True)/the SQL plan; map the dominant symptom to a technique. - Propose — pick the technique; check
config-matrix.mdfor where the config can be set. - Apply — change code or config. On a shared cluster, remember: one SparkSession per cluster — a
spark.conf.setleaks to other notebooks; revert it explicitly (aidp-notes.md). - Re-run on the same cluster + data, prove the workload is logically identical (same rows/columns/result), and compare the same metrics (wall time, shuffle, spill, task skew).
Common mistakes
- Comparing wall time of two runs that aren't logically identical (different data/cache state). Prove equivalence first.
- Reading summed task metrics as wall-clock — stage duration is start→end timestamps; cumulative task time is summed across tasks.
- Counting AQE-skipped stages as time spent.
repartition()"to be safe" — it forces a shuffle; only repartition with a reason (see01-partitioning.md).- Lowering AQE skew/advisory partition size so far that you create a new small-file problem.
- Setting a cluster-create-only config (e.g.
spark.memory.fraction) from a notebook and assuming it took effect — it didn't. Verify withspark.conf.get/ the Environment tab (config-matrix.md).
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。