COM6012 Scalable Machine Learning 2023 by Haiping Lu at The University of Sheffield
Accompanying lectures: YouTube video lectures recorded in Year 2020/21.
- Task 1: To finish in the lab session on 28th April. Essential
- Task 2: To finish in the lab session on 28th April. Essential
- Task 3: To finish by the following Wednesday 29th March. Exercise
- Task 4: To explore further. Optional
- Extracting, transforming and selecting features
- PCA in Spark DataFrame API
pyspark.ml
- SVD in Spark RDD API
pyspark.mllib
- StandardScaler in Spark to standardise/normalise data to unit standard deviation and/or zero mean.
- Data Types - RDD-based API
- PCA on Wiki
- Understanding Dimension Reduction with Principal Component Analysis (PCA)
- Principal Component Analysis explained on Kaggle with data available here, and background info here
To deal with data efficiently, Spark considers different data types. In particular, MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by Breeze. A training example used in supervised learning is called a “labeled point” in MLlib.
Local vector: Dense vs Sparse
A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
Check out the Vector in RDD API or Vector in DataFrame API (see method .Sparse()
) and SparseVector in RDD API or SparseVector in DataFrame API . The official example is below
import numpy as np
from pyspark.mllib.linalg import Vectors
dv1 = np.array([1.0, 0.0, 3.0]) # Use a NumPy array as a dense vector.
dv2 = [1.0, 0.0, 3.0] # Use a Python list as a dense vector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0]) # Create a SparseVector.
Note the vector created by Vectors.sparse()
is of type SparseVector()
sv1
# SparseVector(3, {0: 1.0, 2: 3.0})
To view the sparse vector in a dense format
sv1.toArray()
# array([1., 0., 3.])
A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....
See LabeledPoint API in MLlib. Now, we create a labeled point with a positive label and a dense feature vector, as well as a labeled point with a negative label and a sparse feature vector.
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
neg
# LabeledPoint(0.0, (3,[0,2],[1.0,3.0]))
neg.label
# 0.0
neg.features
# SparseVector(3, {0: 1.0, 2: 3.0})
Now view the features as dense vector (rather than sparse vector)
neg.features.toArray()
# array([1., 0., 3.])
A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order. For example, we create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) and a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in the following:
from pyspark.mllib.linalg import Matrix, Matrices
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(dm2)
# DenseMatrix([[1., 2.],
# [3., 4.],
# [5., 6.]])
print(sm)
# 3 X 2 CSCMatrix
# (0,0) 9.0
# (2,1) 6.0
# (1,1) 8.0
See Scala API for Matrices.sparse and from its source code, we can see it creates a CSC SparseMatrix.
Here the compressed sparse column (CSC or CCS) format is used for sparse matrix representation. You can learn it from this simple explanation. To learn more about CSC, you may refer to a top video and a top post with animation.
values are read first by column, a row index is stored for each value, and column pointers are stored. For example, CSC is (val, row_ind, col_ptr), where val is an array of the (top-to-bottom, then left-to-right) non-zero values of the matrix; row_ind is the row indices corresponding to the values; and, col_ptr is the list of val indexes where each column starts.
dsm=sm.toDense()
print(dsm)
# DenseMatrix([[9., 0.],
# [0., 8.],
# [0., 6.]])
A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.
The basic type is called RowMatrix. A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge for a RowMatrix so that a single local vector can be reasonably communicated to the driver and can also be stored / operated on using a single node. Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.
Now we create an RDD of vectors rows
, from which we create a RowMatrix mat
.
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
mat = RowMatrix(rows)
m = mat.numRows() # Get its size: m=4, n=3
n = mat.numCols()
rowsRDD = mat.rows # Get the rows as an RDD of vectors again.
We can view the RowMatrix in a dense matrix format
rowsRDD.collect()
# [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0]), DenseVector([7.0, 8.0, 9.0]), DenseVector([10.0, 11.0, 12.0])]
Principal component analysis (PCA) is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables (entities each of which takes on various numerical values) into a set of values of linearly uncorrelated variables called principal components (PCs). A PCA class trains a model to project vectors to a low-dimensional space using PCA and this is probably the most commonly used dimensionality reduction method.
Check out the API. Check pyspark.ml.feature.PCAModel
too to see what is available for the fitted model. Let us project three 5-dimensional feature vectors into 2-dimensional principal components.
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
df.show()
# +--------------------+
# | features|
# +--------------------+
# | (5,[1,3],[1.0,7.0])|
# |[2.0,0.0,3.0,4.0,...|
# |[4.0,0.0,0.0,6.0,...|
# +--------------------+
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
# +----------------------------------------+
# |pcaFeatures |
# +----------------------------------------+
# |[1.6485728230883807,-4.013282700516296] |
# |[-4.645104331781534,-1.1167972663619026]|
# |[-6.428880535676489,-5.337951427775355] |
# +----------------------------------------+
Check the explained variance in percentage
model.explainedVariance
# DenseVector([0.7944, 0.2056])
Take a look at the principal components Matrix. Each column is one principal component.
print(model.pc)
# DenseMatrix([[-0.44859172, -0.28423808],
# [ 0.13301986, -0.05621156],
# [-0.12523156, 0.76362648],
# [ 0.21650757, -0.56529588],
# [-0.84765129, -0.11560341]])
pyspark.mllib
supports PCA for tall-and-skinny (big
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
rows.collect()
# [SparseVector(5, {1: 1.0, 3: 7.0}), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]
mat = RowMatrix(rows)
Compute the top 2 principal components, which are stored in a local dense matrix (the same as above).
pc = mat.computePrincipalComponents(2)
print(pc)
# DenseMatrix([[-0.44859172, -0.28423808],
# [ 0.13301986, -0.05621156],
# [-0.12523156, 0.76362648],
# [ 0.21650757, -0.56529588],
# [-0.84765129, -0.11560341]])
Project the rows to the linear space spanned by the top 2 principal components (the same as above)
projected = mat.multiply(pc)
projected.rows.collect()
# [DenseVector([1.6486, -4.0133]), DenseVector([-4.6451, -1.1168]), DenseVector([-6.4289, -5.338])]
Now we convert to dense rows to see the matrix
from pyspark.mllib.linalg import DenseVector
denseRows = rows.map(lambda vector: DenseVector(vector.toArray()))
denseRows.collect()
# [DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]
Read SVD in RDD-based API pyspark.mllib
. As covered in the lecture, we will need SVD for PCA on large-scale data. Here, we use it on the same small toy example to examine the relationship with eigenvalue decomposition based PCA methods above.
We compute the top 2 singular values and corresponding singular vectors.
svd = mat.computeSVD(2, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
If we are doing it right, the right singular vectors should be the same as the eigenvectors.
print(V)
# DenseMatrix([[-0.31278534, 0.31167136],
# [-0.02980145, -0.17133211],
# [-0.12207248, 0.15256471],
# [-0.71847899, -0.68096285],
# [-0.60841059, 0.62170723]])
But it is not the same! Why? Remeber that we need to do centering! We can do so use the StandardScaler (check out the API) to center the data, i.e., remove the mean.
from pyspark.mllib.feature import StandardScaler
standardizer = StandardScaler(True, False)
model = standardizer.fit(rows)
centeredRows = model.transform(rows)
centeredRows.collect()
# [DenseVector([-2.0, 0.6667, -1.0, 1.3333, -4.0]), DenseVector([0.0, -0.3333, 2.0, -1.6667, 1.0]), DenseVector([2.0, -0.3333, -1.0, 0.3333, 3.0])]
centeredmat = RowMatrix(centeredRows)
Compute the top 2 singular values and corresponding singular vectors.
svd = centeredmat.computeSVD(2, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
Check the PC obtained this time (it is the same as the above PCA methods now)
print(V)
DenseMatrix([[-0.44859172, -0.28423808],
[ 0.13301986, -0.05621156],
[-0.12523156, 0.76362648],
[ 0.21650757, -0.56529588],
[-0.84765129, -0.11560341]])
Let us examine the relationships between the singular values and the eigenvalues.
print(s)
# [6.001041088520536,3.0530049438580336]
We get the eigenvalues by taking squares of the singular values
evs=s*s
print(evs)
[36.012494146111734,9.320839187221594]
Now we compute the percentage of variance captures and compare with the above to verify (see/search model.explainedVariance
).
evs/sum(evs)
# DenseVector([0.7944, 0.2056])
Study the Iris flower data set iris.csv
under Data
with PCA.
- Follow Understanding Dimension Reduction with Principal Component Analysis (PCA) to do the same analysis using the DataFrame-based PCA
pca.fit()
frompyspark.ml
. - Follow this lab to verify that using the other two RDD-based PCA APIs
computePrincipalComponents
andcomputeSVD
will give the same PCA features.
A company is trying to figure out why their best and experienced employees are leaving prematurely from a dataset. Follow the example Principal Component Analysis explained on Kaggle to perform such analysis in PySpark, using as many PySpark APIs as possible.
Use PySpark to perform the steps in IBM's notebook on Spark-based machine learning for word meanings that makes use of PCA, kmeans, and Word2Vec to learn word meanings.
Choose a Bag of Words Data Set. Let us take the NIPS full papers data as an example.
The format of this data is
Number of documents
Number of words in the vocabulary
Total number of words in the collection
docID wordID count
docID wordID count
...
docID wordID count
Our data matrix will be
- extract the number of documents and the size of the vocabulary, and strip off the first 3 lines
- combine the words per document
- create sparse vectors (for better space efficiency)
Start from a small dataset to test your work, and then checking whether your work scales up to the big NYTIMES bagofwords data. Keep everything as parallel as possible.
Find some large-scale image datasets to examine the principal components and explore low-dimensional representations.