Databricks Delta Guide

Note
Databricks Delta is in Preview.
Use this guide to learn about Databricks Delta, a powerful transactional storage
layer that harnesses the power of Apache Spark and Databricks DBFS.

Introduction to Databricks Delta
  o Requirements
  o Frequently asked questions (FAQ)

Databricks Delta Quickstart
  o Create a table
  o Read a table
  o Append data to a table
  o Stream data into a table
  o Optimize a table
  o Clean up snapshots

Table Batch Reads and Writes
  o Create a table
  o Read a table
  o Write to a table
  o Schema validation
  o Update table schema
  o Replace table schema
  o Views on tables
  o Table properties
  o Table metadata

Table Streaming Reads and Writes
  o As a source
  o As a sink

Optimizing Performance and Cost
  o Compaction (bin-packing)
  o ZOrdering (multi-dimensional clustering)
  o Data skipping
  o Garbage collection
  o Improving performance for interactive queries
  o Frequently asked questions (FAQ)

Table Versioning

Concurrency Control and Isolation Levels in Databricks Delta
  o Optimistic Concurrency Control
  o Isolation levels

Porting Existing Workloads to Databricks Delta
  o Example

Introduction to Databricks Delta
Note
Databricks Delta is in Preview.
Databricks Delta delivers a powerful transactional storage layer by harnessing the power of Apache Spark and Databricks DBFS. The core abstraction of Databricks Delta is an optimized Spark table that:

o Stores data as Parquet files in DBFS.
o Maintains a transaction log that efficiently tracks changes to the table.

You read and write data stored in the delta format using the same familiar
Apache Spark SQL batch and streaming APIs that you use to work with Hive
tables and DBFS directories. With the addition of the transaction log and other
enhancements, Databricks Delta offers significant benefits:

ACID transactions

o Multiple writers can simultaneously modify a dataset and see consistent views. For qualifications, see Multi-cluster writes.
o Writers can modify a dataset without interfering with jobs reading the dataset.

Fast read access

o Automatic file management organizes data into large files that can be read efficiently.
o Statistics speed up reads by 10-100x, and data skipping avoids reading irrelevant information.

Requirements
Databricks Delta requires Databricks Runtime 4.1 or above. If you created a Databricks Delta table using a Databricks Runtime version lower than 4.1, the table version must be upgraded. For details, see Table Versioning.

Frequently asked questions (FAQ)


How do Databricks Delta tables compare to Hive SerDe tables?

Databricks Delta tables are managed to a greater degree. In particular, there are several Hive SerDe parameters that Databricks Delta manages on your behalf and that you should never specify manually:

o ROWFORMAT
o SERDE
o OUTPUTFORMAT AND INPUTFORMAT
o COMPRESSION
o STORED AS
Does Databricks Delta support multi-table transactions?

Databricks Delta does not support multi-table transactions or foreign keys. Databricks Delta supports transactions at the table level.

Does Databricks Delta support writes or reads using the Spark Streaming DStream API?

Databricks Delta does not support the DStream API. We recommend Structured Streaming.
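
For example, a minimal sketch in Python (assuming a SparkSession named spark; the paths and checkpoint location are illustrative) of writing a stream into a Databricks Delta table with Structured Streaming rather than DStreams:

Python

# Read JSON files as a stream and continuously append them to a Delta table.
events = spark.readStream.json("/data/events")
events.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/events/_checkpoint/faq-example") \
    .start("/delta/events")
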
What DDL and DML features does Databricks Delta not support?
 Unsupported DDL features:

o ANALYZE TABLE PARTITION

o ALTER TABLE [ADD|DROP] PARTITION

o ALTER TABLE SET LOCATION

o ALTER TABLE RECOVER PARTITIONS

o ALTER TABLE SET SERDEPROPERTIES

o CREATE TABLE LIKE

o INSERT OVERWRITE DIRECTORY

o LOAD DATA

 Unsupported DML features:

o INSERT INTO [OVERWRITE] with static partitions.

o Bucketing.

o Specifying a schema when reading from a table. A command such as spark.read.format("delta").schema(df.schema).load(path) will fail (see the sketch after this list).

o Specifying target partitions using PARTITION (part_spec) in TRUNCATE TABLE.
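
To illustrate, a minimal sketch in Python (assuming a SparkSession named spark and a Databricks Delta table at /data/events) of the supported way to read a table, letting Databricks Delta supply the schema from its transaction log:

Python

# Supported: no explicit schema; Databricks Delta provides it.
events = spark.read.format("delta").load("/data/events")

# Not supported: passing an explicit schema to the reader fails for Delta tables.
# spark.read.format("delta").schema(events.schema).load("/data/events")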

What does it mean that Databricks Delta supports multi-cluster writes?

It means that Databricks Delta does locking to make sure that queries writing to a table from multiple clusters at the same time won't corrupt the table. However, it does not mean that if there is a write conflict (for example, an update and a delete of the same thing) both writes will succeed. Instead, one of the writes will fail atomically and the error will tell you to retry the operation.
What are the limitations of multi-cluster writes?
Databricks Delta supports transactional writes from multiple clusters in the
same workspace in Databricks Runtime 4.2 and above. All writers must be
running Databricks Runtime 4.2 or above. The following features are not
supported when running in this mode:

o SparkR
o Spark-submit jobs
o Run a command using REST APIs
o Client-side S3 encryption
o Server-Side Encryption with Customer-Provided Encryption Keys
o S3 paths with credentials in a cluster that cannot access AWS Security Token Service

You can disable multi-cluster writes by setting spark.databricks.delta.multiClusterWrites.enabled to false. If multi-cluster writes are disabled, writes to a single table must originate from a single cluster.
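
For example, a minimal sketch in Python of turning the setting off for the current session (assuming a SparkSession named spark; the property can also be set in the cluster's Spark configuration):

Python

# Disable multi-cluster writes; writes to any single table must then
# originate from a single cluster.
spark.conf.set("spark.databricks.delta.multiClusterWrites.enabled", "false")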

Warning

o You cannot concurrently modify the same Databricks Delta table from different workspaces.
o Writes to a single table using Databricks Runtime versions lower than 4.2 must originate from a single cluster. To perform transactional writes from multiple clusters in the same workspace you must upgrade to Databricks Runtime 4.2.

Why is Databricks Delta data I deleted still stored in S3?

If you are using Databricks Delta and have enabled bucket versioning, you have two entities managing table files: Databricks Delta and AWS. To ensure that data is fully deleted you must:

o Clean up deleted files that are no longer in the Databricks Delta transaction log using VACUUM.
o Enable an S3 lifecycle policy for versioned objects that ensures that old versions of deleted files are purged.

Can I access Databricks Delta tables outside of Databricks Runtime?


There are two cases to consider: external writes and external reads.

o External writes: Databricks Delta maintains additional metadata in the form of a transaction log to enable ACID transactions and snapshot isolation for readers. In order to ensure the transaction log is updated correctly and the proper validations are performed, writes must go through Databricks Runtime.

o External reads: Databricks Delta tables store data encoded in an open format (Parquet), allowing other tools that understand this format to read the data. However, since other tools do not support Databricks Delta's transaction log, it is likely that they will incorrectly read stale deleted data, uncommitted data, or the partial results of failed transactions.
In cases where the data is static (that is, there are no active jobs writing to the table), you can use VACUUM with a retention of ZERO HOURS to clean up any stale Parquet files that are not currently part of the table. This operation puts the Parquet files present in DBFS into a consistent state such that they can now be read by external tools.

However, Databricks Delta relies on stale snapshots for the following functionality, which will break when using VACUUM with a zero retention allowance:

o Snapshot isolation for readers - Long-running jobs will continue to read a consistent snapshot from the moment the jobs started, even if the table is modified concurrently. Running VACUUM with a retention shorter than the length of these jobs can cause them to fail with a FileNotFoundException.

o Streaming from Databricks Delta tables - Streams read from the original files written into a table in order to ensure exactly-once processing. When combined with OPTIMIZE, VACUUM with zero retention can remove these files before the stream has time to process them, causing it to fail.

For these reasons, we recommend the above technique only on static data sets that must be read by external tools.
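
For reference, a minimal sketch in Python (assuming a SparkSession named spark, a static table at /data/events, and no concurrent jobs or streams) of the zero-retention VACUUM described above:

Python

# Remove all Parquet files that are no longer part of the current table
# version so that external Parquet readers see a consistent directory.
# Only appropriate for static data sets.
spark.sql("VACUUM delta.`/data/events` RETAIN 0 HOURS")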

Databricks Delta Quickstart


This quickstart demonstrates the basics of working with Databricks Delta. This topic shows how to build a pipeline that reads JSON data into a Databricks Delta table and then appends additional data to it. The topic includes example notebooks that demonstrate basic Databricks Delta operations.

In this topic:

Create a table
Read a table
Append data to a table
  o Example notebooks
Stream data into a table
Optimize a table
Clean up snapshots

Create a table
Create a table from a dataset. You can use existing Spark SQL code and change
the format from parquet, csv, json, and so on, to delta.
Python

events = spark.read.json("/data/events")
events.write.format("delta").save("/data/events")
SQL

CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`
These operations create a new table using the schema that was inferred from the
JSON data. For the full set of options available when you create a new
Databricks Delta table, see Create a table and Write to a table.
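
If you also want to query the data by table name, as in the next section, here is a minimal sketch (assuming the events DataFrame created above) of registering the data as a table in the metastore instead of only saving it to a path:

Python

# Save the DataFrame as a Delta table named "events" so it can be read
# by name as well as by path.
events.write.format("delta").saveAsTable("events")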

Read a table
You access data in Databricks Delta tables either by specifying the path on
DBFS ("/data/events") or the table name ("events"):

Python

events = spark.read.format("delta").load("/data/events")

or

events = spark.table("events")
SQL


SELECT * FROM delta.`/data/events`


or


SELECT * FROM events


Append data to a table
As new events arrive, you can atomically append them to the table:

Python

newEvents.write \
    .format("delta") \
    .mode("append") \
    .save("/data/events")

or

newEvents.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("events")
SQL


INSERT INTO events VALUES(...)


or


INSERT INTO events SELECT * FROM newEvents


For an example of how to create a Databricks Delta table and append to it, see the following notebooks:

Example notebooks
 Python notebook
 Scala notebook

 SQL notebook


Stream data into a table


You can also use Structured Streaming to stream new data as it arrives into the
table:

Python

events = spark.readStream.json("/data/events")
events.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json") \
    .start("/delta/events")
For more information about Databricks Delta integration with Structured
Streaming, see Table Streaming Reads and Writes.
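
In the other direction, a minimal sketch (assuming a SparkSession named spark and the /delta/events path written above) of reading the same Databricks Delta table as a streaming source:

Python

# Treat the Delta table as a streaming source; rows appended to the table
# are picked up incrementally.
updates = spark.readStream.format("delta").load("/delta/events")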

Optimize a table
Once you have been streaming for a while, you will likely have a lot of small files in the table. If you want to improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:

OPTIMIZE delta.`/data/events`
or


OPTIMIZE events
You can also specify columns that frequently appear in query predicates for your workload, and Databricks Delta uses this information to cluster related records together:

OPTIMIZE events ZORDER BY eventType, city


For the full set of options available when running OPTIMIZE, see Optimizing
Performance and Cost.

Clean up snapshots
Databricks Delta provides snapshot isolation for reads, which means that it is
safe to run OPTIMIZE even while other users or jobs are querying the table.
Eventually you should clean up old snapshots. You can do this by running
the VACUUM command:


VACUUM events
You control the age of the latest retained snapshot by using
the RETAIN <N> HOURS option:


VACUUM events RETAIN 24 HOURS


For details on using VACUUM effectively, see Garbage collection.
