🤖databricks-ai-functions
- プラグイン
- databricks
- ソース
- GitHub で見る ↗
説明
次のような場合に使用: DatabricksのビルトインAI関数(ai_classify、ai_extract、ai_summarize、ai_mask、ai_translate、ai_fix_grammar、ai_gen、ai_analyze_sentiment、ai_similarity、ai_parse_document、ai_query、ai_forecast)を活用して、モデルエンドポイントを管理することなく、SQLおよびPySparkパイプラインに直接AI機能を追加したい場合。 また、ドキュメントの解析や、カスタムRAGパイプライン(解析 → チャンク分割 → インデックス作成 → クエリ)の構築にも対応しています。
原文を表示
Use Databricks built-in AI Functions (ai_classify, ai_extract, ai_summarize, ai_mask, ai_translate, ai_fix_grammar, ai_gen, ai_analyze_sentiment, ai_similarity, ai_parse_document, ai_query, ai_forecast) to add AI capabilities directly to SQL and PySpark pipelines without managing model endpoints. Also covers document parsing and building custom RAG pipelines (parse → chunk → index → query).
ユースケース
- ✓SQLパイプラインにAI機能を追加したい
- ✓PySparkパイプラインにAI機能を追加したい
- ✓ドキュメントを解析する
- ✓カスタムRAGパイプラインを構築する
本文(日本語訳)
Databricks AI Functions
公式ドキュメント: https://docs.databricks.com/large-language-models/ai-functions 個別関数リファレンス: https://docs.databricks.com/sql/language-manual/functions/
概要
Databricks AI Functions は、Foundation Model API をデータパイプラインから直接呼び出せる組み込みの SQL / PySpark 関数です。
モデルエンドポイントのセットアップ不要、APIキー不要、ボイラープレートコード不要で利用できます。
UPPER() や LENGTH() と同じ感覚でテーブルの列に対して操作でき、大規模バッチ推論向けに最適化されています。
関数は以下の3カテゴリに分類されます:
| カテゴリ | 関数 | 次のような場合に使用 |
|---|---|---|
| タスク特化型 | ai_analyze_sentiment, ai_classify, ai_extract, ai_fix_grammar, ai_gen, ai_mask, ai_similarity, ai_summarize, ai_translate, ai_parse_document |
タスクが明確に定義されている場合 — 常にこちらを優先すること |
| 汎用型 | ai_query |
複雑なネスト JSON、カスタムエンドポイント、マルチモーダル — 最終手段としてのみ使用 |
| テーブル値型 | ai_forecast |
時系列予測 |
関数選択のルール — ai_query よりも常にタスク特化型関数を優先すること:
| タスク | 使用する関数 | ai_query にフォールバックするケース |
|---|---|---|
| センチメント分析 | ai_analyze_sentiment |
なし |
| 固定ラベルのルーティング | ai_classify(ラベル数: 2〜500件。精度向上のため説明を追加推奨) |
なし |
| エンティティ / フィールド抽出 | ai_extract |
なし |
| 要約 | ai_summarize |
なし — 上限なしにするには max_words=0 を使用 |
| 文法修正 | ai_fix_grammar |
なし |
| 翻訳 | ai_translate |
対象言語がサポートリストに含まれていない場合 |
| PII マスキング | ai_mask |
なし |
| 自由形式の生成 | ai_gen |
構造化 JSON 出力が必要な場合 |
| セマンティック類似度 | ai_similarity |
なし |
| PDF / ドキュメント解析 | ai_parse_document |
画像レベルの推論が必要な場合 |
| 複雑な JSON / 推論 | — | これが ai_query の本来の用途 |
前提条件
- Databricks SQL ウェアハウス(Classic 以外)、または DBR 15.1+ のクラスター
- バッチワークロードには DBR 15.4 ML LTS を推奨
ai_parse_documentの使用には DBR 17.1+ が必須ai_forecastには Pro または Serverless SQL ウェアハウスが必要- バッチ AI 推論がサポートされている AWS / Azure リージョンのワークスペース
- モデルは Apache 2.0 または LLAMA 3.3 Community License のもとで動作 — ライセンス遵守はお客様の責任となります
クイックスタート
1つのクエリで、テキスト列の分類・抽出・センチメント分析をまとめて実行する例:
SELECT
ticket_id,
ticket_text,
ai_classify(ticket_text, ARRAY('urgent', 'not urgent', 'spam')) AS priority,
ai_extract(ticket_text, '["product", "error_code", "date"]') AS entities,
ai_analyze_sentiment(ticket_text) AS sentiment
FROM support_tickets;
from pyspark.sql.functions import expr
df = spark.table("support_tickets")
df = (
df.withColumn("priority", expr("ai_classify(ticket_text, array('urgent', 'not urgent', 'spam'))"))
.withColumn("entities", expr("ai_extract(ticket_text, '[\"product\", \"error_code\", \"date\"]')"))
.withColumn("sentiment", expr("ai_analyze_sentiment(ticket_text)"))
)
# ai_extract は VARIANT を返します(:response 配下のフィールド)。
# VARIANT に対してはコロン (:) 記法を使用してください — ドット記法では NULL が返ります。
df.selectExpr("ticket_id", "priority", "sentiment",
"entities:response:product::string AS product",
"entities:response:error_code::string AS error_code",
"entities:response:date::string AS date").display()
よくある使用パターン
パターン 1: テキスト分析パイプライン
複数のタスク特化型関数を連鎖させ、1回のパスでテキスト列を一括エンリッチする:
SELECT
id,
content,
ai_analyze_sentiment(content) AS sentiment,
ai_summarize(content, 30) AS summary,
ai_classify(content,
ARRAY('technical', 'billing', 'other')) AS category,
ai_fix_grammar(content) AS content_clean
FROM raw_feedback;
パターン 2: 保存前の PII マスキング
from pyspark.sql.functions import expr
df_clean = (
spark.table("raw_messages")
.withColumn(
"message_safe",
expr("ai_mask(message, array('person', 'email', 'phone', 'address'))")
)
)
df_clean.write.format("delta").mode("append").saveAsTable("catalog.schema.messages_safe")
パターン 3: Unity Catalog Volume からのドキュメント取り込み
PDF / Office ドキュメントを解析し、タスク特化型関数でエンリッチする:
from pyspark.sql.functions import expr
df = (
spark.read.format("binaryFile")
.load("/Volumes/catalog/schema/landing/documents/")
.withColumn("parsed", expr("ai_parse_document(content)"))
# ai_parse_document は VARIANT を返します — ナビゲートにはコロン (:) 演算子を使用し、ドットは使わないでください。
# 構造: { "document": { "pages": [...], "elements": [...] }, "error_status": ..., "metadata": ... }
.selectExpr("path",
"concat_ws('\n', transform(parsed:document:elements, e -> e:content::STRING)) AS text_blocks",
"parsed:error_status AS parse_error")
.filter("parse_error IS NULL")
.withColumn("summary", expr("ai_summarize(text_blocks, 50)"))
.withColumn("entities", expr("ai_extract(text_blocks, '[\"date\", \"amount\", \"vendor\"]')"))
)
パターン 4: セマンティックマッチング / 重複排除
-- 会社名の近似重複を検出する
SELECT a.id, b.id, ai_similarity(a.name, b.name) AS score
FROM companies a
JOIN companies b ON a.id < b.id
WHERE ai_similarity(a.name, b.name) > 0.85;
パターン 5: ai_query による複雑な JSON 抽出(最終手段)
出力スキーマにネスト配列が含まれる場合や、タスク特化型関数では対応できない多段階推論が必要な場合にのみ使用すること:
from pyspark.sql.functions import expr, from_json, col
df = (
spark.table("parsed_documents")
.withColumn("ai_response", expr("""
ai_query(
'databricks-claude-sonnet-4',
concat('Extract invoice as JSON with nested itens array: ', text_blocks),
responseFormat => '{"type":"json_object"}',
failOnError => false
)
"""))
.withColumn("invoice", from_json(
col("ai_response.response"),
"STRUCT<numero:STRING, total:DOUBLE, "
"itens:ARRAY<STRUCT<codigo:STRING, descricao:STRING, qtde:DOUBLE, vlrUnit:DOUBLE>>>"
))
)
パターン 6: 時系列予測
SELECT *
FROM ai_forecast(
observed => TABLE(SELECT date, sales FROM daily_sales),
horizon => '2026-12-31',
time_col => 'date',
value_col => 'sales'
);
-- 返却値: date, sales_forecast, sales_upper, sales_lower
リファレンスファイル
- references/1-task-functions.md — タスク特化型関数 9 種(
ai_analyze_sentiment,ai_classify,ai_extract,ai_fix_grammar,ai_gen,ai_mask,ai_similarity,ai_summarize,ai_translate)およびai_parse_documentの完全な構文・パラメーター・SQL / PySpark サンプル - references/2-ai-query.md —
ai_query完全リファレンス: 全パラメーター、responseFormatによる構造化出力、マルチモーダルfiles =>、UDF パターン、エラーハンドリング - references/3-ai-forecast.md —
ai_forecastのパラメーター、単一メトリクス / 複数グループ / 複数メトリクス / 信頼区間のパターン - references/4-document-processing-pipeline.md — Lakeflow Declarative Pipeline で AI Functions を活用したエンドツーエンドのバッチドキュメント処理パイプライン。
config.ymlによる一元管理、関数選択ロジック、カスタム RAG パイプライン(解析 → チャンク化 → Vector Search)、ニアリアルタイム向けの DSPy / LangChain ガイダンスを含む
よくある問題
| 問題 | 解決策 |
|---|---|
ai_parse_document が見つからない |
DBR 17.1+ が必要です。クラスターのランタイムバージョンを確認してください。 |
ai_forecast が失敗する |
Pro または Serverless SQL ウェアハウスが必要です — Classic や Starter では利用できません。 |
| すべての関数が NULL を返す | 入力列が NULL です。呼び出し前に WHERE col IS NOT NULL でフィルタリングしてください。 |
ai_translate が特定の言語で失敗する |
サポート言語: 英語、ドイツ語、フランス語、イタリア語、ポルトガル語、ヒンディー語、スペイン語、タイ語。それ以外の言語には多言語モデルを指定した ai_query を使用してください。 |
ai_classify が予期しないラベルを返す |
明確で相互排他的なラベル名を使用してください。ラベル数を少なく(2〜5個)すると結果が安定します。 |
バッチジョブで ai_query が一部の行でエラーになる |
failOnError => false を追加してください — エラー時に例外を投げる代わりに、.response と .errorMessage を持つ STRUCT を返します。 |
| バッチジョブの処理が遅い | バッチ推論スループット最適化のため、DBR 15.4 ML LTS クラスター(Serverless やインタラクティブクラスターではなく)を使用してください。 |
| パイプラインコードを変更せずにモデルを切り替えたい | すべてのモデル名とプロンプトを config.yml に集約してください — パターンの詳細は references/4-document-processing-pipeline.md を参照してください。 |
原文(English)を表示
Databricks AI Functions
Official Docs: https://docs.databricks.com/large-language-models/ai-functions Individual function reference: https://docs.databricks.com/sql/language-manual/functions/
Overview
Databricks AI Functions are built-in SQL and PySpark functions that call Foundation Model APIs directly from your data pipelines — no model endpoint setup, no API keys, no boilerplate. They operate on table columns as naturally as UPPER() or LENGTH(), and are optimized for batch inference at scale.
There are three categories:
| Category | Functions | Use when |
|---|---|---|
| Task-specific | ai_analyze_sentiment, ai_classify, ai_extract, ai_fix_grammar, ai_gen, ai_mask, ai_similarity, ai_summarize, ai_translate, ai_parse_document |
The task is well-defined — prefer these always |
| General-purpose | ai_query |
Complex nested JSON, custom endpoints, multimodal — last resort only |
| Table-valued | ai_forecast |
Time series forecasting |
Function selection rule — always prefer a task-specific function over ai_query:
| Task | Use this | Fall back to ai_query when... |
|---|---|---|
| Sentiment scoring | ai_analyze_sentiment |
Never |
| Fixed-label routing | ai_classify (2–500 labels; add descriptions for accuracy) |
Never |
| Entity / field extraction | ai_extract |
Never |
| Summarization | ai_summarize |
Never — use max_words=0 for uncapped |
| Grammar correction | ai_fix_grammar |
Never |
| Translation | ai_translate |
Target language not in the supported list |
| PII redaction | ai_mask |
Never |
| Free-form generation | ai_gen |
Need structured JSON output |
| Semantic similarity | ai_similarity |
Never |
| PDF / document parsing | ai_parse_document |
Need image-level reasoning |
| Complex JSON / reasoning | — | This is the intended use case for ai_query |
Prerequisites
- Databricks SQL warehouse (not Classic) or cluster with DBR 15.1+
- DBR 15.4 ML LTS recommended for batch workloads
- DBR 17.1+ required for
ai_parse_document ai_forecastrequires a Pro or Serverless SQL warehouse- Workspace in a supported AWS/Azure region for batch AI inference
- Models run under Apache 2.0 or LLAMA 3.3 Community License — customers are responsible for compliance
Quick Start
Classify, extract, and score sentiment from a text column in a single query:
SELECT
ticket_id,
ticket_text,
ai_classify(ticket_text, ARRAY('urgent', 'not urgent', 'spam')) AS priority,
ai_extract(ticket_text, '["product", "error_code", "date"]') AS entities,
ai_analyze_sentiment(ticket_text) AS sentiment
FROM support_tickets;
from pyspark.sql.functions import expr
df = spark.table("support_tickets")
df = (
df.withColumn("priority", expr("ai_classify(ticket_text, array('urgent', 'not urgent', 'spam'))"))
.withColumn("entities", expr("ai_extract(ticket_text, '[\"product\", \"error_code\", \"date\"]')"))
.withColumn("sentiment", expr("ai_analyze_sentiment(ticket_text)"))
)
# ai_extract returns a VARIANT (fields under :response). Use colon (:) notation — dot returns NULL on a VARIANT.
df.selectExpr("ticket_id", "priority", "sentiment",
"entities:response:product::string AS product",
"entities:response:error_code::string AS error_code",
"entities:response:date::string AS date").display()
Common Patterns
Pattern 1: Text Analysis Pipeline
Chain multiple task-specific functions to enrich a text column in one pass:
SELECT
id,
content,
ai_analyze_sentiment(content) AS sentiment,
ai_summarize(content, 30) AS summary,
ai_classify(content,
ARRAY('technical', 'billing', 'other')) AS category,
ai_fix_grammar(content) AS content_clean
FROM raw_feedback;
Pattern 2: PII Redaction Before Storage
from pyspark.sql.functions import expr
df_clean = (
spark.table("raw_messages")
.withColumn(
"message_safe",
expr("ai_mask(message, array('person', 'email', 'phone', 'address'))")
)
)
df_clean.write.format("delta").mode("append").saveAsTable("catalog.schema.messages_safe")
Pattern 3: Document Ingestion from a Unity Catalog Volume
Parse PDFs/Office docs, then enrich with task-specific functions:
from pyspark.sql.functions import expr
df = (
spark.read.format("binaryFile")
.load("/Volumes/catalog/schema/landing/documents/")
.withColumn("parsed", expr("ai_parse_document(content)"))
# ai_parse_document returns a VARIANT — navigate with the colon (:) operator, never dot.
# Shape: { "document": { "pages": [...], "elements": [...] }, "error_status": ..., "metadata": ... }
.selectExpr("path",
"concat_ws('\n', transform(parsed:document:elements, e -> e:content::STRING)) AS text_blocks",
"parsed:error_status AS parse_error")
.filter("parse_error IS NULL")
.withColumn("summary", expr("ai_summarize(text_blocks, 50)"))
.withColumn("entities", expr("ai_extract(text_blocks, '[\"date\", \"amount\", \"vendor\"]')"))
)
Pattern 4: Semantic Matching / Deduplication
-- Find near-duplicate company names
SELECT a.id, b.id, ai_similarity(a.name, b.name) AS score
FROM companies a
JOIN companies b ON a.id < b.id
WHERE ai_similarity(a.name, b.name) > 0.85;
Pattern 5: Complex JSON Extraction with ai_query (last resort)
Use only when the output schema has nested arrays or requires multi-step reasoning that no task-specific function handles:
from pyspark.sql.functions import expr, from_json, col
df = (
spark.table("parsed_documents")
.withColumn("ai_response", expr("""
ai_query(
'databricks-claude-sonnet-4',
concat('Extract invoice as JSON with nested itens array: ', text_blocks),
responseFormat => '{"type":"json_object"}',
failOnError => false
)
"""))
.withColumn("invoice", from_json(
col("ai_response.response"),
"STRUCT<numero:STRING, total:DOUBLE, "
"itens:ARRAY<STRUCT<codigo:STRING, descricao:STRING, qtde:DOUBLE, vlrUnit:DOUBLE>>>"
))
)
Pattern 6: Time Series Forecasting
SELECT *
FROM ai_forecast(
observed => TABLE(SELECT date, sales FROM daily_sales),
horizon => '2026-12-31',
time_col => 'date',
value_col => 'sales'
);
-- Returns: date, sales_forecast, sales_upper, sales_lower
Reference Files
- references/1-task-functions.md — Full syntax, parameters, SQL + PySpark examples for all 9 task-specific functions (
ai_analyze_sentiment,ai_classify,ai_extract,ai_fix_grammar,ai_gen,ai_mask,ai_similarity,ai_summarize,ai_translate) andai_parse_document - references/2-ai-query.md —
ai_querycomplete reference: all parameters, structured output withresponseFormat, multimodalfiles =>, UDF patterns, and error handling - references/3-ai-forecast.md —
ai_forecastparameters, single-metric, multi-group, multi-metric, and confidence interval patterns - references/4-document-processing-pipeline.md — End-to-end batch document processing pipeline using AI Functions in a Lakeflow Declarative Pipeline; includes
config.ymlcentralization, function selection logic, custom RAG pipeline (parse → chunk → Vector Search), and DSPy/LangChain guidance for near-real-time variants
Common Issues
| Issue | Solution |
|---|---|
ai_parse_document not found |
Requires DBR 17.1+. Check cluster runtime. |
ai_forecast fails |
Requires Pro or Serverless SQL warehouse — not available on Classic or Starter. |
| All functions return NULL | Input column is NULL. Filter with WHERE col IS NOT NULL before calling. |
ai_translate fails for a language |
Supported: English, German, French, Italian, Portuguese, Hindi, Spanish, Thai. Use ai_query with a multilingual model for others. |
ai_classify returns unexpected labels |
Use clear, mutually exclusive label names. Fewer labels (2–5) produces more reliable results. |
ai_query raises on some rows in a batch job |
Add failOnError => false — returns a STRUCT with .response and .errorMessage instead of raising. |
| Batch job runs slowly | Use DBR 15.4 ML LTS cluster (not serverless or interactive) for optimized batch inference throughput. |
| Want to swap models without editing pipeline code | Store all model names and prompts in config.yml — see references/4-document-processing-pipeline.md for the pattern. |
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。