
Unlock the power of Apache Iceberg v3 deletion vectors on Amazon EMR


As modern data architectures expand, Apache Iceberg has become a widely popular open table format, providing ACID transactions, time travel, and schema evolution. In table format v2, Iceberg introduced merge-on-read, improving delete and update handling through positional delete files. These files improve write performance but can slow down reads when not compacted, because Iceberg must merge them during query execution to return the latest snapshot. Iceberg v3 improves merge performance during reads by replacing positional delete files with deletion vectors for handling row-level deletes in merge-on-read (MoR) tables. This change deprecates positional delete files, which marked specific row positions as deleted, in favor of the more efficient deletion vectors.

In this post, we compare and evaluate the performance of the new binary deletion vectors in Iceberg v3 against the traditional positional delete files of Iceberg v2, using Amazon EMR version 7.10.0 with Apache Spark 3.5.5. We provide insights into the practical impact of these advanced row-level delete mechanisms on data management efficiency and performance.

Understanding binary deletion vectors and Puffin files

Binary deletion vectors stored in Puffin files use compressed bitmaps to efficiently represent which rows have been deleted within a data file. In contrast, earlier Iceberg versions (v2) relied on positional delete files: Parquet files that enumerated the rows to delete by file and position. This older approach resulted in many small delete files, which placed a heavy burden on query engines due to numerous file reads and costly in-memory conversions. Puffin files reduce this overhead by compactly encoding deletions, improving query performance and resource utilization.

Iceberg v3 improves on this in the following ways (a short conceptual sketch follows the list):

  • Reduced I/O – Deletion vectors, compressed bitmaps that efficiently represent deleted rows, result in fewer small delete files and lower metadata overhead. These vectors are stored persistently in Puffin files, a compact binary format optimized for low-latency access.
  • Query performance – Bitmap-based deletion vectors enable faster scan filtering by allowing multiple vectors to be stored in a single Puffin file. This reduces metadata and file count overhead while preserving file-level granularity for efficient reads. The design supports continuous merging of deletion vectors, promoting ongoing compaction that maintains stable query performance and reduces fragmentation over time. It removes the trade-off between partition-level and file-level delete granularity seen in v2, enabling consistently fast reads even in heavy-update scenarios.
  • Storage efficiency – Iceberg v3 uses a compressed binary format instead of verbose Parquet positional records. Engines maintain a single deletion vector per data file at write time, enabling better compaction and consistent query performance.
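To make the difference concrete, the following Python sketch (not Iceberg internals; the file name and positions are hypothetical) contrasts a v2-style list of positional delete records with a v3-style bitmap per data file. Real deletion vectors are serialized roaring bitmaps inside Puffin blobs; a plain Python integer used as a bit set is enough to show the idea.

# Conceptual sketch only: contrast v2-style positional delete records
# with a v3-style deletion bitmap. File name and positions are hypothetical.
deleted_positions = list(range(1000, 1100))
data_file = "s3://<your-bucket-name>/blog/glue/data/file_a.parquet"

# v2 style: one (file_path, position) record per deleted row, written to a
# Parquet positional delete file that readers must load and merge at scan time.
v2_delete_records = [(data_file, pos) for pos in deleted_positions]

# v3 style: a single bitmap per data file with the deleted positions set.
bitmap = 0
for pos in deleted_positions:
    bitmap |= 1 << pos

def is_deleted(position):
    # Readers test each row position against the bitmap while scanning.
    return (bitmap >> position) & 1 == 1

print(len(v2_delete_records))  # 100 positional records
print(is_deleted(1050))        # True
print(is_deleted(5))           # False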

Solution overview

To explore the performance characteristics of delete operations in Iceberg v2 and v3, we use PySpark to run comparison tests focusing on delete operation runtime and delete file size. This implementation helps us effectively benchmark and compare the deletion mechanisms between Iceberg v2's Parquet-based positional delete files and v3's newer Puffin-based deletion vectors.

Our solution demonstrates how to configure Spark with the AWS Glue Data Catalog and Iceberg, create tables, and run delete operations programmatically. We first create Iceberg tables with format versions 2 and 3, insert 10,000 rows, then perform delete operations on a range of record IDs. We also perform table compaction and then measure delete operation runtime and the size and count of associated delete files.

In Iceberg v3, deleting rows produces binary deletion vectors stored in Puffin files (compact binary sidecar files). These allow more efficient query planning and faster read performance by consolidating deletes and avoiding large numbers of small files.
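After a DELETE runs, one quick way to confirm which mechanism a table used is to query its delete_files metadata table; the script later in this post does this as part of its statistics, but the following standalone sketch (assuming the catalog and table names used in this post) shows the idea. On the v2 table the file_format column reports PARQUET, while on the v3 table it reports PUFFIN.

# Hedged sketch: inspect the delete files written for the v3 test table.
spark.sql("""
    SELECT file_path, file_format, record_count, file_size_in_bytes
    FROM glue_catalog.blog.iceberg_table_v3.delete_files
""").show(truncate=False)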

For this test, the Spark job was submitted by SSH'ing into the EMR cluster and using spark-submit directly from the shell, with the required Iceberg JAR file referenced directly from the Amazon Simple Storage Service (Amazon S3) bucket in the submission command. When running the job, make sure you provide your S3 bucket name. See the following code:

spark-submit --jars s3://<your-bucket-name>/iceberg/jars/iceberg-spark-runtime-3.5_2.12-1.9.2.jar v3_deletion_vector_test.py
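Alternatively, you can submit the same job as an EMR step without connecting to the cluster over SSH. The following AWS CLI sketch assumes a placeholder cluster ID and that both the JAR and the script are staged in your S3 bucket:

aws emr add-steps \
  --cluster-id j-XXXXXXXXXXXXX \
  --steps 'Type=Spark,Name=IcebergV3DeletionVectorTest,ActionOnFailure=CONTINUE,Args=[--jars,s3://<your-bucket-name>/iceberg/jars/iceberg-spark-runtime-3.5_2.12-1.9.2.jar,s3://<your-bucket-name>/scripts/v3_deletion_vector_test.py]'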

Prerequisites

To follow along with this post, you must have the following prerequisites:

  • Amazon EMR on Amazon EC2 with version 7.10.0 integrated with the Glue Data Catalog, which includes Spark 3.5.5.
  • The Iceberg 1.9.2 JAR file from the official Iceberg documentation, which includes important deletion vector improvements such as v2-to-v3 rewrites and dangling deletion vector detection. Optionally, you can use the default Iceberg 1.8.1-amzn-0 bundled with Amazon EMR 7.10 if these Iceberg 1.9.x improvements are not required.
  • An S3 bucket to store Iceberg data.
  • An AWS Identity and Access Management (IAM) role for Amazon EMR configured with the required permissions.

The upcoming Amazon EMR 7.11 will ship with Iceberg 1.9.1-amzn-1, which includes deletion vector improvements such as v2-to-v3 rewrites and dangling deletion vector detection. This means you no longer have to manually download or upload the Iceberg JAR file, because it will be included and managed natively by Amazon EMR.
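If you already have v2 tables, you don't need to recreate them to try deletion vectors. As a hedged sketch (assuming the table names used later in this post, and the Iceberg 1.9.x rewrite improvements mentioned above), you can raise the format version in place and then rewrite existing positional delete files:

# Upgrade an existing v2 table to format version 3 in place.
spark.sql("""
    ALTER TABLE glue_catalog.blog.iceberg_table_v2
    SET TBLPROPERTIES ('format-version' = '3')
""")

# Optionally rewrite existing positional delete files so that, with the
# 1.9.x improvements, deletes are represented as deletion vectors going forward.
spark.sql("""
    CALL glue_catalog.system.rewrite_position_delete_files(
        table => 'blog.iceberg_table_v2'
    )
""")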

Code walkthrough

The following PySpark script demonstrates how to create, write, compact, and delete records in Iceberg tables with two different format versions (v2 and v3), using the Glue Data Catalog as the metastore. The main goal is to compare both write and read performance, along with storage characteristics (delete file format and size), between Iceberg format versions 2 and 3.

The code performs the following functions:

  • Creates a SparkSession configured to use Iceberg with Glue Data Catalog integration.
  • Creates a synthetic dataset simulating user records:
    • Uses a fixed random seed (42) to produce consistent data generation
    • Creates identical datasets for both v2 and v3 tables for a fair comparison
  • Defines the function test_read_performance(table_name) to perform the following actions:
    • Measure full table scan performance
    • Measure filtered read performance (with a WHERE clause)
    • Track file counts for both operations
  • Defines the function test_iceberg_table(version, test_df) to perform the following actions:
    • Create or use an Iceberg table for the specified format version
    • Append data to the Iceberg table
    • Trigger Iceberg's data compaction using a system procedure
    • Delete rows with IDs between 1000–1099
    • Collect statistics about inserted data files and delete-related files
    • Measure and record read performance metrics
    • Track operation timing for inserts, deletes, and reads
  • Defines a function to print a comprehensive comparative report including the following information:
    • Delete operation performance
    • Read performance (both full table and filtered)
    • Delete file characteristics (formats, counts, sizes)
    • Performance improvements as percentages
    • Storage efficiency metrics
  • Orchestrates the main execution flow:
    • Creates a single dataset to ensure identical data for both versions
    • Cleans up existing tables for fresh testing
    • Runs tests for Iceberg format version 2 and version 3
    • Outputs a detailed comparison report
    • Handles exceptions and shuts down the Spark session

See the following code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F
import time
import random
import logging
from pyspark.sql.utils import AnalysisException

# Logging
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)

# Constants
ROWS_COUNT = 10000
DELETE_RANGE_START = 1000
DELETE_RANGE_END = 1099
SAMPLE_NAMES = ["Alice", "Bob", "Charlie", "Diana",
                "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"]

# Spark session
spark = (
    SparkSession.builder
    .appName("IcebergWithGlueCatalog")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://<your-bucket-name>/blog/glue/")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)
spark.sql("CREATE DATABASE IF NOT EXISTS glue_catalog.blog")

def create_dataset(num_rows=ROWS_COUNT):
    # Set a fixed seed for reproducibility
    random.seed(42)

    data = [(i,
             random.choice(SAMPLE_NAMES) + str(i),
             random.randint(18, 80))
            for i in range(1, num_rows + 1)]
    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    df = spark.createDataFrame(data, schema)
    df = df.withColumn("created_at", F.current_timestamp())
    return df

def test_read_performance(table_name):
    """Test read performance of the table"""
    start_time = time.time()
    count = spark.sql(f"SELECT COUNT(*) FROM glue_catalog.blog.{table_name}").collect()[0][0]
    read_time = time.time() - start_time

    # Test filtered read performance
    start_time = time.time()
    filtered_count = spark.sql(f"""
        SELECT COUNT(*)
        FROM glue_catalog.blog.{table_name}
        WHERE age > 30
    """).collect()[0][0]
    filtered_read_time = time.time() - start_time

    return read_time, filtered_read_time, count, filtered_count

def test_iceberg_table(version, test_df):
    try:
        table_name = f"iceberg_table_v{version}"
        logger.info(f"\n=== TESTING ICEBERG V{version} ===")
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS glue_catalog.blog.{table_name} (
                id int,
                name string,
                age int,
                created_at timestamp
            ) USING iceberg
            TBLPROPERTIES (
                'format-version'='{version}',
                'write.delete.mode'='merge-on-read'
            )
        """)
        start_time = time.time()
        test_df.writeTo(f"glue_catalog.blog.{table_name}").append()
        insert_time = time.time() - start_time

        logger.info("Compaction...")
        spark.sql(
            f"CALL glue_catalog.system.rewrite_data_files('glue_catalog.blog.{table_name}')")

        start_time = time.time()
        spark.sql(f"""
            DELETE FROM glue_catalog.blog.{table_name}
            WHERE id BETWEEN {DELETE_RANGE_START} AND {DELETE_RANGE_END}
        """)
        delete_time = time.time() - start_time

        files_df = spark.sql(
            f"SELECT COUNT(*) as data_files FROM glue_catalog.blog.{table_name}.files")
        delete_files_df = spark.sql(f"""
            SELECT COUNT(*) as delete_files,
                   file_format,
                   SUM(file_size_in_bytes) as total_size
            FROM glue_catalog.blog.{table_name}.delete_files
            GROUP BY file_format
        """)
        data_files = files_df.collect()[0]['data_files']
        delete_stats = delete_files_df.collect()

        # Add read performance testing
        logger.info("\nTesting read performance...")
        read_time, filtered_read_time, total_count, filtered_count = test_read_performance(table_name)

        logger.info(f"Insert time: {insert_time:.3f}s")
        logger.info(f"Delete time: {delete_time:.3f}s")
        logger.info(f"Full table read time: {read_time:.3f}s")
        logger.info(f"Filtered read time: {filtered_read_time:.3f}s")
        logger.info(f"Data files: {data_files}")
        logger.info(f"Total records: {total_count}")
        logger.info(f"Filtered records: {filtered_count}")
        if len(delete_stats) > 0:
            stats = delete_stats[0]
            logger.info(f"Delete files: {stats.delete_files}")
            logger.info(f"Delete format: {stats.file_format}")
            logger.info(f"Delete files size: {stats.total_size} bytes")
            return delete_time, stats.total_size, stats.file_format, read_time, filtered_read_time
        else:
            logger.info("No delete files found")
            return delete_time, 0, "N/A", read_time, filtered_read_time
    except AnalysisException as e:
        logger.error(f"SQL Error: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        raise

def print_comparison_results(v2_results, v3_results):
    v2_delete_time, v2_size, v2_format, v2_read_time, v2_filtered_read_time = v2_results
    v3_delete_time, v3_size, v3_format, v3_read_time, v3_filtered_read_time = v3_results
    logger.info("\n=== PERFORMANCE COMPARISON ===")
    logger.info(f"v2 delete time: {v2_delete_time:.3f}s")
    logger.info(f"v3 delete time: {v3_delete_time:.3f}s")
    if v2_delete_time > 0:
        improvement = ((v2_delete_time - v3_delete_time) / v2_delete_time) * 100
        logger.info(f"v3 delete performance improvement: {improvement:.1f}%")

    logger.info("\n=== READ PERFORMANCE COMPARISON ===")
    logger.info(f"v2 full table read time: {v2_read_time:.3f}s")
    logger.info(f"v3 full table read time: {v3_read_time:.3f}s")
    logger.info(f"v2 filtered read time: {v2_filtered_read_time:.3f}s")
    logger.info(f"v3 filtered read time: {v3_filtered_read_time:.3f}s")

    if v2_read_time > 0:
        read_improvement = ((v2_read_time - v3_read_time) / v2_read_time) * 100
        logger.info(f"v3 read performance improvement: {read_improvement:.1f}%")

    if v2_filtered_read_time > 0:
        filtered_improvement = ((v2_filtered_read_time - v3_filtered_read_time) / v2_filtered_read_time) * 100
        logger.info(f"v3 filtered read performance improvement: {filtered_improvement:.1f}%")

    logger.info("\n=== DELETE FILE COMPARISON ===")
    logger.info(f"v2 delete format: {v2_format}")
    logger.info(f"v2 delete size: {v2_size} bytes")
    logger.info(f"v3 delete format: {v3_format}")
    logger.info(f"v3 delete size: {v3_size} bytes")
    if v2_size > 0:
        size_reduction = ((v2_size - v3_size) / v2_size) * 100
        logger.info(f"v3 size reduction: {size_reduction:.1f}%")

# Main
try:
    # Create dataset once and reuse for both versions
    test_dataset = create_dataset()

    # Drop existing tables if they exist
    spark.sql("DROP TABLE IF EXISTS glue_catalog.blog.iceberg_table_v2")
    spark.sql("DROP TABLE IF EXISTS glue_catalog.blog.iceberg_table_v3")

    # Test both versions with the same dataset
    v2_results = test_iceberg_table(2, test_dataset)
    v3_results = test_iceberg_table(3, test_dataset)
    print_comparison_results(v2_results, v3_results)
finally:
    spark.stop()

Results summary

The output generated by the code includes a results summary section that shows several key comparisons, as shown in the following screenshot. For delete operations, Iceberg v3 uses the Puffin file format compared to Parquet in v2, resulting in significant improvements. The delete operation time decreased from 3.126 seconds in v2 to 1.407 seconds in v3, a 55.0% performance improvement. Additionally, the delete file size was reduced from 1,801 bytes using Parquet in v2 to 475 bytes using Puffin in v3, a 73.6% reduction in storage overhead. Read operations also saw notable improvements, with full table reads 28.5% faster and filtered reads 23% faster in v3. These results demonstrate the efficiency gains from v3's implementation of binary deletion vectors through the Puffin format.

[Screenshot: results summary output from the comparison script]

The actual measured performance and storage improvements depend on your workload and environment and might differ from the preceding example.

The following screenshot from the S3 bucket shows a Puffin delete file stored alongside data files.

[Screenshot: Puffin delete file stored alongside data files in the S3 bucket]

Clean up

After you finish your tests, clean up your environment to avoid unnecessary costs:

  1. Drop the test tables you created to remove associated data from your S3 bucket and prevent ongoing storage costs.
  2. Delete any temporary data left in the S3 bucket used for Iceberg data.
  3. Delete the EMR cluster to stop billing for running compute resources.

Cleaning up resources promptly helps maintain cost-efficiency and resource hygiene in your AWS environment. The following sketch shows the first two steps from a Spark session; the cluster itself can be deleted from the Amazon EMR console or with the AWS CLI.
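# Hedged cleanup sketch: drop the test tables; PURGE also removes the
# underlying data and metadata files from the S3 warehouse location.
spark.sql("DROP TABLE IF EXISTS glue_catalog.blog.iceberg_table_v2 PURGE")
spark.sql("DROP TABLE IF EXISTS glue_catalog.blog.iceberg_table_v3 PURGE")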

Considerations

Iceberg features are released through a phased process: first in the specification, then in the core library, and finally in engine implementations. Deletion vector support is currently available in the specification and core library, with Spark being the only supported engine. We validated this capability on Amazon EMR 7.10 with Spark 3.5.5.

Conclusion

Iceberg v3 introduces a significant advancement in managing row-level deletes for merge-on-read operations through binary deletion vectors stored in compact Puffin files. Our performance tests, conducted with Iceberg 1.9.2 on Amazon EMR 7.10.0 and EMR Spark 3.5.5, show clear improvements in both delete operation speed and read performance, along with a considerable reduction in delete file storage compared to Iceberg v2's positional delete Parquet files. For more information about deletion vectors, refer to Iceberg v3 deletion vectors.


About the authors

Arun Shanmugam

Arun is a Senior Analytics Solutions Architect at AWS, with a focus on building modern data architecture. He has been successfully delivering scalable data analytics solutions for customers across various industries. Outside of work, Arun is an avid outdoor enthusiast who actively engages in CrossFit, road cycling, and cricket.

Suthan Phillips

Suthan is a Senior Analytics Architect at AWS, where he helps customers design and optimize scalable, high-performance data solutions that drive business insights. He combines architectural guidance on system design and scalability with best practices to deliver efficient, secure implementations across data processing and technology layers. Outside of work, Suthan enjoys swimming, hiking, and exploring the Pacific Northwest.

Kinshuk Paharae

Kinshuk is head of product for data processing, leading product teams for AWS Glue, Amazon EMR, and Amazon Athena. He has been with AWS for over 5 years.

Linda O’Connor

Linda is a seasoned go-to-market leader with close to three decades of experience driving growth strategies in the data and analytics space. At AWS, she currently leads pan-analytics initiatives including lakehouse architectures, helping customers transform their existing landscapes through non-disruptive innovation. She previously served as Global Vice President at a German software company for 25 years, where she spearheaded Data Warehousing and Big Data portfolios, orchestrating successful product launches and driving global market expansion.
