📊databricks-zerobus-ingest
- プラグイン
- databricks
- ソース
- GitHub で見る ↗
説明
gRPC を使用して Databricks Delta テーブルへのニアリアルタイムデータ取り込みを行う、Zerobus Ingest クライアントを構築します。 次のような場合に使用: - メッセージバスを介さず Unity Catalog テーブルに直接書き込むプロデューサーを作成する場合 - Python / Java / Go / TypeScript / Rust で Zerobus Ingest SDK を使用する場合 - UC テーブルから Protobuf スキーマを生成する場合 - ACK ハンドリングおよびリトライロジックを含むストリームベースの取り込みを実装する場合
原文を表示
Build Zerobus Ingest clients for near real-time data ingestion into Databricks Delta tables via gRPC. Use when creating producers that write directly to Unity Catalog tables without a message bus, working with the Zerobus Ingest SDK in Python/Java/Go/TypeScript/Rust, generating Protobuf schemas from UC tables, or implementing stream-based ingestion with ACK handling and retry logic.
ユースケース
- ✓メッセージバスを介さず直接書き込む
- ✓複数言語でSDKを使用する
- ✓UC テーブルから Protobuf スキーマを生成
- ✓ストリームベースの取り込みを実装
本文(日本語訳)
Zerobus Ingest
Zerobus gRPC API を介して、データを Databricks Delta テーブルに直接取り込むクライアントを構築します。
ステータス: GA(2026年2月より一般提供開始。Lakeflow Jobs Serverless SKU として課金)
ドキュメント:
Zerobus Ingest とは?
Zerobus Ingest は、gRPC を介して Delta テーブルへのレコード単位の直接データ取り込みを実現する、サーバーレスコネクターです。 レイクハウスへのデータ投入において、メッセージバスインフラ(Kafka、Kinesis、Event Hub)が不要になります。 このサービスはスキーマの検証、ターゲットテーブルへのデータのマテリアライズ、およびクライアントへの耐久性確認(ACK)の返送を行います。
基本パターン: SDK 初期化 → ストリーム作成 → レコード取り込み → ACK 処理 → フラッシュ → クローズ
判断の早見表: 何を構築しますか?
| シナリオ | 言語 | シリアライゼーション | 参考資料 |
|---|---|---|---|
| クイックプロトタイプ / テストハーネス | Python | JSON | references/2-python-client.md |
| 本番用 Python プロデューサー | Python | Protobuf | references/2-python-client.md + references/4-protobuf-schema.md |
| JVM マイクロサービス | Java | Protobuf | references/3-multilanguage-clients.md |
| Go サービス | Go | JSON または Protobuf | references/3-multilanguage-clients.md |
| Node.js / TypeScript アプリ | TypeScript | JSON | references/3-multilanguage-clients.md |
| 高パフォーマンスシステムサービス | Rust | JSON または Protobuf | references/3-multilanguage-clients.md |
| UC テーブルからのスキーマ生成 | 任意 | Protobuf | references/4-protobuf-schema.md |
| リトライ / 再接続ロジック | 任意 | 任意 | references/5-operations-and-limits.md |
言語が指定されていない場合は、Python をデフォルトとして使用してください。
共通ライブラリ
ZeroBus によるデータ取り込みに必要な主要ライブラリは以下の通りです:
- databricks-sdk>=0.85.0: 認証およびメタデータ取得のための Databricks ワークスペースクライアント
- databricks-zerobus-ingest-sdk>=1.0.0: 高パフォーマンスなストリーミング取り込みのための ZeroBus SDK
- grpcio-tools
これらは通常 Databricks にプリインストールされていません。execute_code ツールを使用してインストールしてください:
code:"%pip install databricks-sdk>=VERSION databricks-zerobus-ingest-sdk>=VERSION"
返却された cluster_id と context_id は後続の呼び出しのために保存してください。
スマートインストール方法
# まず protobuf バージョンを確認し、互換性のある grpcio-tools をインストール
import google.protobuf
runtime_version = google.protobuf.__version__
print(f"Runtime protobuf version: {runtime_version}")
if runtime_version.startswith("5.26") or runtime_version.startswith("5.29"):
%pip install grpcio-tools==1.62.0
else:
%pip install grpcio-tools # 新しい protobuf バージョンには最新版を使用
前提条件
以下のオブジェクトが有効であることを確認せずに、このスキルを実行してはなりません:
- Unity Catalog マネージド Delta テーブル — 取り込み先のテーブル
- サービスプリンシパルの ID とシークレット — ターゲットテーブルに対する
MODIFYおよびSELECT権限を持つもの - Zerobus サーバーエンドポイント — ワークスペースのリージョンに対応したもの
- Zerobus Ingest SDK — ターゲット言語向けにインストール済みであること
完全なセットアップ手順については references/1-setup-and-authentication.md を参照してください。
Python 最小構成例(JSON)
import json
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
sdk = ZerobusSdk(server_endpoint, workspace_url)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(table_name)
stream = sdk.create_stream(client_id, client_secret, table_props, options)
try:
record = {"device_name": "sensor-1", "temp": 22, "humidity": 55}
stream.ingest_record(json.dumps(record))
stream.flush()
finally:
stream.close()
詳細ガイド
| トピック | ファイル | 参照するタイミング |
|---|---|---|
| セットアップ & 認証 | references/1-setup-and-authentication.md | エンドポイントの形式、サービスプリンシパル、SDK インストール |
| Python クライアント | references/2-python-client.md | 同期/非同期 Python、JSON と Protobuf のフロー、再利用可能なクライアントクラス |
| マルチ言語 | references/3-multilanguage-clients.md | Java、Go、TypeScript、Rust の SDK サンプル |
| Protobuf スキーマ | references/4-protobuf-schema.md | UC テーブルからの .proto 生成、コンパイル、型マッピング |
| 運用 & 制限事項 | references/5-operations-and-limits.md | ACK 処理、リトライ、再接続、スループット制限、制約事項 |
ワークフローに定められたすべてのステップに必ず従ってください。
ワークフロー
- 実行計画を表示する
- クライアントの種類を決定する
- スキーマを取得する — 必ず references/4-protobuf-schema.md を使用すること
- Python コードをローカルファイルに書き出す — 該当ガイドの指示に従う(例:
scripts/zerobus_ingest.py) - ワークスペースにアップロードする:
databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts - Databricks 上で実行する — ジョブまたはノートブックを使用
- 実行に失敗した場合: ローカルファイルを編集し、再アップロードして再実行する
重要事項
- ローカルパッケージをインストールしないこと
- サーバーレスの制限: Zerobus SDK はサーバーレスコンピュート上で pip インストールできません。クラシックコンピュートクラスターを使用するか、SDK を使わないノートブックベースの取り込みには Zerobus REST API(Beta)を使用してください。
- テーブルへの明示的な権限付与: サービスプリンシパルには、ターゲットテーブルに対して
MODIFYおよびSELECTを明示的に付与する必要があります。スキーマレベルの継承権限は、authorization_detailsOAuth フローには不十分な場合があります。
実行ワークフロー
ステップ 1: コードをワークスペースにアップロード
databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts
ステップ 2: ジョブを作成して実行
databricks jobs create --json '{
"name": "zerobus-ingest",
"tasks": [{
"task_key": "ingest",
"spark_python_task": {
"python_file": "/Workspace/Users/<user>/scripts/zerobus_ingest.py"
},
"new_cluster": {
"spark_version": "16.1.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 0
}
}]
}'
databricks jobs run-now JOB_ID
実行に失敗した場合:
- ジョブ実行出力からエラー内容を確認する
- ローカルの Python ファイルを編集して問題を修正する
- 再アップロード:
databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts - 再実行:
databricks jobs run-now JOB_ID
ライブラリのインストール
Databricks はデフォルトで Spark、pandas、numpy、および一般的なデータライブラリを提供しています。 import エラーが発生した場合にのみ、ライブラリをインストールしてください。
ジョブ設定に以下を追加します:
"libraries": [
{"pypi": {"package": "databricks-zerobus-ingest-sdk>=1.0.0"}}
]
または、クラスター設定の init スクリプトを使用してください。
🚨 重要な知見: タイムスタンプ形式の修正
重要: ZeroBus では、タイムスタンプフィールドに文字列型ではなく Unix 整数タイムスタンプ を使用する必要があります。 Databricks 向けには、タイムスタンプをマイクロ秒単位で生成する必要があります。
主要概念
- gRPC + Protobuf: Zerobus はトランスポートプロトコルとして gRPC を使用します。gRPC で通信し Protobuf メッセージを構築できるアプリケーションであれば、Zerobus にデータを送信できます。
- JSON または Protobuf シリアライゼーション: クイックスタートには JSON を、型安全性・前方互換性・パフォーマンスが必要な場合は Protobuf を使用します。
- At-least-once 配信: コネクターは at-least-once(少なくとも1回)の配信保証を提供します。コンシューマーは重複処理に対応した設計にしてください。
- 耐久性 ACK: 取り込まれた各レコードは
RecordAcknowledgmentを返します。バッファリングされたすべてのレコードを確実に書き込むにはflush()を使用し、オフセットベースの追跡にはwait_for_offset(offset)を使用します。 - テーブル管理なし: Zerobus はテーブルの作成や変更を行いません。ターゲットテーブルの事前作成とスキーマ変更の管理は利用者自身が行う必要があります。
- シングル AZ 耐久性: サービスは単一のアベイラビリティーゾーンで稼働します。ゾーン障害が発生する可能性を考慮して設計してください。
よくある問題
| 問題 | 解決策 |
|---|---|
| 接続拒否 | サーバーエンドポイントの形式がご利用のクラウド(AWS vs Azure)と一致しているか確認してください。ファイアウォールの許可リストも確認してください。 |
| 認証失敗 | サービスプリンシパルの client_id/secret を確認してください。ターゲットテーブルへの GRANT ステートメントを検証してください。 |
| スキーマ不一致 | レコードのフィールドがターゲットテーブルのスキーマと完全に一致しているか確認してください。テーブルが変更されている場合は .proto を再生成してください。 |
| ストリームが予期せず閉じた | 指数バックオフによるリトライとストリームの再初期化を実装してください。references/5-operations-and-limits.md を参照してください。 |
| スループット制限超過 | ストリームあたり最大 100 MB/s、15,000 行/秒です。複数のストリームを開くか、Databricks にお問い合わせください。 |
| リージョン未対応 | references/5-operations-and-limits.md でサポートリージョンを確認してください。 |
| テーブルが見つからない | テーブルがサポート対象リージョン内のマネージド Delta テーブルであり、正しい3部構成の名前(カタログ.スキーマ.テーブ |
原文(English)を表示
Zerobus Ingest
Build clients that ingest data directly into Databricks Delta tables via the Zerobus gRPC API.
Status: GA (Generally Available since February 2026; billed under Lakeflow Jobs Serverless SKU)
Documentation:
What Is Zerobus Ingest?
Zerobus Ingest is a serverless connector that enables direct, record-by-record data ingestion into Delta tables via gRPC. It eliminates the need for message bus infrastructure (Kafka, Kinesis, Event Hub) for lakehouse-bound data. The service validates schemas, materializes data to target tables, and sends durability acknowledgments back to the client.
Core pattern: SDK init -> create stream -> ingest records -> handle ACKs -> flush -> close
Quick Decision: What Are You Building?
| Scenario | Language | Serialization | Reference |
|---|---|---|---|
| Quick prototype / test harness | Python | JSON | references/2-python-client.md |
| Production Python producer | Python | Protobuf | references/2-python-client.md + references/4-protobuf-schema.md |
| JVM microservice | Java | Protobuf | references/3-multilanguage-clients.md |
| Go service | Go | JSON or Protobuf | references/3-multilanguage-clients.md |
| Node.js / TypeScript app | TypeScript | JSON | references/3-multilanguage-clients.md |
| High-performance system service | Rust | JSON or Protobuf | references/3-multilanguage-clients.md |
| Schema generation from UC table | Any | Protobuf | references/4-protobuf-schema.md |
| Retry / reconnection logic | Any | Any | references/5-operations-and-limits.md |
If not specified, default to python.
Common Libraries
These libraries are essential for ZeroBus data ingestion:
- databricks-sdk>=0.85.0: Databricks workspace client for authentication and metadata
- databricks-zerobus-ingest-sdk>=1.0.0: ZeroBus SDK for high-performance streaming ingestion
- grpcio-tools
These are typically NOT pre-installed on Databricks. Install them using
execute_codetool: code: "%pip install databricks-sdk>=VERSION databricks-zerobus-ingest-sdk>=VERSION"
Save the returned cluster_id and context_id for subsequent calls.
Smart Installation Approach
Check protobuf version first, then install compatible
grpcio-tools import google.protobuf runtime_version = google.protobuf.version print(f"Runtime protobuf version: {runtime_version}")
if runtime_version.startswith("5.26") or runtime_version.startswith("5.29"): %pip install grpcio-tools==1.62.0 else: %pip install grpcio-tools # Use latest for newer protobuf versions
Prerequisites
You must never execute the skill without confirming the below objects are valid:
- A Unity Catalog managed Delta table to ingest into
- A service principal id and secret with
MODIFYandSELECTon the target table - The Zerobus server endpoint for your workspace region
- The Zerobus Ingest SDK installed for your target language
See references/1-setup-and-authentication.md for complete setup instructions.
Minimal Python Example (JSON)
import json
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
sdk = ZerobusSdk(server_endpoint, workspace_url)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(table_name)
stream = sdk.create_stream(client_id, client_secret, table_props, options)
try:
record = {"device_name": "sensor-1", "temp": 22, "humidity": 55}
stream.ingest_record(json.dumps(record))
stream.flush()
finally:
stream.close()
Detailed guides
| Topic | File | When to Read |
|---|---|---|
| Setup & Auth | references/1-setup-and-authentication.md | Endpoint formats, service principals, SDK install |
| Python Client | references/2-python-client.md | Sync/async Python, JSON and Protobuf flows, reusable client class |
| Multi-Language | references/3-multilanguage-clients.md | Java, Go, TypeScript, Rust SDK examples |
| Protobuf Schema | references/4-protobuf-schema.md | Generate .proto from UC table, compile, type mappings |
| Operations & Limits | references/5-operations-and-limits.md | ACK handling, retries, reconnection, throughput limits, constraints |
You must always follow all the steps in the Workflow
Workflow
- Display the plan of your execution
- Determine the type of client
- Get schema Always use references/4-protobuf-schema.md
- Write Python code to a local file following the instructions in the relevant guide (e.g.,
scripts/zerobus_ingest.py) - Upload to workspace:
databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts - Execute on Databricks using a job or notebook
- If execution fails: Edit the local file, re-upload, and re-execute
Important
- Never install local packages
- Serverless limitation: The Zerobus SDK cannot pip-install on serverless compute. Use classic compute clusters, or use the Zerobus REST API (Beta) for notebook-based ingestion without the SDK.
- Explicit table grants: Service principals need explicit
MODIFYandSELECTgrants on the target table. Schema-level inherited permissions may not be sufficient for theauthorization_detailsOAuth flow.
Execution Workflow
Step 1: Upload code to workspace
databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts
Step 2: Create and run a job
databricks jobs create --json '{
"name": "zerobus-ingest",
"tasks": [{
"task_key": "ingest",
"spark_python_task": {
"python_file": "/Workspace/Users/<user>/scripts/zerobus_ingest.py"
},
"new_cluster": {
"spark_version": "16.1.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 0
}
}]
}'
databricks jobs run-now JOB_ID
If execution fails:
- Read the error from the job run output
- Edit the local Python file to fix the issue
- Re-upload:
databricks workspace import-dir ./scripts /Workspace/Users/<user>/scripts - Re-run:
databricks jobs run-now JOB_ID
Installing Libraries
Databricks provides Spark, pandas, numpy, and common data libraries by default. Only install a library if you get an import error.
Add to the job configuration:
"libraries": [
{"pypi": {"package": "databricks-zerobus-ingest-sdk>=1.0.0"}}
]
Or use init scripts in the cluster configuration.
🚨 Critical Learning: Timestamp Format Fix
BREAKTHROUGH: ZeroBus requires timestamp fields as Unix integer timestamps, NOT string timestamps. The timestamp generation must use microseconds for Databricks.
Key Concepts
- gRPC + Protobuf: Zerobus uses gRPC as its transport protocol. Any application that can communicate via gRPC and construct Protobuf messages can produce to Zerobus.
- JSON or Protobuf serialization: JSON for quick starts; Protobuf for type safety, forward compatibility, and performance.
- At-least-once delivery: The connector provides at-least-once guarantees. Design consumers to handle duplicates.
- Durability ACKs: Each ingested record returns a
RecordAcknowledgment. Useflush()to ensure all buffered records are durably written, or usewait_for_offset(offset)for offset-based tracking. - No table management: Zerobus does not create or alter tables. You must pre-create your target table and manage schema evolution yourself.
- Single-AZ durability: The service runs in a single availability zone. Plan for potential zone outages.
Common Issues
| Issue | Solution |
|---|---|
| Connection refused | Verify server endpoint format matches your cloud (AWS vs Azure). Check firewall allowlists. |
| Authentication failed | Confirm service principal client_id/secret. Verify GRANT statements on the target table. |
| Schema mismatch | Ensure record fields match the target table schema exactly. Regenerate .proto if table changed. |
| Stream closed unexpectedly | Implement retry with exponential backoff and stream reinitialization. See references/5-operations-and-limits.md. |
| Throughput limits hit | Max 100 MB/s and 15,000 rows/s per stream. Open multiple streams or contact Databricks. |
| Region not supported | Check supported regions in references/5-operations-and-limits.md. |
| Table not found | Ensure table is a managed Delta table in a supported region with correct three-part name. |
| SDK install fails on serverless | The Zerobus SDK cannot be pip-installed on serverless compute. Use classic compute clusters or the REST API (Beta) from notebooks. |
| Error 4024 / authorization_details | Service principal lacks explicit table-level grants. Grant MODIFY and SELECT directly on the target table — schema-level inherited grants may be insufficient. |
Related Skills
- databricks-python-sdk - General SDK patterns and WorkspaceClient for table/schema management
- databricks-pipelines - Downstream pipeline processing of ingested data
- databricks-unity-catalog - Managing catalogs, schemas, and tables that Zerobus writes to
- databricks-synthetic-data-gen - Generate test data to feed into Zerobus producers
- databricks-core - CLI install, profile selection, authentication
Resources
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。