-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPipeline
33 lines (21 loc) · 1.07 KB
/
Pipeline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""TF-IDF feature-extraction pipeline over book titles (PySpark).

Loads a CSV of book listings, tokenizes and normalizes the TITLE column,
and produces TF-IDF feature vectors in a "features" column, ready for
downstream genre-classification model training.
"""
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer, HashingTF, IDF
from pyspark.ml import Pipeline

# Configuration hoisted out of the calls below so it is easy to change.
DATA_PATH = "dbfs:/FileStore/shared_uploads/book32_listing-1.csv"
NUM_FEATURES = 1000  # hashing-trick bucket count; more buckets => fewer collisions

# Initialize (or reuse, on Databricks) a Spark session.
spark = SparkSession.builder.appName("BookGenreClassification").getOrCreate()

# Load data; inferSchema lets Spark type the columns from the values.
data = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Filter out rows where TITLE is null to prevent errors in later processing.
data = data.filter(data.TITLE.isNotNull())

# Tokenization + normalization: split TITLE on non-word characters and
# lower-case the tokens. r"\W" is the idiomatic raw-string regex form
# (identical value to the escaped "\\W").
tokenizer = RegexTokenizer(inputCol="TITLE", outputCol="words",
                           pattern=r"\W", toLowercase=True)

# Feature extraction: hashed term frequencies, then inverse-document-frequency
# weighting to down-weight terms that appear in most titles.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",
                      numFeatures=NUM_FEATURES)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Chain the stages so fit/transform runs them in order.
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])

# Fit the pipeline (computes the IDF statistics over the corpus) ...
model = pipeline.fit(data)
# ... and transform the data; transformed_data carries the "features" column
# to proceed with model training.
transformed_data = model.transform(data)