Pyspark PDF
1 author:
Wenqiang Feng
University of Tennessee
All content following this page was uploaded by Wenqiang Feng on 23 April 2019.
Wenqiang Feng
1 Preface
1.1 About
1.2 Motivation for this tutorial
1.3 Copyright notice and license info
1.4 Acknowledgement
1.5 Feedback and suggestions
6.5 Statistical Tests
7 Data Exploration
7.1 Univariate Analysis
7.2 Multivariate Analysis
8 Regression
8.1 Linear Regression
8.2 Generalized linear regression
8.3 Decision tree Regression
8.4 Random Forest Regression
8.5 Gradient-boosted tree regression
9 Regularization
9.1 Ordinary least squares regression
9.2 Ridge regression
9.3 Least Absolute Shrinkage and Selection Operator (LASSO)
9.4 Elastic net
10 Classification
10.1 Binomial logistic regression
10.2 Multinomial logistic regression
10.3 Decision tree Classification
10.4 Random forest Classification
10.5 Gradient-boosted tree Classification
10.6 XGBoost: Gradient-boosted tree Classification
10.7 Naive Bayes Classification
11 Clustering
11.1 K-Means Model
15 ALS: Stock Portfolio Recommendations
15.1 Recommender systems
15.2 Alternating Least Squares
15.3 Demo
Bibliography
Index
Learning Apache Spark with Python
Welcome to my Learning Apache Spark with Python notes! In these notes, you will learn a wide array of
concepts about PySpark in Data Mining, Text Mining, Machine Learning and Deep Learning. The PDF
version can be downloaded from HERE.
CHAPTER
ONE
PREFACE
1.1 About
This is a shared repository for Learning Apache Spark Notes. The PDF version can be downloaded from
HERE. The first version was posted on Github in ChenFeng ([Feng2017]). This shared repository mainly
contains the self-learning and self-teaching notes from Wenqiang during his IMA Data Science Fellowship.
The reader is referred to the repository https://github.com/runawayhorse001/LearningApacheSpark for more
details about the dataset and the .ipynb files.
In this repository, I use detailed demo code and examples to show how to use each main function.
If you find that your work is not cited in these notes, please feel free to let me know.
Although I am by no means a data mining or Big Data expert, I decided that it would be useful to
share what I learned about PySpark programming in the form of easy tutorials with detailed
examples. I hope these tutorials will be a valuable tool for your studies.
The tutorials assume that the reader has preliminary knowledge of programming and Linux. This
document is generated automatically using Sphinx.
• Wenqiang Feng
– Data Scientist and PhD in Mathematics
– University of Tennessee at Knoxville
– Email: [email protected]
• Biography
Wenqiang Feng is Data Scientist within DST’s Applied Analytics Group. Dr. Feng’s responsibilities
include providing DST clients with access to cutting-edge skills and technologies, including Big Data
analytic solutions, advanced analytic and data enhancement techniques and modeling.
Dr. Feng has deep analytic expertise in data mining, analytic systems, machine learning algorithms,
business intelligence, and applying Big Data tools to strategically solve industry problems in a cross-
functional business. Before joining DST, Dr. Feng was an IMA Data Science Fellow at The Institute
for Mathematics and its Applications (IMA) at the University of Minnesota. While there, he helped
startup companies make marketing decisions based on deep predictive analytics.
Dr. Feng graduated from the University of Tennessee, Knoxville, with a Ph.D. in Computational
Mathematics and a Master’s degree in Statistics. He also holds a Master’s degree in Computational
Mathematics from Missouri University of Science and Technology (MST) and a Master’s degree in
Applied Mathematics from the University of Science and Technology of China (USTC).
• Declaration
The work of Wenqiang Feng was supported by the IMA while he was working at the IMA. However, any
opinions, findings, conclusions or recommendations expressed in this material are those of the
author and do not necessarily reflect the views of the IMA, UTK or DST.
1.2 Motivation for this tutorial
I was motivated to learn PySpark by the IMA Data Science Fellowship project. After that, I was
impressed by and drawn to PySpark, and I found that:
1. It is no exaggeration to say that Spark is the most powerful Big Data tool.
2. However, learning Spark was still a difficult process. I had to search Google and work out
which answers were correct, and it was hard to find detailed examples from which I could easily
learn the full process in one file.
3. Good sources are expensive for a graduate student.
1.3 Copyright notice and license info
This Learning Apache Spark with Python PDF file is intended to be a free and living document, which
is why its source is available online at
https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf. This document is licensed under
both the MIT License and the Creative Commons Attribution-NonCommercial 2.0 Generic (CC BY-NC 2.0)
License.
When you plan to use, copy, modify, merge, publish, distribute or sublicense it, please see the
terms of those licenses for more details and give the corresponding credit to the author.
1.4 Acknowledgement
Here, I would like to thank Ming Chen, Jian Sun and Zhongbo Li at the University of Tennessee at
Knoxville for the valuable discussions, and the generous anonymous authors for providing detailed
solutions and source code on the internet. Without their help, this repository would not have been
possible. Wenqiang would also like to thank the Institute for Mathematics and Its Applications (IMA)
at the University of Minnesota, Twin Cities for support during his IMA Data Science Fellow visit.
A special thank you goes to Dr. Haiping Lu, Lecturer in Machine Learning at the Department of
Computer Science, University of Sheffield, for recommending and heavily using my tutorial in his
teaching and for his valuable suggestions.
1.5 Feedback and suggestions
Your comments and suggestions are highly appreciated. I am more than happy to receive corrections,
suggestions or feedback by email ([email protected]) for improvements.
CHAPTER
TWO
Chinese proverb
Sharpening the knife longer can make it easier to hack the firewood – old Chinese proverb
I think the following four main reasons from Apache Spark™ official website are good enough to convince
you to use Spark.
1. Speed
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
Apache Spark has an advanced DAG execution engine that supports acyclic data flow and in-memory
computing.
2. Ease of Use
Write applications quickly in Java, Scala, Python, R.
Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it
interactively from the Scala, Python and R shells.
3. Generality
Combine SQL, streaming, and complex analytics.
Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning,
GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.
4. Runs Everywhere
Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including
HDFS, Cassandra, HBase, and S3.
Whether you like it or not, Python has become one of the most popular programming languages.
CHAPTER
THREE
Chinese proverb
Good tools are prerequisite to the successful execution of a job. – old Chinese proverb
A good programming platform can save you a lot of trouble and time. Here I will only present how to
install my favorite programming platform, and only show the easiest way I know to set it up on a
Linux system. If you want to install it on another operating system, you can Google it. In this
section, you will learn how to set up PySpark on the corresponding programming platform and package.
If you don’t have any experience with Linux or Unix operating systems, I recommend using Spark on
Databricks Community Cloud, since you do not need to set up Spark yourself and the Community
Edition is totally free. Please follow the steps listed below.
1. Sign up for an account at: https://community.cloud.databricks.com/login.html
2. Sign in with your account; then you can create your cluster (machine), table (dataset) and
notebook (code).
Note: You need to save the path that appears at Uploaded to DBFS:
/FileStore/tables/05rmhuqv1489687378010/, since we will use this path to load the dataset.
After finishing the steps above, you are ready to run your Spark code on Databricks Community
Cloud. I will run all the following demos on Databricks Community Cloud. When you run the demo
code, you should get the following results:
+---+-----+-----+---------+-----+
|_c0| TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
| 1|230.1| 37.8| 69.2| 22.1|
| 2| 44.5| 39.3| 45.1| 10.4|
| 3| 17.2| 45.9| 69.3| 9.3|
| 4|151.5| 41.3| 58.5| 18.5|
| 5|180.8| 10.8| 58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows
root
|-- _c0: integer (nullable = true)
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
I strongly recommend installing Anaconda, since it contains most of the prerequisites and supports
multiple operating systems.
1. Install Python
Go to Ubuntu Software Center and follow the following steps:
a. Open Ubuntu Software Center
b. Search for python
c. And click Install
Or open your terminal and use the following command:
Java is used by many other software packages, so it is quite possible that you have already
installed it. You can check by using the following command in Command Prompt:
java -version
Otherwise, you can follow the steps in How do I install Java for my Mac? to install java on Mac and use the
following command in Command Prompt to install on Ubuntu:
Warning: Installing Java and the Java SE Runtime Environment is very important, since Spark is
written mainly in Scala and runs on the Java Virtual Machine (JVM).
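Because PySpark starts a JVM behind the scenes, it can be handy to confirm from Python that a java executable is reachable before launching Spark. The following is a minimal sketch using only the standard library; the helper names find_java and java_version_banner are hypothetical and are not part of PySpark.

```python
import shutil
import subprocess


def find_java():
    """Return the path of the `java` executable on PATH, or None if it is absent."""
    return shutil.which("java")


def java_version_banner(java_path):
    """Run `java -version` and return its banner (Java usually prints it to stderr)."""
    result = subprocess.run([java_path, "-version"],
                            capture_output=True, text=True)
    return (result.stderr or result.stdout).strip()


java_path = find_java()
if java_path is None:
    print("Java was not found on PATH; install it before running Spark.")
else:
    print(java_path)
    print(java_version_banner(java_path))
```

This only checks that some Java is installed; whether that version is supported depends on the Spark release you downloaded.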
You can check whether Java is available and find its version by using the following command in
Command Prompt:
java -version
If Java is installed successfully, you will get results similar to the following:
Actually, the pre-built version doesn’t need installation. You can use it as soon as you unpack it.
a. Download: You can get the pre-built Apache Spark™ from Download Apache Spark™.
b. Unpack: Unpack Apache Spark™ to the path where you want to install Spark.
c. Test: Test the prerequisites: change directory to spark-#.#.#-bin-hadoop#.#/bin and run
./pyspark
vim ~/.bash_profile
And add the following lines to your bash_profile (remember to change the path)
source ~/.bash_profile
vim ~/.bashrc
And add the following lines to your bashrc (remember to change the path)
source ~/.bashrc
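After sourcing the profile, a quick check from Python can confirm that the settings are visible to new processes. The sketch below assumes the conventional SPARK_HOME environment variable and a bin/pyspark launcher under it, as on Mac and Ubuntu; the exact lines added to the profile are not reproduced in this section, so treat the variable name as an assumption. The helper check_spark_home is hypothetical.

```python
import os


def check_spark_home(env=None):
    """Return (ok, message) describing whether SPARK_HOME points at a Spark install."""
    env = os.environ if env is None else env
    spark_home = env.get("SPARK_HOME")          # assumed variable name
    if not spark_home:
        return False, "SPARK_HOME is not set"
    launcher = os.path.join(spark_home, "bin", "pyspark")
    if not os.path.isfile(launcher):
        return False, "no bin/pyspark launcher under " + spark_home
    return True, "found " + launcher


ok, message = check_spark_home()
print(ok, message)
```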
Installing open source software on Windows is always a nightmare for me. Thanks to Deelesh Mandloi,
you can follow the detailed procedures in the blog Getting Started with PySpark on Windows to
install Apache Spark™ on your Windows operating system.
After finishing the above setup steps in Configure Spark on Mac and Ubuntu, you should be ready
to write and run your PySpark code in Jupyter notebook.
After finishing the above setup steps in Configure Spark on Mac and Ubuntu, you should be ready
to write and run your PySpark code in Apache Zeppelin.
After finishing the above setup steps in Configure Spark on Mac and Ubuntu, you should be ready
to use Sublime Text to write your PySpark code and run it as normal Python code in the terminal.
python test_pyspark.py
If you want to run PySpark code on Eclipse, you need to add the paths for the External Libraries for your
Current Project as follows:
1. Open the properties of your project
And then you should be good to run your code on Eclipse with PyDev.
unzip sparkling-water-2.4.5.zip
cd ~/sparkling-water-2.4.5/bin
./pysparkling
If you have a correct setup for PySpark, then you will get the following results:
sparkling
Following the setup steps in Configure Spark on Mac and Ubuntu, you can set up your own cluster on
the cloud, for example on AWS or Google Cloud. Those clouds also have their own Big Data tools,
which you can run directly without any setup, just like Databricks Community Cloud. If you want
more details, please feel free to contact me.
The code for this section is available for download as test_pyspark, and the Jupyter notebook can
be downloaded from test_pyspark_ipynb.
• Python Source code
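## set up SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
## load the dataset (change the path to your own copy of Advertising.csv)
df = spark.read.csv(path='Advertising.csv', header=True, inferSchema=True)
df.show(5)
df.printSchema()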
CHAPTER
FOUR
Chinese proverb
Know yourself and know your enemy, and you will never be defeated – idiom, from Sunzi’s Art of War
Most of the following content comes from [Kirillov2016], so the copyright belongs to Anton Kirillov.
For more details, please refer to Apache Spark core concepts, architecture and internals.
Before diving deep into how Apache Spark works, let’s understand the jargon of Apache Spark:
• Job: A piece of code that reads some input from HDFS or local storage, performs some computation
on the data and writes some output data.
• Stages: Jobs are divided into stages. Stages are classified as map or reduce stages (this is
easier to understand if you have worked with Hadoop and want to correlate). Stages are divided
based on computational boundaries; not all computations (operators) can be executed in a single
stage, so a job runs over many stages.
• Tasks: Each stage has some tasks, one task per partition. One task is executed on one partition
of data on one executor (machine).
• DAG: DAG stands for Directed Acyclic Graph; in the present context it is a DAG of operators.
• Executor: The process responsible for executing a task.
• Master: The machine on which the Driver program runs.
• Slave: The machine on which the Executor program runs.
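To make the "one task per partition" idea concrete, here is a plain-Python toy model, not Spark itself. It splits a small list into partitions, runs one task on each partition, and lets the "driver" combine the partial results. The helper names split_into_partitions and run_stage are hypothetical.

```python
def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal contiguous partitions."""
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def run_stage(partitions, task):
    """Run one task per partition and gather the per-partition outputs."""
    return [task(partition) for partition in partitions]


def sum_of_squares(partition):
    """The task: square every value in one partition and add them up."""
    return sum(x * x for x in partition)


data = list(range(1, 11))
partitions = split_into_partitions(data, 3)      # 3 partitions -> 3 tasks
partial_sums = run_stage(partitions, sum_of_squares)
total = sum(partial_sums)                        # the driver combines the results

print(partitions)      # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
print(partial_sums)    # [30, 110, 245]
print(total)           # 385
```

In a real cluster the tasks would run in parallel on different executors; here they simply run one after another.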
1. Spark Driver
• SparkContext
– represents the connection to a Spark cluster, and can be used to create RDDs, accumulators
and broadcast variables on that cluster
• DAGScheduler
– computes a DAG of stages for each job and submits them to TaskScheduler, determines
preferred locations for tasks (based on cache status or shuffle file locations), and finds a
minimum schedule to run the jobs
• TaskScheduler
– responsible for sending tasks to the cluster, running them, retrying if there are failures,
and mitigating stragglers
• SchedulerBackend
– backend interface for scheduling systems that allows plugging in different implementations
(Mesos, YARN, Standalone, local)
• BlockManager
– provides interfaces for putting and retrieving blocks both locally and remotely into
various stores (memory, disk, and off-heap)
4.3 Architecture
Spark has a small code base, and the system is divided into various layers. Each layer has some
responsibilities. The layers are independent of each other.
The first layer is the interpreter; Spark uses a Scala interpreter with some modifications. As you
enter your code in the Spark console (creating RDDs and applying operators), Spark creates an
operator graph. When the user runs an action (like collect), the graph is submitted to a DAG
scheduler. The DAG scheduler divides the operator graph into (map and reduce) stages. A stage is
comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators
together to optimize the graph. For example, many map operators can be scheduled in a single stage.
This optimization is key to Spark’s performance. The final result of the DAG scheduler is a set of
stages. The stages are passed on to the task scheduler. The task scheduler launches tasks via the
cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn’t know about dependencies
among stages.
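The pipelining described above can be pictured with a small toy model in plain Python. It assumes a linear chain of operator names and a hand-written set of operators that need data from many partitions (called "wide" here, a label introduced for this sketch). A new stage starts at each wide operator, and everything else is pipelined into the current stage. The real DAG scheduler works on a graph of RDD dependencies and is considerably more involved.

```python
def split_into_stages(operators, wide_operators):
    """Group a linear chain of operators into stages, cutting before each wide operator."""
    stages, current = [], []
    for op in operators:
        if op in wide_operators and current:
            stages.append(current)      # close the stage at the boundary
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


# Assumption for this sketch: these operators force a stage boundary.
WIDE = {"reduceByKey", "groupByKey", "join"}

chain = ["textFile", "map", "filter", "reduceByKey", "map"]
stages = split_into_stages(chain, WIDE)
print(stages)   # [['textFile', 'map', 'filter'], ['reduceByKey', 'map']]
```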
CHAPTER
FIVE
Chinese proverb
If you only know yourself, but not your opponent, you may win or may lose. If you know neither
yourself nor your enemy, you will always endanger yourself – idiom, from Sunzi’s Art of War
RDD stands for Resilient Distributed Dataset. An RDD in Spark is simply an immutable distributed
collection of objects. Each RDD is split into multiple partitions (smaller sets with a similar
pattern), which may be computed on different nodes of the cluster.
Usually, there are two popular ways to create RDDs: loading an external dataset, or distributing a
collection of objects. The following examples show some of the simplest ways to create RDDs by
using the parallelize() function, which takes an already existing collection in your program and
passes it to the Spark Context.
1. By using the parallelize() function
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df.show()
+----+----+----+-----+
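from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
## distribute a local collection (values taken from the output below)
myData = spark.sparkContext.parallelize([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)])
myData.collect()
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]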
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Employee = spark.createDataFrame([
('1', 'Joe', '70000', '1'),
('2', 'Henry', '80000', '2'),
('3', 'Sam', '60000', '2'),
('4', 'Max', '90000', '1')],
['Id', 'Name', 'Sallary','DepartmentId']
)
Employee.show()
+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
| 1| Joe| 70000| 1|
| 2|Henry| 80000| 2|
| 3| Sam| 60000| 2|
| 4| Max| 90000| 1|
+---+-----+-------+------------+
## set up SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("/home/feng/Spark/Code/data/Advertising.csv", header=True)
df.show(5)
df.printSchema()
+---+-----+-----+---------+-----+
|_c0| TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
| 1|230.1| 37.8| 69.2| 22.1|
| 2| 44.5| 39.3| 45.1| 10.4|
| 3| 17.2| 45.9| 69.3| 9.3|
| 4|151.5| 41.3| 58.5| 18.5|
| 5|180.8| 10.8| 58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows
root
|-- _c0: integer (nullable = true)
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
Once created, RDDs offer two types of operations: transformations and actions.
b. Read dataset from DataBase
## set up SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
## User information
user = 'your_username'
pw = 'your_password'
## Database information
table_name = 'table_name'
url = 'jdbc:postgresql://##.###.###.##:5432/dataset?user='+user+'&password='+pw
properties = {'driver': 'org.postgresql.Driver', 'password': pw, 'user': user}
df = spark.read.jdbc(url=url, table=table_name, properties=properties)
df.show(5)
df.printSchema()
+---+-----+-----+---------+-----+
|_c0| TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
| 1|230.1| 37.8| 69.2| 22.1|
| 2| 44.5| 39.3| 45.1| 10.4|
| 3| 17.2| 45.9| 69.3| 9.3|
| 4|151.5| 41.3| 58.5| 18.5|
| 5|180.8| 10.8| 58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows
root
|-- _c0: integer (nullable = true)
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
Note: Reading tables from a database needs the proper driver for the corresponding database. For
example, the above demo needs org.postgresql.Driver, and you need to download it and put it in the
jars folder of your Spark installation path. I downloaded postgresql-42.1.1.jar from the official
website and put it in the jars folder.
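from pyspark import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext('local', 'example')
hc = HiveContext(sc)
## read a text file from HDFS and print its first line
tf1 = sc.textFile("hdfs://cdhstltest/user/data/demo.CSV")
print(tf1.first())
## query a Hive table
hc.sql("use intg_cme_w")
spf = hc.sql("SELECT * FROM spf LIMIT 100")
spf.show(5)  # show() prints the rows itself and returns None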
Warning: All the figures below are from Jeffrey Thompson. The interested reader is referred to pyspark
pictures
There are two main types of Spark operations: Transformations and Actions [Karau2015].
Note: Some people defined three types of operations: Transformations, Actions and Shuffles.
Transformations construct a new RDD from a previous one. For example, one common transformation is
filtering data that matches a predicate.
Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or
save it to an external storage system (e.g., HDFS).
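The practical difference between the two kinds of operations is that transformations are lazy while actions trigger the work. The sketch below is a toy stand-in written in plain Python, not the Spark API: the hypothetical LazyDataset class only records map and filter steps, and nothing runs until collect or count is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, actions execute them."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)

    # Transformations: return a new dataset and do no work yet.
    def map(self, func):
        return LazyDataset(self._data, self._steps + (("map", func),))

    def filter(self, predicate):
        return LazyDataset(self._data, self._steps + (("filter", predicate),))

    # Actions: run the recorded steps and return a result to the caller.
    def collect(self):
        items = self._data
        for kind, func in self._steps:
            if kind == "map":
                items = [func(x) for x in items]
            else:
                items = [x for x in items if func(x)]
        return items

    def count(self):
        return len(self.collect())


calls = []


def is_even(x):
    calls.append(x)              # record when the predicate actually runs
    return x % 2 == 0


numbers = LazyDataset(range(1, 7))
evens_times_ten = numbers.filter(is_even).map(lambda x: x * 10)
print(len(calls))                  # 0: only transformations so far
print(evens_times_ten.collect())   # [20, 40, 60]: the action runs the steps
```

Note that numbers itself is never modified; each transformation builds a new dataset from the previous one, which mirrors the immutability of RDDs.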
1. From List
:: Python Code:
:: Comparison:
+---+---+---+
| A| B| C|
A B C +---+---+---+
0 a 1 2 | a| 1| 2|
1 b 2 3 | b| 2| 3|
2 c 3 4 | c| 3| 4|
+---+---+---+
Attention: Pay attention to the parameter columns= in pd.DataFrame. Without the keyword, the
second positional argument is interpreted as the index, so the names become row labels instead of
column names.
:: Python Code:
# caution for the columns=
pd.DataFrame(my_list, columns= col_name)
#
pd.DataFrame(my_list, col_name)
:: Comparison:
A B C 0 1 2
0 a 1 2 A a 1 2
1 b 2 3 B b 2 3
2 c 3 4 C c 3 4
2. From Dict
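:: Python Code:
import numpy as np
# values taken from the comparison table below
d = {'A': [0, 1, 0], 'B': [1, 0, 1], 'C': [1, 0, 0]}
pd.DataFrame(d)
# Tedious for PySpark
spark.createDataFrame(np.array(list(d.values())).T.tolist(), list(d.keys())).show()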
:: Comparison:
+---+---+---+
| A| B| C|
A B C +---+---+---+
0 0 1 1 | 0| 1| 1|
1 1 0 0 | 1| 0| 0|
2 0 1 0 | 0| 1| 0|
+---+---+---+
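The NumPy transpose in the PySpark line exists only to turn column-oriented values into row lists. A plain-Python sketch of the same reshaping is shown here; the values of d are taken from the comparison table above, which is an assumption because the definition of d is not shown in this section.

```python
d = {'A': [0, 1, 0], 'B': [1, 0, 1], 'C': [1, 0, 0]}

column_names = list(d.keys())
# zip(*columns) transposes column-wise data into row tuples.
rows = [list(row) for row in zip(*d.values())]

print(column_names)   # ['A', 'B', 'C']
for row in rows:
    print(row)        # [0, 1, 1], then [1, 0, 0], then [0, 1, 0]
```

The resulting rows and column_names have the same shape as the two arguments passed to spark.createDataFrame in the code above.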
1. From DataBase
Most of the time, you need to share your code with your colleagues or release it for code review
or quality assurance (QA). You definitely do not want to have your user information in the code,
so you can save it in login.txt:
runawayhorse001
PythonTips
#User Information
try:
login = pd.read_csv(r'login.txt', header=None)
user = login[0][0]
pw = login[0][1]
print('User information is ready!')
except:
print('Login information is not available!!!')
#Database information
host = '##.###.###.##'
db_name = 'db_name'
table_name = 'table_name'
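The credential-loading step above can also be done without pandas. The following sketch uses only the standard library and assumes the same two-line layout for login.txt (user name on the first line, password on the second). The helper read_login is hypothetical.

```python
def read_login(path="login.txt"):
    """Return (user, password) from a two-line text file: user name, then password."""
    with open(path, encoding="utf-8") as handle:
        lines = [line.strip() for line in handle if line.strip()]
    if len(lines) < 2:
        raise ValueError("expected a user name line and a password line")
    return lines[0], lines[1]


try:
    user, pw = read_login()
    print('User information is ready!')
except (OSError, ValueError):
    # Catch only the failures we expect: missing file or malformed content.
    print('Login information is not available!!!')
```

Catching specific exceptions instead of using a bare except keeps unrelated errors, such as a typo in the code, from being silently hidden.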
:: Comparison:
# pandas: conn must be an open database connection (for example, one created with psycopg2)
sql = """
select *
from {table_name}
""".format(table_name=table_name)
dp = pd.read_sql(sql, conn)
# PySpark: connect to database
url = 'jdbc:postgresql://'+host+':5432/'+db_name+'?user='+user+'&password='+pw
properties = {'driver': 'org.postgresql.Driver', 'password': pw, 'user': user}
ds = spark.read.jdbc(url=url, table=table_name, properties=properties)
Attention: Reading tables from a database with PySpark needs the proper driver for the
corresponding database. For example, the above demo needs org.postgresql.Driver, and you need to
download it and put it in the jars folder of your Spark installation path. I downloaded
postgresql-42.1.1.jar from the official website and put it in the jars folder.
2. From .csv
:: Comparison:
3. From .json
Data from: http://api.luftdaten.info/static/v1/data.json
dp = pd.read_json("data/data.json")
ds = spark.read.json('data/data.json')
:: Python Code:
dp[['id','timestamp']].head(4)
#
ds[['id','timestamp']].show(4)
:: Comparison:
+----------+-------------------+
| id| timestamp|
id timestamp +----------+-------------------+
0 2994551481 2019-02-28 17:23:52 |2994551481|2019-02-28 17:23:52|
1 2994551482 2019-02-28 17:23:52 |2994551482|2019-02-28 17:23:52|
2 2994551483 2019-02-28 17:23:52 |2994551483|2019-02-28 17:23:52|
3 2994551484 2019-02-28 17:23:52 |2994551484|2019-02-28 17:23:52|
+----------+-------------------+
only showing top 4 rows
:: Python Code:
dp.head(4)
#
ds.show(4)
:: Comparison:
+-----+-----+---------+-----+
| TV|Radio|Newspaper|Sales|
TV Radio Newspaper Sales +-----+-----+---------+-----+
0 230.1 37.8 69.2 22.1 |230.1| 37.8| 69.2| 22.1|
1 44.5 39.3 45.1 10.4 | 44.5| 39.3| 45.1| 10.4|
2 17.2 45.9 69.3 9.3 | 17.2| 45.9| 69.3| 9.3|
3 151.5 41.3 58.5 18.5 |151.5| 41.3| 58.5| 18.5|
+-----+-----+---------+-----+
only showing top 4 rows
:: Python Code:
dp.columns
#
ds.columns
:: Comparison:
:: Python Code:
dp.dtypes
#
ds.dtypes
:: Comparison:
:: Comparison:
+------+---+----+
| A| B| C|
A B C +------+---+----+
0 male 1 NaN | male| 1|null|
1 female 2 3.0 |female| 2| 3|
2 male 3 4.0 | male| 3| 4|
+------+---+----+
:: Python Code:
dp.fillna(-99)
#
ds.fillna(-99).show()
:: Comparison:
+------+---+----+
| A| B| C|
:: Python Code:
:: Comparison:
+---+---+----+
| A| B| C|
A B C +---+---+----+
0 1 1 NaN | 1| 1|null|
1 0 2 3.0 | 0| 2| 3|
2 1 3 4.0 | 1| 3| 4|
+---+---+----+
dp.columns = ['a','b','c','d']
dp.head(4)
#
ds.toDF('a','b','c','d').show(4)
:: Comparison:
+-----+----+----+----+
| a| b| c| d|
a b c d +-----+----+----+----+
0 230.1 37.8 69.2 22.1 |230.1|37.8|69.2|22.1|
1 44.5 39.3 45.1 10.4 | 44.5|39.3|45.1|10.4|
2 17.2 45.9 69.3 9.3 | 17.2|45.9|69.3| 9.3|
3 151.5 41.3 58.5 18.5 |151.5|41.3|58.5|18.5|
+-----+----+----+----+
only showing top 4 rows
mapping = {'Newspaper':'C','Sales':'D'}
:: Python Code:
dp.rename(columns=mapping).head(4)
#
new_names = [mapping.get(col,col) for col in ds.columns]
ds.toDF(*new_names).show(4)
:: Comparison:
+-----+-----+----+----+
| TV|Radio| C| D|
TV Radio C D +-----+-----+----+----+
0 230.1 37.8 69.2 22.1 |230.1| 37.8|69.2|22.1|
1 44.5 39.3 45.1 10.4 | 44.5| 39.3|45.1|10.4|
2 17.2 45.9 69.3 9.3 | 17.2| 45.9|69.3| 9.3|
3 151.5 41.3 58.5 18.5 |151.5| 41.3|58.5|18.5|
+-----+-----+----+----+
only showing top 4 rows
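The PySpark renaming above relies on dict.get with a default value: mapped columns receive their new name and all other columns keep their original one. A minimal plain-Python sketch of that step follows, with a literal list standing in for ds.columns.

```python
mapping = {'Newspaper': 'C', 'Sales': 'D'}
columns = ['TV', 'Radio', 'Newspaper', 'Sales']   # stands in for ds.columns

# dict.get(key, default) returns the mapped name if there is one, else the original.
new_names = [mapping.get(col, col) for col in columns]
print(new_names)   # ['TV', 'Radio', 'C', 'D']
```

Because the list keeps the original column order, it can be unpacked directly into toDF(*new_names).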
Note: You can also use withColumnRenamed to rename one column in PySpark.
:: Python Code:
ds.withColumnRenamed('Newspaper','Paper').show(4)
:: Comparison:
+-----+-----+-----+-----+
| TV|Radio|Paper|Sales|
+-----+-----+-----+-----+
|230.1| 37.8| 69.2| 22.1|
| 44.5| 39.3| 45.1| 10.4|
| 17.2| 45.9| 69.3| 9.3|
|151.5| 41.3| 58.5| 18.5|
+-----+-----+-----+-----+
only showing top 4 rows
drop_name = ['Newspaper','Sales']
:: Python Code:
dp.drop(drop_name,axis=1).head(4)
#
ds.drop(*drop_name).show(4)
:: Comparison:
+-----+-----+
| TV|Radio|
TV Radio +-----+-----+
0 230.1 37.8 |230.1| 37.8|
1 44.5 39.3 | 44.5| 39.3|
2 17.2 45.9 | 17.2| 45.9|
3 151.5 41.3 |151.5| 41.3|
+-----+-----+
only showing top 4 rows
5.3.10 Filter
dp = pd.read_csv('Advertising.csv')
#
ds = spark.read.csv(path='Advertising.csv',
header=True,
inferSchema=True)
:: Python Code:
dp[dp.Newspaper<20].head(4)
#
ds[ds.Newspaper<20].show(4)
:: Comparison:
+-----+-----+---------+-----+
| TV|Radio|Newspaper|Sales|
TV Radio Newspaper Sales +-----+-----+---------+-----+
7 120.2 19.6 11.6 13.2 |120.2| 19.6| 11.6| 13.2|
8 8.6 2.1 1.0 4.8 | 8.6| 2.1| 1.0| 4.8|
11 214.7 24.0 4.0 17.4 |214.7| 24.0| 4.0| 17.4|
13 97.5 7.6 7.2 9.7 | 97.5| 7.6| 7.2| 9.7|
+-----+-----+---------+-----+
only showing top 4 rows
:: Python Code:
dp[(dp.Newspaper<20)&(dp.TV>100)].head(4)
#
ds[(ds.Newspaper<20)&(ds.TV>100)].show(4)
:: Comparison:
+-----+-----+---------+-----+
| TV|Radio|Newspaper|Sales|
TV Radio Newspaper Sales +-----+-----+---------+-----+
7 120.2 19.6 11.6 13.2 |120.2| 19.6| 11.6| 13.2|
11 214.7 24.0 4.0 17.4 |214.7| 24.0| 4.0| 17.4|
19 147.3 23.9 19.1 14.6 |147.3| 23.9| 19.1| 14.6|
25 262.9 3.5 19.5 12.0 |262.9| 3.5| 19.5| 12.0|
+-----+-----+---------+-----+
only showing top 4 rows
:: Python Code:
dp['tv_norm'] = dp.TV/sum(dp.TV)
dp.head(4)
#
import pyspark.sql.functions as F
ds.withColumn('tv_norm', ds.TV/ds.groupBy().agg(F.sum("TV")).collect()[0][0]).show(4)
:: Comparison:
+-----+-----+---------+-----+--------------------+
| TV|Radio|Newspaper|Sales| tv_norm|
TV Radio Newspaper Sales tv_norm +-----+-----+---------+-----+--------------------+
0 230.1 37.8 69.2 22.1 0.007824 |230.1| 37.8| 69.2| 22.1|0.007824268493802813|
1 44.5 39.3 45.1 10.4 0.001513 | 44.5| 39.3| 45.1| 10.4|0.001513167961643...|
2 17.2 45.9 69.3 9.3 0.000585 | 17.2| 45.9| 69.3| 9.3|5.848649200061207E-4|
:: Python Code:
:: Comparison:
+-----+-----+---------+-----+----+
| TV|Radio|Newspaper|Sales|cond|
TV Radio Newspaper Sales cond +-----+-----+---------+-----+----+
0 230.1 37.8 69.2 22.1 1 |230.1| 37.8| 69.2| 22.1| 1|
1 44.5 39.3 45.1 10.4 2 | 44.5| 39.3| 45.1| 10.4| 2|
2 17.2 45.9 69.3 9.3 3 | 17.2| 45.9| 69.3| 9.3| 3|
3 151.5 41.3 58.5 18.5 2 |151.5| 41.3| 58.5| 18.5| 2|
+-----+-----+---------+-----+----+
only showing top 4 rows
:: Python Code:
dp['log_tv'] = np.log(dp.TV)
dp.head(4)
#
ds.withColumn('log_tv',F.log(ds.TV)).show(4)
:: Comparison:
+-----+-----+---------+-----+------------------+
| TV|Radio|Newspaper|Sales| log_tv|
TV Radio Newspaper Sales log_tv +-----+-----+---------+-----+------------------+
0 230.1 37.8 69.2 22.1 5.438514 |230.1| 37.8| 69.2| 22.1| 5.43851399704132|
+-----+-----+---------+-----+------------------+
only showing top 4 rows
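Both np.log and the single-argument form of F.log compute the natural logarithm, which is why the two columns in the comparison agree. As a small worked check of the first row, the same value can be reproduced with the standard library:

```python
import math

tv = 230.1
log_tv = math.log(tv)          # natural logarithm (base e)

print(round(log_tv, 6))                      # 5.438514
print(math.isclose(math.exp(log_tv), tv))    # True: exp undoes log
```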
:: Python Code:
:: Comparison:
+-----+-----+---------+-----+-----+
| TV|Radio|Newspaper|Sales|tv+10|
TV Radio Newspaper Sales tv+10 +-----+-----+---------+-----+-----+
0 230.1 37.8 69.2 22.1 240.1 |230.1| 37.8| 69.2| 22.1|240.1|
+-----+-----+---------+-----+-----+
only showing top 4 rows
5.3.12 Join
lefts = spark.createDataFrame(leftp)
rights = spark.createDataFrame(rightp)
A B C D A F G H
0 A0 B0 C0 D0 4 A0 B4 C4 D4
1 A1 B1 C1 D1 5 A1 B5 C5 D5
2 A2 B2 C2 D2 6 A6 B6 C6 D6
3 A3 B3 C3 D3 7 A7 B7 C7 D7
1. Left Join
:: Python Code:
leftp.merge(rightp,on='A',how='left')
#
(lefts.join(rights,on='A',how='left')
.orderBy('A',ascending=True).show())
:: Comparison:
+---+---+---+---+----+----+----+
| A| B| C| D| F| G| H|
A B C D F G H +---+---+---+---+----+----+----+
0 A0 B0 C0 D0 B4 C4 D4 | A0| B0| C0| D0| B4| C4| D4|
1 A1 B1 C1 D1 B5 C5 D5 | A1| B1| C1| D1| B5| C5| D5|
2 A2 B2 C2 D2 NaN NaN NaN | A2| B2| C2| D2|null|null|null|
3 A3 B3 C3 D3 NaN NaN NaN | A3| B3| C3| D3|null|null|null|
+---+---+---+---+----+----+----+
2. Right Join
:: Python Code:
leftp.merge(rightp,on='A',how='right')
#
lefts.join(rights,on='A',how='right')\
     .orderBy('A',ascending=True).show()
:: Comparison:
    A   B   C   D   F   G   H
0  A0  B0  C0  D0  B4  C4  D4
1  A1  B1  C1  D1  B5  C5  D5

+---+----+----+----+---+---+---+
|  A|   B|   C|   D|  F|  G|  H|
+---+----+----+----+---+---+---+
| A0|  B0|  C0|  D0| B4| C4| D4|
| A1|  B1|  C1|  D1| B5| C5| D5|
3. Inner Join
:: Python Code:
leftp.merge(rightp,on='A',how='inner')
#
lefts.join(rights,on='A',how='inner')\
     .orderBy('A',ascending=True).show()
:: Comparison:
    A   B   C   D   F   G   H
0  A0  B0  C0  D0  B4  C4  D4
1  A1  B1  C1  D1  B5  C5  D5

+---+---+---+---+---+---+---+
|  A|  B|  C|  D|  F|  G|  H|
+---+---+---+---+---+---+---+
| A0| B0| C0| D0| B4| C4| D4|
| A1| B1| C1| D1| B5| C5| D5|
+---+---+---+---+---+---+---+
4. Full Join
:: Python Code:
# pandas names the full join 'outer'; how='full' is not accepted by merge
leftp.merge(rightp,on='A',how='outer')
#
lefts.join(rights,on='A',how='full')\
     .orderBy('A',ascending=True).show()
:: Comparison:
    A   B   C   D    F    G    H
0  A0  B0  C0  D0   B4   C4   D4
1  A1  B1  C1  D1   B5   C5   D5
2  A2  B2  C2  D2  NaN  NaN  NaN
3  A3  B3  C3  D3  NaN  NaN  NaN

+---+----+----+----+----+----+----+
|  A|   B|   C|   D|   F|   G|   H|
+---+----+----+----+----+----+----+
| A0|  B0|  C0|  D0|  B4|  C4|  D4|
| A1|  B1|  C1|  D1|  B5|  C5|  D5|
| A2|  B2|  C2|  D2|null|null|null|
| A3|  B3|  C3|  D3|null|null|null|
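As a quick check of the four join types on the pandas side, the following sketch rebuilds two small frames shaped like leftp and rightp and lists the keys that each join keeps. The frame construction is my assumption (the original definitions are not shown in this part), and note that pandas spells the full join as how='outer'.

```python
import pandas as pd

# Hypothetical reconstruction of the two frames used in the join examples.
leftp = pd.DataFrame({'A': ['A0', 'A1', 'A2', 'A3'],
                      'B': ['B0', 'B1', 'B2', 'B3'],
                      'C': ['C0', 'C1', 'C2', 'C3'],
                      'D': ['D0', 'D1', 'D2', 'D3']})
rightp = pd.DataFrame({'A': ['A0', 'A1', 'A6', 'A7'],
                       'F': ['B4', 'B5', 'B6', 'B7'],
                       'G': ['C4', 'C5', 'C6', 'C7'],
                       'H': ['D4', 'D5', 'D6', 'D7']},
                      index=[4, 5, 6, 7])

# Keys kept by each join type; pandas calls the full join 'outer'.
keys = {how: sorted(leftp.merge(rightp, on='A', how=how)['A'])
        for how in ['left', 'right', 'inner', 'outer']}

for how, kept in keys.items():
    print(how, kept)
```

The left join keeps every key of leftp, the right join every key of rightp, the inner join only the shared keys, and the outer join the union of both.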
:: Python Code:
:: Comparison:
  col1  col2  col3 concat
0    a     2     3     a2
1    b     5     6     b5
2    c     8     9     c8
3    a     2     3     a2
4    b     5     6     b5
5    c     8     9     c8

+----+----+----+------+
|col1|col2|col3|concat|
+----+----+----+------+
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
+----+----+----+------+
5.3.14 GroupBy
:: Python Code:
dp.groupby(['col1']).agg({'col2':'min','col3':'mean'})
#
ds.groupBy(['col1']).agg({'col2': 'min', 'col3': 'avg'}).show()
:: Comparison:
      col2  col3
col1
a        2     3
b        5     6
c        8     9

+----+---------+---------+
|col1|min(col2)|avg(col3)|
+----+---------+---------+
|   c|        8|      9.0|
|   b|        5|      6.0|
|   a|        2|      3.0|
+----+---------+---------+
5.3.15 Pivot
:: Python Code:
:: Comparison:
col2    2     5     8
col1
a     6.0   NaN   NaN
b     NaN  12.0   NaN
c     NaN   NaN  18.0

+----+----+----+----+
|col1|   2|   5|   8|
+----+----+----+----+
|   c|null|null|  18|
|   b|null|  12|null|
|   a|   6|null|null|
+----+----+----+----+
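The code for this comparison is not shown above, so here is a minimal pandas sketch that is consistent with the pivot table: it sums col3 for every (col1, col2) pair. The six-row frame is rebuilt from the earlier tables and the choice of aggfunc='sum' is my assumption. On the Spark side, a groupBy('col1').pivot('col2').sum('col3') call is the usual counterpart, but treat that as an assumption as well.

```python
import pandas as pd

# Hypothetical reconstruction of the six-row frame shown in the earlier tables.
dp = pd.DataFrame({'col1': ['a', 'b', 'c', 'a', 'b', 'c'],
                   'col2': [2, 5, 8, 2, 5, 8],
                   'col3': [3, 6, 9, 3, 6, 9]})

# Sum col3 for every (col1, col2) pair; pairs that never occur become NaN.
pivot = pd.pivot_table(dp, values='col3', index='col1',
                       columns='col2', aggfunc='sum')
print(pivot)
```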
5.3.16 Window
d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
dp = pd.DataFrame(d)
ds = spark.createDataFrame(dp)
:: Python Code:
dp['rank'] = dp.groupby('B')['C'].rank(method='dense',ascending=False)
#
from pyspark.sql.window import Window
w = Window.partitionBy('B').orderBy(ds.C.desc())
ds = ds.withColumn('rank',F.rank().over(w))
:: Comparison:
   A  B  C  rank
0  a  m  1   2.0
1  b  m  2   1.0
2  c  n  3   2.0
3  d  n  6   1.0

+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  b|  m|  2|   1|
|  a|  m|  1|   2|
|  d|  n|  6|   1|
|  c|  n|  3|   2|
+---+---+---+----+
d ={'Id':[1,2,3,4,5,6],
'Score': [4.00, 4.00, 3.85, 3.65, 3.65, 3.50]}
Id Score
0 1 4.00
1 2 4.00
2 3 3.85
3 4 3.65
4 5 3.65
5 6 3.50
:: Python Code:
:: Comparison:
   Id  Score  Rank_dense  Rank
0   1   4.00         1.0   1.0
1   2   4.00         1.0   1.0
2   3   3.85         2.0   3.0
3   4   3.65         3.0   4.0
4   5   3.65         3.0   4.0
5   6   3.50         4.0   6.0

+---+-----+----------------+----------+
| Id|Score|Rank_spark_dense|Rank_spark|
+---+-----+----------------+----------+
|  1|  4.0|               1|         1|
|  2|  4.0|               1|         1|
|  3| 3.85|               2|         3|
|  4| 3.65|               3|         4|
|  5| 3.65|               3|         4|
|  6|  3.5|               4|         6|
+---+-----+----------------+----------+
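The difference between the two rank columns is easy to miss, so here is a small plain-Python sketch of both rules on the same scores. It is only an illustration of the definitions (dense rank leaves no gaps after ties, the standard rank does), not the pandas or Spark code that produced the table.

```python
scores = [4.00, 4.00, 3.85, 3.65, 3.65, 3.50]

# Dense rank: position of the score among the distinct scores (descending).
distinct = sorted(set(scores), reverse=True)
dense_rank = [distinct.index(s) + 1 for s in scores]

# Standard rank: 1 + number of scores strictly greater, so ties leave gaps.
rank = [1 + sum(other > s for other in scores) for s in scores]

print(dense_rank)  # [1, 1, 2, 3, 3, 4]
print(rank)        # [1, 1, 3, 4, 4, 6]
```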
SIX
Chinese proverb
If you only know yourself, but not your opponent, you may win or may lose. If you know neither
yourself nor your enemy, you will always endanger yourself – idiom, from Sunzi’s Art of War
6.1 Notations
Since I have documented the Linear Algebra Preliminaries in my Prelim Exam note for Numerical Analysis,
the interested reader is referred to [Feng2014] for more details (Figure. Linear Algebra Preliminaries).
53
Learning Apache Spark with Python
In statistics, MAE (Mean absolute error) is a measure of difference between two continuous variables. The
Mean Absolute Error is given by:
$$\mathrm{MAE} = \frac{1}{m}\sum_{i=1}^{m} |\hat{y}_i - y_i|.$$
In statistics, the MSE (Mean Squared Error) of an estimator (of a procedure for estimating an unobserved
quantity) measures the average of the squares of the errors or deviations—that is, the difference between the
estimator and what is estimated.
$$\mathrm{MSE} = \frac{1}{m}\sum_{i=1}^{m} (\hat{y}_i - y_i)^2$$
In statistical data analysis the TSS (Total Sum of Squares) is a quantity that appears as part of a standard way
of presenting results of such analyses. It is defined as being the sum, over all observations, of the squared
differences of each observation from the overall mean.
$$\mathrm{TSS} = \sum_{i=1}^{m} (y_i - \bar{y})^2$$
In statistics, the ESS (Explained sum of squares), alternatively known as the model sum of squares or sum
of squares due to regression.
The ESS is the sum of the squares of the differences of the predicted values and the mean value of the
response variable which is given by:
$$\mathrm{ESS} = \sum_{i=1}^{m} (\hat{y}_i - \bar{y})^2$$
In statistics, RSS (Residual sum of squares), also known as the sum of squared residuals (SSR) or the sum
of squared errors of prediction (SSE), is the sum of the squares of residuals which is given by:
$$\mathrm{RSS} = \sum_{i=1}^{m} (\hat{y}_i - y_i)^2$$
$$R^2 := \frac{\mathrm{ESS}}{\mathrm{TSS}} = 1 - \frac{\mathrm{RSS}}{\mathrm{TSS}}.$$
Note: In general ($\mathbf{y}^T\bar{\mathbf{y}} = \hat{\mathbf{y}}^T\bar{\mathbf{y}}$), total sum of squares = explained sum of squares + residual sum of squares, i.e.:
$$\mathrm{TSS} = \mathrm{ESS} + \mathrm{RSS}.$$
More details can be found at Partitioning in the general ordinary least squares model.
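The formulas above can be checked numerically. The sketch below fits a one-feature least squares line with an intercept on made-up numbers (the data and variable names are mine) and then computes MAE, MSE, TSS, ESS, RSS and $R^2$ exactly as defined above. With an intercept in the model, the partition TSS = ESS + RSS should hold up to rounding.

```python
# Made-up data for illustration.
x = [1.0, 2.0, 3.0, 4.0]
y = [2.0, 4.0, 5.0, 8.0]
m = len(y)

# Ordinary least squares fit of y = b0 + b1 * x (with intercept).
x_bar = sum(x) / m
y_bar = sum(y) / m
b1 = (sum((xi - x_bar) * (yi - y_bar) for xi, yi in zip(x, y))
      / sum((xi - x_bar) ** 2 for xi in x))
b0 = y_bar - b1 * x_bar
y_hat = [b0 + b1 * xi for xi in x]

mae = sum(abs(p - t) for p, t in zip(y_hat, y)) / m
mse = sum((p - t) ** 2 for p, t in zip(y_hat, y)) / m
tss = sum((t - y_bar) ** 2 for t in y)
ess = sum((p - y_bar) ** 2 for p in y_hat)
rss = sum((p - t) ** 2 for p, t in zip(y_hat, y))
r2 = 1 - rss / tss

print(mae, mse, tss, ess, rss, r2)
```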
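6.4.1 Recall
$$\mathrm{Recall} = \frac{\mathrm{TP}}{\mathrm{TP} + \mathrm{FN}}$$
6.4.2 Precision
$$\mathrm{Precision} = \frac{\mathrm{TP}}{\mathrm{TP} + \mathrm{FP}}$$
6.4.3 Accuracy
$$\mathrm{Accuracy} = \frac{\mathrm{TP} + \mathrm{TN}}{\mathrm{Total}}$$
6.4.4 $F_1$-score
$$F_1 = \frac{2 \cdot \mathrm{Recall} \cdot \mathrm{Precision}}{\mathrm{Recall} + \mathrm{Precision}}$$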
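As a small worked example of the four formulas, the sketch below plugs in made-up confusion-matrix counts (the numbers are purely illustrative).

```python
# Made-up confusion-matrix counts for illustration.
TP, FP, FN, TN = 40, 10, 20, 30

recall = TP / (TP + FN)
precision = TP / (TP + FP)
accuracy = (TP + TN) / (TP + FP + FN + TN)
f1 = 2 * recall * precision / (recall + precision)

print(recall, precision, accuracy, f1)
```

Note that the $F_1$-score ignores TN entirely, which is why it is often preferred to accuracy when the negative class dominates.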
• Pearson correlation: Tests for the strength of the association between two continuous variables.
• Spearman correlation: Tests for the strength of the association between two ordinal variables (does
not rely on the assumption of normally distributed data).
• Chi-square: Tests for the strength of the association between two categorical variables.
• Wilcoxon rank-sum test: Tests for difference between two independent variables - takes into account
magnitude and direction of difference.
• Wilcoxon signed-rank test: Tests for difference between two related variables - takes into account
magnitude and direction of difference.
• Sign test: Tests if two related variables are different – ignores magnitude of change, only takes into
account direction.
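To make the difference between the first two tests concrete, here is a minimal plain-Python sketch (helper names are mine, and ties are not handled). Spearman correlation is simply the Pearson correlation of the ranks, so a monotone but non-linear relationship gives a Spearman value of 1 while the Pearson value stays below 1.

```python
from math import sqrt

def pearson(x, y):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sx = sqrt(sum((a - mx) ** 2 for a in x))
    sy = sqrt(sum((b - my) ** 2 for b in y))
    return cov / (sx * sy)

def ranks(values):
    """Ranks 1..n of the values (this sketch assumes there are no ties)."""
    order = sorted(values)
    return [order.index(v) + 1 for v in values]

def spearman(x, y):
    """Spearman correlation: Pearson correlation of the ranks."""
    return pearson(ranks(x), ranks(y))

x = [1, 2, 3, 4, 5]
y = [1, 4, 9, 16, 25]   # monotone in x, but not linear

r_pearson = pearson(x, y)
r_spearman = spearman(x, y)
print(round(r_pearson, 3), round(r_spearman, 3))
```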
SEVEN
DATA EXPLORATION
Chinese proverb
A journey of a thousand miles begins with a single step – idiom, from Laozi.
I wouldn’t say that understanding your dataset is the most difficult thing in data science, but it is really
important and time-consuming. Data Exploration is about describing the data by means of statistical and
visualization techniques. We explore data in order to understand the features and bring important features
to our models.
In mathematics, univariate refers to an expression, equation, function or polynomial of only one variable.
“Uni” means “one”, so in other words your data has only one variable, and you do not need to deal with
causes or relationships in this step. Univariate analysis takes the data, summarizes the variables (attributes)
one by one and finds patterns in the data.
Patterns found in univariate data can be described in many ways, including central tendency (mean,
mode and median) and dispersion: range, variance, maximum, minimum, quartiles (including the
interquartile range), coefficient of variation and standard deviation. You also have several options for
visualizing and describing univariate data, such as frequency distribution tables, bar charts,
histograms, frequency polygons and pie charts.
The variable can be either categorical or numerical. I will demonstrate the different statistical and
visualization techniques for investigating each type of variable.
• The Jupyter notebook can be downloaded from Data Exploration.
• The data can be downloaded from German Credit.
• Describe
The describe function in pandas and Spark will give us most of the statistical results, such as min,
median, max, quartiles and standard deviation. With the help of a user-defined function,
you can get even more statistical results.
+-------+------------------+-------------------+
|summary| Account Balance| No of dependents|
+-------+------------------+-------------------+
| count| 1000| 1000|
| mean| 2.577| 1.155|
| stddev|1.2576377271108936|0.36208577175319395|
| min| 1| 1|
| max| 4| 2|
+-------+------------------+-------------------+
You may notice that the default describe function in PySpark does not include the quartiles. The following
function will help you get the same results as in pandas.
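import numpy as np
import pandas as pd

# Signature inferred from the calls describe_pd(df,num_cols) and
# describe_pd(df,num_cols,deciles=True) used in this section.
def describe_pd(df_in, columns, deciles=False):
    if deciles:
        percentiles = np.array(range(0, 110, 10))
    else:
        percentiles = [25, 50, 75]

    # one row per percentile, one column per variable
    percs = np.transpose([np.percentile(df_in.select(x).collect(),
                                        percentiles) for x in columns])
    percs = pd.DataFrame(percs, columns=columns)
    percs['summary'] = [str(p) + '%' for p in percentiles]

    # append the percentile rows to Spark's own describe() output
    spark_describe = df_in.describe().toPandas()
    new_df = pd.concat([spark_describe, percs],ignore_index=True)
    new_df = new_df.round(2)
    return new_df[['summary'] + columns]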
describe_pd(df,num_cols)
+-------+------------------+-----------------+
|summary| Account Balance| No of dependents|
+-------+------------------+-----------------+
| count| 1000.0| 1000.0|
| mean| 2.577| 1.155|
| stddev|1.2576377271108936|0.362085771753194|
| min| 1.0| 1.0|
| max| 4.0| 2.0|
| 25%| 1.0| 1.0|
| 50%| 2.0| 1.0|
| 75%| 4.0| 1.0|
+-------+------------------+-----------------+
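To see what the percentile rows mean, here is a small sketch of np.percentile on a made-up list (the values are illustrative and are not taken from the German Credit data). It relies on NumPy's default linear interpolation between neighbouring values.

```python
import numpy as np

# Illustrative values only (not from the German Credit data).
values = [1, 1, 2, 2, 3, 4, 4, 4]

# Quartiles and deciles with NumPy's default linear interpolation.
quartiles = np.percentile(values, [25, 50, 75])
deciles = np.percentile(values, np.arange(0, 110, 10))

print(quartiles)
print(deciles)
```

The 0% and 100% entries of the deciles are simply the minimum and the maximum of the data.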
Sometimes, because of data confidentiality, you cannot deliver the real data, and your clients may
ask for more statistical results, such as deciles. You can call the same function with deciles=True to get them.
describe_pd(df,num_cols,deciles=True)
+-------+------------------+-----------------+
|summary| Account Balance| No of dependents|
+-------+------------------+-----------------+
| count| 1000.0| 1000.0|
| mean| 2.577| 1.155|
| stddev|1.2576377271108936|0.362085771753194|
| min| 1.0| 1.0|
| max| 4.0| 2.0|
| 0%| 1.0| 1.0|
| 10%| 1.0| 1.0|
| 20%| 1.0| 1.0|
| 30%| 2.0| 1.0|
| 40%| 2.0| 1.0|
| 50%| 2.0| 1.0|
| 60%| 3.0| 1.0|
| 70%| 4.0| 1.0|
| 80%| 4.0| 1.0|
| 90%| 4.0| 2.0|
| 100%| 4.0| 2.0|
+-------+------------------+-----------------+
1. negative skew: The left tail is longer; the mass of the distribution is concentrated on the right of
the figure. The distribution is said to be left-skewed, left-tailed, or skewed to the left, despite the
fact that the curve itself appears to be skewed or leaning to the right; left instead refers to the left
tail being drawn out and, often, the mean being skewed to the left of a typical center of the data.
A left-skewed distribution usually appears as a right-leaning curve.
2. positive skew: The right tail is longer; the mass of the distribution is concentrated on the left of
the figure. The distribution is said to be right-skewed, right-tailed, or skewed to the right, despite
the fact that the curve itself appears to be skewed or leaning to the left; right instead refers to the
right tail being drawn out and, often, the mean being skewed to the right of a typical center of
the data. A right-skewed distribution usually appears as a left-leaning curve.
This subsection comes from Wikipedia Kurtosis.
In probability theory and statistics, kurtosis (kyrtos or kurtos, meaning “curved, arching”) is a measure
of the “tailedness” of the probability distribution of a real-valued random variable. In a similar way to
the concept of skewness, kurtosis is a descriptor of the shape of a probability distribution and, just as
for skewness, there are different ways of quantifying it for a theoretical distribution and corresponding
ways of estimating it from a sample from a population.
+---------------------+---------------------+
|skewness(Age (years))|kurtosis(Age (years))|
+---------------------+---------------------+
| 1.0231743160548064| 0.6114371688367672|
+---------------------+---------------------+
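The two numbers can be sanity-checked by hand with central moments. The sketch below is plain Python with helper names of my own; it assumes population (not sample-corrected) moments and excess kurtosis (kurtosis minus 3), which I believe is what Spark's skewness and kurtosis functions report, but please verify this against the API documentation.

```python
def central_moment(values, k):
    """k-th population central moment of the values."""
    n = len(values)
    mean = sum(values) / n
    return sum((v - mean) ** k for v in values) / n

def skewness(values):
    """Third standardized moment (population version)."""
    return central_moment(values, 3) / central_moment(values, 2) ** 1.5

def excess_kurtosis(values):
    """Fourth standardized moment minus 3 (population version)."""
    return central_moment(values, 4) / central_moment(values, 2) ** 2 - 3.0

symmetric = [1, 2, 3, 4, 5]        # no skew
right_tailed = [1, 2, 2, 3, 10]    # one large value pulls the right tail out

print(skewness(symmetric), excess_kurtosis(symmetric))
print(skewness(right_tailed), excess_kurtosis(right_tailed))
```

A positive skewness, as for Age (years) above, corresponds to the "positive skew" case: the right tail is longer.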
F. J. Anscombe once said to make both calculations and graphs: both sorts of output should be studied;
each will contribute to understanding. The 13 datasets in Figure Same Stats, Different Graphs (the
Datasaurus, plus 12 others) each have the same summary statistics (x/y mean, x/y standard deviation, and
Pearson’s correlation) to two decimal places, while being drastically different in appearance. The work
behind the figure describes the technique its authors developed to create this dataset, and others like it.
More details and interesting results can be found in Same Stats Different Graphs.
• Histogram
The fundamental difference between a histogram and a bar graph, which will help you tell the two apart
easily, is that there are gaps between the bars in a bar graph, while in a histogram the bars are adjacent
to each other. The interested reader is referred to Difference Between Histogram and Bar Graph.
plt.figure(figsize=(10,8))
# the histogram of the data
plt.hist(x, bins, alpha=0.8, histtype='bar', color='gold',
ec='black',weights=np.zeros_like(x) + 100. / x.size)
fig.savefig(var+".pdf", bbox_inches='tight')
########################################################################
hist, bin_edges = np.histogram(x,bins,
weights=np.zeros_like(x) + 100. / x.size)
# make the histogram
########################################################################
ax = fig.add_subplot(1, 2, 2)
# Plot the histogram heights against integers on the x axis
ax.bar(range(len(hist)),hist,width=1,alpha=0.8,ec ='black', color='gold')
# Set the xticklabels to a string that tells us what the bin edges were
labels =['{}'.format(int(bins[i+1])) for i,j in enumerate(hist)]
labels.insert(0,'0')
ax.set_xticklabels(labels)
plt.xlabel(var)
plt.ylabel('count')
plt.suptitle('Histogram of {}: Left with percentage output;Right with count output'
             .format(var), size=16)
plt.show()
fig.savefig(var+".pdf", bbox_inches='tight')
Sometimes, people will ask you to plot bars of unequal width (which is not a valid argument for a
histogram). You can still achieve it with the following trick.
bins =[0,200,400,600,700,800,900,1000,2000,3000,4000,5000,6000,10000,25000]
# Set the xticklabels to a string that tells us what the bin edges were
#labels =['{}k'.format(int(bins[i+1]/1000)) for i,j in enumerate(hist)]
labels =['{}'.format(bins[i+1]) for i,j in enumerate(hist)]
labels.insert(0,'0')
ax.set_xticklabels(labels)
#plt.text(-0.6, -1.4,'0')
plt.xlabel(var)
plt.ylabel('percentage')
plt.show()
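The core of the trick is to let np.histogram do the counting on the unequal bins and then draw the bars at integer positions, so that every bar gets the same width on screen. Here is a minimal sketch of the counting part with made-up amounts and bins (both are my assumptions); the plotting call is left as a comment so that the snippet runs without a display.

```python
import numpy as np

# Made-up amounts and unequal-width bins for illustration.
x = np.array([50, 150, 250, 650, 950, 1500, 7000, 20000], dtype=float)
bins = [0, 200, 400, 1000, 5000, 25000]

# Each observation weighs 100/n, so the bar heights are percentages.
weights = np.zeros_like(x) + 100.0 / x.size
hist, bin_edges = np.histogram(x, bins, weights=weights)

# Bars are drawn at 0, 1, 2, ... and labelled with the real bin edges.
positions = list(range(len(hist)))
labels = ['{}'.format(int(edge)) for edge in bin_edges]

# ax.bar(positions, hist, width=1, align='edge', ec='black', color='gold')
print(hist)
print(labels)
```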
x = df.select(var).toPandas()
ax = fig.add_subplot(1, 2, 2)
ax = sns.violinplot(data=x)
Compared with numerical variables, categorical variables are much easier to explore.
• Frequency table
from pyspark.sql import functions as F
from pyspark.sql.functions import rank,sum,col
from pyspark.sql import Window
window = Window.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
# withColumn('Percent %',F.format_string("%5.0f%%\n",col('Credit_num')*100/col('total'))).\
+---------+----------+------------------+----------+----------+-------+
|age_class|Credit_num| Credit_avg|Credit_min|Credit_max|Percent|
• Pie plot
# Data to plot
labels = plot_data.age_class
sizes = plot_data.Percent
colors = ['gold', 'yellowgreen', 'lightcoral','blue', 'lightskyblue','green','red']
explode = (0, 0.1, 0, 0,0,0) # explode 2nd slice
# Plot
plt.figure(figsize=(10,8))
plt.pie(sizes, explode=explode, labels=labels, colors=colors,
autopct='%1.1f%%', shadow=True, startangle=140)
plt.axis('equal')
plt.show()
• Bar plot
labels = plot_data.age_class
missing = plot_data.Percent
ind = [x for x, _ in enumerate(labels)]
plt.figure(figsize=(10,8))
plt.bar(ind, missing, width=0.8, label='missing', color='gold')
plt.xticks(ind, labels)
plt.ylabel("percentage")
plt.show()
plt.figure(figsize=(10,8))
plt.bar(ind, women, width=0.8, label='women', color='gold', bottom=man+missing)
plt.xticks(ind, labels)
plt.ylabel("percentage")
plt.legend(loc="upper left")
plt.title("demo")
plt.show()
In this section, I will only demonstrate bivariate analysis, since multivariate analysis is a generalization
of the bivariate case.
• Correlation matrix
from pyspark.mllib.stat import Statistics
import pandas as pd

corr_data = df.select(num_cols)
col_names = corr_data.columns
# Statistics.corr expects an RDD of vectors, one per row
features = corr_data.rdd.map(lambda row: row[0:])
corr_mat=Statistics.corr(features, method="pearson")
corr_df = pd.DataFrame(corr_mat)
corr_df.index, corr_df.columns = col_names, col_names
print(corr_df.to_string())
+--------------------+--------------------+
| Account Balance| No of dependents|
+--------------------+--------------------+
| 1.0|-0.01414542650320914|
|-0.01414542650320914| 1.0|
+--------------------+--------------------+
• Scatter Plot
df = sns.load_dataset("iris")
sns.pairplot(df, hue="species")
plt.show()
pValues: [0.687289278791,0.682270330336]
degreesOfFreedom: [2, 3]
statistics: [0.75,1.5]
• Cross table
df.stat.crosstab("age_class", "Occupation").show()
+--------------------+---+---+---+---+
|age_class_Occupation| 1| 2| 3| 4|
+--------------------+---+---+---+---+
| <25| 4| 34|108| 4|
| 55-64| 1| 15| 31| 9|
| 25-34| 7| 61|269| 60|
| 35-44| 4| 58|143| 49|
| 65+| 5| 3| 6| 9|
| 45-54| 1| 29| 73| 17|
+--------------------+---+---+---+---+
• Stacked plot
plt.figure(figsize=(10,8))
plt.bar(ind, women, width=0.8, label='women', color='gold', bottom=man+missing)
plt.xticks(ind, labels)
plt.ylabel("percentage")
plt.legend(loc="upper left")
plt.title("demo")
plt.show()
EIGHT
REGRESSION
Chinese proverb
A journey of a thousand miles begins with a single step. – old Chinese proverb
In statistical modeling, regression analysis focuses on investigating the relationship between a dependent
variable and one or more independent variables. Wikipedia Regression analysis
In data mining, regression is a model that represents the relationship between the value of the label (or
target, a numerical variable) and one or more features (or predictors, which can be numerical or categorical
variables).
8.1.1 Introduction
$$y_i = \beta_0 + \sum_{j=1}^{n} \beta_j x_{ij}, \quad \text{where } i = 1, \cdots, m.$$
$$\mathbf{y} = \mathbf{X}\boldsymbol{\beta}. \tag{8.1}$$
1. Direct Methods (For more information please refer to my Prelim Notes for Numerical Analysis)
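In mathematics, (8.1) is an overdetermined system. The method of ordinary least squares can be used to find
an approximate solution to overdetermined systems. For the overdetermined system (8.1), the least
squares formula is obtained from the problem
$$\min_{\boldsymbol{\beta}} \|\mathbf{X}\boldsymbol{\beta} - \mathbf{y}\|, \tag{8.2}$$
the solution of which can be written with the normal equations:
$$\boldsymbol{\beta} = (\mathbf{X}^T\mathbf{X})^{-1}\mathbf{X}^T\mathbf{y}, \tag{8.3}$$
where T indicates a matrix transpose, provided $(\mathbf{X}^T\mathbf{X})^{-1}$ exists (that is, provided X has full column rank).
Note: Actually, (8.3) can be derived in the following way: multiply both sides of (8.1) by $\mathbf{X}^T$ on the left, and
then multiply both sides of the result by $(\mathbf{X}^T\mathbf{X})^{-1}$. You may also apply the Extreme Value Theorem to (8.2)
and find the solution (8.3).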
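The direct method in (8.3) takes only a few lines of NumPy. The sketch below uses made-up data generated from y = 1 + 2x (so the expected coefficients are known), solves the normal equations, and compares the result with np.linalg.lstsq. Solving the linear system with np.linalg.solve instead of forming the inverse explicitly is my choice here, since it is usually the numerically safer route.

```python
import numpy as np

# Made-up data: y = 1 + 2 * x exactly, so beta should be [1, 2].
x = np.array([0.0, 1.0, 2.0, 3.0, 4.0])
y = 1.0 + 2.0 * x

# Design matrix with a leading column of ones for the intercept.
X = np.column_stack([np.ones_like(x), x])

# Normal equations: (X^T X) beta = X^T y.
beta = np.linalg.solve(X.T @ X, X.T @ y)

# Cross-check with NumPy's least squares solver.
beta_lstsq, *_ = np.linalg.lstsq(X, y, rcond=None)

print(beta, beta_lstsq)
```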
$$h_{\beta} = \beta_0 + \sum_{j=1}^{n} \beta_j x_j.$$
Note: The reason why we prefer to solve (8.4) rather than (8.2) is that (8.4) is convex and has some
nice properties: it is uniquely solvable and energy stable for a small enough learning rate. The reader
who is interested in the non-convex cost function (energy) case is referred to [Feng2016PSD] for
more details.
Gradient descent is a first-order iterative optimization algorithm for finding the minimum of a function.
It searches along the direction of steepest descent, which is defined by the negative of the
gradient (see Fig. Gradient Descent in 1D and Gradient Descent in 2D for 1D and 2D, respectively),
with learning rate (search step) 𝛼.
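A minimal NumPy sketch of the iteration is given below. It assumes the usual least squares cost $J(\beta) = \frac{1}{2m}\sum_{i=1}^{m}(h_\beta(x^{(i)}) - y^{(i)})^2$, whose gradient is $\frac{1}{m}\mathbf{X}^T(\mathbf{X}\beta - \mathbf{y})$; the data, the function name and the values of the learning rate and iteration count are made up for illustration.

```python
import numpy as np

def gradient_descent(X, y, alpha=0.1, n_iter=5000):
    """Minimize the mean squared error cost by plain gradient descent."""
    m, n = X.shape
    beta = np.zeros(n)
    for _ in range(n_iter):
        residual = X @ beta - y           # h_beta(x) - y for every sample
        gradient = X.T @ residual / m     # gradient of the cost at beta
        beta = beta - alpha * gradient    # step against the gradient
    return beta

# Made-up data: y = 1 + 2 * x exactly, so beta should approach [1, 2].
x = np.array([0.0, 1.0, 2.0, 3.0])
y = 1.0 + 2.0 * x
X = np.column_stack([np.ones_like(x), x])

beta = gradient_descent(X, y)
print(beta)
```

If the learning rate is chosen too large, the iterates diverge instead of converging, which is the "small enough learning rate" condition mentioned above.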
8.1.9 Demo
• The Jupyter notebook can be downloaded from Linear Regression, which was implemented without using
Pipeline.
• The Jupyter notebook can be downloaded from Linear Regression with Pipeline, which was implemented
using Pipeline.
• I will only present the code in pipeline style in the following.
• For more details about the parameters, please visit Linear Regression API.
1. Set up spark context and SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark regression example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("../data/Advertising.csv",header=True);
root
|-- TV: double (nullable = true)
You can also get the statistical results from the data frame (unfortunately, it only works for numerical columns).
df.describe().show()
+-------+-----------------+------------------+------------------+------------------+
|summary|               TV|             Radio|         Newspaper|             Sales|
+-------+-----------------+------------------+------------------+------------------+
|  count|              200|               200|               200|               200|
|   mean|         147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
| stddev|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
|    min|              0.7|               0.0|               0.3|               1.6|
|    max|            296.4|              49.6|             114.0|              27.0|
+-------+-----------------+------------------+------------------+------------------+
Note: You are strongly encouraged to try my get_dummy function for dealing with the categorical data
in a complex dataset.
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    '''
    Get dummy variables and concat with continuous variables for unsupervised learning.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols: the name list of the numerical data
    :return k: feature matrix
    '''
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')
transformed= transData(df)
transformed.show(5)
+-----------------+-----+
| features|label|
+-----------------+-----+
Note: You will find out that all of the supervised machine learning algorithms in Spark are based on the
features and label (unsupervised machine learning algorithms in Spark are based on the features). That is
to say, you can play with all of the machine learning algorithms in Spark when you get ready the features
and label in pipeline architecture.
featureIndexer = VectorIndexer(inputCol="features", \
outputCol="indexedFeatures",\
maxCategories=4).fit(transformed)
data = featureIndexer.transform(transformed)
data.show(5,True)
+-----------------+-----+-----------------+
| features|label| indexedFeatures|
+-----------------+-----+-----------------+
|[230.1,37.8,69.2]| 22.1|[230.1,37.8,69.2]|
| [44.5,39.3,45.1]| 10.4| [44.5,39.3,45.1]|
| [17.2,45.9,69.3]| 9.3| [17.2,45.9,69.3]|
|[151.5,41.3,58.5]| 18.5|[151.5,41.3,58.5]|
|[180.8,10.8,58.4]| 12.9|[180.8,10.8,58.4]|
+-----------------+-----+-----------------+
only showing top 5 rows
6. Split the data into training and test sets (40% held out for testing)
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])
You can check your train and test data as follows (in my opinion, it is always good to keep track of your
data during the prototype phase):
trainingData.show(5)
testData.show(5)
+---------------+-----+---------------+
| features|label|indexedFeatures|
+---------------+-----+---------------+
| [4.1,11.6,5.7]| 3.2| [4.1,11.6,5.7]|
| [5.4,29.9,9.4]| 5.3| [5.4,29.9,9.4]|
|[7.3,28.1,41.4]| 5.5|[7.3,28.1,41.4]|
|[7.8,38.9,50.6]| 6.6|[7.8,38.9,50.6]|
| [8.6,2.1,1.0]| 4.8| [8.6,2.1,1.0]|
+---------------+-----+---------------+
only showing top 5 rows
+----------------+-----+----------------+
| features|label| indexedFeatures|
+----------------+-----+----------------+
| [0.7,39.6,8.7]| 1.6| [0.7,39.6,8.7]|
| [8.4,27.2,2.1]| 5.7| [8.4,27.2,2.1]|
|[11.7,36.9,45.2]| 7.3|[11.7,36.9,45.2]|
|[13.2,15.9,49.6]| 5.6|[13.2,15.9,49.6]|
|[16.9,43.7,89.4]| 8.7|[16.9,43.7,89.4]|
+----------------+-----+----------------+
only showing top 5 rows
8. Pipeline Architecture
model = pipeline.fit(trainingData)
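def modelsummary(model):
    import numpy as np
    print ("Note: the last rows are the information for Intercept")
    print ("##","-------------------------------------------------")
    print ("##"," Estimate | Std.Error | t Values | P-value")
    coef = np.append(list(model.coefficients),model.intercept)
    Summary=model.summary
    for i in range(len(Summary.pValues)):
        print ("##",'{:10.6f}'.format(coef[i]),\
               '{:10.6f}'.format(Summary.coefficientStandardErrors[i]),\
               '{:8.3f}'.format(Summary.tValues[i]),\
               '{:10.6f}'.format(Summary.pValues[i]))
    print ("##",'---')
    print ("##","Mean squared error: % .6f" \
           % Summary.meanSquaredError, ", RMSE: % .6f" \
           % Summary.rootMeanSquaredError )
    print ("##","Multiple R-squared: %f" % Summary.r2,
           ", Total iterations: %i" % Summary.totalIterations)

modelsummary(model.stages[-1])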
+----------------+-----+------------------+
| features|label| prediction|
+----------------+-----+------------------+
| [0.7,39.6,8.7]| 1.6| 10.81405928637388|
| [8.4,27.2,2.1]| 5.7| 8.583086404079918|
|[11.7,36.9,45.2]| 7.3|10.814712818232422|
9. Evaluation
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
You can also check the $R^2$ value for the test data:
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()
import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))
r2_score: 0.854486655585
Warning: You should know that most software packages use a different formula to calculate the $R^2$ value
when no intercept is included in the model. You can get more information from the discussion at
StackExchange.
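To see why this matters, the sketch below fits a line through the origin on made-up data and computes $R^2$ in two ways: with the total sum of squares taken around the mean (the definition used earlier in this book), and with the total sum of squares taken around zero, which is a common convention for models without an intercept. The variable names and numbers are mine; which convention a given package follows is something you should check in its documentation.

```python
# Made-up data for illustration.
x = [1.0, 2.0, 3.0, 4.0]
y = [3.0, 4.0, 6.0, 7.0]

# Least squares fit through the origin: y ~ b * x (no intercept).
b = sum(xi * yi for xi, yi in zip(x, y)) / sum(xi ** 2 for xi in x)
rss = sum((yi - b * xi) ** 2 for xi, yi in zip(x, y))

y_bar = sum(y) / len(y)
tss_centered = sum((yi - y_bar) ** 2 for yi in y)   # deviations from the mean
tss_uncentered = sum(yi ** 2 for yi in y)           # deviations from zero

r2_centered = 1 - rss / tss_centered
r2_uncentered = 1 - rss / tss_uncentered

print(r2_centered, r2_uncentered)
```

The same fit gets a noticeably higher score under the uncentered convention, so $R^2$ values of models with and without an intercept are not directly comparable.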
8.2.1 Introduction
8.2.3 Demo
• For more details about the parameters, please visit Generalized Linear Regression API .
1. Set up spark context and SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark regression example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("../data/Advertising.csv",header=True);
root
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
You can also get the statistical results from the data frame (unfortunately, it only works for numerical columns).
df.describe().show()
Note: You are strongly encouraged to try my get_dummy function for dealing with the categorical data
in a complex dataset.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    '''
    Get dummy variables and concat with continuous variables for unsupervised learning.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols: the name list of the numerical data
    :return k: feature matrix
    '''
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')
transformed= transData(df)
transformed.show(5)
+-----------------+-----+
| features|label|
+-----------------+-----+
|[230.1,37.8,69.2]| 22.1|
| [44.5,39.3,45.1]| 10.4|
| [17.2,45.9,69.3]| 9.3|
|[151.5,41.3,58.5]| 18.5|
|[180.8,10.8,58.4]| 12.9|
+-----------------+-----+
only showing top 5 rows
Note: You will find out that all of the machine learning algorithms in Spark are based on the features and
label. That is to say, you can play with all of the machine learning algorithms in Spark when you get ready
the features and label.
data= transData(df)
data.show()
data = featureIndexer.transform(transformed)
When you check your data at this point, you will get
+-----------------+-----+-----------------+
| features|label| indexedFeatures|
+-----------------+-----+-----------------+
|[230.1,37.8,69.2]| 22.1|[230.1,37.8,69.2]|
| [44.5,39.3,45.1]| 10.4| [44.5,39.3,45.1]|
| [17.2,45.9,69.3]| 9.3| [17.2,45.9,69.3]|
|[151.5,41.3,58.5]| 18.5|[151.5,41.3,58.5]|
|[180.8,10.8,58.4]| 12.9|[180.8,10.8,58.4]|
+-----------------+-----+-----------------+
only showing top 5 rows
6. Split the data into training and test sets (40% held out for testing)
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])
You can check your train and test data as follows (in my opinion, it is always good to keep track of your
data during the prototype phase):
trainingData.show(5)
testData.show(5)
+----------------+-----+----------------+
| features|label| indexedFeatures|
+----------------+-----+----------------+
| [5.4,29.9,9.4]| 5.3| [5.4,29.9,9.4]|
| [7.8,38.9,50.6]| 6.6| [7.8,38.9,50.6]|
| [8.4,27.2,2.1]| 5.7| [8.4,27.2,2.1]|
| [8.7,48.9,75.0]| 7.2| [8.7,48.9,75.0]|
|[11.7,36.9,45.2]| 7.3|[11.7,36.9,45.2]|
+----------------+-----+----------------+
only showing top 5 rows
+---------------+-----+---------------+
| features|label|indexedFeatures|
+---------------+-----+---------------+
| [0.7,39.6,8.7]| 1.6| [0.7,39.6,8.7]|
| [4.1,11.6,5.7]| 3.2| [4.1,11.6,5.7]|
|[7.3,28.1,41.4]| 5.5|[7.3,28.1,41.4]|
| [8.6,2.1,1.0]| 4.8| [8.6,2.1,1.0]|
|[17.2,4.1,31.6]| 5.9|[17.2,4.1,31.6]|
8. Pipeline Architecture
model = pipeline.fit(trainingData)
def modelsummary(model):
    import numpy as np
    print ("Note: the last rows are the information for Intercept")
    print ("##","-------------------------------------------------")
    print ("##"," Estimate | Std.Error | t Values | P-value")
    coef = np.append(list(model.coefficients),model.intercept)
    Summary=model.summary
    for i in range(len(Summary.pValues)):
        print ("##",'{:10.6f}'.format(coef[i]),\
               '{:10.6f}'.format(Summary.coefficientStandardErrors[i]),\
               '{:8.3f}'.format(Summary.tValues[i]),\
               '{:10.6f}'.format(Summary.pValues[i]))
    print ("##",'---')
    # print ("##","Mean squared error: % .6f" \
    #        % Summary.meanSquaredError, ", RMSE: % .6f" \
    #        % Summary.rootMeanSquaredError )
    # print ("##","Multiple R-squared: %f" % Summary.r2, ", \
    #        Total iterations: %i"% Summary.totalIterations)

modelsummary(model.stages[-1])
# Make predictions.
predictions = model.transform(testData)
+---------------+-----+------------------+
| features|label| prediction|
+---------------+-----+------------------+
| [0.7,39.6,8.7]| 1.6|10.937383732327625|
| [4.1,11.6,5.7]| 3.2| 5.491166258750164|
|[7.3,28.1,41.4]| 5.5| 8.8571603947873|
| [8.6,2.1,1.0]| 4.8| 3.793966281660073|
|[17.2,4.1,31.6]| 5.9| 4.502507124763654|
+---------------+-----+------------------+
only showing top 5 rows
11. Evaluation
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()
import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))
r2_score: 0.87707391843
8.3.1 Introduction
8.3.3 Demo
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark regression example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv').\
        options(header='true', \
        inferschema='true').\
        load("../data/Advertising.csv",header=True)
df.show(5,True)
df.printSchema()
+-----+-----+---------+-----+
| TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8| 69.2| 22.1|
| 44.5| 39.3| 45.1| 10.4|
| 17.2| 45.9| 69.3| 9.3|
|151.5| 41.3| 58.5| 18.5|
|180.8| 10.8| 58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows
You can also get the statistical results from the data frame (unfortunately, it only works for numerical data).
df.describe().show()
Note: You are strongly encouraged to try my get_dummy function for dealing with the categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')
transformed= transData(df)
transformed.show(5)
+-----------------+-----+
| features|label|
+-----------------+-----+
|[230.1,37.8,69.2]| 22.1|
| [44.5,39.3,45.1]| 10.4|
| [17.2,45.9,69.3]| 9.3|
|[151.5,41.3,58.5]| 18.5|
|[180.8,10.8,58.4]| 12.9|
+-----------------+-----+
only showing top 5 rows
Note: You will find that all of the machine learning algorithms in Spark are based on the features and
label columns. That is to say, once the features and label are ready, you can try any of the machine
learning algorithms in Spark.
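As a plain-Python illustration of that reshaping, the sketch below splits each record into a feature list and a label, assuming (as in the Advertising data) that the last column is the target and every other column is a numeric predictor. The helper name `to_features_label` is hypothetical; in Spark the features would be stored as a dense vector rather than a Python list.

```python
def to_features_label(row):
    """Split one record into (features, label), taking the last field as the label."""
    *features, label = row
    return list(features), label

# First three rows of the Advertising table: TV, Radio, Newspaper, Sales.
rows = [
    (230.1, 37.8, 69.2, 22.1),
    (44.5, 39.3, 45.1, 10.4),
    (17.2, 45.9, 69.3, 9.3),
]
for features, label in map(to_features_label, rows):
    print(features, label)
```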
transformed = transData(df)
transformed.show(5)
from pyspark.ml.feature import VectorIndexer
featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=4).fit(transformed)
data = featureIndexer.transform(transformed)
When you check your data at this point, you will get
+-----------------+-----+-----------------+
| features|label| indexedFeatures|
+-----------------+-----+-----------------+
|[230.1,37.8,69.2]| 22.1|[230.1,37.8,69.2]|
| [44.5,39.3,45.1]| 10.4| [44.5,39.3,45.1]|
| [17.2,45.9,69.3]| 9.3| [17.2,45.9,69.3]|
|[151.5,41.3,58.5]| 18.5|[151.5,41.3,58.5]|
|[180.8,10.8,58.4]| 12.9|[180.8,10.8,58.4]|
+-----------------+-----+-----------------+
only showing top 5 rows
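The indexedFeatures column is identical to features here because VectorIndexer only treats a feature as categorical when it has at most maxCategories distinct values, and the three advertising columns are continuous. The sketch below mimics just that decision rule in plain Python; `categorical_columns` is a hypothetical helper, not part of Spark, and the fourth column is a made-up 0/1 flag added to show the contrast.

```python
def categorical_columns(vectors, max_categories):
    """Return indices of columns with at most max_categories distinct values."""
    n_cols = len(vectors[0])
    return [
        j for j in range(n_cols)
        if len({v[j] for v in vectors}) <= max_categories
    ]

# Five Advertising rows plus a made-up 0/1 flag as a fourth column (assumption).
vectors = [
    [230.1, 37.8, 69.2, 0.0],
    [44.5, 39.3, 45.1, 1.0],
    [17.2, 45.9, 69.3, 0.0],
    [151.5, 41.3, 58.5, 1.0],
    [180.8, 10.8, 58.4, 0.0],
]
print(categorical_columns(vectors, max_categories=4))  # -> [3]
```

Only the flag column falls under the threshold, so only it would be re-indexed; the continuous columns pass through unchanged.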
6. Split the data into training and test sets (40% held out for testing)
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])
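Note that randomSplit assigns each row to a split at random according to the normalized weights, so the 60/40 proportions are only approximate, and the rows you get differ from run to run unless you pass a seed (for example `randomSplit([0.6, 0.4], seed=42)`; the seed value is arbitrary). The following plain-Python sketch illustrates the idea only; `random_split` is a hypothetical helper and does not reproduce Spark's sampling.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative boundaries, e.g. [0.6, 1.0] for weights [0.6, 0.4].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for k, b in enumerate(bounds):
            # The last split also catches any floating-point shortfall in bounds.
            if u < b or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits

train, test = random_split(list(range(200)), [0.6, 0.4], seed=42)
print(len(train), len(test))
```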
You can check your train and test data as follows (in my opinion, it is always good to keep track of your
data during the prototype phase):
trainingData.show(5)
testData.show(5)
+---------------+-----+---------------+
| features|label|indexedFeatures|
+---------------+-----+---------------+
| [4.1,11.6,5.7]| 3.2| [4.1,11.6,5.7]|
|[7.3,28.1,41.4]| 5.5|[7.3,28.1,41.4]|
| [8.4,27.2,2.1]| 5.7| [8.4,27.2,2.1]|
| [8.6,2.1,1.0]| 4.8| [8.6,2.1,1.0]|
|[8.7,48.9,75.0]| 7.2|[8.7,48.9,75.0]|
+---------------+-----+---------------+
+----------------+-----+----------------+
| features|label| indexedFeatures|
+----------------+-----+----------------+
| [0.7,39.6,8.7]| 1.6| [0.7,39.6,8.7]|
| [5.4,29.9,9.4]| 5.3| [5.4,29.9,9.4]|
| [7.8,38.9,50.6]| 6.6| [7.8,38.9,50.6]|
|[17.2,45.9,69.3]| 9.3|[17.2,45.9,69.3]|
|[18.7,12.1,23.4]| 6.7|[18.7,12.1,23.4]|
+----------------+-----+----------------+
only showing top 5 rows
8. Pipeline Architecture
model = pipeline.fit(trainingData)
9. Make predictions
# Make predictions.
predictions = model.transform(testData)
+----------+-----+----------------+
|prediction|label| features|
+----------+-----+----------------+
| 7.2| 1.6| [0.7,39.6,8.7]|
| 7.3| 5.3| [5.4,29.9,9.4]|
| 7.2| 6.6| [7.8,38.9,50.6]|
| 8.64| 9.3|[17.2,45.9,69.3]|
| 6.45| 6.7|[18.7,12.1,23.4]|
+----------+-----+----------------+
only showing top 5 rows
10. Evaluation
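from pyspark.ml.evaluation import RegressionEvaluator
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()
import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))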
8.4.1 Introduction
8.4.3 Demo
spark = SparkSession \
2. Load dataset
df = spark.read.format('com.databricks.spark.csv').\
        options(header='true', \
        inferschema='true').\
        load("../data/Advertising.csv",header=True)
df.show(5,True)
df.printSchema()
+-----+-----+---------+-----+
| TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8| 69.2| 22.1|
| 44.5| 39.3| 45.1| 10.4|
| 17.2| 45.9| 69.3| 9.3|
|151.5| 41.3| 58.5| 18.5|
|180.8| 10.8| 58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows
root
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
df.describe().show()
Note: You are strongly encouraged to try my get_dummy function for dealing with the categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    '''
    Get dummy variables and concat with continuous variables for unsupervised learning.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols: the name list of the numerical data
    :return k: feature matrix
    '''
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')
+-----------------+-----+
| features|label|
+-----------------+-----+
|[230.1,37.8,69.2]| 22.1|
| [44.5,39.3,45.1]| 10.4|
| [17.2,45.9,69.3]| 9.3|
from pyspark.ml.feature import VectorIndexer
featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=4).fit(transformed)
data = featureIndexer.transform(transformed)
data.show(5,True)
+-----------------+-----+-----------------+
| features|label| indexedFeatures|
+-----------------+-----+-----------------+
|[230.1,37.8,69.2]| 22.1|[230.1,37.8,69.2]|
| [44.5,39.3,45.1]| 10.4| [44.5,39.3,45.1]|
| [17.2,45.9,69.3]| 9.3| [17.2,45.9,69.3]|
|[151.5,41.3,58.5]| 18.5|[151.5,41.3,58.5]|
|[180.8,10.8,58.4]| 12.9|[180.8,10.8,58.4]|
+-----------------+-----+-----------------+
only showing top 5 rows
6. Split the data into training and test sets (40% held out for testing)
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
trainingData.show(5)
testData.show(5)
+----------------+-----+----------------+
| features|label| indexedFeatures|
+----------------+-----+----------------+
| [0.7,39.6,8.7]| 1.6| [0.7,39.6,8.7]|
| [8.6,2.1,1.0]| 4.8| [8.6,2.1,1.0]|
| [8.7,48.9,75.0]| 7.2| [8.7,48.9,75.0]|
|[11.7,36.9,45.2]| 7.3|[11.7,36.9,45.2]|
|[13.2,15.9,49.6]| 5.6|[13.2,15.9,49.6]|
+----------------+-----+----------------+
only showing top 5 rows
+---------------+-----+---------------+
| features|label|indexedFeatures|
Note: If you decide to use the indexedFeatures features, you need to add the parameter
featuresCol="indexedFeatures".
8. Pipeline Architecture
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(trainingData)
9. Make predictions
predictions = model.transform(testData)
+---------------+-----+------------------+
| features|label| prediction|
+---------------+-----+------------------+
| [4.1,11.6,5.7]| 3.2| 8.155439814814816|
| [5.4,29.9,9.4]| 5.3|10.412769901394899|
|[7.3,28.1,41.4]| 5.5| 12.13735648148148|
|[7.8,38.9,50.6]| 6.6|11.321796703296704|
| [8.4,27.2,2.1]| 5.7|12.071421957671957|
+---------------+-----+------------------+
only showing top 5 rows
10. Evaluation
# Select (prediction, true label) and compute test error
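evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()
import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)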
print('r2_score: {:4.3f}'.format(r2_score))
r2_score: 0.831
model.stages[-1].featureImportances
model.stages[-1].trees
8.5.1 Introduction
8.5.3 Demo
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark GBTRegressor example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv').\
        options(header='true', \
        inferschema='true').\
        load("../data/Advertising.csv",header=True)
df.show(5,True)
df.printSchema()
+-----+-----+---------+-----+
| TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8| 69.2| 22.1|
| 44.5| 39.3| 45.1| 10.4|
| 17.2| 45.9| 69.3| 9.3|
|151.5| 41.3| 58.5| 18.5|
|180.8| 10.8| 58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows
root
|-- TV: double (nullable = true)
|-- Radio: double (nullable = true)
|-- Newspaper: double (nullable = true)
|-- Sales: double (nullable = true)
df.describe().show()
Note: You are strongly encouraged to try my get_dummy function for dealing with the categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    '''
    Get dummy variables and concat with continuous variables for unsupervised learning.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols: the name list of the numerical data
    :return k: feature matrix
    '''
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')
+-----------------+-----+
| features|label|
+-----------------+-----+
|[230.1,37.8,69.2]| 22.1|
| [44.5,39.3,45.1]| 10.4|
| [17.2,45.9,69.3]| 9.3|
|[151.5,41.3,58.5]| 18.5|
|[180.8,10.8,58.4]| 12.9|
+-----------------+-----+
only showing top 5 rows
from pyspark.ml.feature import VectorIndexer
featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=4).fit(transformed)
data = featureIndexer.transform(transformed)
data.show(5,True)
+-----------------+-----+-----------------+
| features|label| indexedFeatures|
+-----------------+-----+-----------------+
|[230.1,37.8,69.2]| 22.1|[230.1,37.8,69.2]|
| [44.5,39.3,45.1]| 10.4| [44.5,39.3,45.1]|
| [17.2,45.9,69.3]| 9.3| [17.2,45.9,69.3]|
|[151.5,41.3,58.5]| 18.5|[151.5,41.3,58.5]|
|[180.8,10.8,58.4]| 12.9|[180.8,10.8,58.4]|
+-----------------+-----+-----------------+
only showing top 5 rows
6. Split the data into training and test sets (40% held out for testing)
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
trainingData.show(5)
testData.show(5)
+----------------+-----+----------------+
| features|label| indexedFeatures|
+----------------+-----+----------------+
| [0.7,39.6,8.7]| 1.6| [0.7,39.6,8.7]|
| [8.6,2.1,1.0]| 4.8| [8.6,2.1,1.0]|
| [8.7,48.9,75.0]| 7.2| [8.7,48.9,75.0]|
|[11.7,36.9,45.2]| 7.3|[11.7,36.9,45.2]|
|[13.2,15.9,49.6]| 5.6|[13.2,15.9,49.6]|
+----------------+-----+----------------+
only showing top 5 rows
+---------------+-----+---------------+
| features|label|indexedFeatures|
+---------------+-----+---------------+
| [4.1,11.6,5.7]| 3.2| [4.1,11.6,5.7]|
| [5.4,29.9,9.4]| 5.3| [5.4,29.9,9.4]|
|[7.3,28.1,41.4]| 5.5|[7.3,28.1,41.4]|
|[7.8,38.9,50.6]| 6.6|[7.8,38.9,50.6]|
| [8.4,27.2,2.1]| 5.7| [8.4,27.2,2.1]|
+---------------+-----+---------------+
only showing top 5 rows
Note: If you decide to use the indexedFeatures features, you need to add the parameter
featuresCol="indexedFeatures".
8. Pipeline Architecture
9. Make predictions
predictions = model.transform(testData)
+----------------+-----+------------------+
| features|label| prediction|
+----------------+-----+------------------+
| [7.8,38.9,50.6]| 6.6| 6.836040343319862|
| [8.6,2.1,1.0]| 4.8| 5.652202764688849|
| [8.7,48.9,75.0]| 7.2| 6.908750296855572|
| [13.1,0.4,25.6]| 5.3| 5.784020210692574|
|[19.6,20.1,17.0]| 7.6|6.8678921062629295|
+----------------+-----+------------------+
only showing top 5 rows
10. Evaluation
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()
import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {:4.3f}'.format(r2_score))
r2_score: 0.932
model.stages[-1].trees
NINE
REGULARIZATION
In mathematics, statistics, and computer science, particularly in the fields of machine learning and inverse
problems, regularization is a process of introducing additional information in order to solve an ill-posed
problem or to prevent overfitting (Wikipedia Regularization).
Due to the sparsity within our data, our training sets will often be ill-posed (singular). Applying
regularization to the regression has many advantages, including:
1. Converting ill-posed problems to well-posed by adding additional information via the penalty
parameter 𝜆
2. Preventing overfitting
3. Variable selection and the removal of correlated variables (Glmnet Vignette). The Ridge method
shrinks the coefficients of correlated variables while the LASSO method picks one variable and
discards the others. The elastic net penalty is a mixture of these two; if variables are correlated in groups
then 𝛼 = 0.5 tends to select the groups as in or out. If 𝛼 is close to 1, the elastic net performs
much like the LASSO method and removes any degeneracies and wild behavior caused by extreme
correlations.
$$\min_{\beta \in \mathbb{R}^n} \frac{1}{n} \|X\beta - y\|^2$$
When 𝜆 = 0 (i.e. regParam = 0), then there is no penalty.
$$\min_{\beta \in \mathbb{R}^n} \frac{1}{n} \|X\beta - y\|^2 + \lambda \|\beta\|_2^2$$
When 𝜆 > 0 (i.e. regParam > 0) and 𝛼 = 0 (i.e. elasticNetParam = 0) , then the penalty is an L2
penalty.
$$\min_{\beta \in \mathbb{R}^n} \frac{1}{n} \|X\beta - y\|^2 + \lambda \|\beta\|_1$$
When 𝜆 > 0 (i.e. regParam > 0) and 𝛼 = 1 (i.e. elasticNetParam = 1), then the penalty is an L1
penalty.
weightCol=None, aggregationDepth=2)
$$\min_{\beta \in \mathbb{R}^n} \frac{1}{n} \|X\beta - y\|^2 + \lambda \left( \alpha \|\beta\|_1 + (1 - \alpha) \|\beta\|_2^2 \right), \quad \alpha \in (0, 1)$$
When 𝜆 > 0 (i.e. regParam > 0) and elasticNetParam ∈ (0, 1) (i.e. 𝛼 ∈ (0, 1)) , then the penalty is
an L1 + L2 penalty.
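To make the role of the two parameters concrete, here is a small pure-Python sketch that evaluates the penalized objective exactly as written in the formulas of this chapter, with 𝜆 passed as `lam` and 𝛼 as `alpha`. The function name, the toy data, and the choice to divide by the number of observations are assumptions made for illustration. Spark's own documentation states its objective with slightly different scaling constants, so the numbers below are not what LinearRegression reports.

```python
def elastic_net_objective(X, y, beta, lam, alpha):
    """(1/n)*||X beta - y||^2 + lam*(alpha*||beta||_1 + (1-alpha)*||beta||_2^2)."""
    n = len(y)
    residuals = [sum(x_j * b_j for x_j, b_j in zip(row, beta)) - y_i
                 for row, y_i in zip(X, y)]
    loss = sum(r * r for r in residuals) / n
    l1 = sum(abs(b) for b in beta)
    l2_squared = sum(b * b for b in beta)
    return loss + lam * (alpha * l1 + (1.0 - alpha) * l2_squared)

# Tiny made-up example (assumption): 3 observations, 2 features.
X = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
y = [1.0, 2.0, 3.0]
beta = [1.0, -2.0]

print(elastic_net_objective(X, y, beta, lam=0.0, alpha=0.0))  # no penalty (OLS)
print(elastic_net_objective(X, y, beta, lam=0.1, alpha=0.0))  # L2 penalty (ridge)
print(elastic_net_objective(X, y, beta, lam=0.1, alpha=1.0))  # L1 penalty (LASSO)
print(elastic_net_objective(X, y, beta, lam=0.1, alpha=0.5))  # L1 + L2 mix
```

With `lam=0` the value of `alpha` has no effect, which matches the first case above: no penalty at all.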
TEN
CLASSIFICATION
Chinese proverb
Birds of a feather flock together. – old Chinese proverb
10.1.1 Introduction
10.1.2 Demo
Note: In this demo, I introduce a new function get_dummy to deal with the categorical data. I highly
recommend using get_dummy in other cases as well; it will save you a lot of time.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Logistic Regression example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
+---+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|duration|campaign|pdays|previous|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|     261|       1|   -1|       0| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|     151|       1|   -1|       0| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|      76|       1|   -1|       0| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|      92|       1|   -1|       0| no|
| 33|     unknown| single|  unknown|     no|      1|     no|  no|unknown|     198|       1|   -1|       0| no|
+---+------------+-------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+---+
only showing top 5 rows
df.printSchema()
root
|-- age: integer (nullable = true)
|-- job: string (nullable = true)
|-- marital: string (nullable = true)
|-- education: string (nullable = true)
|-- default: string (nullable = true)
|-- balance: integer (nullable = true)
|-- housing: string (nullable = true)
|-- loan: string (nullable = true)
|-- contact: string (nullable = true)
|-- day: integer (nullable = true)
|-- month: string (nullable = true)
|-- duration: integer (nullable = true)
|-- campaign: integer (nullable = true)
|-- pdays: integer (nullable = true)
|-- previous: integer (nullable = true)
|-- poutcome: string (nullable = true)
|-- y: string (nullable = true)
Note: You are strongly encouraged to try my get_dummy function for dealing with the categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    '''
    Get dummy variables and concat with continuous variables for unsupervised learning.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols: the name list of the numerical data
    :return k: feature matrix
    '''
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')
def get_dummy(df,categoricalCols,continuousCols,labelCol):
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select('features','label')
3. Deal with categorical data and Convert the data to dense vector
catcols = ['job','marital','education','default',
'housing','loan','contact','poutcome']
data = get_dummy(df,catcols,num_cols,labelCol)
data.show(5)
+--------------------+-----+
| features|label|
+--------------------+-----+
|(29,[1,11,14,16,1...| no|
|(29,[2,12,13,16,1...| no|
|(29,[7,11,13,16,1...| no|
|(29,[0,11,16,17,1...| no|
|(29,[12,16,18,20,...| no|
+--------------------+-----+
only showing top 5 rows
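Each features value above is printed in sparse form, (size, [indices], [values]): only the non-zero positions are stored. The sketch below mimics, in plain Python, what indexing and then one-hot encoding a single categorical column produces. It assumes Spark's default behavior as I understand it: the most frequent category gets index 0, and the last category is dropped so that it encodes as all zeros. The helper names are hypothetical, the sample values are made up, and the alphabetical tie-breaking rule is an assumption.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot_sparse(index, n_categories, drop_last=True):
    """Return (size, indices, values) for a one-hot vector in sparse form."""
    size = n_categories - 1 if drop_last else n_categories
    if index < size:
        return (size, [index], [1.0])
    # The dropped (last) category is represented by the all-zero vector.
    return (size, [], [])

marital = ["married", "single", "married", "divorced", "married", "single"]
index = fit_string_index(marital)
for value in ["married", "single", "divorced"]:
    print(value, index[value], one_hot_sparse(index[value], len(index)))
```

With three categories and the last one dropped, each encoded vector has size 2, and the least frequent category ("divorced" in this sample) has no non-zero entry.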
+--------------------+-----+------------+
| features|label|indexedLabel|
+--------------------+-----+------------+
|(29,[1,11,14,16,1...| no| 0.0|
|(29,[2,12,13,16,1...| no| 0.0|
|(29,[7,11,13,16,1...| no| 0.0|
|(29,[0,11,16,17,1...| no| 0.0|
|(29,[12,16,18,20,...| no| 0.0|
+--------------------+-----+------------+
only showing top 5 rows
+--------------------+-----+--------------------+
| features|label| indexedFeatures|
+--------------------+-----+--------------------+
|(29,[1,11,14,16,1...| no|(29,[1,11,14,16,1...|
|(29,[2,12,13,16,1...| no|(29,[2,12,13,16,1...|
|(29,[7,11,13,16,1...| no|(29,[7,11,13,16,1...|
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
trainingData.show(5,False)
testData.show(5,False)
+-------------------------------------------------------------------------------------------------+-----+
|features                                                                                         |label|
+-------------------------------------------------------------------------------------------------+-----+
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-731.0,401.0,4.0,-1.0])|no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-723.0,112.0,2.0,-1.0])|no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-626.0,205.0,1.0,-1.0])|no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-498.0,357.0,1.0,-1.0])|no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-477.0,473.0,2.0,-1.0])|no   |
+-------------------------------------------------------------------------------------------------+-----+
only showing top 5 rows
+-------------------------------------------------------------------------------------------------+-----+
|features                                                                                         |label|
+-------------------------------------------------------------------------------------------------+-----+
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-648.0,280.0,2.0,-1.0])|no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-596.0,147.0,1.0,-1.0])|no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-529.0,416.0,4.0,-1.0])|no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-518.0,46.0,5.0,-1.0]) |no   |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-470.0,275.0,2.0,-1.0])|no   |
+-------------------------------------------------------------------------------------------------+-----+
7. Pipeline Architecture
8. Make predictions
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
+--------------------+-----+--------------+
| features|label|predictedLabel|
+--------------------+-----+--------------+
|(29,[0,11,13,16,1...| no| no|
|(29,[0,11,13,16,1...| no| no|
|(29,[0,11,13,16,1...| no| no|
|(29,[0,11,13,16,1...| no| no|
|(29,[0,11,13,16,1...| no| no|
+--------------------+-----+--------------+
only showing top 5 rows
9. Evaluation
lrModel = model.stages[2]
trainingSummary = lrModel.summary
In a Zeppelin notebook, you can use z.show() to get the data and plot the ROC curves:
You can also register a temporary table with data.registerTempTable('roc_data') (or
createOrReplaceTempView in Spark 2.0 and later) and then use SQL to plot the ROC curve:
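Outside a notebook, the ROC curve can also be traced directly from predicted scores. The sketch below is a plain-Python illustration, not Spark's implementation: it sweeps the decision threshold from high to low, records (false positive rate, true positive rate) pairs, and integrates the curve with the trapezoidal rule. The function names, scores, and labels are made up, and the code assumes at least one positive and one negative example.

```python
def roc_points(scores, labels):
    """Return ROC points (FPR, TPR) obtained by lowering the threshold step by step."""
    pairs = sorted(zip(scores, labels), key=lambda p: -p[0])
    pos = sum(labels)
    neg = len(labels) - pos
    tp = fp = 0
    points = [(0.0, 0.0)]
    i = 0
    while i < len(pairs):
        threshold = pairs[i][0]
        # Consume all examples sharing this score before emitting a point.
        while i < len(pairs) and pairs[i][0] == threshold:
            if pairs[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        points.append((fp / neg, tp / pos))
    return points

def auc(points):
    """Area under the curve by the trapezoidal rule."""
    return sum((x1 - x0) * (y0 + y1) / 2.0
               for (x0, y0), (x1, y1) in zip(points, points[1:]))

scores = [0.9, 0.8, 0.7, 0.6]   # predicted probability of the positive class
labels = [1, 0, 1, 0]           # 1 = positive, 0 = negative
pts = roc_points(scores, labels)
print(pts)
print("AUC:", auc(pts))
```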
10. Visualization
import matplotlib.pyplot as plt
import numpy as np
import itertools
print(cm)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
class_temp = predictions.select("label").groupBy("label")\
.count().sort('count', ascending=False).toPandas()
['no', 'yes']
y_pred = predictions.select("predictedLabel")
y_pred = y_pred.toPandas()
array([[15657, 379],
[ 1410, 667]])
plt.show()
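The confusion matrix printed above can be summarized with a few standard ratios. The sketch below assumes the usual layout in which rows are true labels and columns are predicted labels, in the class order ['no', 'yes'], with 'yes' treated as the positive class; `binary_metrics` is a hypothetical helper name.

```python
def binary_metrics(cm):
    """cm[i][j] = count of true class i predicted as class j; class 1 is positive."""
    (tn, fp), (fn, tp) = cm
    total = tn + fp + fn + tp
    return {
        "accuracy": (tp + tn) / total,
        "precision": tp / (tp + fp),
        "recall": tp / (tp + fn),
    }

cm = [[15657, 379], [1410, 667]]  # rows: true no/yes, columns: predicted no/yes
for name, value in binary_metrics(cm).items():
    print("%-9s %.3f" % (name, value))
```

Under that assumption, accuracy is about 0.90 while recall for 'yes' is only about 0.32: most 'yes' cases are missed even though the overall accuracy looks high, which is typical when one class dominates the data.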
10.2.1 Introduction
10.2.2 Demo
Note: In this demo, I introduce a new function get_dummy to deal with the categorical data. I highly
recommend using get_dummy in other cases as well; it will save you a lot of time.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark MultinomialLogisticRegression classification") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load("./data/WineData2.csv",header=True)
df.show(5)
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
|  7.8|    0.88|   0.0|  2.6|    0.098|25.0| 67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|  7.8|    0.76|  0.04|  2.3|    0.092|15.0| 54.0|  0.997|3.26|     0.65|    9.8|      5|
| 11.2|    0.28|  0.56|  1.9|    0.075|17.0| 60.0|  0.998|3.16|     0.58|    9.8|      6|
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
only showing top 5 rows
df.printSchema()
root
|-- fixed: double (nullable = true)
|-- volatile: double (nullable = true)
|-- citric: double (nullable = true)
|-- sugar: double (nullable = true)
|-- chlorides: double (nullable = true)
|-- free: double (nullable = true)
|-- total: double (nullable = true)
|-- density: double (nullable = true)
|-- pH: double (nullable = true)
|-- sulphates: double (nullable = true)
|-- alcohol: double (nullable = true)
|-- quality: string (nullable = true)
#
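def condition(r):
    if (0<= r <= 4):
        label = "low"
    elif(4< r <= 6):
        label = "medium"
    else:
        label = "high"
    return label
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Wrap condition as a UDF that returns the string label
quality_udf = udf(lambda x: condition(x), StringType())
df = df.withColumn("quality", quality_udf("quality"))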
df.show(5,True)
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4| medium|
df.printSchema()
root
|-- fixed: double (nullable = true)
|-- volatile: double (nullable = true)
|-- citric: double (nullable = true)
|-- sugar: double (nullable = true)
|-- chlorides: double (nullable = true)
|-- free: double (nullable = true)
|-- total: double (nullable = true)
|-- density: double (nullable = true)
|-- pH: double (nullable = true)
|-- sulphates: double (nullable = true)
|-- alcohol: double (nullable = true)
|-- quality: string (nullable = true)
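The binning rule that turns the numeric quality score into the low / medium / high label is easy to check outside Spark before it is applied to the data frame. The sketch below restates the same thresholds in a standalone function; the name `quality_to_label` is hypothetical, and the conversion to float is an added assumption so that a score read as the string "5" is handled the same way as the number 5.

```python
def quality_to_label(r):
    """Bin a wine quality score into low / medium / high."""
    r = float(r)  # accept "5" as well as 5
    if 0 <= r <= 4:
        return "low"
    elif 4 < r <= 6:
        return "medium"
    else:
        return "high"

for score in ["3", 5, 6, 7]:
    print(score, "->", quality_to_label(score))
```

Note that with these thresholds any value outside the range 0 to 6, including a negative one, falls through to "high", so it is worth validating the input range first if the data may contain bad scores.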
3. Deal with categorical data and Convert the data to dense vector
Note: You are strongly encouraged to try my get_dummy function for dealing with the categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    '''
    Get dummy variables and concat with continuous variables for unsupervised learning.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols: the name list of the numerical data
    :return k: feature matrix
    '''
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')
def get_dummy(df,categoricalCols,continuousCols,labelCol):
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select('features','label')
from pyspark.ml.linalg import Vectors
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])
transformed = transData(df)
transformed.show(5)
+--------------------+------+
| features| label|
+--------------------+------+
|[7.4,0.7,0.0,1.9,...|medium|
|[7.8,0.88,0.0,2.6...|medium|
|[7.8,0.76,0.04,2....|medium|
|[11.2,0.28,0.56,1...|medium|
|[7.4,0.7,0.0,1.9,...|medium|
+--------------------+------+------------+
| features| label|indexedLabel|
+--------------------+------+------------+
|[7.4,0.7,0.0,1.9,...|medium| 0.0|
|[7.8,0.88,0.0,2.6...|medium| 0.0|
|[7.8,0.76,0.04,2....|medium| 0.0|
|[11.2,0.28,0.56,1...|medium| 0.0|
|[7.4,0.7,0.0,1.9,...|medium| 0.0|
+--------------------+------+------------+
only showing top 5 rows
+--------------------+------+--------------------+
| features| label| indexedFeatures|
+--------------------+------+--------------------+
|[7.4,0.7,0.0,1.9,...|medium|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...|medium|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....|medium|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...|medium|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...|medium|[7.4,0.7,0.0,1.9,...|
+--------------------+------+--------------------+
only showing top 5 rows
trainingData.show(5,False)
testData.show(5,False)
+---------------------------------------------------------+------+
|features |label |
+---------------------------------------------------------+------+
|features |label |
+---------------------------------------------------------+------+
|[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,13.1] |low |
|[4.9,0.42,0.0,2.1,0.048,16.0,42.0,0.99154,3.71,0.74,14.0]|high |
|[5.0,0.42,0.24,2.0,0.06,19.0,50.0,0.9917,3.72,0.74,14.0] |high |
|[5.0,1.02,0.04,1.4,0.045,41.0,85.0,0.9938,3.75,0.48,10.5]|low |
|[5.0,1.04,0.24,1.6,0.05,32.0,96.0,0.9934,3.74,0.62,11.5] |medium|
+---------------------------------------------------------+------+
only showing top 5 rows
7. Pipeline Architecture
8. Make predictions
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
+--------------------+------+--------------+
| features| label|predictedLabel|
+--------------------+------+--------------+
9. Evaluation
lrModel = model.stages[2]
trainingSummary = lrModel.summary
# .select('threshold').head()['threshold']
# lr.setThreshold(bestThreshold)
In a Zeppelin notebook, you can use z.show() to get the data and plot the ROC curves:
You can also register a temporary table with data.registerTempTable('roc_data') (or
createOrReplaceTempView in Spark 2.0 and later) and then use SQL to plot the ROC curve:
10. Visualization
print(cm)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
class_temp = predictions.select("label").groupBy("label")\
.count().sort('count', ascending=False).toPandas()
class_temp = class_temp["label"].values.tolist()
class_names = list(map(str, class_temp))  # list() so the names can be displayed and reused in Python 3
# # # print(class_name)
class_names
plt.show()
10.3.1 Introduction
10.3.2 Demo
2. Load dataset
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load("../data/WineData2.csv",header=True)
df.show(5,True)
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
|  7.8|    0.88|   0.0|  2.6|    0.098|25.0| 67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|  7.8|    0.76|  0.04|  2.3|    0.092|15.0| 54.0|  0.997|3.26|     0.65|    9.8|      5|
| 11.2|    0.28|  0.56|  1.9|    0.075|17.0| 60.0|  0.998|3.16|     0.58|    9.8|      6|
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
only showing top 5 rows
#
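def condition(r):
    if (0<= r <= 4):
        label = "low"
    elif(4< r <= 6):
        label = "medium"
    else:
        label = "high"
    return label
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Wrap condition as a UDF that returns the string label
quality_udf = udf(lambda x: condition(x), StringType())
df = df.withColumn("quality", quality_udf("quality"))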
df.show(5,True)
df.printSchema()
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
root
|-- fixed: double (nullable = true)
|-- volatile: double (nullable = true)
|-- citric: double (nullable = true)
|-- sugar: double (nullable = true)
|-- chlorides: double (nullable = true)
|-- free: double (nullable = true)
|-- total: double (nullable = true)
|-- density: double (nullable = true)
|-- pH: double (nullable = true)
|-- sulphates: double (nullable = true)
|-- alcohol: double (nullable = true)
|-- quality: string (nullable = true)
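The bucketing rule used for the quality column can be checked without Spark. The following is a minimal plain-Python sketch; the sample scores are hypothetical and only illustrate where the boundaries fall.

```python
# Plain-Python check of the quality bucketing rule (no Spark needed).
def condition(r):
    """Map a numeric wine-quality score to a categorical label."""
    if 0 <= r <= 4:
        return "low"
    elif 4 < r <= 6:
        return "medium"
    else:
        return "high"

# Hypothetical sample scores, chosen to hit each branch and both boundaries.
sample_scores = [3, 4, 5, 6, 7, 8]
labels = [condition(s) for s in sample_scores]
print(list(zip(sample_scores, labels)))
```

Note that any score outside the range 0 to 6, including a negative one, falls into the final branch and is labelled "high", so it may be worth validating the input range first.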
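Note: You are strongly encouraged to try my get_dummy function for dealing with categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    # Assumed reconstruction: the indexer, encoder and pipeline definitions
    # were missing from this listing.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    # Same assumed indexers, encoders and pipeline as in the supervised version.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')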
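from pyspark.ml.linalg import Vectors

# Pack every column except the last into a dense feature vector; the last column is the label
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

transformed = transData(df)
transformed.show(5)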
+--------------------+------+
| features| label|
+--------------------+------+
|[7.4,0.7,0.0,1.9,...|medium|
|[7.8,0.88,0.0,2.6...|medium|
|[7.8,0.76,0.04,2....|medium|
|[11.2,0.28,0.56,1...|medium|
|[7.4,0.7,0.0,1.9,...|medium|
+--------------------+------+
only showing top 5 rows
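The row split performed by transData can be illustrated in plain Python. This is only a sketch: the tuples below are hypothetical stand-ins for Spark Row objects, and a plain list takes the place of Vectors.dense.

```python
# Plain-Python sketch of the row split that transData performs.
# The rows below are hypothetical stand-ins for Spark Row objects.
rows = [
    (7.4, 0.7, 0.0, 1.9, "medium"),
    (7.8, 0.88, 0.0, 2.6, "medium"),
    (5.1, 0.585, 0.0, 1.7, "high"),
]

def split_row(r):
    """Return (features, label): all fields but the last, then the last field."""
    return list(r[:-1]), r[-1]

pairs = [split_row(r) for r in rows]
for features, label in pairs:
    print(features, label)
```

The split relies on the label being the last column, so a data frame with a different column order would need to be reordered first.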
+--------------------+------+------------+
| features| label|indexedLabel|
+--------------------+------+------------+
|[7.4,0.7,0.0,1.9,...|medium| 0.0|
|[7.8,0.88,0.0,2.6...|medium| 0.0|
|[7.8,0.76,0.04,2....|medium| 0.0|
|[11.2,0.28,0.56,1...|medium| 0.0|
|[7.4,0.7,0.0,1.9,...|medium| 0.0|
+--------------------+------+------------+
only showing top 5 rows
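The indexedLabel value 0.0 for "medium" follows from how StringIndexer orders labels by default: the most frequent label gets index 0.0. A minimal plain-Python sketch of that ordering, with hypothetical label counts and an alphabetical tie-break that is an assumption of this sketch:

```python
from collections import Counter

def fit_label_index(labels):
    """Assign 0.0 to the most frequent label, 1.0 to the next, and so on.

    Ties are broken alphabetically here (an assumption of this sketch).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

# Hypothetical label column in which "medium" is the most frequent class.
labels = ["medium"] * 5 + ["high"] * 2 + ["low"]
mapping = fit_label_index(labels)
indexed = [mapping[lab] for lab in labels]
print(mapping)
```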
+--------------------+------+--------------------+
| features| label| indexedFeatures|
+--------------------+------+--------------------+
|[7.4,0.7,0.0,1.9,...|medium|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...|medium|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....|medium|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...|medium|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...|medium|[7.4,0.7,0.0,1.9,...|
+--------------------+------+--------------------+
only showing top 5 rows
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])
trainingData.show(5)
testData.show(5)
+--------------------+------+
| features| label|
+--------------------+------+
|[4.6,0.52,0.15,2....| low|
|[4.7,0.6,0.17,2.3...|medium|
|[5.0,1.02,0.04,1....| low|
|[5.0,1.04,0.24,1....|medium|
|[5.1,0.585,0.0,1....| high|
+--------------------+------+
only showing top 5 rows
+--------------------+------+
| features| label|
8. Pipeline Architecture
9. Make predictions
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
+--------------------+------+--------------+
| features| label|predictedLabel|
+--------------------+------+--------------+
|[4.9,0.42,0.0,2.1...| high| high|
|[5.0,0.38,0.01,1....|medium| medium|
|[5.0,0.4,0.5,4.3,...|medium| medium|
|[5.0,0.42,0.24,2....| high| medium|
|[5.0,0.74,0.0,1.2...|medium| medium|
+--------------------+------+--------------+
only showing top 5 rows
10. Evaluation
rfModel = model.stages[-2]
print(rfModel) # summary only
11. visualization
print(cm)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
class_temp = predictions.select("label").groupBy("label")\
                        .count().sort('count', ascending=False).toPandas()
class_temp = class_temp["label"].values.tolist()
# list() keeps the names reusable; a bare map() is a one-shot iterator in Python 3
class_names = list(map(str, class_temp))
class_names
y_pred = predictions.select("predictedLabel")
y_pred = y_pred.toPandas()
plt.show()
10.4.1 Introduction
10.4.2 Demo
spark = SparkSession \
.builder \
.appName("Python Spark Decision Tree classification") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true') \
.load("../data/WineData2.csv",header=True);
df.show(5,True)
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
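from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Convert the numeric quality score to a categorical label
def condition(r):
    if (0 <= r <= 4):
        label = "low"
    elif (4 < r <= 6):
        label = "medium"
    else:
        label = "high"
    return label

# quality_udf was not defined in this listing; this wrapper is an assumed reconstruction
quality_udf = udf(lambda x: condition(x), StringType())
df = df.withColumn("quality", quality_udf("quality"))
df.show(5,True)
df.printSchema()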
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4| medium|
|  7.8|    0.88|   0.0|  2.6|    0.098|25.0| 67.0| 0.9968| 3.2|     0.68|    9.8| medium|
|  7.8|    0.76|  0.04|  2.3|    0.092|15.0| 54.0|  0.997|3.26|     0.65|    9.8| medium|
| 11.2|    0.28|  0.56|  1.9|    0.075|17.0| 60.0|  0.998|3.16|     0.58|    9.8| medium|
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4| medium|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
only showing top 5 rows
root
|-- fixed: double (nullable = true)
|-- volatile: double (nullable = true)
|-- citric: double (nullable = true)
|-- sugar: double (nullable = true)
|-- chlorides: double (nullable = true)
|-- free: double (nullable = true)
|-- total: double (nullable = true)
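Note: You are strongly encouraged to try my get_dummy function for dealing with categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    # Assumed reconstruction: the indexer, encoder and pipeline definitions
    # were missing from this listing.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    # Same assumed indexers, encoders and pipeline as in the supervised version.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')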
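from pyspark.ml.linalg import Vectors

# Pack every column except the last into a dense feature vector; the last column is the label
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

transformed = transData(df)
transformed.show(5)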
+--------------------+------+
| features| label|
+--------------------+------+
+--------------------+------+------------+
| features| label|indexedLabel|
+--------------------+------+------------+
|[7.4,0.7,0.0,1.9,...|medium| 0.0|
|[7.8,0.88,0.0,2.6...|medium| 0.0|
|[7.8,0.76,0.04,2....|medium| 0.0|
|[11.2,0.28,0.56,1...|medium| 0.0|
|[7.4,0.7,0.0,1.9,...|medium| 0.0|
+--------------------+------+------------+
only showing top 5 rows
featureIndexer =VectorIndexer(inputCol="features", \
outputCol="indexedFeatures", \
maxCategories=4).fit(transformed)
featureIndexer.transform(transformed).show(5, True)
+--------------------+------+--------------------+
| features| label| indexedFeatures|
+--------------------+------+--------------------+
|[7.4,0.7,0.0,1.9,...|medium|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...|medium|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....|medium|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...|medium|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...|medium|[7.4,0.7,0.0,1.9,...|
+--------------------+------+--------------------+
only showing top 5 rows
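VectorIndexer treats a feature as categorical only when it has at most maxCategories distinct values; otherwise the feature is left as continuous, which is presumably why indexedFeatures matches features in the output above. A minimal plain-Python sketch of that decision, using hypothetical feature vectors:

```python
def categorical_feature_indices(vectors, max_categories):
    """Return the indices of features with at most max_categories distinct values."""
    n_features = len(vectors[0])
    result = []
    for j in range(n_features):
        distinct = {v[j] for v in vectors}
        if len(distinct) <= max_categories:
            result.append(j)
    return result

# Hypothetical vectors: feature 1 has two distinct values, the others have five.
vectors = [
    [7.4, 0.0, 1.0],
    [7.8, 1.0, 2.0],
    [11.2, 0.0, 3.0],
    [6.9, 1.0, 4.0],
    [5.0, 0.0, 5.0],
]
print(categorical_feature_indices(vectors, max_categories=4))
```

This sketch covers only the categorical-versus-continuous decision, not the re-indexing of category values that follows it.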
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])
+--------------------+------+
| features| label|
+--------------------+------+
|[4.6,0.52,0.15,2....| low|
|[4.7,0.6,0.17,2.3...|medium|
|[5.0,1.02,0.04,1....| low|
|[5.0,1.04,0.24,1....|medium|
|[5.1,0.585,0.0,1....| high|
+--------------------+------+
only showing top 5 rows
+--------------------+------+
| features| label|
+--------------------+------+
|[4.9,0.42,0.0,2.1...| high|
|[5.0,0.38,0.01,1....|medium|
|[5.0,0.4,0.5,4.3,...|medium|
|[5.0,0.42,0.24,2....| high|
|[5.0,0.74,0.0,1.2...|medium|
+--------------------+------+
only showing top 5 rows
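randomSplit assigns rows to the splits at random, so the 60/40 proportions hold only approximately, and the split changes between runs unless a seed is passed (randomSplit accepts an optional seed argument). The following plain-Python sketch illustrates the idea under these assumptions; random_split is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability
    proportional to its weight, so split sizes are only approximate."""
    rng = random.Random(seed)
    total = float(sum(weights))
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random() * total
        cumulative = 0.0
        for i, w in enumerate(weights):
            cumulative += w
            if u < cumulative:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point edge cases
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.6, 0.4], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split reproducible, which helps when comparing models across runs.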
8. Pipeline Architecture
9. Make predictions
# Make predictions.
predictions = model.transform(testData)
+--------------------+------+--------------+
| features| label|predictedLabel|
+--------------------+------+--------------+
|[4.9,0.42,0.0,2.1...| high| high|
|[5.0,0.38,0.01,1....|medium| medium|
|[5.0,0.4,0.5,4.3,...|medium| medium|
|[5.0,0.42,0.24,2....| high| medium|
|[5.0,0.74,0.0,1.2...|medium| medium|
+--------------------+------+--------------+
only showing top 5 rows
10. Evaluation
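from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The evaluator was not defined in this listing; the column names below are assumed
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))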
rfModel = model.stages[-2]
print(rfModel) # summary only
11. visualization
import matplotlib.pyplot as plt
import numpy as np
import itertools
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
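class_temp = predictions.select("label").groupBy("label")\
                        .count().sort('count', ascending=False).toPandas()
class_temp = class_temp["label"].values.tolist()
class_names = list(map(str, class_temp))
class_names

from sklearn.metrics import confusion_matrix
# Assumed reconstruction: y_true and the confusion_matrix call were missing from this listing
y_true = predictions.select("label")
y_true = y_true.toPandas()
y_pred = predictions.select("predictedLabel")
y_pred = y_pred.toPandas()
cnf_matrix = confusion_matrix(y_true, y_pred, labels=class_names)
cnf_matrix
array([[502,   9,   0],
       [ 73,  22,   0],
       [ 28,   0,   0]])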
plt.show()
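The accuracy behind the "Test Error" line can be recomputed directly from the confusion matrix printed above. This is a minimal plain-Python sketch, assuming rows are true labels and columns are predicted labels, as the axis labels of the plot suggest.

```python
# Confusion matrix from the demo: rows are true labels, columns are predictions
# (assumed orientation, matching the 'True label' / 'Predicted label' axes).
cm = [
    [502, 9, 0],
    [73, 22, 0],
    [28, 0, 0],
]

def accuracy_from_confusion(matrix):
    """Fraction of samples on the diagonal (correct predictions)."""
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    total = sum(sum(row) for row in matrix)
    return correct / total

def recall_per_class(matrix):
    """Per-class recall: diagonal entry divided by its row sum."""
    return [row[i] / sum(row) if sum(row) else 0.0 for i, row in enumerate(matrix)]

accuracy = accuracy_from_confusion(cm)
print("Accuracy   = %.4f" % accuracy)
print("Test Error = %.4f" % (1.0 - accuracy))
print("Recall     =", [round(r, 4) for r in recall_per_class(cm)])
```

The third class is never predicted (its column is all zeros), so its recall is zero even though the overall accuracy looks reasonable; accuracy alone can hide this on imbalanced classes.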
10.5.1 Introduction
10.5.2 Demo
• The Jupyter notebook can be downloaded from Gradient boosted tree Classification.
• For more details, please visit GBTClassifier API.
10.6.1 Introduction
10.6.2 Demo
• The Jupyter notebook can be downloaded from Gradient boosted tree Classification.
• For more details, please visit GBTClassifier API.
Warning: Unfortunately, I didn’t find a good way to set up XGBoost directly in Spark, but I did get
XGBoost to work with pysparkling on my machine.
2. Parse the data using H2O and convert them to Spark Frame
import h2o
frame = h2o.import_file("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate/prostate.csv")
spark_frame = hc.as_spark_frame(frame)
spark_frame.show(4)
+---+-------+---+----+-----+-----+----+----+-------+
| ID|CAPSULE|AGE|RACE|DPROS|DCAPS| PSA| VOL|GLEASON|
+---+-------+---+----+-----+-----+----+----+-------+
| 1| 0| 65| 1| 2| 1| 1.4| 0.0| 6|
| 2| 0| 72| 1| 3| 2| 6.7| 0.0| 7|
| 3| 0| 70| 1| 1| 2| 4.9| 0.0| 6|
| 4| 0| 76| 2| 2| 1|51.2|20.0| 7|
+---+-------+---+----+-----+-----+----+----+-------+
only showing top 4 rows
4. Run Predictions
predictions = model.transform(spark_frame)
predictions.show(4)
+---+-------+---+----+-----+-----+----+----+-------+-------------------+
| ID|CAPSULE|AGE|RACE|DPROS|DCAPS| PSA| VOL|GLEASON| prediction_output|
+---+-------+---+----+-----+-----+----+----+-------+-------------------+
| 1| 0| 65| 1| 2| 1| 1.4| 0.0| 6|[64.85852813720703]|
| 2| 0| 72| 1| 3| 2| 6.7| 0.0| 7| [72.0611801147461]|
| 3| 0| 70| 1| 1| 2| 4.9| 0.0| 6|[70.26496887207031]|
| 4| 0| 76| 2| 2| 1|51.2|20.0| 7|[75.26521301269531]|
+---+-------+---+----+-----+-----+----+----+-------+-------------------+
only showing top 4 rows
10.7.1 Introduction
10.7.2 Demo
spark = SparkSession \
.builder \
.appName("Python Spark Naive Bayes classification") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load("./data/WineData2.csv",header=True);
df.show(5)
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
|  7.8|    0.88|   0.0|  2.6|    0.098|25.0| 67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|  7.8|    0.76|  0.04|  2.3|    0.092|15.0| 54.0|  0.997|3.26|     0.65|    9.8|      5|
| 11.2|    0.28|  0.56|  1.9|    0.075|17.0| 60.0|  0.998|3.16|     0.58|    9.8|      6|
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|      5|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
only showing top 5 rows
df.printSchema()
root
|-- fixed: double (nullable = true)
|-- volatile: double (nullable = true)
|-- citric: double (nullable = true)
|-- sugar: double (nullable = true)
|-- chlorides: double (nullable = true)
|-- free: double (nullable = true)
|-- total: double (nullable = true)
|-- density: double (nullable = true)
|-- pH: double (nullable = true)
|-- sulphates: double (nullable = true)
|-- alcohol: double (nullable = true)
|-- quality: string (nullable = true)
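from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Convert the numeric quality score to a binary categorical label
def condition(r):
    if (0 <= r <= 6):
        label = "low"
    else:
        # assumed: the rest of this function was cut off in the listing;
        # "high" is the only other class that appears in the results
        label = "high"
    return label

# quality_udf was not defined in this listing; this wrapper is an assumed reconstruction
quality_udf = udf(lambda x: condition(x), StringType())
df = df.withColumn("quality", quality_udf("quality"))
df.show(5,True)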
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|    low|
|  7.8|    0.88|   0.0|  2.6|    0.098|25.0| 67.0| 0.9968| 3.2|     0.68|    9.8|    low|
|  7.8|    0.76|  0.04|  2.3|    0.092|15.0| 54.0|  0.997|3.26|     0.65|    9.8|    low|
| 11.2|    0.28|  0.56|  1.9|    0.075|17.0| 60.0|  0.998|3.16|     0.58|    9.8|    low|
|  7.4|     0.7|   0.0|  1.9|    0.076|11.0| 34.0| 0.9978|3.51|     0.56|    9.4|    low|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
only showing top 5 rows
df.printSchema()
root
|-- fixed: double (nullable = true)
|-- volatile: double (nullable = true)
|-- citric: double (nullable = true)
|-- sugar: double (nullable = true)
|-- chlorides: double (nullable = true)
|-- free: double (nullable = true)
|-- total: double (nullable = true)
|-- density: double (nullable = true)
|-- pH: double (nullable = true)
|-- sulphates: double (nullable = true)
|-- alcohol: double (nullable = true)
|-- quality: string (nullable = true)
3. Deal with categorical data and Convert the data to dense vector
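Note: You are strongly encouraged to try my get_dummy function for dealing with categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    # Assumed reconstruction: the indexer, encoder and pipeline definitions
    # were missing from this listing.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    # Same assumed indexers, encoders and pipeline as in the supervised version.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')

def get_dummy(df,categoricalCols,continuousCols,labelCol):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    # Same assumed indexers, encoders and pipeline as above, without an index column.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select('features','label')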
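from pyspark.ml.linalg import Vectors

# Pack every column except the last into a dense feature vector; the last column is the label
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

transformed = transData(df)
transformed.show(5)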
+--------------------+-----+
| features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...| low|
|[7.8,0.88,0.0,2.6...| low|
|[7.8,0.76,0.04,2....| low|
|[11.2,0.28,0.56,1...| low|
|[7.4,0.7,0.0,1.9,...| low|
+--------------------+-----+
only showing top 5 rows
+--------------------+-----+------------+
| features|label|indexedLabel|
+--------------------+-----+------------+
|[7.4,0.7,0.0,1.9,...| low| 0.0|
|[7.8,0.88,0.0,2.6...| low| 0.0|
|[7.8,0.76,0.04,2....| low| 0.0|
|[11.2,0.28,0.56,1...| low| 0.0|
|[7.4,0.7,0.0,1.9,...| low| 0.0|
+--------------------+-----+------------+
only showing top 5 rows
+--------------------+-----+--------------------+
| features|label| indexedFeatures|
+--------------------+-----+--------------------+
|[7.4,0.7,0.0,1.9,...| low|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...| low|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....| low|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...| low|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...| low|[7.4,0.7,0.0,1.9,...|
+--------------------+-----+--------------------+
only showing top 5 rows
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
trainingData.show(5,False)
testData.show(5,False)
+---------------------------------------------------------+-----+
|features |label|
+---------------------------------------------------------+-----+
|[5.0,0.38,0.01,1.6,0.048,26.0,60.0,0.99084,3.7,0.75,14.0]|low |
|[5.0,0.42,0.24,2.0,0.06,19.0,50.0,0.9917,3.72,0.74,14.0] |high |
|[5.0,0.74,0.0,1.2,0.041,16.0,46.0,0.99258,4.01,0.59,12.5]|low |
|[5.0,1.02,0.04,1.4,0.045,41.0,85.0,0.9938,3.75,0.48,10.5]|low |
|[5.0,1.04,0.24,1.6,0.05,32.0,96.0,0.9934,3.74,0.62,11.5] |low |
+---------------------------------------------------------+-----+
only showing top 5 rows
+---------------------------------------------------------+-----+
|features |label|
+---------------------------------------------------------+-----+
|[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,13.1] |low |
|[4.7,0.6,0.17,2.3,0.058,17.0,106.0,0.9932,3.85,0.6,12.9] |low |
|[4.9,0.42,0.0,2.1,0.048,16.0,42.0,0.99154,3.71,0.74,14.0]|high |
|[5.0,0.4,0.5,4.3,0.046,29.0,80.0,0.9902,3.49,0.66,13.6] |low |
|[5.2,0.49,0.26,2.3,0.09,23.0,74.0,0.9953,3.71,0.62,12.2] |low |
+---------------------------------------------------------+-----+
only showing top 5 rows
7. Pipeline Architecture
8. Make predictions
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
+--------------------+-----+--------------+
| features|label|predictedLabel|
+--------------------+-----+--------------+
|[4.6,0.52,0.15,2....| low| low|
|[4.7,0.6,0.17,2.3...| low| low|
|[4.9,0.42,0.0,2.1...| high| low|
|[5.0,0.4,0.5,4.3,...| low| low|
|[5.2,0.49,0.26,2....| low| low|
+--------------------+-----+--------------+
only showing top 5 rows
9. Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
lrModel = model.stages[2]
trainingSummary = lrModel.summary
In Zeppelin, you can use z.show() to display the data and plot the ROC curves:
You can also register a temporary table with data.registerTempTable('roc_data') and then use SQL to
plot the ROC curve:
10. visualization
import matplotlib.pyplot as plt
import numpy as np
import itertools
print(cm)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
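class_temp = predictions.select("label").groupBy("label")\
                        .count().sort('count', ascending=False).toPandas()
class_temp = class_temp["label"].values.tolist()
class_names = list(map(str, class_temp))
class_names
['low', 'high']

from sklearn.metrics import confusion_matrix
# Assumed reconstruction: y_true and the confusion_matrix call were missing from this listing
y_true = predictions.select("label")
y_true = y_true.toPandas()
y_pred = predictions.select("predictedLabel")
y_pred = y_pred.toPandas()
cnf_matrix = confusion_matrix(y_true, y_pred, labels=class_names)
cnf_matrix
array([[392, 169],
       [ 32,  61]])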
plt.show()
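Per-class precision and recall can be read off the confusion matrix above. A minimal plain-Python sketch, assuming rows are true labels and columns are predicted labels, in the class order ['low', 'high'] printed earlier:

```python
class_names = ["low", "high"]
# Rows: true label, columns: predicted label (assumed orientation).
cm = [
    [392, 169],
    [32, 61],
]

def precision_recall(matrix, names):
    """Return {name: (precision, recall)} for a square confusion matrix."""
    n = len(matrix)
    col_sums = [sum(matrix[i][j] for i in range(n)) for j in range(n)]
    out = {}
    for k, name in enumerate(names):
        row_sum = sum(matrix[k])
        precision = matrix[k][k] / col_sums[k] if col_sums[k] else 0.0
        recall = matrix[k][k] / row_sum if row_sum else 0.0
        out[name] = (precision, recall)
    return out

metrics = precision_recall(cm, class_names)
for name, (p, r) in metrics.items():
    print("%-5s precision=%.3f recall=%.3f" % (name, p, r))
```

Under this reading, most "high" predictions are wrong (61 of 230 are correct), which the overall accuracy does not show on its own.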
ELEVEN
CLUSTERING
Chinese proverb
Sharpening the knife longer can make it easier to hack the firewood – old Chinese proverb
The above figure was generated by the code from: Python Data Science Handbook.
11.1.1 Introduction
11.1.2 Demo
spark = SparkSession \
.builder \
.appName("Python Spark K-means example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. Load dataset
Learning Apache Spark with Python
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("../data/iris.csv",header=True);
df.show(5,True)
df.printSchema()
+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
| 5.1| 3.5| 1.4| 0.2| setosa|
| 4.9| 3.0| 1.4| 0.2| setosa|
| 4.7| 3.2| 1.3| 0.2| setosa|
| 4.6| 3.1| 1.5| 0.2| setosa|
| 5.0| 3.6| 1.4| 0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows
root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)
|-- petal_width: double (nullable = true)
|-- species: string (nullable = true)
You can also get the statistical results from the data frame (unfortunately, this only works for numerical columns).
df.describe().show()
+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|     null|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+
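Note: You are strongly encouraged to try my get_dummy function for dealing with categorical data
in complex datasets.
Supervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    # Assumed reconstruction: the indexer, encoder and pipeline definitions
    # were missing from this listing.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select(indexCol,'features','label')
Unsupervised learning version:
def get_dummy(df,indexCol,categoricalCols,continuousCols):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    # Same assumed indexers, encoders and pipeline as in the supervised version.
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    return data.select(indexCol,'features')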
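from pyspark.ml.linalg import Vectors

# Assumed reconstruction: transData was not defined in this section. This version
# keeps the numeric columns as features and drops the trailing species column.
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1])]).toDF(['features'])

transformed= transData(df)
transformed.show(5, False)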
+-----------------+
|features |
+-----------------+
|[5.1,3.5,1.4,0.2]|
|[4.9,3.0,1.4,0.2]|
|[4.7,3.2,1.3,0.2]|
|[4.6,3.1,1.5,0.2]|
|[5.0,3.6,1.4,0.2]|
+-----------------+
only showing top 5 rows
featureIndexer = VectorIndexer(inputCol="features", \
outputCol="indexedFeatures",\
maxCategories=4).fit(transformed)
data = featureIndexer.transform(transformed)
6. Elbow method to determine the optimal number of clusters for k-means clustering
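import numpy as np
from pyspark.ml.clustering import KMeans

cost = np.zeros(20)
for k in range(2,20):
    kmeans = KMeans()\
            .setK(k)\
            .setSeed(1) \
            .setFeaturesCol("indexedFeatures")\
            .setPredictionCol("cluster")
    model = kmeans.fit(data)
    # computeCost requires Spark 2.0 or later; it was deprecated in Spark 2.4 and
    # removed in Spark 3.0, where model.summary.trainingCost can be used instead
    cost[k] = model.computeCost(data)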
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator
In my opinion, it is sometimes hard to choose the optimal number of clusters with the elbow
method. As shown in the following figure, you could choose 3, 5 or even 8. I will choose 3 in this demo.
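The elbow method itself can be illustrated without Spark. The sketch below runs a tiny one-dimensional k-means on hypothetical data with three well-separated groups and prints the within-set sum of squared errors for each k; kmeans_1d and wssse are hypothetical helper names, and the deterministic initialisation is a simplification chosen for reproducibility.

```python
def kmeans_1d(points, k, n_iter=20):
    """Tiny Lloyd's algorithm on 1-D data with a deterministic initialisation."""
    data = sorted(points)
    n = len(data)
    # Spread the initial centres evenly through the sorted data.
    centers = [data[(2 * i + 1) * n // (2 * k)] for i in range(k)]
    for _ in range(n_iter):
        clusters = [[] for _ in range(k)]
        for x in data:
            nearest = min(range(k), key=lambda c: abs(x - centers[c]))
            clusters[nearest].append(x)
        # Keep the old centre if a cluster ends up empty.
        centers = [sum(c) / len(c) if c else centers[i]
                   for i, c in enumerate(clusters)]
    return centers

def wssse(points, centers):
    """Within-set sum of squared errors: squared distance to the nearest centre."""
    return sum(min((x - c) ** 2 for c in centers) for x in points)

# Hypothetical data with three obvious groups around 1, 5 and 9.
points = [1.0, 1.2, 0.8, 5.0, 5.2, 4.8, 9.0, 9.2, 8.8]
cost = {k: wssse(points, kmeans_1d(points, k)) for k in range(1, 5)}
for k in sorted(cost):
    print(k, round(cost[k], 2))
```

The cost drops sharply up to k = 3 and barely changes afterwards, which is the "elbow". On real data the bend is often much less pronounced, which is why the choice can be ambiguous.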
• Silhouette analysis
#PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce # For Python 3.x
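import time
import numpy as np
import pandas as pd
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Assumed reconstruction: the function header, the model fitting and the
# silhouette evaluation were missing from this listing.
def optimal_k(df_in, index_col, k_min, k_max, num_runs):
    start = time.time()
    silh_lst = []
    k_lst = np.arange(k_min, k_max+1)
    r_table = df_in.select(index_col).toPandas()
    r_table = r_table.set_index(index_col)
    centers = pd.DataFrame()
    for k in k_lst:
        silh_val = []
        for run in np.arange(1, num_runs+1):
            # Train a k-means model with a random seed (assumed step)
            kmeans = KMeans().setK(int(k)).setSeed(int(np.random.randint(100)))
            model = kmeans.fit(df_in)
            # Make predictions
            predictions = model.transform(df_in)
            # .values avoids pandas index alignment against r_table's index
            r_table['cluster_{k}_{run}'.format(k=k, run=run)]= predictions.select('prediction').toPandas()['prediction'].values
            # Evaluate the clustering with the silhouette score (assumed step)
            silh_val.append(ClusteringEvaluator().evaluate(predictions))
        silh_array=np.asanyarray(silh_val)
        silh_lst.append(silh_array.mean())
    elapsed = time.time() - start
    silhouette = pd.DataFrame(list(zip(k_lst, silh_lst)), columns=['k', 'silhouette'])
    print('+------------------------------------------------------------+')
    print("| The finding optimal k phase took %8.0f s. |" %(elapsed))
    print('+------------------------------------------------------------+')
    return k_lst[np.argmax(silh_lst, axis=0)], silhouette, r_table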
+------------------------------------------------------------+
| The finding optimal k phase took 1783 s. |
+------------------------------------------------------------+
spark.createDataFrame(silh_lst).show()
+---+------------------+
| k| silhouette|
+---+------------------+
| 3|0.8045154385557953|
| 4|0.6993528775512052|
| 5|0.6689286654221447|
| 6|0.6356184024841809|
| 7|0.7174102265711756|
| 8|0.6720861758298997|
| 9| 0.601771359881241|
| 10|0.6292447334578428|
+---+------------------+
From the silhouette list, we can choose 3 as the optimal number of clusters, since k = 3 has the highest silhouette score (about 0.80).
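For reference, the silhouette coefficient of a point is (b - a) / max(a, b), where a is its mean distance to the other points in its own cluster and b is its mean distance to the points of the nearest other cluster. The following plain-Python sketch computes the mean silhouette on hypothetical one-dimensional data using absolute distance; Spark's ClusteringEvaluator uses squared Euclidean distance by default, so its values will differ from these.

```python
def silhouette_score(points, labels):
    """Mean silhouette coefficient using absolute distance on 1-D points.

    Requires at least two clusters and at least two distinct point values.
    """
    scores = []
    for i, (x, lab) in enumerate(zip(points, labels)):
        same = [abs(x - y) for j, (y, l) in enumerate(zip(points, labels))
                if l == lab and j != i]
        if not same:
            scores.append(0.0)  # convention for singleton clusters
            continue
        a = sum(same) / len(same)
        b = min(
            sum(abs(x - y) for y, l in zip(points, labels) if l == other)
            / sum(1 for l in labels if l == other)
            for other in set(labels) if other != lab
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)

# Hypothetical data: two tight groups, labelled well and then badly.
points = [1.0, 1.2, 0.8, 5.0, 5.2, 4.8]
good = silhouette_score(points, [0, 0, 0, 1, 1, 1])
bad = silhouette_score(points, [0, 1, 0, 1, 0, 1])
print(round(good, 3), round(bad, 3))
```

A score close to 1 indicates compact, well-separated clusters, which is the reason for preferring the k with the highest mean silhouette.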
7. Pipeline Architecture
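from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans

kmeans = KMeans() \
        .setK(3) \
        .setFeaturesCol("indexedFeatures")\
        .setPredictionCol("cluster")
# Chain the feature indexer and k-means in a Pipeline
# (the pipeline definition was missing from this listing; assumed reconstruction)
pipeline = Pipeline(stages=[featureIndexer, kmeans])
model = pipeline.fit(transformed)
cluster = model.transform(transformed)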
8. k-means clusters
cluster = model.transform(transformed)
cluster.show()
+-----------------+-----------------+-------+
| features| indexedFeatures|cluster|
+-----------------+-----------------+-------+
|[5.1,3.5,1.4,0.2]|[5.1,3.5,1.4,0.2]| 1|
|[4.9,3.0,1.4,0.2]|[4.9,3.0,1.4,0.2]| 1|
|[4.7,3.2,1.3,0.2]|[4.7,3.2,1.3,0.2]| 1|
|[4.6,3.1,1.5,0.2]|[4.6,3.1,1.5,0.2]| 1|
|[5.0,3.6,1.4,0.2]|[5.0,3.6,1.4,0.2]| 1|
|[5.4,3.9,1.7,0.4]|[5.4,3.9,1.7,0.4]| 1|
|[4.6,3.4,1.4,0.3]|[4.6,3.4,1.4,0.3]| 1|
|[5.0,3.4,1.5,0.2]|[5.0,3.4,1.5,0.2]| 1|
|[4.4,2.9,1.4,0.2]|[4.4,2.9,1.4,0.2]| 1|
|[4.9,3.1,1.5,0.1]|[4.9,3.1,1.5,0.1]| 1|
|[5.4,3.7,1.5,0.2]|[5.4,3.7,1.5,0.2]| 1|
|[4.8,3.4,1.6,0.2]|[4.8,3.4,1.6,0.2]| 1|
|[4.8,3.0,1.4,0.1]|[4.8,3.0,1.4,0.1]| 1|
|[4.3,3.0,1.1,0.1]|[4.3,3.0,1.1,0.1]| 1|
|[5.8,4.0,1.2,0.2]|[5.8,4.0,1.2,0.2]| 1|
|[5.7,4.4,1.5,0.4]|[5.7,4.4,1.5,0.4]| 1|
|[5.4,3.9,1.3,0.4]|[5.4,3.9,1.3,0.4]| 1|
|[5.1,3.5,1.4,0.3]|[5.1,3.5,1.4,0.3]| 1|
|[5.7,3.8,1.7,0.3]|[5.7,3.8,1.7,0.3]| 1|
|[5.1,3.8,1.5,0.3]|[5.1,3.8,1.5,0.3]| 1|
+-----------------+-----------------+-------+
only showing top 20 rows
TWELVE
RFM ANALYSIS
+----------+-------+---------+---------+
|CustomerID|Recency|Frequency| Monetary|
+----------+-------+---------+---------+
| 14911| 1| 248|132572.62|
| 12748| 0| 224| 29072.1|
| 17841| 1| 169| 40340.78|
| 14606| 1| 128| 11713.85|
| 15311| 0| 118| 59419.34|
+----------+-------+---------+---------+
only showing top 5 rows
+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
| 17420| 50| 3| 598.83| 2| 3| 2|
| 16861| 59| 3| 151.65| 3| 3| 1|
| 16503| 106| 5| 1421.43| 3| 2| 3|
| 15727| 16| 7| 5178.96| 1| 1| 4|
| 17389| 0| 43|31300.08| 1| 1| 4|
+----------+-------+---------+--------+-----+-----+-----+
only showing top 5 rows
12.1.3 3. Determine the RFM scores and summarize the corresponding business value
+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
12.2 Demo
spark = SparkSession \
.builder \
.appName("Python Spark RFM example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. Load dataset
df_raw = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("Online Retail.csv",header=True);
df_raw.show(5)
df_raw.printSchema()
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
only showing top 5 rows
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: integer (nullable = true)
|-- Country: string (nullable = true)
from pyspark.sql.functions import count

# Count the non-null values in every column
def my_count(df_in):
    df_in.agg( *[ count(c).alias(c) for c in df_in.columns ] ).show()

my_count(df_raw)
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
Since the count results are not the same, there are some null values in the CustomerID column. We can
drop these records from the dataset.
df = df_raw.dropna(how='any')
my_count(df)
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   406829|   406829|     406829|  406829|     406829|   406829|    406829| 406829|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
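The two steps used here, counting non-null values per column and dropping every row that contains a null, can be sketched in plain Python. The records below are hypothetical, and the helpers only mimic the behaviour of count(c) per column and dropna(how='any').

```python
# Hypothetical records; None plays the role of a null value.
records = [
    {"InvoiceNo": "536365", "Description": "WHITE METAL LANTERN", "CustomerID": 17850},
    {"InvoiceNo": "536366", "Description": None, "CustomerID": None},
    {"InvoiceNo": "536367", "Description": "HAND WARMER", "CustomerID": None},
]

def non_null_counts(rows):
    """Count the non-null values in every column."""
    columns = rows[0].keys()
    return {c: sum(1 for r in rows if r[c] is not None) for c in columns}

def drop_any_null(rows):
    """Keep only the rows that contain no null value."""
    return [r for r in rows if all(v is not None for v in r.values())]

print(non_null_counts(records))
clean = drop_any_null(records)
print(non_null_counts(clean))
```

After the drop, every column has the same count, which is the check made with my_count(df) in the demo.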
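from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, col

# Assumed pattern for values such as "12/1/10 8:26"; timeFmt was not defined in this listing
timeFmt = "MM/dd/yy HH:mm"
df = df.withColumn('NewInvoiceDate'
                   , to_utc_timestamp(unix_timestamp(col('InvoiceDate'), timeFmt).cast('timestamp')
                   , 'UTC'))
df.show(5)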
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|      NewInvoiceDate|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:...|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:...|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
only showing top 5 rows
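from pyspark.sql.functions import col, count, datediff, lit, max, min, round, sum
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp

# Note: max, min, round and sum above intentionally shadow the Python built-ins.
# Assumed: TotalPrice (not defined in this listing) is Quantity * UnitPrice.
df = df.withColumn('TotalPrice', round(df.Quantity * df.UnitPrice, 2))

date_max = df.select(max('NewInvoiceDate')).toPandas()
current = to_utc_timestamp( unix_timestamp(lit(str(date_max.iloc[0][0])), \
                            'yy-MM-dd HH:mm').cast('timestamp'), 'UTC' )
# Calculate Duration
df = df.withColumn('Duration', datediff(lit(current), 'NewInvoiceDate'))
recency = df.groupBy('CustomerID').agg(min('Duration').alias('Recency'))
frequency = df.groupBy('CustomerID', 'InvoiceNo').count()\
              .groupBy('CustomerID')\
              .agg(count("*").alias("Frequency"))
monetary = df.groupBy('CustomerID').agg(round(sum('TotalPrice'), 2).alias('Monetary'))
rfm = recency.join(frequency,'CustomerID', how = 'inner')\
             .join(monetary,'CustomerID', how = 'inner')
rfm.show(5)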
+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
| 17420| 50| 3| 598.83|
| 16861| 59| 3| 151.65|
| 16503| 106| 5| 1421.43|
| 15727| 16| 7| 5178.96|
| 17389| 0| 43|31300.08|
+----------+-------+---------+--------+
only showing top 5 rows
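The three RFM quantities can be worked through by hand on a few rows. The following plain-Python sketch uses hypothetical transactions and assumes, as in the demo, that recency is measured in days from the latest invoice date in the data, frequency is the number of distinct invoices, and monetary is the rounded sum of the line totals.

```python
from datetime import date

# Hypothetical transactions: (customer_id, invoice_no, invoice_date, total_price)
transactions = [
    (1, "A1", date(2011, 12, 1), 20.0),
    (1, "A1", date(2011, 12, 1), 5.5),
    (1, "A2", date(2011, 12, 8), 10.0),
    (2, "B1", date(2011, 11, 9), 99.99),
]

def rfm_table(rows):
    """Return {customer: (recency_days, frequency, monetary)}."""
    current = max(r[2] for r in rows)  # reference date = latest invoice date
    table = {}
    for cust in sorted({r[0] for r in rows}):
        own = [r for r in rows if r[0] == cust]
        recency = min((current - r[2]).days for r in own)
        frequency = len({r[1] for r in own})  # distinct invoices
        monetary = round(sum(r[3] for r in own), 2)
        table[cust] = (recency, frequency, monetary)
    return table

rfm = rfm_table(transactions)
for cust, values in rfm.items():
    print(cust, values)
```

Customer 1 has two lines on invoice A1, yet the frequency counts A1 only once, which mirrors grouping by CustomerID and InvoiceNo before counting.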
cols = ['Recency','Frequency','Monetary']
describe_pd(rfm,cols,1)
+-------+-----------------+-----------------+------------------+
|summary| Recency| Frequency| Monetary|
+-------+-----------------+-----------------+------------------+
| count| 4372.0| 4372.0| 4372.0|
| mean|91.58119853613907| 5.07548032936871|1898.4597003659655|
| stddev|100.7721393138483|9.338754163574727| 8219.345141139722|
| min| 0.0| 1.0| -4287.63|
| max| 373.0| 248.0| 279489.02|
| 25%| 16.0| 1.0|293.36249999999995|
| 50%| 50.0| 3.0| 648.075|
| 75%| 143.0| 5.0| 1611.725|
+-------+-----------------+-----------------+------------------+
def RScore(x):
    if x <= 16:
        return 1
    elif x <= 50:
        return 2
    elif x <= 143:
        return 3
    else:
        return 4

def FScore(x):
    if x <= 1:
        return 4
    elif x <= 3:
        return 3
    elif x <= 5:
        return 2
    else:
        return 1

def MScore(x):
    if x <= 293:
        return 4
    elif x <= 648:
        return 3
    elif x <= 1611:
        return 2
    else:
        return 1
5. RFM Segmentation
+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
| 17420| 50| 3| 598.83| 2| 3| 2|
| 16861| 59| 3| 151.65| 3| 3| 1|
| 16503| 106| 5| 1421.43| 3| 2| 3|
| 15727| 16| 7| 5178.96| 1| 1| 4|
| 17389| 0| 43|31300.08| 1| 1| 4|
rfm_seg = rfm_seg.withColumn('RFMScore',
                             F.concat(F.col('r_seg'), F.col('f_seg'), F.col('m_seg')))
rfm_seg.sort(F.col('RFMScore')).show(5)
+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
| 17988| 11| 8| 191.17| 1| 1| 1| 111|
| 16892| 1| 7| 496.84| 1| 1| 2| 112|
| 16668| 15| 6| 306.72| 1| 1| 2| 112|
| 16554| 3| 7| 641.55| 1| 1| 2| 112|
| 16500| 4| 6| 400.86| 1| 1| 2| 112|
+----------+-------+---------+--------+-----+-----+-----+--------+
only showing top 5 rows
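The three score functions share one pattern: compare a value against the 25%, 50% and 75% cut points and map the result to a score from 1 to 4. The sketch below shows that pattern in plain Python. The helper names quartile_score and rfm_score are hypothetical, and the cut points are the rounded quartiles from the summary table. It follows the RScore, FScore and MScore functions as listed; note that the m_seg values printed in the tables appear to rank Monetary in the opposite direction (for example, 5178.96 is shown with m_seg 4).

```python
def quartile_score(x, q1, q2, q3, low_is_best):
    """Score x from 1 to 4 against three quartile cut points."""
    rank = 1 + (x > q1) + (x > q2) + (x > q3)   # number of cut points exceeded, plus one
    return rank if low_is_best else 5 - rank

def rfm_score(recency, frequency, monetary):
    """Concatenate the three scores into a string such as '233'."""
    r = quartile_score(recency, 16, 50, 143, low_is_best=True)
    f = quartile_score(frequency, 1, 3, 5, low_is_best=False)
    m = quartile_score(monetary, 293, 648, 1611, low_is_best=False)
    return "{}{}{}".format(r, f, m)

print(rfm_score(50, 3, 598.83))
```

Keeping the cut points as arguments makes it easy to recompute them from the quantiles of a new data set instead of hard-coding them.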
6. Statistical Summary
• simple summary
rfm_seg.groupBy('RFMScore')\
.agg({'Recency':'mean',
'Frequency': 'mean',
'Monetary': 'mean'} )\
.sort(F.col('RFMScore')).show(5)
+--------+-----------------+------------------+------------------+
|RFMScore| avg(Recency)| avg(Monetary)| avg(Frequency)|
+--------+-----------------+------------------+------------------+
| 111| 11.0| 191.17| 8.0|
| 112| 8.0| 505.9775| 7.5|
| 113|7.237113402061856|1223.3604123711339| 7.752577319587629|
| 114|6.035123966942149| 8828.888595041324|18.882231404958677|
| 121| 9.6| 207.24| 4.4|
+--------+-----------------+------------------+------------------+
only showing top 5 rows
• complex summary
grp = 'RFMScore'
num_cols = ['Recency','Frequency','Monetary']
df_input = rfm_seg
quantile_grouped = quantile_agg(df_input,grp,num_cols)
deciles_grouped = deciles_agg(df_input,grp,num_cols)
deciles_grouped.toPandas().to_csv(output_dir+'deciles_grouped.csv')
12.3 Extension
You can also apply K-means clustering, introduced in the Clustering section, to do the segmentation.
transformed= transData(rfm)
transformed.show(5)
+----------+-------------------+
|CustomerID| rfm|
+----------+-------------------+
| 17420| [50.0,3.0,598.83]|
| 16861| [59.0,3.0,151.65]|
| 16503|[106.0,5.0,1421.43]|
| 15727| [16.0,7.0,5178.96]|
| 17389|[0.0,43.0,31300.08]|
+----------+-------------------+
only showing top 5 rows
scaler = MinMaxScaler(inputCol="rfm",\
+----------+-------------------+--------------------------------------------------------------+
|CustomerID|rfm                |features                                                      |
+----------+-------------------+--------------------------------------------------------------+
|17420     |[50.0,3.0,598.83]  |[0.13404825737265416,0.008097165991902834,0.01721938714830836]|
|16861     |[59.0,3.0,151.65]  |[0.1581769436997319,0.008097165991902834,0.01564357039241953] |
|16503     |[106.0,5.0,1421.43]|[0.28418230563002683,0.016194331983805668,0.02011814573186342]|
|15727     |[16.0,7.0,5178.96] |[0.04289544235924933,0.024291497975708502,0.03335929858922501]|
|17389     |[0.0,43.0,31300.08]|[0.0,0.1700404858299595,0.12540746393334334]                  |
+----------+-------------------+--------------------------------------------------------------+
only showing top 5 rows
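The scaled features can be checked by hand. Min-max scaling to the range [0, 1] maps each value x to (x - min) / (max - min), and the column minima and maxima are available from the describe_pd summary shown earlier. The sketch below is plain Python with a hypothetical helper name (min_max_scale); it only reproduces the arithmetic for customer 17420, not the Spark transformer itself.

```python
def min_max_scale(x, lo, hi):
    """Rescale x from the interval [lo, hi] to [0, 1]."""
    return (x - lo) / (hi - lo)

# Column minima and maxima taken from the describe_pd summary of the RFM table.
bounds = {
    "Recency": (0.0, 373.0),
    "Frequency": (1.0, 248.0),
    "Monetary": (-4287.63, 279489.02),
}

row = {"Recency": 50.0, "Frequency": 3.0, "Monetary": 598.83}   # customer 17420
scaled = [min_max_scale(row[c], *bounds[c]) for c in ("Recency", "Frequency", "Monetary")]
print(scaled)   # compare with the features vector printed for customer 17420
```

Because Monetary has a few very large values, most customers end up close to 0 on that axis after scaling.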
#PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce # For Python 3.x
import numpy as np
cost = np.zeros(20)
for k in range(2,20):
kmeans = KMeans()\
model = kmeans.fit(scaledData)
cost[k] = model.computeCost(scaledData) # requires Spark 2.0 or later; computeCost was deprecated in Spark 2.4 and removed in 3.0
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator
In my opinion, choosing the number of clusters is sometimes hard. As shown in the figure Cost vs. the
number of clusters, you could choose 3, 5, or even 8. I will choose 3 in this demo.
• Silhouette analysis
#PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce # For Python 3.x
start = time.time()
silh_lst = []
k_lst = np.arange(k_min, k_max+1)
r_table = df_in.select(index_col).toPandas()
r_table = r_table.set_index(index_col)
centers = pd.DataFrame()
for k in k_lst:
    silh_val = []
    for run in np.arange(1, num_runs+1):
        # Make predictions
        predictions = model.transform(df_in)
        r_table['cluster_{k}_{run}'.format(k=k, run=run)] = predictions.select('prediction').toPandas()
    silh_array = np.asanyarray(silh_val)
    silh_lst.append(silh_array.mean())
print('+------------------------------------------------------------+')
print("| The finding optimal k phase took %8.0f s. |" %(elapsed))
print('+------------------------------------------------------------+')
spark.createDataFrame(silh_lst).show()
+---+------------------+
| k| silhouette|
+---+------------------+
| 3|0.8045154385557953|
| 4|0.6993528775512052|
| 5|0.6689286654221447|
| 6|0.6356184024841809|
| 7|0.7174102265711756|
| 8|0.6720861758298997|
| 9| 0.601771359881241|
| 10|0.6292447334578428|
+---+------------------+
From the silhouette list, we can choose 3 as the optimal number of clusters, since it has the highest score.
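For readers who want to see what a silhouette score measures, here is a small pure-Python sketch. For each point, a is the mean distance to the other points in its own cluster and b is the smallest mean distance to the points of any other cluster; the coefficient is (b - a) / max(a, b). The function name and the toy points are hypothetical, and the sketch uses plain Euclidean distance, so its values are not directly comparable with scores computed by Spark (whose ClusteringEvaluator defaults to squared Euclidean distance).

```python
from math import dist   # available in Python 3.8+

def silhouette_score(points, labels):
    """Mean silhouette coefficient with Euclidean distance (needs at least 2 clusters)."""
    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)
    scores = []
    for p, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)          # common convention for singleton clusters
            continue
        # mean distance to the other members of the same cluster (dist(p, p) is 0)
        a = sum(dist(p, q) for q in own) / (len(own) - 1)
        # smallest mean distance to the members of any other cluster
        b = min(sum(dist(p, q) for q in other) / len(other)
                for key, other in clusters.items() if key != label)
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)

toy_points = [(0, 0), (0, 1), (10, 10), (10, 11)]
good = silhouette_score(toy_points, [0, 0, 1, 1])   # tight, well separated clusters
bad = silhouette_score(toy_points, [0, 1, 0, 1])    # clusters that mix distant points
print(good, bad)
```

A score close to 1 means the points sit well inside their own cluster, while a negative score suggests many points are closer to another cluster.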
4. K-means clustering
k = 3
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(scaledData)
# Make predictions
predictions = model.transform(scaledData)
predictions.show(5,False)
+----------+-------------------+--------------------+----------+
|CustomerID| rfm| features|prediction|
+----------+-------------------+--------------------+----------+
| 17420| [50.0,3.0,598.83]|[0.13404825737265...| 0|
| 16861| [59.0,3.0,151.65]|[0.15817694369973...| 0|
| 16503|[106.0,5.0,1421.43]|[0.28418230563002...| 2|
| 15727| [16.0,7.0,5178.96]|[0.04289544235924...| 0|
| 17389|[0.0,43.0,31300.08]|[0.0,0.1700404858...| 0|
+----------+-------------------+--------------------+----------+
only showing top 5 rows
5. statistical summary
results = rfm.join(predictions.select('CustomerID','prediction'),'CustomerID',how='left')
results.show(5)
+----------+-------+---------+--------+----------+
|CustomerID|Recency|Frequency|Monetary|prediction|
+----------+-------+---------+--------+----------+
| 13098| 1| 41|28658.88| 0|
| 13248| 124| 2| 465.68| 2|
| 13452| 259| 2| 590.0| 1|
| 13460| 29| 2| 183.44| 0|
| 13518| 85| 1| 659.44| 0|
+----------+-------+---------+--------+----------+
only showing top 5 rows
• simple summary
results.groupBy('prediction')\
.agg({'Recency':'mean',
'Frequency': 'mean',
'Monetary': 'mean'} )\
.sort(F.col('prediction')).show(5)
+----------+------------------+------------------+------------------+
|prediction| avg(Recency)| avg(Monetary)| avg(Frequency)|
+----------+------------------+------------------+------------------+
| 0|30.966337980278816|2543.0355321319284| 6.514450867052023|
| 1|296.02403846153845|407.16831730769206|1.5592948717948718|
| 2|154.40148698884758| 702.5096406443623| 2.550185873605948|
+----------+------------------+------------------+------------------+
• complex summary
grp = 'prediction'
num_cols = ['Recency','Frequency','Monetary']
df_input = results
quantile_grouped = quantile_agg(df_input,grp,num_cols)
quantile_grouped.toPandas().to_csv(output_dir+'quantile_grouped.csv')
deciles_grouped = deciles_agg(df_input,grp,num_cols)
deciles_grouped.toPandas().to_csv(output_dir+'deciles_grouped.csv')
THIRTEEN
TEXT MINING
Chinese proverb
Articles showed more than intended. – Xianglong Shen
• My img2txt function
def img2txt(img_dir):
    """
    convert images to text
    """
    import os, PythonMagick
    from datetime import datetime

    # open the output file for writing
    f = open('doc4img.txt','w')
    for img in [img_file for img_file in os.listdir(img_dir)
                if (img_file.endswith(".png") or
                    img_file.endswith(".jpg") or
                    img_file.endswith(".jpeg"))]:
        start_time = datetime.now()
        print('--------------------------------------------------------------------')
        print(img)
        print('Converting ' + img +'.......')
        print('--------------------------------------------------------------------')
    f.close()
• Demo
I applied my img2txt function to the image in the Image folder.
image_dir = r"Image"
img2txt(image_dir)
Wenqiang Feng
Data Scientist
DST APPLIED ANALYTICS GROUP
Wenqiang Feng is Data Scientist for DST’s Applied Analytics Group. Dr. Feng’s
˓→responsibilities
Dr. Feng has deep analytic expertise in data mining, analytic systems,
˓→machine learning
algorithms, business intelligence, and applying Big Data tools to
˓→strategically solve industry
problems in a cross--functional business. Before joining the DST Applied
˓→Analytics Group, Dr.
Feng holds a MA Data Science Fellow at The Institute for Mathematics and Its
˓→Applications
{IMA) at the University of Minnesota. While there, he helped startup
˓→companies make
marketing decisions based on deep predictive analytics.
• My img2txt_enhance function
def img2txt_enhance(img_dir,scaler):
    """
    convert images files to text
    """
    import numpy as np
    import os, PythonMagick
    from datetime import datetime
    import PyPDF2

    # open the output file for writing
    f = open('doc4img.txt','w')
    for img in [img_file for img_file in os.listdir(img_dir)
                if (img_file.endswith(".png") or
                    img_file.endswith(".jpg") or
                    img_file.endswith(".jpeg"))]:
        start_time = datetime.now()
        print('--------------------------------------------------------------------')
        print(img)
        print('Converting ' + img +'.......')
        print('--------------------------------------------------------------------')
    f.close()
• Demo
I applied my img2txt_enhance function to the following noisy image in the Enhance folder.
image_dir = r"Enhance"
pdf2txt_enhance(image_dir)
--------------------------------------------------------------------
noised.jpg
Converting noised.jpg.......
--------------------------------------------------------------------
zHHH
CPU Time for convertingnoised.jpg:0:00:00.135465
--------------------------------------------------------------------
noised.jpg
Converting noised.jpg.......
--------------------------------------------------------------------
,2 WW
CPU Time for convertingnoised.jpg:0:00:00.133508
• My pdf2txt function
def pdf2txt(pdf_dir,image_dir):
    """
    convert PDF to text
    """
    # open the output file for writing
    f = open('doc.txt','w')
    for pdf in [pdf_file for pdf_file in os.listdir(pdf_dir) if pdf_file.endswith(".pdf")]:
        start_time = datetime.now()
        print('--------------------------------------------------------------------')
        print(pdf)
        print('Converting %d pages.' % npage)
        print('--------------------------------------------------------------------')
        f.write( "\n--------------------------------------------------------------------\n")
        for p in range(npage):
            #print(text)
    f.close()
• Demo
I applied my pdf2txt function to my scanned bio PDF file in the pdf folder.
pdf_dir = r"pdf"
image_dir = r"Image"
pdf2txt(pdf_dir,image_dir)
--------------------------------------------------------------------
feng.pdf
Converting 1 pages.
--------------------------------------------------------------------
l I l w
Wenqiang Feng
Data Scientist
DST APPLIED ANALYTICS GROUP
Wenqiang Feng is Data Scientist for DST’s Applied Analytics Group. Dr. Feng’s
˓→responsibilities
include providing DST clients with access to cutting--edge skills and
˓→technologies, including Big
Data analytic solutions, advanced analytic and data enhancement techniques
˓→and modeling.
Dr. Feng has deep analytic expertise in data mining, analytic systems,
˓→machine learning
algorithms, business intelligence, and applying Big Data tools to
˓→strategically solve industry
• My audio2txt function
def audio2txt(audio_dir):
''' convert audio to text'''
import speech_recognition as sr
r = sr.Recognizer()
f = open('doc.txt','w')
for audio_n in [audio_file for audio_file in os.listdir(audio_dir) \
if audio_file.endswith(".wav")]:
f.close()
• Demo
I applied my audio2txt function to my audio recordings in the audio folder.
audio_dir = r"audio"
audio2txt(audio_dir)
By the way, you can use the following Python code to record your own audio and try the audio2txt
function. Run it from the command line with python record.py "demo2.wav":
import sys
import speech_recognition as sr

# output file name passed on the command line
audio_filename = sys.argv[1]
r = sr.Recognizer()
with sr.Microphone() as source:
    r.adjust_for_ambient_noise(source)
    print("Hey there, say something, I am recording!")
    audio = r.listen(source)
    print("Done listening!")
def check_blanks(data_str):
    is_blank = str(data_str.isspace())
    return is_blank
• Determine whether the language of the text content is English: use the langid module to classify
the language, to make sure we are applying the correct cleanup actions for English text
def check_lang(data_str):
    predict_lang = langid.classify(data_str)
    if predict_lang[1] >= .9:
        language = predict_lang[0]
    else:
        language = 'NA'
    return language
• Remove features
def remove_features(data_str):
    # compile regex
    url_re = re.compile(r'https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile(r'(\d+)')
    mention_re = re.compile(r'@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove punctuation
def remove_stops(data_str):
    # expects a string
    stops = set(stopwords.words("english"))
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str
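The same filtering can be written more compactly with str.join. The sketch below is a hedged alternative, not the function used in the demo: the name remove_stops_simple and the tiny STOPS set are hypothetical stand-ins so that it runs without downloading the NLTK stop word list.

```python
# Hypothetical stand-in for set(stopwords.words("english")), so no NLTK data is needed.
STOPS = {"a", "an", "the", "or", "of", "and", "is", "to"}

def remove_stops_simple(data_str, stops=STOPS):
    """Drop stop words and rejoin the remaining words with single spaces."""
    return " ".join(word for word in data_str.split() if word not in stops)

print(remove_stops_simple("Apple TV or Roku?"))
```

Like the original, the comparison is case sensitive, so text should be lowercased first if capitalized stop words such as "The" need to be removed as well.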
• tagging text
def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags
    # break the string into words
    text = data_str.split()
    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '
    return cleaned_str
• lemmatization
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str
Theoretically speaking, you may apply any classification algorithm to do the classification. I will only present
the Naive Bayes method in the following.
13.3.1 Introduction
13.3.2 Demo
import pyspark
from pyspark.sql import SQLContext
2. load dataset
#Create DataFrame
data_df = sqlContext.createDataFrame(typed_rdd, ["text", "id", "label"])
#data_df.show()
data_df.printSchema()
root
|-- text: string (nullable = true)
|-- id: string (nullable = true)
|-- label: double (nullable = true)
+--------------------+------------------+-----+
| text| id|label|
+--------------------+------------------+-----+
|Fresh install of ...| 1018769417| 1.0|
|Well. Now I know ...| 10284216536| 1.0|
|"Literally six we...| 10298589026| 1.0|
|Mitsubishi i MiEV...|109017669432377344| 1.0|
+--------------------+------------------+-----+
only showing top 4 rows
4. language identification
+--------------------+------------------+-----+----+
| text| id|label|lang|
+--------------------+------------------+-----+----+
|RT @goeentertain:...|665305154954989568| 1.0| en|
|Teforia Uses Mach...|660668007975268352| 1.0| en|
| Apple TV or Roku?| 25842461136| 1.0| en|
|Finished http://t...| 9412369614| 1.0| en|
+--------------------+------------------+-----+----+
only showing top 4 rows
rm_stops_df = en_df.select(raw_cols)\
.withColumn("stop_text", remove_stops_udf(en_df["text"]))
rm_stops_df.show(4)
+--------------------+------------------+-----+--------------------+
| text| id|label| stop_text|
+--------------------+------------------+-----+--------------------+
|RT @goeentertain:...|665305154954989568| 1.0|RT @goeentertain:...|
|Teforia Uses Mach...|660668007975268352| 1.0|Teforia Uses Mach...|
+--------------------+------------------+-----+--------------------+----------
˓→----------+
| text| id|label| stop_text|
˓→ feat_text|
+--------------------+------------------+-----+--------------------+----------
˓→----------+
|RT @goeentertain:...|665305154954989568| 1.0|RT @goeentertain:...| future
˓→blase ...|
|Teforia Uses Mach...|660668007975268352| 1.0|Teforia Uses Mach...|teforia
˓→uses mach...|
| Apple TV or Roku?| 25842461136| 1.0| Apple TV Roku?|
˓→apple roku|
|Finished http://t...| 9412369614| 1.0|Finished http://t...|
˓→ finished|
+--------------------+------------------+-----+--------------------+----------
˓→----------+
tagged_df.show(4)
+--------------------+------------------+-----+--------------------+----------
˓→----------+
8. lemmatization of words
lemm_df = tagged_df.select(raw_cols+["tagged_text"]) \
                   .withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))
lemm_df.show(4)
+--------------------+------------------+-----+--------------------+----------
˓→----------+
| text| id|label| tagged_text|
˓→ lemm_text|
+--------------------+------------------+-----+--------------------+----------
˓→----------+
# drop duplicates
dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])
dedup_df.show(4)
+--------------------+------------------+-----+--------------------+--------+
| text| id|label| lemm_text|is_blank|
+--------------------+------------------+-----+--------------------+--------+
|RT @goeentertain:...|665305154954989568| 1.0|future blase vice...| False|
|Teforia Uses Mach...|660668007975268352| 1.0|teforia use machi...| False|
| Apple TV or Roku?| 25842461136| 1.0| apple roku| False|
|Finished http://t...| 9412369614| 1.0| finish| False|
+--------------------+------------------+-----+--------------------+--------+
+--------------------+------------------+-----+--------------------+--------+-
˓→-----------+
+------------+------------------+--------------------+-----+
| uid| id| text|label|
+------------+------------------+--------------------+-----+
| 85899345920| 1546813742| dragon| 1.0|
|111669149696| 1558492525| hurt much| 1.0|
|128849018880|383221484023709697|seth blog word se...| 1.0|
|137438953472|660668007975268352|teforia use machi...| 1.0|
+------------+------------------+--------------------+-----+
only showing top 4 rows
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
# Pipeline Architecture
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])
predictions = model.transform(testData)
+-----------------------------------------------+-----+----------+
|text |label|prediction|
+-----------------------------------------------+-----+----------+
|finish |1.0 |1.0 |
|meet rolo dogsofthinkgeek happy nationaldogday |1.0 |1.0 |
|pumpkin family |1.0 |1.0 |
|meet jet dogsofthinkgeek happy nationaldogday |1.0 |1.0 |
|meet vixie dogsofthinkgeek happy nationaldogday|1.0 |1.0 |
+-----------------------------------------------+-----+----------+
only showing top 5 rows
15. evaluation
0.912655971479501
13.4.1 Introduction
Sentiment analysis (sometimes known as opinion mining or emotion AI) refers to the use of natural language
processing, text analysis, computational linguistics, and biometrics to systematically identify, extract,
quantify, and study affective states and subjective information. Sentiment analysis is widely applied to voice
of the customer materials such as reviews and survey responses, online and social media, and healthcare
materials for applications that range from marketing to customer service to clinical medicine.
Generally speaking, sentiment analysis aims to determine the attitude of a speaker, writer, or other subject
with respect to some topic or the overall contextual polarity or emotional reaction to a document, interaction,
or event. The attitude may be a judgment or evaluation (see appraisal theory), affective state (that is to say,
the emotional state of the author or speaker), or the intended emotional communication (that is to say, the
emotional effect intended by the author or interlocutor).
Sentiment analysis in business, also known as opinion mining, is the process of identifying and cataloging a
piece of text according to the tone it conveys. It has broad applications:
• Sentiment Analysis in Business Intelligence Build up
• Sentiment Analysis in Business for Competitive Advantage
• Enhancing the Customer Experience through Sentiment Analysis in Business
13.4.2 Pipeline
13.4.3 Demo
spark = SparkSession \
.builder \
.appName("Python Spark Sentiment Analysis example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. Load dataset
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("../data/newtwitter.csv",header=True);
+--------------------+----------+-------+
| text| id|pubdate|
+--------------------+----------+-------+
|10 Things Missing...|2602860537| 18536|
|RT @_NATURALBWINN...|2602850443| 18536|
|RT @HBO24 yo the ...|2602761852| 18535|
|Aaaaaaaand I have...|2602738438| 18535|
|can I please have...|2602684185| 18535|
+--------------------+----------+-------+
only showing top 5 rows
3. Text Preprocessing
• remove non ASCII characters
check:
df = df.withColumn('text_non_asci',strip_non_ascii_udf(df['text']))
df.show(5,True)
output:
+--------------------+----------+-------+--------------------+
| text| id|pubdate| text_non_asci|
+--------------------+----------+-------+--------------------+
|10 Things Missing...|2602860537| 18536|10 Things Missing...|
|RT @_NATURALBWINN...|2602850443| 18536|RT @_NATURALBWINN...|
|RT @HBO24 yo the ...|2602761852| 18535|RT @HBO24 yo the ...|
|Aaaaaaaand I have...|2602738438| 18535|Aaaaaaaand I have...|
|can I please have...|2602684185| 18535|can I please have...|
+--------------------+----------+-------+--------------------+
only showing top 5 rows
• fixed abbreviation
# fixed abbreviation
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    # match 'rt' only as a whole word, so words ending in 'rt' are left intact
    data_str = re.sub(r'\brt\b', '', data_str)
    data_str = data_str.strip()
    return data_str
check:
df = df.withColumn('fixed_abbrev',fix_abbreviation_udf(df['text_non_asci']))
df.show(5,True)
output:
+--------------------+----------+-------+--------------------+----------------
˓→----+
| text| id|pubdate| text_non_asci| fixed_
˓→abbrev|
+--------------------+----------+-------+--------------------+----------------
˓→----+
def remove_features(data_str):
    # compile regex
    url_re = re.compile(r'https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile(r'(\d+)')
    mention_re = re.compile(r'@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove punctuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
check:
df = df.withColumn('removed',remove_features_udf(df['fixed_abbrev']))
df.show(5,True)
output:
+--------------------+----------+-------+--------------------+----------------
˓→----+--------------------+
def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity
• Sentiment score
+--------------------+---------------+
| removed|sentiment_score|
+--------------------+---------------+
|things missing in...| -0.03181818|
|oh and do not lik...| -0.03181818|
|yo the newtwitter...| 0.3181818|
|aaaaaaaand have t...| 0.11818182|
|can please have t...| 0.13636364|
+--------------------+---------------+
only showing top 5 rows
• Words frequency
• Sentiment Classification
def condition(r):
if (r >=0.1):
5. Output
• Sentiment Class
+--------------------+---------------+---------+
| text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|Lists on #NewTwit...| -0.1| neutral|
|Too bad most of m...| -0.1| neutral|
|the #newtwitter i...| -0.1| neutral|
+--------------------+---------------+---------+
| text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|oh. #newtwitter i...| -1.0| negative|
|RT @chqwn: #NewTw...| -1.0| negative|
|Copy that - its W...| -1.0| negative|
|RT @chqwn: #NewTw...| -1.0| negative|
|#NewTwitter has t...| -1.0| negative|
+--------------------+---------------+---------+
only showing top 5 rows
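The listing of the condition function is cut off after its first branch, so the exact rule is not visible here. The sketch below shows one rule that is consistent with what is shown: the first branch tests r >= 0.1, a score of -0.1 is labeled neutral, and a score of -1.0 is labeled negative. The symmetric cut-off at -0.1, the label "positive", and the function name classify_sentiment are my assumptions, not taken from the original code.

```python
def classify_sentiment(score, threshold=0.1):
    """Map a polarity score in [-1, 1] to a sentiment label.

    Assumption: symmetric thresholds, with the negative side strict so that
    a score of exactly -0.1 stays 'neutral', as in the output tables.
    """
    if score >= threshold:
        return "positive"      # assumed label for the visible `r >= 0.1` branch
    if score < -threshold:
        return "negative"
    return "neutral"

for s in (0.3181818, -0.1, -1.0):
    print(s, classify_sentiment(s))
```

In Spark, such a function would typically be wrapped in a udf returning StringType and applied to the sentiment_score column.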
13.6.1 Introduction
In text mining, a topic model is an unsupervised model for discovering the abstract “topics” that occur in a
collection of documents.
Latent Dirichlet Allocation (LDA) is a mathematical method for estimating both of these at the same time:
finding the mixture of words that is associated with each topic, while also determining the mixture of topics
that describes each document.
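The two mixtures mentioned above can be written down as the standard LDA generative model. The notation below is an assumption introduced here for illustration (K topics, D documents, Dirichlet hyperparameters alpha and beta); it is not taken from the demo code.

```latex
\begin{align*}
\phi_k &\sim \mathrm{Dirichlet}(\beta), & k &= 1,\dots,K
  && \text{(mixture of words for topic } k\text{)} \\
\theta_d &\sim \mathrm{Dirichlet}(\alpha), & d &= 1,\dots,D
  && \text{(mixture of topics for document } d\text{)} \\
z_{d,n} \mid \theta_d &\sim \mathrm{Categorical}(\theta_d) &&
  && \text{(topic of the } n\text{-th word in document } d\text{)} \\
w_{d,n} \mid z_{d,n}, \phi &\sim \mathrm{Categorical}(\phi_{z_{d,n}}) &&
  && \text{(the observed word)}
\end{align*}
```

Fitting LDA means estimating the topic-word distributions and the document-topic distributions from the observed words only.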
13.6.2 Demo
1. Load data
+-----+---------------+---------+--------+------+--------+-----+-----
˓→------+--------------------+
1. Text preprocessing
I will use the following raw column names to keep my table concise:
raw_cols = rawdata.columns
raw_cols
rawdata = rawdata.dropDuplicates(['review'])
• fixed abbreviation
# fixed abbreviation
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    # match 'rt' only as a whole word, so words ending in 'rt' are left intact
    data_str = re.sub(r'\brt\b', '', data_str)
    data_str = data_str.strip()
    return data_str
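Most of the patterns above are wrapped in \b word-boundary anchors. The following sketch shows why the anchors matter when removing a short token such as the retweet marker: without a leading \b, the pattern also eats the end of ordinary words. The helper name strip_retweet_marker is hypothetical and only used for this comparison.

```python
import re

def strip_retweet_marker(text, anchored=True):
    """Remove the token 'rt'; with anchored=False the pattern lacks the leading \\b."""
    pattern = r'\brt\b' if anchored else r'rt\b'
    return re.sub(pattern, '', text.lower()).strip()

sample = "RT start the cart"
print(strip_retweet_marker(sample, anchored=True))    # whole-word match only
print(strip_retweet_marker(sample, anchored=False))   # also truncates 'start' and 'cart'
```

The same reasoning applies to any short abbreviation in the list: anchoring on both sides keeps the substitution from touching longer words that merely contain it.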
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on
    # whitespace and discard duplicates, the " ".join() joins the
    # resulting list into one string.
    return " ".join(cleaned_str.split())
• Part-of-Speech Tagging
# Part-of-Speech Tagging
def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags
    # break the string into words
    text = data_str.split()
    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '
    return cleaned_str
• lemmatization
# lemmatization
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str
1. Text processing
• correct the data schema
rawdata.printSchema()
root
|-- id: string (nullable = true)
|-- airline: string (nullable = true)
|-- date: string (nullable = true)
|-- location: string (nullable = true)
|-- rating: float (nullable = true)
|-- cabin: string (nullable = true)
|-- value: string (nullable = true)
|-- recommended: string (nullable = true)
|-- review: string (nullable = true)
# https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior
# 21-Jun-14 <----> %d-%b-%y
to_date = udf (lambda x: datetime.strptime(x, '%d-%b-%y'), DateType())
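The format string can be tried outside Spark first. The sketch below uses only the Python standard library; the helper name parse_review_date is hypothetical. Note that %b matches the abbreviated month name of the current locale, so 'Jun' is assumed to be parsed under an English (or C) locale.

```python
from datetime import date, datetime

def parse_review_date(text):
    """Parse strings such as '21-Jun-14' (%d-%b-%y) into a datetime.date."""
    return datetime.strptime(text, '%d-%b-%y').date()

print(parse_review_date('21-Jun-14'))
```

Testing the pattern on a few raw strings like this is a cheap way to catch format mismatches before they surface as null dates or task failures inside a udf.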
rawdata.printSchema()
root
|-- id: string (nullable = true)
|-- airline: string (nullable = true)
|-- date: date (nullable = true)
|-- location: string (nullable = true)
|-- rating: float (nullable = true)
|-- cabin: string (nullable = true)
rawdata.show(5)
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
only showing top 5 rows
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+
rawdata = rawdata.select(raw_cols+['non_asci'])\
                 .withColumn('fixed_abbrev',fix_abbreviation_udf(rawdata['non_asci']))
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
| id| airline| date|location|rating|
˓→cabin|value|recommended| review| non_asci|
˓→ fixed_abbrev|
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
|10551|Southwest Airlines|2013-11-06| USA| 1.0|Business| 2|
˓→ NO|Flight 3246 from ...|Flight 3246 from ...|flight 3246
˓→from ...|
|10298| US Airways|2014-03-31| UK| 1.0|Business| 0|
˓→ NO|Flight from Manch...|Flight from Manch...|flight from
˓→manch...|
|10564|Southwest Airlines|2013-09-06| USA| 10.0| Economy| 5|
˓→ YES|I'm Executive Pla...|I'm Executive Pla...|i'm executive
˓→pla...|
|10134| Delta Air Lines|2013-12-10| USA| 8.0| Economy| 4|
˓→ YES|MSP-JFK-MXP and r...|MSP-JFK-MXP and r...|msp-jfk-mxp
˓→and r...|
|10912| United Airlines|2014-04-07| USA| 3.0| Economy| 1|
˓→ NO|Worst airline I h...|Worst airline I h...|worst airline
˓→i h...|
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
only showing top 5 rows
rawdata = rawdata.select(raw_cols+['fixed_abbrev'])\
                 .withColumn('stop_text',remove_stops_udf(rawdata['fixed_abbrev']))
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
| id| airline| date|location|rating|
˓→cabin|value|recommended| review| fixed_abbrev|
˓→ stop_text|
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
|10551|Southwest Airlines|2013-11-06| USA| 1.0|Business| 2|
˓→ NO|Flight 3246 from ...|flight 3246 from ...|flight 3246
˓→chica...|
|10298| US Airways|2014-03-31| UK| 1.0|Business| 0|
˓→ NO|Flight from Manch...|flight from manch...|flight
˓→manchester...|
˓→------+
only showing top 5 rows
rawdata = rawdata.select(raw_cols+['stop_text'])\
                 .withColumn('feat_text',remove_features_udf(rawdata['stop_text']))
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
| id| airline| date|location|rating|
˓→cabin|value|recommended| review| stop_text|
˓→ feat_text|
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
|10551|Southwest Airlines|2013-11-06| USA| 1.0|Business| 2|
˓→ NO|Flight 3246 from ...|flight 3246 chica...|flight
˓→chicago mi...|
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
only showing top 5 rows
rawdata = rawdata.select(raw_cols+['feat_text'])\
                 .withColumn('tagged_text',tag_and_remove_udf(rawdata['feat_text']))
rawdata = rawdata.select(raw_cols+['tagged_text']) \
                 .withColumn('lemm_text',lemmatize_udf(rawdata['tagged_text']))
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
| id| airline| date|location|rating|
˓→cabin|value|recommended| review| tagged_text|
˓→ lemm_text|
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------------
˓→------+
|10551|Southwest Airlines|2013-11-06| USA| 1.0|Business| 2|
˓→ NO|Flight 3246 from ...| flight chicago m...|flight
˓→chicago mi...|
|10298| US Airways|2014-03-31| UK| 1.0|Business| 0|
˓→ NO|Flight from Manch...| flight mancheste...|flight
˓→manchester...|
|10564|Southwest Airlines|2013-09-06| USA| 10.0| Economy| 5|
˓→ YES|I'm Executive Pla...| executive platin...|executive
˓→platinu...|
rawdata = rawdata.select(raw_cols+['lemm_text']) \
                 .withColumn("is_blank", check_blanks_udf(rawdata["lemm_text"]))
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------+---+
| id| airline| date|location|rating|
˓→cabin|value|recommended| review| lemm_
˓→text|is_blank|uid|
+-----+------------------+----------+--------+------+--------+-----+-
˓→----------+--------------------+--------------------+--------+---+
|10551|Southwest Airlines|2013-11-06| USA| 1.0|Business| 2|
˓→ NO|Flight 3246 from ...|flight chicago mi...| False| 0|
model = pipeline.fit(data)
1. Results presentation
• Topics
+-----+--------------------+--------------------+
|topic| termIndices| termWeights|
+-----+--------------------+--------------------+
| 0|[60, 7, 12, 483, ...|[0.01349507958269...|
| 1|[363, 29, 187, 55...|[0.01247250144447...|
• Topic terms
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def termsIdx2Term(vocabulary):
    # Return a UDF that maps an array of term indices to the vocabulary terms
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))

vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
final = ldatopics.withColumn("Terms", termsIdx2Term(vocabList)("termIndices"))
+-----+------------------------------------------------+-------------
˓→-------------------------------------------------------------------
˓→-----+
|topic|termIndices |Terms
˓→
˓→ |
+-----+------------------------------------------------+-------------
˓→-------------------------------------------------------------------
˓→-----+
|0 |[60, 7, 12, 483, 292, 326, 88, 4, 808, 32] |[pm, plane,
˓→board, kid, online, lga, schedule, get, memphis, arrive]
˓→ |
|1 |[363, 29, 187, 55, 48, 647, 30, 9, 204, 457] |[dublin,
˓→class, th, sit, entertainment, express, say, delay, dl, son]
˓→ |
|2 |[46, 107, 672, 274, 92, 539, 23, 27, 279, 8] |[economy,
˓→sfo, milwaukee, decent, comfortable, iad, return, united, average,
˓→airline]|
˓→ |
|5 |[122, 103, 181, 48, 434, 10, 121, 147, 934, 169]|[lhr, serve,
˓→screen, entertainment, ny, delta, excellent, atl, sin, newark]
˓→ |
|6 |[14, 270, 18, 74, 70, 37, 16, 450, 3, 20] |[check,
˓→employee, gate, line, change, wait, take, fll, time, tell]
˓→ |
|7 |[111, 36, 341, 10, 320, 528, 844, 19, 195, 524] |[atlanta,
˓→first, toilet, delta, washington, card, global, staff, route,
˓→amsterdam] |
|8 |[477, 266, 297, 185, 1, 33, 22, 783, 17, 908] |[fuel, group,
˓→ pas, boarding, seat, trip, minute, orleans, make, select]
˓→ |
|9 |[10, 73, 46, 1, 248, 302, 213, 659, 48, 228] |[delta, lax,
˓→economy, seat, london, detroit, comfo, weren, entertainment, wife]
˓→ |
|10 |[57, 29, 411, 10, 221, 121, 661, 19, 805, 733] |[business,
˓→class, fra, delta, lounge, excellent, syd, staff, nov, mexico]
˓→ |
|11 |[293, 119, 385, 481, 503, 69, 13, 87, 176, 545] |[march, ua,
˓→manchester, phx, envoy, drink, crew, american, aa, canada]
˓→ |
|12 |[116, 218, 256, 156, 639, 20, 365, 18, 22, 136] |[san, clt,
˓→francisco, second, text, tell, captain, gate, minute, available]
˓→ |
|13 |[433, 171, 176, 339, 429, 575, 10, 26, 474, 796]|[daughter,
˓→small, aa, ba, segment, proceed, delta, passenger, size, similar]
˓→ |
|14 |[74, 84, 45, 108, 342, 111, 315, 87, 52, 4] |[line, agent,
˓→ next, hotel, standby, atlanta, dallas, american, book, get]
˓→ |
|15 |[669, 215, 14, 58, 561, 59, 125, 179, 93, 5] |[fit, carry,
˓→check, people, bathroom, ask, thing, row, don, fly]
˓→ |
|16 |[198, 21, 98, 164, 57, 141, 345, 62, 121, 174] |[ife, good,
˓→nice, much, business, lot, dfw, great, excellent, carrier]
˓→ |
|17 |[96, 29, 569, 444, 15, 568, 21, 103, 657, 505] |[phl, class,
˓→diego, lady, food, wheelchair, good, serve, miami, mia]
˓→ |
|18 |[18, 60, 140, 64, 47, 40, 31, 35, 2, 123] |[gate, pm,
˓→phoenix, connection, cancel, connect, day, airpo, hour, charlotte]
˓→ |
|19 |[33, 178, 95, 2, 9, 284, 42, 4, 89, 31] |[trip,
˓→counter, philadelphia, hour, delay, stay, way, get, southwest,
˓→day] |
+-----+------------------------------------------------+-------------
˓→-------------------------------------------------------------------
˓→-----+
• LDA results
+-----+------------------+----------+-----------+------+-------------
˓→-------+--------------------+--------------------+
| id| airline| date| cabin|rating|
˓→ words| features| topicDistribution|
+-----+------------------+----------+-----------+------+-------------
˓→-------+--------------------+--------------------+
FOURTEEN
Chinese proverb
A Touch of Cloth, linked in countless ways. – old Chinese proverb
14.1 Introduction
Co-occurrence networks are generally used to provide a graphic visualization of potential relationships
between people, organizations, concepts or other entities represented within written material. The generation
and visualization of co-occurrence networks has become practical with the advent of electronically stored
text amenable to text mining.
14.2.1 Methodology
• Build Corpus C
• Build Document-Term matrix D based on Corpus C
• Compute Term-Document matrix D^T
• Adjacency Matrix A = D^T · D
There are four main components in this algorithm: Corpus C, Document-Term matrix D,
Term-Document matrix D^T and Adjacency Matrix A. In this demo part, I will show how to build those four
main components.
Given that we have three groups of friends, they are
+-------------------------------------+
|words |
+-------------------------------------+
|[[george] [jimmy] [john] [peter]] |
|[[vincent] [george] [stefan] [james]]|
|[[emma] [james] [olivia] [george]] |
+-------------------------------------+
1. Corpus C
Then we can build the following corpus based on the unique elements in the given group data:
+-------------------------------+
|features |
+-------------------------------+
|(9,[0,2,3,7],[1.0,1.0,1.0,1.0])|
|(9,[0,1,4,5],[1.0,1.0,1.0,1.0])|
|(9,[0,1,6,8],[1.0,1.0,1.0,1.0])|
+-------------------------------+
• Term-Document matrix D^T
RDD:
[array([ 1., 1., 1.]), array([ 0., 1., 1.]), array([ 1., 0., 0.
˓→]),
array([ 1., 0., 0.]), array([ 0., 1., 0.]), array([ 0., 1., 0.
˓→]),
array([ 0., 0., 1.]), array([ 1., 0., 0.]), array([ 0., 0., 1.
˓→])]
Matrix:
3. Adjacency Matrix A = D^T · D
RDD:
[array([ 1., 1., 1.]), array([ 0., 1., 1.]), array([ 1., 0., 0.
˓→]),
array([ 1., 0., 0.]), array([ 0., 1., 0.]), array([ 0., 1., 0.
˓→]),
array([ 0., 0., 1.]), array([ 1., 0., 0.]), array([ 0., 0., 1.
˓→])]
Matrix:
array([[ 3., 2., 1., 1., 1., 1., 1., 1., 1.],
[ 2., 2., 0., 0., 1., 1., 1., 0., 1.],
[ 1., 0., 1., 1., 0., 0., 0., 1., 0.],
[ 1., 0., 1., 1., 0., 0., 0., 1., 0.],
[ 1., 1., 0., 0., 1., 1., 0., 0., 0.],
[ 1., 1., 0., 0., 1., 1., 0., 0., 0.],
[ 1., 1., 0., 0., 0., 0., 1., 0., 1.],
[ 1., 0., 1., 1., 0., 0., 0., 1., 0.],
[ 1., 1., 0., 0., 0., 0., 1., 0., 1.]])
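The four components can also be reproduced without Spark. The following is a minimal pure-Python sketch, not the PySpark pipeline used in this chapter. The helper names (build_vocabulary, document_term_matrix, adjacency_matrix) and the vocabulary ordering (by frequency, then alphabetically) are assumptions made for illustration, so the row and column order may differ from the matrix above.

```python
from collections import Counter

groups = [
    ["george", "jimmy", "john", "peter"],
    ["vincent", "george", "stefan", "james"],
    ["emma", "james", "olivia", "george"],
]

def build_vocabulary(docs):
    # Corpus C: unique terms, most frequent first, ties broken alphabetically
    counts = Counter(term for doc in docs for term in doc)
    return sorted(counts, key=lambda t: (-counts[t], t))

def document_term_matrix(docs, vocab):
    # D[i][j] = 1 if term j occurs in document i, else 0
    index = {term: j for j, term in enumerate(vocab)}
    D = [[0] * len(vocab) for _ in docs]
    for i, doc in enumerate(docs):
        for term in set(doc):
            D[i][index[term]] = 1
    return D

def adjacency_matrix(D):
    # A = D^T · D, so A[j][k] counts the documents containing both term j and term k
    n_terms = len(D[0])
    return [[sum(row[j] * row[k] for row in D) for k in range(n_terms)]
            for j in range(n_terms)]

vocab = build_vocabulary(groups)
D = document_term_matrix(groups, vocab)
A = adjacency_matrix(D)
```

The diagonal of A holds the number of groups each person belongs to, and an off-diagonal entry holds the number of groups two people share. With this ordering, george is index 0 and james is index 1, so A[0][1] is the number of groups they have in common.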
• Problem
The attached utf-8 encoded text file contains the tags associated with an online biomedical scientific article
formatted as follows (size: 100000). Each Scientific article is represented by a line in the file delimited by
carriage return.
+--------------------+
| words|
+--------------------+
|[ACTH Syndrome, E...|
|[Antibody Formati...|
|[Adaptation, Phys...|
|[Aerosol Propella...|
+--------------------+
only showing top 4 rows
Write a program that, using this file as input, produces a list of pairs of tags which appear TOGETHER
in any order and position in at least fifty different Scientific articles. For example, in the above sample,
[Female] and [Humans] appear together twice, but every other pair appears only once. Your program should
output the pair list to stdout in the same form as the input (e.g. tag 1, tag 2\n).
• My solution
The corresponding words frequency:
Output:
+----------+------+-------+
| term.x|term.y| freq|
+----------+------+-------+
| Female|Humans|16741.0|
| Male|Humans|13883.0|
| Adult|Humans|10391.0|
| Male|Female| 9806.0|
|MiddleAged|Humans| 8181.0|
| Adult|Female| 7411.0|
| Adult| Male| 7240.0|
|MiddleAged| Male| 6328.0|
|MiddleAged|Female| 6002.0|
|MiddleAged| Adult| 5944.0|
+----------+------+-------+
only showing top 10 rows
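For a file that fits in memory, the same pair counts can be obtained directly. The sketch below is not the Spark solution used to produce the output above; it is a single-machine illustration, and the function name frequent_pairs and the sample tags are hypothetical.

```python
from collections import Counter
from itertools import combinations

def frequent_pairs(articles, min_count):
    # Count each unordered pair of distinct tags once per article
    counts = Counter()
    for tags in articles:
        for pair in combinations(sorted(set(tags)), 2):
            counts[pair] += 1
    # Keep pairs that reach the threshold, most frequent first
    return [(a, b, n) for (a, b), n in counts.most_common() if n >= min_count]

# Hypothetical sample; the real input has one article per line
sample = [
    ["Female", "Humans", "Adult"],
    ["Humans", "Female", "Male"],
    ["Male", "Animals"],
]
pairs = frequent_pairs(sample, min_count=2)
```

Sorting the tags before pairing makes (Female, Humans) and (Humans, Female) the same key, which is what "in any order and position" asks for. For the actual problem, min_count would be 50.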
df = spark.read.csv("matrix1.txt",sep=",",inferSchema=True)
df.show()
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|1.2|3.4|2.3|1.1|
|2.3|1.1|1.5|2.2|
|3.3|1.8|4.5|3.3|
|5.3|2.2|4.5|4.4|
|9.3|8.1|0.3|5.5|
|4.5|4.3|2.1|6.6|
+---+---+---+---+
+------------------+------------------+------------------+------------------+
| op_0| op_1| op_2| op_3|
+------------------+------------------+------------------+------------------+
| 152.45|118.88999999999999| 57.15|121.44000000000001|
|118.88999999999999|104.94999999999999| 38.93| 94.71|
| 57.15| 38.93|52.540000000000006| 55.99|
|121.44000000000001| 94.71| 55.99|110.10999999999999|
+------------------+------------------+------------------+------------------+
import numpy as np
a = np.genfromtxt("matrix1.txt",delimiter=",")
np.dot(a.T, a)
TODO ..
FIFTEEN
Chinese proverb
Don’t put all your eggs in one basket.
import numpy as np
import matplotlib.pyplot as plt
print(data)
print(ingredients)
def func(pct, allvals):
    absolute = int(pct/100.*np.sum(allvals))
    return "{:.1f}%\n({:d} k)".format(pct, absolute)
#ax.set_title("Stock portfolio")
plt.show()
Recommender systems or recommendation systems (sometimes replacing “system” with a synonym such
as platform or engine) are a subclass of information filtering system that seek to predict the “rating” or
“preference” that a user would give to an item.
The main idea is to build a users × items matrix R of rating values and to factorize it, so that items rated
highly by similar users can be recommended. A popular approach to this matrix factorization is Alternating
Least Squares (ALS).
Apache Spark ML implements ALS for collaborative filtering, a very popular algorithm for making
recommendations.
ALS recommender is a matrix factorization algorithm that uses Alternating Least Squares with
Weighted-Lambda-Regularization (ALS-WR). It factors the user-to-item matrix A into the user-to-feature
matrix U and the item-to-feature matrix M, and it runs the ALS algorithm in a parallel fashion. The ALS
algorithm should uncover the latent factors that explain the observed user-to-item ratings and tries to find
optimal factor weights to minimize the least squares error between predicted and actual ratings.
https://www.elenacuoco.com/2016/12/22/alternating-least-squares-als-spark-ml/
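To make the alternating idea concrete, here is a small pure-Python sketch with a single latent factor per user and per item. It is a simplification and not Spark's implementation: it uses a plain L2 penalty instead of the weighted-lambda regularization of ALS-WR, and the function names and the toy rating matrix are assumptions. With one factor, each alternating step has a closed-form solution, which keeps the code short.

```python
def objective(R, u, m, reg):
    # Squared error on observed ratings plus an L2 penalty on both factor vectors
    err = sum((R[i][j] - u[i] * m[j]) ** 2
              for i in range(len(R)) for j in range(len(R[0]))
              if R[i][j] is not None)
    return err + reg * (sum(x * x for x in u) + sum(x * x for x in m))

def als_rank1(R, reg=0.05, iters=20):
    # R is a list of user rows; None marks an unobserved rating
    n_users, n_items = len(R), len(R[0])
    u = [1.0] * n_users
    m = [1.0] * n_items
    history = []
    for _ in range(iters):
        # Fix the item factors and solve the least-squares problem for each user
        for i in range(n_users):
            obs = [j for j in range(n_items) if R[i][j] is not None]
            u[i] = sum(R[i][j] * m[j] for j in obs) / (reg + sum(m[j] ** 2 for j in obs))
        # Fix the user factors and solve the least-squares problem for each item
        for j in range(n_items):
            obs = [i for i in range(n_users) if R[i][j] is not None]
            m[j] = sum(R[i][j] * u[i] for i in obs) / (reg + sum(u[i] ** 2 for i in obs))
        history.append(objective(R, u, m, reg))
    return u, m, history

# Hypothetical ratings: 3 users x 2 items, one rating missing
R = [[1.0, 2.0],
     [2.0, 4.0],
     [3.0, None]]
u, m, history = als_rank1(R)
predicted_missing = u[2] * m[1]
```

Because each half-step minimizes the objective exactly with the other side held fixed, the recorded objective can never increase from one iteration to the next. The product u[2] * m[1] is the model's prediction for the rating that was not observed.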
15.3 Demo
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark RFM example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
2. Load dataset
df_raw = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("Online Retail.csv",header=True);
df_raw.show(5)
df_raw.printSchema()
+---------+---------+--------------------+--------+------------+---------+----
˓→------+--------------+
|InvoiceNo|StockCode| Description|Quantity|
˓→InvoiceDate|UnitPrice|CustomerID| Country|
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: integer (nullable = true)
|-- Country: string (nullable = true)
from pyspark.sql.functions import count

def my_count(df_in):
    # Show the number of non-null values in every column
    df_in.agg( *[ count(c).alias(c) for c in df_in.columns ] ).show()
import pyspark.sql.functions as F
from pyspark.sql.functions import round
df_raw = df_raw.withColumn('Asset',round( F.col('Quantity') * F.col('UnitPrice'), 2 ))
df = df_raw.withColumnRenamed('StockCode', 'Cusip')\
.select('CustomerID','Cusip','Quantity','UnitPrice','Asset')
my_count(df)
+----------+------+--------+---------+------+
|CustomerID| Cusip|Quantity|UnitPrice| Asset|
+----------+------+--------+---------+------+
| 406829|541909| 541909| 541909|541909|
+----------+------+--------+---------+------+
Since the count results are not the same, we have some null values in the CustomerID column. We can
drop these records from the dataset.
df = df.filter(F.col('Asset')>=0)
df = df.dropna(how='any')
my_count(df)
+----------+------+--------+---------+------+
|CustomerID| Cusip|Quantity|UnitPrice| Asset|
+----------+------+--------+---------+------+
| 397924|397924| 397924| 397924|397924|
+----------+------+--------+---------+------+
df.show(3)
+----------+------+--------+---------+-----+
|CustomerID| Cusip|Quantity|UnitPrice|Asset|
+----------+------+--------+---------+-----+
| 17850|85123A| 6| 2.55| 15.3|
| 17850| 71053| 6| 3.39|20.34|
| 17850|84406B| 8| 2.75| 22.0|
+----------+------+--------+---------+-----+
only showing top 3 rows
def toUpper(s):
    return s.upper()
pop = df.groupBy('Cusip')\
        .agg(F.count('CustomerID').alias('Customers'),F.round(F.sum('Asset'),2).alias('TotalAsset'))\
        .sort([F.col('Customers'),F.col('TotalAsset')],ascending=[0,0])
pop.show(5)
+------+---------+----------+
| Cusip|Customers|TotalAsset|
+------+---------+----------+
|85123A| 2035| 100603.5|
| 22423| 1724| 142592.95|
|85099B| 1618| 85220.78|
| 84879| 1408| 56580.34|
| 47566| 1397| 68844.33|
+------+---------+----------+
only showing top 5 rows
import pandas as pd

top = 10
# Names of the `top` most popular Cusips, preceded by the CustomerID key column
cusip_lst = pd.DataFrame(pop.select('Cusip').head(top)).astype('str').iloc[:, 0].tolist()
cusip_lst.insert(0,'CustomerID')
pivot_tab = df.groupBy('CustomerID').pivot('Cusip').sum('Asset')
pivot_tab = pivot_tab.fillna(0)
selected_tab = pivot_tab.select(cusip_lst)
selected_tab.show(4)
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
|CustomerID|85123A|22423|85099B|84879|47566|20725|22720|20727|POST|23203|
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
| 16503| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 33.0| 0.0| 0.0|
| 15727| 123.9| 25.5| 0.0| 0.0| 0.0| 33.0| 99.0| 0.0| 0.0| 0.0|
| 14570| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 14450| 0.0| 0.0| 8.32| 0.0| 0.0| 0.0| 49.5| 0.0| 0.0| 0.0|
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
only showing top 4 rows
def elemwiseDiv(df_in):
    num = len(df_in.columns)
    # Divide each asset column by the row total; keep the row unchanged when the total is 0
    temp = df_in.rdd.map(lambda x: [x[0]] + [x[i]/float(sum(x[1:]))
                                             if sum(x[1:])>0 else x[i]
                                             for i in range(1, num)])
    return spark.createDataFrame(temp,df_in.columns)
ratings = elemwiseDiv(selected_tab)
ratings.show(4)
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
|CustomerID|85123A|22423|85099B|84879|47566|20725|22720|20727|POST|23203|
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
| 16503| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|
| 15727| 0.44| 0.09| 0.0| 0.0| 0.0| 0.12| 0.35| 0.0| 0.0| 0.0|
# Filter dtypes and split into column names and type description
cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
# Spark SQL supports only homogeneous columns
assert len(set(dtypes)) == 1, "All columns have to be of the same type"
df_all = to_long(ratings,['CustomerID'])
df_all.show(5)
+----------+------+------+
|CustomerID| Cusip|rating|
+----------+------+------+
| 16503|85123A| 0.0|
| 16503| 22423| 0.0|
| 16503|85099B| 0.0|
| 16503| 84879| 0.0|
| 16503| 47566| 0.0|
+----------+------+------+
only showing top 5 rows
df_all.show(5, True)
df_all.printSchema()
+----------+------+------+------------+
root
|-- CustomerID: long (nullable = true)
|-- Cusip: string (nullable = false)
|-- rating: double (nullable = true)
|-- indexedCusip: double (nullable = true)
train.show(5)
test.show(5)
+----------+-----+------------+-------------------+
|CustomerID|Cusip|indexedCusip| rating|
+----------+-----+------------+-------------------+
| 12940|20725| 2.0| 0.0|
| 12940|20727| 4.0| 0.0|
| 12940|22423| 9.0|0.49990198000392083|
| 12940|22720| 3.0| 0.0|
| 12940|23203| 7.0| 0.0|
+----------+-----+------------+-------------------+
only showing top 5 rows
+----------+-----+------------+------------------+
|CustomerID|Cusip|indexedCusip| rating|
+----------+-----+------------+------------------+
| 12940|84879| 1.0|0.1325230346990786|
| 13285|20725| 2.0|0.2054154995331466|
| 13285|20727| 4.0|0.2054154995331466|
| 13285|47566| 0.0| 0.0|
| 13623|23203| 7.0| 0.0|
+----------+-----+------------+------------------+
only showing top 5 rows
• train model
import itertools
from math import sqrt
from operator import add
import sys
from pyspark.ml.recommendation import ALS
ranks = [4,5]
lambdas = [0.05]
numIters = [30]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1
val = test.na.drop()
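from pyspark.ml.evaluation import RegressionEvaluator

# Assumption: the validation RMSE is computed with RegressionEvaluator,
# because the original helper for this step is not shown
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    als = ALS(rank=rank, maxIter=numIter, regParam=lmbda, numUserBlocks=10,
              numItemBlocks=10, implicitPrefs=False, alpha=1.0,
              userCol="CustomerID", itemCol="indexedCusip", seed=1,
              ratingCol="rating", nonnegative=True)
    model = als.fit(train)
    # Drop NaN predictions (users or items unseen during training) before scoring
    validationRmse = evaluator.evaluate(model.transform(val).na.drop())
    # Keep the model with the lowest validation RMSE
    if validationRmse < bestValidationRmse:
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter
model = bestModel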
• make prediction
topredict=test[test['rating']==0]
predictions=model.transform(topredict)
predictions.filter(predictions.prediction>0)\
.sort([F.col('CustomerID'),F.col('Cusip')],ascending=[0,0]).show(5)
+----------+------+------------+------+------------+
|CustomerID| Cusip|indexedCusip|rating| prediction|
+----------+------+------------+------+------------+
| 18283| 47566| 0.0| 0.0| 0.01625076|
| 18282|85123A| 6.0| 0.0| 0.057172246|
| 18282| 84879| 1.0| 0.0| 0.059531752|
| 18282| 23203| 7.0| 0.0| 0.010502596|
| 18282| 22720| 3.0| 0.0| 0.053893942|
+----------+------+------------+------+------------+
only showing top 5 rows
SIXTEEN
Monte Carlo simulations are just a way of estimating a fixed parameter by repeatedly generating random
numbers. More details can be found at A Zero Math Introduction to Markov Chain Monte Carlo Methods.
Monte Carlo simulation is a technique used to understand the impact of risk and uncertainty in financial,
project management, cost, and other forecasting models. A Monte Carlo simulator helps one visualize most
or all of the potential outcomes to have a better idea regarding the risk of a decision. More details can be
found at The house always wins.
We assume that the player John has a 49% chance to win the game and the wager will be $5 per game.
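from functools import reduce

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

start_m = 100
wager = 5
bets = 100
trials = 1000

# Assumed definition (not shown in the original): win the wager with probability 0.49, lose it otherwise
trans = np.vectorize(lambda t: wager if t < 0.49 else -wager)

end_m = []
for i in range(trials):
    # Running balance after each bet, starting from start_m
    money = reduce(lambda c, x: c + [c[-1] + x], trans(np.random.random(bets)), [start_m])
    end_m.append(money[-1])
    plt.plot(money)
plt.show()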
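The quantity this simulation estimates is the expected ending balance. The sketch below repeats the experiment with the standard library only, so it can be checked without NumPy or a plot. The function name simulate_ending_money is hypothetical, and, as in the listing above, the balance is allowed to go negative rather than stopping the game at zero.

```python
import random

def simulate_ending_money(start_m=100, wager=5, bets=100, p_win=0.49, rng=None):
    # Play `bets` games: win the wager with probability p_win, lose it otherwise
    rng = rng or random.Random()
    money = start_m
    for _ in range(bets):
        money += wager if rng.random() < p_win else -wager
    return money

rng = random.Random(0)  # fixed seed so the run is repeatable
trials = 1000
end_m = [simulate_ending_money(rng=rng) for _ in range(trials)]
mean_end = sum(end_m) / trials

# Closed-form expectation: each bet gains wager * (p_win - p_lose) on average
expected = 100 + 100 * 5 * (0.49 - 0.51)
```

The closed-form expectation is $90, so a 2% edge for the house costs the player about $10 over 100 bets. The simulated mean should land close to that value, with the gap shrinking as the number of trials grows.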
1. Fetch the data. If you need the code for this piece, you can contact me.
stock.tail(4)
+----------+----------+----------+----------+----------+----------+--------+
| Date| Open| High| Low| Close| Adj Close| Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2018-12-07|155.399994|158.050003|151.729996|153.059998|153.059998|17447900|
|2018-12-10|150.389999|152.809998|147.479996|151.429993|151.429993|15525500|
|2018-12-11|155.259995|156.240005|150.899994|151.830002|151.830002|13651900|
|2018-12-12|155.240005|156.169998|151.429993| 151.5| 151.5|16597900|
+----------+----------+----------+----------+----------+----------+--------+
stock['Date'] = pd.to_datetime(stock['Date'])
3. Data visualization
The formula for Compound Annual Growth Rate (CAGR) is very useful for investment analysis. It may also
be referred to as the annualized rate of return or annual percent yield or effective annual rate, depending on
the algebraic form of the equation. Many investments such as stocks have returns that can vary wildly. The
CAGR formula allows you to calculate a “smoothed” rate of return that you can use to compare to other
investments. The formula is defined as (more details can be found at CAGR Calculator and Formula)
$$\text{CAGR} = \left(\frac{\text{End Value}}{\text{Start Value}}\right)^{\frac{365}{\text{Days}}} - 1$$
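The CAGR formula translates directly into a one-line function. This is a minimal sketch; the function name cagr and the example values are chosen for illustration only.

```python
def cagr(start_value, end_value, days):
    # (End Value / Start Value) ** (365 / Days) - 1
    return (end_value / start_value) ** (365.0 / days) - 1.0

# Hypothetical example: 100 grows to 121 over two years (730 days)
growth = cagr(100.0, 121.0, 730)
```

Here the total return is 21%, but the smoothed yearly rate is 10%, because 1.10 compounded twice gives 1.21.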
A stock’s volatility is the variation in its price over a period of time. For example, one stock may have a
tendency to swing wildly higher and lower, while another stock may move in a much steadier, less turbulent
way. Both stocks may end up at the same price at the end of the day, but their paths to that point can vary wildly.
First, we create a series of percentage returns and calculate the annual volatility of returns.
To present this volatility in annualized terms, we simply need to multiply our daily standard
deviation by the square root of 252. This assumes there are 252 trading days in a given year. More details
can be found at How to Calculate Annualized Volatility.
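The two steps, daily percentage returns followed by scaling the daily standard deviation by the square root of 252, can be sketched with the standard library. The function names are hypothetical, and the sample standard deviation is an assumption (it matches the default of pandas' std). The four closing prices are taken from the stock.tail(4) output above, so the result is only illustrative for such a short window.

```python
import math
import statistics

def daily_returns(prices):
    # Percentage change between consecutive closing prices
    return [prices[i] / prices[i - 1] - 1.0 for i in range(1, len(prices))]

def annualized_volatility(prices, trading_days=252):
    # Sample standard deviation of daily returns, scaled to one trading year
    return statistics.stdev(daily_returns(prices)) * math.sqrt(trading_days)

closes = [153.059998, 151.429993, 151.830002, 151.5]  # Close column of stock.tail(4)
vol = annualized_volatility(closes)
```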
1. Create a matrix of daily returns using a random normal distribution. Generates an RDD matrix comprised
of i.i.d. samples from the uniform distribution U(0.0, 1.0).
2. To transform the distribution in the generated RDD from U(0.0, 1.0) to U(a, b), use
RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)
a = mu/T
b = vol/math.sqrt(T)
v = mat.map(lambda x: a + (b - a)* x)
+--------+--------+--------+--------+--------+
| _1| _2| _3| _4| _5|
+--------+--------+--------+--------+--------+
|0.935234|1.162894| 1.07972|1.238257|1.066136|
|0.878456|1.045922|0.990071|1.045552|0.854516|
|1.186472|0.944777|0.742247|0.940023|1.220934|
|0.872928|1.030882|1.248644|1.114262|1.063762|
| 1.09742|1.188537|1.137283|1.162548|1.024612|
+--------+--------+--------+--------+--------+
only showing top 5 rows
price.show(5)
+--------+--------+--------+--------+--------+----------+
| _1| _2| _3| _4| _5|init_price|
+--------+--------+--------+--------+--------+----------+
|0.935234|1.162894| 1.07972|1.238257|1.066136| 151.5|
|0.878456|1.045922|0.990071|1.045552|0.854516| 151.5|
|1.186472|0.944777|0.742247|0.940023|1.220934| 151.5|
|0.872928|1.030882|1.248644|1.114262|1.063762| 151.5|
| 1.09742|1.188537|1.137283|1.162548|1.024612| 151.5|
+--------+--------+--------+--------+--------+----------+
only showing top 5 rows
+--------+--------+--------+--------+--------+----------+-----+
| _1| _2| _3| _4| _5|init_price|day_0|
+--------+--------+--------+--------+--------+----------+-----+
|0.935234|1.162894| 1.07972|1.238257|1.066136| 151.5|151.5|
|0.878456|1.045922|0.990071|1.045552|0.854516| 151.5|151.5|
|1.186472|0.944777|0.742247|0.940023|1.220934| 151.5|151.5|
|0.872928|1.030882|1.248644|1.114262|1.063762| 151.5|151.5|
| 1.09742|1.188537|1.137283|1.162548|1.024612| 151.5|151.5|
price.show(5)
+--------+--------+--------+--------+--------+----------+-----+------+------+-
˓→-----+------+------+
| _1| _2| _3| _4| _5|init_price|day_0| day_1| day_2|
˓→day_3| day_4| day_5|
+--------+--------+--------+--------+--------+----------+-----+------+------+-
˓→-----+------+------+
|0.935234|1.162894| 1.07972|1.238257|1.066136| 234.87|151.5|141.69|164.
˓→77|177.91| 220.3|234.87|
|0.878456|1.045922|0.990071|1.045552|0.854516| 123.14|151.5|133.09| 139.
˓→2|137.82| 144.1|123.14|
|1.186472|0.944777|0.742247|0.940023|1.220934| 144.67|151.5|179.75|169.
˓→82|126.05|118.49|144.67|
|0.872928|1.030882|1.248644|1.114262|1.063762| 201.77|151.5|132.25|136.
˓→33|170.23|189.68|201.77|
| 1.09742|1.188537|1.137283|1.162548|1.024612| 267.7|151.5|166.26|197.
˓→61|224.74|261.27| 267.7|
+--------+--------+--------+--------+--------+----------+-----+------+------+-
˓→-----+------+------+
only showing top 5 rows
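Each day_k column in the table above is the previous day's price multiplied by that day's simulated factor. The sketch below reproduces one row in plain Python. The function name price_path is hypothetical, and rounding to two decimals after every step is an assumption, chosen because it matches the first row of the table.

```python
def price_path(init_price, daily_factors, ndigits=2):
    # Multiply the running price by each daily factor, rounding after every step
    path = [init_price]
    for factor in daily_factors:
        path.append(round(path[-1] * factor, ndigits))
    return path

# Factors _1 to _5 from the first row of the table
factors = [0.935234, 1.162894, 1.07972, 1.238257, 1.066136]
path = price_path(151.5, factors)
```

For this row the path is 151.5, 141.69, 164.77, 177.91, 220.3, 234.87, which agrees with columns day_0 to day_5.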
16.2.6 Summary
simulated = price.select(selected_col)
simulated.describe().show()
+-------+----------+------------------+------------------+------------------+-
˓→-----------------+------------------+
|summary|2018-12-12| 2018-12-13| 2018-12-14| 2018-12-17|
˓→ 2018-12-18| 2018-12-19|
+-------+----------+------------------+------------------+------------------+-
˓→-----------------+------------------+
| count| 10000.0| 10000.0| 10000.0| 10000.0|
˓→ 10000.0| 10000.0|
data_plt = simulated.toPandas()
days = pd.date_range(stock['Date'].iloc[-1], periods= T+1,freq='B').date
width = 10
height = 6
fig = plt.figure(figsize=(width, height))
ax = fig.add_subplot(1,1,1)
for i in range(trials):
    plt.plot(days, data_plt.iloc[i])
ax.set_xlabel('Date')
ax.set_ylabel('price ($)')
ax.set_title('Simulated Stock price: ' + ticker, y=1.01)
plt.show()
SEVENTEEN
Chinese proverb
A book is known in time of need.
Monte Carlo simulations are just a way of estimating a fixed parameter by repeatedly generating random
numbers. More details can be found at A Zero Math Introduction to Markov Chain Monte Carlo Methods.
Markov Chain Monte Carlo (MCMC) methods are used to approximate the posterior distribution of a
parameter of interest by random sampling in a probabilistic space. More details can be found at A Zero Math
Introduction to Markov Chain Monte Carlo Methods.
The following theory and demo are from Dr. Rebecca C. Steorts’s Intro to Markov Chain Monte Carlo. More
details can be found at Dr. Rebecca C. Steorts’s STA 360/601: Bayesian Methods and Modern Statistics
class at Duke.
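3. Let
$$\theta^{(s+1)} = \begin{cases} \theta^{*} & \text{with prob } \min(r, 1) \\ \theta^{(s)} & \text{otherwise} \end{cases} \qquad (17.1)$$
Note: Actually, the (17.1) in Step 3 can be replaced by sampling $u \sim \text{Uniform}(0, 1)$ and setting
$\theta^{(s+1)} = \theta^{*}$ if $u < r$ and setting $\theta^{(s+1)} = \theta^{(s)}$ otherwise.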
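The following example is going to test out the Metropolis algorithm for the conjugate Normal-Normal model
with a known variance situation.
$$X_1, \cdots, X_n \mid \theta \overset{iid}{\sim} \text{Normal}(\theta, \sigma^2)$$
$$\theta \sim \text{Normal}(\mu, \tau^2)$$
The posterior of $\theta$ is $\text{Normal}(\mu_n, \tau_n^2)$, where
$$\mu_n = \bar{x}\,\frac{n/\sigma^2}{n/\sigma^2 + 1/\tau^2} + \mu\,\frac{1/\tau^2}{n/\sigma^2 + 1/\tau^2}$$
and
$$\tau_n^2 = \frac{1}{n/\sigma^2 + 1/\tau^2}$$
The acceptance ratio is
$$r = \frac{p(\theta^{*} \mid x)}{p(\theta^{(s)} \mid x)} = \frac{p(x \mid \theta^{*})\,p(\theta^{*})}{p(x \mid \theta^{(s)})\,p(\theta^{(s)})} = \left(\frac{\prod_i \text{dnorm}(x_i, \theta^{*}, \sigma)}{\prod_i \text{dnorm}(x_i, \theta^{(s)}, \sigma)}\right)\left(\frac{\text{dnorm}(\theta^{*}, \mu, \tau)}{\text{dnorm}(\theta^{(s)}, \mu, \tau)}\right)$$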
In many cases, computing the ratio $r$ directly can be numerically unstable; however, this can be avoided by
working with $\log r$, i.e.
$$\log r = \sum_i \Big( \log[\text{dnorm}(x_i, \theta^{*}, \sigma)] - \log[\text{dnorm}(x_i, \theta^{(s)}, \sigma)] \Big) + \Big( \log[\text{dnorm}(\theta^{*}, \mu, \tau)] - \log[\text{dnorm}(\theta^{(s)}, \mu, \tau)] \Big)$$
Then the acceptance criterion becomes: accept if $\log u < \log r$, where $u$ is sampled from the Uniform(0, 1).
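The log-ratio acceptance rule can be sketched in a few lines of standard-library Python. This is an illustration under stated assumptions rather than the demo code of the next section: the function names are hypothetical, the five data points are made-up values, and the proposal is Normal with variance delta = 2 centred at the current value. The settings sigma^2 = 1, tau^2 = 10 and mu = 5 follow the demo.

```python
import math
import random

def log_normal_pdf(x, mean, sd):
    # Log density of Normal(mean, sd^2) at x, the analogue of dnorm(..., log=TRUE)
    return -0.5 * math.log(2.0 * math.pi * sd * sd) - (x - mean) ** 2 / (2.0 * sd * sd)

def log_posterior(theta, x, sigma, mu, tau):
    # Unnormalized log posterior: log-likelihood of the data plus log-prior
    return sum(log_normal_pdf(xi, theta, sigma) for xi in x) + log_normal_pdf(theta, mu, tau)

def metropolis(x, sigma, mu, tau, delta=2.0, n_iter=10000, theta0=0.0, seed=1):
    rng = random.Random(seed)
    theta, samples = theta0, []
    for _ in range(n_iter):
        # Propose theta* from a normal distribution centred at the current value
        theta_star = rng.gauss(theta, math.sqrt(delta))
        log_r = (log_posterior(theta_star, x, sigma, mu, tau)
                 - log_posterior(theta, x, sigma, mu, tau))
        # Accept when log u < log r; 1 - random() lies in (0, 1], so the log is defined
        if math.log(1.0 - rng.random()) < log_r:
            theta = theta_star
        samples.append(theta)
    return samples

x = [9.37, 10.18, 9.16, 11.60, 10.33]  # hypothetical observations
sigma, mu, tau = 1.0, 5.0, math.sqrt(10.0)
samples = metropolis(x, sigma, mu, tau)

# Closed-form posterior mean for comparison
n = len(x)
mu_n = ((n / sigma ** 2) * (sum(x) / n) + mu / tau ** 2) / (n / sigma ** 2 + 1 / tau ** 2)
post_mean = sum(samples[1000:]) / len(samples[1000:])  # discard the first 1000 draws as burn-in
```

Because the model is conjugate, the sampler can be checked against the exact posterior mean: after discarding the burn-in, the average of the draws should be close to mu_n.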
17.3 Demos
Now, we generate $S$ iterations of the Metropolis algorithm starting at $\theta^{(0)} = 0$ and using a normal proposal
distribution, where $\theta^{*} \sim \text{Normal}(\theta^{(s)}, 2)$.
17.3.1 R results
# setting values
set.seed(1)
s2<-1
t2<-10
mu<-5; n<-5
####metropolis part####
##S = total num of simulations
theta<-0 ; delta<-2 ; S<-10000 ; THETA<-NULL ; set.seed(1)
for(s in 1:S){
## simulating our proposal
#the new value of theta
#print(theta)
theta.star<-rnorm(1,theta,sqrt(delta))
##taking the log of the ratio r
log.r<-( sum(dnorm(y,theta.star,sqrt(s2),log=TRUE))+
dnorm(theta.star,mu,sqrt(t2),log=TRUE))-
( sum(dnorm(y,theta,sqrt(s2),log=TRUE))+
dnorm(theta,mu,sqrt(t2),log=TRUE))
#print(log.r)
if(log(runif(1))<log.r) { theta<-theta.star }
##updating THETA
#print(log(runif(1)))
THETA<-c(THETA,theta)
}
Figure. Histogram for the Metropolis algorithm with R shows a trace plot for this run as well as a histogram
for the Metropolis algorithm compared with a draw from the true normal density.
# coding: utf-8
# In[1]:
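import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
# In[2]:
def rnorm(n,mean,sd):
    """
    same functions as rnorm in r
    r: rnorm(n, mean=0, sd=1)
    py: rvs(loc=0, scale=1, size=1, random_state=None)
    """
    return norm.rvs(loc=mean,scale=sd,size=n)

# Assumed helpers (not shown in the original) mirroring R's dnorm and runif
def dnorm(x,mean,sd,log=False):
    return norm.logpdf(x,loc=mean,scale=sd) if log else norm.pdf(x,loc=mean,scale=sd)

def runif(n,min=0,max=1):
    return np.random.uniform(min,max,size=n)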
# In[3]:
s2 = 1
t2 = 10
mu = 5
n = 5
# In[4]:
y = rnorm(n,10,1)
y
# In[5]:
# In[6]:
t2_n = 1.0/(n/float(s2)+1.0/t2)
t2_n
# In[7]:
# In[8]:
# In[9]:
####metropolis part####
##S = total num of simulations
# theta<-0 ; delta<-2 ; S<-10000 ; THETA<-NULL ; set.seed(1)
theta = 0
delta = 2
S = 10000
theta_v = []
# In[ ]:
for s in range(S):
    # Propose a new value of theta
    theta_star = norm.rvs(theta,np.sqrt(delta),1)
    # Log of the acceptance ratio r
    logr = ((sum(dnorm(y,theta_star,np.sqrt(s2),log=True)) +
             sum(dnorm(theta_star,mu,np.sqrt(t2),log=True))) -
            (sum(dnorm(y,theta,np.sqrt(s2),log=True)) +
             sum(dnorm([theta],mu,np.sqrt(t2),log=True))))
    #print(logr)
    if np.log(runif(1))<logr:
        theta = theta_star
    #print(theta)
    theta_v.append(theta)
# In[ ]:
plt.figure(figsize=(20, 8))
plt.subplot(1, 2, 1)
plt.plot(theta_v,'b-.')
plt.subplot(1, 2, 2)
#bins = np.arange(0, S, 10)
plt.hist(theta_v, density=True,bins='auto')
x = np.linspace(min(theta_v),max(theta_v),100)
# In[ ]:
Figure. Histogram for the Metropolis algorithm with python shows a trace plot for this run as well as a
histogram for the Metropolis algorithm compared with a draw from the true normal density.
TODO. . .
Figure. Histogram for the Metropolis algorithm with PySpark shows a trace plot for this run as well as a
histogram for the Metropolis algorithm compared with a draw from the true normal density.
EIGHTEEN
NEURAL NETWORK
Chinese proverb
Sharpening the knife longer can make it easier to hack the firewood – old Chinese proverb
18.1.1 Introduction
A feedforward neural network is an artificial neural network wherein connections between the units do not
form a cycle. As such, it is different from recurrent neural networks.
The feedforward neural network was the first and simplest type of artificial neural network devised. In this
network, the information moves in only one direction, forward (see Fig. MultiLayer Neural Network), from
the input nodes, through the hidden nodes (if any) and to the output nodes. There are no cycles or loops in
the network.
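The one-directional flow described above can be sketched as a forward pass in plain Python. This is only an illustration, not the PySpark implementation: the 2-3-1 layer sizes, the hand-picked weights, and the sigmoid activation are all assumptions made for the example.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def dense(inputs, weights, biases):
    """One fully connected layer: out[j] = sigmoid(sum_i inputs[i] * weights[i][j] + biases[j])."""
    return [
        sigmoid(sum(x * w_row[j] for x, w_row in zip(inputs, weights)) + biases[j])
        for j in range(len(biases))
    ]

def feedforward(x, layers):
    """Pass x through each (weights, biases) pair in order; nothing flows backwards."""
    activation = x
    for weights, biases in layers:
        activation = dense(activation, weights, biases)
    return activation

# Hypothetical 2-3-1 network with hand-picked weights.
layers = [
    ([[0.5, -0.2, 0.1], [0.3, 0.8, -0.5]], [0.0, 0.1, -0.1]),  # input -> hidden
    ([[1.0], [-1.0], [0.5]], [0.2]),                          # hidden -> output
]
output = feedforward([1.0, 2.0], layers)
```

Each layer only reads the activations of the layer before it, which is exactly the "no cycles" property that separates this architecture from a recurrent network.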
Learning Apache Spark with Python
18.1.2 Demo
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Feedforward neural network example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
2. Load dataset
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
|fixed|volatile|citric|sugar|chlorides|free|total|density|  pH|sulphates|alcohol|quality|
+-----+--------+------+-----+---------+----+-----+-------+----+---------+-------+-------+
# Map a numeric quality score to a categorical label
def condition(r):
    if (0 <= r <= 4):
        label = "low"
    elif (4 < r <= 6):
        label = "medium"
    else:
        label = "high"
    return label
data= transData(df)
data.show()
5. Split the data into training and test sets (40% held out for testing)
7. Make predictions
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
8. Evaluation
NINETEEN
MY PYSPARK PACKAGE
It is easy to wrap your own code in a Python package. I have packaged some functions that I use frequently
in my daily work. You can download and install the result from My PySpark Package. The directory structure
of this package is as follows.
|-- build
|   |-- bdist.linux-x86_64
|   |-- lib.linux-x86_64-2.7
|       |-- PySparkTools
|           |-- __init__.py
|           |-- Manipulation
|           |   |-- DataManipulation.py
|           |   |-- __init__.py
|           |-- Visualization
|               |-- __init__.py
|               |-- PyPlots.py
|-- dist
|   |-- PySParkTools-1.0-py2.7.egg
|-- __init__.py
|-- PySparkTools
|   |-- __init__.py
|   |-- Manipulation
|   |   |-- DataManipulation.py
|   |   |-- __init__.py
|   |-- Visualization
|       |-- __init__.py
|       |-- PyPlots.py
|       |-- PyPlots.pyc
|-- PySParkTools.egg-info
|   |-- dependency_links.txt
|   |-- PKG-INFO
|   |-- requires.txt
|   |-- SOURCES.txt
|   |-- top_level.txt
|-- README.md
|-- requirements.txt
From the above hierarchical structure, you will find that you have to have __init__.py in each directory.
I will explain the __init__.py file with the example below:
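A minimal sketch of what `__init__.py` does, assuming a hypothetical package named `demo_pkg` that is created in a temporary directory (the package name, the `manipulation` module, and the `double` helper are illustrative and not part of PySparkTools). The file marks the directory as a regular package, and its code runs when the package is first imported, so it can re-export names from submodules.

```python
import importlib
import os
import sys
import tempfile

root = tempfile.mkdtemp()
pkg_dir = os.path.join(root, "demo_pkg")
os.makedirs(pkg_dir)

# A submodule holding the actual function.
with open(os.path.join(pkg_dir, "manipulation.py"), "w") as f:
    f.write("def double(x):\n    return 2 * x\n")

# __init__.py runs on first import of the package; here it re-exports double.
with open(os.path.join(pkg_dir, "__init__.py"), "w") as f:
    f.write("from .manipulation import double\n")

sys.path.insert(0, root)
importlib.invalidate_caches()
demo_pkg = importlib.import_module("demo_pkg")
result = demo_pkg.double(21)
```

With this layout, callers can write `demo_pkg.double(...)` without knowing which submodule defines it. It also matters for packaging: `find_packages()` from setuptools only picks up directories that contain an `__init__.py`.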
19.2 Set Up
from setuptools import setup, find_packages

# Use the README as the long description when it is available
try:
    with open("README.md") as f:
        long_description = f.read()
except IOError:
    long_description = ""

# Read the install requirements, skipping blank lines
try:
    with open("requirements.txt") as f:
        requirements = [x.strip() for x in f.read().splitlines() if x.strip()]
except IOError:
    requirements = []

setup(name='PySParkTools',
      install_requires=requirements,
      version='1.0',
      description='Python Spark Tools',
      author='Wenqiang Feng',
      author_email='[email protected]',
      url='https://github.com/runawayhorse001/PySparkTools',
      packages=find_packages(),
      long_description=long_description
      )
19.3 ReadMe
# PySparkTools
This is my PySpark Tools. If you want to clone and install it, you can use
- clone
```{bash}
git clone git@github.com:runawayhorse001/PySparkTools.git
```
- install
```{bash}
cd PySparkTools
pip install -r requirements.txt
python setup.py install
```
- test
```{bash}
cd PySparkTools/test
python test1.py
```
TWENTY
MY CHEAT SHEET
You can download the PDF version: PySpark Cheat Sheet and pdDataFrame vs rddDataFrame.
TWENTYONE
PYSPARK API
The API documentation below is automatically generated from the PySpark package, so all copyrights belong to Spark.
class pyspark.ml.stat.ChiSquareTest
Note: Experimental
Conduct Pearson’s independence test for every feature against the label. For each feature, the (feature,
label) pairs are converted into a contingency matrix for which the Chi-squared statistic is computed.
All label and feature values must be categorical.
The null hypothesis is that the occurrence of the outcomes is statistically independent.
New in version 2.2.0.
static test(dataset, featuresCol, labelCol)
Perform a Pearson’s independence test using dataset.
Parameters
• dataset – DataFrame of categorical labels and categorical features.
Real-valued features will be treated as categorical for each distinct value.
• featuresCol – Name of features column in dataset, of type Vector (VectorUDT).
• labelCol – Name of label column in dataset, of any numerical type.
Returns DataFrame containing the test result for every feature against the label. This
DataFrame will contain a single Row with the following fields:
- pValues: Vector
- degreesOfFreedom: Array[Int]
- statistics: Vector
Each of these fields has one value per feature.
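To make the contingency-matrix step concrete, here is a minimal sketch in plain Python of Pearson's chi-squared statistic for a single feature. It is not Spark's implementation; the function name and the toy data are assumptions, and the p-value computation is left out.

```python
def chi_square_independence(pairs):
    """Return (statistic, degrees_of_freedom) for a list of (feature, label) pairs."""
    features = sorted({f for f, _ in pairs})
    labels = sorted({lab for _, lab in pairs})
    # Contingency table of observed counts.
    observed = {(f, lab): 0 for f in features for lab in labels}
    for f, lab in pairs:
        observed[(f, lab)] += 1
    n = len(pairs)
    row_totals = {f: sum(observed[(f, lab)] for lab in labels) for f in features}
    col_totals = {lab: sum(observed[(f, lab)] for f in features) for lab in labels}
    statistic = 0.0
    for f in features:
        for lab in labels:
            # Count expected under independence of feature and label.
            expected = row_totals[f] * col_totals[lab] / n
            statistic += (observed[(f, lab)] - expected) ** 2 / expected
    dof = (len(features) - 1) * (len(labels) - 1)
    return statistic, dof

# Toy data: 60 (feature, label) pairs in a 2 x 2 table.
pairs = [(0, 0)] * 10 + [(0, 1)] * 20 + [(1, 0)] * 20 + [(1, 1)] * 10
statistic, dof = chi_square_independence(pairs)
```

Every expected count here is 15, so each of the four cells contributes 25/15 to the statistic. In `ChiSquareTest.test` the same computation is repeated for every feature in the vector column, which is why each result field holds one value per feature.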
Note: Experimental
Compute the correlation matrix for the input dataset of Vectors using the specified method. Methods
currently supported: pearson (default), spearman.
Note: For Spearman, a rank correlation, we need to create an RDD[Double] for each column and sort
it in order to retrieve the ranks and then join the columns back into an RDD[Vector], which is fairly
costly. Cache the input Dataset before calling corr with method='spearman' to avoid recomputing
the common lineage.
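The note above explains why Spearman is costlier: it is the Pearson correlation applied to ranks, and ranking requires a sort per column. A minimal single-machine sketch follows, with helper names that are assumptions of this example rather than Spark API.

```python
import math

def ranks(values):
    """Average ranks (1-based); tied values share the mean of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg_rank = (i + j) / 2.0 + 1.0
        for k in range(i, j + 1):
            result[order[k]] = avg_rank
        i = j + 1
    return result

def pearson(x, y):
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sx = math.sqrt(sum((a - mx) ** 2 for a in x))
    sy = math.sqrt(sum((b - my) ** 2 for b in y))
    return cov / (sx * sy)

def spearman(x, y):
    # Rank each column (this is the sort step), then correlate the ranks.
    return pearson(ranks(x), ranks(y))

x = [1.0, 2.0, 3.0, 4.0, 5.0]
y = [1.0, 8.0, 27.0, 64.0, 125.0]   # monotone but non-linear in x
rho_pearson = pearson(x, y)
rho_spearman = spearman(x, y)
```

Because `y` is a monotone function of `x`, the rank correlation is 1 even though the linear (Pearson) correlation is below 1.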
Note: Experimental
Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous distribution.
By comparing the largest difference between the empirical cumulative distribution of the sample data
and the theoretical distribution we can provide a test for the null hypothesis that the sample data
comes from that theoretical distribution.
New in version 2.4.0.
static test(dataset, sampleCol, distName, *params)
Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality.
Currently supports the normal distribution, taking as parameters the mean and standard
deviation.
Parameters
• dataset – a Dataset or a DataFrame containing the sample of data to test.
• sampleCol – Name of sample column in dataset, of any numerical type.
• distName – a string name for a theoretical distribution; currently only
"norm" is supported.
• params – a list of Double values specifying the parameters to be used for the
theoretical distribution. For the "norm" distribution, the parameters are the mean
and standard deviation.
Returns A DataFrame that contains the Kolmogorov-Smirnov test result for the input
sampled data. This DataFrame will contain a single Row with the following fields:
- pValue: Double
- statistic: Double
>>> round(ksResult.pValue, 3)
1.0
>>> round(ksResult.statistic, 3)
0.175
>>> dataset = [[2.0], [3.0], [4.0]]
>>> dataset = spark.createDataFrame(dataset, ['sample'])
>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).first()
>>> round(ksResult.pValue, 3)
1.0
>>> round(ksResult.statistic, 3)
0.175
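The statistic in the doctest above can be reproduced by hand. The sketch below computes the largest gap between the empirical CDF of the sample and the normal CDF in plain Python; the helper names are assumptions of this example, and the p-value is not computed.

```python
import math

def normal_cdf(x, mean, std):
    return 0.5 * (1.0 + math.erf((x - mean) / (std * math.sqrt(2.0))))

def ks_statistic(sample, mean, std):
    """Largest gap between the empirical CDF of sample and the N(mean, std^2) CDF."""
    xs = sorted(sample)
    n = len(xs)
    d = 0.0
    for i, x in enumerate(xs):
        f = normal_cdf(x, mean, std)
        # The empirical CDF jumps from i/n to (i+1)/n at x; check both sides of the jump.
        d = max(d, (i + 1) / n - f, f - i / n)
    return d

# Same sample and parameters as the doctest: [2.0, 3.0, 4.0] against N(3, 1).
d = ks_statistic([2.0, 3.0, 4.0], mean=3.0, std=1.0)
```

Rounded to three decimals, `d` equals the 0.175 reported by `ksResult.statistic`.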
Note: Experimental
Tools for vectorized statistics on MLlib Vectors. The methods in this package provide various statistics
for Vectors contained inside DataFrames. This class lets users pick the statistics they would like to
extract for a given column.
Note: Currently, the performance of this interface is about 2x~3x slower than using the RDD
interface.
New in version 2.4.0.
static min(col, weightCol=None)
return a column of min summary
New in version 2.4.0.
static normL1(col, weightCol=None)
return a column of normL1 summary
New in version 2.4.0.
static normL2(col, weightCol=None)
return a column of normL2 summary
New in version 2.4.0.
static numNonZeros(col, weightCol=None)
return a column of numNonZero summary
New in version 2.4.0.
static variance(col, weightCol=None)
return a column of variance summary
New in version 2.4.0.
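As a rough illustration of what these column-wise summaries mean, the sketch below computes the same named statistics for a small list of dense vectors in plain Python. It ignores `weightCol`, and the use of the sample variance with an (n - 1) denominator is an assumption of the sketch.

```python
import math

def summarize(vectors):
    """Column-wise statistics for a list of equal-length dense vectors."""
    n = len(vectors)
    columns = list(zip(*vectors))
    means = [sum(col) / n for col in columns]
    return {
        "min": [min(col) for col in columns],
        "normL1": [sum(abs(v) for v in col) for col in columns],
        "normL2": [math.sqrt(sum(v * v for v in col)) for col in columns],
        "numNonZeros": [sum(1 for v in col if v != 0.0) for col in columns],
        # Sample variance with an (n - 1) denominator; an assumption of this sketch.
        "variance": [
            sum((v - m) ** 2 for v in col) / (n - 1)
            for col, m in zip(columns, means)
        ],
    }

stats = summarize([[1.0, 0.0], [3.0, -4.0], [5.0, 0.0]])
```

Each entry of `stats` is itself a vector with one value per feature dimension, which mirrors how the `Summarizer` columns return one summary vector per requested metric.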
class pyspark.ml.stat.SummaryBuilder(jSummaryBuilder)
Note: Experimental
class pyspark.ml.regression.AFTSurvivalRegression(featuresCol='features', labelCol='label',
    predictionCol='prediction', fitIntercept=True, maxIter=100, tol=1e-06,
    censorCol='censor', quantileProbabilities=[0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99],
    quantilesCol=None, aggregationDepth=2)
Note: Experimental
setQuantilesCol(value)
Sets the value of quantilesCol.
New in version 1.6.0.
class pyspark.ml.regression.AFTSurvivalRegressionModel(java_model=None)
Note: Experimental
This generalizes the idea of “Gini” importance to other losses, following the explanation of
Gini importance from “Random Forests” documentation by Leo Breiman and Adele Cutler, and
following the implementation from scikit-learn.
This feature importance is calculated as follows:
• importance(feature j) = sum (over nodes which split on feature j) of the gain, where
gain is scaled by the number of instances passing through node
• Normalize importances for tree to sum to 1.
Note: Feature importance for single decision trees can have high variance due to correlated
predictor variables. Consider using a RandomForestRegressor to determine feature
importance instead.
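The two-step calculation above (sum the instance-scaled gains per feature, then normalize) can be sketched as follows. The tuple layout for split nodes and the toy numbers are assumptions of this example, not Spark's internal tree representation.

```python
def feature_importances(split_nodes, num_features):
    """split_nodes: list of (feature_index, gain, num_instances), one per internal node."""
    totals = [0.0] * num_features
    for feature, gain, count in split_nodes:
        # Gain scaled by the number of instances passing through the node.
        totals[feature] += gain * count
    norm = sum(totals)
    # Normalize so the importances of the tree sum to 1.
    return [t / norm for t in totals] if norm > 0 else totals

# Hypothetical tree with three splits over two features.
nodes = [(0, 0.5, 100), (1, 0.2, 60), (0, 0.1, 40)]
importances = feature_importances(nodes, num_features=2)
```

Here feature 0 accumulates 50 + 4 = 54 and feature 1 accumulates 12, so the normalized importances are 54/66 and 12/66.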
evaluateEachIteration(dataset, loss)
Method to compute error or loss for every iteration of gradient boosting.
Parameters
• dataset – Test dataset to evaluate model on, where dataset is an instance of
pyspark.sql.DataFrame
• loss – The loss function used to compute error. Supported options: squared,
absolute
New in version 2.4.0.
featureImportances
Estimate of the importance of each feature.
Each feature's importance is the average of its importance across all trees in the ensemble. The
importance vector is normalized to sum to 1. This method is suggested by Hastie et al. (Hastie,
Tibshirani, Friedman. "The Elements of Statistical Learning, 2nd Edition." 2001.) and follows
the implementation from scikit-learn.
See also:
DecisionTreeRegressionModel.featureImportances
New in version 2.0.0.
trees
Trees in this ensemble. Warning: These have null parent Estimators.
New in version 2.0.0.
class pyspark.ml.regression.GeneralizedLinearRegression(labelCol='label', featuresCol='features',
    predictionCol='prediction', family='gaussian', link=None, fitIntercept=True, maxIter=25,
    tol=1e-06, regParam=0.0, weightCol=None, solver='irls', linkPredictionCol=None,
    variancePower=0.0, linkPower=None, offsetCol=None)
Note: Experimental
getVariancePower()
Gets the value of variancePower or its default value.
New in version 2.2.0.
setFamily(value)
Sets the value of family.
New in version 2.0.0.
setLink(value)
Sets the value of link.
New in version 2.0.0.
setLinkPower(value)
Sets the value of linkPower.
New in version 2.2.0.
setLinkPredictionCol(value)
Sets the value of linkPredictionCol.
New in version 2.0.0.
setOffsetCol(value)
Sets the value of offsetCol.
New in version 2.3.0.
setParams(self, labelCol="label", featuresCol="features", predictionCol="prediction",
    family="gaussian", link=None, fitIntercept=True, maxIter=25, tol=1e-6, regParam=0.0,
    weightCol=None, solver="irls", linkPredictionCol=None, variancePower=0.0,
    linkPower=None, offsetCol=None)
Sets params for generalized linear regression.
New in version 2.0.0.
setVariancePower(value)
Sets the value of variancePower.
New in version 2.2.0.
class pyspark.ml.regression.GeneralizedLinearRegressionModel(java_model=None)
Note: Experimental
evaluate(dataset)
Evaluates the model on a test dataset.
Parameters dataset – Test dataset to evaluate model on, where dataset is an
instance of pyspark.sql.DataFrame
New in version 2.0.0.
hasSummary
Indicates whether a training summary exists for this model instance.
New in version 2.0.0.
intercept
Model intercept.
New in version 2.0.0.
summary
Gets summary (e.g. residuals, deviance, pValues) of model on training set. An exception is
thrown if trainingSummary is None.
New in version 2.0.0.
class pyspark.ml.regression.GeneralizedLinearRegressionSummary(java_obj=None)
Note: Experimental
nullDeviance
The deviance for the null model.
New in version 2.0.0.
numInstances
Number of instances in DataFrame predictions.
New in version 2.2.0.
predictionCol
Field in predictions which gives the predicted value of each instance. This is set to a new
column name if the original model's predictionCol is not set.
New in version 2.0.0.
predictions
Predictions output by the model's transform method.
New in version 2.0.0.
rank
The numeric rank of the fitted linear model.
New in version 2.0.0.
residualDegreeOfFreedom
The residual degrees of freedom.
New in version 2.0.0.
residualDegreeOfFreedomNull
The residual degrees of freedom for the null model.
New in version 2.0.0.
residuals(residualsType=’deviance’)
Get the residuals of the fitted model by type.
Parameters residualsType – The type of residuals which should be returned.
Supported options: deviance (default), pearson, working, and response.
New in version 2.0.0.
class pyspark.ml.regression.GeneralizedLinearRegressionTrainingSummary(java_obj=None)
Note: Experimental
The learning objective is to minimize the specified loss function, with regularization. This supports
two kinds of loss:
• squaredError (a.k.a squared loss)
• huber (a hybrid of squared error for relatively small errors and absolute error for relatively large
ones, and we estimate the scale parameter from training data)
This supports multiple types of regularization:
• none (a.k.a. ordinary least squares)
• L2 (ridge regression)
• L1 (Lasso)
• L2 + L1 (elastic net)
Note: Fitting with huber loss only supports none and L2 regularization.
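The loss and regularization options listed above can be compared numerically. The sketch below uses the textbook Huber loss with a fixed threshold `delta` (Spark additionally estimates a scale parameter, which is omitted here) and the elastic-net form regParam * (alpha * L1 + (1 - alpha) / 2 * L2^2); the function names and the value 1.35 for `delta` are assumptions of this example.

```python
def squared_loss(residual):
    return 0.5 * residual ** 2

def huber_loss(residual, delta=1.35):
    """Quadratic for |residual| <= delta, linear beyond it."""
    a = abs(residual)
    if a <= delta:
        return 0.5 * a ** 2
    return delta * (a - 0.5 * delta)

def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2), with alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_sq = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_sq)

w = [3.0, -4.0]
ridge = elastic_net_penalty(w, reg_param=0.1, elastic_net_param=0.0)   # pure L2
lasso = elastic_net_penalty(w, reg_param=0.1, elastic_net_param=1.0)   # pure L1
small, large = huber_loss(1.0), huber_loss(10.0)
```

For small residuals the two losses agree, while for a residual of 10 the Huber loss grows only linearly, which is what makes it less sensitive to outliers. Setting `reg_param=0.0` removes the penalty and corresponds to ordinary least squares.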
summary
Gets summary (e.g. residuals, mse, r-squared) of model on training set. An exception is thrown
if trainingSummary is None.
New in version 2.0.0.
class pyspark.ml.regression.LinearRegressionSummary(java_obj=None)
Note: Experimental
See also:
Wikipedia: Explained variation
Note: This ignores instance weights (setting all to 1.0) from LinearRegression.weightCol.
This will change in later Spark versions.
labelCol
Field in “predictions” which gives the true label of each instance.
New in version 2.0.0.
meanAbsoluteError
Returns the mean absolute error, which is a risk function corresponding to the expected value of
the absolute error loss or l1-norm loss.
Note: This ignores instance weights (setting all to 1.0) from LinearRegression.weightCol.
This will change in later Spark versions.
See also:
Wikipedia coefficient of determination
Note: This ignores instance weights (setting all to 1.0) from LinearRegression.weightCol.
This will change in later Spark versions.
Note: Experimental
Linear regression training results. Currently, the training summary ignores the training weights except
for the objective trace.
New in version 2.0.0.
objectiveHistory
Objective function (scaled loss + regularization) at each iteration. This value is only available
when using the “l-bfgs” solver.
See also:
LinearRegression.solver
New in version 2.0.0.
totalIterations
Number of training iterations until termination. This value is only available when using the
“l-bfgs” solver.
See also:
LinearRegression.solver
New in version 2.0.0.
class pyspark.ml.regression.RandomForestRegressor(featuresCol='features', labelCol='label',
    predictionCol='prediction', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
    maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='variance',
    subsamplingRate=1.0, seed=None, numTrees=20, featureSubsetStrategy='auto')
Random Forest learning algorithm for regression. It supports both continuous and categorical features.
>>> from numpy import allclose
>>> from pyspark.ml.linalg import Vectors
>>> df = spark.createDataFrame([
... (1.0, Vectors.dense(1.0)),
... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
>>> rf = RandomForestRegressor(numTrees=2, maxDepth=2, seed=42)
>>> model = rf.fit(df)
>>> model.featureImportances
>>> model.getNumTrees
2
>>> test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
>>> model.transform(test1).head().prediction
0.5
>>> rfr_path = temp_path + "/rfr"
>>> rf.save(rfr_path)
>>> rf2 = RandomForestRegressor.load(rfr_path)
>>> rf2.getNumTrees()
2
>>> model_path = temp_path + "/rfr_model"
>>> model.save(model_path)
>>> model2 = RandomForestRegressionModel.load(model_path)
>>> model.featureImportances == model2.featureImportances
True
Tibshirani, Friedman. “The Elements of Statistical Learning, 2nd Edition.” 2001.) and follows
the implementation from scikit-learn.
See also:
DecisionTreeRegressionModel.featureImportances
New in version 2.0.0.
trees
Trees in this ensemble. Warning: These have null parent Estimators.
New in version 2.0.0.
Note: Experimental
Note: Experimental
class pyspark.ml.classification.LogisticRegression(featuresCol='features', labelCol='label',
    predictionCol='prediction', maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-06,
    fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol='probability',
    rawPredictionCol='rawPrediction', standardization=True, weightCol=None, aggregationDepth=2,
    family='auto', lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None,
    lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None)
Logistic regression. This class supports multinomial logistic (softmax) and binomial logistic regression.
threshold: $\frac{1}{1 + \frac{\mathit{thresholds}(0)}{\mathit{thresholds}(1)}}$. Otherwise, returns threshold if set or its default value if unset.
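A small numeric check of the equivalence 1 / (1 + thresholds(0) / thresholds(1)): the sketch below assumes the multiclass rule "predict the class with the largest probability-to-threshold ratio" and shows that, for two classes, it agrees with comparing the class-1 probability against the single equivalent threshold. The function names are assumptions of this example.

```python
def equivalent_threshold(thresholds):
    """Binary threshold equivalent to a length-2 thresholds array."""
    t0, t1 = thresholds
    return 1.0 / (1.0 + t0 / t1)

def predict_with_thresholds(p1, thresholds):
    """Pick the class with the largest probability / threshold ratio (assumed rule)."""
    p0 = 1.0 - p1
    return 1 if p1 / thresholds[1] > p0 / thresholds[0] else 0

thresholds = [0.3, 0.7]
t = equivalent_threshold(thresholds)
# Compare both decision rules on a few class-1 probabilities.
agreement = [
    predict_with_thresholds(p, thresholds) == (1 if p > t else 0)
    for p in [0.1, 0.5, 0.69, 0.71, 0.9]
]
```

With `thresholds = [0.3, 0.7]` the equivalent binary threshold is 0.7, so class 1 is predicted only when its probability exceeds 0.7.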
setThresholds(value)
Sets the value of thresholds. Clears value of threshold if it has been set.
New in version 1.5.0.
setUpperBoundsOnCoefficients(value)
Sets the value of upperBoundsOnCoefficients
New in version 2.3.0.
setUpperBoundsOnIntercepts(value)
Sets the value of upperBoundsOnIntercepts
New in version 2.3.0.
class pyspark.ml.classification.LogisticRegressionModel(java_model=None)
Model fitted by LogisticRegression.
New in version 1.3.0.
coefficientMatrix
Model coefficients.
New in version 2.1.0.
coefficients
Model coefficients of binomial logistic regression. An exception is thrown in the case of
multinomial logistic regression.
New in version 2.0.0.
evaluate(dataset)
Evaluates the model on a test dataset.
Parameters dataset – Test dataset to evaluate model on, where dataset is an
instance of pyspark.sql.DataFrame
New in version 2.0.0.
hasSummary
Indicates whether a training summary exists for this model instance.
New in version 2.0.0.
intercept
Model intercept of binomial logistic regression. An exception is thrown in the case of
multinomial logistic regression.
New in version 1.4.0.
interceptVector
Model intercept.
New in version 2.1.0.
summary
Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model trained
on the training set. An exception is thrown if trainingSummary is None.
Note: Experimental
predictions
DataFrame output by the model's transform method.
New in version 2.0.0.
probabilityCol
Field in “predictions” which gives the probability of each class as a vector.
New in version 2.0.0.
recallByLabel
Returns recall for each label (category).
New in version 2.3.0.
truePositiveRateByLabel
Returns true positive rate for each label (category).
New in version 2.3.0.
weightedFMeasure(beta=1.0)
Returns weighted averaged f-measure.
New in version 2.3.0.
weightedFalsePositiveRate
Returns weighted false positive rate.
New in version 2.3.0.
weightedPrecision
Returns weighted averaged precision.
New in version 2.3.0.
weightedRecall
Returns weighted averaged recall. (equals to precision, recall and f-measure)
New in version 2.3.0.
weightedTruePositiveRate
Returns weighted true positive rate. (equals to precision, recall and f-measure)
New in version 2.3.0.
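To show how the per-label and weighted metrics relate, here is a minimal sketch in plain Python that computes recall for each label and then averages it with weights equal to each label's share of the data. The helper names and the toy labels are assumptions of this example.

```python
from collections import Counter

def recall_by_label(labels, predictions):
    """Recall for each distinct label, keyed by label."""
    support = Counter(labels)
    hits = Counter(lab for lab, pred in zip(labels, predictions) if lab == pred)
    return {label: hits[label] / support[label] for label in sorted(support)}

def weighted_recall(labels, predictions):
    """Per-label recall averaged with weights equal to each label's share of the data."""
    support = Counter(labels)
    n = len(labels)
    per_label = recall_by_label(labels, predictions)
    return sum(per_label[label] * support[label] / n for label in per_label)

labels      = [0, 0, 0, 0, 1, 1, 2, 2, 2, 2]
predictions = [0, 0, 1, 0, 1, 2, 2, 2, 2, 0]
per_label = recall_by_label(labels, predictions)
overall = weighted_recall(labels, predictions)
accuracy = sum(1 for lab, pred in zip(labels, predictions) if lab == pred) / len(labels)
```

Weighting each label's recall by its frequency cancels the per-label denominators, so the weighted recall always coincides with the overall accuracy (0.7 in this toy data).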
class pyspark.ml.classification.LogisticRegressionTrainingSummary(java_obj=None)
Note: Experimental
Abstraction for multinomial Logistic Regression Training results. Currently, the training summary
ignores the training weights except for the objective trace.
New in version 2.0.0.
objectiveHistory
Objective function (scaled loss + regularization) at each iteration.
Note: Experimental
Note: This ignores instance weights (setting all to 1.0) from LogisticRegression.weightCol.
This will change in later Spark versions.
Note: Experimental
class pyspark.ml.classification.DecisionTreeClassifier(featuresCol='features', labelCol='label',
    predictionCol='prediction', probabilityCol='probability', rawPredictionCol='rawPrediction',
    maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256,
    cacheNodeIds=False, checkpointInterval=10, impurity='gini', seed=None)
Decision tree learning algorithm for classification. It supports both binary and multiclass labels, as
well as both continuous and categorical features.
Note: Feature importance for single decision trees can have high variance due to correlated
predictor variables. Consider using a RandomForestClassifier to determine feature
importance instead.
class pyspark.ml.classification.RandomForestClassifier(featuresCol='features', labelCol='label',
    predictionCol='prediction', probabilityCol='probability', rawPredictionCol='rawPrediction',
    maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256,
    cacheNodeIds=False, checkpointInterval=10, impurity='gini', numTrees=20,
    featureSubsetStrategy='auto', seed=None, subsamplingRate=1.0)
Random Forest learning algorithm for classification. It supports both binary and multiclass labels, as
well as both continuous and categorical features.
>>> import numpy
>>> from numpy import allclose
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.feature import StringIndexer
>>> df = spark.createDataFrame([
... (1.0, Vectors.dense(1.0)),
... (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
>>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
>>> si_model = stringIndexer.fit(df)
>>> td = si_model.transform(df)
>>> rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
>>> model = rf.fit(td)
>>> model.featureImportances
SparseVector(1, {0: 1.0})
>>> allclose(model.treeWeights, [1.0, 1.0, 1.0])
True
>>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
>>> result = model.transform(test0).head()
>>> result.prediction
0.0
DecisionTreeClassificationModel.featureImportances
New in version 2.0.0.
trees
Trees in this ensemble. Warning: These have null parent Estimators.
New in version 2.0.0.
class pyspark.ml.classification.NaiveBayes(featuresCol='features', labelCol='label',
    predictionCol='prediction', probabilityCol='probability', rawPredictionCol='rawPrediction',
    smoothing=1.0, modelType='multinomial', thresholds=None, weightCol=None)
Naive Bayes Classifiers. It supports both Multinomial and Bernoulli NB. Multinomial NB can handle
finitely supported discrete data. For example, by converting documents into TF-IDF vectors, it can be
used for document classification. By making every vector a binary (0/1) data, it can also be used as
Bernoulli NB. The input feature values must be nonnegative.
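A minimal textbook sketch of multinomial Naive Bayes on count vectors, written in plain Python. It is not Spark's implementation: the function names, the unsmoothed class priors, and the toy rows are assumptions of this example, and only the feature likelihoods use additive smoothing.

```python
import math
from collections import defaultdict

def train_multinomial_nb(rows, smoothing=1.0):
    """rows: list of (label, count_vector). Returns (log_prior, log_theta) per label."""
    num_features = len(rows[0][1])
    doc_counts = defaultdict(int)
    feature_sums = defaultdict(lambda: [0.0] * num_features)
    for label, vec in rows:
        doc_counts[label] += 1
        for j, v in enumerate(vec):
            feature_sums[label][j] += v
    n = len(rows)
    log_prior = {c: math.log(doc_counts[c] / n) for c in doc_counts}
    log_theta = {}
    for c in doc_counts:
        # Additive smoothing keeps unseen terms from getting zero probability.
        total = sum(feature_sums[c]) + smoothing * num_features
        log_theta[c] = [math.log((s + smoothing) / total) for s in feature_sums[c]]
    return log_prior, log_theta

def predict(model, vec):
    log_prior, log_theta = model
    scores = {
        c: log_prior[c] + sum(v * lt for v, lt in zip(vec, log_theta[c]))
        for c in log_prior
    }
    return max(scores, key=scores.get)

rows = [
    (0.0, [3.0, 0.0, 1.0]),
    (0.0, [2.0, 1.0, 0.0]),
    (1.0, [0.0, 3.0, 2.0]),
    (1.0, [1.0, 2.0, 3.0]),
]
model = train_multinomial_nb(rows)
pred_a = predict(model, [4.0, 0.0, 0.0])
pred_b = predict(model, [0.0, 2.0, 2.0])
```

The score multiplies each count by the log-probability of its term, so negative feature values have no meaning in this model; this is why the input features must be nonnegative.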
setBlockSize(value)
Sets the value of blockSize.
New in version 1.6.0.
setInitialWeights(value)
Sets the value of initialWeights.
New in version 2.0.0.
setLayers(value)
Sets the value of layers.
New in version 1.6.0.
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
    maxIter=100, tol=1e-6, seed=None, layers=None, blockSize=128, stepSize=0.03,
    solver="l-bfgs", initialWeights=None, probabilityCol="probability",
    rawPredictionCol="rawPrediction")
Sets params for MultilayerPerceptronClassifier.
New in version 1.6.0.
setStepSize(value)
Sets the value of stepSize.
New in version 2.0.0.
class pyspark.ml.classification.MultilayerPerceptronClassificationModel(java_model=No
Model fitted by MultilayerPerceptronClassifier.
New in version 1.6.0.
layers
array of layer sizes including input and output layers.
New in version 1.6.0.
weights
the weights of layers.
New in version 2.0.0.
class pyspark.ml.classification.OneVsRest(featuresCol='features', labelCol='label',
    predictionCol='prediction', classifier=None, weightCol=None, parallelism=1)
Note: Experimental
Reduction of Multiclass Classification to Binary Classification. Performs reduction using one against
all strategy. For a multiclass classification with k classes, train k models (one per class). Each example
is scored against all k models and the model with highest score is picked to label the example.
>>> model.transform(test0).head().prediction
0.0
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(4, [0], [1.0]))]).toDF()
>>> model.transform(test1).head().prediction
2.0
>>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4, 0.3, 0.2))]).toDF()
>>> model.transform(test2).head().prediction
0.0
>>> model_path = temp_path + "/ovr_model"
>>> model.save(model_path)
>>> model2 = OneVsRestModel.load(model_path)
>>> model2.transform(test0).head().prediction
0.0
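The one-against-all reduction used by OneVsRest can be sketched in plain Python. The binary scorer below is a deliberately tiny stand-in (a linear rule through the midpoint of the positive and negative centroids), chosen only so the example is self-contained; in PySpark the `classifier` parameter supplies the real binary learner. All names and data points are assumptions of this example.

```python
def fit_binary(points, is_positive):
    """Tiny binary scorer (assumed stand-in for a real classifier): a linear rule
    through the midpoint of the positive and negative centroids."""
    pos = [p for p, flag in zip(points, is_positive) if flag]
    neg = [p for p, flag in zip(points, is_positive) if not flag]
    dim = len(points[0])
    mean_pos = [sum(p[d] for p in pos) / len(pos) for d in range(dim)]
    mean_neg = [sum(p[d] for p in neg) / len(neg) for d in range(dim)]
    w = [mp - mn for mp, mn in zip(mean_pos, mean_neg)]
    bias = -sum(wi * (mp + mn) / 2.0 for wi, mp, mn in zip(w, mean_pos, mean_neg))
    return lambda x: sum(wi * xi for wi, xi in zip(w, x)) + bias

def fit_one_vs_rest(points, labels):
    """Train one binary scorer per class: that class versus all the others."""
    return {
        k: fit_binary(points, [lab == k for lab in labels])
        for k in sorted(set(labels))
    }

def predict_one_vs_rest(models, x):
    """Score x against every per-class model and return the best-scoring class."""
    return max(models, key=lambda k: models[k](x))

points = [(0.0, 0.0), (0.5, 0.2), (5.0, 0.0), (5.2, 0.4), (0.0, 5.0), (0.3, 5.1)]
labels = [0, 0, 1, 1, 2, 2]
models = fit_one_vs_rest(points, labels)
predictions = [predict_one_vs_rest(models, p) for p in points]
```

With k = 3 classes, three binary models are trained, and each prediction is the class whose model gives the highest score.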
Note: Experimental
Model fitted by OneVsRest. This stores the models resulting from training k binary classifiers: one
for each class. Each example is scored against all k models, and the model with the highest score is
picked to label the example.
New in version 2.0.0.
copy(extra=None)
Creates a copy of this instance with a randomly generated uid and some extra params. This
creates a deep copy of the embedded paramMap, and copies the embedded and extra parameters
over.
Parameters extra – Extra parameters to copy to the new instance
Returns Copy of this instance
New in version 2.0.0.
Note: Experimental
et al).
getInitSteps()
Gets the value of initSteps
New in version 1.5.0.
getK()
Gets the value of k
New in version 1.5.0.
setDistanceMeasure(value)
Sets the value of distanceMeasure.
New in version 2.4.0.
setInitMode(value)
Sets the value of initMode.
New in version 1.5.0.
setInitSteps(value)
Sets the value of initSteps.
New in version 1.5.0.
setK(value)
Sets the value of k.
New in version 1.5.0.
setParams(self, featuresCol="features", predictionCol="prediction", k=2, initMode="k-means||",
    initSteps=2, tol=1e-4, maxIter=20, seed=None, distanceMeasure="euclidean")
Sets params for KMeans.
New in version 1.5.0.
class pyspark.ml.clustering.KMeansModel(java_model=None)
Model fitted by KMeans.
New in version 1.5.0.
clusterCenters()
Get the cluster centers, represented as a list of NumPy arrays.
New in version 1.5.0.
computeCost(dataset)
Return the K-means cost (sum of squared distances of points to their nearest center) for this
model on the given data.
Note: Deprecated in 2.4.0. It will be removed in 3.0.0. Use ClusteringEvaluator instead.
You can also get the cost on the training dataset in the summary.
New in version 2.0.0.
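The cost that computeCost reports is the sum of squared distances from each point to its nearest center. A minimal plain-Python sketch, with a function name and toy data that are assumptions of this example:

```python
def kmeans_cost(points, centers):
    """Sum over points of the squared Euclidean distance to the nearest center."""
    def sq_dist(p, c):
        return sum((a - b) ** 2 for a, b in zip(p, c))
    return sum(min(sq_dist(p, c) for c in centers) for p in points)

points = [(0.0, 0.0), (1.0, 1.0), (9.0, 8.0), (8.0, 9.0)]
centers = [(0.5, 0.5), (8.5, 8.5)]
cost = kmeans_cost(points, centers)
```

Each of the four points lies at squared distance 0.5 from its nearest center, so the total cost is 2.0. A lower cost for the same k means tighter clusters.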
hasSummary
Indicates whether a training summary exists for this model instance.
Note: For high-dimensional data (with many features), this algorithm may perform poorly. This is
due to high-dimensional data (a) making it difficult to cluster at all (based on statistical/theoretical
arguments) and (b) numerical issues with Gaussian distributions.
hasSummary
Indicates whether a training summary exists for this model instance.
New in version 2.1.0.
summary
Gets summary (e.g. cluster assignments, cluster sizes) of the model trained on the training set.
An exception is thrown if no summary exists.
New in version 2.1.0.
weights
Weight for each Gaussian distribution in the mixture. This is a multinomial probability distri-
bution over the k Gaussians, where weights[i] is the weight for Gaussian i, and weights sum to
1.
New in version 2.0.0.
class pyspark.ml.clustering.GaussianMixtureSummary(java_obj=None)
Note: Experimental
Original LDA paper (journal version): Blei, Ng, and Jordan. “Latent Dirichlet Allocation.” JMLR,
2003.
Input data (featuresCol): LDA is given a collection of documents as input data, via the featuresCol
parameter. Each document is specified as a Vector of length vocabSize, where each entry is the
count for the corresponding term (word) in the document. Feature transformers such as
pyspark.ml.feature.Tokenizer and pyspark.ml.feature.CountVectorizer can be useful
for converting text to word count vectors.
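The input format described above (one count vector of length vocabSize per document) can be illustrated without Spark. The sketch below tokenizes on whitespace and builds the vectors in plain Python; the function names are assumptions of this example and only mimic what Tokenizer and CountVectorizer produce.

```python
def build_vocabulary(documents):
    """Map each distinct token to a column index (sorted for reproducibility)."""
    tokens = sorted({tok for doc in documents for tok in doc.lower().split()})
    return {tok: i for i, tok in enumerate(tokens)}

def count_vector(document, vocabulary):
    """Vector of length vocabSize whose entries are term counts in the document."""
    vec = [0] * len(vocabulary)
    for tok in document.lower().split():
        if tok in vocabulary:
            vec[vocabulary[tok]] += 1
    return vec

docs = ["spark makes big data simple", "big data big models"]
vocab = build_vocabulary(docs)
vectors = [count_vector(d, vocab) for d in docs]
```

The vocabulary here is `big, data, makes, models, simple, spark`, so the second document becomes `[2, 1, 0, 1, 0, 0]`. Vectors of this shape are what LDA expects in featuresCol.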
getK()
Gets the value of k or its default value.
New in version 2.0.0.
getKeepLastCheckpoint()
Gets the value of keepLastCheckpoint or its default value.
New in version 2.0.0.
getLearningDecay()
Gets the value of learningDecay or its default value.
New in version 2.0.0.
getLearningOffset()
Gets the value of learningOffset or its default value.
New in version 2.0.0.
getOptimizeDocConcentration()
Gets the value of optimizeDocConcentration or its default value.
New in version 2.0.0.
getOptimizer()
Gets the value of optimizer or its default value.
New in version 2.0.0.
getSubsamplingRate()
Gets the value of subsamplingRate or its default value.
New in version 2.0.0.
getTopicConcentration()
Gets the value of topicConcentration or its default value.
New in version 2.0.0.
getTopicDistributionCol()
Gets the value of topicDistributionCol or its default value.
New in version 2.0.0.
setDocConcentration(value)
Sets the value of docConcentration.
isDistributed()
Indicates whether this instance is of type DistributedLDAModel
New in version 2.0.0.
logLikelihood(dataset)
Calculates a lower bound on the log likelihood of the entire corpus. See Equation (16) in the
Online LDA paper (Hoffman et al., 2010).
WARNING: If this model is an instance of DistributedLDAModel (produced when
optimizer is set to “em”), this involves collecting a large topicsMatrix() to the driver.
This implementation may be changed in the future.
New in version 2.0.0.
logPerplexity(dataset)
Calculate an upper bound on perplexity. (Lower is better.) See Equation (16) in the Online LDA
paper (Hoffman et al., 2010).
WARNING: If this model is an instance of DistributedLDAModel (produced when
optimizer is set to “em”), this involves collecting a large topicsMatrix() to the driver.
This implementation may be changed in the future.
New in version 2.0.0.
topicsMatrix()
Inferred topics, where each topic is represented by a distribution over terms. This is a matrix of
size vocabSize x k, where each column is a topic. No guarantees are given about the ordering of
the topics.
WARNING: If this model is actually a DistributedLDAModel instance produced by the
Expectation-Maximization ("em") optimizer, then this method could involve collecting a large
amount of data to the driver (on the order of vocabSize x k).
New in version 2.0.0.
vocabSize()
Vocabulary size (number of terms or words in the vocabulary)
New in version 2.0.0.
class pyspark.ml.clustering.LocalLDAModel(java_model=None)
Local (non-distributed) model fitted by LDA. This model stores the inferred topics only; it does not
store info about the training dataset.
New in version 2.0.0.
class pyspark.ml.clustering.DistributedLDAModel(java_model=None)
Distributed model fitted by LDA. This type of model is currently only produced by
Expectation-Maximization (EM).
This model stores the inferred topics, the full training dataset, and the topic distribution for each
training document.
New in version 2.0.0.
getCheckpointFiles()
If using checkpointing and LDA.keepLastCheckpoint is set to true, then there may be
saved checkpoint files. This method is provided so that users can manage those files.
Note: Removing the checkpoints can cause failures if a partition is lost and is needed by certain
DistributedLDAModel methods. Reference counting will clean up the checkpoints when
this model and derivative data go out of scope.
Note: Experimental
Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and Cohen.
From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power
iteration on a normalized pair-wise similarity matrix of the data.
This class is not yet an Estimator/Transformer; use the assignClusters() method to run the
PowerIterationClustering algorithm.
See also:
Wikipedia on Spectral clustering
getFinalStorageLevel()
Gets the value of finalStorageLevel or its default value.
New in version 2.0.0.
getImplicitPrefs()
Gets the value of implicitPrefs or its default value.
New in version 1.4.0.
getIntermediateStorageLevel()
Gets the value of intermediateStorageLevel or its default value.
New in version 2.0.0.
getItemCol()
Gets the value of itemCol or its default value.
New in version 1.4.0.
getNonnegative()
Gets the value of nonnegative or its default value.
New in version 1.4.0.
getNumItemBlocks()
Gets the value of numItemBlocks or its default value.
New in version 1.4.0.
getNumUserBlocks()
Gets the value of numUserBlocks or its default value.
New in version 1.4.0.
getRank()
Gets the value of rank or its default value.
New in version 1.4.0.
getRatingCol()
Gets the value of ratingCol or its default value.
New in version 1.4.0.
getUserCol()
Gets the value of userCol or its default value.
New in version 1.4.0.
setAlpha(value)
Sets the value of alpha.
New in version 1.4.0.
setColdStartStrategy(value)
Sets the value of coldStartStrategy.
New in version 2.2.0.
setFinalStorageLevel(value)
Sets the value of finalStorageLevel.
New in version 2.0.0.
setImplicitPrefs(value)
Sets the value of implicitPrefs.
New in version 1.4.0.
setIntermediateStorageLevel(value)
Sets the value of intermediateStorageLevel.
New in version 2.0.0.
setItemCol(value)
Sets the value of itemCol.
New in version 1.4.0.
setNonnegative(value)
Sets the value of nonnegative.
New in version 1.4.0.
setNumBlocks(value)
Sets both numUserBlocks and numItemBlocks to the specific value.
New in version 1.4.0.
setNumItemBlocks(value)
Sets the value of numItemBlocks.
New in version 1.4.0.
setNumUserBlocks(value)
Sets the value of numUserBlocks.
New in version 1.4.0.
setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
ratingCol="rating", nonnegative=False, checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK", finalStorageLevel="MEMORY_AND_DISK",
coldStartStrategy="nan")
Sets params for ALS.
New in version 1.4.0.
setRank(value)
Sets the value of rank.
New in version 1.4.0.
setRatingCol(value)
Sets the value of ratingCol.
New in version 1.4.0.
setUserCol(value)
Sets the value of userCol.
New in version 1.4.0.
class pyspark.ml.recommendation.ALSModel(java_model=None)
Model fitted by ALS.
New in version 1.4.0.
itemFactors
a DataFrame that stores item factors in two columns: id and features
New in version 1.4.0.
rank
rank of the matrix factorization model
New in version 1.4.0.
recommendForAllItems(numUsers)
Returns top numUsers users recommended for each item, for all items.
Parameters numUsers – max number of recommendations for each item
Returns a DataFrame of (itemCol, recommendations), where recommendations are
stored as an array of (userCol, rating) Rows.
New in version 2.2.0.
recommendForAllUsers(numItems)
Returns top numItems items recommended for each user, for all users.
Parameters numItems – max number of recommendations for each user
Returns a DataFrame of (userCol, recommendations), where recommendations are
stored as an array of (itemCol, rating) Rows.
New in version 2.2.0.
recommendForItemSubset(dataset, numUsers)
Returns top numUsers users recommended for each item id in the input data set. Note that if
there are duplicate ids in the input dataset, only one set of recommendations per unique id will
be returned.
Parameters
• dataset – a Dataset containing a column of item ids. The column name must
match itemCol.
• numUsers – max number of recommendations for each item
Returns a DataFrame of (itemCol, recommendations), where recommendations are
stored as an array of (userCol, rating) Rows.
New in version 2.3.0.
recommendForUserSubset(dataset, numItems)
Returns top numItems items recommended for each user id in the input data set. Note that if
there are duplicate ids in the input dataset, only one set of recommendations per unique id will
be returned.
Parameters
• dataset – a Dataset containing a column of user ids. The column name must
match userCol.
• numItems – max number of recommendations for each user
Returns a DataFrame of (userCol, recommendations), where recommendations are
stored as an array of (itemCol, rating) Rows.
New in version 2.3.0.
userFactors
a DataFrame that stores user factors in two columns: id and features
New in version 1.4.0.
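How the factor tables relate to the recommendation methods can be sketched without Spark. The sketch assumes that a predicted score is the dot product of a user factor vector and an item factor vector, as in a standard matrix factorization model; the function names and the rank-2 factors are hypothetical. It also mirrors the documented behaviour that duplicate ids in the input produce one set of recommendations per unique id.

```python
def dot(u, v):
    """Inner product of two equal-length factor vectors."""
    return sum(a * b for a, b in zip(u, v))

def recommend_for_user_subset(user_ids, user_factors, item_factors, num_items):
    """Top num_items (item, score) pairs for each unique user id."""
    result = {}
    for uid in dict.fromkeys(user_ids):       # drop duplicate ids, keep order
        scores = [(item, dot(user_factors[uid], f)) for item, f in item_factors.items()]
        scores.sort(key=lambda pair: pair[1], reverse=True)
        result[uid] = scores[:num_items]
    return result

# Hypothetical rank-2 factors keyed by id.
user_factors = {0: [1.0, 0.0], 1: [0.0, 1.0]}
item_factors = {10: [2.0, 0.5], 11: [0.5, 3.0], 12: [1.0, 1.0]}

recs = recommend_for_user_subset([0, 1, 0], user_factors, item_factors, num_items=2)
```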
class pyspark.ml.pipeline.Pipeline(stages=None)
A simple pipeline, which acts as an estimator. A Pipeline consists of a sequence of stages, each of
which is either an Estimator or a Transformer. When Pipeline.fit() is called, the stages
are executed in order. If a stage is an Estimator, its Estimator.fit() method will be called
on the input dataset to fit a model. Then the model, which is a transformer, will be used to
transform the dataset as the input to the next stage. If a stage is a Transformer, its
Transformer.transform() method will be called to produce the dataset for the next stage. The
fitted model from a Pipeline is a PipelineModel, which consists of fitted models and
transformers, corresponding to the pipeline stages. If stages is an empty list, the pipeline acts
as an identity transformer.
New in version 1.3.0.
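The fitting procedure described above can be sketched with toy stages in plain Python. This is an illustration of the control flow only, not Spark's implementation; the classes AddOne, Center and CenterModel and the two helper functions are hypothetical stand-ins for a Transformer, an Estimator, its fitted model, Pipeline.fit() and PipelineModel.transform().

```python
class AddOne:
    """Toy Transformer: adds 1 to every value."""
    def transform(self, data):
        return [x + 1 for x in data]

class CenterModel:
    """Toy fitted model (a Transformer) that subtracts a learned mean."""
    def __init__(self, mean):
        self.mean = mean
    def transform(self, data):
        return [x - self.mean for x in data]

class Center:
    """Toy Estimator: learns the mean of the data it is fitted on."""
    def fit(self, data):
        return CenterModel(sum(data) / len(data))

def fit_pipeline(stages, data):
    """Fit the stages in order and return the list of fitted transformers."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):        # Estimator: fit it, then use its model
            stage = stage.fit(data)
        fitted.append(stage)
        data = stage.transform(data)     # output becomes input of the next stage
    return fitted

def transform_pipeline(fitted, data):
    """Apply every fitted stage in order."""
    for stage in fitted:
        data = stage.transform(data)
    return data

model = fit_pipeline([AddOne(), Center()], [1.0, 2.0, 3.0])
out = transform_pipeline(model, [1.0, 2.0, 3.0])
identity = transform_pipeline(fit_pipeline([], [5.0]), [5.0])
```

With an empty stage list the fitted pipeline returns its input unchanged, which matches the identity behaviour noted above.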
copy(extra=None)
Creates a copy of this instance.
Parameters extra – extra parameters
Returns new instance
New in version 1.4.0.
getStages()
Get pipeline stages.
New in version 1.3.0.
classmethod read()
Returns an MLReader instance for this class.
class pyspark.ml.pipeline.PipelineReader(cls)
(Private) Specialization of MLReader for Pipeline types
load(path)
Load the ML instance from the input path.
class pyspark.ml.pipeline.PipelineSharedReadWrite
Note: DeveloperApi
Functions for MLReader and MLWriter shared between Pipeline and PipelineModel
New in version 2.3.0.
static getStagePath(stageUid, stageIdx, numStages, stagesDir)
Get path for saving the given stage.
static load(metadata, sc, path)
Load metadata and stages for a Pipeline or PipelineModel
Returns (UID, list of stages)
static saveImpl(instance, stages, sc, path)
Save metadata and stages for a Pipeline or PipelineModel:
• save metadata to path/metadata
• save stages to stages/IDX_UID
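The stages/IDX_UID naming can be sketched as follows. This is a guess at the layout rather than the library's code: the zero-padding of the index to the width of numStages is an assumption, and the helper name and the stage UIDs are hypothetical.

```python
import os

def stage_path(stage_uid, stage_idx, num_stages, stages_dir):
    """Build <stages_dir>/<IDX>_<UID>; zero-padding IDX is an assumption."""
    width = len(str(num_stages))
    return os.path.join(stages_dir, str(stage_idx).zfill(width) + "_" + stage_uid)

# Hypothetical UIDs for the first two stages of a 12-stage pipeline.
paths = [stage_path(uid, i, 12, "stages")
         for i, uid in enumerate(["Tokenizer_a1", "HashingTF_b2"])]
```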
static validateStages(stages)
Check that all stages are Writable
class pyspark.ml.pipeline.PipelineWriter(instance)
(Private) Specialization of MLWriter for Pipeline types
saveImpl(path)
save() handles overwriting and then calls this method. Subclasses should override this method
to implement the actual saving of the instance.
class pyspark.ml.tuning.ParamGridBuilder
Builder for a param grid used in grid search-based model selection.
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml.tuning import ParamGridBuilder
>>> lr = LogisticRegression()
>>> output = ParamGridBuilder() \
... .baseOn({lr.labelCol: 'l'}) \
... .baseOn([lr.predictionCol, 'p']) \
... .addGrid(lr.regParam, [1.0, 2.0]) \
... .addGrid(lr.maxIter, [1, 5]) \
... .build()
>>> expected = [
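What the builder produces can be sketched in plain Python: every combination of the addGrid values, each merged with the fixed baseOn values. This sketch uses string keys in place of Param objects, and build_param_grid is a hypothetical helper, not part of pyspark.ml.tuning.

```python
from itertools import product

def build_param_grid(base, grids):
    """Cross all grid value lists and merge each combination with the fixed base params."""
    names = list(grids)
    combos = product(*(grids[name] for name in names))
    return [dict(base, **dict(zip(names, combo))) for combo in combos]

base = {"labelCol": "l", "predictionCol": "p"}          # fixed, as with baseOn
grids = {"regParam": [1.0, 2.0], "maxIter": [1, 5]}     # varied, as with addGrid
param_maps = build_param_grid(base, grids)
```

Two grids of two values each give four parameter maps, all of which carry the same base values.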
Note: Experimental
Validation for hyper-parameter tuning. Randomly splits the input dataset into train and validation
sets, and uses an evaluation metric on the validation set to select the best model. Similar to
CrossValidator, but only splits the set once.
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
>>> from pyspark.ml.linalg import Vectors
>>> dataset = spark.createDataFrame(
... [(Vectors.dense([0.0]), 0.0),
... (Vectors.dense([0.4]), 1.0),
... (Vectors.dense([0.5]), 0.0),
... (Vectors.dense([0.6]), 1.0),
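The single-split selection procedure can be sketched in plain Python. This is a simplified illustration, not the Spark implementation: the function and its parameters (train_ratio, seed) are hypothetical, the metric is assumed to be one where larger is better, and the toy "model" is only a threshold rule.

```python
import random

def train_validation_split(rows, candidates, fit, evaluate, train_ratio=0.75, seed=42):
    """Split once, fit every candidate on the train part, keep the best on validation."""
    rng = random.Random(seed)
    shuffled = list(rows)
    rng.shuffle(shuffled)
    cut = int(len(shuffled) * train_ratio)
    train, validation = shuffled[:cut], shuffled[cut:]
    scored = [(evaluate(fit(params, train), validation), params) for params in candidates]
    best_score, best_params = max(scored, key=lambda pair: pair[0])
    return best_params, best_score

# Toy problem: the label is 1.0 when x >= 0.5.
rows = [(i / 40, 1.0 if i >= 20 else 0.0) for i in range(40)]

def fit(threshold, train):
    # Nothing is learned here; the hyper-parameter alone defines the toy model.
    return lambda x: 1.0 if x >= threshold else 0.0

def accuracy(model, validation):
    return sum(model(x) == y for x, y in validation) / len(validation)

best_threshold, best_accuracy = train_validation_split(rows, [0.5, 0.2, 0.8], fit, accuracy)
```

Unlike k-fold cross-validation, each candidate is fitted exactly once, which is cheaper but makes the choice depend on a single random split.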
Note: Experimental
class pyspark.ml.evaluation.Evaluator
Base class for evaluators that compute metrics from predictions.
New in version 1.4.0.
evaluate(dataset, params=None)
Evaluates the output with optional parameters.
Parameters
Note: Experimental
Evaluator for binary classification, which expects two input columns: rawPrediction and label. The
rawPrediction column can be of type double (binary 0/1 prediction, or probability of label 1) or of
type vector (length-2 vector of raw predictions, scores, or label probabilities).
setMetricName(value)
Sets the value of metricName.
New in version 1.4.0.
setParams(self, rawPredictionCol="rawPrediction", labelCol="label",
metricName="areaUnderROC")
Sets params for binary classification evaluator.
New in version 1.4.0.
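The default metric, areaUnderROC, can be illustrated with the pairwise definition of AUC: the fraction of (positive, negative) pairs in which the positive example receives the higher score, with ties counted as one half. This is a sketch of the metric's meaning for scalar scores, not the way Spark computes it; the function name and the data are hypothetical.

```python
def area_under_roc(scores, labels):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for s, y in zip(scores, labels) if y == 1.0]
    negatives = [s for s, y in zip(scores, labels) if y == 0.0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0
               for p in positives for n in negatives)
    return wins / (len(positives) * len(negatives))

# Scores play the role of a double-typed rawPrediction (probability of label 1).
scores = [0.1, 0.4, 0.35, 0.8]
labels = [0.0, 0.0, 1.0, 1.0]
auc = area_under_roc(scores, labels)
```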
class pyspark.ml.evaluation.RegressionEvaluator(predictionCol='prediction',
labelCol='label', metricName='rmse')
Note: Experimental
Evaluator for Regression, which expects two input columns: prediction and label.
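The default metric name rmse refers to the root mean squared error between the two columns. A minimal plain-Python sketch, with a hypothetical helper name and made-up values:

```python
import math

def rmse(predictions, labels):
    """Root mean squared error between predictions and labels."""
    squared = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return math.sqrt(sum(squared) / len(squared))

# Errors are 1, 0 and -2, so the mean squared error is 5/3.
value = rmse([2.0, 4.0, 6.0], [1.0, 4.0, 8.0])
```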
class pyspark.ml.evaluation.MulticlassClassificationEvaluator(predictionCol='prediction',
labelCol='label', metricName='f1')
Note: Experimental
Evaluator for Multiclass Classification, which expects two input columns: prediction and label.
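The default metric name f1 can be illustrated as follows. The sketch assumes that the multiclass F1 is the per-class F1 score averaged with weights equal to each class's share of the true labels; that weighting is an assumption about the metric, and the helper name and data are hypothetical.

```python
from collections import Counter

def weighted_f1(predictions, labels):
    """Per-class F1, averaged with weights equal to each class's share of the true labels."""
    support = Counter(labels)
    total = 0.0
    for cls, count in support.items():
        tp = sum(1 for p, y in zip(predictions, labels) if p == cls and y == cls)
        predicted = sum(1 for p in predictions if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * count / len(labels)
    return total

score = weighted_f1([0.0, 1.0, 1.0, 2.0], [0.0, 1.0, 2.0, 2.0])
```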
Note: Experimental
Evaluator for Clustering results, which expects two input columns: prediction and features. The
metric computes the Silhouette measure using the squared Euclidean distance.
The Silhouette is a measure for the validation of the consistency within clusters. It ranges between 1
and -1, where a value close to 1 means that the points in a cluster are close to the other points in the
same cluster and far from the points of the other clusters.
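The Silhouette measure with squared Euclidean distance can be sketched directly from its definition. This is a quadratic-time illustration for small inputs, not the algorithm used by the evaluator; it assumes at least two clusters, and the function names and points are hypothetical.

```python
def sq_dist(a, b):
    """Squared Euclidean distance between two points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def silhouette(points, clusters):
    """Mean silhouette coefficient using squared Euclidean distance."""
    labels = set(clusters)
    total = 0.0
    for i, p in enumerate(points):
        # Mean squared distance from p to the other members of each cluster.
        mean_to = {}
        for label in labels:
            members = [q for j, q in enumerate(points) if clusters[j] == label and j != i]
            if members:
                mean_to[label] = sum(sq_dist(p, q) for q in members) / len(members)
        own = clusters[i]
        if own not in mean_to:        # singleton cluster: coefficient taken as 0
            continue
        a = mean_to[own]                                            # cohesion
        b = min(d for label, d in mean_to.items() if label != own)  # separation
        total += (b - a) / max(a, b) if max(a, b) > 0 else 0.0
    return total / len(points)

points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
good = silhouette(points, [0, 0, 1, 1])   # clusters match the two groups
bad = silhouette(points, [0, 1, 0, 1])    # clusters cut across the groups
```

The assignment that matches the two groups scores close to 1, while the one that cuts across them scores below 0.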
TWENTYTWO
MAIN REFERENCE