🏗️databricks-jobs
- プラグイン
- databricks
- ソース
- GitHub で見る ↗
説明
Databricksにおいて、DABs・Python SDK・CLIを通じてLakeflow Jobsを開発・デプロイします。 次のような場合に使用: ノートブック、Pythonホイール、SQL、dbt、またはパイプラインを使用したデータエンジニアリングジョブを作成する場合。 実装を開始する**前に**呼び出してください。
原文を表示
Develop and deploy Lakeflow Jobs on Databricks via DABs, Python SDK, or the CLI. Use when creating data engineering jobs with notebooks, Python wheels, SQL, dbt, or pipelines. Invoke BEFORE starting implementation.
ユースケース
- ✓データエンジニアリングジョブを作成する
- ✓Lakehouse Jobsを開発・デプロイする
- ✓実装開始前に準備する
本文(日本語訳)
Lakeflow Jobs 開発
最初に: CLI の基本操作、認証、プロファイル選択、データ探索コマンドについては、親スキルである databricks-core を参照してください。
Lakeflow Jobs は、マルチタスク DAG・柔軟なトリガー・包括的なモニタリングを備えたデータワークフローのオーケストレーションツールです。
Jobs は多様なタスクタイプをサポートしており、Asset Bundles(DABs)・Python SDK・CLI を通じて管理できます。
リファレンスファイル
| ユースケース | リファレンスファイル |
|---|---|
| タスクタイプの設定(notebook、Python、SQL、dbt、pipeline、JAR、run_job、for_each) | references/task-types.md |
| トリガーとスケジュールの設定(cron、periodic、ファイル到着、テーブル更新、継続実行) | references/triggers-schedules.md |
| 通知・ヘルスルール・リトライ・タイムアウト・キューの設定 | references/notifications-monitoring.md |
| 完全なサンプル集(ETL、ウェアハウスリフレッシュ、イベント駆動、ML トレーニング、マルチ環境、ストリーミング、クロスジョブ) | references/examples.md |
新規 Job プロジェクトのスキャフォールディング
設定ファイルを使って databricks bundle init を非インタラクティブに実行します。
これにより <project_name>/ ディレクトリにプロジェクトが作成されます:
databricks bundle init default-python --config-file <(echo '{"project_name": "my_job", "include_job": "yes", "include_pipeline": "no", "include_python": "yes", "serverless": "yes"}') --profile <PROFILE> < /dev/null
project_name: 使用できる文字は英数字とアンダースコアのみ
スキャフォールディング完了後、プロジェクトディレクトリに CLAUDE.md と AGENTS.md を作成してください。
これらのファイルは、agent がプロジェクトをどのように扱うかを把握するために不可欠です。
以下の内容を使用してください:
# Declarative Automation Bundles プロジェクト
このプロジェクトはデプロイメントに Declarative Automation Bundles(旧称: Databricks Asset Bundles)を使用しています。
## 前提条件
Databricks CLI(>= v0.288.0)がインストールされていない場合はインストールしてください:
- macOS: `brew tap databricks/tap && brew install databricks`
- Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
- Windows: `winget install Databricks.DatabricksCLI`
確認: `databricks -v`
## AI Agent 向け
CLI の基本操作、認証、デプロイメントワークフローについては `databricks-core` スキルを参照してください。
Job 固有のガイダンスについては `databricks-jobs` スキルを参照してください。
スキルが利用できない場合は、以下のコマンドでインストールしてください: `databricks aitools install`
プロジェクト構成
my-job-project/
├── databricks.yml # Bundle 設定
├── resources/
│ └── my_job.job.yml # Job 定義
├── src/
│ ├── my_notebook.ipynb # Notebook タスク
│ └── my_module/ # Python wheel パッケージ
│ ├── __init__.py
│ └── main.py
├── tests/
│ └── test_main.py
└── pyproject.toml # Python プロジェクト設定(wheel 使用時)
クイックスタート
Asset Bundles(DABs)— 推奨
# resources/jobs.yml
resources:
jobs:
my_etl_job:
name: "[${bundle.target}] My ETL Job"
tasks:
- task_key: extract
notebook_task:
notebook_path: ../src/notebooks/extract.py
Python SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source
w = WorkspaceClient()
job = w.jobs.create(
name="my-etl-job",
tasks=[
Task(
task_key="extract",
notebook_task=NotebookTask(
notebook_path="/Workspace/Shared/etl/extract",
source=Source.WORKSPACE,
),
),
],
)
print(f"Created job: {job.job_id}")
CLI
databricks jobs create --json '{
"name": "my-etl-job",
"tasks": [{
"task_key": "extract",
"notebook_task": {
"notebook_path": "/Workspace/Shared/etl/extract",
"source": "WORKSPACE"
}
}]
}'
コアコンセプト
マルチタスクワークフロー
Jobs は DAG ベースのタスク依存関係をサポートしています:
tasks:
- task_key: extract
notebook_task:
notebook_path: ../src/extract.py
- task_key: transform
depends_on:
- task_key: extract
notebook_task:
notebook_path: ../src/transform.py
- task_key: load
depends_on:
- task_key: transform
run_if: ALL_SUCCESS # 全依存タスクが成功した場合のみ実行
notebook_task:
notebook_path: ../src/load.py
run_if の条件:
ALL_SUCCESS(デフォルト)— 全依存タスクが成功した場合に実行ALL_DONE— 全依存タスクが完了した場合に実行(成功・失敗を問わない)AT_LEAST_ONE_SUCCESS— 少なくとも1つの依存タスクが成功した場合に実行NONE_FAILED— 失敗した依存タスクが存在しない場合に実行ALL_FAILED— 全依存タスクが失敗した場合に実行AT_LEAST_ONE_FAILED— 少なくとも1つの依存タスクが失敗した場合に実行
タスクタイプ一覧
| タスクタイプ | ユースケース | リファレンス |
|---|---|---|
notebook_task |
Notebook の実行 | references/task-types.md#notebook-task |
spark_python_task |
Python スクリプトの実行 | references/task-types.md#spark-python-task |
python_wheel_task |
Python wheel の実行 | references/task-types.md#python-wheel-task |
sql_task |
SQL クエリ・ファイル・ダッシュボード・アラートの実行 | references/task-types.md#sql-task |
dbt_task |
dbt プロジェクトの実行 | references/task-types.md#dbt-task |
pipeline_task |
SDP(旧称: DLT)パイプラインのトリガー | references/task-types.md#pipeline-task |
spark_jar_task |
Spark JAR の実行 | references/task-types.md#spark-jar-task |
run_job_task |
他の Job のトリガー | references/task-types.md#run-job-task |
for_each_task |
入力のループ処理 | references/task-types.md#for-each-task |
トリガータイプ一覧
| トリガータイプ | ユースケース | リファレンス |
|---|---|---|
schedule |
cron ベースのスケジューリング | references/triggers-schedules.md#cron-schedule |
trigger.periodic |
インターバルベース | references/triggers-schedules.md#periodic-trigger |
trigger.file_arrival |
ファイル到着イベント | references/triggers-schedules.md#file-arrival-trigger |
trigger.table_update |
Unity Catalog テーブル変更イベント | references/triggers-schedules.md#table-update-trigger |
continuous |
常時稼働 Job | references/triggers-schedules.md#continuous-jobs |
コンピュート設定
Job クラスター(推奨)
タスク間で共有できる再利用可能なクラスター設定を定義します:
job_clusters:
- job_cluster_key: shared_cluster
new_cluster:
spark_version: "15.4.x-scala2.12"
node_type_id: "i3.xlarge"
num_workers: 2
spark_conf:
spark.speculation: "true"
tasks:
- task_key: my_task
job_cluster_key: shared_cluster
notebook_task:
notebook_path: ../src/notebook.py
オートスケールクラスター
new_cluster:
spark_version: "15.4.x-scala2.12"
node_type_id: "i3.xlarge"
autoscale:
min_workers: 2
max_workers: 8
既存クラスター
tasks:
- task_key: my_task
existing_cluster_id: "0123-456789-abcdef12"
notebook_task:
notebook_path: ../src/notebook.py
サーバーレスコンピュート
Notebook および Python タスクでサーバーレスを使用するには、クラスター設定を省略します:
tasks:
- task_key: serverless_task
notebook_task:
notebook_path: ../src/notebook.py
# クラスター設定なし = サーバーレス
Job パラメーター
Job レベルで定義したパラメーターは、全タスクに渡されます(タスクごとに繰り返す必要はありません):
parameters:
- name: env
default: "dev"
- name: date
default: "{{start_date}}" # 動的な値の参照
Notebook 内でのアクセス方法:
catalog = dbutils.widgets.get("env")
load_date = dbutils.widgets.get("date")
特定タスクへの渡し方:
tasks:
- task_key: my_task
notebook_task:
notebook_path: ../src/notebook.py
base_parameters:
env: "{{job.parameters.env}}"
custom_param: "value"
基本操作
Python SDK
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Job 一覧の取得
jobs = w.jobs.list()
# Job の詳細取得
job = w.jobs.get(job_id=12345)
# Job を即時実行
run = w.jobs.run_now(job_id=12345)
# パラメーター付きで実行
run = w.jobs.run_now(
job_id=12345,
job_parameters={"env": "prod", "date": "2024-01-15"},
)
# 実行のキャンセル
w.jobs.cancel_run(run_id=run.run_id)
# Job の削除
w.jobs.delete(job_id=12345)
CLI
# Job 一覧の取得
databricks jobs list
# Job の詳細取得
databricks jobs get 12345
# Job の実行
databricks jobs run-now 12345
# パラメーター付きで実行(--json 内に job_id を含める必要あり)
databricks jobs run-now --json '{"job_id": 12345, "job_parameters": {"env": "prod"}}'
# 実行のキャンセル
databricks jobs cancel-run 67890
# Job の削除
databricks jobs delete 12345
Asset Bundle 操作
# 設定の検証
databricks bundle validate --profile <profile>
# ターゲットへのデプロイ
databricks bundle deploy -t dev --profile <profile>
# Job の実行
databricks bundle run <job_name> -t dev --profile <profile>
# 実行ステータスの確認
databricks jobs get-run --run-id <id> --profile <profile>
# リソースの削除
databricks bundle destroy --auto-approve
パーミッション(DABs)
resources:
jobs:
my_job:
name: "My Job"
permissions:
- level: CAN_VIEW
group_name: "data-analysts"
- level: CAN_MANAGE_RUN
group_name: "data-engineers"
- level: CAN_MANAGE
user_name: "admin@example.com"
パーミッションレベル:
CAN_VIEW— Job および実行履歴の閲覧CAN_MANAGE_RUN— 閲覧・実行トリガー・実行キャンセルCAN_MANAGE— 編集・削除を含む完全な制御
ユニットテスト
ローカルでユニットテストを実行します:
uv run pytest
開発ワークフロー
- 検証:
databricks bundle validate --profile <profile> - **デ
原文(English)を表示
Lakeflow Jobs Development
FIRST: Use the parent databricks-core skill for CLI basics, authentication, profile selection, and data exploration commands.
Lakeflow Jobs orchestrate data workflows with multi-task DAGs, flexible triggers, and comprehensive monitoring. Jobs support diverse task types and can be managed via Asset Bundles (DABs), Python SDK, or CLI.
Reference Files
| Use Case | Reference File |
|---|---|
| Configure task types (notebook, Python, SQL, dbt, pipeline, JAR, run_job, for_each) | references/task-types.md |
| Set up triggers and schedules (cron, periodic, file arrival, table update, continuous) | references/triggers-schedules.md |
| Configure notifications, health rules, retries, timeouts, queues | references/notifications-monitoring.md |
| Complete worked examples (ETL, warehouse refresh, event-driven, ML training, multi-env, streaming, cross-job) | references/examples.md |
Scaffolding a New Job Project
Use databricks bundle init with a config file to scaffold non-interactively. This creates a project in the <project_name>/ directory:
databricks bundle init default-python --config-file <(echo '{"project_name": "my_job", "include_job": "yes", "include_pipeline": "no", "include_python": "yes", "serverless": "yes"}') --profile <PROFILE> < /dev/null
project_name: letters, numbers, underscores only
After scaffolding, create CLAUDE.md and AGENTS.md in the project directory. These files are essential to provide agents with guidance on how to work with the project. Use this content:
# Declarative Automation Bundles Project
This project uses Declarative Automation Bundles (formerly Databricks Asset Bundles) for deployment.
## Prerequisites
Install the Databricks CLI (>= v0.288.0) if not already installed:
- macOS: `brew tap databricks/tap && brew install databricks`
- Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
- Windows: `winget install Databricks.DatabricksCLI`
Verify: `databricks -v`
## For AI Agents
Read the `databricks-core` skill for CLI basics, authentication, and deployment workflow.
Read the `databricks-jobs` skill for job-specific guidance.
If skills are not available, install them: `databricks aitools install`
Project Structure
my-job-project/
├── databricks.yml # Bundle configuration
├── resources/
│ └── my_job.job.yml # Job definition
├── src/
│ ├── my_notebook.ipynb # Notebook tasks
│ └── my_module/ # Python wheel package
│ ├── __init__.py
│ └── main.py
├── tests/
│ └── test_main.py
└── pyproject.toml # Python project config (if using wheels)
Quick Start
Asset Bundles (DABs) — recommended
# resources/jobs.yml
resources:
jobs:
my_etl_job:
name: "[${bundle.target}] My ETL Job"
tasks:
- task_key: extract
notebook_task:
notebook_path: ../src/notebooks/extract.py
Python SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source
w = WorkspaceClient()
job = w.jobs.create(
name="my-etl-job",
tasks=[
Task(
task_key="extract",
notebook_task=NotebookTask(
notebook_path="/Workspace/Shared/etl/extract",
source=Source.WORKSPACE,
),
),
],
)
print(f"Created job: {job.job_id}")
CLI
databricks jobs create --json '{
"name": "my-etl-job",
"tasks": [{
"task_key": "extract",
"notebook_task": {
"notebook_path": "/Workspace/Shared/etl/extract",
"source": "WORKSPACE"
}
}]
}'
Core Concepts
Multi-Task Workflows
Jobs support DAG-based task dependencies:
tasks:
- task_key: extract
notebook_task:
notebook_path: ../src/extract.py
- task_key: transform
depends_on:
- task_key: extract
notebook_task:
notebook_path: ../src/transform.py
- task_key: load
depends_on:
- task_key: transform
run_if: ALL_SUCCESS # Only run if all dependencies succeed
notebook_task:
notebook_path: ../src/load.py
run_if conditions:
ALL_SUCCESS(default) — run when all dependencies succeedALL_DONE— run when all dependencies complete (success or failure)AT_LEAST_ONE_SUCCESS— run when at least one dependency succeedsNONE_FAILED— run when no dependencies failedALL_FAILED— run when all dependencies failedAT_LEAST_ONE_FAILED— run when at least one dependency failed
Task Types Summary
| Task Type | Use Case | Reference |
|---|---|---|
notebook_task |
Run notebooks | references/task-types.md#notebook-task |
spark_python_task |
Run Python scripts | references/task-types.md#spark-python-task |
python_wheel_task |
Run Python wheels | references/task-types.md#python-wheel-task |
sql_task |
Run SQL queries/files/dashboards/alerts | references/task-types.md#sql-task |
dbt_task |
Run dbt projects | references/task-types.md#dbt-task |
pipeline_task |
Trigger SDP (formerly DLT) pipelines | references/task-types.md#pipeline-task |
spark_jar_task |
Run Spark JARs | references/task-types.md#spark-jar-task |
run_job_task |
Trigger other jobs | references/task-types.md#run-job-task |
for_each_task |
Loop over inputs | references/task-types.md#for-each-task |
Trigger Types Summary
| Trigger Type | Use Case | Reference |
|---|---|---|
schedule |
Cron-based scheduling | references/triggers-schedules.md#cron-schedule |
trigger.periodic |
Interval-based | references/triggers-schedules.md#periodic-trigger |
trigger.file_arrival |
File arrival events | references/triggers-schedules.md#file-arrival-trigger |
trigger.table_update |
Unity Catalog table change events | references/triggers-schedules.md#table-update-trigger |
continuous |
Always-running jobs | references/triggers-schedules.md#continuous-jobs |
Compute Configuration
Job Clusters (recommended)
Define reusable cluster configurations shared across tasks:
job_clusters:
- job_cluster_key: shared_cluster
new_cluster:
spark_version: "15.4.x-scala2.12"
node_type_id: "i3.xlarge"
num_workers: 2
spark_conf:
spark.speculation: "true"
tasks:
- task_key: my_task
job_cluster_key: shared_cluster
notebook_task:
notebook_path: ../src/notebook.py
Autoscaling Clusters
new_cluster:
spark_version: "15.4.x-scala2.12"
node_type_id: "i3.xlarge"
autoscale:
min_workers: 2
max_workers: 8
Existing Cluster
tasks:
- task_key: my_task
existing_cluster_id: "0123-456789-abcdef12"
notebook_task:
notebook_path: ../src/notebook.py
Serverless Compute
For notebook and Python tasks, omit cluster configuration to use serverless:
tasks:
- task_key: serverless_task
notebook_task:
notebook_path: ../src/notebook.py
# No cluster config = serverless
Job Parameters
Parameters defined at job level are passed to ALL tasks (no need to repeat per task):
parameters:
- name: env
default: "dev"
- name: date
default: "{{start_date}}" # Dynamic value reference
Access in notebooks:
catalog = dbutils.widgets.get("env")
load_date = dbutils.widgets.get("date")
Pass to specific tasks:
tasks:
- task_key: my_task
notebook_task:
notebook_path: ../src/notebook.py
base_parameters:
env: "{{job.parameters.env}}"
custom_param: "value"
Common Operations
Python SDK
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# List jobs
jobs = w.jobs.list()
# Get job details
job = w.jobs.get(job_id=12345)
# Run job now
run = w.jobs.run_now(job_id=12345)
# Run with parameters
run = w.jobs.run_now(
job_id=12345,
job_parameters={"env": "prod", "date": "2024-01-15"},
)
# Cancel run
w.jobs.cancel_run(run_id=run.run_id)
# Delete job
w.jobs.delete(job_id=12345)
CLI
# List jobs
databricks jobs list
# Get job details
databricks jobs get 12345
# Run job
databricks jobs run-now 12345
# Run with parameters (must use --json with job_id inside)
databricks jobs run-now --json '{"job_id": 12345, "job_parameters": {"env": "prod"}}'
# Cancel run
databricks jobs cancel-run 67890
# Delete job
databricks jobs delete 12345
Asset Bundle Operations
# Validate configuration
databricks bundle validate --profile <profile>
# Deploy to a target
databricks bundle deploy -t dev --profile <profile>
# Run a job
databricks bundle run <job_name> -t dev --profile <profile>
# Check run status
databricks jobs get-run --run-id <id> --profile <profile>
# Destroy resources
databricks bundle destroy --auto-approve
Permissions (DABs)
resources:
jobs:
my_job:
name: "My Job"
permissions:
- level: CAN_VIEW
group_name: "data-analysts"
- level: CAN_MANAGE_RUN
group_name: "data-engineers"
- level: CAN_MANAGE
user_name: "admin@example.com"
Permission levels:
CAN_VIEW— view job and run historyCAN_MANAGE_RUN— view, trigger, and cancel runsCAN_MANAGE— full control including edit and delete
Unit Testing
Run unit tests locally:
uv run pytest
Development Workflow
- Validate:
databricks bundle validate --profile <profile> - Deploy:
databricks bundle deploy -t dev --profile <profile> - Run:
databricks bundle run <job_name> -t dev --profile <profile> - Check run status:
databricks jobs get-run --run-id <id> --profile <profile>
Common Issues
| Issue | Solution |
|---|---|
| Job cluster startup slow | Use job clusters with job_cluster_key for reuse across tasks |
| Task dependencies not working | Verify task_key references match exactly in depends_on |
| Schedule not triggering | Check pause_status: UNPAUSED and valid timezone |
| File arrival not detecting | Ensure path has proper permissions and uses cloud storage URL |
| Table update trigger missing events | Verify Unity Catalog table and proper grants |
| Parameter not accessible | Use dbutils.widgets.get() in notebooks |
admins group error |
Cannot modify admins permissions on jobs |
| Serverless task fails | Ensure task type supports serverless (notebook, Python) |
Related Skills
- databricks-dabs — DABs configuration patterns shared by jobs and pipelines
- databricks-pipelines — SDP (formerly DLT) pipelines triggered by
pipeline_task
Documentation
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。