claude-skills/

Anthropic公式スキル・プラグインの日本語ディレクトリ

last sync 22h ago
スキルOfficialdatabase

🏗️databricks-jobs

プラグイン
databricks

説明

Databricksにおいて、DABs・Python SDK・CLIを通じてLakeflow Jobsを開発・デプロイします。 次のような場合に使用: ノートブック、Pythonホイール、SQL、dbt、またはパイプラインを使用したデータエンジニアリングジョブを作成する場合。 実装を開始する**前に**呼び出してください。

原文を表示

Develop and deploy Lakeflow Jobs on Databricks via DABs, Python SDK, or the CLI. Use when creating data engineering jobs with notebooks, Python wheels, SQL, dbt, or pipelines. Invoke BEFORE starting implementation.

ユースケース

  • データエンジニアリングジョブを作成する
  • Lakehouse Jobsを開発・デプロイする
  • 実装開始前に準備する

本文(日本語訳)

Lakeflow Jobs 開発

最初に: CLI の基本操作、認証、プロファイル選択、データ探索コマンドについては、親スキルである databricks-core を参照してください。

Lakeflow Jobs は、マルチタスク DAG・柔軟なトリガー・包括的なモニタリングを備えたデータワークフローのオーケストレーションツールです。
Jobs は多様なタスクタイプをサポートしており、Asset Bundles(DABs)・Python SDK・CLI を通じて管理できます。


リファレンスファイル

ユースケース リファレンスファイル
タスクタイプの設定(notebook、Python、SQL、dbt、pipeline、JAR、run_job、for_each) references/task-types.md
トリガーとスケジュールの設定(cron、periodic、ファイル到着、テーブル更新、継続実行) references/triggers-schedules.md
通知・ヘルスルール・リトライ・タイムアウト・キューの設定 references/notifications-monitoring.md
完全なサンプル集(ETL、ウェアハウスリフレッシュ、イベント駆動、ML トレーニング、マルチ環境、ストリーミング、クロスジョブ) references/examples.md

新規 Job プロジェクトのスキャフォールディング

設定ファイルを使って databricks bundle init を非インタラクティブに実行します。
これにより <project_name>/ ディレクトリにプロジェクトが作成されます:

databricks bundle init default-python --config-file <(echo '{"project_name": "my_job", "include_job": "yes", "include_pipeline": "no", "include_python": "yes", "serverless": "yes"}') --profile <PROFILE> < /dev/null
  • project_name: 使用できる文字は英数字とアンダースコアのみ

スキャフォールディング完了後、プロジェクトディレクトリに CLAUDE.mdAGENTS.md を作成してください。
これらのファイルは、agent がプロジェクトをどのように扱うかを把握するために不可欠です。
以下の内容を使用してください:

# Declarative Automation Bundles プロジェクト

このプロジェクトはデプロイメントに Declarative Automation Bundles(旧称: Databricks Asset Bundles)を使用しています。

## 前提条件

Databricks CLI(>= v0.288.0)がインストールされていない場合はインストールしてください:
- macOS: `brew tap databricks/tap && brew install databricks`
- Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
- Windows: `winget install Databricks.DatabricksCLI`

確認: `databricks -v`

## AI Agent 向け

CLI の基本操作、認証、デプロイメントワークフローについては `databricks-core` スキルを参照してください。
Job 固有のガイダンスについては `databricks-jobs` スキルを参照してください。

スキルが利用できない場合は、以下のコマンドでインストールしてください: `databricks aitools install`

プロジェクト構成

my-job-project/
├── databricks.yml              # Bundle 設定
├── resources/
│   └── my_job.job.yml          # Job 定義
├── src/
│   ├── my_notebook.ipynb       # Notebook タスク
│   └── my_module/              # Python wheel パッケージ
│       ├── __init__.py
│       └── main.py
├── tests/
│   └── test_main.py
└── pyproject.toml              # Python プロジェクト設定(wheel 使用時)

クイックスタート

Asset Bundles(DABs)— 推奨

# resources/jobs.yml
resources:
  jobs:
    my_etl_job:
      name: "[${bundle.target}] My ETL Job"
      tasks:
        - task_key: extract
          notebook_task:
            notebook_path: ../src/notebooks/extract.py

Python SDK

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source

w = WorkspaceClient()

job = w.jobs.create(
    name="my-etl-job",
    tasks=[
        Task(
            task_key="extract",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/Shared/etl/extract",
                source=Source.WORKSPACE,
            ),
        ),
    ],
)
print(f"Created job: {job.job_id}")

CLI

databricks jobs create --json '{
  "name": "my-etl-job",
  "tasks": [{
    "task_key": "extract",
    "notebook_task": {
      "notebook_path": "/Workspace/Shared/etl/extract",
      "source": "WORKSPACE"
    }
  }]
}'

コアコンセプト

マルチタスクワークフロー

Jobs は DAG ベースのタスク依存関係をサポートしています:

tasks:
  - task_key: extract
    notebook_task:
      notebook_path: ../src/extract.py

  - task_key: transform
    depends_on:
      - task_key: extract
    notebook_task:
      notebook_path: ../src/transform.py

  - task_key: load
    depends_on:
      - task_key: transform
    run_if: ALL_SUCCESS  # 全依存タスクが成功した場合のみ実行
    notebook_task:
      notebook_path: ../src/load.py

run_if の条件:

  • ALL_SUCCESS(デフォルト)— 全依存タスクが成功した場合に実行
  • ALL_DONE — 全依存タスクが完了した場合に実行(成功・失敗を問わない)
  • AT_LEAST_ONE_SUCCESS — 少なくとも1つの依存タスクが成功した場合に実行
  • NONE_FAILED — 失敗した依存タスクが存在しない場合に実行
  • ALL_FAILED — 全依存タスクが失敗した場合に実行
  • AT_LEAST_ONE_FAILED — 少なくとも1つの依存タスクが失敗した場合に実行

タスクタイプ一覧

タスクタイプ ユースケース リファレンス
notebook_task Notebook の実行 references/task-types.md#notebook-task
spark_python_task Python スクリプトの実行 references/task-types.md#spark-python-task
python_wheel_task Python wheel の実行 references/task-types.md#python-wheel-task
sql_task SQL クエリ・ファイル・ダッシュボード・アラートの実行 references/task-types.md#sql-task
dbt_task dbt プロジェクトの実行 references/task-types.md#dbt-task
pipeline_task SDP(旧称: DLT)パイプラインのトリガー references/task-types.md#pipeline-task
spark_jar_task Spark JAR の実行 references/task-types.md#spark-jar-task
run_job_task 他の Job のトリガー references/task-types.md#run-job-task
for_each_task 入力のループ処理 references/task-types.md#for-each-task

トリガータイプ一覧

トリガータイプ ユースケース リファレンス
schedule cron ベースのスケジューリング references/triggers-schedules.md#cron-schedule
trigger.periodic インターバルベース references/triggers-schedules.md#periodic-trigger
trigger.file_arrival ファイル到着イベント references/triggers-schedules.md#file-arrival-trigger
trigger.table_update Unity Catalog テーブル変更イベント references/triggers-schedules.md#table-update-trigger
continuous 常時稼働 Job references/triggers-schedules.md#continuous-jobs

コンピュート設定

Job クラスター(推奨)

タスク間で共有できる再利用可能なクラスター設定を定義します:

job_clusters:
  - job_cluster_key: shared_cluster
    new_cluster:
      spark_version: "15.4.x-scala2.12"
      node_type_id: "i3.xlarge"
      num_workers: 2
      spark_conf:
        spark.speculation: "true"

tasks:
  - task_key: my_task
    job_cluster_key: shared_cluster
    notebook_task:
      notebook_path: ../src/notebook.py

オートスケールクラスター

new_cluster:
  spark_version: "15.4.x-scala2.12"
  node_type_id: "i3.xlarge"
  autoscale:
    min_workers: 2
    max_workers: 8

既存クラスター

tasks:
  - task_key: my_task
    existing_cluster_id: "0123-456789-abcdef12"
    notebook_task:
      notebook_path: ../src/notebook.py

サーバーレスコンピュート

Notebook および Python タスクでサーバーレスを使用するには、クラスター設定を省略します:

tasks:
  - task_key: serverless_task
    notebook_task:
      notebook_path: ../src/notebook.py
    # クラスター設定なし = サーバーレス

Job パラメーター

Job レベルで定義したパラメーターは、全タスクに渡されます(タスクごとに繰り返す必要はありません):

parameters:
  - name: env
    default: "dev"
  - name: date
    default: "{{start_date}}"  # 動的な値の参照

Notebook 内でのアクセス方法:

catalog = dbutils.widgets.get("env")
load_date = dbutils.widgets.get("date")

特定タスクへの渡し方:

tasks:
  - task_key: my_task
    notebook_task:
      notebook_path: ../src/notebook.py
      base_parameters:
        env: "{{job.parameters.env}}"
        custom_param: "value"

基本操作

Python SDK

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Job 一覧の取得
jobs = w.jobs.list()

# Job の詳細取得
job = w.jobs.get(job_id=12345)

# Job を即時実行
run = w.jobs.run_now(job_id=12345)

# パラメーター付きで実行
run = w.jobs.run_now(
    job_id=12345,
    job_parameters={"env": "prod", "date": "2024-01-15"},
)

# 実行のキャンセル
w.jobs.cancel_run(run_id=run.run_id)

# Job の削除
w.jobs.delete(job_id=12345)

CLI

# Job 一覧の取得
databricks jobs list

# Job の詳細取得
databricks jobs get 12345

# Job の実行
databricks jobs run-now 12345

# パラメーター付きで実行(--json 内に job_id を含める必要あり)
databricks jobs run-now --json '{"job_id": 12345, "job_parameters": {"env": "prod"}}'

# 実行のキャンセル
databricks jobs cancel-run 67890

# Job の削除
databricks jobs delete 12345

Asset Bundle 操作

# 設定の検証
databricks bundle validate --profile <profile>

# ターゲットへのデプロイ
databricks bundle deploy -t dev --profile <profile>

# Job の実行
databricks bundle run <job_name> -t dev --profile <profile>

# 実行ステータスの確認
databricks jobs get-run --run-id <id> --profile <profile>

# リソースの削除
databricks bundle destroy --auto-approve

パーミッション(DABs)

resources:
  jobs:
    my_job:
      name: "My Job"
      permissions:
        - level: CAN_VIEW
          group_name: "data-analysts"
        - level: CAN_MANAGE_RUN
          group_name: "data-engineers"
        - level: CAN_MANAGE
          user_name: "admin@example.com"

パーミッションレベル:

  • CAN_VIEW — Job および実行履歴の閲覧
  • CAN_MANAGE_RUN — 閲覧・実行トリガー・実行キャンセル
  • CAN_MANAGE — 編集・削除を含む完全な制御

ユニットテスト

ローカルでユニットテストを実行します:

uv run pytest

開発ワークフロー

  1. 検証: databricks bundle validate --profile <profile>
  2. **デ
原文(English)を表示

Lakeflow Jobs Development

FIRST: Use the parent databricks-core skill for CLI basics, authentication, profile selection, and data exploration commands.

Lakeflow Jobs orchestrate data workflows with multi-task DAGs, flexible triggers, and comprehensive monitoring. Jobs support diverse task types and can be managed via Asset Bundles (DABs), Python SDK, or CLI.

Reference Files

Use Case Reference File
Configure task types (notebook, Python, SQL, dbt, pipeline, JAR, run_job, for_each) references/task-types.md
Set up triggers and schedules (cron, periodic, file arrival, table update, continuous) references/triggers-schedules.md
Configure notifications, health rules, retries, timeouts, queues references/notifications-monitoring.md
Complete worked examples (ETL, warehouse refresh, event-driven, ML training, multi-env, streaming, cross-job) references/examples.md

Scaffolding a New Job Project

Use databricks bundle init with a config file to scaffold non-interactively. This creates a project in the <project_name>/ directory:

databricks bundle init default-python --config-file <(echo '{"project_name": "my_job", "include_job": "yes", "include_pipeline": "no", "include_python": "yes", "serverless": "yes"}') --profile <PROFILE> < /dev/null
  • project_name: letters, numbers, underscores only

After scaffolding, create CLAUDE.md and AGENTS.md in the project directory. These files are essential to provide agents with guidance on how to work with the project. Use this content:

# Declarative Automation Bundles Project

This project uses Declarative Automation Bundles (formerly Databricks Asset Bundles) for deployment.

## Prerequisites

Install the Databricks CLI (>= v0.288.0) if not already installed:
- macOS: `brew tap databricks/tap && brew install databricks`
- Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
- Windows: `winget install Databricks.DatabricksCLI`

Verify: `databricks -v`

## For AI Agents

Read the `databricks-core` skill for CLI basics, authentication, and deployment workflow.
Read the `databricks-jobs` skill for job-specific guidance.

If skills are not available, install them: `databricks aitools install`

Project Structure

my-job-project/
├── databricks.yml              # Bundle configuration
├── resources/
│   └── my_job.job.yml          # Job definition
├── src/
│   ├── my_notebook.ipynb       # Notebook tasks
│   └── my_module/              # Python wheel package
│       ├── __init__.py
│       └── main.py
├── tests/
│   └── test_main.py
└── pyproject.toml              # Python project config (if using wheels)

Quick Start

Asset Bundles (DABs) — recommended

# resources/jobs.yml
resources:
  jobs:
    my_etl_job:
      name: "[${bundle.target}] My ETL Job"
      tasks:
        - task_key: extract
          notebook_task:
            notebook_path: ../src/notebooks/extract.py

Python SDK

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source

w = WorkspaceClient()

job = w.jobs.create(
    name="my-etl-job",
    tasks=[
        Task(
            task_key="extract",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/Shared/etl/extract",
                source=Source.WORKSPACE,
            ),
        ),
    ],
)
print(f"Created job: {job.job_id}")

CLI

databricks jobs create --json '{
  "name": "my-etl-job",
  "tasks": [{
    "task_key": "extract",
    "notebook_task": {
      "notebook_path": "/Workspace/Shared/etl/extract",
      "source": "WORKSPACE"
    }
  }]
}'

Core Concepts

Multi-Task Workflows

Jobs support DAG-based task dependencies:

tasks:
  - task_key: extract
    notebook_task:
      notebook_path: ../src/extract.py

  - task_key: transform
    depends_on:
      - task_key: extract
    notebook_task:
      notebook_path: ../src/transform.py

  - task_key: load
    depends_on:
      - task_key: transform
    run_if: ALL_SUCCESS  # Only run if all dependencies succeed
    notebook_task:
      notebook_path: ../src/load.py

run_if conditions:

  • ALL_SUCCESS (default) — run when all dependencies succeed
  • ALL_DONE — run when all dependencies complete (success or failure)
  • AT_LEAST_ONE_SUCCESS — run when at least one dependency succeeds
  • NONE_FAILED — run when no dependencies failed
  • ALL_FAILED — run when all dependencies failed
  • AT_LEAST_ONE_FAILED — run when at least one dependency failed

Task Types Summary

Task Type Use Case Reference
notebook_task Run notebooks references/task-types.md#notebook-task
spark_python_task Run Python scripts references/task-types.md#spark-python-task
python_wheel_task Run Python wheels references/task-types.md#python-wheel-task
sql_task Run SQL queries/files/dashboards/alerts references/task-types.md#sql-task
dbt_task Run dbt projects references/task-types.md#dbt-task
pipeline_task Trigger SDP (formerly DLT) pipelines references/task-types.md#pipeline-task
spark_jar_task Run Spark JARs references/task-types.md#spark-jar-task
run_job_task Trigger other jobs references/task-types.md#run-job-task
for_each_task Loop over inputs references/task-types.md#for-each-task

Trigger Types Summary

Trigger Type Use Case Reference
schedule Cron-based scheduling references/triggers-schedules.md#cron-schedule
trigger.periodic Interval-based references/triggers-schedules.md#periodic-trigger
trigger.file_arrival File arrival events references/triggers-schedules.md#file-arrival-trigger
trigger.table_update Unity Catalog table change events references/triggers-schedules.md#table-update-trigger
continuous Always-running jobs references/triggers-schedules.md#continuous-jobs

Compute Configuration

Job Clusters (recommended)

Define reusable cluster configurations shared across tasks:

job_clusters:
  - job_cluster_key: shared_cluster
    new_cluster:
      spark_version: "15.4.x-scala2.12"
      node_type_id: "i3.xlarge"
      num_workers: 2
      spark_conf:
        spark.speculation: "true"

tasks:
  - task_key: my_task
    job_cluster_key: shared_cluster
    notebook_task:
      notebook_path: ../src/notebook.py

Autoscaling Clusters

new_cluster:
  spark_version: "15.4.x-scala2.12"
  node_type_id: "i3.xlarge"
  autoscale:
    min_workers: 2
    max_workers: 8

Existing Cluster

tasks:
  - task_key: my_task
    existing_cluster_id: "0123-456789-abcdef12"
    notebook_task:
      notebook_path: ../src/notebook.py

Serverless Compute

For notebook and Python tasks, omit cluster configuration to use serverless:

tasks:
  - task_key: serverless_task
    notebook_task:
      notebook_path: ../src/notebook.py
    # No cluster config = serverless

Job Parameters

Parameters defined at job level are passed to ALL tasks (no need to repeat per task):

parameters:
  - name: env
    default: "dev"
  - name: date
    default: "{{start_date}}"  # Dynamic value reference

Access in notebooks:

catalog = dbutils.widgets.get("env")
load_date = dbutils.widgets.get("date")

Pass to specific tasks:

tasks:
  - task_key: my_task
    notebook_task:
      notebook_path: ../src/notebook.py
      base_parameters:
        env: "{{job.parameters.env}}"
        custom_param: "value"

Common Operations

Python SDK

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# List jobs
jobs = w.jobs.list()

# Get job details
job = w.jobs.get(job_id=12345)

# Run job now
run = w.jobs.run_now(job_id=12345)

# Run with parameters
run = w.jobs.run_now(
    job_id=12345,
    job_parameters={"env": "prod", "date": "2024-01-15"},
)

# Cancel run
w.jobs.cancel_run(run_id=run.run_id)

# Delete job
w.jobs.delete(job_id=12345)

CLI

# List jobs
databricks jobs list

# Get job details
databricks jobs get 12345

# Run job
databricks jobs run-now 12345

# Run with parameters (must use --json with job_id inside)
databricks jobs run-now --json '{"job_id": 12345, "job_parameters": {"env": "prod"}}'

# Cancel run
databricks jobs cancel-run 67890

# Delete job
databricks jobs delete 12345

Asset Bundle Operations

# Validate configuration
databricks bundle validate --profile <profile>

# Deploy to a target
databricks bundle deploy -t dev --profile <profile>

# Run a job
databricks bundle run <job_name> -t dev --profile <profile>

# Check run status
databricks jobs get-run --run-id <id> --profile <profile>

# Destroy resources
databricks bundle destroy --auto-approve

Permissions (DABs)

resources:
  jobs:
    my_job:
      name: "My Job"
      permissions:
        - level: CAN_VIEW
          group_name: "data-analysts"
        - level: CAN_MANAGE_RUN
          group_name: "data-engineers"
        - level: CAN_MANAGE
          user_name: "admin@example.com"

Permission levels:

  • CAN_VIEW — view job and run history
  • CAN_MANAGE_RUN — view, trigger, and cancel runs
  • CAN_MANAGE — full control including edit and delete

Unit Testing

Run unit tests locally:

uv run pytest

Development Workflow

  1. Validate: databricks bundle validate --profile <profile>
  2. Deploy: databricks bundle deploy -t dev --profile <profile>
  3. Run: databricks bundle run <job_name> -t dev --profile <profile>
  4. Check run status: databricks jobs get-run --run-id <id> --profile <profile>

Common Issues

Issue Solution
Job cluster startup slow Use job clusters with job_cluster_key for reuse across tasks
Task dependencies not working Verify task_key references match exactly in depends_on
Schedule not triggering Check pause_status: UNPAUSED and valid timezone
File arrival not detecting Ensure path has proper permissions and uses cloud storage URL
Table update trigger missing events Verify Unity Catalog table and proper grants
Parameter not accessible Use dbutils.widgets.get() in notebooks
admins group error Cannot modify admins permissions on jobs
Serverless task fails Ensure task type supports serverless (notebook, Python)

Related Skills

  • databricks-dabs — DABs configuration patterns shared by jobs and pipelines
  • databricks-pipelines — SDP (formerly DLT) pipelines triggered by pipeline_task

Documentation

原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。