🧊aidp-iceberg
- ソース
- GitHub で見る ↗
説明
OCI Object StorageをバックエンドとするApache IcebergテーブルをAIDPノートブックから読み書きします。 次のような場合に使用: ユーザーがIceberg、Apache Iceberg、タイムトラベル、スナップショット、スキーマエボリューション、パーティションエボリューション、またはデータレイクファイルへのACIDトランザクションについて言及している場合。 `oci://` 上のIceberg Hadoopカタログを使用しており、認証はワークスペースのIAMアイデンティティを通じて暗黙的に行われます。
原文を表示
Read and write Apache Iceberg tables backed by OCI Object Storage from an AIDP notebook. Use when the user mentions Iceberg, Apache Iceberg, time travel, snapshots, schema evolution, partition evolution, or wants ACID transactions on data lake files. Uses the Iceberg Hadoop catalog on `oci://` — auth is implicit via the workspace IAM identity.
ユースケース
- ✓Icebergテーブルの読み書き
- ✓タイムトラベル機能を使用する
- ✓スキーマエボリューション対応
- ✓データレイクのACIDトランザクション処理
本文(日本語訳)
aidp-iceberg — OCI Object Storage 上の Apache Iceberg
OCI Object Storage をウェアハウスとして、Iceberg テーブル(ACID、タイムトラベル、スキーマ進化、パーティションプルーニング)を管理します。 Iceberg の Hadoop カタログは、データと同じバケットにすべてのメタデータを格納するため、外部メタストアは不要です。
次のような場合に使用
- OCI Object Storage 上の Iceberg テーブルを操作する場合
- 「Iceberg」「time travel」「snapshots」「schema evolution」などのキーワードが含まれる場合
次のような場合には使用しない
oci://上のプレーンな CSV / Parquet / JSON ファイル(トランザクション / タイムトラベル不要)→aidp-object-storageを使用してください- AWS / Azure 上の Iceberg テーブル → このスキルのカタログ設定を適宜変更してください。Hadoop カタログ自体はポータブルですが、バケット URI が異なります
カタログの初回登録
OCI_NAMESPACE = "<namespace>"
BUCKET_NAME = "<bucket>"
WAREHOUSE = f"oci://{BUCKET_NAME}@{OCI_NAMESPACE}/iceberg-warehouse"
CATALOG_NAME = "oci_catalog"
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE)
この設定以降、oci_catalog.<db>.<table> を参照するすべての SQL が Iceberg によって管理されます。
データベース+テーブルの作成
DB = "demo_db"
TABLE = "employees"
FQN = f"{CATALOG_NAME}.{DB}.{TABLE}"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{DB}")
spark.sql(f"""
CREATE TABLE {FQN} (
employee_id INT,
employee_name STRING,
salary DOUBLE,
department STRING,
hire_date DATE
)
USING iceberg
PARTITIONED BY (department)
""")
データの挿入(呼び出しごとに 1 件の ACID トランザクション = 1 スナップショット)
import pandas as pd
from datetime import date
pdf = pd.DataFrame([
(101, "John Doe", 75000.0, "Engineering", date(2022, 1, 15)),
(102, "Jane Smith", 85000.0, "Sales", date(2021, 3, 20)),
], columns=["employee_id", "employee_name", "salary", "department", "hire_date"])
spark.createDataFrame(pdf).writeTo(FQN).append()
スキーマ進化(データの書き直しなし)
spark.sql(f"ALTER TABLE {FQN} ADD COLUMN location STRING")
# 既存行の新カラムは NULL となり、エラーは発生しません
タイムトラベル
snaps = spark.sql(f"""
SELECT snapshot_id, committed_at, operation
FROM {FQN}.snapshots
ORDER BY committed_at
""").collect()
first = snaps[0].snapshot_id
spark.sql(f"SELECT * FROM {FQN} VERSION AS OF {first}").show()
物理ファイルの確認
spark.sql(f"""
SELECT file_path, file_format, record_count, file_size_in_bytes
FROM {FQN}.files
""").show(truncate=False)
注意事項
- 認証は暗黙的 —
aidp-object-storageと同様です。ワークスペースの IAM ID が Object Storage への読み書きを行います。キーの設定は不要です。 - Hadoop カタログはメタデータをバケット内に格納します。 スナップショット、スキーマバージョン、マニフェストはすべて
<warehouse>/<db>/<table>/metadata/以下に配置されます。Hive メタストア、Glue、JDBC カタログは不要です。 - CREATE TABLE には
USING icebergが必須です。 指定しない場合、Spark はデフォルトの V1 ファイルソースを使用するため、ACID が無効になります。 - タイムトラベルにはスナップショット ID が必要です。 長い保持期間を設定しておくと便利です。Iceberg はテーブルプロパティ(
history.expire.max-snapshot-age-ms)に基づいてスナップショットを期限切れにするため、長期的なタイムトラベルが必要な場合はこの値を適切に設定してください。 - パーティションプルーニングは、パーティションカラム(例では
department)を使った絞り込み条件があるクエリで有効になります。条件がない場合、Iceberg はすべてのファイルを読み込みますが、並列で処理されます。
参考資料
原文(English)を表示
aidp-iceberg — Apache Iceberg on OCI Object Storage
Manage Iceberg tables (ACID, time travel, schema evolution, partition pruning) backed by OCI Object Storage as the warehouse. The Iceberg Hadoop catalog stores all metadata in the same bucket as data — no external metastore.
When to use
- Iceberg tables on OCI Object Storage.
- Mentioned: "Iceberg", "time travel", "snapshots", "schema evolution".
When NOT to use
- For raw CSV/Parquet/JSON files in
oci://(no transactions / time-travel) →aidp-object-storage. - For Iceberg tables on AWS / Azure → adapt this skill's catalog config; the Hadoop catalog is portable but the bucket URI changes.
One-time catalog registration
OCI_NAMESPACE = "<namespace>"
BUCKET_NAME = "<bucket>"
WAREHOUSE = f"oci://{BUCKET_NAME}@{OCI_NAMESPACE}/iceberg-warehouse"
CATALOG_NAME = "oci_catalog"
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop")
spark.conf.set(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE)
After this, all SQL referring to oci_catalog.<db>.<table> is Iceberg-managed.
Create database + table
DB = "demo_db"
TABLE = "employees"
FQN = f"{CATALOG_NAME}.{DB}.{TABLE}"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{DB}")
spark.sql(f"""
CREATE TABLE {FQN} (
employee_id INT,
employee_name STRING,
salary DOUBLE,
department STRING,
hire_date DATE
)
USING iceberg
PARTITIONED BY (department)
""")
Insert (each call is one ACID transaction = one snapshot)
import pandas as pd
from datetime import date
pdf = pd.DataFrame([
(101, "John Doe", 75000.0, "Engineering", date(2022, 1, 15)),
(102, "Jane Smith", 85000.0, "Sales", date(2021, 3, 20)),
], columns=["employee_id", "employee_name", "salary", "department", "hire_date"])
spark.createDataFrame(pdf).writeTo(FQN).append()
Schema evolution (no rewrite)
spark.sql(f"ALTER TABLE {FQN} ADD COLUMN location STRING")
# Old rows show NULL for the new column; no errors.
Time travel
snaps = spark.sql(f"""
SELECT snapshot_id, committed_at, operation
FROM {FQN}.snapshots
ORDER BY committed_at
""").collect()
first = snaps[0].snapshot_id
spark.sql(f"SELECT * FROM {FQN} VERSION AS OF {first}").show()
Inspect physical files
spark.sql(f"""
SELECT file_path, file_format, record_count, file_size_in_bytes
FROM {FQN}.files
""").show(truncate=False)
Gotchas
- Auth is implicit — same as
aidp-object-storage. The workspace IAM identity reads/writes Object Storage. No keys. - Hadoop catalog stores metadata IN the bucket. Snapshots, schema versions, manifests all sit under
<warehouse>/<db>/<table>/metadata/. There is no Hive metastore, no Glue, no JDBC catalog. USING icebergis required in CREATE TABLE; otherwise Spark uses the default V1 file source and you lose ACID.- Time-travel requires the snapshot ID — keeping a long retention helps. Iceberg expires snapshots based on table properties (
history.expire.max-snapshot-age-ms); set this if long-term time travel matters. - Partition pruning kicks in for queries with predicates on the partition column (
departmentin the example). Without that predicate Iceberg still reads all files but in parallel.
References
原文・著作権は Anthropic および各プラグイン作者に帰属します。日本語訳は Claude API による自動翻訳です。