Spark SQL Hands-On

The document discusses how to work with Spark SQL and Hive, including creating a SQLContext, loading data into Hive tables from Spark, and performing joins on JSON files.

In Windows we must create a SQLContext explicitly, while in Cloudera's Spark shell one is available by default as sqlContext. The SQLContext allows us to run SQL queries on DataFrames.

We can load data into an existing Hive table from Spark using the LOAD DATA LOCAL INPATH command, specifying the path to the data file. This loads the data from the file into the Hive table.

Create sqlContext

---------------------
We need to create the sqlContext manually in Windows, but in Cloudera it is
available by default as sqlContext.

In Windows,
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Type and check the value

scala> sqlContext
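
Optionally, the SQLContext implicits can be imported so that an RDD of case
classes converts to a DataFrame with toDF (a standard Spark 1.3+ import; the
Person class and the values below are made up purely for illustration):

scala> import sqlContext.implicits._
scala> case class Person(name: String, age: Int)
scala> val df = sc.parallelize(Seq(Person("Ana", 30), Person("Raj", 25))).toDF()
scala> df.show()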

For this hands-on, connect to Cloudera 5.4 or above.

----------------------------------------
-----------------------------------------
1. Hive Query Execution - on text file
----------------------------------------
----------------------------------------
There is no need for a separate HiveContext in Cloudera Spark.
Queries are expressed in HiveQL

hive-site.xml should be placed in the Spark conf directory:

cloudera> sudo -i
cloudera> cp /etc/hive/conf/hive-site.xml /etc/spark/conf

Dataset: a text file with each line containing information about a person.
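
Each line is expected to hold colon-separated fields matching the table created
in step A below (first, middle, last, gender, birth date, salary, SSN). A
hypothetical sample line, with values made up purely for illustration:

john:m:doe:M:1980-01-01:150000.0:123-45-6789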

A. Create a Hive table from Spark


--------------------------------------
scala> sqlContext.sql("CREATE TABLE people_table (FIRST STRING, MIDDLE STRING, LAST
STRING,GENDER STRING, BDATE STRING,SALARY DOUBLE, SSN STRING) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ':' STORED AS TEXTFILE")

Check the table created in Hive:


The returned DataFrame has two columns: tableName and isTemporary

scala> sqlContext.sql("show tables").collect().foreach(println);

Check in hadoop file system:

cloudera> hadoop fs -ls /user/hive/warehouse

B. Load data into Hive table


------------------------------

scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/cloudera/people.txt' INTO


TABLE people_table")

The results of SQL queries are DataFrames; the underlying RDD is available via
.rdd and supports all the normal RDD operations (see the sketch after step C).

C. Query on Hive table - to fetch females whose salary is less than 2 lakhs
------------------------------------------------------------------------------
scala> val resultdf = sqlContext.sql("FROM people_table SELECT FIRST,SALARY,SSN
WHERE GENDER='F' and SALARY < 200000 LIMIT 10")

To see a sample of 20 records


scala> resultdf.show()

To get the count of records


scala> resultdf.count()

To see the schema in tree format


scala> resultdf.printSchema()
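
Because resultdf is a DataFrame, its underlying RDD can be used directly; a
minimal sketch applying an ordinary RDD transformation (the column positions
follow the SELECT in step C):

scala> resultdf.rdd.map(row => row.getString(0) + " earns " + row.getDouble(1)).take(5).foreach(println)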

-------------------------------------
2. Load a JSON file and perform Join
-------------------------------------
Dataset: a JSON file with each line containing information about a person's
department, and a JSON file with each line containing information about people.
File names: department.json, people.json

Loading JSON File


-----------------
(Cloudera 5.4/Spark 1.3)
scala> val deptdf = sqlContext.jsonFile("file:/home/cloudera/department.json")

(In Spark 1.5/Cloudera 5.5 and above)


scala> val deptdf = sqlContext.read.json("file:/home/cloudera/department.json")

scala> deptdf.printSchema()

Verify the data:

scala> deptdf.select("ssn","dept").show()

Loading 2nd JSON File


---------------------
Ver 1.3:
scala> val ppldf = sqlContext.jsonFile("file:/home/cloudera/people.json")

Ver 1.5 and above:


scala> val ppldf = sqlContext.read.json("file:/home/cloudera/people.json")

scala> ppldf.printSchema()

Join this JSON DF with the dept JSON DF:

scala> val joinresult = deptdf.join(ppldf, deptdf("ssn") === ppldf("ssn"))


scala> joinresult.select("name", "city","dept").show()

See the explain plan


scala> joinresult.explain(true)
Now cache the first JSON DF, then join again and compare the performance (after
caching, the physical plan should show an in-memory scan of deptdf):
scala> deptdf.cache()
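
Note that cache() is lazy: the data is only materialized when the next action
runs. To force it up front (an optional extra step, not in the original flow):

scala> deptdf.count()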

Join again and compare the performance:

scala> val joinresult = deptdf.join(ppldf, deptdf("ssn") === ppldf("ssn"))


scala> joinresult.select("name", "city","dept").show()

A. To find the number of female doctors in each city:


--------------------------------------------------
Now, register the DF as a temporary table and run a SQL query on it:

scala> joinresult.registerTempTable("people_dept")
scala> val fdocdf= sqlContext.sql("SELECT city, count(*) as cnt FROM people_dept
WHERE gender = 'F' and dept = 'Doctor' GROUP BY city ORDER BY cnt DESC LIMIT 10")
scala> fdocdf.show()

B. Save the output as a text file


------------------------------------
First, convert the DF to an RDD:
scala> val fdocrdd = fdocdf.rdd
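
saveAsTextFile writes each Row with its default toString, e.g. [city,count].
If plain comma-separated lines are preferred, one option is to map the rows
first and save that RDD instead (the getString/getLong positions follow the
SELECT in step A; csvrdd and the output path are hypothetical names):

scala> val csvrdd = fdocdf.rdd.map(row => row.getString(0) + "," + row.getLong(1))
scala> csvrdd.saveAsTextFile("file:/home/cloudera/sparkout/jsonout_csv")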

On the command line, delete the output path (if it already exists):


cloudera> rm -r /home/cloudera/sparkout/jsonout

scala> fdocrdd.saveAsTextFile("file:/home/cloudera/sparkout/jsonout")

Check the saved file in filesystem:

cloudera> ls sparkout/jsonout
cloudera> cat sparkout/jsonout/part-00000
