Abstracted Nodes: This means that the individual worker nodes cannot be
addressed directly.
Spark API: PySpark provides APIs for utilizing Spark features.
Map-Reduce Model: PySpark is based on Hadoop’s Map-Reduce model, which
means that the programmer provides the map and the reduce functions.
Abstracted Network: Networks are abstracted in PySpark, which means that the
only possible communication is implicit communication.
When we want to run a Spark application, a driver program that has the main
function is started. From this point, the SparkContext that we defined gets
initiated. Later on, the driver program performs operations inside the executors of
the worker nodes. Additionally, a JVM will be launched using Py4J, which in turn
creates a JavaSparkContext. Since PySpark has a default SparkContext available as
“sc”, a new SparkContext will not be created.
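As a quick check of this behaviour, the default context can be inspected directly in the interactive pyspark shell; a minimal sketch (the printed values depend on how the shell was started):

# inside the interactive pyspark shell, a SparkContext is already exposed as `sc`
print(sc.master)    # e.g. local[*]
print(sc.appName)   # e.g. PySparkShell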
# --serializing.py----
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "Marshal Serialization", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 3 * x).take(5))
sc.stop()
$SPARK_HOME/bin/spark-submit serializing.py
The output of the code would be the first 5 elements of the list, each multiplied by 3:
[0, 3, 6, 9, 12]
The above code filters all the elements in the list that contain the string ‘interview’ (a
sketch of such a filter is shown after the output below).
The output of the above code would be:
[
"interview",
"interviewbit"
]
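The filter code itself is not reproduced above; a minimal sketch that would produce this output, assuming a SparkSession named spark is available (the input list and variable names are assumptions):

words = spark.sparkContext.parallelize(["interview", "interviewbit", "python", "spark"])
filtered = words.filter(lambda x: "interview" in x)
print(filtered.collect())   # ['interview', 'interviewbit']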
In this class, we count the number of elements in the Spark RDD using the count()
action.
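The class itself is not reproduced above; a minimal sketch of the count() action it relies on, assuming a SparkSession named spark (the data values are arbitrary):

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(rdd.count())   # 5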
The above figure shows the position of the cluster manager in the Spark ecosystem.
Consider a master node and multiple worker nodes present in the cluster. The master
node provides the worker nodes with resources like memory and processor
allocation, depending on the nodes' requirements, with the help of the cluster
manager.
PySpark supports the following cluster manager types:
Standalone – This is a simple cluster manager that is included with Spark.
Apache Mesos – This manager can run Hadoop MapReduce and PySpark apps.
Hadoop YARN – This manager is used in Hadoop 2.
Kubernetes – This is an open-source cluster manager that helps in automated
deployment, scaling and automatic management of containerized apps.
local – This is simply a mode for running Spark applications on
laptops/desktops.
In-Memory Processing: PySpark’s RDD helps in loading data from the disk to the
memory. The RDDs can even be persisted in the memory for reusing the
computations.
Immutability: The RDDs are immutable which means that once created, they
cannot be modified. While applying any transformation operations on the RDDs,
a new RDD would be created.
Fault Tolerance: The RDDs are fault-tolerant. This means that whenever an
operation fails, the data gets automatically reloaded from other available
partitions. This results in seamless execution of the PySpark applications.
Lazy Evaluation: The PySpark transformation operations are not performed as
soon as they are encountered. The operations are stored in the DAG and are
evaluated only when the first RDD action is encountered (see the sketch after this
list).
Partitioning: Whenever an RDD is created from any data, the elements in the RDD
are partitioned across the available cores by default.
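To illustrate lazy evaluation and default partitioning, here is a minimal sketch, assuming a SparkSession named spark is already available (the numbers are arbitrary):

rdd = spark.sparkContext.parallelize(range(8), 4)   # distributed across 4 partitions
doubled = rdd.map(lambda x: x * 2)                  # transformation only; nothing runs yet

print(rdd.getNumPartitions())   # 4
print(doubled.collect())        # collect() is an action, so the map is evaluated here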
The data in a PySpark DataFrame is distributed across the different machines in the
cluster, and the operations performed on it run in parallel on all the machines.
DataFrames can handle large collections of structured or semi-structured data, in the
range of petabytes.
An important point of using broadcast variables is that the variables are not sent to
the tasks when the broadcast function is called. They will be sent when the variables
are first required by the executors.
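A minimal broadcast sketch, assuming a SparkSession named spark is available (the lookup data is an assumption):

# broadcast a small read-only lookup table to all executors
states = {"KA": "Karnataka", "MH": "Maharashtra"}
broadcast_states = spark.sparkContext.broadcast(states)

codes = spark.sparkContext.parallelize(["KA", "MH", "KA"])
print(codes.map(lambda c: broadcast_states.value[c]).collect())
# ['Karnataka', 'Maharashtra', 'Karnataka']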
Accumulator variables: These variables are called updatable shared variables. They
are added through associative and commutative operations and are used for
performing counter or sum operations. PySpark supports the creation of numeric
type accumulators by default and also has the ability to add custom accumulator
types. Accumulators can further be of two types:
Named Accumulators: These accumulators are visible under the “Accumulator”
tab in the PySpark web UI. There, the Accumulable section shows the sum of the
accumulator values of the variables modified by the tasks listed in the
Accumulator column of the Tasks table.
Unnamed Accumulators: These accumulators are not shown on the PySpark
Web UI page. It is always recommended to make use of named accumulators.
Accumulator variables can be created by using
SparkContext.accumulator(initial_value) as shown in the example below:
ac = sc.accumulator(0)
sc.parallelize([2, 23, 1]).foreach(lambda x: ac.add(x))
print(ac.value)   # 26
Under the hood, depending on the accumulator data type - double, long or collection -
Spark provides DoubleAccumulator, LongAccumulator and CollectionAccumulator
respectively.
class pyspark.SparkConf(
    loadDefaults = True,
    _jvm = None,
    _jconf = None
)
where:
loadDefaults - is of type boolean and indicates whether we need to load values
from Java System Properties. It is True by default.
_jvm - This belongs to the class py4j.java_gateway.JVMView and is an internal
parameter that is used for passing the handle to the JVM. This need not be set by the
users.
_jconf - This belongs to the class py4j.java_gateway.JavaObject . This
parameter is optional and can be used for passing an existing SparkConf handle
for using its parameters.
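For context, a SparkConf object is typically built with its setter methods and passed to the SparkContext; a minimal sketch (the application name and master value are assumptions):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("InterviewBitApp").setMaster("local[2]")
sc = SparkContext(conf=conf)

print(sc.getConf().get("spark.app.name"))   # InterviewBitApp
sc.stop()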
def capitalizeWord(str):
    result = ""
    words = str.split(" ")
    for word in words:
        result = result + word[0:1].upper() + word[1:] + " "
    return result
Use UDF with DataFrame: The UDF can be applied on a PySpark DataFrame so
that it acts like a built-in function of the DataFrame.
Consider we have a DataFrame stored in a variable df as below:
+----------+-----------------+
|ID_COLUMN |NAME_COLUMN |
+----------+-----------------+
|1 |harry potter |
|2 |ronald weasley |
|3 |hermoine granger |
+----------+-----------------+
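The registration of the UDF used in the next snippet is not shown above; a minimal sketch, assuming the convertUDF name wraps the capitalizeWord() function defined earlier:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# wrap the plain Python function above as a PySpark UDF that returns a string
convertUDF = udf(lambda s: capitalizeWord(s), StringType())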
df.select(col("ID_COLUMN"), convertUDF(col("NAME_COLUMN")).alias("NAME_COLUMN")) \
  .show(truncate=False)
+----------+-----------------+
|ID_COLUMN |NAME_COLUMN |
+----------+-----------------+
|1 |Harry Potter |
|2 |Ronald Weasley |
|3 |Hermoine Granger |
+----------+-----------------+
UDFs have to be designed so that their algorithms are efficient in both time and
space complexity. If care is not taken, the performance of the DataFrame operations
will be impacted.
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
    .appName('InterviewBitSparkSession') \
    .getOrCreate()
Here,
master() – This is used for setting up the mode in which the application has to
run - cluster mode (use the master name) or standalone mode. For standalone
mode, we pass the local[x] value to the function, where x represents the partition
count to be created for RDDs, DataFrames and Datasets. Ideally, the value of x is
the number of CPU cores available.
appName() – Used for setting the application name.
getOrCreate() – Returns a SparkSession object. This creates a new object if it
does not exist; if an object already exists, it simply returns that.
If we want to create a new SparkSession object every time, we can use the
newSession method as shown below:
import pyspark
from pyspark.sql import SparkSession

# newSession() is called on an existing SparkSession instance (the spark object created above)
spark_session = spark.newSession()
In the image, we see that the data is in list form and, once converted to an RDD, it is
stored across different partitions.
We have the following approaches for creating PySpark RDD:
Using sparkContext.parallelize() : The parallelize() method of the
SparkContext can be used for creating RDDs. This method loads an existing
collection from the driver and parallelizes it. This is a basic approach for creating
RDDs and is used when we have data already present in memory. This also
requires the presence of all data on the Driver before creating RDD. Code to
create RDD using the parallelize method for the python list shown in the image
above:
num_list = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd = spark.sparkContext.parallelize(num_list)
Using sparkContext.textFile() : Using this method, we can read a .txt file and
convert it into an RDD. Syntax:
rdd_txt = spark.sparkContext.textFile("/path/to/textFile.txt")
We can also read csv, json, parquet and various other formats and create the RDDs.
Empty RDD with no partitions using sparkContext.emptyRDD() : An RDD with no
data is called an empty RDD. We can create such RDDs having no partitions by
using the emptyRDD() method as shown in the code piece below:
empty_rdd = spark.sparkContext.emptyRDD()
# PySpark RDDs are untyped, so there is no emptyRDD[String] equivalent;
# an empty RDD can also be created from an empty list using parallelize()
empty_rdd2 = spark.sparkContext.parallelize([])
+-----------+----------+
| Name | Age |
+-----------+----------+
| Harry | 20 |
| Ron | 20 |
| Hermoine | 20 |
+-----------+----------+
>> df.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
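The construction of this DataFrame is not shown above; a minimal sketch that would produce the same schema (the data values are taken from the table, the code itself is an assumption):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
])
df = spark.createDataFrame([("Harry", 20), ("Ron", 20), ("Hermoine", 20)], schema)
df.show()
df.printSchema()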
df = spark.read.csv("/path/to/file.csv")
PySpark supports csv, text, avro, parquet, tsv and many other file formats.
startswith() – returns a Boolean value. It is True when the value of the
column starts with the specified string and False when the match is not satisfied
in that column value.
endswith() – returns a Boolean value. It is True when the value of the
column ends with the specified string and False when the match is not satisfied
in that column value.
Both the methods are case-sensitive.
Consider an example of the startswith() method here. We have the DataFrame with
3 rows (Harry, Ron and Hermoine) created earlier.
If we have the below code that returns the rows where the names in
the Name column start with “H”,
from pyspark.sql.functions import col

df.filter(col("Name").startswith("H")).show()
+-----------+----------+
| Name | Age |
+-----------+----------+
| Harry | 20 |
| Hermoine | 20 |
+-----------+----------+
Notice how the record with the Name “Ron” is filtered out because it does not start
with “H”.
PySpark SQL is the most popular PySpark module and is used to process structured
columnar data. Once a DataFrame is created, we can interact with the data using SQL
syntax. Spark SQL brings native raw SQL queries to Spark, using select, where, group
by, join, union etc. To use PySpark SQL, the first step is to create a temporary table on
the DataFrame by using the createOrReplaceTempView() function. Once created, the
table is accessible throughout the SparkSession by using the sql() method. When the
SparkSession gets terminated, the temporary table will be dropped.
For example, consider we have the following DataFrame assigned to a variable df :
+-----------+----------+----------+
| Name | Age | Gender |
+-----------+----------+----------+
| Harry | 20 | M |
| Ron | 20 | M |
| Hermoine | 20 | F |
+-----------+----------+----------+
In the below piece of code, we create a temporary table of the DataFrame that
becomes accessible in the SparkSession using the sql() method. The SQL queries can
be run within that method.
df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()
>> df_new.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
|-- Gender: string (nullable = true)
For the above example, let’s try running group by on the Gender column:
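A sketch of such a group-by query run through the sql() method (the exact statement is an assumption; the Gender_Count alias matches the output below):

spark.sql("SELECT Gender, count(*) AS Gender_Count FROM STUDENTS GROUP BY Gender").show()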
+------+------------+
|Gender|Gender_Count|
+------+------------+
| F| 1 |
| M| 2 |
+------+------------+
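The parameters described next belong to the DataFrame join() API, whose call form is sketched below (the DataFrame names df1 and df2 are placeholders):

# DataFrame.join(other, on=None, how=None)
joined_df = df1.join(df2, on="column_name", how="inner")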
where,
other - Right side of the join
on - column name string used for joining
how - type of join, by default it is inner. The values can be inner, left, right, cross, full,
outer, left_outer, right_outer, left_anti, left_semi.
The join expression can be appended with the where() and filter() methods for
filtering rows. We can also perform multiple joins by chaining the join() method.
Consider we have two DataFrames - employee and department - as shown below:
-- Employee DataFrame --
+------+--------+-----------+
|emp_id|emp_name|empdept_id |
+------+--------+-----------+
| 1| Harry| 5|
| 2| Ron | 5|
| 3| Neville| 10|
| 4| Malfoy| 20|
+------+--------+-----------+
-- Department DataFrame --
+-------+--------------------------+
|dept_id| dept_name |
+-------+--------------------------+
| 5 | Information Technology |
| 10| Engineering |
| 20| Marketting |
+-------+--------------------------+
We can inner join the Employee DataFrame with Department DataFrame to get the
department information along with employee information as:
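A minimal sketch of such an inner join (the DataFrame variable names empDF and deptDF follow the chained-join example later in this section):

empDF.join(deptDF, empDF["empdept_id"] == deptDF["dept_id"], "inner").show(truncate=False)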
+------+--------+-----------+-------+--------------------------+
|emp_id|emp_name|empdept_id |dept_id| dept_name |
+------+--------+-----------+-------+--------------------------+
| 1| Harry| 5| 5 | Information Technology |
| 2| Ron | 5| 5 | Information Technology |
| 3| Neville| 10| 10 | Engineering |
| 4| Malfoy| 20| 20 | Marketting |
+------+--------+-----------+-------+--------------------------+
We can also perform joins by chaining the join() method, following the syntax:
df1.join(df2,["column_name"]).join(df3,df1["column_name"] == df3["column_name"]).show()
-- Address DataFrame --
+------+--------------+------+
|emp_id| city |state |
+------+--------------+------+
|1 | Bangalore | KA |
|2 | Pune | MH |
|3 | Mumbai | MH |
|4 | Chennai | TN |
+------+--------------+------+
If we want to get the address details along with the Employee and the Department
DataFrames, then we can run:
resultDf = empDF.join(addressDF, ["emp_id"]) \
    .join(deptDF, empDF["empdept_id"] == deptDF["dept_id"])
resultDf.show()
+------+--------+-----------+--------------+------+-------+--------------------------+
|emp_id|emp_name|empdept_id | city |state |dept_id| dept_name |
+------+--------+-----------+--------------+------+-------+--------------------------+
| 1| Harry| 5| Bangalore | KA | 5 | Information Technology |
| 2| Ron | 5| Pune | MH | 5 | Information Technology |
| 3| Neville| 10| Mumbai | MH | 10 | Engineering |
| 4| Malfoy| 20| Chennai | TN | 20 | Marketting |
+------+--------+-----------+--------------+------+-------+--------------------------+
Spark loads the data from the socket and represents it in the value column of the
DataFrame object. The df.printSchema() prints
root
|-- value: string (nullable = true)
Post data processing, the DataFrame can be streamed to the console or to other
destinations as required, such as Kafka, dashboards, databases etc.
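A minimal structured-streaming sketch for this setup (the host and port values are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SocketStreamExample").getOrCreate()

# read a stream of lines from a TCP socket; each line lands in the `value` column
df = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9090) \
    .load()

df.printSchema()   # root |-- value: string (nullable = true)

# write the streamed DataFrame to the console
query = df.writeStream.format("console").outputMode("append").start()
query.awaitTermination()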
Conclusion
PySpark has gained immense popularity among big data and machine learning
enthusiasts as well as organizations like Netflix, as it exposes Spark functionality in
Python, helping developers work with powerful libraries for big data processing. In
this article, we have covered the most commonly asked PySpark interview questions
to help developers crack interviews.