Load Data using EMR Spark with Apache Iceberg

Vishal Khondre · Published in Dev Genius · 5 min read · Aug 3, 2022

Quick set of instructions to create an EMR cluster and then load data using Spark with Apache Iceberg.

Tech Stack:

AWS EMR as the compute engine. Amazon Elastic MapReduce (EMR) is a managed cluster platform that simplifies running big data frameworks.

AWS S3 as data storage (the data warehouse). Amazon Simple Storage Service (S3) is an object storage service offering industry-leading scalability, data availability, security, and performance.

AWS DynamoDB as the Spark catalog metastore. Amazon DynamoDB is a fully managed, proprietary NoSQL database service that supports key-value and document data structures.

Apache Spark as the open-source, distributed data processing framework used for big data workloads.

Apache Iceberg as an open table format for large datasets: a highly performant, concurrent, ACID-compliant table format for data lakes.

Steps to Create AWS EMR Cluster and Jupyter Notebook

Let’s create an EMR Cluster

Go to AWS services and click on EMR. Click the “Create cluster” button.

Click on “Go to advanced options”

Let’s select EMR release 6.4.0 in the drop-down selector. Next, check the boxes for the following applications to include them in the cluster:

Spark 3.1.2
JupyterHub 1.4.1
JupyterEnterpriseGateway 2.1.0

In the “Edit software settings” section, enter the following configuration into the text box. Make sure you replace the warehouse path (shown below as s3://dficebergdemo) with the S3 location you want to use for your warehouse.

[
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.jars.packages": "org.apache.iceberg:iceberg-spark3-runtime:0.12.1,software.amazon.awssdk:bundle:2.15.40,software.amazon.awssdk:url-connection-client:2.15.40",
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
      "spark.sql.defaultCatalog": "prodiceberg",
      "spark.sql.catalog.prodiceberg": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.prodiceberg.catalog-impl": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
      "spark.sql.catalog.prodiceberg.warehouse": "s3://dficebergdemo",
      "spark.sql.catalog.prodiceberg.dynamodb.table-name": "prodiceberg_metastore"
    }
  }
]

With that, the software settings are complete: the configuration pulls in the Iceberg Spark runtime and AWS SDK bundles, registers an Iceberg catalog named prodiceberg backed by the DynamoDB table prodiceberg_metastore, and points its warehouse at the S3 path.

Click “Next” and choose the default options for hardware. Click “Next” and change the cluster name under “General Cluster settings”. Click “Next”, choose appropriate security options, and lastly click “Create cluster”.

The cluster will be created in about ten to fifteen minutes.
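If you prefer to script the cluster creation instead of clicking through the console, the same setup can also be expressed with boto3. The sketch below is only illustrative: the cluster name, region, instance types and counts, and IAM roles are placeholder assumptions you would adjust for your own account.

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # region is an assumption

# Same release, applications and spark-defaults classification as in the console steps above.
response = emr.run_job_flow(
    Name="iceberg-demo-cluster",
    ReleaseLabel="emr-6.4.0",
    Applications=[{"Name": "Spark"}, {"Name": "JupyterHub"}, {"Name": "JupyterEnterpriseGateway"}],
    Configurations=[{
        "Classification": "spark-defaults",
        "Properties": {
            "spark.jars.packages": "org.apache.iceberg:iceberg-spark3-runtime:0.12.1,software.amazon.awssdk:bundle:2.15.40,software.amazon.awssdk:url-connection-client:2.15.40",
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "spark.sql.defaultCatalog": "prodiceberg",
            "spark.sql.catalog.prodiceberg": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.prodiceberg.catalog-impl": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
            "spark.sql.catalog.prodiceberg.warehouse": "s3://dficebergdemo",
            "spark.sql.catalog.prodiceberg.dynamodb.table-name": "prodiceberg_metastore",
        },
    }],
    Instances={
        "InstanceGroups": [
            {"Name": "Primary", "InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,  # keep the cluster running so a notebook can attach to it
    },
    JobFlowRole="EMR_EC2_DefaultRole",  # default EC2 instance profile
    ServiceRole="EMR_DefaultRole",      # default EMR service role
)
print(response["JobFlowId"])  # the new cluster id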

Let’s create a Jupyter Notebook

On the EMR dashboard in the AWS management console, select “Notebooks” on the left navigation pane. And then you can click on “Create notebook.”

Jupyter Enterprise Gateway lets you attach this notebook to the cluster we created in the previous step. Give the notebook an appropriate name. You will see an option to choose an existing cluster, and the cluster created in the previous step will be listed. Select that cluster from the pop-up window and click “Choose cluster” to attach it to the notebook. Lastly, click the “Create notebook” button. The notebook will remain in pending status until the notebook server starts.

Steps to load the data

Let’s load the data

Access the notebook by clicking on its name.

Click on “Open in Jupyter”

Change the Jupyter kernel to PySpark.

Start the Spark session by running spark in a cell.

spark
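Since the notebook runs with the PySpark kernel on the EMR cluster, spark is already a live SparkSession. As an optional sanity check, you can confirm that the Iceberg settings from the spark-defaults classification were picked up; the property names below are the same ones we configured while creating the cluster.

# These should echo the values from the "Edit software settings" configuration.
print(spark.version)
print(spark.conf.get("spark.sql.defaultCatalog"))
print(spark.conf.get("spark.sql.catalog.prodiceberg.warehouse"))
print(spark.conf.get("spark.sql.catalog.prodiceberg.dynamodb.table-name"))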

We are going to use New York City Taxi and Limousine Commission (TLC) Trip Record Data (https://registry.opendata.aws/nyc-tlc-trip-records-pds/). This data contains information about trips taken by taxis and for-hire vehicles in New York City.

Related reading: https://coiled.io/blog/nyc-taxi-parquet-csv-index-error/

Let’s create database and table to load the data

We’ll create an nyc database and save a couple of months of this data into an Iceberg table called taxis. Please note that all files are now available in Parquet format, not CSV.

spark.sql("CREATE DATABASE nyc")
df = spark.read.option("header", True).parquet("s3://nyc-tlc/trip data/yellow_tripdata_2020-03.parquet")
df.writeTo("nyc.taxis").create()
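As a quick, optional check (a sketch, assuming the catalog and warehouse settings above), you can confirm that the table is registered in the prodiceberg catalog; its data and metadata files are written under the configured S3 warehouse path, typically in a folder per database and table.

# The new table should be listed through the default prodiceberg catalog.
spark.sql("SHOW TABLES IN nyc").show()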

You can run the DESCRIBE TABLE command to see the columns and column types.

spark.sql("DESCRIBE TABLE nyc.taxis").show(truncate=False)
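Beyond DESCRIBE, Iceberg also exposes metadata tables that you can query with plain SQL. For example (assuming the default prodiceberg catalog configured earlier), the files metadata table lists the data files backing nyc.taxis:

# Each row is one data file tracked by the table's current snapshot.
spark.sql("SELECT file_path, file_format, record_count FROM nyc.taxis.files").show(truncate=False)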

Load one more month of data.

df = spark.read.option("header", True).parquet("s3://nyc-tlc/trip data/yellow_tripdata_2020-04.parquet")
df.write.mode("append").insertInto("nyc.taxis")
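As a side note, because nyc.taxis is an Iceberg (DataSource V2) table, the same append can also be written with the DataFrameWriterV2 API; a small equivalent sketch:

# Equivalent append using the V2 writer API.
df.writeTo("nyc.taxis").append()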

Let’s check the record count of the nyc.taxis table.

spark.sql("""
SELECT COUNT(*) as cnt
FROM nyc.taxis
""").show()
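Each write above produced a new table snapshot, so you can also inspect the table history through Iceberg’s snapshots metadata table; with the two loads you should see two snapshots (a sketch, assuming the default catalog configured earlier).

# One snapshot per write: the initial create and the appended month.
spark.sql("""
SELECT committed_at, snapshot_id, operation
FROM nyc.taxis.snapshots
""").show(truncate=False)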

With this, we loaded the data using an EMR cluster as the compute engine, S3 to persist the data, and Apache Iceberg as an open table format that works just like a SQL table.

Expressive SQL for DML operations, time travel, schema evolution, data rollback, and data compaction are some of the interesting features of Apache Iceberg that make it a strong choice of table format when pursuing a data architecture where open source and open standards are a must-have.
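For illustration only, here is a hedged sketch of two of those features against this table. The snapshot id is a placeholder you would take from the snapshots query above, and the exact time-travel syntax can vary across Iceberg and Spark versions.

# Row-level DML (enabled by the Iceberg SQL extensions configured on the cluster).
spark.sql("DELETE FROM nyc.taxis WHERE passenger_count IS NULL")

# Time travel: read the table as of an earlier snapshot
# (1234567890 is a placeholder snapshot_id taken from nyc.taxis.snapshots).
old_df = spark.read.option("snapshot-id", 1234567890).format("iceberg").load("nyc.taxis")
old_df.count()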

If you want to know more about this tech stack, the official documentation for AWS EMR, Apache Spark, and Apache Iceberg is a good place to start.
