Pyspark IQ FREE Guide
Interview Questions
and Answers
Atul Kumar
Author & Cloud Expert
[EDITION 01]
Ans)
RDD-
● It is Spark's fundamental data structure; DataFrames and Datasets are built on top of RDDs.
● If the same data needs to be computed again, RDDs can be efficiently persisted (cached) for
reuse.
● It's useful when you need low-level transformations and actions and fine-grained control over a
dataset.
● It's more commonly used to manipulate data with functional programming constructs than with
domain-specific expressions.
DataFrame-
● It lets you see the structure of the data, i.e., rows and columns. You can think of it as a
database table.
● Optimized Execution Plan- The Catalyst optimizer is used to create query plans.
● One of the limitations of DataFrames is the lack of compile-time type safety, i.e., when the
structure of the data is not known at compile time, type errors can only be caught at runtime.
● Also, if you're working in Python, start with DataFrames and then switch to RDDs if you
need more flexibility.
Dataset-
● It has an efficient encoder mechanism and, unlike DataFrames, provides compile-time type
safety in a structured manner.
● If you want a greater level of type safety at compile time, or if you want typed JVM
objects, Dataset is the way to go. (The typed Dataset API is available in Scala and Java, not in
Python.)
● You can also use Datasets when you want to take advantage of Catalyst optimization or
benefit from Tungsten's fast code generation.
Ans)
Here’s how we can create DataFrame using existing RDDs-
The toDF() function of PySpark RDD is used to construct a DataFrame from an existing RDD.
Because an RDD has no column names, the DataFrame is constructed with the default column
names "_1" and "_2" to represent its two columns.
—-----------------------------------------------------------------------------------------------
dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
—-----------------------------------------------------------------------------------------------
Here, the printSchema() method prints the schema with the default column names-
—-----------------------------------------------------------------------------------------------
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
—-----------------------------------------------------------------------------------------------
To give the DataFrame column names, pass them to the toDF() function as a parameter, as shown
below-
—-----------------------------------------------------------------------------------------------
columns = ["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()
—-----------------------------------------------------------------------------------------------
The above code snippet prints the schema with the specified column names-
—-----------------------------------------------------------------------------------------------
root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)
—-----------------------------------------------------------------------------------------------
Q3) Explain the use of StructType and StructField classes in PySpark with examples.
Ans)
The StructType and StructField classes in PySpark are used to define the schema of a
DataFrame and to create complex columns such as nested struct, array, and map columns.
StructType is a collection of StructField objects, each of which determines a column's name,
data type, nullability, and metadata.
● To define the columns, PySpark offers the StructField class (imported from
pyspark.sql.types), which takes the column name (String), column type (DataType), nullable
flag (Boolean), and optional metadata (a dictionary).
Q4) What are the different ways to handle row duplication in a PySpark DataFrame?
Ans)
There are two ways to handle row duplication in PySpark dataframes. The distinct() function in
PySpark is used to drop/remove duplicate rows (all columns) from a DataFrame, while
dropDuplicates() is used to drop rows based on one or more columns.
Here’s an example showing how to utilize the distinct() and dropDuplicates() methods-
First, we need to create a sample dataframe.
—-----------------------------------------------------------------------------------------------
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
spark = SparkSession.builder.appName('K21Academy').getOrCreate()
data = [("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000), \
("Saif", "Sales", 4100) \
]
column= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = column)
df.printSchema()
df.show(truncate=False)
—-----------------------------------------------------------------------------------------------
In the sample data, the employee James has two rows that are identical in every column, and
Robert and Saif share the same department and salary. As a result, there are two rows with
duplicate values in all fields and four rows with duplicate values in the department and salary
columns.
Below is the entire code for removing duplicate rows-
—-----------------------------------------------------------------------------------------------
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
spark = SparkSession.builder.appName('K21Academy').getOrCreate()
data = [("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000), \
("Saif", "Sales", 4100) \
]
column= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = column)
df.printSchema()
df.show(truncate=False)
#Distinct
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)
#Drop duplicates
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)
#Drop duplicates on selected columns
dropDisDF = df.dropDuplicates(["department","salary"])
print("Distinct count of department salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)
—-----------------------------------------------------------------------------------------------
Q5) Explain PySpark UDF with the help of an example.
Ans)
PySpark UDF (User Defined Function) is one of the most useful features of Spark SQL &
DataFrame; it is used to extend PySpark's built-in capabilities. UDFs in PySpark work
similarly to UDFs in conventional databases. In PySpark, we write a Python function and either
wrap it with the udf() function to use it on a DataFrame or register it as a UDF to use it in SQL.
First, create a Python function. The code below defines the convertCase() function, which
accepts a string parameter and converts the initial letter of every word to a capital letter.
—-----------------------------------------------------------------------------------------------
def convertCase(s):
    # Upper-case the first character of every space-separated word
    words = s.split(" ")
    return " ".join(w[0:1].upper() + w[1:] for w in words)
—-----------------------------------------------------------------------------------------------
Q6) Discuss the map() transformation in PySpark DataFrame with the help of an example.
Ans)
PySpark map, or the map() function, is an RDD transformation that generates a new RDD by
applying a transformation function (often a lambda) to each element of the RDD. A PySpark
DataFrame has no map() method, so convert it with df.rdd first. RDD map() transformations
are used to perform complex operations such as adding a column, changing a column,
converting data, and so on. Map transformations always produce the same number of records
as the input.
In this PySpark map() example, we pair each element with the value 1. The result is a pair RDD
of key-value tuples, where each word (a string) is the key and 1 (an int) is the value.
—-----------------------------------------------------------------------------------------------
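from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MapExample").getOrCreate()
rdd = spark.sparkContext.parallelize(["Project", "Gutenberg", "Alice", "Project"])  # sample words
rdd2 = rdd.map(lambda x: (x, 1))
for element in rdd2.collect():
    print(element)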
—-----------------------------------------------------------------------------------------------
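Q7) What do you mean by ‘joins’ in PySpark DataFrame? What are the different types of
joins?
Ans)
Joins in PySpark are used to combine two DataFrames, and by chaining joins, you can combine
several DataFrames. INNER Join, LEFT OUTER Join, RIGHT OUTER Join, LEFT ANTI Join,
LEFT SEMI Join, CROSS Join, and SELF Join are among the SQL join types it supports.
The join() method accepts the following parameters and returns a DataFrame-
● other: the right side of the join.
● on: a column name, a list of column names, or a join expression (Column).
● how: the join type as a string, such as "inner" (the default), "left", "right", "full",
"left_semi", "left_anti", or "cross".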
Ans)
PySpark ArrayType is a collection data type that extends PySpark's DataType class, which is
the superclass for all types. All elements of an ArrayType must be of the same type.
The ArrayType() constructor may be used to create an instance of an ArrayType. It accepts two
arguments: elementType and an optional argument containsNull, which specifies whether the
array can contain null elements and is set to True by default. elementType should be an
instance of a class that extends DataType in PySpark.
—-----------------------------------------------------------------------------------------------
from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(),False)
—-----------------------------------------------------------------------------------------------
Ans)
Using one or more partition keys, PySpark partitions a large dataset into smaller parts. When
we build a DataFrame from a file or table, PySpark creates the DataFrame in memory with a
specific number of partitions based on specified criteria. Transformations on partitioned data run
faster because the transformations on each partition are executed in parallel. PySpark
supports both partitioning in memory (DataFrame, e.g., with repartition() or coalesce()) and
partitioning on disk (file system, e.g., with partitionBy() when writing).
Q10) What is meant by PySpark MapType? How can you create a MapType using
StructType?
Ans)
PySpark MapType accepts two mandatory parameters- keyType and valueType, and one
optional boolean argument valueContainsNull.
Here’s how to create a MapType with PySpark StructType and StructField. The StructType()
accepts a list of StructFields, each of which takes a fieldname and a value type.
—-----------------------------------------------------------------------------------------------
from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
StructField('name', StringType(), True),
StructField('properties', MapType(StringType(),StringType()),True)
])
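—-----------------------------------------------------------------------------------------------
Now, using the preceding StructType structure, let's construct a DataFrame-
—-----------------------------------------------------------------------------------------------
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySpark StructType StructField').getOrCreate()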
dataDictionary = [
('James',{'hair':'black','eye':'brown'}),
('Michael',{'hair':'brown','eye':None}),
('Robert',{'hair':'red','eye':'black'}),
('Washington',{'hair':'grey','eye':'grey'}),
('Jefferson',{'hair':'brown','eye':''})
]
df = spark.createDataFrame(data=dataDictionary, schema = schema)
df.printSchema()
df.show(truncate=False)
—-----------------------------------------------------------------------------------------------
Q11) How can PySpark DataFrame be converted to Pandas DataFrame?
Ans)
First, you need to understand the difference between PySpark and pandas. The key difference
is that PySpark distributes its operations and executes them in parallel across several cores
and machines, which makes it faster than pandas on large datasets.
In other words, pandas performs operations on a single node, whereas PySpark uses several
machines.
After processing data in PySpark, you may need to transfer it to a pandas DataFrame so that
you can use it in machine learning apps or other Python programs.
To convert a PySpark DataFrame to a Python pandas DataFrame, use the toPandas() function.
toPandas() collects all records of the PySpark DataFrame to the driver program, so it should
only be used on a small subset of the data. With a larger dataset, the application fails due to
an out-of-memory error on the driver.
Q12) With the help of an example, show how to employ PySpark ArrayType.
Ans)
PySpark ArrayType is a data type for collections that extends PySpark's DataType class. All
elements of an ArrayType must be of the same type.
The ArrayType() constructor may be used to create an instance of an ArrayType. It accepts two
arguments: elementType and an optional argument containsNull, which specifies whether the
array can contain null elements and is set to True by default. elementType should be an
instance of a class that extends DataType in PySpark.
—-----------------------------------------------------------------------------------------------
from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(),False)
—-----------------------------------------------------------------------------------------------
The above example generates a string array that does not allow null values.
Ans)
The pivot() method in PySpark is used to rotate/transpose data from one column into multiple
DataFrame columns, and the unpivot() function (available in PySpark 3.4 and later) turns them
back into rows. pivot() is an aggregation in which the distinct values of one of the grouping
columns are transposed into separate columns.
To determine the total amount of each product's exports to each country, we'll group by
Product, pivot by Country, and sum by Amount.
—-----------------------------------------------------------------------------------------------
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)
—-----------------------------------------------------------------------------------------------
This converts the countries from DataFrame rows into columns. Wherever a product has no data
for a country, the value is null by default.
Ans) Broadcast variables in PySpark are read-only shared variables that are cached and made
available on all nodes in a cluster so that tasks can access or use them. Instead of shipping
this data with every task, PySpark uses efficient broadcast algorithms to distribute broadcast
variables among workers, lowering communication costs.
The broadcast(v) method of the SparkContext class is used to create a PySpark Broadcast
variable. It accepts the value v to be broadcast.
Generating broadcast in PySpark Shell:
—-----------------------------------------------------------------------------------------------
broadcastVariable = sc.broadcast([0, 1, 2, 3])
broadcastVariable.value

# PySpark RDD Broadcast variable example
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExample.com').getOrCreate()
states = {"NY": "New York", "CA": "California", "FL": "Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
data = [("James", "Smith", "USA", "CA"),
        ("Michael", "Rose", "USA", "NY"),
        ("Robert", "Williams", "USA", "CA"),
        ("Maria", "Jones", "USA", "FL")]
rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    # Look up the full state name in the broadcast dictionary on the executor
    return broadcastStates.value[code]

res = rdd.map(lambda a: (a[0], a[1], a[2], state_convert(a[3]))).collect()
print(res)

# PySpark DataFrame Broadcast variable example
spark = SparkSession.builder.appName('PySpark broadcast variable').getOrCreate()
states = {"NY": "New York", "CA": "California", "FL": "Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
data = [("James", "Smith", "USA", "CA"),
        ("Michael", "Rose", "USA", "NY"),
        ("Robert", "William", "USA", "CA"),
        ("Maria", "Jones", "USA", "FL")]
columns = ["firstname", "lastname", "country", "state"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcastStates.value[code]

res = df.rdd.map(lambda a: (a[0], a[1], a[2], state_convert(a[3]))).toDF(columns)
res.show(truncate=False)
—-----------------------------------------------------------------------------------------------
2. PySpark Coding Interview Questions
In this section, you will find PySpark code interview questions and answers.
Q15) You have a cluster of ten nodes with each node having 24 CPU cores. The following
code works, but it may crash on huge data sets, or at the very least, it may not take
advantage of the cluster's full processing capabilities. Which aspect is the most difficult
to alter, and how would you go about doing so?
Ans)
—-----------------------------------------------------------------------------------------------
def cal(sparkSession: SparkSession): Unit = {
  val NumNode = 10
  val userActivityRdd: RDD[UserActivity] = readUserActivityData(sparkSession)
    .repartition(NumNode)
  val result = userActivityRdd
    .map(e => (e.userId, 1L))
    .reduceByKey(_ + _)
  result
    .take(1000)
}
—-----------------------------------------------------------------------------------------------
The repartition command creates ten partitions regardless of how many partitions the data was
loaded with. On large datasets, these partitions might get fairly huge, and they'll almost
certainly outgrow the RAM allotted to a single executor.
In addition, each partition is processed by a single task, so only ten tasks can run at a time.
This means that just ten of the 240 available cores (10 nodes with 24 cores each) are engaged.
On the other hand, if the number of partitions is set exceptionally high, the scheduler's overhead
in handling the partitions grows, lowering performance. In some circumstances, especially for
extremely tiny partitions, this overhead may even exceed the execution time itself.
The optimal number of partitions is between two and three times the number of cores. In the
given scenario, 10 × 24 × 2.5 = 600 partitions would be appropriate.
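The arithmetic behind that rule of thumb is small enough to spell out. The sketch assumes one task per core (Spark's default of one CPU per task), and the helper name suggested_partitions is hypothetical.

```python
def suggested_partitions(nodes: int, cores_per_node: int, factor: float = 2.5) -> int:
    """Partition count from the 2-3x-cores rule of thumb; factor=2.5 matches the answer."""
    return int(nodes * cores_per_node * factor)


total_cores = 10 * 24                          # 240 concurrent task slots
low, high = 2 * total_cores, 3 * total_cores   # 480 to 720 partitions
print(total_cores, low, high, suggested_partitions(10, 24))
```

In the Scala snippet, this amounts to replacing repartition(NumNode) with a value in that range, such as repartition(600).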
Q16) Explain the following code and what output it will yield-
—-----------------------------------------------------------------------------------------------
case class User(uId: Long, uName: String)
case class UserActivity(uId: Long, activityTypeId: Int, timestampEpochSec: Long)
val LoginActivityTypeId = 0
val LogoutActivityTypeId = 1

private def readUserData(sparkSession: SparkSession): RDD[User] = {
  sparkSession.sparkContext.parallelize(
    Array(User(1, "Doe, John"), User(2, "Doe, Jane"), User(3, "X, Mr.")))
}

private def readUserActivityData(sparkSession: SparkSession): RDD[UserActivity] = {
  sparkSession.sparkContext.parallelize(Array(
    UserActivity(1, LoginActivityTypeId, 1514764800L),
    UserActivity(2, LoginActivityTypeId, 1514808000L),
    UserActivity(1, LogoutActivityTypeId, 1514829600L),
    UserActivity(1, LoginActivityTypeId, 1514894400L)))
}

def calculate(sparkSession: SparkSession): Unit = {
  val userRdd: RDD[(Long, User)] = readUserData(sparkSession).map(e => (e.uId, e))
  val userActivityRdd: RDD[(Long, UserActivity)] =
    readUserActivityData(sparkSession).map(e => (e.uId, e))
  val result = userRdd
    .leftOuterJoin(userActivityRdd)
    .filter(e => e._2._2.isDefined && e._2._2.get.activityTypeId == LoginActivityTypeId)
    .map(e => (e._2._1.uName, e._2._2.get.timestampEpochSec))
    .reduceByKey((a, b) => if (a < b) a else b)
  result.foreach(e => println(s"${e._1}: ${e._2}"))
}
—-----------------------------------------------------------------------------------------------
Ans)
The primary function, calculate, reads two datasets. (In this case, they come from constant
inline data structures that are transformed into distributed datasets using parallelize.) Each of
them is transformed by map into a tuple consisting of the user ID and the item itself. The user
ID is then used to combine the two datasets with a left outer join.
The joined dataset is filtered to keep only login actions, and each remaining record is mapped
to a tuple of the uName and the event timestamp.
This is finally reduced to the earliest login timestamp per user, which is printed to the console.
For the sample data, the output is "Doe, John: 1514764800" and "Doe, Jane: 1514808000" (in no
guaranteed order); user 3 has no activity and is removed by the filter.
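To double-check the expected output, the sketch below replays the same join, filter, and reduce steps in plain Python on the sample data from the question. It is only a model of the Spark logic (no Spark involved), and the function name first_login_per_user is hypothetical.

```python
LOGIN, LOGOUT = 0, 1  # LoginActivityTypeId and LogoutActivityTypeId

users = {1: "Doe, John", 2: "Doe, Jane", 3: "X, Mr."}  # uId -> uName
activities = [  # (uId, activityTypeId, timestampEpochSec)
    (1, LOGIN, 1514764800),
    (2, LOGIN, 1514808000),
    (1, LOGOUT, 1514829600),
    (1, LOGIN, 1514894400),
]


def first_login_per_user(users, activities):
    result = {}
    for uid, name in users.items():
        # leftOuterJoin keeps every user; a user without activity is paired with None.
        matches = [a for a in activities if a[0] == uid] or [None]
        for activity in matches:
            # filter: drop missing activities and anything that is not a login
            if activity is None or activity[1] != LOGIN:
                continue
            ts = activity[2]
            # reduceByKey((a, b) => if (a < b) a else b): keep the earliest timestamp per name
            result[name] = ts if name not in result else min(result[name], ts)
    return result


for name, ts in first_login_per_user(users, activities).items():
    print(f"{name}: {ts}")
```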
Ans)
This is how the code looks:
—-----------------------------------------------------------------------------------------------
def calculate(sparkSession: SparkSession): Unit = {
val UIdColName = "uId"
val UNameColName = "uName"
val CountColName = "totalEventCount"
val userRdd: DataFrame = readUserData(sparkSession)
val userActivityRdd: DataFrame = readUserActivityData(sparkSession)
val result = userRdd
.repartition(col(UIdColName))
.join(userActivityRdd, UIdColName)
.select(col(UNameColName))
.groupBy(UNameColName)
.count()
.withColumnRenamed("count", CountColName)
result.show()
}
—-----------------------------------------------------------------------------------------------
Q18) Please indicate which parts of the following code will run on the master and which
parts will run on each worker node.
The driver application is responsible for calling this function. The DAG is defined by the
assignment to the result value, as well as its execution, which is initiated by the collect()
operation. The worker nodes handle all of this (including the logic of the method
mapDateTime2Date). Because the result value that is gathered on the master is an array, the
map performed on this value is also performed on the master.
Q19) What are the elements used by the GraphX library, and how are they generated from
an RDD? To determine page rankings, fill in the following code-
Ans)
Vertex and Edge objects are supplied to the Graph object as RDDs of type RDD[(VertexId, VT)]
and RDD[Edge[ET]] respectively (where VT and ET are any user-defined types associated with
a given Vertex or Edge). For the Edge type, the constructor is Edge[ET](srcId: VertexId, dstId:
VertexId, attr: ET). VertexId is just an alias for Long.
3. PySpark Interview Questions for Experienced Data Engineers
Let us dive into a pool of pyspark advanced interview questions and answers.
Q20) Under what scenarios are Client and Cluster modes used for deployment?
Ans)
Cluster mode should be used for deployment if the client machine is not located near the
cluster. This avoids the network latency that would occur in Client mode, where the driver runs
on the client machine and must communicate with the executors across the network. Also, in
Client mode, if the client machine goes offline, the entire application is lost.
Client mode can be used for deployment if the client machine is located within the cluster.
There are no network latency concerns because the machine is part of the cluster, and the
cluster's maintenance is already taken care of, so there is no need to be concerned in the event
of a failure.
MapReduce | Apache Spark
Only batch-wise data processing is done using MapReduce. | Apache Spark can handle data in both real-time and batch mode.
Data is stored in HDFS (Hadoop Distributed File System), which takes a long time to retrieve. | Spark processes and caches data in memory (RAM), making data retrieval faster when needed.
Q22) Write a spark program to check whether a given keyword exists in a huge text file or
not.
Ans)
To check whether a given keyword exists in a huge text file using PySpark, you can use the
filter and count methods to efficiently process the data:
—-----------------------------------------------------------------------------------------------
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("KeywordSearch").getOrCreate()
sc = spark.sparkContext
lines = sc.textFile("sample_file.txt")
keyword = "my_keyword"
isExist = lines.filter(lambda line: keyword in line).count()
print("Found" if isExist > 0 else "Not Found")
—-----------------------------------------------------------------------------------------------
Ans)
Every executor of a Spark application has the same fixed number of cores and the same fixed
heap size. The heap size is the memory used by the Spark executor, which is controlled by the
spark.executor.memory property (the --executor-memory flag of spark-submit). Executors run
on the worker nodes where Spark operates, and the executor memory is a measure of how
much of a worker node's memory the application uses.
Memory management, task monitoring, fault tolerance, storage system interactions, work
scheduling, and support for all fundamental I/O activities are all performed by Spark Core.
Additional libraries on top of Spark Core enable a variety of SQL, streaming, and machine-
learning applications.
● Fault recovery
● Memory management and interactions with storage systems
● Monitoring, scheduling, and distributing jobs
● Fundamental I/O functions
Q25) What are some of the drawbacks of incorporating Spark into applications?
Ans)
Even though Spark is a strong data processing engine, there are certain drawbacks to utilizing it
in applications.
● Spark can be a constraint for cost-effective big data processing, since its "in-memory"
computation requires large amounts of RAM, which is expensive.
● When working in cluster mode, files referenced by a local filesystem path must be available
at the same location on all worker nodes, because tasks may be executed on any worker
node depending on resource availability. Either the files must be copied to all worker nodes,
or a separate network-mounted file-sharing system must be installed.
Q26) How can data transfers be kept to a minimum while using PySpark?
Ans)
Shuffling is the process that causes these data transfers. Spark applications run faster and
more reliably when these transfers are minimized. There are several approaches that may be
used to reduce them. They are as follows:
● Using broadcast variables improves the efficiency of joins between large and small RDDs.
● Accumulators are used to update variable values in parallel during execution.
● Another popular method is to avoid operations that trigger shuffles, such as groupByKey()
or repartition(), where possible.
Q27) What are Sparse Vectors? What distinguishes them from dense vectors?
Ans)
Sparse vectors are made up of two parallel arrays, one for indices and the other for values.
These vectors save space by storing only the non-zero values. E.g.-
—----------------------------------------------------------------------------------------------------------------------------
val sparseVec: Vector = Vectors.sparse(5, Array(0, 4), Array(1.0, 2.0))
—----------------------------------------------------------------------------------------------------------------------------
The vector in the above example is of size 5, but the non-zero values are only found at indices 0
and 4.
When there are just a few non-zero values, sparse vectors come in handy. If there are just a few
zero values, dense vectors should be used instead of sparse vectors, as sparse vectors would
create indexing overhead, which might affect performance.
Ans)
Spark Streaming is based on dividing the contents of a data stream into batches of X seconds,
represented as DStreams. DStreams allow developers to cache data in memory, which is
particularly handy if the data from a DStream is used several times. Data can be cached with
the cache() function or the persist() method with an appropriate persistence level. For input
streams receiving data over the network, such as Kafka and Flume, the default persistence
level replicates the data to two nodes to achieve fault tolerance.
● Cost-effectiveness: Because Spark computations are costly, caching enables data reuse,
which avoids recomputation and lowers the cost of operations.
● Time-saving: By reusing computations, we can save a lot of time.
● More Jobs Achieved: By reducing computation execution time, worker nodes can
perform/execute more jobs.
Ans)
GraphX extends the Spark RDD with a robust API that supports graphs and graph-based
computations. It introduces the Resilient Distributed Property Graph, a directed multigraph that
can have multiple parallel edges. User-defined properties are associated with each edge and
vertex. Parallel edges represent multiple relationships between the same pair of vertices.
GraphX offers a collection of operators for graph computation, such as subgraph,
mapReduceTriplets, joinVertices, and so on. It also offers a wide range of graph builders and
algorithms to make graph analytics tasks easier.
Ans)
In line with UNIX standard streams, Apache Spark supports the pipe() function on RDDs,
which lets you assemble distinct portions of jobs written in any language. The pipe()
transformation writes each element of the RDD as a string (one line) to the standard input of an
external process. The process can alter the elements as needed, and each line it writes to
standard output becomes a string element of the resulting RDD.
Q31) What are the various levels of persistence that exist in PySpark?
Ans)
Spark automatically persists some intermediate data from shuffle operations. However, it is still
advisable to call the persist() function on an RDD that you plan to reuse. There are many levels
of persistence for storing RDDs in memory, on disk, or both, with varying levels of replication.
The following are the persistence levels available in Spark:
● MEMORY_ONLY: This is the default persistence level for RDDs, and it's used to store
RDDs in the JVM as deserialized Java objects. If the RDDs are too large to fit in memory,
some partitions are not cached and must be recomputed as needed.
● MEMORY_AND_DISK: The RDDs are stored in the JVM as deserialized Java objects. If
memory is inadequate, partitions that do not fit in memory are stored on disk and read
from there as needed.
● MEMORY_ONLY_SER: The RDD is stored as serialized Java objects (one byte array per
partition).
● DISK_ONLY: RDD partitions are only saved on disk.
● OFF_HEAP: This level is similar to MEMORY_ONLY_SER, except that the data is stored
in off-heap memory.
The persist() function has the following syntax for employing persistence levels:
df.persist(StorageLevel.<LEVEL>), where <LEVEL> is one of the levels above, for example
df.persist(StorageLevel.MEMORY_AND_DISK) after from pyspark import StorageLevel.
Ans)
Suppose you have the following details regarding the cluster:
No. of nodes = 10
No. of cores in each node = 15 cores
RAM of each node = 61GB
No. of cores per executor = how many concurrent tasks the executor can handle.
Ans)
Yes, there is an API for checkpoints in Spark. Checkpointing makes streaming applications
more resilient to failures. We can store the data and metadata in a checkpointing directory.
If there's a failure, Spark can retrieve this data and resume where it left off. There are two types
of checkpointing:
1. Metadata checkpointing: The information that defines the streaming computation is saved
to fault-tolerant storage such as HDFS, so that the application can recover from a failure
of the driver node.
2. Data checkpointing: Because some stateful operations require it, we save the RDDs to
reliable storage. In this case, the RDD for the next batch depends on the RDDs from
previous batches.
Q34) In Spark, how would you calculate the total number of unique words?
Ans)
Ans)
PySpark is a specialized in-memory distributed processing engine that enables you to handle
data in a distributed fashion effectively.
Let us take a look at PySpark interview questions and answers related to Data Science.
Ans)
Ans)
Some of the major advantages of using PySpark are-
● Writing parallelized code is effortless.
● Keeps track of synchronization points and errors.
● Has a lot of useful built-in algorithms.
Ans)
PySpark allows you to create custom profilers, which can help when building predictive models.
In general, data profiles are calculated from statistics such as the minimum and maximum
values of each column. Profiling is a valuable data review tool for making sure the data is
accurate and appropriate for future use.
Q40) List some recommended practices for making your PySpark data science workflows
better.
Ans)
● Avoid dictionaries: If you use Python data types like dictionaries, your code might not
be able to run in a distributed manner. Consider adding another column to a dataframe
that may be used as a filter instead of utilizing keys to index entries in a dictionary. This
proposal also applies to Python types that aren't distributable in PySpark, such as lists.
● Limit the use of Pandas: toPandas causes all data to be loaded into memory on the
driver node, preventing operations from being run in a distributed manner. When data
has previously been aggregated, and you wish to utilize conventional Python plotting
tools, this method is appropriate, but it should not be used for larger data frames.
● Minimize eager operations: It's best to avoid eager operations that draw whole DataFrames
into memory if you want your pipeline to be as scalable as possible. Reading in CSVs, for
example, is an eager activity, so I stage the DataFrame to S3 as Parquet before using it in
further pipeline steps.
5. Advanced PySpark Interview Questions and Answers
Ans)
● PySpark SQL is Spark's library for structured data. Unlike the PySpark RDD API, PySpark SQL gives Spark additional information about the structure of the data and the operations being performed. It provides a programming abstraction called the 'DataFrame.'
● A DataFrame is an immutable, distributed collection of data organized into named columns. DataFrames can process huge amounts of structured data (such as tables from relational databases) and semi-structured data (JavaScript Object Notation, or JSON).
● After creating a DataFrame, you can interact with the data using SQL syntax/queries.
● The first step in using PySpark SQL is to call the createOrReplaceTempView() function to create a temporary view on the DataFrame. The view is then available throughout the SparkSession via the sql() method, and it is dropped automatically when the SparkSession ends.
Ans)
Persisting (or caching) a dataset in memory is one of PySpark's most essential features. The
different levels of persistence in PySpark are as follows-
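Level | Purpose
MEMORY_ONLY | Stores the RDD in memory only; partitions that do not fit are recomputed when they are needed. It is the default persistence level used by RDD.cache() in PySpark. (PySpark always stores the data in serialized form.)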
Ans)
● A streaming application must be available 24 hours a day, seven days a week, and must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.).
● Metadata checkpointing saves the information that defines the streaming computation to fault-tolerant storage such as HDFS. This is used to recover from failures of the node running the streaming application's driver.
● Data checkpointing saves the generated RDDs to reliable storage. Some stateful transformations that combine data across multiple batches require this type of checkpoint.
Q44) In PySpark, how would you determine the total number of unique words?
Ans)
● Open the text file as an RDD:
—-----------------------------------------------------------------
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")
—-----------------------------------------------------------------
● Write a function that splits each line into words:
—-----------------------------------------------------------------
def toWords(line):
    return line.split()
—-----------------------------------------------------------------
● Run the toWords function on each element of the RDD in Spark:
—-----------------------------------------------------------------
words = lines.flatMap(toWords)
—-----------------------------------------------------------------
● Generate a (key, value) pair for each word:
—-----------------------------------------------------------------
def toTuple(word):
    return (word, 1)

wordTuple = words.map(toTuple)
—-----------------------------------------------------------------
● Combine the pairs by key; each remaining key is one unique word, so counting them gives the total:
—-----------------------------------------------------------------
uniqueWordCount = wordTuple.reduceByKey(lambda a, b: a + b).count()
—-----------------------------------------------------------------
Q45) Explain PySpark Streaming. How do you use the TCP/IP protocol to stream data?
Ans)
● Spark Streaming is a feature of the core Spark API that allows for scalable, high-
throughput, and fault-tolerant live data stream processing.
● It involves ingesting data from various sources, including Kafka, Kinesis, and TCP connections, and processing it with complex algorithms expressed through high-level functions like map, reduce, join, and window.
With the DStream API, the socketTextStream() method of a StreamingContext reads data from a TCP socket, taking the streaming source host and port as parameters, as illustrated in the code below. (In Structured Streaming, the equivalent is the readStream.format("socket") method of the SparkSession object.)
—-----------------------------------------------------------------
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
—-----------------------------------------------------------------
● The lineage graph lets Spark recompute RDDs on demand and restore lost data from persisted RDDs.
● An RDD lineage graph helps you construct a new RDD or restore data from a lost persisted RDD.
● It is built by applying transformations to the RDD and recording them as a logical execution plan.
Ans)
● User-Defined Functions- To extend the built-in Spark functions, you can define your own column-based transformations.
● Standard JDBC/ODBC Connectivity- Spark SQL libraries allow you to connect to Spark SQL using regular JDBC/ODBC connections and run queries (table operations) on structured data.
● Data Transformations- For transformations, Spark's RDD API offers high performance, and Spark SQL takes advantage of it by converting SQL queries into RDD transformations.
● Performance- Thanks to in-memory processing, Spark SQL outperforms Hadoop MapReduce, especially for workloads that iterate over a dataset many times.
● Relational Processing- With the introduction of Spark SQL, Spark added relational processing capabilities to its functional programming capabilities.
Ans)
● Apache Spark relies heavily on the Catalyst optimizer. It optimizes structured queries expressed in SQL or through the DataFrame/Dataset APIs, reducing program runtime and cutting costs.
● The Spark Catalyst optimizer supports both rule-based and cost-based optimization.
● Rule-based optimization applies a set of rules that determine how to execute the query.
● Cost-based optimization generates several candidate plans using rules and then calculates their costs to choose the cheapest one.
● The Catalyst optimizer also handles various Big Data challenges, such as semi-structured data and advanced analytics.
Ans)
Property Operators- These operators create a new graph in which a user-defined map function modifies the vertex or edge properties. The graph structure is left unchanged, which is a significant feature of these operators because it allows the resulting graph to reuse the original graph's structural indices.
Structural Operators- GraphX currently supports only a few widely used structural operators. The reverse operator creates a new graph with all edge directions reversed. The subgraph operator returns a graph containing only the vertices and edges that satisfy the given vertex and edge predicates. The mask operator constructs a subgraph containing only the vertices and edges that are also found in another input graph. The groupEdges operator merges parallel edges.
Join Operators- The join operators allow you to join data from external collections (RDDs) to
existing graphs. For example, you might want to combine new user attributes with an existing
graph or pull vertex properties from one graph into another.
Q50) Consider the following scenario: you have a large text file. How will you use
PySpark to see if a specific keyword exists?
Ans)
—-----------------------------------------------------------------
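from operator import add

# Assumes an existing SparkContext named sc
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")

def isFound(line):
    # 1 if the keyword occurs in this line, otherwise 0
    if line.find("my_keyword") > -1:
        return 1
    return 0

foundBits = lines.map(isFound)
total = foundBits.reduce(add)
if total > 0:
    print("Found")
else:
    print("Not Found")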
—-----------------------------------------------------------------
6. PySpark Scenario Based Interview Questions for Professionals
Q51) The given file has a delimiter ~|. How will you load it as a spark DataFrame?
Ans)
To load a file with a custom delimiter ~| into a Spark DataFrame using PySpark, you can use the
following code:
—-----------------------------------------------------------------—--------------------------------------------------------
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("scenario based").getOrCreate()
df = spark.read.option("delimiter", "~|").csv("input.csv", header=True)
df.show(truncate=False)
—-----------------------------------------------------------------—--------------------------------------------------------
Q52) How will you merge two files, File1 and File2, into a single DataFrame if they have different schemas?
File 1:
Name|Age
Azarudeen, Shahul|25
Michel, Clarke|26
Virat, Kohli|28
Andrew, Simond|37
File 2:
Name|Age|Gender
Rabindra, Tagore |32|Male
Madona, Laure | 59|Female
Flintoff, David|12|Male
Ammie, James| 20|Female
Ans)
—-----------------------------------------------------------------—--------------------------------------------------------
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
spark = SparkSession.builder.master("local").appName('Modes of Dataframereader')\
.getOrCreate()
sc=spark.sparkContext
# Both files start with a header row
df1=spark.read.option("delimiter","|").csv('input.csv', header=True)
df2=spark.read.option("delimiter","|").csv("input2.csv",header=True)
# File 1 has no Gender column: add it as a typed null so both DataFrames have
# the same columns in the same order (union() matches columns by position)
df_add=df1.withColumn("Gender",lit(None).cast("string"))
df_add.union(df2).show()
# Alternative: read both files with one explicit three-column schema;
# the missing Gender field in File 1 is read as null
from pyspark.sql.types import StructType, StructField, StringType
schema=StructType(
[
StructField("Name",StringType(), True),
StructField("Age",StringType(), True),
StructField("Gender",StringType(),True),
]
)
df3=spark.read.option("delimiter","|").csv("input.csv",header=True, schema=schema)
df4=spark.read.option("delimiter","|").csv("input2.csv", header=True, schema=schema)
df3.union(df4).show()
—-----------------------------------------------------------------—--------------------------------------------------------
Q53) Examine the following file, which contains some corrupt/bad data. What will you do
with such data, and how will you import them into a Spark Dataframe?
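Ans)
One option is to discard the malformed records while reading, by setting the reader's mode option to DROPMALFORMED: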
—-----------------------------------------------------------------—--------------------------------------------------------
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.master("local").appName("Modes of Dataframereader")\
.getOrCreate()
sc=spark.sparkContext
schm=StructType([
    StructField("col_1",StringType(), True),
    StructField("col_2",StringType(), True),
    StructField("col",StringType(), True),
])
# DROPMALFORMED skips every row that cannot be parsed with this schema
df=spark.read.option("mode", "DROPMALFORMED").csv('input1.csv', header=True,
schema=schm)
df.show()
—-----------------------------------------------------------------—--------------------------------------------------------
Q54) Consider a file containing an Education column that includes an array of elements,
as shown below. Using Spark Dataframe, convert each element in the array to a record.
Ans)
—-----------------------------------------------------------------—--------------------------------------------------------
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode_outer, posexplode_outer, split
spark = SparkSession.builder.master("local").appName('scenario based')\
.getOrCreate()
sc=spark.sparkContext
in_df=spark.read.option("delimiter","|").csv("input4.csv", header=True)
in_df.show()
# One output row per element of the comma-separated Education value
in_df.withColumn("Qualification", explode_outer(split("Education",","))).show()
# Same, but also keep each element's position as Index
in_df.select("*", posexplode_outer(split("Education",","))).withColumnRenamed("col",
"Qualification").withColumnRenamed("pos", "Index").drop("Education").show()
—-----------------------------------------------------------------—--------------------------------------------------------
Q55) Give the output for the following-
input.csv:
101,Azar,finance
102,Mani,HR
103,Raj,IT
in_rdd=sc.textFile('input.csv')
map_rdd=in_rdd.map(lambda x: x.split(','))
map_rdd.count()
in_rdd=sc.textFile('input.csv')
map_rdd=in_rdd.flatMap(lambda x: x.split(','))
map_rdd.count()
Ans)
—-----------------------------------------------------------------—--------------------------------------------------------
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local").appName("map").getOrCreate()
sc=spark.sparkContext
# Read the three-line input.csv shown in the question
in_rdd=sc.textFile('input.csv')
map_rdd=in_rdd.map(lambda x: x.split(','))
map_rdd.collect()
map_rdd.count()
—-----------------------------------------------------------------—--------------------------------------------------------
For the above code, the output is 3.
—-----------------------------------------------------------------—--------------------------------------------------------
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local").appName("map").getOrCreate()
sc=spark.sparkContext
in_rdd=sc.textFile('input.csv')
flat_map_rdd=in_rdd.flatMap(lambda x: x.split(','))
for i in flat_map_rdd.collect():
    print(i)
in_rdd.collect()
flat_map_rdd.count()
—-----------------------------------------------------------------—--------------------------------------------------------
For the above code, the output is 9: flatMap() flattens the three fields of each of the three lines into separate elements, whereas map() returns one list per line.
Ans)
PySpark lets us upload files to Apache Spark so that they are available on every node. This is done with sc.addFile(), where 'sc' stands for SparkContext. We then use SparkFiles to obtain the paths of the added files.
We use the following methods in SparkFiles to resolve the path to the files added using SparkContext.addFile():
● get(filename)
● getRootDirectory()
Ans)
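SparkConf holds the configuration needed to run a Spark application locally or on a cluster. In other words, it provides the settings for running a Spark application. The following are some of SparkConf's most important features:
● set(key, value): sets a configuration property.
● setMaster(value): sets the master URL.
● setAppName(value): sets the application name.
● get(key, defaultValue=None): returns the configuration value of a key, or the default if the key is not set.
● setSparkHome(value): sets the path where Spark is installed on worker nodes.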
Ans)
The primary difference between lists and tuples is that lists are mutable, but tuples are
immutable.
When a Python object may be edited, it is considered to be a mutable data type. Immutable
data types, on the other hand, cannot be changed.
Output:
[1, 2, 5, 7]
Traceback (most recent call last):
File "python", line 6, in <module>
TypeError: 'tuple' object does not support item assignment
We assigned 7 to list_num at index 3, and the output shows 7 at index 3. However, when we tried to assign 7 to tup_num at index 3, Python raised a TypeError. Because tuples are immutable, we cannot change them.
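The code that produced this output is not included in the text. The sketch below is a hypothetical reconstruction: it assumes list_num and tup_num both start with the values 1, 2, 5, 6, so that assigning 7 at index 3 reproduces the list output, and it catches the TypeError instead of letting the program stop.

```python
# Hypothetical starting values chosen to match the [1, 2, 5, 7] output above
list_num = [1, 2, 5, 6]
tup_num = (1, 2, 5, 6)

list_num[3] = 7      # lists are mutable: item assignment works
print(list_num)      # [1, 2, 5, 7]

try:
    tup_num[3] = 7   # tuples are immutable: item assignment raises TypeError
except TypeError as exc:
    error_message = str(exc)
    print("TypeError:", error_message)
```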
Ans)
There are two types of errors in Python: syntax errors and exceptions.
Syntax errors are also known as parsing errors. Errors are flaws in a program that might cause it to crash or terminate unexpectedly. When the parser detects a syntax error, it repeats the offending line and shows an arrow pointing to the earliest point in the line where the error was detected.
Exceptions arise when the normal flow of a program is disrupted during execution. Even if a program's syntax is correct, an error can still occur while it runs; such an error is an exception. ZeroDivisionError, TypeError, and NameError are some examples of exceptions.
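The difference can be seen in a short sketch: compile() runs the parser on a source string, so a syntax error is reported before any of that code executes, while a ZeroDivisionError only occurs when syntactically valid code runs. The function name safe_divide is hypothetical.

```python
# Syntax error: detected by the parser, before the code runs
try:
    compile("print('unclosed'", "<example>", "exec")
except SyntaxError as exc:
    syntax_error = exc
    print("SyntaxError on line", exc.lineno)

# Exception: raised while valid code is executing, and can be handled
def safe_divide(a, b):
    try:
        return a / b
    except ZeroDivisionError:
        return None

print(safe_divide(10, 2), safe_divide(1, 0))
```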
Q60) What are the most significant differences between the Python API (PySpark) and Apache Spark?
Ans)
PySpark is a Python API created and distributed by the Apache Spark project to make working with Spark easier for Python programmers. Apache Spark itself is written in Scala, and it provides APIs for other languages such as Java, R, and Python.
Also, because Scala is a statically typed, compile-time type-safe language, Apache Spark has several capabilities that PySpark does not, one of which is Datasets. Datasets are strongly typed collections of domain-specific objects that can be processed in parallel.
Ans)
Spark 2.0 introduced a new class called SparkSession (from pyspark.sql import SparkSession). It is a unified entry point that replaces the many contexts used before version 2.0 (SQLContext, HiveContext, etc.). It is the entry point to core PySpark functionality and lets you create PySpark RDDs and DataFrames programmatically. In the PySpark shell, a SparkSession is available by default as the object spark, and you can also create one programmatically.
In PySpark, we use the builder pattern (SparkSession.builder) to construct a SparkSession programmatically (in a .py file), as shown below. The getOrCreate() function returns the existing SparkSession if there is one, or creates a new SparkSession otherwise.
—-----------------------------------------------------------------—--------------------------------------------------------
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local[1]") \
.appName('K21Academy') \
.getOrCreate()
—-----------------------------------------------------------------—--------------------------------------------------------
Q62) Suppose you encounter the following error message while running PySpark commands on Linux: ImportError: No module named py4j.java_gateway
How will you resolve it?
Ans)
Py4J is a Java library integrated into PySpark that allows Python to communicate dynamically with objects in the JVM. PySpark applications cannot run without it, and it ships with Spark as the file $SPARK_HOME/python/lib/py4j-*-src.zip.
To run a PySpark application after installing Spark, add this Py4J zip file to the PYTHONPATH environment variable. If it is not on PYTHONPATH, Python raises ImportError: No module named py4j.java_gateway.
Q63) Suppose you get the error NameError: name 'spark' is not defined while using spark.createDataFrame(), but the same code runs without errors in the Spark or PySpark shell. Why?
Ans)
The Spark shell, the PySpark shell, and Databricks all provide a SparkSession object named 'spark' by default. However, if we are writing a Spark/PySpark application in a .py file, we must create the SparkSession object ourselves using the builder, which resolves NameError: name 'spark' is not defined.
—-----------------------------------------------------------------—--------------------------------------------------------
# Import PySpark
import pyspark
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
—-----------------------------------------------------------------—--------------------------------------------------------
If you get the error message 'No module named pyspark', try using findspark instead-
—-----------------------------------------------------------------—--------------------------------------------------------
# Install findspark first, from a shell: pip install findspark
# Import findspark and point it at the local Spark installation
import findspark
findspark.init()
# Import pyspark
import pyspark
from pyspark.sql import SparkSession
—-----------------------------------------------------------------—--------------------------------------------------------
Ans)
Spark supports the following cluster managers:
● Standalone- a simple cluster manager that comes with Spark and makes setting up a
cluster easier.
● Apache Mesos- Mesos is a cluster manager that can also run Hadoop MapReduce and
PySpark applications.
● Hadoop YARN- the resource manager introduced in Hadoop 2.
● Kubernetes- an open-source framework for automating containerized application deployment, scaling, and administration.
● local- not exactly a cluster manager, but it's worth mentioning because we pass "local" to master() to run Spark on our laptop/computer.
Ans)
Receivers are special objects in Apache Spark Streaming whose sole purpose is to consume data from various data sources and move it into Spark. The streaming context launches receivers as long-running tasks on various executors.
There are two different kinds of receivers, as follows:
● Reliable receiver: sends an acknowledgment to the data source once the data has been received and replicated in Apache Spark storage.
● Unreliable receiver: does not send any acknowledgment to the data source when it receives or replicates data in Apache Spark storage.
8. FAQs on PySpark Interview Questions for Data Engineer
Ans)
No. PySpark is a Python API for Spark. PySpark allows you to create applications using Python
APIs.
Ans)
PySpark is a Python API for Apache Spark. It lets you develop Spark applications using Python
APIs, but it also includes the PySpark shell, which allows you to analyze data in a distributed
environment interactively. Most of Spark's capabilities, such as Spark SQL, DataFrame,
Streaming, MLlib (Machine Learning), and Spark Core, are supported by PySpark.
Ans)
Yes, PySpark is a faster and more efficient Big Data tool.
PySpark is a Python Spark library for running Python applications with Apache Spark features.
Hence, it cannot exist without Spark.
Ans)
PySpark is easy to learn for those with basic knowledge of Python, Java, etc.
Ans)
One week is sufficient to learn the basics of the Spark Core API if you have significant
knowledge of object-oriented programming and functional programming.
Ans)
PySpark is an open-source framework that provides a Python API for Spark.
You should start by learning Python, SQL, and Apache Spark. You must also gain hands-on experience by working on real-world projects, such as those available on GitHub, Kaggle, etc.
THANK
YOU
www.k21academy.com