claude-skills/

Anthropic公式スキル・プラグインの日本語ディレクトリ

last sync 22h ago
スキルOfficialdevelopment

🧊aidp-iceberg

説明

OCI Object StorageをバックエンドとするApache IcebergテーブルをAIDPノートブックから読み書きします。 次のような場合に使用: ユーザーがIceberg、Apache Iceberg、タイムトラベル、スナップショット、スキーマエボリューション、パーティションエボリューション、またはデータレイクファイルへのACIDトランザクションについて言及している場合。 `oci://` 上のIceberg Hadoopカタログを使用しており、認証はワークスペースのIAMアイデンティティを通じて暗黙的に行われます。

原文を表示

Read and write Apache Iceberg tables backed by OCI Object Storage from an AIDP notebook. Use when the user mentions Iceberg, Apache Iceberg, time travel, snapshots, schema evolution, partition evolution, or wants ACID transactions on data lake files. Uses the Iceberg Hadoop catalog on `oci://` — auth is implicit via the workspace IAM identity.

ユースケース

  • Icebergテーブルの読み書き
  • タイムトラベル機能を使用する
  • スキーマエボリューション対応
  • データレイクのACIDトランザクション処理

本文(日本語訳)

aidp-iceberg — OCI Object Storage 上の Apache Iceberg

OCI Object Storage をウェアハウスとして、Iceberg テーブル(ACID、タイムトラベル、スキーマ進化、パーティションプルーニング)を管理します。 Iceberg の Hadoop カタログは、データと同じバケットにすべてのメタデータを格納するため、外部メタストアは不要です。

次のような場合に使用

  • OCI Object Storage 上の Iceberg テーブルを操作する場合
  • 「Iceberg」「time travel」「snapshots」「schema evolution」などのキーワードが含まれる場合

次のような場合には使用しない

  • oci:// 上のプレーンな CSV / Parquet / JSON ファイル(トランザクション / タイムトラベル不要)→ aidp-object-storage を使用してください
  • AWS / Azure 上の Iceberg テーブル → このスキルのカタログ設定を適宜変更してください。Hadoop カタログ自体はポータブルですが、バケット URI が異なります

カタログの初回登録

OCI_NAMESPACE = "<namespace>"
BUCKET_NAME   = "<bucket>"
WAREHOUSE     = f"oci://{BUCKET_NAME}@{OCI_NAMESPACE}/iceberg-warehouse"
CATALOG_NAME  = "oci_catalog"

spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}",            "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.type",       "hadoop")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.warehouse",  WAREHOUSE)

この設定以降、oci_catalog.<db>.<table> を参照するすべての SQL が Iceberg によって管理されます。

データベース+テーブルの作成

DB    = "demo_db"
TABLE = "employees"
FQN   = f"{CATALOG_NAME}.{DB}.{TABLE}"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{DB}")
spark.sql(f"""
    CREATE TABLE {FQN} (
        employee_id   INT,
        employee_name STRING,
        salary        DOUBLE,
        department    STRING,
        hire_date     DATE
    )
    USING iceberg
    PARTITIONED BY (department)
""")

データの挿入(呼び出しごとに 1 件の ACID トランザクション = 1 スナップショット)

import pandas as pd
from datetime import date

pdf = pd.DataFrame([
    (101, "John Doe",       75000.0, "Engineering", date(2022, 1, 15)),
    (102, "Jane Smith",     85000.0, "Sales",       date(2021, 3, 20)),
], columns=["employee_id", "employee_name", "salary", "department", "hire_date"])

spark.createDataFrame(pdf).writeTo(FQN).append()

スキーマ進化(データの書き直しなし)

spark.sql(f"ALTER TABLE {FQN} ADD COLUMN location STRING")
# 既存行の新カラムは NULL となり、エラーは発生しません

タイムトラベル

snaps = spark.sql(f"""
    SELECT snapshot_id, committed_at, operation
    FROM {FQN}.snapshots
    ORDER BY committed_at
""").collect()
first = snaps[0].snapshot_id
spark.sql(f"SELECT * FROM {FQN} VERSION AS OF {first}").show()

物理ファイルの確認

spark.sql(f"""
    SELECT file_path, file_format, record_count, file_size_in_bytes
    FROM {FQN}.files
""").show(truncate=False)

注意事項

  • 認証は暗黙的aidp-object-storage と同様です。ワークスペースの IAM ID が Object Storage への読み書きを行います。キーの設定は不要です。
  • Hadoop カタログはメタデータをバケット内に格納します。 スナップショット、スキーマバージョン、マニフェストはすべて <warehouse>/<db>/<table>/metadata/ 以下に配置されます。Hive メタストア、Glue、JDBC カタログは不要です。
  • CREATE TABLE には USING iceberg が必須です。 指定しない場合、Spark はデフォルトの V1 ファイルソースを使用するため、ACID が無効になります。
  • タイムトラベルにはスナップショット ID が必要です。 長い保持期間を設定しておくと便利です。Iceberg はテーブルプロパティ(history.expire.max-snapshot-age-ms)に基づいてスナップショットを期限切れにするため、長期的なタイムトラベルが必要な場合はこの値を適切に設定してください。
  • パーティションプルーニングは、パーティションカラム(例では department)を使った絞り込み条件があるクエリで有効になります。条件がない場合、Iceberg はすべてのファイルを読み込みますが、並列で処理されます。

参考資料

原文(English)を表示

aidp-iceberg — Apache Iceberg on OCI Object Storage

Manage Iceberg tables (ACID, time travel, schema evolution, partition pruning) backed by OCI Object Storage as the warehouse. The Iceberg Hadoop catalog stores all metadata in the same bucket as data — no external metastore.

When to use

  • Iceberg tables on OCI Object Storage.
  • Mentioned: "Iceberg", "time travel", "snapshots", "schema evolution".

When NOT to use

  • For raw CSV/Parquet/JSON files in oci:// (no transactions / time-travel) → aidp-object-storage.
  • For Iceberg tables on AWS / Azure → adapt this skill's catalog config; the Hadoop catalog is portable but the bucket URI changes.

One-time catalog registration

OCI_NAMESPACE = "<namespace>"
BUCKET_NAME   = "<bucket>"
WAREHOUSE     = f"oci://{BUCKET_NAME}@{OCI_NAMESPACE}/iceberg-warehouse"
CATALOG_NAME  = "oci_catalog"

spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}",            "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.type",       "hadoop")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.warehouse",  WAREHOUSE)

After this, all SQL referring to oci_catalog.<db>.<table> is Iceberg-managed.

Create database + table

DB    = "demo_db"
TABLE = "employees"
FQN   = f"{CATALOG_NAME}.{DB}.{TABLE}"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{DB}")
spark.sql(f"""
    CREATE TABLE {FQN} (
        employee_id   INT,
        employee_name STRING,
        salary        DOUBLE,
        department    STRING,
        hire_date     DATE
    )
    USING iceberg
    PARTITIONED BY (department)
""")

Insert (each call is one ACID transaction = one snapshot)

import pandas as pd
from datetime import date

pdf = pd.DataFrame([
    (101, "John Doe",       75000.0, "Engineering", date(2022, 1, 15)),
    (102, "Jane Smith",     85000.0, "Sales",       date(2021, 3, 20)),
], columns=["employee_id", "employee_name", "salary", "department", "hire_date"])

spark.createDataFrame(pdf).writeTo(FQN).append()

Schema evolution (no rewrite)

spark.sql(f"ALTER TABLE {FQN} ADD COLUMN location STRING")
# Old rows show NULL for the new column; no errors.

Time travel

snaps = spark.sql(f"""
    SELECT snapshot_id, committed_at, operation
    FROM {FQN}.snapshots
    ORDER BY committed_at
""").collect()
first = snaps[0].snapshot_id
spark.sql(f"SELECT * FROM {FQN} VERSION AS OF {first}").show()

Inspect physical files

spark.sql(f"""
    SELECT file_path, file_format, record_count, file_size_in_bytes
    FROM {FQN}.files
""").show(truncate=False)

Gotchas

  • Auth is implicit — same as aidp-object-storage. The workspace IAM identity reads/writes Object Storage. No keys.
  • Hadoop catalog stores metadata IN the bucket. Snapshots, schema versions, manifests all sit under <warehouse>/<db>/<table>/metadata/. There is no Hive metastore, no Glue, no JDBC catalog.
  • USING iceberg is required in CREATE TABLE; otherwise Spark uses the default V1 file source and you lose ACID.
  • Time-travel requires the snapshot ID — keeping a long retention helps. Iceberg expires snapshots based on table properties (history.expire.max-snapshot-age-ms); set this if long-term time travel matters.
  • Partition pruning kicks in for queries with predicates on the partition column (department in the example). Without that predicate Iceberg still reads all files but in parallel.

References

原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。