🔄aidp-streaming-kafka
- ソース
- GitHub で見る ↗
説明
OCI Streamingのストリームを、SparkStructured Streaming(Kafka互換)経由でAIDPノートブックから取得します。 次のような場合に使用: ユーザーがOCI Streaming、OCI上のKafka、ストリームプール、Structured Streaming について言及している場合、またはKafkaメッセージをSparkに読み込みたい場合。 認証はSASL/PLAINを使用し、OCI認証トークンによって行われます。 パターンはOracle AIDPの公式サンプルに準拠しています。
原文を表示
Consume an OCI Streaming stream from an AIDP notebook via Spark structured streaming (Kafka-compat). Use when the user mentions OCI Streaming, Kafka on OCI, stream pool, structured streaming, or wants to read Kafka messages into Spark. Auth is SASL/PLAIN with an OCI auth token. Pattern matches the official Oracle AIDP sample.
ユースケース
- ✓OCI Streamingのストリーム取得
- ✓Kafka互換でSparkに読み込む
- ✓AIDPノートブックでストリーム処理
- ✓OCI認証トークンで認証するとき
本文(日本語訳)
aidp-streaming-kafka — Spark Structured Streaming を使用した OCI Streaming
Oracle AIDP 公式サンプル
oracle-samples/oracle-aidp-samples → data-engineering/ingestion/Streaming/StreamingFromOCIStreamingService.ipynb
を踏襲した実装です。
次のような場合に使用
- AIDP ノートブックから OCI Streaming ストリーム(Kafka 互換)を消費したい場合
- ユーザーが「OCI Streaming」「Kafka on OCI」「stream pool」「structured streaming」「Kafka topic」に言及している場合
次のような場合は使用しない
- OCI Object Storage 上のファイルをバッチ読み込みする場合
→
spark.read.format("csv"|"parquet").load("oci://...")で対応可能なため、このスキルは不要です。 - Confluent や MSK など他の Kafka デプロイメントを使用する場合
→ 同じ Spark Kafka API が利用可能です。
bootstrap.serversを対象ブローカーに向け、OCI 固有のユーザー名形式はスキップしてください。
AIDP ノートブックの前提条件
- クラスター上に Spark Kafka コネクター(
spark-sql-kafka-0-10_<scala>:<spark>)が存在すること (AIDP のtpcdsクラスターにはデフォルトで含まれています) sys.pathにヘルパーが追加されていること- OCI Streaming の stream pool OCID とリージョン情報
- OCI 認証トークン(OCI コンソール: プロファイル → 認証トークン → トークンの生成) 有効期限は 1 時間 です。それを超えるジョブの実行前に必ず更新してください。
- Volumes にマウントされたチェックポイントの保存先
(
/Volumes/<catalog>/<schema>/<volume>/_checkpoints/...)/Workspace/...は使用しないでください — Streaming エンジンがサイレントに失敗します。 誤ったパスを指定した場合、ヘルパーのvalidate_checkpoint_path()が明確なValueErrorを送出します。
認証: SASL/PLAIN と OCI 認証トークン
import os
from oracle_ai_data_platform_connectors.streaming import (
bootstrap_for_region, build_kafka_options_sasl_plain,
validate_checkpoint_path,
)
# ブートストラップ設定。
# 汎用リージョン形式(デフォルト)またはセルプレフィックス形式(OCI コンソールの
# "messages-endpoint" 形式に合致)のいずれかを、stream pool の表示に合わせて選択します:
bootstrap = bootstrap_for_region(os.environ["OCI_REGION"]) # streaming.<region>...:9092
# bootstrap = bootstrap_for_region(os.environ["OCI_REGION"], cell=1) # cell-1.streaming.<region>...:9092
opts = build_kafka_options_sasl_plain(
bootstrap_servers=bootstrap,
tenancy_name=os.environ["OCI_TENANCY_NAME"], # 表示名(OCID ではありません)
username=os.environ["OCI_USERNAME"], # OCI ユーザー名。IAM Domains の場合は
# "oracleidentitycloudservice/<email>" を使用
stream_pool_ocid=os.environ["OCI_STREAM_POOL_OCID"],
auth_token=os.environ["OCI_AUTH_TOKEN"], # 有効期限 1h — 長時間ジョブの前に更新
topic=os.environ["KAFKA_TOPIC"],
starting_offsets="latest", # バックフィルする場合は "earliest" を指定
# オプション調整パラメーター(公式サンプルに準拠):
max_partition_fetch_bytes=1024 * 1024,
max_offsets_per_trigger=5, # マイクロバッチあたりの行数を制限(デモ用)
)
raw = spark.readStream.format("kafka").options(**opts).load()
# ストリーム開始前にチェックポイントパスを検証(FUSE のサイレント失敗を防ぎます)
checkpoint = validate_checkpoint_path(os.environ["KAFKA_CHECKPOINT_VOLUME"])
sink_path = os.environ["KAFKA_SINK_VOLUME"] # 例: /Volumes/default/default/streaming/kafkaStreamingSink
# 公式サンプルに準拠: /Volumes/ 配下の Delta シンクに書き込みます。
query = (
raw.writeStream
.queryName("OCIStreamingSource")
.format("delta")
.option("checkpointLocation", checkpoint)
.start(sink_path)
)
query.awaitTermination(timeout=120)
print("input rows in last batch:", (query.lastProgress or {}).get("numInputRows"))
既存のトピックに対して print 形式でインラインテストを行う場合:
out_df = raw.selectExpr("CAST(key AS STRING) AS k", "CAST(value AS STRING) AS v",
"topic", "partition", "offset")
q = (out_df.writeStream.format("memory").queryName("kafka_test")
.option("checkpointLocation", checkpoint)
.trigger(processingTime="5 seconds").start())
q.awaitTermination(timeout=60)
spark.sql("SELECT * FROM kafka_test").show()
q.stop()
ユーザー名の形式(最も多いハマりポイント)
OCI Streaming の Kafka SASL ユーザー名の形式は
<tenancy_name>/<user>/<stream_pool_ocid> です。
中間セグメントはテナントの種類によって異なります:
| テナントの種類 | username 引数に指定する値 |
|---|---|
| レガシー IAM | <email> |
| IAM Domains(モダン) | oracleidentitycloudservice/<email> |
oci iam user list でユーザーが oracleidentitycloudservice/... プレフィックス付きで表示される場合は、プレフィックス付きの形式を使用してください。
注意点・落とし穴
-
チェックポイントパス —
/Volumes/...形式でなければなりません。/Workspace/...やoci://...を渡すとvalidate_checkpoint_path()ヘルパーがValueErrorを送出します。 AIDP における「ストリームは動いているのにデータが来ない」という問い合わせの原因 第 1 位 です。 -
認証トークンの有効期限は 1 時間。 長時間ジョブでは、チェックポイントへの保存 → ストリームの停止 → トークンの更新 → チェックポイントからの再起動、という手順を計画してください。 RP ベースの Kafka SASL(
com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule)は AIDP プラットフォームレベルでブロックされています(RP トークンは提供されません)。 -
ユーザー名の形式 — テナントの 名前(表示名)を使用し、テナントの OCID は使用しないでください。 IAM Domains ユーザーは
oracleidentitycloudservice/プレフィックスが必要です。 -
Streaming ジョブは無期限に実行され続けます。 Streaming クエリが開始されると、AIDP ワークフローのタイムアウト設定は適用されません。 ラッピングジョブに
Max Concurrent Runs = 1を設定してください。 -
ブートストラップホスト — OCI コンソールの stream pool 詳細ページには、
https://cell-1.streaming.<region>.oci.oraclecloud.comのような "messages-endpoint" が表示されます。 Kafka レイヤーではstreaming.<region>...形式とcell-N.streaming.<region>...形式のどちらも使用できます。
参考情報
- ヘルパー: scripts/oracle_ai_data_platform_connectors/streaming/kafka.py
- Oracle AIDP 公式サンプル: StreamingFromOCIStreamingService.ipynb
- OCI Streaming Kafka 互換ドキュメント: https://docs.oracle.com/en-us/iaas/Content/Streaming/Tasks/kafkacompatibility_topic-Configuration.htm
原文(English)を表示
aidp-streaming-kafka — OCI Streaming via Spark structured streaming
Mirrors the official Oracle AIDP sample at oracle-samples/oracle-aidp-samples → data-engineering/ingestion/Streaming/StreamingFromOCIStreamingService.ipynb.
When to use
- User wants to consume an OCI Streaming stream (Kafka-compat) from an AIDP notebook.
- User mentions: "OCI Streaming", "Kafka on OCI", "stream pool", "structured streaming", "Kafka topic".
When NOT to use
- For batch reads of files in OCI Object Storage → standard
spark.read.format("csv"|"parquet").load("oci://...")is fine without this skill. - For other Kafka deployments (Confluent, MSK) — same Spark Kafka API works; just point
bootstrap.serversat the right broker and skip the OCI-specific username format.
Prerequisites in the AIDP notebook
- Spark Kafka connector on the cluster (
spark-sql-kafka-0-10_<scala>:<spark>— AIDP'stpcdscluster has this). - Helpers on
sys.path. - OCI Streaming stream pool OCID + region.
- An OCI auth token (Profile → Auth tokens → Generate Token in the OCI console). 1-hour TTL — refresh before any job that runs longer than that.
- A Volumes-mounted checkpoint location (
/Volumes/<catalog>/<schema>/<volume>/_checkpoints/...). Do NOT use/Workspace/...— the streaming engine fails silently. The helper'svalidate_checkpoint_path()raises a clearValueErrorif you try.
Auth: SASL/PLAIN with OCI auth token
import os
from oracle_ai_data_platform_connectors.streaming import (
bootstrap_for_region, build_kafka_options_sasl_plain,
validate_checkpoint_path,
)
# Bootstrap. Either generic-regional (default) or cell-prefixed (matches OCI
# Console's "messages-endpoint" shape — pick whichever your stream pool shows):
bootstrap = bootstrap_for_region(os.environ["OCI_REGION"]) # streaming.<region>...:9092
# bootstrap = bootstrap_for_region(os.environ["OCI_REGION"], cell=1) # cell-1.streaming.<region>...:9092
opts = build_kafka_options_sasl_plain(
bootstrap_servers=bootstrap,
tenancy_name=os.environ["OCI_TENANCY_NAME"], # display name, NOT OCID
username=os.environ["OCI_USERNAME"], # OCI user; for IAM-Domains
# use "oracleidentitycloudservice/<email>"
stream_pool_ocid=os.environ["OCI_STREAM_POOL_OCID"],
auth_token=os.environ["OCI_AUTH_TOKEN"], # 1h TTL — refresh before long jobs
topic=os.environ["KAFKA_TOPIC"],
starting_offsets="latest", # or "earliest" for backfill
# Optional tuning (matches the official sample):
max_partition_fetch_bytes=1024 * 1024,
max_offsets_per_trigger=5, # cap rows per micro-batch (demo-friendly)
)
raw = spark.readStream.format("kafka").options(**opts).load()
# Validate checkpoint path BEFORE starting (saves you from silent FUSE failures)
checkpoint = validate_checkpoint_path(os.environ["KAFKA_CHECKPOINT_VOLUME"])
sink_path = os.environ["KAFKA_SINK_VOLUME"] # e.g. /Volumes/default/default/streaming/kafkaStreamingSink
# Match the official sample: write to a Delta sink under /Volumes/.
query = (
raw.writeStream
.queryName("OCIStreamingSource")
.format("delta")
.option("checkpointLocation", checkpoint)
.start(sink_path)
)
query.awaitTermination(timeout=120)
print("input rows in last batch:", (query.lastProgress or {}).get("numInputRows"))
For an inline test against an existing topic with print-style output:
out_df = raw.selectExpr("CAST(key AS STRING) AS k", "CAST(value AS STRING) AS v",
"topic", "partition", "offset")
q = (out_df.writeStream.format("memory").queryName("kafka_test")
.option("checkpointLocation", checkpoint)
.trigger(processingTime="5 seconds").start())
q.awaitTermination(timeout=60)
spark.sql("SELECT * FROM kafka_test").show()
q.stop()
Username format (the most common gotcha)
OCI Streaming's Kafka SASL username is <tenancy_name>/<user>/<stream_pool_ocid>. The middle segment depends on tenancy type:
| Tenancy | username argument |
|---|---|
| Legacy IAM | <email> |
| IAM Domains (modern) | oracleidentitycloudservice/<email> |
If you oci iam user list shows the user with oracleidentitycloudservice/... prefix, use the prefixed form.
Gotchas
- Checkpoint path — must be
/Volumes/.... Thevalidate_checkpoint_path()helper raises aValueErrorif you pass/Workspace/...oroci://.... This is the #1 cause of "stream runs but no data appears" complaints in AIDP. - Auth token TTL = 1 hour. For longer runs, plan to checkpoint, stop the stream, refresh the token, restart from checkpoint. RP-based Kafka SASL (
com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule) is blocked at the AIDP platform level (RP tokens not provided). - Username format — tenancy name (display name), NOT tenancy OCID. IAM-Domains users need the
oracleidentitycloudservice/prefix. - Streaming jobs run forever. The AIDP workflow timeout doesn't apply once a streaming query is started. Set
Max Concurrent Runs = 1on the wrapping job. - Bootstrap host — the OCI Console's stream-pool detail page shows a "messages-endpoint" like
https://cell-1.streaming.<region>.oci.oraclecloud.com. Either form (streaming.<region>...orcell-N.streaming.<region>...) works for the Kafka layer.
References
- Helpers: scripts/oracle_ai_data_platform_connectors/streaming/kafka.py
- Official Oracle AIDP sample: StreamingFromOCIStreamingService.ipynb
- OCI Streaming Kafka compat docs: https://docs.oracle.com/en-us/iaas/Content/Streaming/Tasks/kafkacompatibility_topic-Configuration.htm
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。