COM6012 Scalable Machine Learning 2023 by Haiping Lu at The University of Sheffield
Accompanying lectures: YouTube video lectures recorded in Year 2020/21.
- Task 1: To finish in the lab session on 17th Feb. Essential
- Task 2: To finish in the lab session on 17th Feb. Essential
- Task 3: To finish in the lab session on 17th Feb. Essential
- Task 4: To finish by the following Wednesday 22nd Feb. Exercise
- Task 5: To explore further. Optional
- Chapters 5 and 6, and especially Section 9.1 (of Chapter 9) of PySpark tutorial
- RDD Programming Guide: Most are useful to know in this module.
- Spark SQL, DataFrames and Datasets Guide: Overview and Getting Started recommended (skipping those without Python example).
- Machine Learning Library (MLlib) Guide
- ML Pipelines
- Apache Spark Examples
- Basic Statistics - DataFrame API
- Basic Statistics - RDD API: much richer
Tip: Try to use as many DataFrame APIs as possible by referring to the PySpark API documentation. Before you program something yourself, search the API to check whether a function for it already exists.
First, we follow the standard steps in Task 2 of Lab 1 with one change in settings: we request 4 cores for an interactive shell. If the request cannot be scheduled, try reducing the number of cores requested. We also install numpy in our environment for later use.
qrshx -P rse-com6012 -pe smp 4 # request 4 CPU cores using our reserved queue
source myspark.sh # assuming HPC/myspark.sh is under your root directory, otherwise, see Lab 1 Task 2
conda install -y numpy # install numpy, to be used in Task 3. This ONLY needs to be done ONCE. NOT every time.
cd com6012/ScalableML # our main working directory
pyspark --master local[4] # start pyspark with 4 cores requested above.
As stated in the RDD Programming Guide, Spark allows for parallel operations in a program to be executed on a cluster with the following abstractions:
- Main abstraction: resilient distributed dataset (RDD) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
- Second abstraction: shared variables can be shared across tasks, or between tasks and the driver program. Two types:
- Broadcast variables, which can be used to cache a value in memory on all nodes
- Accumulators, which are variables that are only “added” to, such as counters and sums.
Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
data = [1, 2, 3, 4, 5]
rddData = sc.parallelize(data)
rddData.collect()
# [1, 2, 3, 4, 5]
Note: From Lab 2, I will show output as comments (# OUTPUT) and show code without >>> for easy copy and paste in the shell.
The number of partitions can be set manually by passing a second argument to SparkContext’s parallelize method:
sc.parallelize(data, 16)
# ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274
If no number is given, Spark tries to set the number of partitions automatically based on the cluster. The usual rule of thumb is 2-4 partitions for every CPU in the cluster.
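To get a feel for what a partition count means, here is a plain-Python sketch that cuts a list into a given number of contiguous, nearly equal slices. It is an illustration only, not Spark's internal code, and the helper name split_into_partitions is made up. In the shell, rddData.getNumPartitions() and rddData.glom().collect() show the real partitioning.

```python
def split_into_partitions(items, num_partitions):
    """Cut `items` into `num_partitions` contiguous, nearly equal slices (illustration only)."""
    n = len(items)
    # Slice i covers indices [i*n // k, (i+1)*n // k), so sizes differ by at most one.
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

data = [1, 2, 3, 4, 5]
parts = split_into_partitions(data, 2)
print(parts)  # [[1, 2], [3, 4, 5]]
```

Asking for more partitions than elements simply leaves some partitions empty.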
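Spark can also be used for compute-intensive tasks. This code estimates π by picking random points in the unit square and counting the fraction that falls inside the unit circle; that fraction should be close to π/4. random.random() returns the next random floating point number in the range [0.0, 1.0).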
from random import random
def inside(p):
    x, y = random(), random()
    return x*x + y*y < 1
NUM_SAMPLES = 10000000
count = sc.parallelize(range(0, NUM_SAMPLES),8).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
# Pi is roughly 3.140986
Note that we did not control the seed above, so you are unlikely to get exactly the same number, 3.140986. You can change NUM_SAMPLES to see the difference in precision and time cost.
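To see what controlling the seed would mean, here is a minimal plain-Python sketch of the same Monte Carlo estimate with a fixed seed. The function name estimate_pi and the seed value are assumptions for illustration. Seeding inside Spark tasks needs more care, because each partition runs separately.

```python
import random

def estimate_pi(num_samples, seed):
    """Monte Carlo estimate of pi using a private, seeded random generator."""
    rng = random.Random(seed)  # fixed seed makes the run repeatable
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4.0 * inside / num_samples

first = estimate_pi(100000, seed=6012)
second = estimate_pi(100000, seed=6012)
print(first == second)  # True: same seed, same estimate
```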
When a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
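The copy behaviour described above can be mimicked in plain Python. The sketch below is a simulation, not Spark code: it uses pickle to stand in for shipping a variable to a worker, and the names counter and run_task are made up for illustration.

```python
import pickle

counter = {"total": 0}  # lives in the "driver"

def run_task(serialized_state, value):
    """Simulate a remote task: it works on its own deserialized copy of the state."""
    local_state = pickle.loads(serialized_state)
    local_state["total"] += value
    return local_state["total"]

shipped = pickle.dumps(counter)  # what would be sent to a worker
task_results = [run_task(shipped, v) for v in [1, 2, 3, 4]]

print(task_results)      # [1, 2, 3, 4]: each task started from its own copy
print(counter["total"])  # 0: the driver's variable never changed
```

This is why a plain Python variable cannot be used as a shared counter across tasks.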
To avoid creating a copy of a large variable for each task, an accessible (read-only!) variable can be kept on each machine - this is useful for particularly large datasets which may be needed for multiple tasks. The data broadcasted this way is cached in serialized form and deserialized before running each task. See Data Serialization for more details about serialization.
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed through its value attribute.
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar
# <pyspark.broadcast.Broadcast object at 0x2ac15e088860>
broadcastVar.value
# [1, 2, 3]
Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Cluster tasks can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator's value, using its value attribute.
accum = sc.accumulator(0)
accum
# Accumulator<id=0, value=0>
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
# 10
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
# 20
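The reason "associative and commutative" matters is that partial results from different tasks can then be merged in any order. A small plain-Python sketch, assuming a hypothetical split of the data into three partitions:

```python
from functools import reduce
from operator import add

partitions = [[1, 2], [3], [4]]  # hypothetical split of [1, 2, 3, 4] across tasks

# Each task adds up its own partition independently.
partial_sums = [sum(p) for p in partitions]

# The partial results can be merged in any order.
forward = reduce(add, partial_sums, 0)
backward = reduce(add, reversed(partial_sums), 0)

print(forward, backward)  # 10 10
```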
Along with the introduction of SparkSession, the resilient distributed dataset (RDD) was replaced by Dataset as the main programming interface. Again, these are objects that can be worked on in parallel. The available operations are:
- transformations: produce new datasets
- actions: computations which return results
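In Spark, transformations are lazy: they only record what to do, and nothing is computed until an action asks for a result. The plain-Python sketch below mimics this with a generator. It is an analogy under that assumption, not Spark code.

```python
calls = []

def double(x):
    calls.append(x)  # record when the function actually runs
    return 2 * x

data = [1, 2, 3]
doubled = (double(x) for x in data)  # "transformation": nothing runs yet
calls_before = len(calls)

total = sum(doubled)                 # "action": forces the computation
calls_after = len(calls)

print(calls_before, calls_after, total)  # 0 3 12
```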
We will start with creating dataframes and datasets, showing how we can print their contents. We create a dataframe in the cell below and print out some info (we can also modify the output before printing):
From RDD to DataFrame
rdd = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])
df = rdd.toDF(["a","b","c"])
rdd
# ParallelCollectionRDD[10] at readRDDFromFile at PythonRDD.scala:274
The number in [ ] (10) is the index for RDDs in the shell, so it may vary.
Let us examine the DataFrame
df
# DataFrame[a: bigint, b: bigint, c: bigint]
df.show()
# +---+---+---+
# | a| b| c|
# +---+---+---+
# | 1| 2| 3|
# | 4| 5| 6|
# | 7| 8| 9|
# +---+---+---+
df.printSchema()
# root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- c: long (nullable = true)
Now let us get RDD from DataFrame
rdd2=df.rdd
rdd2
# MapPartitionsRDD[26] at javaToPython at NativeMethodAccessorImpl.java:0
rdd2.collect() # view the content
# [Row(a=1, b=2, c=3), Row(a=4, b=5, c=6), Row(a=7, b=8, c=9)]
This data was downloaded from a classic book on statistical learning.
df = spark.read.load("Data/Advertising.csv", format="csv", inferSchema="true", header="true")
df.show(5) # show the top 5 rows
# 21/02/13 12:10:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
# Header: , TV, radio, newspaper, sales
# Schema: _c0, TV, radio, newspaper, sales
# Expected: _c0 but found:
# CSV file: file:///home/ac1hlu/com6012/ScalableML/Data/Advertising.csv
# +---+-----+-----+---------+-----+
# |_c0| TV|radio|newspaper|sales|
# +---+-----+-----+---------+-----+
# | 1|230.1| 37.8| 69.2| 22.1|
# | 2| 44.5| 39.3| 45.1| 10.4|
# | 3| 17.2| 45.9| 69.3| 9.3|
# | 4|151.5| 41.3| 58.5| 18.5|
# | 5|180.8| 10.8| 58.4| 12.9|
# +---+-----+-----+---------+-----+
# only showing top 5 rows
Note that a warning is given because the first column has an empty header. If we manually specify it, e.g. as index, the warning will disappear.
Recall that CSV files are semi-structured data, so here Spark inferred the schema automatically. Let us take a look.
df.printSchema()
# root
# |-- _c0: integer (nullable = true)
# |-- TV: double (nullable = true)
# |-- radio: double (nullable = true)
# |-- newspaper: double (nullable = true)
# |-- sales: double (nullable = true)
Let us remove the first column
df2=df.drop('_c0')
df2.printSchema()
# root
# |-- TV: double (nullable = true)
# |-- radio: double (nullable = true)
# |-- newspaper: double (nullable = true)
# |-- sales: double (nullable = true)
We can get summary statistics for numerical columns using .describe().show(), which is very handy for inspecting your (big) data for understanding/debugging.
df2.describe().show()
# +-------+-----------------+------------------+------------------+------------------+
# |summary| TV| radio| newspaper| sales|
# +-------+-----------------+------------------+------------------+------------------+
# | count| 200| 200| 200| 200|
# | mean| 147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
# | stddev|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
# | min| 0.7| 0.0| 0.3| 1.6|
# | max| 296.4| 49.6| 114.0| 27.0|
# +-------+-----------------+------------------+------------------+------------------+
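To make the rows of this summary concrete, here is a plain-Python sketch that computes the same five statistics on just the first five TV values shown earlier. The results therefore differ from the full 200-row table. It assumes the stddev row is the sample standard deviation (dividing by n - 1).

```python
import statistics

tv = [230.1, 44.5, 17.2, 151.5, 180.8]  # first five TV values shown above

summary = {
    "count": len(tv),
    "mean": statistics.mean(tv),
    "stddev": statistics.stdev(tv),  # sample standard deviation (divides by n - 1)
    "min": min(tv),
    "max": max(tv),
}
for name, value in summary.items():
    print(name, value)
```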
MLlib is Spark’s machine learning (ML) library. It provides:
- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
- Featurization: feature extraction, transformation, dimensionality reduction, and selection
- Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
- Persistence: saving and loading algorithms, models, and Pipelines
- Utilities: linear algebra, statistics, data handling, etc.
MLlib allows easy combination of numerous algorithms into a single pipeline using standardized APIs for machine learning algorithms. The key concepts are:
- Dataframe. Dataframes can hold a variety of data types.
- Transformer. Transforms one dataframe into another.
- Estimator. Algorithm which can be fit on a DataFrame to produce a Transformer.
- Pipeline. A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
- Parameter. Transformers and Estimators share a common API for specifying parameters.
A list of some of the available ML features is available here.
Clarification on whether Estimator is a transformer. See Estimators
An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
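The relationship can be sketched with two toy classes in plain Python. These are not MLlib classes: the names MeanCenterer and MeanCentererModel are hypothetical, and lists of numbers stand in for DataFrames.

```python
class MeanCenterer:
    """Toy Estimator: learns the mean of a list of numbers."""

    def fit(self, values):
        mean = sum(values) / len(values)
        return MeanCentererModel(mean)  # fit() returns a Model


class MeanCentererModel:
    """Toy Model (a Transformer): subtracts the learned mean."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]


model = MeanCenterer().fit([1.0, 2.0, 3.0])
print(model.transform([2.0, 4.0]))  # [0.0, 2.0]
```

The Estimator itself has no transform() here. Only the Model returned by fit() does, which matches the description above.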
The example below is based on Section 9.1 of PySpark tutorial.
Let us convert the above data in CSV format to a typical (feature, label) pair for supervised learning. Here we use the Vectors API. You may also review the lambda expressions in Python.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])
transformed= transData(df2)
transformed.show(5)
# +-----------------+-----+
# | features|label|
# +-----------------+-----+
# |[230.1,37.8,69.2]| 22.1|
# | [44.5,39.3,45.1]| 10.4|
# | [17.2,45.9,69.3]| 9.3|
# |[151.5,41.3,58.5]| 18.5|
# |[180.8,10.8,58.4]| 12.9|
# +-----------------+-----+
# only showing top 5 rows
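The lambda in transData relies on two slices: r[:-1] takes every column except the last, and r[-1] takes the last one. A plain-Python sketch on one row, using a tuple in place of a Row and a list in place of a dense vector (the name to_pair is made up):

```python
row = (230.1, 37.8, 69.2, 22.1)  # TV, radio, newspaper, sales

# Same slicing as in transData: all but the last column, then the last column.
to_pair = lambda r: (list(r[:-1]), r[-1])

features, label = to_pair(row)
print(features, label)  # [230.1, 37.8, 69.2] 22.1
```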
The labels here are real numbers, so this is a regression problem. For classification problems, you may need to transform labels (e.g., disease, healthy) to indices with a featureIndexer, as in Step 5, Section 9.1 of PySpark tutorial.
(trainingData, testData) = transformed.randomSplit([0.6, 0.4], 6012)
We set seed=6012 in the above (see the randomSplit API). Check your train and test data as follows. It is good practice to keep track of your data during the prototyping phase.
trainingData.show(5)
# +---------------+-----+
# | features|label|
# +---------------+-----+
# | [4.1,11.6,5.7]| 3.2|
# | [5.4,29.9,9.4]| 5.3|
# |[7.8,38.9,50.6]| 6.6|
# |[8.7,48.9,75.0]| 7.2|
# |[13.1,0.4,25.6]| 5.3|
# +---------------+-----+
# only showing top 5 rows
testData.show(5)
# +----------------+-----+
# | features|label|
# +----------------+-----+
# | [0.7,39.6,8.7]| 1.6|
# | [7.3,28.1,41.4]| 5.5|
# | [8.4,27.2,2.1]| 5.7|
# | [8.6,2.1,1.0]| 4.8|
# |[11.7,36.9,45.2]| 7.3|
# +----------------+-----+
# only showing top 5 rows
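A seeded random split can be pictured with the plain-Python sketch below. It is an illustration, not Spark's implementation: it assumes each row is assigned by an independent random draw, so the split sizes are only approximately 60/40, and the function name random_split is made up.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by an independent random draw (illustration only)."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries, e.g. roughly [0.6, 1.0] for weights [0.6, 0.4].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any rounding leftovers.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(200)), [0.6, 0.4], seed=6012)
print(len(train) + len(test))  # 200
```

Running it again with the same seed gives the same split, which is the point of fixing the seed.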
More details on parameters can be found in the Python API documentation.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression()
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.show(5)
# +----------------+-----+------------------+
# | features|label| prediction|
# +----------------+-----+------------------+
# | [0.7,39.6,8.7]| 1.6|10.497359087823323|
# | [7.3,28.1,41.4]| 5.5| 8.615626828376815|
# | [8.4,27.2,2.1]| 5.7| 8.59859112486577|
# | [8.6,2.1,1.0]| 4.8| 4.027845382391438|
# |[11.7,36.9,45.2]| 7.3| 10.41211129446484|
# +----------------+-----+------------------+
# only showing top 5 rows
You may see some warnings, which are normal.
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label",predictionCol="prediction",metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
# Root Mean Squared Error (RMSE) on test data = 1.87125
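For reference, RMSE is the square root of the mean squared difference between labels and predictions. The plain-Python sketch below applies that formula to the five test rows shown above only, so its result is not the 1.87125 reported for the whole test set.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equally long lists of numbers."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# The five rows printed by predictions.show(5) above.
labels = [1.6, 5.5, 5.7, 4.8, 7.3]
predictions = [10.497359087823323, 8.615626828376815, 8.59859112486577,
               4.027845382391438, 10.41211129446484]
print(rmse(labels, predictions))
```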
This example is adapted from the ML Pipeline API, with minor changes and additional explanations.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
Directly create DataFrame (for illustration)
training = spark.createDataFrame([
(0, "a b c d e spark 6012", 1.0),
(1, "b d", 0.0),
(2, "spark f g h 6012", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
training.printSchema()
# root
# |-- id: long (nullable = true)
# |-- text: string (nullable = true)
# |-- label: double (nullable = true)
training.show()
# +---+--------------------+-----+
# | id| text|label|
# +---+--------------------+-----+
# | 0|a b c d e spark 6012| 1.0|
# | 1| b d| 0.0|
# | 2| spark f g h 6012| 1.0|
# | 3| hadoop mapreduce| 0.0|
# +---+--------------------+-----+
Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
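To see what the first two stages do conceptually, here is a plain-Python sketch of tokenisation followed by the hashing trick. It is a stand-in, not the MLlib implementation: zlib.crc32 is used as an assumed substitute hash, so the bucket indices will not match Spark's, and the function names are made up.

```python
import zlib
from collections import Counter

NUM_FEATURES = 262144  # 2**18, the default HashingTF vector size

def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

def hashing_tf(words, num_features=NUM_FEATURES):
    """Map each word to a bucket index and count occurrences per bucket."""
    # zlib.crc32 is a stand-in hash; the indices will differ from Spark's.
    return Counter(zlib.crc32(w.encode("utf-8")) % num_features for w in words)

words = tokenize("spark 6012 spark")
vector = hashing_tf(words)
print(words)                 # ['spark', '6012', 'spark']
print(sum(vector.values()))  # 3
```

The result is a sparse term-frequency vector: only the buckets that were hit are stored, and a repeated word raises the count in its bucket.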
Model fitting
model = pipeline.fit(training)
Construct test documents (data), which are unlabeled (id, text) tuples
test = spark.createDataFrame([
(4, "spark i j 6012"),
(5, "l m n"),
(6, "spark 6012 spark"),
(7, "apache hadoop")
], ["id", "text"])
test.show()
# +---+------------------+
# | id| text|
# +---+------------------+
# | 4| spark i j 6012|
# | 5| l m n|
# | 6|spark 6012 spark|
# | 7| apache hadoop|
# +---+------------------+
Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
prediction.show()
# +---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
# | id| text| words| features| rawPrediction| probability|prediction|
# +---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
# | 4| spark i j 6012| [spark, i, j, 6012]|(262144,[19036,11...|[-1.0173918675250...|[0.26553574436761...| 1.0|
# | 5| l m n| [l, m, n]|(262144,[1303,526...|[4.76763852580441...|[0.99157121824630...| 0.0|
# | 6|spark 6012 spark|[spark, 6012, spark]|(262144,[111139,1...|[-3.9099070641898...|[0.01964856004327...| 1.0|
# | 7| apache hadoop| [apache, hadoop]|(262144,[68303,19...|[5.80789088699039...|[0.99700523688305...| 0.0|
# +---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
selected = prediction.select("id", "text", "probability", "prediction")
selected.show()
# +---+----------------+--------------------+----------+
# | id| text| probability|prediction|
# +---+----------------+--------------------+----------+
# | 4| spark i j 6012|[0.26553574436761...| 1.0|
# | 5| l m n|[0.99157121824630...| 0.0|
# | 6|spark 6012 spark|[0.01964856004327...| 1.0|
# | 7| apache hadoop|[0.99700523688305...| 0.0|
# +---+----------------+--------------------+----------+
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
# (4, spark i j 6012) --> prob=[0.2655357443676159,0.7344642556323842], prediction=1.000000
# (5, l m n) --> prob=[0.9915712182463081,0.008428781753691883], prediction=0.000000
# (6, spark 6012 spark) --> prob=[0.019648560043272496,0.9803514399567275], prediction=1.000000
# (7, apache hadoop) --> prob=[0.9970052368830581,0.002994763116941912], prediction=0.000000
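The numbers above are consistent with the probability column being the logistic (sigmoid) function of rawPrediction, and with prediction being the label whose probability exceeds 0.5. A minimal plain-Python check for the document with id 4, using the rawPrediction digits as displayed (they are truncated in the table):

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

raw = -1.0173918675  # first rawPrediction entry for id 4, truncated as displayed
p0 = sigmoid(raw)    # probability of label 0
p1 = 1.0 - p0        # probability of label 1
predicted = 1.0 if p1 > 0.5 else 0.0

print(round(p0, 4), round(p1, 4), predicted)  # 0.2655 0.7345 1.0
```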
Note: A reference solution will be provided in Blackboard for this part by the following Thursday.
Starting from this lab, you need to use as many DataFrame functions as possible.
- On HPC, download the description of the NASA access log data to the Data directory via
wget ftp://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html
Load the Aug95 NASA access log data in Lab 1 and create a DataFrame with FIVE columns by specifying the schema according to the description in the downloaded html file. Use this DataFrame for the following questions.
- Find out the number of unique hosts in total (i.e. in August 1995).
- Find out the most frequent visitor, i.e. the host with the largest number of visits.
- Add regularisation to the linear regression for the advertising example and evaluate the prediction performance against the performance without any regularisation. Study at least three different regularisation settings.
- Construct another test dataset for the machine learning pipeline for the document classification example with three test document samples: "pyspark hadoop"; "spark a b c"; "mapreduce spark", and report the prediction probabilities and the predicted labels for these three samples.
Note: NO solutions will be provided for this part.
- Change the number of partitions to a range of values (e.g. 2, 4, 8, 16, ...) and study the time cost for each value (e.g. by plotting the time cost against the number of partitions).
- Change the number of samples to study the variation in precision and time cost.
- Find out the mean and standard deviation of the reply byte size.
- Other questions in Lab 1 Task 6.
- Explore more CSV data of your interest via Google or at Sample CSV data, including insurance, real estate, and sales transactions.
- Explore the UCI Machine Learning Repository to build machine learning pipelines in PySpark for some datasets of your interest.