- Section 1: To finish in the lab session on 24th Feb. Essential
- Section 2: To finish in the lab session on 24th Feb. Essential
- Section 3: To finish by the following Tuesday 1st March. Exercise
- Section 4: To explore further. Optional
In this notebook we will explore the classes in PySpark that implement decision trees, random forests and gradient-boosted trees. We will study both classification and regression. The implementations for random forests and gradient-boosted trees rely heavily on the implementation for decision trees.
There are several challenges when implementing decision trees in a distributed setting, particularly when we want to use commodity hardware. A very popular implementation is known as PLANET: Massively Parallel Learning of Tree Ensembles with MapReduce. PLANET allows an efficient implementation of decision trees at large scale purely using map() and reduce() operations, suitable for a Hadoop cluster. Although the decision tree implementation in Apache Spark borrows some of the ideas from PLANET, it also introduces additional tricks that exploit the well-known advantages of Apache Spark, e.g. in-memory computing. The Apache Spark implementation of the DecisionTree classifier may not be as flexible as the scikit-learn one (bear in mind they were designed under different sets of restrictions), but it still allows the use of such a powerful machine learning model at large scale.
You can find more technical details on the implementation of decision trees in Apache Spark in the YouTube video Scalable Decision Trees in Spark MLlib by Manish Amde and the YouTube video Decision Trees on Spark by Joseph Bradley. These technical details are also reviewed in a blog post on decision trees and another blog post on random forests.
You only need to install matplotlib and pandas in your environment once.
In this lab, we will explore the performance of Decision Trees on the datasets we already used in the Notebook for Logistic Regression for Classification, Lab 3.
We now load the dataset and the names of the features and label, which we will use to create the schema for the dataframe. We also cache the dataframe, since we are going to perform several operations on rawdata inside a loop.
import numpy as np
rawdata = spark.read.csv('./Data/spambase.data')
rawdata.cache()
ncolumns = len(rawdata.columns)
spam_names = [spam_names.rstrip('\n') for spam_names in open('./Data/spambase.data.names')]
number_names = np.shape(spam_names)[0]
for i in range(number_names):
    local = spam_names[i]
    colon_pos = local.find(':')
    spam_names[i] = local[:colon_pos]
We use the withColumnRenamed method for the dataframe to rename the columns using the more familiar names for the features.
schemaNames = rawdata.schema.names
spam_names[ncolumns-1] = 'labels'
for i in range(ncolumns):
    rawdata = rawdata.withColumnRenamed(schemaNames[i], spam_names[i])
Perhaps one of the most important operations when doing data analytics in Apache Spark is preprocessing the dataset so that it can be analysed using the MLlib package. For supervised learning, whether classification or regression, we want the data organised into a column of type Double for the label and a column of type SparseVector or DenseVector for the features. In turn, to get this vector representation of the features, we first need to cast the individual features to type Double.
Let us first see the type of the original features after reading the file.
rawdata.printSchema()
root
|-- word_freq_make: string (nullable = true)
|-- word_freq_address: string (nullable = true)
|-- word_freq_all: string (nullable = true)
|-- word_freq_3d: string (nullable = true)
|-- word_freq_our: string (nullable = true)
|-- word_freq_over: string (nullable = true)
|-- word_freq_remove: string (nullable = true)
|-- word_freq_internet: string (nullable = true)
|-- word_freq_order: string (nullable = true)
|-- word_freq_mail: string (nullable = true)
|-- word_freq_receive: string (nullable = true)
|-- word_freq_will: string (nullable = true)
|-- word_freq_people: string (nullable = true)
|-- word_freq_report: string (nullable = true)
|-- word_freq_addresses: string (nullable = true)
|-- word_freq_free: string (nullable = true)
|-- word_freq_business: string (nullable = true)
|-- word_freq_email: string (nullable = true)
|-- word_freq_you: string (nullable = true)
|-- word_freq_credit: string (nullable = true)
|-- word_freq_your: string (nullable = true)
|-- word_freq_font: string (nullable = true)
|-- word_freq_000: string (nullable = true)
|-- word_freq_money: string (nullable = true)
|-- word_freq_hp: string (nullable = true)
|-- word_freq_hpl: string (nullable = true)
|-- word_freq_george: string (nullable = true)
|-- word_freq_650: string (nullable = true)
|-- word_freq_lab: string (nullable = true)
|-- word_freq_labs: string (nullable = true)
|-- word_freq_telnet: string (nullable = true)
|-- word_freq_857: string (nullable = true)
|-- word_freq_data: string (nullable = true)
|-- word_freq_415: string (nullable = true)
|-- word_freq_85: string (nullable = true)
|-- word_freq_technology: string (nullable = true)
|-- word_freq_1999: string (nullable = true)
|-- word_freq_parts: string (nullable = true)
|-- word_freq_pm: string (nullable = true)
|-- word_freq_direct: string (nullable = true)
|-- word_freq_cs: string (nullable = true)
|-- word_freq_meeting: string (nullable = true)
|-- word_freq_original: string (nullable = true)
|-- word_freq_project: string (nullable = true)
|-- word_freq_re: string (nullable = true)
|-- word_freq_edu: string (nullable = true)
|-- word_freq_table: string (nullable = true)
|-- word_freq_conference: string (nullable = true)
|-- char_freq_;: string (nullable = true)
|-- char_freq_(: string (nullable = true)
|-- char_freq_[: string (nullable = true)
|-- char_freq_!: string (nullable = true)
|-- char_freq_$: string (nullable = true)
|-- char_freq_#: string (nullable = true)
|-- capital_run_length_average: string (nullable = true)
|-- capital_run_length_longest: string (nullable = true)
|-- capital_run_length_total: string (nullable = true)
|-- labels: string (nullable = true)
We notice that all the features and the label are of type String. We import StringType from pyspark.sql.types and later use the withColumn method of the dataframe to cast() each column to Double.
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
StringColumns = [x.name for x in rawdata.schema.fields if x.dataType == StringType()]
for c in StringColumns:
    rawdata = rawdata.withColumn(c, col(c).cast("double"))
We print the schema again and notice the variables are now of type double.
rawdata.printSchema()
root
|-- word_freq_make: double (nullable = true)
|-- word_freq_address: double (nullable = true)
|-- word_freq_all: double (nullable = true)
|-- word_freq_3d: double (nullable = true)
|-- word_freq_our: double (nullable = true)
|-- word_freq_over: double (nullable = true)
|-- word_freq_remove: double (nullable = true)
|-- word_freq_internet: double (nullable = true)
|-- word_freq_order: double (nullable = true)
|-- word_freq_mail: double (nullable = true)
|-- word_freq_receive: double (nullable = true)
|-- word_freq_will: double (nullable = true)
|-- word_freq_people: double (nullable = true)
|-- word_freq_report: double (nullable = true)
|-- word_freq_addresses: double (nullable = true)
|-- word_freq_free: double (nullable = true)
|-- word_freq_business: double (nullable = true)
|-- word_freq_email: double (nullable = true)
|-- word_freq_you: double (nullable = true)
|-- word_freq_credit: double (nullable = true)
|-- word_freq_your: double (nullable = true)
|-- word_freq_font: double (nullable = true)
|-- word_freq_000: double (nullable = true)
|-- word_freq_money: double (nullable = true)
|-- word_freq_hp: double (nullable = true)
|-- word_freq_hpl: double (nullable = true)
|-- word_freq_george: double (nullable = true)
|-- word_freq_650: double (nullable = true)
|-- word_freq_lab: double (nullable = true)
|-- word_freq_labs: double (nullable = true)
|-- word_freq_telnet: double (nullable = true)
|-- word_freq_857: double (nullable = true)
|-- word_freq_data: double (nullable = true)
|-- word_freq_415: double (nullable = true)
|-- word_freq_85: double (nullable = true)
|-- word_freq_technology: double (nullable = true)
|-- word_freq_1999: double (nullable = true)
|-- word_freq_parts: double (nullable = true)
|-- word_freq_pm: double (nullable = true)
|-- word_freq_direct: double (nullable = true)
|-- word_freq_cs: double (nullable = true)
|-- word_freq_meeting: double (nullable = true)
|-- word_freq_original: double (nullable = true)
|-- word_freq_project: double (nullable = true)
|-- word_freq_re: double (nullable = true)
|-- word_freq_edu: double (nullable = true)
|-- word_freq_table: double (nullable = true)
|-- word_freq_conference: double (nullable = true)
|-- char_freq_;: double (nullable = true)
|-- char_freq_(: double (nullable = true)
|-- char_freq_[: double (nullable = true)
|-- char_freq_!: double (nullable = true)
|-- char_freq_$: double (nullable = true)
|-- char_freq_#: double (nullable = true)
|-- capital_run_length_average: double (nullable = true)
|-- capital_run_length_longest: double (nullable = true)
|-- capital_run_length_total: double (nullable = true)
|-- labels: double (nullable = true)
We now have a dataframe that contains several columns corresponding to the features, of type double, and a last column corresponding to the labels, also of type double.
We can now start the machine learning analysis by creating the training and test set and then designing the DecisionTreeClassifier using the training data.
trainingData, testData = rawdata.randomSplit([0.7, 0.3], 1242)
print(f"There are {trainingData.cache().count()} rows in the training set, and {testData.cache().count()} in the test set")
There are 3166 rows in the training set, and 1435 in the test set
Be careful when using randomSplit. It is important to notice that, unlike on a single machine, the randomSplit method can lead to different training and test sets even if we use the same seed! This can happen when the cluster configuration changes. The dataset we are using is very small; it basically fits in one partition. We can see the effect of the cluster configuration by repartitioning the dataframe.
trainRepartitionData, testRepartitionData = (rawdata.repartition(24).randomSplit([0.7, 0.3], seed=1242))
print(trainRepartitionData.count())
3223
When you do a 70/30 train/test split, the split is only approximate: it is not an exact 70/30 split, and when the partitioning of the data changes, you get not only a different number of data points in the training and test sets, but also different data points.
The recommendation is to split your data once. If you need to modify the training and test data in some way, make sure you perform the modifications on the training and test sets obtained from the split you did at the beginning. Do not modify the original data and run randomSplit again in the hope of getting the same training and test splits you got the first time.
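For instance, a simple way to make the split reusable is to persist the training and test sets once and reload them afterwards. The sketch below is only illustrative; the parquet file paths are assumptions and are not part of the lab data.
# Hedged sketch: save the split once (paths are illustrative), then reuse the saved sets
# instead of calling randomSplit again.
trainingData.write.mode("overwrite").parquet("./Data/spambase_train.parquet")
testData.write.mode("overwrite").parquet("./Data/spambase_test.parquet")
# In a later session, reload exactly the same split.
trainingData = spark.read.parquet("./Data/spambase_train.parquet")
testData = spark.read.parquet("./Data/spambase_test.parquet")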
Vector Assembler

Most supervised learning models in PySpark require a column of type SparseVector or DenseVector for the features. We use the VectorAssembler tool to concatenate all the features into a single vector column.
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols = spam_names[0:ncolumns-1], outputCol = 'features')
vecTrainingData = vecAssembler.transform(trainingData)
vecTrainingData.select("features", "labels").show(5)
+--------------------+------+
| features|labels|
+--------------------+------+
|(57,[54,55,56],[1...| 0.0|
|(57,[54,55,56],[1...| 0.0|
|(57,[54,55,56],[1...| 0.0|
|(57,[54,55,56],[1...| 0.0|
|(57,[54,55,56],[1...| 0.0|
+--------------------+------+
only showing top 5 rows
The DecisionTreeClassifier implemented in PySpark has several parameters to tune. Some of them are:
- maxDepth: the maximum depth of the tree. The default is 5.
- maxBins: the number of bins used to discretise continuous features. The default is 32.
- impurity: the metric used to compute information gain. The options are "gini" or "entropy". The default is "gini".
- minInfoGain: the minimum information gain required for a split to be considered. The default is zero.
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="labels", featuresCol="features", maxDepth=10, impurity='entropy')
model = dt.fit(vecTrainingData)
The individual importance of the features can be obtained using featureImportances. We can recover the importance values and the indices at which these values are different from zero using values and indices.
fi = model.featureImportances
imp_feat = np.zeros(ncolumns-1)
imp_feat[fi.indices] = fi.values
Let us plot the relative importances.
Although we cannot visualise an image on the HPC terminal, we can save a figure as a .png file and view the .png file on the screen of the local computer.
To plot and save figures on HPC, we need to do the following before using pyplot:
import matplotlib
matplotlib.use('Agg') # Must be before importing matplotlib.pyplot or pylab!
We now set the instructions to save the figure in the Output folder.
import matplotlib.pyplot as plt
x = np.arange(ncolumns-1)
plt.bar(x, imp_feat)
plt.savefig("./Output/feature_importances.png")
The feature with the highest importance is
spam_names[np.argmax(imp_feat)]
'char_freq_$'
We can visualise the DecisionTree in the form of if-then-else statements.
print(model.toDebugString)
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_7144f32ac0ce, depth=10, numNodes=193, numClasses=2, numFeatures=57
If (feature 52 <= 0.0455)
If (feature 6 <= 0.045)
If (feature 51 <= 0.172)
If (feature 24 <= 0.025)
If (feature 55 <= 10.5)
If (feature 20 <= 0.615)
If (feature 26 <= 0.005)
If (feature 9 <= 0.265)
Predict: 0.0
Else (feature 9 > 0.265)
If (feature 56 <= 28.5)
Predict: 0.0
Else (feature 56 > 28.5)
If (feature 2 <= 0.015)
Predict: 1.0
Else (feature 2 > 0.015)
Predict: 0.0
Else (feature 26 > 0.005)
Predict: 0.0
Else (feature 20 > 0.615)
If (feature 15 <= 1.8050000000000002)
If (feature 7 <= 0.01)
If (feature 35 <= 0.005)
If (feature 19 <= 0.01)
Predict: 0.0
Else (feature 19 > 0.01)
Predict: 1.0
Else (feature 35 > 0.005)
If (feature 2 <= 0.655)
Predict: 1.0
Else (feature 2 > 0.655)
Predict: 0.0
Else (feature 7 > 0.01)
If (feature 26 <= 0.005)
Predict: 1.0
Else (feature 26 > 0.005)
Predict: 0.0
Else (feature 15 > 1.8050000000000002)
If (feature 40 <= 0.02)
Predict: 1.0
Else (feature 40 > 0.02)
Predict: 0.0
Else (feature 55 > 10.5)
If (feature 44 <= 0.265)
If (feature 26 <= 0.005)
If (feature 4 <= 0.675)
If (feature 45 <= 0.01)
Predict: 0.0
Else (feature 45 > 0.01)
If (feature 27 <= 0.045)
Predict: 0.0
Else (feature 27 > 0.045)
Predict: 1.0
Else (feature 4 > 0.675)
If (feature 51 <= 0.0335)
Predict: 1.0
Else (feature 51 > 0.0335)
If (feature 11 <= 0.895)
Predict: 1.0
Else (feature 11 > 0.895)
Predict: 0.0
Else (feature 26 > 0.005)
Predict: 0.0
Else (feature 44 > 0.265)
If (feature 15 <= 1.085)
Predict: 0.0
Else (feature 15 > 1.085)
If (feature 1 <= 0.005)
Predict: 1.0
Else (feature 1 > 0.005)
Predict: 0.0
Else (feature 24 > 0.025)
If (feature 15 <= 1.8050000000000002)
If (feature 22 <= 0.005)
Predict: 0.0
Else (feature 22 > 0.005)
If (feature 36 <= 0.805)
Predict: 0.0
Else (feature 36 > 0.805)
Predict: 1.0
Else (feature 15 > 1.8050000000000002)
Predict: 1.0
Else (feature 51 > 0.172)
If (feature 54 <= 2.6285)
If (feature 15 <= 0.01)
If (feature 7 <= 0.01)
If (feature 16 <= 0.01)
If (feature 51 <= 0.976)
If (feature 8 <= 0.005)
If (feature 22 <= 0.365)
Predict: 0.0
Else (feature 22 > 0.365)
Predict: 1.0
Else (feature 8 > 0.005)
If (feature 55 <= 16.5)
Predict: 0.0
Else (feature 55 > 16.5)
Predict: 1.0
Else (feature 51 > 0.976)
If (feature 44 <= 0.005)
If (feature 54 <= 1.6795)
Predict: 1.0
Else (feature 54 > 1.6795)
Predict: 0.0
Else (feature 44 > 0.005)
Predict: 0.0
Else (feature 16 > 0.01)
If (feature 24 <= 0.01)
Predict: 1.0
Else (feature 24 > 0.01)
Predict: 0.0
Else (feature 7 > 0.01)
Predict: 1.0
Else (feature 15 > 0.01)
If (feature 45 <= 0.01)
If (feature 51 <= 0.4955)
If (feature 15 <= 1.8050000000000002)
If (feature 2 <= 0.755)
If (feature 43 <= 0.005)
Predict: 1.0
Else (feature 43 > 0.005)
Predict: 0.0
Else (feature 2 > 0.755)
Predict: 0.0
Else (feature 15 > 1.8050000000000002)
Predict: 1.0
Else (feature 51 > 0.4955)
If (feature 56 <= 40.5)
If (feature 15 <= 0.775)
Predict: 0.0
Else (feature 15 > 0.775)
Predict: 1.0
Else (feature 56 > 40.5)
Predict: 1.0
Else (feature 45 > 0.01)
Predict: 0.0
Else (feature 54 > 2.6285)
If (feature 51 <= 0.4955)
If (feature 34 <= 0.005)
If (feature 11 <= 2.005)
If (feature 50 <= 0.001)
If (feature 20 <= 0.885)
If (feature 15 <= 1.085)
Predict: 0.0
Else (feature 15 > 1.085)
Predict: 1.0
Else (feature 20 > 0.885)
If (feature 49 <= 0.3795)
Predict: 1.0
Else (feature 49 > 0.3795)
Predict: 0.0
Else (feature 50 > 0.001)
Predict: 1.0
Else (feature 11 > 2.005)
Predict: 0.0
Else (feature 34 > 0.005)
Predict: 0.0
Else (feature 51 > 0.4955)
If (feature 49 <= 0.002)
If (feature 44 <= 0.005)
If (feature 56 <= 17.5)
If (feature 18 <= 0.01)
Predict: 0.0
Else (feature 18 > 0.01)
Predict: 1.0
Else (feature 56 > 17.5)
If (feature 45 <= 0.01)
Predict: 1.0
Else (feature 45 > 0.01)
Predict: 0.0
Else (feature 44 > 0.005)
If (feature 44 <= 2.1)
Predict: 0.0
Else (feature 44 > 2.1)
Predict: 1.0
Else (feature 49 > 0.002)
Predict: 1.0
Else (feature 6 > 0.045)
If (feature 26 <= 0.005)
If (feature 29 <= 0.185)
If (feature 51 <= 0.001)
If (feature 6 <= 0.405)
If (feature 4 <= 0.10500000000000001)
Predict: 0.0
Else (feature 4 > 0.10500000000000001)
If (feature 0 <= 0.175)
Predict: 1.0
Else (feature 0 > 0.175)
Predict: 0.0
Else (feature 6 > 0.405)
If (feature 32 <= 0.01)
If (feature 9 <= 1.125)
Predict: 1.0
Else (feature 9 > 1.125)
If (feature 0 <= 0.005)
Predict: 0.0
Else (feature 0 > 0.005)
Predict: 1.0
Else (feature 32 > 0.01)
Predict: 0.0
Else (feature 51 > 0.001)
If (feature 45 <= 0.01)
If (feature 56 <= 46.5)
If (feature 54 <= 2.498)
Predict: 1.0
Else (feature 54 > 2.498)
If (feature 55 <= 12.5)
Predict: 0.0
Else (feature 55 > 12.5)
Predict: 1.0
Else (feature 56 > 46.5)
Predict: 1.0
Else (feature 45 > 0.01)
If (feature 2 <= 0.015)
Predict: 0.0
Else (feature 2 > 0.015)
Predict: 1.0
Else (feature 29 > 0.185)
Predict: 0.0
Else (feature 26 > 0.005)
Predict: 0.0
Else (feature 52 > 0.0455)
If (feature 24 <= 0.195)
If (feature 55 <= 9.5)
If (feature 15 <= 0.165)
If (feature 18 <= 3.1550000000000002)
Predict: 0.0
Else (feature 18 > 3.1550000000000002)
Predict: 1.0
Else (feature 15 > 0.165)
If (feature 55 <= 8.5)
If (feature 12 <= 0.805)
If (feature 4 <= 0.825)
Predict: 1.0
Else (feature 4 > 0.825)
If (feature 1 <= 0.005)
Predict: 0.0
Else (feature 1 > 0.005)
Predict: 1.0
Else (feature 12 > 0.805)
Predict: 0.0
Else (feature 55 > 8.5)
Predict: 0.0
Else (feature 55 > 9.5)
If (feature 45 <= 0.645)
If (feature 22 <= 0.365)
If (feature 51 <= 0.1325)
If (feature 49 <= 0.6515)
If (feature 54 <= 3.4275)
If (feature 12 <= 0.01)
If (feature 1 <= 0.135)
Predict: 1.0
Else (feature 1 > 0.135)
Predict: 0.0
Else (feature 12 > 0.01)
Predict: 1.0
Else (feature 54 > 3.4275)
Predict: 1.0
Else (feature 49 > 0.6515)
Predict: 0.0
Else (feature 51 > 0.1325)
If (feature 36 <= 0.14500000000000002)
If (feature 54 <= 1.598)
Predict: 0.0
Else (feature 54 > 1.598)
Predict: 1.0
Else (feature 36 > 0.14500000000000002)
If (feature 18 <= 1.7650000000000001)
Predict: 1.0
Else (feature 18 > 1.7650000000000001)
If (feature 1 <= 0.225)
Predict: 0.0
Else (feature 1 > 0.225)
Predict: 1.0
Else (feature 22 > 0.365)
Predict: 1.0
Else (feature 45 > 0.645)
Predict: 0.0
Else (feature 24 > 0.195)
If (feature 51 <= 0.4325)
If (feature 6 <= 0.215)
Predict: 0.0
Else (feature 6 > 0.215)
If (feature 1 <= 0.005)
Predict: 1.0
Else (feature 1 > 0.005)
Predict: 0.0
Else (feature 51 > 0.4325)
Predict: 1.0
Indirectly, decision trees allow feature selection: features used to make decisions at the top of the tree are more relevant for the decision problem.
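As a small illustration (not part of the original lab code), the importances computed earlier can be used to keep only the most relevant features; the choice of ten features below is arbitrary.
top_k = 10  # arbitrary number of features to keep, for illustration only
top_idx = np.argsort(imp_feat)[::-1][:top_k]  # feature indices sorted by decreasing importance
top_features = [spam_names[i] for i in top_idx]
print(top_features)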
We can organise the information provided by the visualisation above in the form of a table using Pandas.
import pandas as pd
featureImp = pd.DataFrame(
list(zip(vecAssembler.getInputCols(), model.featureImportances)),
columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)
 | feature | importance |
---|---|---|
52 | char_freq_$ | 0.289479 |
6 | word_freq_remove | 0.150070 |
51 | char_freq_! | 0.121890 |
24 | word_freq_hp | 0.084602 |
55 | capital_run_length_longest | 0.049771 |
54 | capital_run_length_average | 0.042403 |
15 | word_freq_free | 0.036760 |
45 | word_freq_edu | 0.031195 |
26 | word_freq_george | 0.030779 |
44 | word_freq_re | 0.018478 |
4 | word_freq_our | 0.011667 |
20 | word_freq_your | 0.011351 |
7 | word_freq_internet | 0.010919 |
22 | word_freq_000 | 0.010024 |
27 | word_freq_650 | 0.009521 |
49 | char_freq_( | 0.009478 |
18 | word_freq_you | 0.007694 |
12 | word_freq_people | 0.007015 |
16 | word_freq_business | 0.006899 |
1 | word_freq_address | 0.006322 |
2 | word_freq_all | 0.006321 |
36 | word_freq_1999 | 0.005759 |
11 | word_freq_will | 0.005730 |
34 | word_freq_85 | 0.005229 |
56 | capital_run_length_total | 0.005106 |
8 | word_freq_order | 0.004539 |
9 | word_freq_mail | 0.004016 |
50 | char_freq_[ | 0.003810 |
29 | word_freq_labs | 0.003350 |
0 | word_freq_make | 0.002008 |
35 | word_freq_technology | 0.001775 |
32 | word_freq_data | 0.001637 |
40 | word_freq_cs | 0.001493 |
19 | word_freq_credit | 0.001481 |
43 | word_freq_project | 0.001430 |
3 | word_freq_3d | 0.000000 |
5 | word_freq_over | 0.000000 |
53 | char_freq_# | 0.000000 |
10 | word_freq_receive | 0.000000 |
13 | word_freq_report | 0.000000 |
14 | word_freq_addresses | 0.000000 |
17 | word_freq_email | 0.000000 |
48 | char_freq_; | 0.000000 |
47 | word_freq_conference | 0.000000 |
46 | word_freq_table | 0.000000 |
30 | word_freq_telnet | 0.000000 |
21 | word_freq_font | 0.000000 |
31 | word_freq_857 | 0.000000 |
42 | word_freq_original | 0.000000 |
41 | word_freq_meeting | 0.000000 |
39 | word_freq_direct | 0.000000 |
38 | word_freq_pm | 0.000000 |
37 | word_freq_parts | 0.000000 |
23 | word_freq_money | 0.000000 |
25 | word_freq_hpl | 0.000000 |
33 | word_freq_415 | 0.000000 |
28 | word_freq_lab | 0.000000 |
A better visualisation of the tree in PySpark can be obtained by using, for example, spark-tree-plotting. The trick is to convert the Spark tree to a JSON format. Once you have the JSON format, you can visualise it using D3, or you can transform from JSON to DOT and use graphviz as we did in scikit-learn for the Notebook in MLAI.
Pipeline

We have not mentioned the test data yet. Before applying the decision tree to the test data, this is a good opportunity to introduce a pipeline that includes the VectorAssembler and the Decision Tree.
from pyspark.ml import Pipeline
# Combine stages into pipeline
stages = [vecAssembler, dt]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(trainingData)
We finally use the MulticlassClassificationEvaluator tool to assess the accuracy on the test set.
predictions = pipelineModel.transform(testData)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator\
(labelCol="labels", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)
Accuracy = 0.915679
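As a hedged aside, the same MulticlassClassificationEvaluator supports other metrics by changing metricName, for example the weighted F1 score.
# Illustrative only: evaluate the weighted F1 score with the same predictions dataframe.
f1_evaluator = MulticlassClassificationEvaluator(labelCol="labels", predictionCol="prediction", metricName="f1")
print("F1 = %g " % f1_evaluator.evaluate(predictions))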
The main difference between Decision Trees for Classification and Decision Trees for Regression is in the impurity measure used. For regression, PySpark uses the variance of the target features as the impurity measure.
The DecisionTreeRegressor implemented in PySpark has several parameters to tune. Some of them are:
- maxDepth: the maximum depth of the tree. The default is 5.
- maxBins: the number of bins used to discretise continuous features. The default is 32.
- impurity: for regression, the only supported impurity option is variance.
- minInfoGain: the minimum information gain required for a split to be considered. The default is zero.
You will have the opportunity to experiment with the DecisionTreeRegressor class later in the module.
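For reference, a minimal sketch of how the class could be instantiated is given below; it assumes a dataframe with a 'features' vector column and a 'labels' column of type double (as produced earlier by the VectorAssembler) and is not run in this notebook.
from pyspark.ml.regression import DecisionTreeRegressor
# Sketch only: 'variance' is the only supported impurity for regression.
dtr = DecisionTreeRegressor(labelCol="labels", featuresCol="features", maxDepth=5, impurity="variance")
# dtr_model = dtr.fit(vecTrainingData)  # would fit on a vector-assembled training set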
We studied the implementation of ensemble methods in scikit-learn in COM6509. See this notebook for a refresher.
PySpark implements two types of tree ensembles: random forests and gradient boosting. The main difference between the two methods is the way in which they combine the individual trees that compose the ensemble.
The variant of Random Forests implemented in Apache Spark is also known as bagging or bootstrap aggregating. The tree ensemble in random forests is built by training individual decision trees on different subsets of the training data and using a subset of the available features. For classification, the prediction is done by majority voting among the individual trees. For regression, the prediction is the average of the individual predictions of each tree. For more details on the PySpark implementation see here.
Besides the parameters already mentioned for the DecisionTreeClassifier and the DecisionTreeRegressor, the RandomForestClassifier and the RandomForestRegressor in PySpark accept three additional parameters:
- numTrees: the total number of trees to train.
- featureSubsetStrategy: the number of features to use as candidates for splitting at each tree node. Options include all, onethird, sqrt, log2, [1-n].
- subsamplingRate: the size of the dataset used for training each tree in the forest, as a fraction of the size of the original dataset.
We already did an example of classification with decision trees. Let us now use random forests for regression.
We are going to use the Wine Quality Dataset to illustrate the use of the RandomForestRegressor class in PySpark. There are eleven input features corresponding to different attributes measured on wine samples (based on physicochemical tests). The target feature is a quality index that goes from zero to ten, with zero being a very bad wine and ten an excellent one. The target was computed as the median score given by three independent wine-tasting experts. More details on the dataset can be found in this paper.
rawdataw = spark.read.csv('./Data/winequality-white.csv', sep=';', header='true')
rawdataw.cache()
DataFrame[fixed acidity: string, volatile acidity: string, citric acid: string, residual sugar: string, chlorides: string, free sulfur dioxide: string, total sulfur dioxide: string, density: string, pH: string, sulphates: string, alcohol: string, quality: string]
Notice that we use the parameter sep=';' when loading the file, since the columns in the file are separated by ';' instead of the default ','.
rawdataw.printSchema()
root
|-- fixed acidity: string (nullable = true)
|-- volatile acidity: string (nullable = true)
|-- citric acid: string (nullable = true)
|-- residual sugar: string (nullable = true)
|-- chlorides: string (nullable = true)
|-- free sulfur dioxide: string (nullable = true)
|-- total sulfur dioxide: string (nullable = true)
|-- density: string (nullable = true)
|-- pH: string (nullable = true)
|-- sulphates: string (nullable = true)
|-- alcohol: string (nullable = true)
|-- quality: string (nullable = true)
We now follow a very familiar procedure to get the dataset to a format that can be input to Spark MLlib, which consists of:
- transforming the data from type string to type double.
- creating a pipeline that includes a vector assembler and a random forest regressor.
We first start transforming the data types.
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
StringColumns = [x.name for x in rawdataw.schema.fields if x.dataType == StringType()]
for c in StringColumns:
    rawdataw = rawdataw.withColumn(c, col(c).cast("double"))
rawdataw = rawdataw.withColumnRenamed('quality', 'labels')
Notice that we used the withColumnRenamed method to rename the target feature from 'quality' to 'labels'.
rawdataw.printSchema()
root
|-- fixed acidity: double (nullable = true)
|-- volatile acidity: double (nullable = true)
|-- citric acid: double (nullable = true)
|-- residual sugar: double (nullable = true)
|-- chlorides: double (nullable = true)
|-- free sulfur dioxide: double (nullable = true)
|-- total sulfur dioxide: double (nullable = true)
|-- density: double (nullable = true)
|-- pH: double (nullable = true)
|-- sulphates: double (nullable = true)
|-- alcohol: double (nullable = true)
|-- labels: double (nullable = true)
We now partition the data into a training and a test set
trainingDataw, testDataw = rawdataw.randomSplit([0.7, 0.3], 42)
Now, we create the pipeline. First, we create the vector assembler.
vecAssemblerw = VectorAssembler(inputCols=StringColumns[:-1], outputCol="features")
And now, the Random Forests regressor and the pipeline
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="labels", featuresCol="features", maxDepth=5, numTrees=3, \
featureSubsetStrategy = 'all', seed=123, bootstrap=False)
stages = [vecAssemblerw, rf]
pipeline = Pipeline(stages=stages)
pipelineModelw = pipeline.fit(trainingDataw)
We now apply the pipeline to the test data and compute the RMSE between the predictions and the ground truth.
predictions = pipelineModelw.transform(testDataw)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator\
(labelCol="labels", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE = %g " % rmse)
RMSE = 0.755316
pipelineModelw.stages[-1].featureImportances
SparseVector(11, {0: 0.0378, 1: 0.2075, 2: 0.0354, 3: 0.0266, 5: 0.1073, 7: 0.0394, 8: 0.0189, 9: 0.0106, 10: 0.5166})
featureImp = pd.DataFrame(
list(zip(vecAssemblerw.getInputCols(), pipelineModelw.stages[-1].featureImportances)),
columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)
 | feature | importance |
---|---|---|
10 | alcohol | 0.516600 |
1 | volatile acidity | 0.207466 |
5 | free sulfur dioxide | 0.107308 |
7 | density | 0.039419 |
0 | fixed acidity | 0.037760 |
2 | citric acid | 0.035437 |
3 | residual sugar | 0.026573 |
8 | pH | 0.018871 |
9 | sulphates | 0.010567 |
4 | chlorides | 0.000000 |
6 | total sulfur dioxide | 0.000000 |
In Gradient Boosting or Gradient-Boosted Trees (GBT), each tree in the ensemble is trained sequentially: the first tree is trained as usual on the training data, the second tree is trained on the residuals between the predictions of the first tree and the labels of the training data, the third tree is trained on the residuals of the predictions of the second tree, and so on. The prediction of the ensemble is the sum of the predictions of the individual trees. The type of residuals is determined by the loss function being minimised. In the PySpark implementation of Gradient-Boosted Trees, the loss function for binary classification is the log-loss and the loss function for regression is either the squared error or the absolute error. For details, follow this link.
PySpark uses the class GBTRegressor for the implementation of Gradient-Boosted Trees for regression and GBTClassifier for binary classification. As of PySpark version 3.0.1, GBTs have not been implemented for multiclass classification.
Besides the parameters that can be specified for decision trees, both classes share the following additional parameters:
- lossType: the type of loss function. Options are "squared" and "absolute" for regression and "logistic" for classification.
- maxIter: the number of trees in the ensemble. Each iteration produces one tree.
- stepSize: also known as the learning rate, it shrinks the contribution of each tree in the sequence. The default is 0.1.
- subsamplingRate: as for random forests, the fraction of the training data used for learning each decision tree.
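For completeness, here is a hedged sketch of how the GBTClassifier could be configured for the spambase classification task from earlier in the notebook; it is not run here and the parameter values are only illustrative.
from pyspark.ml.classification import GBTClassifier
# Sketch only: GBTClassifier performs binary classification with the logistic (log-loss) lossType.
gbt_clf = GBTClassifier(labelCol="labels", featuresCol="features", maxDepth=3, maxIter=10, stepSize=0.1, seed=34)
# gbt_pipeline = Pipeline(stages=[vecAssembler, gbt_clf])
# gbt_model = gbt_pipeline.fit(trainingData)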
We will now use the GBTRegressor on the wine quality dataset.
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="labels", featuresCol="features", \
maxDepth=5, maxIter=5, lossType='squared', subsamplingRate= 0.5, seed=34)
# Create the pipeline
stages = [vecAssemblerw, gbt]
pipeline = Pipeline(stages=stages)
pipelineModelg = pipeline.fit(trainingDataw)
# Apply the pipeline to the test data
predictions = pipelineModelg.transform(testDataw)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator \
(labelCol="labels", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE = %g " % rmse)
RMSE = 0.758747
Note: A reference solution will be provided in Blackboard for this part by the following Thursday (27.04.2023).
Include a cross-validation step for the pipeline of the decision tree classifier applied to the spambase dataset. An example of a cross-validator can be found here. Make sure paramGrid contains different values for maxDepth, maxBins and impurity, and find the best parameters and associated accuracy on the test data.
Apply a RandomForestClassifier to the spambase dataset. As in exercise 1, include a cross-validation step with a paramGrid with options for maxDepth, maxBins, numTrees, featureSubsetStrategy and subsamplingRate. Find the best parameters and associated accuracy on the test data.
As we did for the Decision Trees for Classification, it is possible to use the featureImportances method to study the relative importance of each feature in random forests. Use the featureImportances in the random forest regressor used for the wine dataset and indicate the three most relevant features. How are the feature importances computed?
Note: NO solutions will be provided for this part.
Create a decision tree classifier that runs on the default of credit cards dataset. Several of the features in this dataset are categorical. Use StringIndexer for treating the categorical variables.
Note also that this dataset has a different format to the Spambase dataset above - you will need to convert from XLS format to, say, CSV, before using the data. You can use any available tool for this: for example, Excel has an export option, or there is a command line tool xls2csv available on Linux.
Write and run an HPC standalone program using random forest regression on the Physical Activity Monitoring dataset, methodically experimenting with the parameters maxDepth, numTrees and subsamplingRate. Obtain the timing for the experiment. Note that the physical activity monitoring dataset contains NaN (not a number) values when values are missing - you should try dealing with this in two ways
- Drop lines containing NaN
- Replace NaN with the average value from that column. For this, you can use the Imputer transformer available in pyspark.ml.feature
Run experiments with both options.