Operations & Optimization | Last updated: May 14, 2026

Iceberg Incremental Reads

Iceberg incremental reads process only the new or changed data between two snapshots, using the snapshot diff to identify added and deleted files. This enables efficient change-data-capture and incremental ETL pipelines without full table scans.


Incremental reads in Apache Iceberg enable processing only the data that has changed between two snapshots — rather than scanning the entire table every time. This is the foundation for efficient micro-batch ETL, change-data-capture pipelines, and lakehouse-to-lakehouse replication patterns where repeated full-table scans are prohibitively expensive.

The Snapshot Diff API

Iceberg’s snapshot model makes incremental reads straightforward. Because every write creates a new snapshot that records which files were added and which were deleted, the “diff” between two snapshots is precisely the list of changed files.

For an incremental read from snapshot A to snapshot B, an engine typically:

1. Walks the snapshot lineage from B back through parent snapshot IDs until it reaches A.
2. Collects the data files recorded as ADDED in each intermediate snapshot's manifests.
3. Scans only those files, skipping everything that already existed at snapshot A.

Note: For Merge-on-Read tables, incremental reads based on file diffs are more complex because deletes are recorded as delete files rather than file removals.
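
Before wiring up a pipeline, you can inspect this diff directly from Spark by querying the table's snapshots metadata table, which lists every commit with its operation and file counts. A minimal sketch, assuming an Iceberg catalog is configured and reusing the db.orders table from the examples below:

# Each row is one commit: its operation (append, overwrite, delete, replace)
# and how many data files it added or removed.
spark.sql("""
    SELECT committed_at, snapshot_id, parent_id, operation,
           summary['added-data-files']   AS added_data_files,
           summary['deleted-data-files'] AS deleted_data_files
    FROM db.orders.snapshots
    ORDER BY committed_at
""").show(truncate=False)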

Incremental Reads with Apache Spark

Using the Snapshot Incremental API

# Spark: read only new rows appended since a specific snapshot
from_snapshot_id = 8027658604211071520
to_snapshot_id   = 9135729705312082631

incremental_df = spark.read \
    .format("iceberg") \
    .option("start-snapshot-id", from_snapshot_id) \
    .option("end-snapshot-id", to_snapshot_id) \
    .load("db.orders")

# Process only the new/changed rows
incremental_df.show()

Streaming Incremental Read

# Spark Streaming: continuously read new Iceberg snapshots
incremental_stream = spark.readStream \
    .format("iceberg") \
    .option("stream-from-timestamp", "1715700000000") \
    .load("db.events")

# Streaming writes require a checkpoint location to track progress between runs
# (the checkpoint path below is illustrative)
incremental_stream \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://downstream-bucket/checkpoints/events") \
    .start("s3://downstream-bucket/events")

Incremental Reads with PyIceberg

from pyiceberg.catalog import load_catalog
from pyiceberg.manifest import ManifestEntryStatus

catalog = load_catalog("my_catalog")  # catalog properties come from pyiceberg config or kwargs
table = catalog.load_table("db.orders")

# Get the snapshot diff between two points
from_snapshot_id = 8027658604211071520
to_snapshot_id = table.current_snapshot().snapshot_id

def get_snapshots_between(table, from_id, to_id):
    # Walk parent links from the end snapshot back to (but excluding) the start snapshot
    snapshot_ids = []
    current = table.snapshot_by_id(to_id)
    while current is not None and current.snapshot_id != from_id:
        snapshot_ids.append(current.snapshot_id)
        parent_id = current.parent_snapshot_id
        current = table.snapshot_by_id(parent_id) if parent_id is not None else None
    return reversed(snapshot_ids)  # oldest first

# Find files added since from_snapshot (in production, skip compaction/overwrite snapshots)
added_files = []
for snap_id in get_snapshots_between(table, from_snapshot_id, to_snapshot_id):
    snapshot = table.snapshot_by_id(snap_id)
    for manifest in snapshot.manifests(table.io):
        for entry in manifest.fetch_manifest_entry(table.io):
            # Keep only entries written by this snapshot; older manifests are carried forward
            if entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == snap_id:
                added_files.append(entry.data_file)

# Scan pinned to the end snapshot
scan = table.scan(snapshot_id=to_snapshot_id)
# Apply file filter to added_files only for true incremental read
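
If Spark is not available, one option is to materialize just the added files directly with PyArrow. A minimal sketch, assuming the data files are Parquet on S3 and ignoring positional/equality delete files (so it is only safe for append-only tables):

import pyarrow.dataset as ds
from pyarrow.fs import S3FileSystem

# DataFile.file_path is a full URI like s3://bucket/key; the S3 filesystem expects bucket/key
paths = [f.file_path.replace("s3://", "", 1) for f in added_files]
incremental_arrow = ds.dataset(paths, format="parquet", filesystem=S3FileSystem()).to_table()
print(incremental_arrow.num_rows)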

Incremental Read Patterns

Pattern 1: Snapshot ID Watermark

Track the last processed snapshot ID and read new snapshots since that point:

# Store last processed snapshot in a state store
last_processed_snapshot = state_store.get("orders_last_snapshot")

table = catalog.load_table("db.orders")
current_snapshot = table.current_snapshot().snapshot_id

if current_snapshot != last_processed_snapshot:
    # Process new data
    df = spark.read.format("iceberg") \
        .option("start-snapshot-id", last_processed_snapshot) \
        .option("end-snapshot-id", current_snapshot) \
        .load("db.orders")

    process_incremental(df)

    # Update watermark
    state_store.set("orders_last_snapshot", current_snapshot)
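
The state_store above is a placeholder for any durable key-value location (a control table, DynamoDB, object storage, and so on). A minimal file-backed sketch for local testing, with an illustrative watermarks.json path:

import json
import os

class FileStateStore:
    # Toy watermark store backed by a local JSON file (local testing only)
    def __init__(self, path="watermarks.json"):
        self.path = path

    def _load(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            return json.load(f)

    def get(self, key):
        return self._load().get(key)

    def set(self, key, value):
        state = self._load()
        state[key] = value
        with open(self.path, "w") as f:
            json.dump(state, f)

state_store = FileStateStore()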

Pattern 2: Timestamp Watermark

Use snapshot timestamps for time-based incrementals:

from datetime import datetime, timedelta

one_hour_ago_ms = int((datetime.now() - timedelta(hours=1)).timestamp() * 1000)

# Find snapshot at or just before one_hour_ago_ms
from_snapshot = table.snapshot_as_of_timestamp(one_hour_ago_ms)
to_snapshot = table.current_snapshot()

incremental_df = spark.read.format("iceberg") \
    .option("start-snapshot-id", from_snapshot.snapshot_id) \
    .option("end-snapshot-id", to_snapshot.snapshot_id) \
    .load("db.orders")

Incremental Read Limitations

File-level snapshot diffs only surface which data files were added or removed, so they work best for append-only tables. Overwrite and compaction (rewrite) snapshots add files that contain no logically new rows, and Merge-on-Read deletes land in delete files that a file diff will not translate into row-level changes. For true row-level CDC over MoR tables, use Iceberg CDC patterns (for example with Flink) for precise row-level change tracking rather than file-level snapshot diffs.
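
If Spark is already in the stack, Iceberg's create_changelog_view procedure is another option for row-level changes between two snapshots: it builds a temporary view whose rows carry _change_type, _change_ordinal, and _commit_snapshot_id columns. A hedged sketch, assuming a Spark catalog named my_catalog and reusing the snapshot IDs from earlier:

# Build a row-level changelog view between two snapshots, then query it
spark.sql("""
    CALL my_catalog.system.create_changelog_view(
        table => 'db.orders',
        options => map('start-snapshot-id', '8027658604211071520',
                       'end-snapshot-id',   '9135729705312082631'),
        changelog_view => 'orders_changes'
    )
""")

spark.sql("SELECT * FROM orders_changes ORDER BY _change_ordinal").show()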

Use Cases

Use Case | Incremental Read Approach
Downstream ETL refresh | Snapshot ID watermark → Spark incremental batch
Lakehouse-to-lakehouse sync | Timestamp watermark → Spark Streaming
ML feature freshness update | Snapshot diff → PyIceberg + Arrow
Audit log export | All snapshots since last export → ordered processing
DWH loading | Nightly incremental from last-loaded snapshot
