A Deep Dive into ACID Transactions with Delta Lake

In the world of Big Data, ensuring data integrity, consistency, and reliability during operations is paramount. This article aims to take you on a journey through one of the main features of Delta Lake on Apache Spark - ACID transactions, a feature that brings the reliability of traditional databases into the Big Data realm.

ACID Transactions: An Overview

ACID stands for Atomicity, Consistency, Isolation, and Durability. These properties are integral to ensuring data reliability and consistency in database systems, and Delta Lake brings these to the world of Big Data processing.

  • Atomicity : This property ensures that all changes in a transaction are recorded, or none are. In other words, if a transaction fails at any point, all changes made during the transaction are rolled back, and the data remains unaltered.

  • Consistency : Consistency ensures that a transaction brings the database from one valid state to another, adhering to predefined rules and constraints.

  • Isolation : This property ensures that concurrent execution of transactions results in a system state as if transactions were executed serially, i.e., one after the other.

  • Durability : Durability guarantees that once a transaction is committed, it will remain so, even in the event of power loss, crashes, or errors.
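
The sections that follow illustrate each of these properties with PySpark snippets. They assume a SparkSession that has already been configured for Delta Lake, along the lines of this minimal sketch (the application name is arbitrary, and the Delta Lake package must be on the classpath, for example via --packages io.delta:delta-spark_2.12:&lt;version&gt;, or delta-core for older releases):

from pyspark.sql import SparkSession

# Minimal Delta-enabled SparkSession used by the examples below
spark = (
    SparkSession.builder
    .appName("delta-acid-transactions")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)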

Atomicity

Atomicity means that a set of operations is treated as a single unit: if any operation in the set fails, none of them take effect. In Delta Lake, this means that each operation on the data is all or nothing.

For instance, consider a scenario where you are appending a DataFrame to a Delta Lake table:

data = spark.range(5, 10) 
data.write.format("delta").mode("append").save("/tmp/delta-table") 

In the middle of this operation, let's say a node in your Spark cluster fails. Because of atomicity, Delta Lake ensures that the operation either completely succeeds or fails without any side effects. The table "/tmp/delta-table" will remain unaltered in case of failure, ensuring data integrity.
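
Atomicity applies to every kind of write, not just appends. As a minimal sketch (continuing with the table created above), even a full overwrite is all-or-nothing, so a concurrent reader sees either the old contents or the new contents, never a half-written table:

# Overwrite the whole table; the swap is committed atomically via the transaction log
new_data = spark.range(0, 5)
new_data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

# Readers see either the previous rows or exactly these five new rows, never a mix
print(spark.read.format("delta").load("/tmp/delta-table").count())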

Consistency

Consistency in the context of ACID transactions ensures that a transaction brings the database from one valid state to another. In practice, if you try to write data that doesn't match the table's schema, Delta Lake rejects the write and keeps the table in a consistent state.

Let's say you have a Delta Lake table with a specific schema, and you try to append data that doesn't conform to that schema:

from pyspark.sql.types import StructType, StructField, StringType 
        
# Assume the initial schema has only one column, "id"
initial_data = spark.createDataFrame([(1,), (2,)], ["id"])
initial_data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

# Now you try to write data with different schema 
new_data_schema = StructType([StructField("id", StringType()), StructField("value", StringType())]) 
new_data = spark.createDataFrame([("3", "foo"), ("4", "bar")], schema=new_data_schema) 

# This will throw an error 
new_data.write.format("delta").mode("append").save("/tmp/delta-table") 

The above code will result in a schema mismatch error because Delta Lake enforces schema consistency.
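
If the schema change is intentional, Delta Lake also lets you opt in to schema evolution instead of rejecting the write. Here's a small sketch using the mergeSchema write option (the data is illustrative, and the "id" values are kept type-compatible with the existing column):

# Explicitly allow the new "value" column to be added to the table schema
evolving_data = spark.createDataFrame([(3, "foo"), (4, "bar")], ["id", "value"])
evolving_data.write.format("delta").mode("append").option("mergeSchema", "true").save("/tmp/delta-table")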

Isolation

Isolation refers to the ability of a transaction to operate independently of other concurrent transactions. Delta Lake ensures this through snapshot isolation: every read or write operation works against a consistent snapshot of the table as of the time it starts, and a write becomes visible to other operations only once it has been committed to the transaction log.

To illustrate, let's say you have a streaming job that's appending data to a Delta Lake table:

# streaming_data is an existing streaming DataFrame (e.g., created via spark.readStream)
streaming_data.writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

While this streaming job is running, you can run a batch job to count the rows:

spark.read.format("delta").load("/tmp/delta-table").count() 

The count operation isn't affected by the streaming write operation because it operates on a snapshot of the table at the time the operation began. This is a simplification, but it illustrates the principle of isolation.
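
Because every read works against a specific snapshot, you can also ask for one explicitly. Here's a small sketch using Delta Lake's time travel option (it assumes the table has a version 0, i.e., at least one committed write):

# Read the table exactly as it was at version 0, regardless of later writes
spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table").show()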

Isolation Levels

Delta Lake supports two isolation levels for handling concurrent transactions: Serializable and WriteSerializable.

  • Serializable : This is the strongest isolation level. It guarantees that both reads and writes are serializable: the committed state of the table could have been produced by running the transactions one at a time. Delta Lake uses optimistic concurrency control, so if two jobs try to commit conflicting changes to the same table, one of them fails with a conflict exception rather than corrupting the table.

  • WriteSerializable : This is the default isolation level. It guarantees that write operations are serializable, while allowing reads and writes to interleave more freely for better concurrency; the final table state is still as if the writes had been executed in some serial (one after the other) order.

You don't typically need to change the isolation level, as the default works well for most workloads. However, if you need the stronger guarantees of the Serializable level, you can set it as a table property. Here's an example for the path-based table used above (path-based Delta tables can be referenced in SQL as delta.`<path>`):

spark.sql("ALTER TABLE delta.`/tmp/delta-table` SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')")

In the example above, the ALTER TABLE statement sets the delta.isolationLevel table property to Serializable. To switch back to the default isolation level (WriteSerializable), you would run:

spark.sql("ALTER TABLE delta.`/tmp/delta-table` SET TBLPROPERTIES ('delta.isolationLevel' = 'WriteSerializable')")

Remember, the Serializable isolation level can noticeably reduce write throughput, because more concurrent transactions will conflict and need to be retried. Therefore, it's recommended to use it only when your workload truly requires the stronger guarantee.

Durability

Durability means that once an operation has been committed, it will survive subsequent failures. In Delta Lake, every committed change is recorded durably in the transaction log, an ordered record of all operations that have been performed on the table.

This transaction log serves multiple purposes, including:

  1. It allows Delta Lake to provide a consistent view of the data, even when operations are being performed on the table.
  2. It allows for "time travel" — the ability to query older versions of the table.
  3. It makes it possible to roll a table back to an earlier version, for example with the RESTORE command (see the sketch at the end of this section).

Here's an example of how you can access the transaction log:

from delta.tables import DeltaTable 
        
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table") 

# Get the transaction log 
history = deltaTable.history() 

# Display the log 
history.show() 

The history method on a DeltaTable returns a DataFrame that contains information about all the operations that have been performed on the table.
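
Because the transaction log records every version of the table, rolling back amounts to restoring an earlier snapshot. Here's a minimal sketch using the restore API (available in recent Delta Lake releases; it assumes version 0 exists and reuses the deltaTable object from above):

# Rewind the table to its first committed version
deltaTable.restoreToVersion(0)

# Subsequent reads see the table as it was at version 0
spark.read.format("delta").load("/tmp/delta-table").show()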

Conclusion

ACID transactions are at the core of Delta Lake's promise of reliability and consistency. By ensuring atomicity, consistency, isolation, and durability of data operations, Delta Lake introduces the robustness of traditional databases to the Big Data ecosystem, empowering businesses to make confident, data-driven decisions.

To understand how Delta Lake's ACID transactions can be leveraged in more complex, real-world scenarios, we recommend experimenting hands-on and exploring the comprehensive official Delta Lake documentation. Unleash the power of reliable, large-scale data processing with Delta Lake!