claude-skills/

Anthropic公式スキル・プラグインの日本語ディレクトリ

last sync 22h ago
スキルOfficialdatabase

📊databricks-zerobus-ingest

プラグイン
databricks

説明

gRPC を使用して Databricks Delta テーブルへのニアリアルタイムデータ取り込みを行う、Zerobus Ingest クライアントを構築します。 次のような場合に使用: - メッセージバスを介さず Unity Catalog テーブルに直接書き込むプロデューサーを作成する場合 - Python / Java / Go / TypeScript / Rust で Zerobus Ingest SDK を使用する場合 - UC テーブルから Protobuf スキーマを生成する場合 - ACK ハンドリングおよびリトライロジックを含むストリームベースの取り込みを実装する場合

原文を表示

Build Zerobus Ingest clients for near real-time data ingestion into Databricks Delta tables via gRPC. Use when creating producers that write directly to Unity Catalog tables without a message bus, working with the Zerobus Ingest SDK in Python/Java/Go/TypeScript/Rust, generating Protobuf schemas from UC tables, or implementing stream-based ingestion with ACK handling and retry logic.

ユースケース

  • メッセージバスを介さず直接書き込む
  • 複数言語でSDKを使用する
  • UC テーブルから Protobuf スキーマを生成
  • ストリームベースの取り込みを実装

本文(日本語訳)

Zerobus Ingest

Zerobus gRPC API を介して、データを Databricks Delta テーブルに直接取り込むクライアントを構築します。

ステータス: GA(2026年2月より一般提供開始。Lakeflow Jobs Serverless SKU として課金)

ドキュメント:


Zerobus Ingest とは?

Zerobus Ingest は、gRPC を介して Delta テーブルへのレコード単位の直接データ取り込みを実現する、サーバーレスコネクターです。 レイクハウスへのデータ投入において、メッセージバスインフラ(Kafka、Kinesis、Event Hub)が不要になります。 このサービスはスキーマの検証、ターゲットテーブルへのデータのマテリアライズ、およびクライアントへの耐久性確認(ACK)の返送を行います。

基本パターン: SDK 初期化 → ストリーム作成 → レコード取り込み → ACK 処理 → フラッシュ → クローズ


判断の早見表: 何を構築しますか?

シナリオ 言語 シリアライゼーション 参考資料
クイックプロトタイプ / テストハーネス Python JSON references/2-python-client.md
本番用 Python プロデューサー Python Protobuf references/2-python-client.md + references/4-protobuf-schema.md
JVM マイクロサービス Java Protobuf references/3-multilanguage-clients.md
Go サービス Go JSON または Protobuf references/3-multilanguage-clients.md
Node.js / TypeScript アプリ TypeScript JSON references/3-multilanguage-clients.md
高パフォーマンスシステムサービス Rust JSON または Protobuf references/3-multilanguage-clients.md
UC テーブルからのスキーマ生成 任意 Protobuf references/4-protobuf-schema.md
リトライ / 再接続ロジック 任意 任意 references/5-operations-and-limits.md

言語が指定されていない場合は、Python をデフォルトとして使用してください。


共通ライブラリ

ZeroBus によるデータ取り込みに必要な主要ライブラリは以下の通りです:

  • databricks-sdk>=0.85.0: 認証およびメタデータ取得のための Databricks ワークスペースクライアント
  • databricks-zerobus-ingest-sdk>=1.0.0: 高パフォーマンスなストリーミング取り込みのための ZeroBus SDK
  • grpcio-tools

これらは通常 Databricks にプリインストールされていません。execute_code ツールを使用してインストールしてください:

  • code: "%pip install databricks-sdk>=VERSION databricks-zerobus-ingest-sdk>=VERSION"

返却された cluster_idcontext_id は後続の呼び出しのために保存してください。

スマートインストール方法

# まず protobuf バージョンを確認し、互換性のある grpcio-tools をインストール
import google.protobuf
runtime_version = google.protobuf.__version__
print(f"Runtime protobuf version: {runtime_version}")

if runtime_version.startswith("5.26") or runtime_version.startswith("5.29"):
    %pip install grpcio-tools==1.62.0
else:
    %pip install grpcio-tools  # 新しい protobuf バージョンには最新版を使用

前提条件

以下のオブジェクトが有効であることを確認せずに、このスキルを実行してはなりません:

  1. Unity Catalog マネージド Delta テーブル — 取り込み先のテーブル
  2. サービスプリンシパルの ID とシークレット — ターゲットテーブルに対する MODIFY および SELECT 権限を持つもの
  3. Zerobus サーバーエンドポイント — ワークスペースのリージョンに対応したもの
  4. Zerobus Ingest SDK — ターゲット言語向けにインストール済みであること

完全なセットアップ手順については references/1-setup-and-authentication.md を参照してください。


Python 最小構成例(JSON)

import json
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

sdk = ZerobusSdk(server_endpoint, workspace_url)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(table_name)

stream = sdk.create_stream(client_id, client_secret, table_props, options)
try:
    record = {"device_name": "sensor-1", "temp": 22, "humidity": 55}
    stream.ingest_record(json.dumps(record))
    stream.flush()
finally:
    stream.close()

詳細ガイド

トピック ファイル 参照するタイミング
セットアップ & 認証 references/1-setup-and-authentication.md エンドポイントの形式、サービスプリンシパル、SDK インストール
Python クライアント references/2-python-client.md 同期/非同期 Python、JSON と Protobuf のフロー、再利用可能なクライアントクラス
マルチ言語 references/3-multilanguage-clients.md Java、Go、TypeScript、Rust の SDK サンプル
Protobuf スキーマ references/4-protobuf-schema.md UC テーブルからの .proto 生成、コンパイル、型マッピング
運用 & 制限事項 references/5-operations-and-limits.md ACK 処理、リトライ、再接続、スループット制限、制約事項

ワークフローに定められたすべてのステップに必ず従ってください。

ワークフロー

  1. 実行計画を表示する
  2. クライアントの種類を決定する
  3. スキーマを取得する — 必ず references/4-protobuf-schema.md を使用すること
  4. Python コードをローカルファイルに書き出す — 該当ガイドの指示に従う(例: scripts/zerobus_ingest.py
  5. ワークスペースにアップロードする: databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts
  6. Databricks 上で実行する — ジョブまたはノートブックを使用
  7. 実行に失敗した場合: ローカルファイルを編集し、再アップロードして再実行する

重要事項

  • ローカルパッケージをインストールしないこと
  • サーバーレスの制限: Zerobus SDK はサーバーレスコンピュート上で pip インストールできません。クラシックコンピュートクラスターを使用するか、SDK を使わないノートブックベースの取り込みには Zerobus REST API(Beta)を使用してください。
  • テーブルへの明示的な権限付与: サービスプリンシパルには、ターゲットテーブルに対して MODIFY および SELECT を明示的に付与する必要があります。スキーマレベルの継承権限は、authorization_details OAuth フローには不十分な場合があります。

実行ワークフロー

ステップ 1: コードをワークスペースにアップロード

databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts

ステップ 2: ジョブを作成して実行

databricks jobs create --json '{
  "name": "zerobus-ingest",
  "tasks": [{
    "task_key": "ingest",
    "spark_python_task": {
      "python_file": "/Workspace/Users/<user>/scripts/zerobus_ingest.py"
    },
    "new_cluster": {
      "spark_version": "16.1.x-scala2.12",
      "node_type_id": "i3.xlarge",
      "num_workers": 0
    }
  }]
}'

databricks jobs run-now JOB_ID

実行に失敗した場合:

  1. ジョブ実行出力からエラー内容を確認する
  2. ローカルの Python ファイルを編集して問題を修正する
  3. 再アップロード: databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts
  4. 再実行: databricks jobs run-now JOB_ID

ライブラリのインストール

Databricks はデフォルトで Spark、pandas、numpy、および一般的なデータライブラリを提供しています。 import エラーが発生した場合にのみ、ライブラリをインストールしてください。

ジョブ設定に以下を追加します:

"libraries": [
  {"pypi": {"package": "databricks-zerobus-ingest-sdk>=1.0.0"}}
]

または、クラスター設定の init スクリプトを使用してください。


🚨 重要な知見: タイムスタンプ形式の修正

重要: ZeroBus では、タイムスタンプフィールドに文字列型ではなく Unix 整数タイムスタンプ を使用する必要があります。 Databricks 向けには、タイムスタンプをマイクロ秒単位で生成する必要があります。


主要概念

  • gRPC + Protobuf: Zerobus はトランスポートプロトコルとして gRPC を使用します。gRPC で通信し Protobuf メッセージを構築できるアプリケーションであれば、Zerobus にデータを送信できます。
  • JSON または Protobuf シリアライゼーション: クイックスタートには JSON を、型安全性・前方互換性・パフォーマンスが必要な場合は Protobuf を使用します。
  • At-least-once 配信: コネクターは at-least-once(少なくとも1回)の配信保証を提供します。コンシューマーは重複処理に対応した設計にしてください。
  • 耐久性 ACK: 取り込まれた各レコードは RecordAcknowledgment を返します。バッファリングされたすべてのレコードを確実に書き込むには flush() を使用し、オフセットベースの追跡には wait_for_offset(offset) を使用します。
  • テーブル管理なし: Zerobus はテーブルの作成や変更を行いません。ターゲットテーブルの事前作成とスキーマ変更の管理は利用者自身が行う必要があります。
  • シングル AZ 耐久性: サービスは単一のアベイラビリティーゾーンで稼働します。ゾーン障害が発生する可能性を考慮して設計してください。

よくある問題

問題 解決策
接続拒否 サーバーエンドポイントの形式がご利用のクラウド(AWS vs Azure)と一致しているか確認してください。ファイアウォールの許可リストも確認してください。
認証失敗 サービスプリンシパルの client_id/secret を確認してください。ターゲットテーブルへの GRANT ステートメントを検証してください。
スキーマ不一致 レコードのフィールドがターゲットテーブルのスキーマと完全に一致しているか確認してください。テーブルが変更されている場合は .proto を再生成してください。
ストリームが予期せず閉じた 指数バックオフによるリトライとストリームの再初期化を実装してください。references/5-operations-and-limits.md を参照してください。
スループット制限超過 ストリームあたり最大 100 MB/s、15,000 行/秒です。複数のストリームを開くか、Databricks にお問い合わせください。
リージョン未対応 references/5-operations-and-limits.md でサポートリージョンを確認してください。
テーブルが見つからない テーブルがサポート対象リージョン内のマネージド Delta テーブルであり、正しい3部構成の名前(カタログ.スキーマ.テーブ
原文(English)を表示

Zerobus Ingest

Build clients that ingest data directly into Databricks Delta tables via the Zerobus gRPC API.

Status: GA (Generally Available since February 2026; billed under Lakeflow Jobs Serverless SKU)

Documentation:


What Is Zerobus Ingest?

Zerobus Ingest is a serverless connector that enables direct, record-by-record data ingestion into Delta tables via gRPC. It eliminates the need for message bus infrastructure (Kafka, Kinesis, Event Hub) for lakehouse-bound data. The service validates schemas, materializes data to target tables, and sends durability acknowledgments back to the client.

Core pattern: SDK init -> create stream -> ingest records -> handle ACKs -> flush -> close


Quick Decision: What Are You Building?

Scenario Language Serialization Reference
Quick prototype / test harness Python JSON references/2-python-client.md
Production Python producer Python Protobuf references/2-python-client.md + references/4-protobuf-schema.md
JVM microservice Java Protobuf references/3-multilanguage-clients.md
Go service Go JSON or Protobuf references/3-multilanguage-clients.md
Node.js / TypeScript app TypeScript JSON references/3-multilanguage-clients.md
High-performance system service Rust JSON or Protobuf references/3-multilanguage-clients.md
Schema generation from UC table Any Protobuf references/4-protobuf-schema.md
Retry / reconnection logic Any Any references/5-operations-and-limits.md

If not specified, default to python.


Common Libraries

These libraries are essential for ZeroBus data ingestion:

  • databricks-sdk>=0.85.0: Databricks workspace client for authentication and metadata
  • databricks-zerobus-ingest-sdk>=1.0.0: ZeroBus SDK for high-performance streaming ingestion
  • grpcio-tools These are typically NOT pre-installed on Databricks. Install them using execute_code tool:
  • code: "%pip install databricks-sdk>=VERSION databricks-zerobus-ingest-sdk>=VERSION"

Save the returned cluster_id and context_id for subsequent calls.

Smart Installation Approach

Check protobuf version first, then install compatible

grpcio-tools import google.protobuf runtime_version = google.protobuf.version print(f"Runtime protobuf version: {runtime_version}")

if runtime_version.startswith("5.26") or runtime_version.startswith("5.29"): %pip install grpcio-tools==1.62.0 else: %pip install grpcio-tools # Use latest for newer protobuf versions

Prerequisites

You must never execute the skill without confirming the below objects are valid:

  1. A Unity Catalog managed Delta table to ingest into
  2. A service principal id and secret with MODIFY and SELECT on the target table
  3. The Zerobus server endpoint for your workspace region
  4. The Zerobus Ingest SDK installed for your target language

See references/1-setup-and-authentication.md for complete setup instructions.


Minimal Python Example (JSON)

import json
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

sdk = ZerobusSdk(server_endpoint, workspace_url)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(table_name)

stream = sdk.create_stream(client_id, client_secret, table_props, options)
try:
    record = {"device_name": "sensor-1", "temp": 22, "humidity": 55}
    stream.ingest_record(json.dumps(record))
    stream.flush()
finally:
    stream.close()

Detailed guides

Topic File When to Read
Setup & Auth references/1-setup-and-authentication.md Endpoint formats, service principals, SDK install
Python Client references/2-python-client.md Sync/async Python, JSON and Protobuf flows, reusable client class
Multi-Language references/3-multilanguage-clients.md Java, Go, TypeScript, Rust SDK examples
Protobuf Schema references/4-protobuf-schema.md Generate .proto from UC table, compile, type mappings
Operations & Limits references/5-operations-and-limits.md ACK handling, retries, reconnection, throughput limits, constraints

You must always follow all the steps in the Workflow

Workflow

  1. Display the plan of your execution
  2. Determine the type of client
  3. Get schema Always use references/4-protobuf-schema.md
  4. Write Python code to a local file following the instructions in the relevant guide (e.g., scripts/zerobus_ingest.py)
  5. Upload to workspace: databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts
  6. Execute on Databricks using a job or notebook
  7. If execution fails: Edit the local file, re-upload, and re-execute

Important

  • Never install local packages
  • Serverless limitation: The Zerobus SDK cannot pip-install on serverless compute. Use classic compute clusters, or use the Zerobus REST API (Beta) for notebook-based ingestion without the SDK.
  • Explicit table grants: Service principals need explicit MODIFY and SELECT grants on the target table. Schema-level inherited permissions may not be sufficient for the authorization_details OAuth flow.

Execution Workflow

Step 1: Upload code to workspace

databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts

Step 2: Create and run a job

databricks jobs create --json '{
  "name": "zerobus-ingest",
  "tasks": [{
    "task_key": "ingest",
    "spark_python_task": {
      "python_file": "/Workspace/Users/<user>/scripts/zerobus_ingest.py"
    },
    "new_cluster": {
      "spark_version": "16.1.x-scala2.12",
      "node_type_id": "i3.xlarge",
      "num_workers": 0
    }
  }]
}'

databricks jobs run-now JOB_ID

If execution fails:

  1. Read the error from the job run output
  2. Edit the local Python file to fix the issue
  3. Re-upload: databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts
  4. Re-run: databricks jobs run-now JOB_ID

Installing Libraries

Databricks provides Spark, pandas, numpy, and common data libraries by default. Only install a library if you get an import error.

Add to the job configuration:

"libraries": [
  {"pypi": {"package": "databricks-zerobus-ingest-sdk>=1.0.0"}}
]

Or use init scripts in the cluster configuration.

🚨 Critical Learning: Timestamp Format Fix

BREAKTHROUGH: ZeroBus requires timestamp fields as Unix integer timestamps, NOT string timestamps. The timestamp generation must use microseconds for Databricks.


Key Concepts

  • gRPC + Protobuf: Zerobus uses gRPC as its transport protocol. Any application that can communicate via gRPC and construct Protobuf messages can produce to Zerobus.
  • JSON or Protobuf serialization: JSON for quick starts; Protobuf for type safety, forward compatibility, and performance.
  • At-least-once delivery: The connector provides at-least-once guarantees. Design consumers to handle duplicates.
  • Durability ACKs: Each ingested record returns a RecordAcknowledgment. Use flush() to ensure all buffered records are durably written, or use wait_for_offset(offset) for offset-based tracking.
  • No table management: Zerobus does not create or alter tables. You must pre-create your target table and manage schema evolution yourself.
  • Single-AZ durability: The service runs in a single availability zone. Plan for potential zone outages.

Common Issues

Issue Solution
Connection refused Verify server endpoint format matches your cloud (AWS vs Azure). Check firewall allowlists.
Authentication failed Confirm service principal client_id/secret. Verify GRANT statements on the target table.
Schema mismatch Ensure record fields match the target table schema exactly. Regenerate .proto if table changed.
Stream closed unexpectedly Implement retry with exponential backoff and stream reinitialization. See references/5-operations-and-limits.md.
Throughput limits hit Max 100 MB/s and 15,000 rows/s per stream. Open multiple streams or contact Databricks.
Region not supported Check supported regions in references/5-operations-and-limits.md.
Table not found Ensure table is a managed Delta table in a supported region with correct three-part name.
SDK install fails on serverless The Zerobus SDK cannot be pip-installed on serverless compute. Use classic compute clusters or the REST API (Beta) from notebooks.
Error 4024 / authorization_details Service principal lacks explicit table-level grants. Grant MODIFY and SELECT directly on the target table — schema-level inherited grants may be insufficient.

Related Skills

  • databricks-python-sdk - General SDK patterns and WorkspaceClient for table/schema management
  • databricks-pipelines - Downstream pipeline processing of ingested data
  • databricks-unity-catalog - Managing catalogs, schemas, and tables that Zerobus writes to
  • databricks-synthetic-data-gen - Generate test data to feed into Zerobus producers
  • databricks-core - CLI install, profile selection, authentication

Resources

原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。