✅aidp-acceptance-contract
- ソース
- GitHub で見る ↗
説明
バッチおよびストリーミングパイプラインにおいて、「例外が発生しないこと」だけでなく収束の検証を必要とする場合のための、YAML駆動の受け入れ契約です。 K回連続してゼロ件ウィンドウが続いた後にのみ PASS を宣言します(連続ゼロウィンドウ収束方式)。 次のような場合に使用: ストリーミングジョブ、緩やかに消化されていくバックフィル処理、または「保留キューが空になること」が成功条件となる任意のパイプライン。
原文を表示
YAML-driven acceptance contract for batch and streaming pipelines that need convergence verification, not just "no exceptions". Declares PASS only after K consecutive zero-result windows (consecutive-zero-window convergence). Use for streaming jobs, slowly-draining backfills, or any pipeline whose success is "the pending queue is empty".
ユースケース
- ✓ストリーミングジョブの収束検証
- ✓バックフィル処理の完了確認
- ✓保留キューが空になることの確認
本文(日本語訳)
aidp-acceptance-contract — バッチ/ストリーミングジョブの収束検証
「例外なし」でのパスは、ストリーミングやバックフィルのパイプラインにとって十分ではありません。 まだ処理中の行が残っている可能性があるためです。 このスキルは YAML ベースのコントラクトを設定し、K 回連続して未処理件数がゼロのウィンドウを確認して初めて PASS を宣言します。
次のような場合に使用
- ソースパイプラインが 構造化ストリーミング の場合 (マイグレーターの Pass-2 はトリガー完了時点で戻るが、ストリーム自体が必ずしも完全に処理済みとは限らない)
- ソースパイプラインが 低速消化のバッチバックフィル の場合 (各タスクがパーティションを処理し、パーティション処理完了時にワークフローが PASS するが、失敗行は別途リトライキューに保持されている)
- ユーザーが「落ち着くまで待つ」「収束」「収束に対するアクセプタンステスト」と要求する場合
成功 = "saveAsTable 完了" であるような単純な ETL には不要です。
パターン
1. マイグレーターが Pass-2 を完了まで実行する。
2. アクセプタンスコントラクトが定期的(sleep_between_s 秒ごと)にプローブを開始する。
3. 各プローブで pending_count_sql を実行し、pending = 0 であるかを確認する。
4. zero_window 回連続してゼロプローブを確認 → PASS を宣言する。
5. max_attempts に達しても収束しない場合 → ACCEPTANCE_CONTRACT_VIOLATED を宣言する。
マイグレーション全体の結果は FAIL に降格される。
YAML コントラクトのフォーマット
reports/<MyJob>_acceptance.yaml に保存してください:
# <task_key> のアクセプタンスコントラクト
task_key: "<task_key>"
description: "リトライキューが空になるまで待機する"
pending_count_sql: "SELECT COUNT(*) AS pending FROM <sandbox_schema>.<retry_queue_table> WHERE status = 'PENDING'"
zero_window: 3 # PASS に必要な K 回連続のゼロウィンドウ数
sleep_between_s: 30 # プローブ間隔(秒)
max_attempts: 60 # 上限。60 × 30秒 = 30分のハードキャップ
# オプション: クラスターの上書き(デフォルトは job_migrate のクラスターを使用)
cluster_id: null
# オプション: 違反時の通知フック(収束前に max_attempts に達した場合に1回呼び出される)
on_violation_log_path: "/tmp/<MyJob>_acceptance.log"
マイグレーションへの組み込み
YAML がマニフェストと同じ場所に存在する場合、コントラクトは job_migrate.py によって自動的に読み込まれます:
# 規則: <MyJob>_manifest.json と <MyJob>_acceptance.yaml を同じディレクトリに配置
ls reports/
# → reports/<MyJob>_manifest.json
# → reports/<MyJob>_acceptance.yaml
python3 ${CLAUDE_PLUGIN_ROOT}/engine/scripts/job_migrate.py \
--manifest reports/<MyJob>_manifest.json \
--acceptance-contract reports/<MyJob>_acceptance.yaml \
# + その他の引数
--acceptance-contract を省略した場合、コントラクトはスキップされます
(デフォルト動作 — PASS = Pass-2 の全セルがグリーン)。
タスク単位 vs ジョブ全体
YAML の task_key フィールドは、マニフェスト内の 1つのタスク にコントラクトをスコープします。
複数のタスクに収束確認が必要な場合は、リスト形式で記述します:
contracts:
- task_key: "<task_a>"
pending_count_sql: "SELECT COUNT(*) FROM <schema>.<queue_a> WHERE pending"
zero_window: 3
sleep_between_s: 30
max_attempts: 60
- task_key: "<task_b>"
pending_count_sql: "SELECT COUNT(*) FROM <schema>.<queue_b> WHERE pending"
zero_window: 5
sleep_between_s: 60
max_attempts: 30
コントラクトは、対応するタスクの完了後に順次実行されます。
結果の読み方
実行後、JOB_REPORT.md にアクセプタンスコントラクトのセクションが追加されます:
## Acceptance contracts
| task_key | status | windows_observed | converged_at |
|---|---|---|---|
| <task_a> | PASS | 3 consecutive zeros | 2026-XX-XX HH:MM:SS |
| <task_b> | ACCEPTANCE_CONTRACT_VIOLATED | 60 attempts, 0 consecutive zeros | -- |
いずれか1つでも コントラクトが VIOLATED になった場合、
セルレベルの成功状態に関わらず、全体の RESULT: は PASS から ACCEPTANCE_CONTRACT_VIOLATED に降格されます。
良い pending_count_sql の書き方
クエリに関するいくつかの指針:
- 自分が管理するサンドボックステーブルから読み取る (多くの場合、ソースのキュー/リトライテーブルのリダイレクトスキーマ版)
- クエリは整数カウントの単一カラムを返す必要がある (フレームワークは "pending" カラムまたは位置 0 を読み取る)
- クエリは 高速 でなければならない —
sleep_between_sごとに実行される。 適切なフィルター/パーティションを追加すること - クエリは 冪等 でなければならない — 実行してもステートを変更しないこと
記述例:
-- ストリーミングリトライキュー
SELECT COUNT(*) AS pending
FROM <sandbox_schema>.<retry_queue_table>
WHERE status IN ('PENDING','RETRYING')
AND ingest_ts > current_date - INTERVAL 1 DAY
-- バックフィルの進捗
SELECT (target_count - processed_count) AS pending
FROM <sandbox_schema>.backfill_progress
WHERE backfill_id = '<id>'
-- ストリーミングウォーターマークのギャップ
SELECT GREATEST(
0,
CAST((unix_timestamp(current_timestamp()) - unix_timestamp(max(event_ts))) / 60 AS INT)
) AS pending
FROM <sandbox_schema>.<streaming_output_table>
このスキルが行わないこと
- ストリーミングジョブの実際の実行は行わない
— 基盤となる
job_migrate.pyがすでにセルを実行済みです。 コントラクトはあくまで 実行後の検証ステップ です。 - 「収束」の意味をセマンティクスレベルで定義しない
— それはユーザーの
pending_count_sqlが担います。 - 失敗したセルのリトライは行わない
— 「パイプラインが時間内に処理を完了できなかった」ことを通知するのみです。
セルレベルのリトライには
aidp-fixup-cellを使用してください。
この後の対応
- PASS の場合: 下流の検証に進み、マイグレーションをサインオフする。
- VIOLATED の場合: キューを調査する。
考えられる原因:
- (a) 基盤となるジョブにより多くの時間が必要(
max_attemptsを増やす) - (b) キューが本当に詰まっている(データ問題、スケジューラー問題)
- (c) コントラクト自体が誤っている(SQL を確認する)
- (a) 基盤となるジョブにより多くの時間が必要(
原文(English)を表示
aidp-acceptance-contract — convergence verification for batch/streaming jobs
A "no exception" pass is not enough for streaming or backfill pipelines. They might still be processing rows that haven't drained yet. This skill wires up a YAML-driven contract: PASS only after K consecutive empty-pending windows.
When to use
- Source pipeline is structured streaming (the migrator's Pass-2 returns when the trigger completes, but the stream itself hasn't necessarily drained).
- Source pipeline is a slowly-draining batch backfill (each task processes a partition, the workflow PASSes when the partition is processed but a separate retry queue holds the failed rows).
- User asks "wait for it to settle", "convergence", "acceptance test on convergence".
Not needed for plain ETL where success = "the saveAsTable finished".
The pattern
1. Migrator runs Pass-2 to completion.
2. Acceptance contract starts probing periodically (every `sleep_between_s` seconds).
3. Each probe runs `pending_count_sql` and checks `pending = 0`.
4. After `zero_window` consecutive zero-probes → declare PASS.
5. If we hit `max_attempts` without convergence → declare ACCEPTANCE_CONTRACT_VIOLATED.
Overall migration result is demoted to FAIL.
YAML contract format
Save at reports/<MyJob>_acceptance.yaml:
# Acceptance contract for <task_key>
task_key: "<task_key>"
description: "Wait until the retry queue drains"
pending_count_sql: "SELECT COUNT(*) AS pending FROM <sandbox_schema>.<retry_queue_table> WHERE status = 'PENDING'"
zero_window: 3 # K consecutive zero windows required for PASS
sleep_between_s: 30 # seconds between probes
max_attempts: 60 # ceiling; 60 × 30s = 30-min hard cap
# Optional: override the cluster (default = use job_migrate's cluster)
cluster_id: null
# Optional: notify-on-violation hook (called once if max_attempts hit before convergence)
on_violation_log_path: "/tmp/<MyJob>_acceptance.log"
Wire into a migration
The contract is loaded by job_migrate.py automatically if the YAML exists alongside the manifest:
# Convention: <MyJob>_manifest.json + <MyJob>_acceptance.yaml side by side
ls reports/
# → reports/<MyJob>_manifest.json
# → reports/<MyJob>_acceptance.yaml
python3 ${CLAUDE_PLUGIN_ROOT}/engine/scripts/job_migrate.py \
--manifest reports/<MyJob>_manifest.json \
--acceptance-contract reports/<MyJob>_acceptance.yaml \
# + the rest of the args
If --acceptance-contract is omitted, the contract is skipped (default behavior — PASS = Pass-2 cells all green).
Per-task vs job-wide
The YAML's task_key field scopes the contract to ONE task in the manifest. If you have multiple tasks needing convergence, write a list:
contracts:
- task_key: "<task_a>"
pending_count_sql: "SELECT COUNT(*) FROM <schema>.<queue_a> WHERE pending"
zero_window: 3
sleep_between_s: 30
max_attempts: 60
- task_key: "<task_b>"
pending_count_sql: "SELECT COUNT(*) FROM <schema>.<queue_b> WHERE pending"
zero_window: 5
sleep_between_s: 60
max_attempts: 30
Contracts run sequentially after their respective tasks complete.
How to read the verdict
After a run, JOB_REPORT.md will include an acceptance-contract section:
## Acceptance contracts
| task_key | status | windows_observed | converged_at |
|---|---|---|---|
| <task_a> | PASS | 3 consecutive zeros | 2026-XX-XX HH:MM:SS |
| <task_b> | ACCEPTANCE_CONTRACT_VIOLATED | 60 attempts, 0 consecutive zeros | -- |
If ANY contract is VIOLATED, the overall RESULT: line is demoted from PASS to ACCEPTANCE_CONTRACT_VIOLATED regardless of cell-level success.
Building a good pending_count_sql
Some rules of thumb for the query:
- Read from a sandbox table you control (most likely the redirect-schema version of the source's queue / retry table).
- The query must return a single column of integer count (the framework specifically reads column "pending" or position 0).
- The query must be FAST — it runs every
sleep_between_s. Add appropriate filters / partitions. - The query must be IDEMPOTENT — running it doesn't mutate state.
Example shapes:
-- Streaming retry queue
SELECT COUNT(*) AS pending
FROM <sandbox_schema>.<retry_queue_table>
WHERE status IN ('PENDING','RETRYING')
AND ingest_ts > current_date - INTERVAL 1 DAY
-- Backfill progress
SELECT (target_count - processed_count) AS pending
FROM <sandbox_schema>.backfill_progress
WHERE backfill_id = '<id>'
-- Streaming watermark gap
SELECT GREATEST(
0,
CAST((unix_timestamp(current_timestamp()) - unix_timestamp(max(event_ts))) / 60 AS INT)
) AS pending
FROM <sandbox_schema>.<streaming_output_table>
What this skill does NOT do
- Doesn't actually run the streaming job — the underlying job_migrate.py already executed the cells. The contract is a POST-execution verification step.
- Doesn't define what "convergence" means semantically — that's the user's
pending_count_sql. - Doesn't retry failed cells — it only signals "the pipeline didn't drain in time". For cell-level retries use
aidp-fixup-cell.
After this
- If PASS: proceed with downstream verification, sign off the migration.
- If VIOLATED: investigate the queue. Either (a) the underlying job needs more time (bump
max_attempts), (b) the queue is genuinely stuck (data issue, scheduler issue), or (c) the contract is wrong (check the SQL).
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。