5 - Programming With RDDs and Dataframes

Programming with RDD

Let us see the implementation of a few RDD transformations and actions with the help of an example.
Example
Consider a word count example: it counts each word appearing in a document. Consider the input file input.txt:

People are not as beautiful as they look, as they walk or as they talk.
They are only as beautiful as they love, as they care as they share.

Programming with RDD

Follow the procedure given below to execute the given example.

Open Spark-Shell
The following command is used to open the Spark shell. Spark is built using Scala, so a Spark program runs in a Scala environment.

$ spark-shell

Programming with RDD

Look at the last line of the output: "Spark context available as sc" means that the Spark shell has automatically created a SparkContext object with the name sc. The SparkContext object must exist before the first step of a program can be executed.
Create an RDD
First, we have to read the input file using the Spark-Scala API and create an RDD.
The following command is used for reading a file from a given location. Here, a new RDD is created with the name inputfile. The String given as an argument to the textFile("") method is the absolute path of the input file. However, if only the file name is given, the input file is assumed to be in the current location.

scala> val inputfile = sc.textFile("input.txt")


Programming with RDD

Execute the Word Count Transformation

❑ Our aim is to count the words in the file.
❑ Create a flat map to split each line into words: flatMap(line => line.split(" ")).
❑ Next, read each word as a key with the value 1 (<key, value> = <word, 1>) using the map function: map(word => (word, 1)).

❑ Finally, reduce those keys by adding the values of identical keys: reduceByKey(_ + _).

The following command executes the word count logic. After executing it, you will not see any output, because this is not an action; it is a transformation that defines a new RDD and tells Spark what to do with the given data.

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

Programming with RDD
Current RDD
While working with an RDD, if you want to know about the current RDD, use the following command. It shows a description of the current RDD and its dependencies, which is useful for debugging.

scala> counts.toDebugString

Caching the Transformations


You can mark an RDD to be persisted using the persist() or cache()
methods on it. The first time it is computed in an action, it will be kept in
memory on the nodes. Use the following command to store the
intermediate transformations in memory.

scala> counts.cache()
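
cache() stores the RDD at the default MEMORY_ONLY level. If you want to choose the storage level explicitly, persist() accepts one; a minimal sketch on the same counts RDD (note that an RDD's storage level can only be set once):

scala> import org.apache.spark.storage.StorageLevel
scala> counts.persist(StorageLevel.MEMORY_ONLY)   // equivalent to counts.cache()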
Programming with RDD

Applying the Action

Now apply an action: store the result of all the transformations into a text file. The String argument of the saveAsTextFile("") method is the absolute path of the output folder.
Try the following command to save the output as a text file. In the following example, the 'output' folder is in the current location.

scala> counts.saveAsTextFile("output")

Programming with RDD

Checking the Output

Open another terminal and go to the home directory (where Spark was executed in the other terminal). Use the following commands to check the output directory.

[hadoop@localhost ~]$ cd output/


[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS

The following command is used to see the output from the part-00000 file.

[hadoop@localhost output]$ cat part-00000


Programming with RDD
You will see output like the following:

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)

Programming with RDD

Unpersist the Storage

Before unpersisting, if you want to see the storage space used by this application, open the following URL in your browser.

http://localhost:4040

scala> counts.unpersist()

Verify again after unpersisting:

http://localhost:4040

Programming with RDD
SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Word Count",
      "/usr/local/spark", Nil, Map(), Map())
    /* local = master URL; Word Count = application name */
    /* /usr/local/spark = Spark home; Nil = jars */
    /* Map(), Map() = environment variables for the worker nodes */

    /* Create an input RDD by reading the text file (in.txt) through the Spark context */
    val input = sc.textFile("in.txt")

    /* Transform the input RDD into a count RDD */
    val count = input.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)

    /* saveAsTextFile is an action that triggers computation of the RDD and writes the result */
    count.saveAsTextFile("outfile")
    println("OK")
  }
}

Spark-Submit

$ spark-submit [options] <app jar | python file> [app arguments]

Sr.  Option          Description
1    --master        spark://host:port, mesos://host:port, yarn, or local.
2    --deploy-mode   Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster"). Default: client.
3    --class         Your application's main class (for Java / Scala apps).
4    --name          A name for your application.
5    --jars          Comma-separated list of local jars to include on the driver and executor classpaths.
6    --packages      Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths.
7    --repositories  Comma-separated list of additional remote repositories to search for the Maven coordinates given with --packages.
8    --py-files      Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
9    --files         Comma-separated list of files to be placed in the working directory of each executor.
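
As an illustrative sketch only, assuming the SparkWordCount object shown earlier has been packaged into a jar named sparkwordcount.jar (a hypothetical file name), a submission could look like this:

$ spark-submit --class SparkWordCount --master local[2] sparkwordcount.jar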
Advanced Spark Programming

Spark provides two types of shared variables: broadcast variables and accumulators.
❑ Broadcast variables: used to efficiently distribute large values.
❑ Accumulators: used to aggregate information from a particular collection.
❑ Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code given below shows this:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
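
The wrapped value can then be read back with the value method (the res number echoed by the shell may differ):

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)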

Advanced Spark Programming
Accumulators
An accumulator is created from an initial value v by calling
SparkContext.accumulator(v). Tasks running on the cluster can then add
to it using the add method or the += operator (in Scala and Python).
However, they cannot read its value. Only the driver program can read
the accumulator’s value, using its value method. The code given below
shows an accumulator being used to add up the elements of an array:

scala> val accum = sc.accumulator(0)


scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

If you want to see the output of the above code, use the following command:
scala> accum.value

Output:
res2: Int = 10
Numeric RDD Operations

Spark allows you to perform different operations on numeric data using predefined API methods. Spark's numeric operations are implemented with a streaming algorithm that builds the model one element at a time.
These operations are computed and returned as a StatCounter object by calling the stats() method.
The following is a list of the numeric methods available in StatCounter.

Sr.  Method      Meaning
1    count()     Number of elements in the RDD.
2    mean()      Average of the elements in the RDD.
3    sum()       Total value of the elements in the RDD.
4    max()       Maximum value among all elements in the RDD.
5    min()       Minimum value among all elements in the RDD.
6    variance()  Variance of the elements.
7    stdev()     Standard deviation.

If you want to use only one of these methods, you can call the corresponding method directly on the RDD.
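
As a brief sketch, using a small made-up RDD of doubles, stats() returns the full StatCounter, and the individual methods can also be called directly:

scala> val nums = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
scala> nums.stats()
scala> nums.mean()
scala> nums.max()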

DataFrames

A DataFrame is an immutable distributed collection of data that is organized into named columns, analogous to a table in a relational database.

Introduced as an experimental feature within Apache Spark 1.0 as SchemaRDD, they were renamed to DataFrames as part of the Apache Spark 1.3 release.

For readers who are familiar with the Python pandas DataFrame or the R DataFrame, a Spark DataFrame is a similar concept in that it allows users to easily work with structured data (for example, data tables); there are some differences as well, so please temper your expectations.

DataFrames
Imposing a structure onto a distributed collection of data allows Spark users to query structured data in Spark SQL or using expression methods (instead of lambdas).
In this module, we will include code samples using both methods; a brief sketch of the two styles appears below.

By structuring your data, you allow the Apache Spark engine (specifically, the Catalyst Optimizer) to significantly improve the performance of Spark queries.

In earlier APIs of Spark (that is, RDDs), executing queries in Python could be significantly slower due to the communication overhead between Python and the JVM (via Py4J).
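
A minimal sketch of the two query styles, assuming a Spark 2.x shell where a SparkSession is available as spark, and using a small made-up DataFrame (the column names and values are purely illustrative):

scala> val df = spark.createDataFrame(Seq(("AA", 10), ("DL", 25))).toDF("carrier", "delay")
scala> df.filter(df("delay") > 15).show()        // expression-method (DataFrame API) query
scala> df.createOrReplaceTempView("flights")
scala> spark.sql("SELECT carrier, delay FROM flights WHERE delay > 15").show()   // Spark SQL query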

DataFrames

Objectives
❑ Python to RDD communications
❑ A quick refresh of Spark's Catalyst Optimizer
❑ Speeding up PySpark with DataFrames
❑ Creating DataFrames
❑ Simple DataFrame queries
❑ Interoperating with RDDs
❑ Querying with the DataFrame API
❑ Querying with Spark SQL
❑ Using DataFrames for on-time flight performance analysis

Python to RDD Communications
Whenever a PySpark program is executed using RDDs, there is a potentially large overhead to execute the job.
In the PySpark driver, the SparkContext uses Py4J to launch a JVM via a JavaSparkContext.
Any RDD transformations are initially mapped to PythonRDD objects in Java.
Once these tasks are pushed out to the Spark worker(s), the PythonRDD objects launch Python subprocesses, using pipes to send both code and data to be processed within Python.
While this approach allows PySpark to distribute the processing of the data to multiple Python subprocesses on multiple workers, it involves a lot of context switching and communication overhead between Python and the JVM.
An excellent resource on PySpark performance is Holden Karau's
Improving PySpark Performance: Spark performance beyond the JVM:
http://bit.ly/2bx89bn.
Catalyst Optimizer refresh

As noted in Module 1, one of the primary reasons the Spark SQL engine is so fast is the Catalyst Optimizer.
For readers with a database background, Catalyst's query-planning pipeline looks similar to the logical/physical planner and cost model/cost-based optimization of a relational database management system (RDBMS).

The significance of this is that, as opposed to immediately processing the query, the Spark engine's Catalyst Optimizer compiles and optimizes a logical plan and uses a cost optimizer to determine the most efficient physical plan to generate.
As part of Project Tungsten, performance is further improved by generating byte code (code generation, or codegen) instead of interpreting each row of data.
As previously noted, the optimizer is based on functional programming constructs and was designed with two purposes in mind: to ease the addition of new optimization techniques and features to Spark SQL, and to allow external developers to extend the optimizer (for example, by adding data-source-specific rules, support for new data types, and so on).

Speeding up PySpark with DataFrames
The significance of DataFrames and the Catalyst Optimizer (and Project Tungsten) is the increase in performance of PySpark queries when compared to non-optimized RDD queries. Prior to the introduction of DataFrames, Python query speeds were often twice as slow as the same Scala queries using RDDs. Typically, this slowdown in query performance was due to the communication overhead between Python and the JVM.

With DataFrames, not only was there a significant improvement in Python performance, there is now performance parity between Python, Scala, SQL, and R.

Python can take advantage of the performance optimizations in Spark even though the codebase for the Catalyst Optimizer is written in Scala. Essentially, a Python wrapper of approximately 2,000 lines of code allows PySpark DataFrame queries to be significantly faster.

Altogether, Python DataFrames (as well as SQL, Scala DataFrames, and R DataFrames) are all able to make use of the Catalyst Optimizer.

