🔄aidp-pipelines
- ソース
- GitHub で見る ↗
説明
次のような場合に使用: ユーザーがパイプラインを構築したい、Job を作成・更新・実行したい、定期的な実行をスケジュールしたい、実行のステータスを確認したい、タスクの出力を読み取りたい、または実行をキャンセルしたい場合。 AIDP Job(ノートブックや Python によるタスク DAG を cron でスケジュール実行する仕組み)の作成・スケジューリング・実行・モニタリングを行います。
原文を表示
Author, schedule, run, and monitor AIDP Jobs (task DAGs of notebooks/python with cron). Use when the user wants to build a pipeline, create/update/run a Job, schedule a recurring run, check a run's status, read a task's output, or cancel a run.
ユースケース
- ✓パイプラインを構築したい
- ✓Job を作成・更新・実行したい
- ✓定期的な実行をスケジュールしたい
- ✓実行のステータスを確認したい
- ✓タスクの出力を読み取りたい
- ✓実行をキャンセルしたい
本文(日本語訳)
aidp-pipelines — AIDP ジョブ(ビルド・スケジュール・実行・監視)
AIDP ジョブの作成・運用を行います。ノートブックまたは Python 上のタスク DAG を構成し、オプションで cron スケジュールを設定して実行・監視します。
エンジン優先順位(references/aidp-cli-map.md 参照):
インストール済みであれば公式の aidp workflow … CLI を優先し、未インストールの場合は oci raw-request にフォールバックします。
どちらも同一の REST API に同一の認証でアクセスします — MCP や ai-data-engineer-agent リポジトリは不要です。
すべての変更操作のリクエストボディを .aidp/payloads/ に保存し、実行前に確認してください(references/payloads.md 参照)。
認証 + ベース URL: CLI フラグ:
--instance-id <DATALAKE_OCID> --auth api_key --profile DEFAULT --region <r>REST ベース:https://aidp.<region>.oci.oraclecloud.com/20240831/dataLakes/<DATALAKE_OCID>/…401/403 が発生した場合はreferences/oci-raw-request.mdに記載の認証ラダー(oci session refresh --profile AIDP_SESSION)に従ってください。
コマンド一覧(CLI 優先・REST フォールバック)
- 作成・管理:
aidp workflow list-jobs·create-job·get-job·update-job·delete-job(REST:GET|POST|PUT|DELETE /workspaces/{ws}/jobs[/{key}]) - 実行・監視:
aidp workflow create-job-run·get-job-run·list-job-runs·list-recent-job-runs·list-task-runs·get-task-run·fetch-output/export-task-run-output·cancel-job-run[s]·repair-job-run(REST:POST …/jobs/{key}/actions/run、GET …/jobRuns/{runId}、タスク実行出力)
一覧取得の信頼性(LIVE-LESSON 2026-06-12): ジョブ一覧は大量かつページネーションありです — 実際のワークスペースには 100 件以上のジョブが存在する場合があります(実測:
playgroundでは最初のページで 100 件返却)。 必ずページネーション(limitとopc-next-pageヘッダー)を使用し、1 回のコールだけで「ジョブなし」と判断しないでください。 まず呼び出しが HTTP 2xx と JSON ボディ を返したことを確認してください — CLI/認証/ネットワークエラー(またはシェルのクォートバグ)によって出力が空のリストとして解析されると、100 件以上のジョブが「0 件」に見えるサイレントな偽陰性となります。 ジョブはワークスペーススコープであり、path/notebookPathは/Workspace/...を起点とします。 信頼性に関する規約は references/oci-raw-request.md を参照してください。
次のような場合に使用
- 「パイプライン/ジョブを作成したい」「毎日実行したい」「ジョブ X をトリガーしたい」「実行 Y が何を・なぜ行ったか確認したい」「実行 Z をキャンセルしたい」
エンドポイント(oci raw-request、コントロールプレーン)
作成・管理:
GET /workspaces/{ws}/jobs(一覧)POST /workspaces/{ws}/jobs(作成)GET|PUT|DELETE /workspaces/{ws}/jobs/{key}(取得 / fetch-modify-put 更新 / 削除)
実行・監視:
POST /workspaces/{ws}/jobs/{key}/actions/run(トリガー)GET …/jobs/{key}/jobRunsおよびGET …/jobRuns/{runId}(実行ステータス + タスク→タスク実行のマッピング)- タスク実行と出力(
taskRuns/ 実行配下のタスク実行出力)— 実行コード・stdout・ノートブックセル
REST の実行・監視サブ構造(
jobRuns、タスクごとの出力フィールド名)は references/no-mcp-rest-map.md に従い 先にプローブしてください — 不完全なパスは欠損パラメータを示す400を返します。 実際に 2xx が返るまでサブパスを確定済みとして扱わず、破壊的操作の前に必ず確認してください。
ジョブの作成 — 2 ステップ方式: create → update(LIVE 検証済み 2026-06-10)
ジョブはまず名前のみのボディで作成し、次に update-job の呼び出しでクラスターとタスクを追加します。
tasks や displayName をインラインで含めた単一の POST は拒否されます(400 Invalid resource name / Invalid Task type)。
aidp_skilltest および公式 SDK サンプル workflow_notebook_job_sample.py にてライブ確認済み。
-
ステップ 1 — 作成(名前のみ):
POST …/workspaces/<ws>/jobsに{"name":"etl_daily.job","path":"/Workspace/Shared","maxConcurrentRuns":1}を送信 → 201 が返り、ジョブキーを取得。 (displayNameではなくnameを使用。名前には英字・_・.が使用可能で、先頭は英字にすること。) -
ステップ 2 — 更新(クラスター + タスクを追加):
PUT …/workspaces/<ws>/jobs/<key>にjobClustersとtasksを指定:{ "jobClusters": [ { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" } ], "tasks": [ { "type": "NOTEBOOK_TASK", "taskKey": "extract", "runIf": "ALL_SUCCESS", "cluster": { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" }, "source": "WORKSPACE", "notebookPath": "/Workspace/Shared/extract.ipynb", "dependsOn": [] }, { "type": "NOTEBOOK_TASK", "taskKey": "load", "cluster": { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" }, "source": "WORKSPACE", "notebookPath": "/Workspace/Shared/load.ipynb", "dependsOn": ["extract"] } ] } -
判別子は
type(NOTEBOOK_TASK/PYTHON_TASK/ …)であり、taskTypeではありません。dependsOnが DAG を定義します。 -
clusterNameの落とし穴: タスクごとのcluster.clusterNameには、UUID ではなくクラスターの実際の名前を指定してください(UUID を指定するとWORKFLOW_EXECUTION_0049 Cluster not foundになります)。clusterKeyと組み合わせて使用してください。 -
スケジュール: 定期実行する場合は更新ボディに cron 式を追加します。ボディは
.aidp/payloads/に保存してください。 -
公式の
aidp workflow create-job/update-jobCLI がこの 2 ステップをラップしています。
LIVE 検証済み 2026-06-10(de-agent)— 修正事項: ステップ 2 の
PUT …/jobs/<key>はマージではなく完全置換です — 更新ボディにはjobClusters/tasksと合わせてname・path・maxConcurrentRunsを必ず再送信してください。nameを省略すると400 InvalidParameter(「name must not be null」)が返ります。
実行と監視
POST …/jobs/{key}/actions/run→ 実行オブジェクトを返す。GET …/jobRuns/{runId}でステータスとタスク→タスク実行のマッピングを取得。GET …/jobRuns(ジョブキーでフィルター)で実行履歴を取得。実行を掘り下げてタスクごとのステータス・種別・所要時間を確認。- タスク実行の出力(実行コード + stdout / ノートブックセル)を読み込んで結果とデバッグに活用。
- 実行中のジョブを停止するには、実行のキャンセルアクションに
POSTを送信。
LIVE 検証済み 2026-06-10(de-agent)— 修正事項(create → run → SUCCESS → delete の完全ライフサイクル確認):
実行トリガー:
POST …/jobs/{key}/actions/runは20240831インスタンスで 404 を返します。 動作確認済みのトリガーはPOST …/workspaces/{ws}/jobRunsにボディ{"jobKey":"<key>"}を送信するものです(201 を返します)。POST …/jobRuns {jobKey}を検証済みトリガーとして使用し、actions/runはプローブ専用として扱ってください。タスク実行の出力:
GET …/taskRuns/{key}(200、outputKeyを含む)とPOST …/taskRuns/{key}/actions/fetchOutput(200、NOTEBOOK ペイロード)で取得します。 ジョブで実行されたノートブックは空のセルが含まれる場合があるため、取得したセル出力ではなく タスクのstate/stateMessage(例: 「Successfully executed notebook…」)で結果を確認してください。
レシピ — ノートブックをジョブとしてエンドツーエンドで実行する(公式 AI スキルデモフロー)
AIDP がノートブックワークロードをジョブベースで実行する方法です(CLI/SDK はセルをインタラクティブに実行しません — インタラクティブ実行には scripts/aidp_sql.py を使用してください)。公式 Codex デモを参考にしています。
-
前提条件: ノートブックが存在すること(
aidp-notebooks)、および対象クラスターが ACTIVE であること(aidp-cluster-ops)。 -
ジョブを作成する(上記「ジョブの作成」の 2 ステップに従う) — ボディを保存・確認してから実行:
# ステップ 1: 名前のみで作成 -> ジョブキーを返す aidp workflow create-job --instance-id <OCID> --auth api_key --profile DEFAULT \ --body '{"name":"WeatherSummary.job","path":"/Workspace/WeatherDemo","maxConcurrentRuns":1}' # ステップ 2: 更新 -> jobClusters と NOTEBOOK_TASK(type・cluster{clusterKey,clusterName}・source・notebookPath)を追加 aidp workflow update-job --instance-id <OCID> --auth api_key --profile DEFAULT --job-key <JOB_KEY> \ --body '{"name":"WeatherSummary.job","path":"/Workspace/WeatherDemo","maxConcurrentRuns":1,"jobClusters":[{"clusterKey":"<CK>","clusterName":"<CNAME>"}],"tasks":[{"type":"NOTEBOOK_TASK","taskKey":"summary","cluster":{"clusterKey":"<CK>","clusterName":"<CNAME>"},"source":"WORKSPACE","notebookPath":"/Workspace/WeatherDemo/WeatherSummary.ipynb","dependsOn":[]}]}' -
実行を開始する:
aidp workflow create-job-run --instance-id <OCID> … --body '{"jobKey":"<JOB_KEY>"}'→ 実行キーを返す。 -
ターミナル状態までポーリング:
SUCCESS/FAILED/CANCELEDになるまで数秒ごとにaidp workflow get-job-run … <RUN_KEY>を繰り返す(PENDING → RUNNING → ターミナル状態)。 -
出力取得とサマリー: SUCCESS の場合、タスク実行の
aidp workflow fetch-output/export-task-run-outputを実行し、実行ステータスだけでなくノートブックが生成したレポートの内容もサマリーする。FAILED の場合はaidp-spark-debuggingへ。
タスクタイプ・リトライ・ストリーミング・リペア・パラメーター(プラットフォームリファレンス §10–11)
NOTEBOOK_TASK / PYTHON_TASK 以外のタスク type:
- If/Else — 条件による分岐
- Nested Job — 別ジョブのタスク群を 1 ノードとして埋め込む
- JAR — Scala/Java(JDK/Scala/Spark のバージョンはクラスターランタイ
原文(English)を表示
aidp-pipelines — AIDP Jobs (build, schedule, run, monitor)
Author and operate AIDP Jobs — task DAGs over notebooks/python with optional cron — and watch their runs.
Engine precedence (see references/aidp-cli-map.md): prefer the
official aidp workflow … CLI when installed; fall back to oci raw-request otherwise. Both hit the
same REST API with the same auth — no MCP / ai-data-engineer-agent repo required. Persist every mutation
body to .aidp/payloads/ and confirm before running (see references/payloads.md).
Auth + base URL: CLI flags
--instance-id <DATALAKE_OCID> --auth api_key --profile DEFAULT --region <r>; REST basehttps://aidp.<region>.oci.oraclecloud.com/20240831/dataLakes/<DATALAKE_OCID>/…. On 401/403 follow the auth ladder (oci session refresh --profile AIDP_SESSION) inreferences/oci-raw-request.md.
Commands (CLI preferred · REST fallback)
- Author:
aidp workflow list-jobs·create-job·get-job·update-job·delete-job(REST:GET|POST|PUT|DELETE /workspaces/{ws}/jobs[/{key}]). - Run/monitor:
aidp workflow create-job-run·get-job-run·list-job-runs·list-recent-job-runs·list-task-runs·get-task-run·fetch-output/export-task-run-output·cancel-job-run[s]·repair-job-run(REST:POST …/jobs/{key}/actions/run,GET …/jobRuns/{runId}, task-run output).
Listing reliability (LIVE-LESSON 2026-06-12): job lists are large and paginated — a real workspace can hold 100+ jobs (live:
playgroundreturned 100 on the first page). Always paginate (limit
- the
opc-next-pageheader) and never conclude "no jobs" from a single call. First confirm the call returned HTTP 2xx with a JSON body — a CLI/auth/network error (or a shell-quoting bug) whose output is parsed as an empty list is a silent false-negative that turns 100+ jobs into "0". Jobs are workspace-scoped and theirpath/notebookPathare rooted at/Workspace/.... See the reliability conventions in references/oci-raw-request.md.
When to use
- "Build a pipeline / job", "run it daily", "trigger job X", "why/what did run Y do", "cancel run Z".
Endpoints (oci raw-request, control-plane)
Author: GET /workspaces/{ws}/jobs (list) · POST /workspaces/{ws}/jobs (create) ·
GET|PUT|DELETE /workspaces/{ws}/jobs/{key} (read / fetch-modify-put update / delete).
Run/monitor: POST /workspaces/{ws}/jobs/{key}/actions/run (trigger) ·
GET …/jobs/{key}/jobRuns and GET …/jobRuns/{runId} (run status + task-to-task-run mapping) ·
task runs + output (taskRuns / task-run output under the run) for executed code, stdout, notebook cells.
REST run/monitor sub-shapes (
jobRuns, per-task output field names) are probe-first per references/no-mcp-rest-map.md — a bare path returns a400naming the missing param. Don't present a sub-path as confirmed until a live 2xx; verify before destructive ops.
Authoring a job — TWO-STEP create→update (LIVE-VERIFIED 2026-06-10)
A job is created with a name-only body, then a second update-job call adds clusters + tasks. A single
POST that inlines tasks/displayName is rejected (400 Invalid resource name / Invalid Task type).
Confirmed live on aidp_skilltest and via the official SDK sample workflow_notebook_job_sample.py.
- Step 1 — create (name-only):
POST …/workspaces/<ws>/jobswith{"name":"etl_daily.job","path":"/Workspace/Shared","maxConcurrentRuns":1}→ 201, returns the job key. (Usename— a resource name, notdisplayName; names allow letters/_/.and must start with a letter.) - Step 2 — update (add clusters + tasks):
PUT …/workspaces/<ws>/jobs/<key>withjobClusters+tasks:{ "jobClusters": [ { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" } ], "tasks": [ { "type": "NOTEBOOK_TASK", "taskKey": "extract", "runIf": "ALL_SUCCESS", "cluster": { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" }, "source": "WORKSPACE", "notebookPath": "/Workspace/Shared/extract.ipynb", "dependsOn": [] }, { "type": "NOTEBOOK_TASK", "taskKey": "load", "cluster": { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" }, "source": "WORKSPACE", "notebookPath": "/Workspace/Shared/load.ipynb", "dependsOn": ["extract"] } ] } - Discriminator is
type(NOTEBOOK_TASK/PYTHON_TASK/…), nottaskType.dependsOndefines the DAG. clusterNamepitfall: the per-taskcluster.clusterNamemust be the real cluster name, not a UUID (a UUID →WORKFLOW_EXECUTION_0049 Cluster not found); pair it withclusterKey.- Schedule: add a cron expression in the update body for recurring runs. Persist bodies to
.aidp/payloads/. - The official
aidp workflow create-job/update-jobCLI wraps these two steps.
Live-verified 2026-06-10 on de-agent — correction: the Step-2
PUT …/jobs/<key>is a full replace, not a merge — the update body MUST re-sendname+path+maxConcurrentRunsalongsidejobClusters/tasks. Omittingnamereturns400 InvalidParameter("name must not be null").
Run & monitor
POST …/jobs/{key}/actions/run→ returns a run;GET …/jobRuns/{runId}for status + task-to-task-run mapping.GET …/jobRuns(filter by job key) for history; drill into a run for per-task status/type/duration.- Read a task run's output (executed code + stdout / notebook cells) for results & debugging.
- To stop a running job,
POSTthe run's cancel action.
Live-verified 2026-06-10 on de-agent — corrections (full create→run→SUCCESS→delete lifecycle):
- Run trigger:
POST …/jobs/{key}/actions/runreturns 404 on the20240831instance. The WORKING trigger isPOST …/workspaces/{ws}/jobRunswith body{"jobKey":"<key>"}→ 201. TreatPOST …/jobRuns {jobKey}as the verified trigger;actions/runis a probe only.- Task-run output: read via
GET …/taskRuns/{key}(200, hasoutputKey) +POST …/taskRuns/{key}/actions/fetchOutput(200, NOTEBOOK payload). Job-executed notebooks may persist empty cells, so confirm results from taskstate/stateMessage(e.g. "Successfully executed notebook…"), not from the fetched cell output.
Recipe — run a notebook as a job, end-to-end (the official AI-skill demo flow)
This is how AIDP runs a notebook workload (job-based; the CLI/SDK does not execute cells interactively —
for that use scripts/aidp_sql.py). Mirrors the official Codex demo:
- Preconditions: the notebook exists (
aidp-notebooks) and the target cluster is ACTIVE (aidp-cluster-ops). - Create the job (two-step, per "Authoring a job" above) — persist bodies, confirm, then:
# Step 1: create name-only -> returns job key aidp workflow create-job --instance-id <OCID> --auth api_key --profile DEFAULT \ --body '{"name":"WeatherSummary.job","path":"/Workspace/WeatherDemo","maxConcurrentRuns":1}' # Step 2: update -> add jobClusters + the NOTEBOOK_TASK (type, cluster{clusterKey,clusterName}, source, notebookPath) aidp workflow update-job --instance-id <OCID> --auth api_key --profile DEFAULT --job-key <JOB_KEY> \ --body '{"jobClusters":[{"clusterKey":"<CK>","clusterName":"<CNAME>"}],"tasks":[{"type":"NOTEBOOK_TASK","taskKey":"summary","cluster":{"clusterKey":"<CK>","clusterName":"<CNAME>"},"source":"WORKSPACE","notebookPath":"/Workspace/WeatherDemo/WeatherSummary.ipynb","dependsOn":[]}]}' - Start the run:
aidp workflow create-job-run --instance-id <OCID> … --body '{"jobKey":"<JOB_KEY>"}'→ returns a run key. - Poll to terminal: loop
aidp workflow get-job-run … <RUN_KEY>every few seconds untilSUCCESS/FAILED/CANCELED(PENDING → RUNNING → terminal). - Fetch output + summarize: on SUCCESS,
aidp workflow fetch-output/export-task-run-outputfor the task run, and summarize the report the notebook produced (not just the run status). On FAILED →aidp-spark-debugging.
Task types, retries, streaming, repair & parameters (platform-ref §10–11)
Task types beyond NOTEBOOK_TASK/PYTHON_TASK: If/Else (conditional branching on a condition),
Nested Job (embed another job's tasks as one node), JAR (Scala/Java — JDK/Scala/Spark version must
match the cluster runtime). Tasks can depend on success or failure of a parent; tasks sharing a parent
run in parallel.
Retry policy (per task): retryCount (max attempts), retryInterval (wait between), retryOnTimeout
(retry if the task exceeds its time limit). Streaming task: mark the notebook/python task Streaming —
disables execution timeout + task dependencies, runs continuously until stopped, auto-restarts at monthly
maintenance; set Max Concurrent Runs = 1. Scheduling min frequency is 30 min. Run statuses include
SKIPPED (prior run still active) and TIMED_OUT.
Repair a failed run (rerun only the failed/selected tasks, don't re-run the whole DAG):
oci raw-request --http-method POST --profile DEFAULT \
--target-uri "https://aidp.<region>.oci.oraclecloud.com/20240831/dataLakes/<OCID>/workspaces/<WS>/jobRuns/<RUN_KEY>/actions/repair" \
--request-body '{ "...RepairJobRunDetails: tasks to rerun + optional Key/Value or JSON params..." }'
(SDK workflow.repair_job_run; {{job.repair_count}} increments per repair — confirm the body live before use.)
Parameterization — precedence Job Run > Task > Job (runtime overrides task, task overrides job; job
params are immutable during task execution). Reference system parameters with {{…}} in task
configs/paths: {{job.id}},{{job.name}},{{job.run_id}},{{job.repair_count}},{{job.parameters.[name]}},
{{job.trigger.type}},{{job.trigger.file_arrival.location}} (file-arrival trigger),{{task.name}},
{{task.run_id}},{{task.execution_count}},{{tasks.[name].result_state}} (success/failed/skipped/…),
{{tasks.[name].error_code}},{{workspace.id}},{{hub.region}}. For passing computed values notebook→notebook
inside a task, see oidlUtils.notebook.run/exit in aidp-notebooks.
Workflow
- Confirm the notebook(s)/python file(s) exist (
aidp-notebooks/aidp-workspace-files) and the cluster. - Build the task DAG (deps, schedule); show the user the JSON job spec before creating.
POSTto create, trigger a test run, poll the run to terminal, read task output.- On failure, route to
aidp-spark-debugging(logs + Spark UI) with the failing task run. - Clean up test jobs (
DELETE …/jobs/{key}) when validating.
Interactive SQL (only if a task needs a quick check)
For ad-hoc Spark-SQL outside a job, use the bundled helper — no MCP required:
python "$PLUGIN_DIR/scripts/aidp_sql.py" --region <r> --datalake <DATALAKE_OCID> --workspace <WS> \
--cluster <key> --code "spark.sql('SELECT 1').show()"
It mints a UPST from the api_key DEFAULT profile and returns JSON (status/outputs/spark_job_ids). Production pipeline steps belong in a notebook/python task driven by the Job, not this helper.
References
- references/oci-raw-request.md · references/no-mcp-rest-map.md · references/rest-endpoint-map.md
- Pairs with
aidp-notebooks,aidp-workspace-files,aidp-spark-debugging,aidp-data-quality.
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。