claude-skills/

Anthropic公式スキル・プラグインの日本語ディレクトリ

last sync 22h ago
スキルOfficialdevelopment

🔄aidp-streaming-kafka

説明

OCI Streamingのストリームを、SparkStructured Streaming(Kafka互換)経由でAIDPノートブックから取得します。 次のような場合に使用: ユーザーがOCI Streaming、OCI上のKafka、ストリームプール、Structured Streaming について言及している場合、またはKafkaメッセージをSparkに読み込みたい場合。 認証はSASL/PLAINを使用し、OCI認証トークンによって行われます。 パターンはOracle AIDPの公式サンプルに準拠しています。

原文を表示

Consume an OCI Streaming stream from an AIDP notebook via Spark structured streaming (Kafka-compat). Use when the user mentions OCI Streaming, Kafka on OCI, stream pool, structured streaming, or wants to read Kafka messages into Spark. Auth is SASL/PLAIN with an OCI auth token. Pattern matches the official Oracle AIDP sample.

ユースケース

  • OCI Streamingのストリーム取得
  • Kafka互換でSparkに読み込む
  • AIDPノートブックでストリーム処理
  • OCI認証トークンで認証するとき

本文(日本語訳)

aidp-streaming-kafka — Spark Structured Streaming を使用した OCI Streaming

Oracle AIDP 公式サンプル oracle-samples/oracle-aidp-samples → data-engineering/ingestion/Streaming/StreamingFromOCIStreamingService.ipynb を踏襲した実装です。


次のような場合に使用

  • AIDP ノートブックから OCI Streaming ストリーム(Kafka 互換)を消費したい場合
  • ユーザーが「OCI Streaming」「Kafka on OCI」「stream pool」「structured streaming」「Kafka topic」に言及している場合

次のような場合は使用しない

  • OCI Object Storage 上のファイルをバッチ読み込みする場合 → spark.read.format("csv"|"parquet").load("oci://...") で対応可能なため、このスキルは不要です。
  • Confluent や MSK など他の Kafka デプロイメントを使用する場合 → 同じ Spark Kafka API が利用可能です。bootstrap.servers を対象ブローカーに向け、OCI 固有のユーザー名形式はスキップしてください。

AIDP ノートブックの前提条件

  1. クラスター上に Spark Kafka コネクター(spark-sql-kafka-0-10_<scala>:<spark>)が存在すること (AIDP の tpcds クラスターにはデフォルトで含まれています)
  2. sys.path にヘルパーが追加されていること
  3. OCI Streaming の stream pool OCID とリージョン情報
  4. OCI 認証トークン(OCI コンソール: プロファイル → 認証トークン → トークンの生成) 有効期限は 1 時間 です。それを超えるジョブの実行前に必ず更新してください。
  5. Volumes にマウントされたチェックポイントの保存先/Volumes/<catalog>/<schema>/<volume>/_checkpoints/.../Workspace/... は使用しないでください — Streaming エンジンがサイレントに失敗します。 誤ったパスを指定した場合、ヘルパーの validate_checkpoint_path() が明確な ValueError を送出します。

認証: SASL/PLAIN と OCI 認証トークン

import os
from oracle_ai_data_platform_connectors.streaming import (
    bootstrap_for_region, build_kafka_options_sasl_plain,
    validate_checkpoint_path,
)

# ブートストラップ設定。
# 汎用リージョン形式(デフォルト)またはセルプレフィックス形式(OCI コンソールの
# "messages-endpoint" 形式に合致)のいずれかを、stream pool の表示に合わせて選択します:
bootstrap = bootstrap_for_region(os.environ["OCI_REGION"])              # streaming.<region>...:9092
# bootstrap = bootstrap_for_region(os.environ["OCI_REGION"], cell=1)    # cell-1.streaming.<region>...:9092

opts = build_kafka_options_sasl_plain(
    bootstrap_servers=bootstrap,
    tenancy_name=os.environ["OCI_TENANCY_NAME"],     # 表示名(OCID ではありません)
    username=os.environ["OCI_USERNAME"],             # OCI ユーザー名。IAM Domains の場合は
                                                     # "oracleidentitycloudservice/<email>" を使用
    stream_pool_ocid=os.environ["OCI_STREAM_POOL_OCID"],
    auth_token=os.environ["OCI_AUTH_TOKEN"],         # 有効期限 1h — 長時間ジョブの前に更新
    topic=os.environ["KAFKA_TOPIC"],
    starting_offsets="latest",                       # バックフィルする場合は "earliest" を指定
    # オプション調整パラメーター(公式サンプルに準拠):
    max_partition_fetch_bytes=1024 * 1024,
    max_offsets_per_trigger=5,                       # マイクロバッチあたりの行数を制限(デモ用)
)

raw = spark.readStream.format("kafka").options(**opts).load()

# ストリーム開始前にチェックポイントパスを検証(FUSE のサイレント失敗を防ぎます)
checkpoint = validate_checkpoint_path(os.environ["KAFKA_CHECKPOINT_VOLUME"])
sink_path  = os.environ["KAFKA_SINK_VOLUME"]   # 例: /Volumes/default/default/streaming/kafkaStreamingSink

# 公式サンプルに準拠: /Volumes/ 配下の Delta シンクに書き込みます。
query = (
    raw.writeStream
       .queryName("OCIStreamingSource")
       .format("delta")
       .option("checkpointLocation", checkpoint)
       .start(sink_path)
)
query.awaitTermination(timeout=120)
print("input rows in last batch:", (query.lastProgress or {}).get("numInputRows"))

既存のトピックに対して print 形式でインラインテストを行う場合:

out_df = raw.selectExpr("CAST(key AS STRING) AS k", "CAST(value AS STRING) AS v",
                        "topic", "partition", "offset")
q = (out_df.writeStream.format("memory").queryName("kafka_test")
            .option("checkpointLocation", checkpoint)
            .trigger(processingTime="5 seconds").start())
q.awaitTermination(timeout=60)
spark.sql("SELECT * FROM kafka_test").show()
q.stop()

ユーザー名の形式(最も多いハマりポイント)

OCI Streaming の Kafka SASL ユーザー名の形式は <tenancy_name>/<user>/<stream_pool_ocid> です。 中間セグメントはテナントの種類によって異なります:

テナントの種類 username 引数に指定する値
レガシー IAM <email>
IAM Domains(モダン) oracleidentitycloudservice/<email>

oci iam user list でユーザーが oracleidentitycloudservice/... プレフィックス付きで表示される場合は、プレフィックス付きの形式を使用してください。


注意点・落とし穴

  • チェックポイントパス/Volumes/... 形式でなければなりません。 /Workspace/...oci://... を渡すと validate_checkpoint_path() ヘルパーが ValueError を送出します。 AIDP における「ストリームは動いているのにデータが来ない」という問い合わせの原因 第 1 位 です。

  • 認証トークンの有効期限は 1 時間。 長時間ジョブでは、チェックポイントへの保存 → ストリームの停止 → トークンの更新 → チェックポイントからの再起動、という手順を計画してください。 RP ベースの Kafka SASL(com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule)は AIDP プラットフォームレベルでブロックされています(RP トークンは提供されません)。

  • ユーザー名の形式 — テナントの 名前(表示名)を使用し、テナントの OCID は使用しないでください。 IAM Domains ユーザーは oracleidentitycloudservice/ プレフィックスが必要です。

  • Streaming ジョブは無期限に実行され続けます。 Streaming クエリが開始されると、AIDP ワークフローのタイムアウト設定は適用されません。 ラッピングジョブに Max Concurrent Runs = 1 を設定してください。

  • ブートストラップホスト — OCI コンソールの stream pool 詳細ページには、 https://cell-1.streaming.<region>.oci.oraclecloud.com のような "messages-endpoint" が表示されます。 Kafka レイヤーでは streaming.<region>... 形式と cell-N.streaming.<region>... 形式のどちらも使用できます。


参考情報

原文(English)を表示

aidp-streaming-kafka — OCI Streaming via Spark structured streaming

Mirrors the official Oracle AIDP sample at oracle-samples/oracle-aidp-samples → data-engineering/ingestion/Streaming/StreamingFromOCIStreamingService.ipynb.

When to use

  • User wants to consume an OCI Streaming stream (Kafka-compat) from an AIDP notebook.
  • User mentions: "OCI Streaming", "Kafka on OCI", "stream pool", "structured streaming", "Kafka topic".

When NOT to use

  • For batch reads of files in OCI Object Storage → standard spark.read.format("csv"|"parquet").load("oci://...") is fine without this skill.
  • For other Kafka deployments (Confluent, MSK) — same Spark Kafka API works; just point bootstrap.servers at the right broker and skip the OCI-specific username format.

Prerequisites in the AIDP notebook

  1. Spark Kafka connector on the cluster (spark-sql-kafka-0-10_<scala>:<spark> — AIDP's tpcds cluster has this).
  2. Helpers on sys.path.
  3. OCI Streaming stream pool OCID + region.
  4. An OCI auth token (Profile → Auth tokens → Generate Token in the OCI console). 1-hour TTL — refresh before any job that runs longer than that.
  5. A Volumes-mounted checkpoint location (/Volumes/<catalog>/<schema>/<volume>/_checkpoints/...). Do NOT use /Workspace/... — the streaming engine fails silently. The helper's validate_checkpoint_path() raises a clear ValueError if you try.

Auth: SASL/PLAIN with OCI auth token

import os
from oracle_ai_data_platform_connectors.streaming import (
    bootstrap_for_region, build_kafka_options_sasl_plain,
    validate_checkpoint_path,
)

# Bootstrap. Either generic-regional (default) or cell-prefixed (matches OCI
# Console's "messages-endpoint" shape — pick whichever your stream pool shows):
bootstrap = bootstrap_for_region(os.environ["OCI_REGION"])              # streaming.<region>...:9092
# bootstrap = bootstrap_for_region(os.environ["OCI_REGION"], cell=1)    # cell-1.streaming.<region>...:9092

opts = build_kafka_options_sasl_plain(
    bootstrap_servers=bootstrap,
    tenancy_name=os.environ["OCI_TENANCY_NAME"],     # display name, NOT OCID
    username=os.environ["OCI_USERNAME"],             # OCI user; for IAM-Domains
                                                     # use "oracleidentitycloudservice/<email>"
    stream_pool_ocid=os.environ["OCI_STREAM_POOL_OCID"],
    auth_token=os.environ["OCI_AUTH_TOKEN"],         # 1h TTL — refresh before long jobs
    topic=os.environ["KAFKA_TOPIC"],
    starting_offsets="latest",                       # or "earliest" for backfill
    # Optional tuning (matches the official sample):
    max_partition_fetch_bytes=1024 * 1024,
    max_offsets_per_trigger=5,                       # cap rows per micro-batch (demo-friendly)
)

raw = spark.readStream.format("kafka").options(**opts).load()

# Validate checkpoint path BEFORE starting (saves you from silent FUSE failures)
checkpoint = validate_checkpoint_path(os.environ["KAFKA_CHECKPOINT_VOLUME"])
sink_path  = os.environ["KAFKA_SINK_VOLUME"]   # e.g. /Volumes/default/default/streaming/kafkaStreamingSink

# Match the official sample: write to a Delta sink under /Volumes/.
query = (
    raw.writeStream
       .queryName("OCIStreamingSource")
       .format("delta")
       .option("checkpointLocation", checkpoint)
       .start(sink_path)
)
query.awaitTermination(timeout=120)
print("input rows in last batch:", (query.lastProgress or {}).get("numInputRows"))

For an inline test against an existing topic with print-style output:

out_df = raw.selectExpr("CAST(key AS STRING) AS k", "CAST(value AS STRING) AS v",
                        "topic", "partition", "offset")
q = (out_df.writeStream.format("memory").queryName("kafka_test")
            .option("checkpointLocation", checkpoint)
            .trigger(processingTime="5 seconds").start())
q.awaitTermination(timeout=60)
spark.sql("SELECT * FROM kafka_test").show()
q.stop()

Username format (the most common gotcha)

OCI Streaming's Kafka SASL username is <tenancy_name>/<user>/<stream_pool_ocid>. The middle segment depends on tenancy type:

Tenancy username argument
Legacy IAM <email>
IAM Domains (modern) oracleidentitycloudservice/<email>

If you oci iam user list shows the user with oracleidentitycloudservice/... prefix, use the prefixed form.

Gotchas

  • Checkpoint path — must be /Volumes/.... The validate_checkpoint_path() helper raises a ValueError if you pass /Workspace/... or oci://.... This is the #1 cause of "stream runs but no data appears" complaints in AIDP.
  • Auth token TTL = 1 hour. For longer runs, plan to checkpoint, stop the stream, refresh the token, restart from checkpoint. RP-based Kafka SASL (com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule) is blocked at the AIDP platform level (RP tokens not provided).
  • Username format — tenancy name (display name), NOT tenancy OCID. IAM-Domains users need the oracleidentitycloudservice/ prefix.
  • Streaming jobs run forever. The AIDP workflow timeout doesn't apply once a streaming query is started. Set Max Concurrent Runs = 1 on the wrapping job.
  • Bootstrap host — the OCI Console's stream-pool detail page shows a "messages-endpoint" like https://cell-1.streaming.<region>.oci.oraclecloud.com. Either form (streaming.<region>... or cell-N.streaming.<region>...) works for the Kafka layer.

References

原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。