Spark SQL Hands-On

Create sqlContext

On Windows, we need to create sqlContext ourselves. In the Cloudera VM, the Spark shell already provides it.

On Windows:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

In either case, type its name to check that it is available:

scala> sqlContext

For this hands-on, use Cloudera (CDH) 5.4 or above.

1. Hive query execution on a text file
In Cloudera's Spark shell there is no need to create a HiveContext; the sqlContext it provides already supports Hive.
Queries are written in HiveQL.

hive-site.xml must be copied into the Spark conf directory so that Spark can find the Hive metastore:

cloudera> sudo -i
cloudera> cp /etc/hive/conf/hive-site.xml /etc/spark/conf

Dataset: people.txt, a text file in which each line contains information about one person.

A. Create a Hive table from Spark

scala> sqlContext.sql("CREATE TABLE people_table (FIRST STRING, MIDDLE STRING, LAST

Check that the table was created in Hive. "show tables" returns a DataFrame with two columns, tableName and isTemporary:

scala> sqlContext.sql("show tables").collect().foreach(println)

Check in the Hadoop file system (HDFS); the new table's directory should appear under the Hive warehouse:

cloudera> hadoop fs -ls /user/hive/warehouse

B. Load data into Hive table


scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/cloudera/people.txt' INTO TABLE people_table")

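The result of sqlContext.sql(...) is a DataFrame, not an RDD. DataFrames support relational operations such as select, filter and join, and .rdd converts a DataFrame to an RDD when you need the normal RDD functions.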

C. Query the Hive table: fetch women whose salary is less than 2 lakh (200,000)
scala> val resultdf = sqlContext.sql("FROM people_table SELECT FIRST,SALARY,SSN WHERE GENDER='F' AND SALARY < 200000 LIMIT 10")
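The WHERE and LIMIT logic of this query can be pictured without Spark. The plain-Scala sketch below applies the same filter to an in-memory list; the `Person` case class and its sample rows are hypothetical stand-ins for people_table and model only the columns the query touches.

```scala
// Hypothetical record type: only the people_table columns used by the query
case class Person(first: String, gender: String, salary: Long, ssn: String)

// Hypothetical sample rows standing in for people_table
val people = Seq(
  Person("Asha", "F", 150000L, "101"),
  Person("Ravi", "M", 120000L, "102"),
  Person("Meena", "F", 250000L, "103"),
  Person("Lata", "F", 90000L, "104")
)

// Mirrors: SELECT FIRST, SALARY, SSN WHERE GENDER = 'F' AND SALARY < 200000 LIMIT 10
val result = people
  .filter(p => p.gender == "F" && p.salary < 200000L)
  .map(p => (p.first, p.salary, p.ssn))
  .take(10) // LIMIT 10: at most 10 rows, so count() on the result is at most 10

result.foreach(println)
```

Without an ORDER BY, Hive and Spark do not guarantee which rows LIMIT keeps; the list here simply preserves insertion order.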

To see the result rows (show() prints up to 20 rows by default; this query returns at most 10 because of LIMIT 10)

scala> resultdf.show()

To get the count of records

scala> resultdf.count()

To see the schema in tree format

scala> resultdf.printSchema()

2. Load JSON files and perform a join
Dataset: two JSON files, department.json (each record holds a person's department) and people.json (each record holds information about a person).
Spark expects each line of a JSON input file to contain one complete, self-contained JSON object.

Load the first JSON file

In Spark 1.3 / Cloudera 5.4:
scala> val deptdf = sqlContext.jsonFile("file:/home/cloudera/department.json")

In Spark 1.5 / Cloudera 5.5 and above (jsonFile is deprecated from Spark 1.4 in favour of read.json):

scala> val deptdf = sqlContext.read.json("file:/home/cloudera/department.json")

scala> deptdf.printSchema()

Verify the data:

scala> deptdf.select("ssn","dept").show()

Load the second JSON file

In Spark 1.3 / Cloudera 5.4:
scala> val ppldf = sqlContext.jsonFile("file:/home/cloudera/people.json")

In Spark 1.5 / Cloudera 5.5 and above:

scala> val ppldf = sqlContext.read.json("file:/home/cloudera/people.json")

scala> ppldf.printSchema()

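Join the department DataFrame with the people DataFrame on ssn. Because the join condition is an expression, the result keeps both ssn columns, so selecting "ssn" by name afterwards would be ambiguous: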

scala> val joinresult = deptdf.join(ppldf, deptdf("ssn") === ppldf("ssn"))

scala> joinresult.select("name", "city","dept").show()
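As a plain-Scala sketch of what this inner join produces, the snippet below pairs records whose ssn values match. The `Dept` and `PersonJson` case classes and their rows are hypothetical; the field names follow the columns used above.

```scala
// Hypothetical record types for department.json and people.json
case class Dept(ssn: String, dept: String)
case class PersonJson(ssn: String, name: String, city: String, gender: String)

val depts = Seq(Dept("1", "Doctor"), Dept("2", "Engineer"), Dept("4", "Doctor"))
val ppl = Seq(
  PersonJson("1", "Asha", "Chennai", "F"),
  PersonJson("2", "Ravi", "Pune", "M"),
  PersonJson("3", "Meena", "Delhi", "F")
)

// Inner join on ssn, like deptdf.join(ppldf, deptdf("ssn") === ppldf("ssn")),
// followed by a projection like .select("name", "city", "dept")
val joined = for {
  d <- depts
  p <- ppl
  if d.ssn == p.ssn
} yield (p.name, p.city, d.dept)

joined.foreach(println)
```

Rows without a partner on the other side (ssn 3 and 4 here) are dropped, as in any inner join. The nested loop only illustrates the result; Spark chooses its own physical join strategy, which explain(true) shows.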

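Look at the query plan; explain(true) prints the logical plans (parsed, analyzed and optimized) as well as the physical plan: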

scala> joinresult.explain(true)
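
Now cache the department DataFrame, then run the join again and compare the performance. cache() is lazy: the data is stored in memory only when the next action (such as show()) runs, so the speed-up is visible in the queries after that one.
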
scala> deptdf.cache()

Run the join again and compare the run time:

scala> val joinresult = deptdf.join(ppldf, deptdf("ssn") === ppldf("ssn"))

scala> joinresult.select("name", "city","dept").show()
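A rough analogy for this lazy caching (compute on first use, reuse afterwards) is a Scala `lazy val`. This is not how Spark implements cache(); `loadDepartments` is a hypothetical stand-in for reading department.json, and the counter records how many times it runs.

```scala
// Analogy only: counts how often the "expensive" load actually runs
var loads = 0

// Hypothetical stand-in for reading and parsing department.json
def loadDepartments(): Seq[(String, String)] = {
  loads += 1
  Seq(("1", "Doctor"), ("2", "Engineer"))
}

// Declaring the lazy val loads nothing yet, like calling deptdf.cache()
lazy val cachedDepts = loadDepartments()
val loadsBeforeUse = loads // still 0

val firstRun = cachedDepts.size  // first use triggers the load, like the first action
val secondRun = cachedDepts.size // later uses reuse the stored result
println(s"loads = $loads")
```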

A. Find the number of female doctors in each city

Register the joined DataFrame as a temporary table and run a SQL query on it:

scala> joinresult.registerTempTable("people_dept")
scala> val fdocdf = sqlContext.sql("SELECT city, count(*) AS cnt FROM people_dept WHERE gender = 'F' AND dept = 'Doctor' GROUP BY city ORDER BY cnt DESC LIMIT 10")
scala> fdocdf.show()
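The same WHERE, GROUP BY, ORDER BY and LIMIT steps can be sketched over an in-memory collection in plain Scala. The `PeopleDept` case class and its rows are hypothetical and model only the columns the query uses.

```scala
// Hypothetical rows of the people_dept temporary table
case class PeopleDept(city: String, gender: String, dept: String)

val peopleDept = Seq(
  PeopleDept("Chennai", "F", "Doctor"),
  PeopleDept("Chennai", "F", "Doctor"),
  PeopleDept("Pune", "F", "Doctor"),
  PeopleDept("Pune", "M", "Doctor"),
  PeopleDept("Delhi", "F", "Engineer")
)

val femaleDoctorsByCity = peopleDept
  .filter(r => r.gender == "F" && r.dept == "Doctor")  // WHERE
  .groupBy(_.city)                                     // GROUP BY city
  .map { case (city, rows) => (city, rows.size) }      // count(*) AS cnt
  .toSeq
  .sortBy { case (_, cnt) => -cnt }                    // ORDER BY cnt DESC
  .take(10)                                            // LIMIT 10

femaleDoctorsByCity.foreach(println)
```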

B. Save the output as a text file

First convert the DataFrame to an RDD of Row objects:
scala> val fdocrdd = fdocdf.rdd

On the command line, delete the output directory if it already exists, because saveAsTextFile fails when the target directory exists:

cloudera> rm -rf /home/cloudera/sparkout/jsonout

scala> fdocrdd.saveAsTextFile("file:/home/cloudera/sparkout/jsonout")

Check the saved files in the local file system (from /home/cloudera). Spark writes one part-NNNNN file per partition:

cloudera> ls sparkout/jsonout
cloudera> cat sparkout/jsonout/part-00000
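saveAsTextFile writes one line per RDD element using its toString, and in Spark 1.x a Row renders as its values in square brackets separated by commas, so each line looks like [city,cnt]. If plain comma-separated lines are preferred, one option is to map each Row before saving, e.g. fdocrdd.map(_.mkString(",")). The plain-Scala sketch below, on hypothetical (city, cnt) values, shows both formats.

```scala
// Hypothetical (city, cnt) rows standing in for the contents of fdocrdd
val rows: Seq[Seq[Any]] = Seq(Seq("Chennai", 2), Seq("Pune", 1))

// Bracketed form, mirroring how a Spark Row renders itself with toString
val asRowText = rows.map(_.mkString("[", ",", "]"))

// Plain CSV lines, as produced by mapping each Row with mkString(",") before saving
val asCsv = rows.map(_.mkString(","))

asRowText.foreach(println)
asCsv.foreach(println)
```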

