#!/usr/bin/env python
# coding: utf-8

# INSTALLING PYSPARK
get_ipython().system('pip install pyspark')

# Import SparkSession and create a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark session information
spark

# Import Spark functions and types from the library
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# LOAD OUR DATASET
import pandas as pd
df = pd.read_csv('Review.csv')
df
# IMPORTING LIBRARIES
import os
import re
import string
import sys

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')
matplotlib.style.use('ggplot')

import nltk
from nltk import pos_tag, word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.stem.wordnet import WordNetLemmatizer
# Download the NLTK resources used by the stop-word removal, POS tagging, and
# lemmatization functions below (newer NLTK releases may also need the
# 'averaged_perceptron_tagger_eng' variant)
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

from sklearn import metrics
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.metrics import confusion_matrix, roc_curve, auc

import pyspark
from pyspark import SparkConf, SparkContext
# SQLContext lives in pyspark.sql, not the top-level package; HiveContext was
# removed in Spark 3.x, and SQLContext is sufficient for reading the CSV here
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.feature import StringIndexer, VectorIndexer, NGram
from pyspark.ml.classification import LogisticRegression, NaiveBayes, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
#df = spark.createDataFrame(pd_df1)
# CREATING SPARK OBJECT
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlContext = SQLContext(sc)

# PREPARING DATA
df1 = sqlContext.read.format('csv')\
    .options(header='true', inferschema='true')\
    .load(r"Review.csv")
df = df1
df1.count()
# TEXT PRE-PROCESSING
# a. Create UDFs for text processing: convert to lower case, remove
#    punctuation and non-alphanumeric words, remove stop words
# b. POS tagging
# c. Text lemmatization

## Convert to lower case
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def lower(text):
    return text.lower()

lower_udf = udf(lower, StringType())
## Remove non-ASCII characters
def strip_non_ascii(data_str):
    '''Returns the string without non-ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)

# set up the PySpark UDF
strip_non_ascii_udf = udf(strip_non_ascii, StringType())
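
# A quick sanity check on a made-up string (not from the dataset): accented
# characters fall outside the ASCII range and are dropped.
print(strip_non_ascii("café"))  # -> caf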
## Fix abbreviations
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str
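
# A quick sanity check on a made-up review fragment:
print(fix_abbreviation("Ive heard u cant beat it"))  # -> i have heard you can not beat it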
## Remove punctuation, mentions, and non-alphanumeric characters
def remove_features(data_str):
    # compile regexes
    url_re = re.compile(r'https?://(www\.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile(r'(\d+)')
    mention_re = re.compile(r'@(\w+)')
    alpha_num_re = re.compile(r'^[a-z0-9_.]+$')
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove punctuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # keep only words made of a-z, 0-9, '_' or '.'
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word):
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word):
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # collapse leftover whitespace: *.split() discards runs of spaces and
    # " ".join() rebuilds a single-spaced string
    return " ".join(cleaned_str.split())
## Remove stop words
def remove_stops(data_str):
    # expects a string
    stops = set(stopwords.words("english"))
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str
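
# A quick sanity check on a made-up string: common English stop words are
# dropped (requires the 'stopwords' resource downloaded above).
print(remove_stops("this is a great place to stay"))  # -> great place stay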
# Part-of-speech tagging: keep only nouns, adjectives, and verbs
def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS']
    # adjective tags
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verb tags
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags
    # break string into 'words'
    text = data_str.split()
    # tag the text and keep only words with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '
    return cleaned_str
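
# A quick sanity check on a made-up string: the determiner 'the' and the
# adverb 'very' are dropped, leaving the noun, verb, and adjective.
print(tag_and_remove("the room was very clean"))  # -> ' room was clean '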
## Lemmatization
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str
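
# A quick sanity check on a made-up string: verbs are reduced to their base
# form and plural nouns to their singular (requires 'wordnet' downloaded above).
print(lemmatize("rooms were cleaned daily"))  # -> room be clean daily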
# Drop rows containing null values
df1 = df1.dropna()
display(df1)
df1 = df1.dropna(
    subset=['listing_id', 'id', 'date', 'reviewer_id', 'reviewer_name', 'comments']
)
display(df1)
df1.show()
# Register the UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

lower_udf = udf(lower, StringType())
strip_non_ascii_udf = udf(strip_non_ascii, StringType())
fix_abbreviation_udf = udf(fix_abbreviation, StringType())
remove_features_udf = udf(remove_features, StringType())
remove_stops_udf = udf(remove_stops, StringType())
tag_and_remove_udf = udf(tag_and_remove, StringType())
lemmatize_udf = udf(lemmatize, StringType())
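
# The cleaning UDFs are registered but never chained over the DataFrame before
# the sentiment scoring below; a minimal sketch of how they could be applied to
# the 'comments' column (left commented out so the script's behavior is unchanged):
# df_clean = df1.withColumn(
#     'comments_clean',
#     lemmatize_udf(remove_stops_udf(remove_features_udf(
#         fix_abbreviation_udf(strip_non_ascii_udf(col('comments')))))))
# df_clean.select('comments', 'comments_clean').show(5, truncate=False)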
# Sentiment analysis function to predict the sentiment score
# (TextBlob may need installing first, as pyspark was above:
#  get_ipython().system('pip install textblob'))
from pyspark.sql.types import FloatType
from textblob import TextBlob

def sentiment_score(comments):
    return TextBlob(comments).sentiment.polarity

sentiment_score_udf = udf(lambda x: sentiment_score(x), FloatType())
df1 = df1.select('listing_id', 'id', 'date', 'reviewer_id', 'reviewer_name', 'comments',
                 sentiment_score_udf('comments').alias('sentiment_score'))
df1.show()
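
# TextBlob's polarity is a float in [-1.0, 1.0]: below zero leans negative,
# above zero leans positive. A quick check on a made-up comment:
print(sentiment_score("Wonderful host, the room was very clean"))  # positive, > 0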
# Plot sentiment scores for a sample of reviews over time
import matplotlib.pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')
Df2 = df1.toPandas()
Df2[1:50].plot(x='date', y='sentiment_score')
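
# The row-slice plot above is noisy; a daily mean may read better. A minimal
# sketch, assuming the 'date' column parses with pandas (left commented out):
# Df2['date'] = pd.to_datetime(Df2['date'])
# Df2.groupby(Df2['date'].dt.date)['sentiment_score'].mean().plot()
# plt.ylabel('mean sentiment_score')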