Mod4 Bda
Introduction to Apache Spark Framework: Spark architecture, RDDs, SparkContext, SparkSession, lazy evaluation,
Spark environment parameters, job execution in Spark, parallel programming.
Spark is a unified analytics engine for large-scale data processing, with built-in modules for SQL, streaming, machine
learning, and graph processing.
What is Spark?
Apache Spark is an open-source cluster computing framework. Its primary purpose is to process large-scale data,
including data generated in real time.
Spark builds on the ideas of Hadoop MapReduce but is optimized to run in memory, whereas Hadoop's MapReduce
writes intermediate data to and from disk. As a result, Spark processes data much faster than such alternatives.
o Fast - It provides high performance for both batch and streaming data, using a state-of-the-art DAG scheduler, a
query optimizer, and a physical execution engine.
o Easy to Use - It lets you write applications in Java, Scala, Python, R, and SQL, and it provides more than 80
high-level operators.
o Generality - It provides a collection of libraries including SQL and DataFrames, MLlib for machine learning,
GraphX, and Spark Streaming.
o Lightweight - It is a lightweight unified analytics engine for large-scale data processing.
o Runs Everywhere - It can easily run on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud.
Usage of Spark
o Data integration: The data generated by different systems is rarely consistent enough to combine for analysis.
Extract, transform, and load (ETL) processes are used to pull consistent data from these systems, and Spark is
used to reduce the cost and time required for ETL.
o Stream processing: It is difficult to handle real-time generated data such as log files. Spark can operate on
streams of data and, for example, flag potentially fraudulent operations as they occur.
o Machine learning: Machine learning approaches become more feasible and increasingly accurate as the volume
of data grows. Because Spark can keep data in memory and run repeated queries quickly, it is well suited to
machine learning algorithms.
o Interactive analytics: Spark is able to respond rapidly, so instead of running only pre-defined queries, we can
explore the data interactively.
Spark Architecture
o Spark follows a master-slave architecture. Its cluster consists of a single master and multiple slaves (workers).
o The Spark architecture depends upon two abstractions:
o Resilient Distributed Dataset (RDD): a group of data items that can be stored in-memory on worker nodes and
processed in parallel.
o Directed Acyclic Graph (DAG): a finite directed graph that represents a sequence of computations on data. Each
node is an RDD partition, and each edge is a transformation applied on top of the data. Here, the graph refers to
the navigation of the computation, whereas directed and acyclic refer to how it is done.
Driver Program
The Driver Program is a process that runs the main() function of the application and creates the SparkContext object.
The purpose of the SparkContext is to coordinate the Spark application, which runs as an independent set of processes on
a cluster.
To run on a cluster, the SparkContext connects to one of several types of cluster managers, acquires executors on the
worker nodes, sends the application code to those executors, and finally sends tasks to the executors to run.
Cluster Manager
o The role of the cluster manager is to allocate resources across applications. Spark is capable of running on a
variety of cluster managers.
o These include Hadoop YARN, Apache Mesos, and the Standalone Scheduler.
o Here, the Standalone Scheduler is Spark's own cluster manager, which makes it easy to install Spark on an
empty set of machines.
Worker Node
o The worker node is a slave node that runs the application code in the cluster. It hosts the executors that perform
the actual computation and store data for the application.
Task
o A task is a unit of work that is sent to one executor. Each task operates on a single partition of the data.
Spark Components
The Spark project consists of different types of tightly integrated components. At its core, Spark is a
computational engine that can schedule, distribute and monitor multiple applications.
Spark Core
o The Spark Core is the heart of Spark and performs the core functionality.
o It holds the components for task scheduling, fault recovery, interacting with storage systems and memory
management.
Spark SQL
o Spark SQL is built on top of Spark Core. It provides support for structured data.
o It allows you to query data via SQL (Structured Query Language) as well as via the Apache Hive variant of
SQL called HQL (Hive Query Language), as sketched in the example below.
o It supports JDBC and ODBC connections, which allow it to work with existing databases, data warehouses,
and business intelligence tools.
o It also supports various sources of data like Hive tables, Parquet, and JSON.
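As a rough illustration of Spark SQL from PySpark (the DataFrame contents, view name, and column names below are
made up for the example), a DataFrame can be registered as a temporary view and queried with SQL:
# Register a DataFrame as a temporary view and query it with SQL
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQLExample").master("local[*]").getOrCreate()
df = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["name", "age"])
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 40").show()
spark.stop()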
Spark Streaming
o Spark Streaming is a Spark component that supports scalable and fault-tolerant processing of streaming data.
o It uses Spark Core's fast scheduling capability to perform streaming analytics.
o It accepts data in mini-batches and performs RDD transformations on that data.
o Its design ensures that the applications written for streaming data can be reused to analyze batches of historical
data with little modification.
o The log files generated by web servers can be considered as a real-time example of a data stream.
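As a sketch of this mini-batch model (using the DStream API; the host localhost, port 9999, and the 1-second batch
interval are assumptions for illustration), a streaming word count looks like this:
# Count words arriving on a TCP socket in 1-second mini-batches
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1-second batch interval
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.pprint()  # print the counts computed for each mini-batch
ssc.start()
ssc.awaitTermination()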
MLlib
o The MLlib is a Machine Learning library that contains various machine learning algorithms.
o These include correlations and hypothesis testing, classification and regression, clustering, and principal
component analysis.
o It is nine times faster than the disk-based implementation used by Apache Mahout.
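As a small, hedged sketch of MLlib's DataFrame-based API (the training data below is made up for the example), fitting
a logistic regression model looks like this:
# Fit a logistic regression model on a tiny in-memory dataset
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("MLlibExample").master("local[*]").getOrCreate()
training = spark.createDataFrame(
    [(0.0, Vectors.dense([0.0, 1.1])), (1.0, Vectors.dense([2.0, 1.0])),
     (0.0, Vectors.dense([0.1, 1.2])), (1.0, Vectors.dense([1.9, 0.8]))],
    ["label", "features"])
model = LogisticRegression(maxIter=10).fit(training)
model.transform(training).select("label", "prediction").show()
spark.stop()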
GraphX
o GraphX is a library used to manipulate graphs and perform graph-parallel computations.
o It makes it easy to create a directed graph with arbitrary properties attached to each vertex and edge.
o To manipulate graphs, it supports various fundamental operators such as subgraph, joinVertices, and
aggregateMessages.
RDD
In Apache Spark, an RDD (Resilient Distributed Dataset) is the fundamental data structure and a core concept. It
represents an immutable, distributed collection of objects that can be processed in parallel. Here are some key
characteristics and features of RDDs:
Immutable: Once created, an RDD cannot be modified. Any transformations on an RDD produce a new RDD.
Distributed: RDDs are divided into partitions, which can be processed on different nodes in a cluster.
Fault-Tolerant: RDDs can automatically recover from node failures. This is achieved through lineage information, which
records the sequence of transformations that were applied to create the RDD. If a partition of an RDD is lost, it can be
recomputed using this lineage.
Lazy Evaluation: Transformations on RDDs are not executed immediately. Instead, they are lazily evaluated, meaning
they are only computed when an action (like collect or save) requires a result to be returned to the driver program.
Transformations and Actions:
Transformations: Operations that create a new RDD from an existing one (e.g., map, filter, reduceByKey).
Actions: Operations that trigger the computation and return a result to the driver program or save the data to external
storage (e.g., count, collect, saveAsTextFile).
Persistence: RDDs can be cached in memory across operations to improve performance, especially for iterative
algorithms that reuse the same dataset multiple times.
1. Creation of RDDs
From external storage: Loading data from external storage systems such as HDFS, S3, or local file systems
using methods like textFile.
rdd = sc.textFile("hdfs://path/to/file")
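RDDs can also be created from an existing collection in the driver program using the parallelize method (a minimal
sketch, assuming an initialized SparkContext sc):
# Create an RDD by distributing a local Python list across the cluster
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print(rdd.getNumPartitions())  # number of partitions the data was split into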
2. Transformations and Actions
Transformations
Transformations create a new RDD from an existing one. They are lazy, meaning they are not executed immediately but
are instead recorded in the form of a lineage (DAG - Directed Acyclic Graph) of transformations to be executed when an
action is called. Examples include:
map: Applies a function to each element of the RDD and returns a new RDD.
rdd2 = rdd.map(lambda x: x * 2)
filter: Returns a new RDD containing only the elements that satisfy a predicate.
rdd2 = rdd.filter(lambda x: x % 2 == 0)
reduceByKey: When working with key-value pairs, merges the values for each key using a function.
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
Actions
Actions trigger the execution of transformations and return a result or write data to an external storage system. Examples
include:
collect: Returns all the elements of the RDD to the driver program.
result = rdd.collect()
count: Returns the number of elements in the RDD.
count = rdd.count()
saveAsTextFile: Writes the elements of the RDD to a text file in an external storage system.
rdd.saveAsTextFile("hdfs://path/to/output")
3. Execution Model
When an action is called, Spark constructs a logical execution plan (DAG) from the lineage of transformations. This plan
is then optimized and converted into a physical execution plan consisting of tasks, which are scheduled onto the worker
nodes (the execution process is described in more detail in the Job Execution section below).
4. Fault Tolerance
RDDs provide fault tolerance through lineage information. If a partition of an RDD is lost (due to a node failure), Spark
can recompute that partition using the lineage. The lineage tracks the series of transformations that were applied to the
initial data, as the sketch below shows.
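To inspect the lineage that Spark records, an RDD's debug string can be printed (a minimal sketch, assuming an
initialized SparkContext sc):
# The debug string shows the chain of transformations Spark would replay
# to recompute a lost partition
rdd = sc.parallelize(range(10))
rdd2 = rdd.map(lambda x: x * 2).filter(lambda x: x > 5)
lineage = rdd2.toDebugString()
print(lineage.decode("utf-8") if isinstance(lineage, bytes) else lineage)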
5. Persistence
RDDs can be explicitly cached in memory using the cache() or persist() methods. This is useful for iterative algorithms
where the same RDD is reused multiple times. Caching reduces the need to recompute the RDD from scratch and
improves performance.
rdd.persist()
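A minimal sketch of explicit persistence (assuming an initialized SparkContext sc; the data and the chosen storage level
are illustrative):
# Cache an RDD that is reused by several actions
from pyspark import StorageLevel
rdd = sc.parallelize(range(1000))
squared = rdd.map(lambda x: x * x)
squared.persist(StorageLevel.MEMORY_AND_DISK)  # or simply squared.cache()
print(squared.count())  # first action computes and caches the partitions
print(squared.sum())    # second action reuses the cached partitions
squared.unpersist()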
Example
# Initialize SparkContext
from pyspark import SparkContext
sc = SparkContext("local", "RDD Example")
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 5)
result = rdd3.collect()
sc.stop()
In this example, the parallelize method creates an RDD from a list. The map transformation multiplies each element by 2,
and the filter transformation filters out elements that are less than or equal to 5. Finally, the collect action retrieves the
resulting elements to the driver program.
In Apache Spark, SparkContext and SparkSession are two key entry points for working with the Spark API. However,
they serve slightly different purposes and have different scopes of usage, especially with the introduction of SparkSession
in Spark 2.0.
SparkContext
SparkContext is the original entry point for Spark functionality. It allows you to configure and initialize a Spark
application and provides methods to create RDDs, access Spark services, and manage job execution.
Key Points:
Initialization: SparkContext is typically initialized directly and is a prerequisite for RDD operations.
Creating RDDs: Provides methods like parallelize and textFile to create RDDs.
Configuration: You configure Spark through a SparkConf object, which is then used to initialize SparkContext.
Example
# Initialize SparkContext with a SparkConf (example configuration)
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("SparkContext Example").setMaster("local[*]")
sc = SparkContext(conf=conf)
# Create an RDD from a Python list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
sc.stop()
SparkSession
SparkSession is a unified entry point for Spark applications introduced in Spark 2.0. It encapsulates all the configurations
and functionalities of SparkContext, SQLContext, and HiveContext, providing a more convenient and consistent API.
Key Points:
Unified Interface: Combines SparkContext, SQLContext, and HiveContext into a single object.
Configuration: Configuration is done via the builder pattern, which simplifies setup.
DataFrame and SQL: Facilitates working with DataFrames and running SQL queries in addition to RDD
operations.
Backward Compatibility: Internally manages the SparkContext and provides backward compatibility with older
APIs.
Example
# Initialize SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSession Example").master("local[*]").getOrCreate()
# The underlying SparkContext is still available
sc = spark.sparkContext
data = [(1,), (2,), (3,), (4,), (5,)]  # rows as 1-tuples so the RDD can become a DataFrame
rdd = sc.parallelize(data)
df = rdd.toDF(["numbers"])
spark.stop()
Key Differences
Scope and Functionality: SparkSession provides a higher-level API and unifies different contexts
(SparkContext, SQLContext, HiveContext). It is designed to be the single entry point for all Spark operations.
Ease of Use: SparkSession offers a simpler and more consistent interface for initializing Spark applications and
accessing different functionalities.
Modern Applications: For Spark applications written for Spark 2.0 and later, SparkSession is the preferred entry
point, while SparkContext is still accessible via SparkSession.sparkContext (as shown in the example above).
Lazy Evaluation
Lazy evaluation is a key concept in Apache Spark's RDD (Resilient Distributed Dataset) API. It helps optimize the
execution of data processing tasks, ensuring efficiency and fault tolerance.
Lazy evaluation means that Spark does not immediately compute the results of transformations on RDDs. Instead, it
builds up a logical plan of transformations that will only be executed when an action is called. This allows Spark to
optimize the entire chain of transformations and avoid unnecessary computations.
1. Optimization:
o By delaying the computation, Spark can optimize the execution plan. It can combine multiple
transformations, eliminate unnecessary steps, and apply optimizations like pipelining operations.
2. Fault Tolerance:
o The lineage graph allows Spark to efficiently recompute lost data by reapplying the transformations to the
original data.
3. Efficiency:
o Spark avoids unnecessary computations by only executing transformations when needed and minimizing
data movement across the cluster.
Example
# Initialize SparkContext
from pyspark import SparkContext
sc = SparkContext("local", "Lazy Evaluation Example")
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
# Transformations (not executed yet)
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 5)
rdd4 = rdd3.map(lambda x: x + 1)
# Action (triggers execution of the whole chain)
result = rdd4.collect()
sc.stop()
In this example:
Transformations map, filter, and another map are defined but not executed immediately.
When the collect action is called, Spark optimizes the transformations and executes them to produce the final
result.
1. Transformations Defined:
o rdd2 = rdd.map(lambda x: x * 2)
o rdd3 = rdd2.filter(lambda x: x > 5)
o rdd4 = rdd3.map(lambda x: x + 1)
2. Lineage Graph Constructed:
o Spark builds a logical execution plan for the transformations.
3. Action Triggers Execution:
o result = rdd4.collect()
o Spark optimizes the plan and executes the tasks across the cluster to produce the result.
Apache Spark provides a wide range of environment parameters that allow you to configure and optimize the
performance of your Spark applications. These parameters can be set in the SparkConf object, in the spark-submit
command, or in the configuration files (spark-defaults.conf, log4j.properties).
conf.set("spark.app.name", "MySparkApp")
spark.master: Specifies the master URL for the cluster. Common values include local, yarn, mesos, or a Spark
standalone cluster.
conf.set("spark.master", "local[*]")
conf.set("spark.executor.memory", "2g")
conf.set("spark.executor.cores", "2")
conf.set("spark.cores.max", "4")
3. Driver Configuration
conf.set("spark.driver.memory", "1g")
4. Resource Allocation
conf.set("spark.dynamicAllocation.enabled", "true")
conf.set("spark.dynamicAllocation.minExecutors", "1")
conf.set("spark.dynamicAllocation.maxExecutors", "10")
5. Shuffle Configuration
spark.sql.shuffle.partitions: Number of partitions to use for shuffling data for joins or aggregations.
conf.set("spark.sql.shuffle.partitions", "200")
conf.set("spark.eventLog.enabled", "true")
7. Network Configuration
spark.driver.port: Port for the driver to listen on.
conf.set("spark.driver.port", "7077")
Job execution in Apache Spark involves several steps, from defining an application and its transformations/actions to
executing those tasks across a distributed cluster.
1. SparkContext Initialization
The driver program initializes a SparkContext (or SparkSession), which establishes a connection with the Spark cluster.
2. Defining RDDs and Transformations
The driver program creates RDDs and defines transformations. These operations are lazy and only build up a lineage
(DAG) of transformations.
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 5)
3. Triggering Actions
result = rdd3.collect()
4. DAG Scheduler
Stage Creation: The DAG scheduler divides the job into stages based on wide dependencies (e.g., shuffles).
Task Set Creation: Each stage is composed of tasks, each of which operates on a partition of the RDD.
5. Task Scheduler
Task Assignment: The task scheduler assigns tasks to executor nodes. Tasks are sent to workers based on data
locality and resource availability.
Execution: Tasks are executed on the worker nodes. Each task processes data and applies the transformations
defined in the RDD lineage.
Data Processing: Each worker reads its partition of data, applies the transformations, and produces intermediate
results.
Shuffling (if necessary): For operations that require shuffling, data is transferred between nodes.
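As a rough illustration of a stage boundary (assuming an initialized SparkContext sc; the word list is made up),
reduceByKey introduces a shuffle, so the DAG scheduler splits the job into a map-side stage and a reduce-side stage:
# reduceByKey is a wide dependency: it forces a shuffle between two stages
words = sc.parallelize(["spark", "hadoop", "spark", "hive", "spark"])
pairs = words.map(lambda w: (w, 1))              # narrow dependency, same stage
counts = pairs.reduceByKey(lambda a, b: a + b)   # shuffle boundary, new stage
print(counts.collect())  # the action runs both stages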
6. Collecting Results
Once all tasks are completed, the results are sent back to the driver program (for actions such as collect) or written to
external storage (for actions such as saveAsTextFile).
7. Fault Tolerance
Lineage Information: If a task fails, Spark uses the lineage information to recompute the lost data by reapplying
the transformations to the original data.
Speculative Execution: Spark can run speculative duplicate tasks for slow-running tasks to avoid stragglers.
Example:
# Initialize SparkContext
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("Job Execution Example").setMaster("local[*]")
sc = SparkContext(conf=conf)
# RDD and transformations from the steps above (lazy)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 5)
# Trigger an action
result = rdd3.collect()
sc.stop()
Parallel Programming
Parallel programming in Apache Spark leverages its distributed computing capabilities to process large datasets
efficiently across a cluster of machines. Spark provides a high-level API for parallel programming through Resilient
Distributed Datasets (RDDs), DataFrames, and Datasets, allowing developers to write parallel operations without dealing
with the complexities of low-level thread management.
Key Concepts
Distributed Data: RDDs are collections of data partitioned across the nodes of the cluster, enabling parallel
processing.
Transformations: Operations like map, filter, flatMap, etc., are applied in parallel on each partition.
Actions: Operations like collect, count, saveAsTextFile, etc., trigger the execution of transformations in parallel.
DataFrames and Datasets: Higher-level abstractions built on top of RDDs, providing more optimization opportunities.
SQL Queries: Can execute SQL queries in parallel using Spark SQL.
Parallel Execution:
Task Scheduling: Spark’s task scheduler distributes tasks across the available executors in the cluster.
Data Locality: Spark tries to execute tasks where data resides to minimize data transfer.
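A minimal sketch of controlling the degree of parallelism (assuming an initialized SparkContext sc): the number of
partitions determines how many tasks can run at the same time.
# Request an explicit number of partitions when creating the RDD
rdd = sc.parallelize(range(100), numSlices=4)
print(rdd.getNumPartitions())   # 4, so up to 4 tasks run in parallel
rdd8 = rdd.repartition(8)       # increase parallelism (causes a shuffle)
print(rdd8.getNumPartitions())  # 8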
Example of Parallel Programming with RDDs
# Initialize SparkContext
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("Parallel RDD Example").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])  # partitions are processed in parallel
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 10)  # illustrative predicate
result = rdd3.collect()
sc.stop()
Example of Parallel Programming with DataFrames
# Initialize SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Parallel DataFrame Example").master("local[*]").getOrCreate()
data = [(1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,), (10,)]
df = spark.createDataFrame(data, ["number"])
# Define a transformation: add a column with each number multiplied by two
df2 = df.withColumn("number_times_two", df["number"] * 2)
# Define another transformation: Filter out rows where number_times_two is less than 10
df3 = df2.filter(df2["number_times_two"] >= 10)
result = df3.collect()
spark.stop()