Mod4 Bda

4TH Module

Introduction to Apache Spark Framework: Spark Architecture, RDDs, SparkContext, SparkSession, Lazy Evaluation,
Spark Environment Parameters, Job Execution in Spark, Parallel Programming.

Spark is a unified analytics engine for large-scale data processing including built-in modules for SQL, streaming, machine
learning and graph processing.

What is Spark?

Apache Spark is an open-source cluster computing framework designed for fast processing of large datasets, both in
batches and as real-time data streams.

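Spark was developed to overcome the limitations of Hadoop MapReduce, and it can run on Hadoop clusters (for example on
YARN, reading data from HDFS). It is optimized to keep data in memory, whereas Hadoop MapReduce writes intermediate
data to and from disk between steps. As a result, Spark processes data much faster than MapReduce, especially for
iterative and interactive workloads.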

Features of Apache Spark

o Fast - It provides high performance for both batch and streaming data, using a state-of-the-art DAG scheduler, a
query optimizer, and a physical execution engine.
o Easy to Use - Applications can be written in Java, Scala, Python, R, and SQL, and Spark provides more than 80
high-level operators.
o Generality - It provides a collection of libraries including SQL and DataFrames, MLlib for machine learning,
GraphX, and Spark Streaming.
o Lightweight - It is a lightweight, unified analytics engine for large-scale data processing.
o Runs Everywhere - It can easily run on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud.

Usage of Spark

o Data integration: Data generated by different systems is often not consistent enough to be combined for analysis.
To obtain consistent data, processes such as Extract, Transform, and Load (ETL) are used. Spark reduces the cost
and time required for this ETL process.
o Stream processing: Real-time data such as log files is difficult to handle. Spark can process streams of data as
they arrive, for example to detect and reject potentially fraudulent transactions.
o Machine learning: Machine learning approaches become more feasible and more accurate as the volume of data
grows. Because Spark can store data in memory and run repeated queries quickly, it is well suited to iterative
machine learning algorithms.
o Interactive analytics: Spark can respond to queries rapidly, so instead of running only pre-defined queries,
we can explore the data interactively.

Spark Architecture

o Spark follows a master-slave architecture. Its cluster consists of a single master and multiple slave (worker) nodes.
o The Spark architecture is based on two abstractions:

Resilient Distributed Dataset (RDD)

Directed Acyclic Graph (DAG)

Resilient Distributed Datasets (RDD)

Resilient Distributed Datasets are collections of data items that can be stored in memory on worker nodes. Here,

o Resilient: Lost data can be restored after a failure.
o Distributed: Data is distributed among different nodes.
o Dataset: A collection of data.

Directed Acyclic Graph (DAG)

o A Directed Acyclic Graph is a finite directed graph that represents a sequence of computations on data. Each node is
an RDD partition, and each edge is a transformation applied to the data. The graph is directed because every edge
points from an input to the result computed from it. It is acyclic because no chain of transformations loops back to an
earlier node.

Let's understand the Spark architecture.

Driver Program

The Driver Program is a process that runs the main() function of the application and creates the SparkContext object.
Spark applications run as independent sets of processes on a cluster, and the SparkContext coordinates them.

To run on a cluster, the SparkContext can connect to several types of cluster managers. It then performs the following
tasks:

o It acquires executors on nodes in the cluster.
o Then, it sends your application code to the executors. Here, the application code can be defined by JAR or Python
files passed to the SparkContext.
o Finally, the SparkContext sends tasks to the executors to run.

Cluster Manager

o The role of the cluster manager is to allocate resources across applications. Spark can run on several different
kinds of cluster managers.
o Supported cluster managers include Hadoop YARN, Apache Mesos, Kubernetes, and the Standalone Scheduler.
o Here, the Standalone Scheduler is Spark's own standalone cluster manager, which makes it possible to install Spark
on an empty set of machines.

Worker Node

o A worker node is a slave node.
o Its role is to run the application code in the cluster.

Executor

o An executor is a process launched for an application on a worker node.
o It runs tasks and keeps data in memory or disk storage across them.
o It reads data from and writes data to external sources.
o Every application has its own executors.

Task

o A unit of work that will be sent to one executor.

Spark Components
The Spark project consists of different types of tightly integrated components. At its core, Spark is a
computational engine that can schedule, distribute and monitor multiple applications.

Let's understand each Spark component in detail.

Spark Core

o The Spark Core is the heart of Spark and performs the core functionality.
o It holds the components for task scheduling, fault recovery, interacting with storage systems and memory
management.

Spark SQL

o Spark SQL is built on top of Spark Core. It provides support for structured data.
o It allows querying data via SQL (Structured Query Language) as well as the Apache Hive variant of SQL, called
HQL (Hive Query Language).
o It supports JDBC and ODBC connectivity, which connects Spark SQL with existing databases, data warehouses
and business intelligence tools.
o It also supports various data sources, such as Hive tables, Parquet, and JSON.

Spark Streaming

o Spark Streaming is a Spark component that supports scalable and fault-tolerant processing of streaming data.
o It uses Spark Core's fast scheduling capability to perform streaming analytics.
o It accepts data in mini-batches and performs RDD transformations on that data.
o Its design ensures that the applications written for streaming data can be reused to analyze batches of historical
data with little modification.
o The log files generated by web servers can be considered as a real-time example of a data stream.

MLlib
o The MLlib is a Machine Learning library that contains various machine learning algorithms.
o These include correlations and hypothesis testing, classification and regression, clustering, and principal
component analysis.
o According to benchmarks reported by the MLlib developers, it can be up to nine times faster than the disk-based
implementation used by Apache Mahout.

GraphX

o The GraphX is a library that is used to manipulate graphs and perform graph-parallel computations.
o It makes it possible to create a directed graph with arbitrary properties attached to each vertex and edge.
o To manipulate graphs, it supports various fundamental operators like subgraph, joinVertices, and
aggregateMessages.

RDD
In Apache Spark, an RDD (Resilient Distributed Dataset) is the fundamental data structure and a core concept. It
represents an immutable, distributed collection of objects that can be processed in parallel. Here are some key
characteristics and features of RDDs:
Immutable: Once created, an RDD cannot be modified. Any transformations on an RDD produce a new RDD.
Distributed: RDDs are divided into partitions, which can be processed on different nodes in a cluster.
Fault-Tolerant: RDDs can automatically recover from node failures. This is achieved through lineage information, which
records the sequence of transformations that were applied to create the RDD. If a partition of an RDD is lost, it can be
recomputed using this lineage.
Lazy Evaluation: Transformations on RDDs are not executed immediately. Instead, they are lazily evaluated, meaning
they are only computed when an action (like collect or saveAsTextFile) requires a result to be returned to the driver
program or written to storage.
Transformations and Actions: Transformations: Operations that create a new RDD from an existing one (e.g., map, filter,
reduceByKey). Actions: Operations that trigger the computation and return a result to the driver program or save the data
to an external storage (e.g., count, collect, saveAsTextFile).
Persistence: RDDs can be cached in memory across operations to improve performance, especially for iterative
algorithms that reuse the same dataset multiple times.
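
A small plain-Python toy model makes the properties above concrete. The class ToyRDD below is hypothetical and is not part of PySpark. It assumes each partition is simply a Python list and that every transformation is applied partition by partition. It shows three things: transformations return new objects, each object records its lineage, and no work happens until an action such as collect() runs.

```python
# Toy model of an RDD in plain Python (not the PySpark API).
# It mimics three properties listed above: data split into partitions,
# immutability (every transformation returns a new object), and lazy
# evaluation driven by a recorded lineage.

class ToyRDD:
    def __init__(self, partitions=None, parent=None, func=None, lineage=()):
        self._partitions = partitions  # list of lists; only set on the root dataset
        self._parent = parent          # the ToyRDD this one was derived from
        self._func = func              # per-partition function applied to the parent's data
        self.lineage = lineage         # names of the transformations applied so far

    def _derive(self, name, func):
        # Transformations never modify self; they return a new ToyRDD.
        return ToyRDD(parent=self, func=func, lineage=self.lineage + (name,))

    def map(self, f):
        return self._derive("map", lambda part: [f(x) for x in part])

    def filter(self, pred):
        return self._derive("filter", lambda part: [x for x in part if pred(x)])

    def num_partitions(self):
        root = self
        while root._parent is not None:
            root = root._parent
        return len(root._partitions)

    def _compute(self, i):
        # Rebuild partition i by replaying the lineage from the root data.
        if self._parent is None:
            return list(self._partitions[i])
        return self._func(self._parent._compute(i))

    def collect(self):
        # Action: this is the first point at which any function actually runs.
        return [x for i in range(self.num_partitions()) for x in self._compute(i)]


base = ToyRDD([[1, 2], [3, 4, 5]])          # two partitions
result = base.map(lambda x: x * 2).filter(lambda x: x > 5)
print(result.lineage)    # ('map', 'filter')
print(result.collect())  # [6, 8, 10]
print(base.collect())    # [1, 2, 3, 4, 5]  (the original is unchanged)
```

Real RDDs differ in many ways: their partitions live on executors, and narrow transformations are pipelined. The toy only models the logical behaviour.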

In Apache Spark, RDDs primarily reside on the worker nodes.


Worker Nodes:
Data Storage: The partitions of an RDD are distributed across the worker nodes in a Spark cluster. Each worker node
stores one or more partitions of the RDD.
Computation: The transformations and actions on the RDD are executed on the worker nodes where the data resides. This
distributed nature allows Spark to perform parallel processing.
Driver Node:
Control Flow: The driver node is responsible for the orchestration of the Spark application. It runs the main program,
creates RDDs, and defines transformations and actions.
Job Coordination: The driver node sends tasks to the worker nodes and collects results from them.
Result Collection: While the driver node does not store the actual data of RDDs, it can collect results from actions that
require the data to be brought back to the driver, such as collect().
Explanation of how RDDs work:

1. Creation of RDDs

RDDs can be created in several ways:


• From an existing collection: Distributing a local collection (like a list or array) across the cluster using the
parallelize method.

rdd = sc.parallelize([1, 2, 3, 4, 5])

• From external storage: Loading data from external storage systems such as HDFS, S3, or local file systems
using methods like textFile.

rdd = sc.textFile("hdfs://path/to/file")
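
Distributing a local collection means splitting it into partitions. The plain-Python sketch below illustrates the idea by cutting a list into a fixed number of contiguous, near-equal slices, which is the role of the optional numSlices argument of sc.parallelize. The helper split_into_slices is hypothetical. The exact way PySpark assigns elements to partitions may differ from this even split.

```python
# Plain-Python sketch of splitting a local collection into num_slices
# contiguous slices, the idea behind sc.parallelize(data, numSlices).
# ASSUMPTION: near-equal contiguous slices; PySpark's real element-to-
# partition assignment may differ.

def split_into_slices(data, num_slices):
    if num_slices < 1:
        raise ValueError("num_slices must be >= 1")
    n = len(data)
    # Slice i covers the index range [i*n//num_slices, (i+1)*n//num_slices)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


print(split_into_slices([1, 2, 3, 4, 5], 2))      # [[1, 2], [3, 4, 5]]
print(split_into_slices(list(range(1, 11)), 4))   # [[1, 2], [3, 4, 5], [6, 7], [8, 9, 10]]
```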

2. RDD Transformations and Actions

Transformations

Transformations create a new RDD from an existing one. They are lazy, meaning they are not executed immediately but
are instead recorded in the form of a lineage (DAG - Directed Acyclic Graph) of transformations to be executed when an
action is called. Examples include:

• map: Applies a function to each element in the RDD.

rdd2 = rdd.map(lambda x: x * 2)

• filter: Keeps only the elements of the RDD that satisfy a predicate.

rdd2 = rdd.filter(lambda x: x % 2 == 0)

• reduceByKey: When working with key-value pairs, merges the values for each key using a function.

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
rdd2 = pairs.reduceByKey(lambda a, b: a + b)  # contains ("a", 4) and ("b", 2)
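
reduceByKey is easier to reason about when its two phases are written out. The plain-Python sketch below first combines values per key inside each partition. It then groups those partial results by key, as a shuffle would, and merges them. The function reduce_by_key and the partition layout are illustrative assumptions, not Spark code.

```python
# Plain-Python sketch of the two phases behind reduceByKey (not Spark code):
# (1) combine values per key inside each partition (map-side combine),
# (2) group the partial results by key, as a shuffle would, and merge them.
from collections import defaultdict
from functools import reduce


def reduce_by_key(partitions, func):
    # Phase 1: local combine within each partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Phase 2: "shuffle" the partial results by key, then merge them
    grouped = defaultdict(list)
    for local in partials:
        for key, value in local.items():
            grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


parts = [[("a", 1), ("b", 2), ("a", 3)], [("b", 4), ("a", 5)]]
print(reduce_by_key(parts, lambda a, b: a + b))  # {'a': 9, 'b': 6}
```

Partial results can be merged in any order and grouping, so the function passed to reduceByKey should be associative and commutative, as addition and max are.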

Actions

Actions trigger the execution of transformations and return a result or write data to an external storage system. Examples
include:

• collect: Returns all elements of the RDD to the driver program.

result = rdd.collect()

• count: Returns the number of elements in the RDD.

count = rdd.count()

• saveAsTextFile: Writes the elements of the RDD as text files in the given output directory (one file per partition).

rdd.saveAsTextFile("hdfs://path/to/output")

3. Execution Model

When an action is called, Spark constructs a logical execution plan (DAG) from the lineage of transformations. This plan
is then optimized and converted into a physical execution plan consisting of tasks. Here’s how the execution process
works:

1. Job Creation: Each action creates a job consisting of one or more stages.
2. Stage Division: Each job is divided into stages at wide dependencies (such as shuffle operations).
3. Task Execution: Each stage is further divided into tasks, one per partition. Tasks are distributed to worker
nodes for parallel execution.
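
Stage division in step 2 can be sketched as a simple scan over a chain of operations. This is a simplified plain-Python model, not the DAG scheduler itself. It assumes a linear chain of operations and a hard-coded set WIDE of operations that normally require a shuffle.

```python
# Simplified model of stage division (not Spark's DAG scheduler):
# consecutive narrow operations share a stage, and each wide (shuffle)
# operation starts a new one. ASSUMPTION: a linear chain of operations.
WIDE = {"reduceByKey", "groupByKey", "distinct", "repartition", "sortByKey"}


def split_into_stages(ops):
    stages = [[]]
    for op in ops:
        if op in WIDE and stages[-1]:
            stages.append([])  # shuffle boundary: a new stage begins here
        stages[-1].append(op)
    return stages


plan = ["textFile", "flatMap", "map", "reduceByKey", "map", "filter"]
print(split_into_stages(plan))
# [['textFile', 'flatMap', 'map'], ['reduceByKey', 'map', 'filter']]
```

Everything between two shuffles is pipelined into one stage. This is why a word-count style job with a single reduceByKey typically appears as two stages.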
4. Fault Tolerance

RDDs provide fault tolerance through lineage information. If a partition of an RDD is lost (due to a node failure), Spark
can recompute that partition using the lineage. The lineage tracks the series of transformations that were applied to the
initial data.
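
The following is a minimal plain-Python sketch of lineage-based recovery. It assumes the input is already split into partitions (source) and the lineage is a list of per-partition functions; both are illustrative, not Spark's internals. Recovering a lost partition does not require a replica. Replaying the recorded functions on the matching input partition rebuilds it.

```python
# Toy model of lineage-based recovery (plain Python, not Spark's API).
source = [[1, 2], [3, 4], [5, 6]]              # input data, already partitioned
lineage = [
    lambda p: [x * 2 for x in p],              # map(lambda x: x * 2)
    lambda p: [x for x in p if x > 5],         # filter(lambda x: x > 5)
]


def compute_partition(i):
    # Replay every recorded transformation on input partition i.
    part = source[i]
    for step in lineage:
        part = step(part)
    return part


computed = [compute_partition(i) for i in range(len(source))]
computed[1] = None                  # simulate losing partition 1 (node failure)
computed[1] = compute_partition(1)  # rebuild it from the lineage
print(computed)  # [[], [6, 8], [10, 12]]
```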

5. Persistence

RDDs can be explicitly cached in memory using the cache() or persist() methods. This is useful for iterative algorithms
where the same RDD is reused multiple times. Caching reduces the need to recompute the RDD from scratch and
improves performance.
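
The benefit of caching can be measured by counting how often the lineage is replayed. CountingPipeline below is a hypothetical plain-Python class, not Spark's implementation. Like Spark's cache(), marking it for caching does nothing on its own; the result is stored the first time an action computes it.

```python
# Toy illustration (plain Python) of why caching helps: without a cache,
# every action replays the computation; with one, it runs only once.


class CountingPipeline:
    def __init__(self, data, func):
        self.data = data
        self.func = func
        self.computations = 0   # how many times the computation was replayed
        self.use_cache = False
        self._cached = None

    def cache(self):
        # Only marks the pipeline; nothing is computed or stored yet.
        self.use_cache = True
        return self

    def _materialize(self):
        if self.use_cache and self._cached is not None:
            return self._cached
        self.computations += 1
        result = [self.func(x) for x in self.data]
        if self.use_cache:
            self._cached = result
        return result

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


plain = CountingPipeline([1, 2, 3], lambda x: x * 10)
plain.count()
plain.collect()
print(plain.computations)   # 2

cached = CountingPipeline([1, 2, 3], lambda x: x * 10).cache()
cached.count()
cached.collect()
print(cached.computations)  # 1
```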

rdd.persist()

Example of RDD Usage

Here's a simple example demonstrating RDD creation, transformation, and action:

from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext("local", "RDD Example")

# Create an RDD from a list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Perform a transformation (map)
rdd2 = rdd.map(lambda x: x * 2)

# Perform another transformation (filter)
rdd3 = rdd2.filter(lambda x: x > 5)

# Perform an action (collect)
result = rdd3.collect()

print(result)  # Output: [6, 8, 10]

# Stop the SparkContext
sc.stop()

In this example, the parallelize method creates an RDD from a list. The map transformation multiplies each element by 2,
and the filter transformation filters out elements that are less than or equal to 5. Finally, the collect action retrieves the
resulting elements to the driver program.

SparkContext and SparkSession

In Apache Spark, SparkContext and SparkSession are two key entry points for working with the Spark API. However,
they serve slightly different purposes and have different scopes of usage, especially with the introduction of SparkSession
in Spark 2.0.

SparkContext

SparkContext is the original entry point for Spark functionality. It allows you to configure and initialize a Spark
application and provides methods to create RDDs, access Spark services, and manage job execution.

Key Points:

• Initialization: SparkContext is typically initialized directly and is a prerequisite for RDD operations.
• Creating RDDs: Provides methods like parallelize and textFile to create RDDs.
• Configuration: You configure Spark through a SparkConf object, which is then used to initialize SparkContext.

Example

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("RDD Example").setMaster("local")

sc = SparkContext(conf=conf)

# Create an RDD from a list

data = [1, 2, 3, 4, 5]

rdd = sc.parallelize(data)

print(rdd.collect()) # Output: [1, 2, 3, 4, 5]

sc.stop()

SparkSession

SparkSession is a unified entry point for Spark applications introduced in Spark 2.0. It encapsulates all the configurations
and functionalities of SparkContext, SQLContext, and HiveContext, providing a more convenient and consistent API.

Key Points:

• Unified Interface: Combines SparkContext, SQLContext, and HiveContext into a single object.
• Configuration: Configuration is done via the builder pattern, which simplifies setup.
• DataFrame and SQL: Facilitates working with DataFrames and running SQL queries in addition to RDD
operations.
• Backward Compatibility: Internally manages the SparkContext and provides backward compatibility with older
APIs.

Example

from pyspark.sql import SparkSession

# Initialize SparkSession

spark = SparkSession.builder \
    .appName("DataFrame Example") \
    .master("local") \
    .getOrCreate()

# Access the SparkContext via SparkSession

sc = spark.sparkContext

# Create an RDD from a list using SparkContext

data = [1, 2, 3, 4, 5]

rdd = sc.parallelize(data)

# Convert RDD to DataFrame (toDF needs row-like elements, so wrap each number in a 1-tuple)

df = rdd.map(lambda x: (x,)).toDF(["numbers"])

df.show() # Output: DataFrame showing numbers column


spark.stop()

Key Differences

• Scope and Functionality: SparkSession provides a higher-level API and unifies different contexts
(SparkContext, SQLContext, HiveContext). It is designed to be the single entry point for all Spark operations.
• Ease of Use: SparkSession offers a simpler and more consistent interface for initializing Spark applications and
accessing different functionalities.
• Modern Applications: For Spark applications written in Spark 2.0 and later, SparkSession is the preferred entry
point, while the underlying SparkContext is still accessible through its sparkContext attribute (spark.sparkContext).

Lazy Evaluation

Lazy evaluation is a key concept in Apache Spark's RDD (Resilient Distributed Dataset) API. It helps optimize the
execution of data processing tasks, ensuring efficiency and fault tolerance.

Lazy evaluation means that Spark does not immediately compute the results of transformations on RDDs. Instead, it
builds up a logical plan of transformations that will only be executed when an action is called. This allows Spark to
optimize the entire chain of transformations and avoid unnecessary computations.

How Lazy Evaluation Works

1. Transformation vs. Action:


o Transformations: Operations that define a new RDD from an existing one (e.g., map, filter, flatMap).
Transformations are lazy and do not trigger computation.
o Actions: Operations that trigger the execution of the transformations and return a result to the driver
program or write data to an external storage system (e.g., collect, count, saveAsTextFile).
2. Building the Lineage/DAG:
o When you define transformations on an RDD, Spark constructs a lineage graph (Directed Acyclic Graph
or DAG) that tracks the series of transformations needed to compute the data.
o This lineage information is used for fault tolerance, as it allows Spark to recompute lost partitions if
needed.
3. Triggering Computation:
o The actual computation is triggered when an action is called. At this point, Spark optimizes the execution
plan and performs the necessary transformations to produce the final result.

Benefits of Lazy Evaluation

1. Optimization:
o By delaying the computation, Spark can optimize the execution plan. It can combine multiple
transformations, eliminate unnecessary steps, and apply optimizations like pipelining operations.
2. Fault Tolerance:
o The lineage graph allows Spark to efficiently recompute lost data by reapplying the transformations to the
original data.
3. Efficiency:
o Spark avoids unnecessary computations by only executing transformations when needed and minimizing
data movement across the cluster.
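
Python generators are also lazy, which makes them a convenient analogy for the benefits above. In this plain-Python sketch (an analogy, not Spark code), two generator stages stand in for map and filter, and list() plays the role of an action. The trace shows two things. Nothing runs until the action is called, and each element passes through both steps before the next element starts. This element-by-element flow is what pipelining means.

```python
# Plain-Python analogy for lazy evaluation and pipelining (not Spark code).

trace = []


def double(x):
    trace.append(f"map {x}")
    return x * 2


def keep_big(x):
    trace.append(f"filter {x}")
    return x > 5


data = [1, 2, 3]
# "Transformations": building the generators runs nothing yet.
pipeline = (y for y in (double(x) for x in data) if keep_big(y))
print(trace)    # []

# "Action": pulling the results drives the whole chain.
result = list(pipeline)
print(result)   # [6]
print(trace)    # ['map 1', 'filter 2', 'map 2', 'filter 4', 'map 3', 'filter 6']
```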

Example

from pyspark import SparkContext

# Initialize SparkContext

sc = SparkContext("local", "Lazy Evaluation Example")

# Create an RDD from a list

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Define transformations (lazy, no computation yet)

rdd2 = rdd.map(lambda x: x * 2)

rdd3 = rdd2.filter(lambda x: x > 5)

# Define another transformation (still lazy)

rdd4 = rdd3.map(lambda x: x + 1)

# No computation happens until an action is called

print("Transformations defined but not executed.")

# Trigger the computation with an action

result = rdd4.collect()

print("Result:", result) # Output: [7, 9, 11]

# Stop the SparkContext

sc.stop()

In this example:

• The transformations map, filter, and a second map are defined but not executed immediately.
• When the collect action is called, Spark pipelines the three transformations into a single stage and executes them to
produce the final result.

Execution Flow with Lazy Evaluation

1. Transformations Defined:
o rdd2 = rdd.map(lambda x: x * 2)
o rdd3 = rdd2.filter(lambda x: x > 5)
o rdd4 = rdd3.map(lambda x: x + 1)
2. Lineage Graph Constructed:
o Spark builds a logical execution plan for the transformations.
3. Action Triggers Execution:
o result = rdd4.collect()
o Spark optimizes the plan and executes the tasks across the cluster to produce the result.

Spark Environment Parameters

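Apache Spark provides a wide range of environment parameters for configuring and tuning the performance of
your Spark applications. They can be set on a SparkConf object (the snippets below assume conf = SparkConf()), passed
to the spark-submit command (for example with --conf key=value), or placed in configuration files such as
spark-defaults.conf. Logging is configured separately in log4j.properties (log4j2.properties in Spark 3.3 and later).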

Key Environment Parameters

1. Spark Application Configuration

spark.app.name: Sets the name of your application.

conf.set("spark.app.name", "MySparkApp")
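spark.master: Specifies the master URL for the cluster. Common values include local or local[*] (run locally, using all
cores with [*]), yarn, spark://host:port for a Spark standalone cluster, mesos://host:port, and k8s://host:port.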

conf.set("spark.master", "local[*]")

2. Executor and Core Configuration

spark.executor.memory: Amount of memory to allocate per executor.

conf.set("spark.executor.memory", "2g")

spark.executor.cores: Number of cores to use on each executor.

conf.set("spark.executor.cores", "2")

spark.cores.max: Maximum total number of cores the application may use across the cluster (applies in standalone and
Mesos coarse-grained mode).

conf.set("spark.cores.max", "4")
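
Together, the executor settings give a rough upper bound on how many tasks can run at the same time. The helper below is a back-of-the-envelope sketch that rests on three assumptions. Every executor gets exactly spark.executor.cores cores. Each task needs spark.task.cpus cores (Spark's default is 1). When spark.cores.max is set, only as many whole executors are launched as fit under that cap. The function and its parameter names are hypothetical.

```python
# Back-of-the-envelope estimate of concurrent task slots (hypothetical helper).
# ASSUMPTIONS: each executor has exactly executor_cores cores, each task needs
# task_cpus cores, and cores_max (if set) limits how many whole executors start.


def concurrent_task_slots(num_executors, executor_cores, task_cpus=1, cores_max=None):
    if cores_max is not None:
        # Only whole executors fit under the cap.
        num_executors = min(num_executors, cores_max // executor_cores)
    # A task cannot span two executors.
    slots_per_executor = executor_cores // task_cpus
    return num_executors * slots_per_executor


print(concurrent_task_slots(3, 2, cores_max=4))  # 4
print(concurrent_task_slots(3, 2))               # 6
```

With the values used above (spark.executor.cores = 2, spark.cores.max = 4), at most 4 tasks run at once. A stage with 200 partitions would therefore be processed in about 50 waves of tasks.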

3. Driver Configuration

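• spark.driver.memory: Amount of memory to allocate for the driver. In client mode this must be set before the driver
starts (for example with spark-submit --driver-memory or in spark-defaults.conf), not through SparkConf inside the
application.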

conf.set("spark.driver.memory", "1g")

4. Resource Allocation

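• spark.dynamicAllocation.enabled: Enables dynamic allocation, which adds and removes executors based on the
workload. It also requires an external shuffle service (spark.shuffle.service.enabled) or, from Spark 3.0, shuffle
tracking (spark.dynamicAllocation.shuffleTracking.enabled).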

conf.set("spark.dynamicAllocation.enabled", "true")

spark.dynamicAllocation.minExecutors: Minimum number of executors.

conf.set("spark.dynamicAllocation.minExecutors", "1")

spark.dynamicAllocation.maxExecutors: Maximum number of executors.

conf.set("spark.dynamicAllocation.maxExecutors", "10")

5. Shuffle and Storage Configuration

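• spark.sql.shuffle.partitions: Number of partitions to use when shuffling data for joins or aggregations in DataFrame
and SQL operations (the default is 200). RDD operations use spark.default.parallelism instead.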

conf.set("spark.sql.shuffle.partitions", "200")
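
A shuffle sends every record to one of the shuffle partitions, commonly by hashing its key. The plain-Python sketch below assumes this hash-partitioning scheme and uses zlib.crc32 as a stand-in hash. Spark uses its own hash functions, so real partition numbers will differ. The key property still holds: all records with the same key end up in the same partition. The helpers partition_for and shuffle are hypothetical.

```python
# Sketch of hash partitioning for a shuffle (plain Python, not Spark code).
# ASSUMPTION: zlib.crc32 stands in for Spark's own hash function.
import zlib


def partition_for(key, num_partitions):
    h = zlib.crc32(str(key).encode("utf-8"))  # non-negative in Python 3
    return h % num_partitions                  # always in [0, num_partitions)


def shuffle(records, num_partitions):
    buckets = [[] for _ in range(num_partitions)]
    for key, value in records:
        buckets[partition_for(key, num_partitions)].append((key, value))
    return buckets


records = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
for i, bucket in enumerate(shuffle(records, 3)):
    print(i, bucket)
```

This is why the partition count matters. Too few partitions produce large tasks that may spill to disk, while too many produce many tiny tasks with scheduling overhead.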

6. Logging and Debugging

• spark.eventLog.enabled: Enables event logging, so that finished applications can be inspected in the Spark History
Server; useful for debugging.

conf.set("spark.eventLog.enabled", "true")

7. Network Configuration
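• spark.driver.port: Port for the driver to listen on. By default a random port is chosen. If you fix one, avoid 7077,
which is the default port of the standalone master.

conf.set("spark.driver.port", "40000")  # example value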

Job Execution in Apache Spark

Job execution in Apache Spark involves several steps, from defining an application and its transformations/actions to
executing those tasks across a distributed cluster.

1. SparkContext Initialization

The driver program initializes a SparkContext (or SparkSession), which establishes a connection with the Spark cluster.

from pyspark import SparkContext

sc = SparkContext("local", "Example App")

2. Creating RDDs and Defining Transformations

The driver program creates RDDs and defines transformations. These operations are lazy and only build a lineage (DAG)
of transformations.

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 5)

3. Triggering Actions

When an action (e.g., collect, count) is called, Spark creates a job.

result = rdd3.collect()

4. DAG Scheduler and Task Scheduler

DAG Scheduler

• Stage Creation: The DAG scheduler divides the job into stages based on wide dependencies (e.g., shuffles).
• Task Set Creation: Each stage is composed of tasks, each of which operates on one partition of the RDD.

Task Scheduler

• Task Assignment: The task scheduler assigns tasks to executors. Tasks are sent to workers based on data
locality and resource availability.
• Execution: Tasks are executed on the worker nodes. Each task processes data and applies the transformations
defined in the RDD lineage.
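
A greedy toy assignment illustrates the task scheduler's preference for data locality. In this plain-Python sketch, the host names and slot counts are hypothetical. Each task goes to the host that holds its partition if a slot is free there, and otherwise to any host with a free slot. Spark's real scheduler is more nuanced. For example, it waits briefly for a local slot (controlled by spark.locality.wait) before falling back.

```python
# Toy sketch of locality-aware task assignment (not Spark's scheduler).


def assign_tasks(preferred_hosts, free_slots):
    """preferred_hosts: task id -> host storing that task's partition.
    free_slots: host -> number of free task slots (the input is not modified)."""
    slots = dict(free_slots)
    assignment = {}
    for task, host in preferred_hosts.items():
        if slots.get(host, 0) > 0:
            chosen = host                          # data-local placement
        else:
            chosen = next((h for h, n in slots.items() if n > 0), None)
            if chosen is None:
                break                              # no free slots: remaining tasks wait
        slots[chosen] -= 1
        assignment[task] = chosen
    return assignment


prefs = {0: "worker1", 1: "worker1", 2: "worker2"}
print(assign_tasks(prefs, {"worker1": 1, "worker2": 2}))
# {0: 'worker1', 1: 'worker2', 2: 'worker2'}
```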

5. Execution on Worker Nodes

Worker nodes execute the tasks:

• Data Processing: Each worker reads its partition of data, applies the transformations, and produces intermediate
results.
• Shuffling (if necessary): For operations that require shuffling, data is transferred between nodes.

6. Collecting Results
Once all tasks are completed:

• Aggregation: Results from the tasks are aggregated as needed.
• Driver Program: Results are sent back to the driver program.

Fault Tolerance and Task Re-Execution

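• Lineage Information: If a task fails, Spark re-runs it, and it aborts the job only after the same task has failed
spark.task.maxFailures times (default 4). If partitions are lost because a node or executor fails, Spark uses the lineage
information to recompute them by reapplying the transformations to the original data.
• Speculative Execution: When spark.speculation is enabled, Spark can launch duplicate copies of slow-running tasks
and use whichever copy finishes first, to avoid stragglers.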
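
Task retries can be sketched as a loop that re-runs a failing task and gives up only after a fixed number of failures. This plain-Python model follows the meaning of spark.task.maxFailures (default 4) but is not Spark's implementation. flaky_task is a hypothetical task that fails twice before succeeding.

```python
# Toy sketch of task retry (not Spark's implementation).


def run_with_retries(task, max_failures=4):
    failures = 0
    while True:
        try:
            return task()
        except Exception as exc:
            failures += 1
            if failures >= max_failures:
                raise RuntimeError(f"task failed {failures} times; aborting job") from exc


# Hypothetical flaky task: fails twice (e.g. a lost executor), then succeeds.
attempts = {"n": 0}


def flaky_task():
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise IOError("executor lost")
    return sum(x * 2 for x in [3, 4, 5])


print(run_with_retries(flaky_task))  # 24
print(attempts["n"])                 # 3
```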

Example:

from pyspark import SparkConf, SparkContext

# Initialize SparkContext
conf = SparkConf().setAppName("Job Execution Example").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Define RDDs and transformations
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 5)

# Trigger an action
result = rdd3.collect()

print(result)  # Output: [6, 8, 10]

# Stop the SparkContext
sc.stop()

Parallel Programming

Parallel programming in Apache Spark leverages its distributed computing capabilities to process large datasets
efficiently across a cluster of machines. Spark provides a high-level API for parallel programming through Resilient
Distributed Datasets (RDDs), DataFrames, and Datasets, allowing developers to write parallel operations without dealing
with the complexities of low-level thread management.

Key Concepts

Resilient Distributed Datasets (RDDs):

• Distributed Data: RDDs are collections of data partitioned across the nodes of the cluster, enabling parallel
processing.
• Transformations: Operations like map, filter, flatMap, etc., are applied in parallel on each partition.
• Actions: Operations like collect, count, saveAsTextFile, etc., trigger the execution of transformations in parallel.

DataFrames and Datasets:

• Data Abstractions: Higher-level abstractions built on top of RDDs. Because Spark knows their structure, the Catalyst
query optimizer can optimize them more aggressively. (The typed Dataset API is available in Scala and Java; Python
uses DataFrames.)
• SQL Queries: Can execute SQL queries in parallel using Spark SQL.

Parallel Execution:

• Task Scheduling: Spark’s task scheduler distributes tasks across the available executors in the cluster.
• Data Locality: Spark tries to execute tasks where data resides to minimize data transfer.
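
The same data-parallel pattern can be imitated on a single machine. In this plain-Python analogy (not Spark code), a thread pool stands in for the executors. The list is split into four partitions, and one function is applied to every partition concurrently. The per-partition results are then concatenated in partition order, much as collect() gathers results on the driver. It computes the same result as the RDD example that follows.

```python
# Plain-Python analogy for data-parallel execution (not Spark code):
# a thread pool stands in for executors, and each partition is processed independently.
from concurrent.futures import ThreadPoolExecutor


def process_partition(part):
    # Same logic as the RDD example: double each value, keep values >= 10
    return [x * 2 for x in part if x * 2 >= 10]


data = list(range(1, 11))
num_partitions = 4
partitions = [data[i * len(data) // num_partitions:(i + 1) * len(data) // num_partitions]
              for i in range(num_partitions)]

with ThreadPoolExecutor(max_workers=num_partitions) as pool:
    per_partition = list(pool.map(process_partition, partitions))  # keeps partition order

result = [x for part in per_partition for x in part]  # like collect() on the driver
print(result)  # [10, 12, 14, 16, 18, 20]
```
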
Example of Parallel Programming with RDDs

from pyspark import SparkConf, SparkContext

# Initialize SparkContext

conf = SparkConf().setAppName("Parallel Programming Example").setMaster("local[*]")

sc = SparkContext(conf=conf)

# Create an RDD from a list of numbers

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

rdd = sc.parallelize(data, numSlices=4) # Create 4 partitions

# Define a transformation: Multiply each element by 2

rdd2 = rdd.map(lambda x: x * 2)

# Define another transformation: Filter out elements less than 10

rdd3 = rdd2.filter(lambda x: x >= 10)

# Define an action: Collect the results

result = rdd3.collect()

print("Result:", result) # Output: [10, 12, 14, 16, 18, 20]

# Stop the SparkContext

sc.stop()

Example of Parallel Programming with DataFrames

from pyspark.sql import SparkSession

# Initialize SparkSession

spark = SparkSession.builder.appName("Parallel Programming with DataFrames").getOrCreate()

# Create a DataFrame from a list of tuples

data = [(1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,), (10,)]

df = spark.createDataFrame(data, ["number"])

# Define a transformation: Multiply each number by 2

df2 = df.withColumn("number_times_two", df["number"] * 2)

# Define another transformation: Filter out rows where number_times_two is less than 10

df3 = df2.filter(df2["number_times_two"] >= 10)


# Define an action: Collect the results

result = df3.collect()

print("Result:", result) # Output: Rows where number_times_two is >= 10

# Stop the SparkSession

spark.stop()
