🚀aidp-migrate-job
- ソース
- GitHub で見る ↗
説明
マニフェストに対してDatabricks→AIDPの完全な移行を実行します。 **Pass-1** では `%run` の依存ツリーをたどり、各依存ノートブックのコードのみに対してDatabricks APIを書き換えます。 **Pass-2** では各タスクのセルをライブAIDPクラスター上でセルごとに実行し、4方向の検証(実行エラー / stderrパターン / Sparkログ / Opus評価)を行い、Claudeのツール使用機能を通じて最大10回まで再試行します。 次のような場合に使用: ユーザーがワークロードを実際に移植する準備ができているとき(計画段階ではなく)。 **長時間実行処理** — 標準的なジョブはセル数に応じて、タスクあたり10〜60分程度かかります。
原文を表示
Run the full Databricks→AIDP migration against a manifest. Pass-1 walks the %run dep tree and rewrites Databricks APIs in each dep notebook code-only. Pass-2 executes each task cell-by-cell on a live AIDP cluster, runs 4-way verify (exec error / stderr patterns / Spark logs / Opus eval), and re-attempts up to 10 times via Claude with tool use. Use when the user is ready to actually port the workload (not just plan it). Long-running — typical job takes 10–60 minutes per task depending on cell count.
ユースケース
- ✓ワークロードを実際に移植する準備ができているとき
- ✓DatabricksからAIDPへの完全な移行を実行するとき
- ✓依存ノートブックの移行を一括で行うとき
- ✓移行後のコード動作を検証するとき
本文(日本語訳)
aidp-migrate-job — マイグレーションの実行
これがメインの処理です。Pass-1 でコードを修正し、Pass-2 で実際に動作することを検証します。
次のような場合に使用
- ユーザーがマイグレーションの準備を完了している場合(マニフェスト構築済み、データチェッククリーン、カタログ移行済み、クラスターが Active 状態)。
- ユーザーが「migrate」「run the port」「execute the migration」などを明示的に指示した場合。
以下の条件が揃っていない場合は、このスキルを呼び出さないでください:
reports/<job>_manifest.jsonに有効なマニフェストが存在すること(aidp-build-dagを使用して作成)。aidp-check-dataがクリーンであること(または「データが欠損していることは承知の上で続行してほしい」とユーザーが明示的に指示していること)。- AIDP クラスターが ACTIVE 状態であること。
ANTHROPIC_API_KEYが設定されていること。~/.oci/configが選択したプロファイルで有効であること。
標準的な呼び出し方法
export ANTHROPIC_API_KEY=sk-ant-...
python3 ${CLAUDE_PLUGIN_ROOT}/engine/scripts/job_migrate.py \
--manifest reports/<MyJob>_manifest.json \
--cluster <CLUSTER_ID> \
--aidp-base <AIDP_BASE> \
--datalake-ocid <DATALAKE_OCID> \
--workspace-id <WORKSPACE_UUID> \
--output-base <output-workspace-path> \
--oci-profile <profile>
ワークフロー形式のバリアント(Databricks Job のタスク DAG を保持する場合):
python3 ${CLAUDE_PLUGIN_ROOT}/engine/scripts/job_migrate_from_workflow.py \
--manifest reports/<MyJob>_manifest.json \
--cluster <CLUSTER_ID> \
--aidp-base <AIDP_BASE> \
--datalake-ocid <DATALAKE_OCID> \
--workspace-id <WORKSPACE_UUID> \
--output-base <output-workspace-path> \
--oci-profile <profile>
別ターミナルでログをリアルタイム確認する場合:
tail -f /tmp/migration.log
便利なフラグ一覧
| フラグ | 次のような場合に使用 |
|---|---|
--jobs <name>,<name> |
マニフェストから特定のジョブのみをマイグレーションしたい場合。 |
--start-task <substring> |
指定したタスクから処理を再開したい場合(それ以前のタスクをスキップ)。aidp-resume-migration と組み合わせて使用。 |
--only-tasks <names> |
指定したタスクのみを実行したい場合。失敗した単一タスクを再実行する際に便利。 |
--skip-migrated |
前回の実行で既にマイグレーション済みのノートブックをスキップする(デフォルト: ON)。無効にするには --no-skip-migrated を指定。 |
--parallel <N> |
並列タスクワーカー数(デフォルト: 20)。クラスターが高負荷な場合は減らしてください。 |
--catalog-manifest <path> |
文字列リテラル内のソースカタログを default へ確定的にリマッピングする場合。ソースコードに <source-catalog>.<schema> がハードコードされている場合に必須。 |
2パス処理のモデル
Pass 1 — DEPS(ensure_migrated):
推移的な %run / notebook.run のすべての対象について:
_migration_cache に存在する、またはクラスター上に既に存在する場合 → SKIP
それ以外 → コードのみマイグレーション(Claude が Databricks API を書き換え)
<output-base>/<job>/deps/ に .ipynb を保存
Pass 2 — TASKS(トポロジー順に各タスクを処理):
各タスクのノートブック内の各コードセルについて:
1. 分析(cell_plan: 説明・アクション・リスク)
2. マイグレーション(ツール使用を伴う Claude が書き換え)
3. WebSocket 経由でライブクラスター上で実行
4. 検証:
a. 例外が発生していないか?
b. 標準出力にエラーパターンがないか?("Error:", "Traceback", "FAILED")
c. Spark ログにステージ失敗が記録されていないか?
d. Opus による評価: 出力結果は正しく見えるか?
5. いずれかの検証チェックが失敗した場合 → Claude + 全ツールで call_fix() を呼び出す。
1セルあたり最大 10 回の修正を試みる。fixup_cell は以前のインデックスまで巻き戻し可能。
修正済みの .ipynb を <output-base>/<job>/notebooks/... に保存
JOB_REPORT.md を出力
ログで注目すべきパターン
/tmp/migration.log をリアルタイム確認する際の重要な出力行:
[12:34:56] [<job>/<task>] Cell 5/27: OK
[12:35:42] [<job>/<task>] Cell 12/27: OK (fixed attempt 2)
[12:36:18] [<job>/<task>] Cell 14/27: VERIFY FAIL (attempt 3/10): TABLE_OR_VIEW_NOT_FOUND
[12:39:01] [<job>/<task>] [child:helpers/io_utils.ipynb] Cell 3/8: OK
[12:42:15] [<job>/<task>] [fixup_cell] Rewinding to index 7 (reason: variable redefined upstream)
[12:48:30] [<job>/<task>] RESULT: PASS
RESULT: PASS→ すべてのセルがクリーンに実行完了。RESULT: PARTIAL→ 一部のセルが 10 回の試行すべてで失敗。JOB_REPORT.mdを確認してください。RESULT: FAIL→ 致命的なエラー(クラスターのクラッシュ、マニフェスト破損など)。
出力ディレクトリ構成
正常終了後:
<output-base>/<job-name>/
notebooks/Users/.../<notebook>.ipynb ← マイグレーション済みかつ実行検証済みのノートブック
deps/dep_<name>/<notebook>.ipynb ← Pass-1 の依存関係アーティファクト(参考情報)
tasks/<numbered_key>/ ← タスクごとのレポート
reports/
JOB_REPORT.md ← セルの成功 / 失敗 / 修正件数
マイグレーション済みの .ipynb は AIDP ワークスペースの <output-base> にアップロードされると同時に、オフライン確認用としてローカルの ./reports/<job-name>/ にも保存されます。
問題が発生した場合
| 症状 | スキル / 対処法 |
|---|---|
N 個のセルが 10 回すべての試行で失敗し RESULT: PARTIAL になる |
各セルに対して aidp-fixup-cell を使用。 |
| 実行途中にクラスターがクラッシュした(WebSocket が切断される) | クラスターを再起動。--skip-migrated(デフォルト)付きで再実行 — 完了済みの Pass-1 依存関係は再処理されません。 |
| 処理を中断したい | pkill -f job_migrate.py(SIGTERM) — 現在のセルの処理完了後に終了します。 |
| 依存関係を手動修正した後に再開したい | aidp-resume-migration を使用。 |
マイグレーション済みテーブルがリダイレクトスキーマ(<sandbox>)に作成されたが、本番の場所に書き込まれることを期待していた |
references/gotchas.md の「redirect schema」節を確認。--no-redirect-schema を付けて再実行(注意: データ安全ゲートが無効化されるため、慎重に使用すること)。 |
このスキルが適用する安全対策
-
書き込みリダイレクトによるサンドボックススキーマ。 マイグレーション中、すべての
.saveAsTable(...)/INSERT INTOは自動的にサンドボックスの<schema>.<table>へリダイレクトされます。ソースの本番データは一切変更されません。リダイレクトスキーマはタスクごとに検証(databaseExists)され、検証に失敗した場合はそのタスクが即時失敗します。 -
ユーザーの明示的な同意なしに
--no-redirect-schemaは使用しない。 リダイレクトを無効化するとデータ安全の保証が失われます。 -
ユーザーの明示的な同意なしに
--skip-migrated=falseは使用しない。 強制的な再マイグレーションは Claude トークンを余分に消費するだけでなく、ユーザーが手動で適用した修正を上書きしてしまう恐れがあります。
コスト・所要時間の目安
- 典型的な 30 セルのノートブック: ウォームクラスターで約 5〜15 分、Claude トークンのコストは $1〜3 程度。
- 計 150 セル程度の 5 タスク構成のワークフロー: 30〜90 分、$10〜30 程度。
- Pass-1 の依存関係は同一実行内の複数ジョブ間で共有されます — 2 番目以降のジョブはコストが安くなります。
実行後の手順
JOB_REPORT.mdを確認してください(/migration-statusコマンドで自動解析できます)。PARTIALとなったセルはaidp-fixup-cellで対処してください。- ストリーミング / バッチ収束パイプラインの場合は、続けて
aidp-acceptance-contractを実行してください。
原文(English)を表示
aidp-migrate-job — execute the migration
This is the main event. Pass-1 fixes the code, Pass-2 proves it runs.
When to use
- The user is ready to migrate (manifest built, data-check clean, catalog migrated, cluster Active).
- The user explicitly asks "migrate", "run the port", "execute the migration".
Do NOT invoke this skill without:
- A valid manifest at
reports/<job>_manifest.json(useaidp-build-dag). - A clean
aidp-check-data(or an explicit "I know data is missing, proceed anyway" from the user). - An ACTIVE AIDP cluster.
ANTHROPIC_API_KEYset.~/.oci/configvalid for the chosen profile.
Canonical invocation
export ANTHROPIC_API_KEY=sk-ant-...
python3 ${CLAUDE_PLUGIN_ROOT}/engine/scripts/job_migrate.py \
--manifest reports/<MyJob>_manifest.json \
--cluster <CLUSTER_ID> \
--aidp-base <AIDP_BASE> \
--datalake-ocid <DATALAKE_OCID> \
--workspace-id <WORKSPACE_UUID> \
--output-base <output-workspace-path> \
--oci-profile <profile>
For the workflow-shape variant (preserves the Databricks Job task DAG):
python3 ${CLAUDE_PLUGIN_ROOT}/engine/scripts/job_migrate_from_workflow.py \
--manifest reports/<MyJob>_manifest.json \
--cluster <CLUSTER_ID> \
--aidp-base <AIDP_BASE> \
--datalake-ocid <DATALAKE_OCID> \
--workspace-id <WORKSPACE_UUID> \
--output-base <output-workspace-path> \
--oci-profile <profile>
Tail the log in another terminal:
tail -f /tmp/migration.log
Useful flags
| Flag | When to use |
|---|---|
--jobs <name>,<name> |
Migrate only specific jobs from the manifest. |
--start-task <substring> |
Resume from this task (skip everything before). Pairs with aidp-resume-migration. |
--only-tasks <names> |
Run ONLY these specific tasks. Useful for re-running a single failed task. |
--skip-migrated |
Skip notebooks already migrated in a prior run (default ON). Set off with --no-skip-migrated. |
--parallel <N> |
Concurrent task workers (default 20). Reduce if the cluster is contended. |
--catalog-manifest <path> |
Apply deterministic source-catalog → default remap in string literals. Required when source code has hardcoded <source-catalog>.<schema> strings. |
Two-pass mental model
Pass 1 — DEPS (ensure_migrated):
For every transitive %run / notebook.run target:
if already in _migration_cache or already on cluster → SKIP
else → migrate code only (Claude rewrites Databricks APIs)
save .ipynb to <output-base>/<job>/deps/
Pass 2 — TASKS (per task in topo order):
For each task notebook, for each code cell:
1. Analyze (cell_plan: description, action, risks)
2. Migrate (Claude with tool use rewrites)
3. Execute on live cluster via WebSocket
4. Verify:
a. raised exception?
b. error patterns in stdout? ("Error:", "Traceback", "FAILED")
c. Spark logs show stage failure?
d. Opus eval: does the output look correct?
5. If any verify check failed → call_fix() with Claude + full tools.
Up to 10 fix attempts per cell. fixup_cell can rewind to earlier indices.
Save the fixed-up .ipynb to <output-base>/<job>/notebooks/...
Emit JOB_REPORT.md
Log patterns to watch for
When tailing /tmp/migration.log, key lines:
[12:34:56] [<job>/<task>] Cell 5/27: OK
[12:35:42] [<job>/<task>] Cell 12/27: OK (fixed attempt 2)
[12:36:18] [<job>/<task>] Cell 14/27: VERIFY FAIL (attempt 3/10): TABLE_OR_VIEW_NOT_FOUND
[12:39:01] [<job>/<task>] [child:helpers/io_utils.ipynb] Cell 3/8: OK
[12:42:15] [<job>/<task>] [fixup_cell] Rewinding to index 7 (reason: variable redefined upstream)
[12:48:30] [<job>/<task>] RESULT: PASS
RESULT: PASS → all cells executed cleanly. RESULT: PARTIAL → some cells failed all 10 attempts; review JOB_REPORT.md. RESULT: FAIL → catastrophic (cluster died, manifest broken).
Output layout
After a successful run:
<output-base>/<job-name>/
notebooks/Users/.../<notebook>.ipynb ← the migrated, run-validated notebook
deps/dep_<name>/<notebook>.ipynb ← Pass-1 dep artifacts (informational)
tasks/<numbered_key>/ ← per-task reports
reports/
JOB_REPORT.md ← cell pass/fail/fix counts
The migrated .ipynbs are uploaded to your AIDP workspace at <output-base> AND saved to your local ./reports/<job-name>/ for offline review.
When it goes wrong
| Symptom | Skill / fix |
|---|---|
RESULT: PARTIAL with N cells failing all 10 attempts |
aidp-fixup-cell for each. |
| Cluster died mid-run (WS disconnects) | Restart cluster. Re-invoke with --skip-migrated (default) — Pass-1 deps already done aren't repeated. |
| User wants to abort | pkill -f job_migrate.py (SIGTERM) — lets the current cell finish. |
| User wants to resume after manual fixes to a dep | aidp-resume-migration. |
Migrated table is in the redirect schema (<sandbox>) but user expected production location |
Check references/gotchas.md §"redirect schema". Re-run with --no-redirect-schema (USE WITH CARE — bypasses data-safety gate). |
Safety notes the skill enforces
- Write-redirect sandbox schema. Every
.saveAsTable(...)/INSERT INTOis silently rewritten to a sandbox<schema>.<table>location during migration. Source production data is never touched. The redirect schema is verified per-task (databaseExists) — if verification fails, the task fails fast. - No
--no-redirect-schemawithout explicit user consent. Bypassing the redirect drops the data-safety guarantee. - No
--skip-migrated=falsewithout explicit user consent. Force-re-migration re-spends Claude tokens AND can overwrite manual fixes the user applied to a previously-migrated notebook.
Cost / time guidance
- A typical 30-cell notebook takes ~5-15 minutes on a warm cluster, costs $1-3 in Claude tokens.
- A typical 5-task workflow with ~150 cells total: 30-90 min, $10-30 in Claude.
- Pass-1 deps are SHARED across jobs in the same run — second job is cheaper.
After this
- Read the JOB_REPORT.md (
/migration-statuscommand auto-parses it). - For any
PARTIALcells, route toaidp-fixup-cell. - For streaming / batch convergence pipelines, follow up with
aidp-acceptance-contract.
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。