5 - Programming With RDDs and DataFrames

Programming with RDD
Open Spark-Shell
The following command opens the Spark shell. Spark is built with Scala, so the
interactive shell runs in a Scala environment.
$ spark-shell
Look at the last line of the output: “Spark context available as sc” means the
Spark shell has automatically created a SparkContext object named sc. A
SparkContext object must exist before the first step of a Spark program can
run; the shell provides it for you.
Create an RDD
First, we read the input file using the Spark Scala API and create an RDD.
The following command reads a file from a given location and creates a new RDD
named inputfile. The String passed as the argument to the textFile(“”) method
is the absolute path of the input file. If only a file name is given, Spark
looks for the file in the current working directory.
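A minimal sketch of such a command (the file name input.txt is illustrative):
scala> val inputfile = sc.textFile("input.txt")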
❑ Next, read each word as a key with the value 1 (<key, value> = <word, 1>)
❑ using the map function: map(word => (word, 1)).
The following command executes the word-count logic. After running it, you will
not see any output, because it is a transformation, not an action: it only
defines a new RDD, telling Spark what to do with the given data once an action
is called.
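A minimal sketch of this transformation, splitting each line into words with
flatMap before the map step described above (the variable names inputfile and
counts match those used elsewhere in this section):
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)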
Current RDD
While working with an RDD, if you want to inspect the current RDD, use the
following command. It prints a description of the current RDD and its
dependencies, which is useful for debugging.
scala> counts.toDebugString
Caching the Transformations
You can mark an RDD to be persisted in memory using the cache() method, so it
is not recomputed each time an action uses it:
scala> counts.cache()
Applying the Action
To store all the transformation results into a text file, use the
saveAsTextFile() action. Its String argument is the path of the output folder:
scala> counts.saveAsTextFile("output")
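Once the action completes, the output folder contains one or more part files.
A typical way to inspect them (the exact part file names depend on the number
of partitions):
$ cat output/part-00000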
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)
Checking the Spark UI
While the Spark shell is running, open http://localhost:4040 in a browser to
see the jobs, stages, and storage used by this application.
UN-Persisting the Storage
To remove the cached RDD from memory, call unpersist():
scala> counts.unpersist()
Afterwards, refresh http://localhost:4040 to confirm that the cached storage
has been released.
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
  def main(args: Array[String]) {
    /* Arguments: local = master URL; Word Count = application name;   */
    /* /usr/local/spark = Spark home; Nil = jars;                      */
    /* Map() = environment; Map() = variables sent to worker nodes     */
    val sc = new SparkContext("local", "Word Count",
      "/usr/local/spark", Nil, Map(), Map())

    /* Create an input RDD by reading the text file (in.txt) through the Spark context */
    val input = sc.textFile("in.txt")

    /* Word-count logic: split each line into words, map each word to (word, 1),
       and sum the counts per word */
    val count = input.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)

    /* Save the result to the "outfile" directory */
    count.saveAsTextFile("outfile")
    System.out.println("OK")
  }
}
Spark-Submit
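Once the application is compiled and packaged into a jar, it can be launched
with spark-submit. A minimal sketch (the jar name sparkwordcount.jar is
illustrative):
$ spark-submit --class SparkWordCount --master local sparkwordcount.jar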
Advanced Spark Programming
Accumulators
An accumulator is created from an initial value v by calling
SparkContext.accumulator(v). Tasks running on the cluster can then add
to it using the add method or the += operator (in Scala and Python).
However, they cannot read its value. Only the driver program can read
the accumulator’s value, using its value method. The code given below
shows an accumulator being used to add up the elements of an array:
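A minimal sketch of that code, following the standard Spark accumulator example
(the array values 1 to 4 are assumed so that the sum matches the output shown
below):
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)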
If you want to see the output of the above code, use the following command:
scala> accum.value
Output:
res2: Int = 10
Numeric RDD Operations
Spark provides built-in methods for descriptive statistics on RDDs of numeric
values, such as count, mean, stdev, max, and min; the stats() method computes
all of them in a single pass and returns a StatCounter object. If you want
only one of these statistics, you can call the corresponding method directly
on the RDD.
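For example, a minimal sketch in the Spark shell (the values are illustrative):
scala> val nums = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
scala> nums.stats()    // count, mean, stdev, max, and min in a single pass
scala> nums.mean()     // or call a single statistic directly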
DataFrames
A DataFrame is an immutable distributed collection of data organized into
named columns, analogous to a table in a relational database. By imposing a
structure onto a distributed collection of data, DataFrames allow Spark users
to query structured data with Spark SQL or with DataFrame expression methods
(instead of lambdas).
In this module, we will include code samples using both methods.
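As a quick preview, a minimal sketch of the two styles in a Spark 2.x shell,
where the SparkSession is available as spark (the file people.json and the
column names are illustrative):
scala> val df = spark.read.json("people.json")
scala> df.filter($"age" > 21).select("name").show()                 // expression methods
scala> df.createOrReplaceTempView("people")
scala> spark.sql("SELECT name FROM people WHERE age > 21").show()   // Spark SQL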
Objectives
❑Python to RDD communications
❑A quick refresh of Spark's Catalyst Optimizer
❑Speeding up PySpark with DataFrames
❑Creating DataFrames
❑Simple DataFrame queries
❑Interoperating with RDDs
❑Querying with the DataFrame API
❑Querying with Spark SQL
❑Using DataFrames for on-time flight performance analysis
Python to RDD Communications
Whenever a PySpark program is executed using RDDs, there is a
potentially large overhead to execute the job.
In the PySpark driver, the Spark Context uses Py4j to launch a JVM
using the JavaSparkContext.
Any RDD transformations are initially mapped to PythonRDD objects in
Java.
Once these tasks are pushed out to the Spark workers, PythonRDD objects launch
Python subprocesses, using pipes to send both code and data to be processed
within Python.
While this approach allows PySpark to distribute the processing of the data to
multiple Python subprocesses on multiple workers, there is a lot of context
switching and communication overhead between Python and the JVM.
An excellent resource on PySpark performance is Holden Karau's
Improving PySpark Performance: Spark performance beyond the JVM:
http://bit.ly/2bx89bn.
Catalyst Optimizer refresh
As noted in Module 1, one of the primary reasons the Spark SQL engine is so
fast is the Catalyst Optimizer.
For readers with a database background, the Catalyst pipeline looks similar to
the logical/physical planner and the cost model/cost-based optimization of a
relational database management system (RDBMS).
The significance of this is that, instead of immediately processing the query,
the Spark engine's Catalyst Optimizer compiles and optimizes a logical plan,
and a cost optimizer then selects the most efficient of the generated physical
plans.
As part of Project Tungsten, performance is further improved by generating
byte code (code generation, or codegen) instead of interpreting each row of
data.
As previously noted, the optimizer is based on functional programming
constructs and was designed with two purposes in mind: to make it easy to add
new optimization techniques and features to Spark SQL, and to allow external
developers to extend the optimizer (for example, by adding data-source-specific
rules, support for new data types, and so on).
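One way to see Catalyst's output is to ask a DataFrame for its query plans.
A minimal sketch in a Spark 2.x shell (the query itself is illustrative):
scala> val df = spark.range(1000).filter("id % 2 = 0")
scala> df.explain(true)   // prints the parsed, analyzed, and optimized logical plans and the physical plan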
Speeding up PySpark with DataFrames
The significance of DataFrames and the Catalyst Optimizer (and Project
Tungsten) is the increase in performance of PySpark queries compared to
non-optimized RDD queries. Prior to the introduction of DataFrames, Python
query speeds were often twice as slow as the same Scala queries using RDDs.
Typically, this slowdown was due to the communications overhead between Python
and the JVM.