sparkpykafkajoin.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, col, unbase64, base64, split, expr
from pyspark.sql.types import StructField, StructType, StringType, BooleanType, ArrayType, DateType, FloatType
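# A typical way to launch this script is with spark-submit and the Kafka connector package on the
# classpath, e.g. (a sketch; the connector artifact/version below is an assumption and must match
# the Spark and Scala build in use):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 sparkpykafkajoin.py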
# create a StructType for the Kafka redis-server topic, which carries every change made to Redis
# (the schema must be declared explicitly; from_json does not infer it from the Kafka value)
kafkaRedisSchema = StructType(
    [
        StructField("key", StringType()),
        StructField("existType", StringType()),
        StructField("Ch", BooleanType()),
        StructField("Incr", BooleanType()),
        StructField("zSetEntries", ArrayType(
            StructType([
                StructField("element", StringType()),
                StructField("Score", StringType())
            ])
        ))
    ]
)
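# from_json only extracts the fields declared in this schema; any other fields in the connector
# payload (for example expiredType/expiredValue) are simply ignored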
# create a StructType for the Customer JSON that comes from Redis (the base64-decoded customer record)
kafkaJSONSchema = StructType(
    [
        StructField("customerName", StringType()),
        StructField("email", StringType()),
        StructField("phone", StringType()),
        StructField("birthDay", StringType())
    ]
)
# create a StructType for the Kafka stedi-events topic, which carries the Customer Risk JSON
kafkaEventschema = StructType(
    [
        StructField("customer", StringType()),
        StructField("score", FloatType()),
        StructField("riskDate", DateType())
    ]
)
# create a spark application object
spark = SparkSession.builder.appName("STEDI").getOrCreate()
# set the spark log level to WARN
spark.sparkContext.setLogLevel('WARN')
# using the spark application object, read a streaming dataframe from the Kafka topic redis-server as the source
# Be sure to specify the option that reads all the events from the topic including those that were published before you started the spark stream
kafkaRedisDF = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "redis-server")\
    .option("startingOffsets", "earliest")\
    .load()
# cast the value column in the streaming dataframe as a STRING
kafkaRedisDF = kafkaRedisDF.selectExpr("CAST(value AS string) value")
# parse the single column "value", which contains a json object, like this:
# +------------+
# | value |
# +------------+
# |{"key":"Q3..|
# +------------+
#
# with this JSON format: {"key":"Q3VzdG9tZXI=",
# "existType":"NONE",
# "Ch":false,
# "Incr":false,
# "zSetEntries":[{
# "element":"eyJjdXN0b21lck5hbWUiOiJTYW0gVGVzdCIsImVtYWlsIjoic2FtLnRlc3RAdGVzdC5jb20iLCJwaG9uZSI6IjgwMTU1NTEyMTIiLCJiaXJ0aERheSI6IjIwMDEtMDEtMDMifQ==",
# "Score":0.0
# }],
# "zsetEntries":[{
# "element":"eyJjdXN0b21lck5hbWUiOiJTYW0gVGVzdCIsImVtYWlsIjoic2FtLnRlc3RAdGVzdC5jb20iLCJwaG9uZSI6IjgwMTU1NTEyMTIiLCJiaXJ0aERheSI6IjIwMDEtMDEtMDMifQ==",
# "score":0.0
# }]
# }
#
# (Note: The Redis Source for Kafka emits redundant zSetEntries and zsetEntries fields; only one should be parsed)
#
# and create separated fields like this:
# +------------+-----+-----------+------------+---------+-----+-----+-----------------+
# | key|value|expiredType|expiredValue|existType| ch| incr| zSetEntries|
# +------------+-----+-----------+------------+---------+-----+-----+-----------------+
# |U29ydGVkU2V0| null| null| null| NONE|false|false|[[dGVzdDI=, 0.0]]|
# +------------+-----+-----------+------------+---------+-----+-----+-----------------+
#
# storing them in a temporary view called RedisSortedSet
kafkaRedisDF.withColumn("value", from_json("value", kafkaRedisSchema))\
    .select(col('value.existType'), col('value.Ch'),\
            col('value.Incr'), col('value.zSetEntries'))\
    .createOrReplaceTempView("RedisSortedSet")
# execute a sql statement against the temporary view; it takes the element field from the 0th entry in the array of structs and creates a column called encodedCustomer
# the reason for using a view is that the SQL syntax makes it easy to select the nth element of an array in a single column
sql_statement = "SELECT zSetEntries[0].element AS encodedCustomer FROM RedisSortedSet"
zSetEntriesEncodedStreamingDF = spark.sql(sql_statement)
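# For reference, the same projection could be written with the dataframe API instead of SQL
# (a sketch, assuming a handle to the parsed dataframe had been kept rather than only the view):
#   zSetEntriesEncodedStreamingDF = parsedRedisDF.selectExpr("zSetEntries[0].element AS encodedCustomer")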
# take the encodedCustomer column which is base64 encoded at first like this:
# +--------------------+
# | customer|
# +--------------------+
# |[7B 22 73 74 61 7...|
# +--------------------+
# and convert it to clear json like this:
# +--------------------+
# | customer|
# +--------------------+
# |{"customerName":"...|
# +--------------------+
#
# with this JSON format: {"customerName":"Sam Test","email":"sam.test@test.com","phone":"8015551212","birthDay":"2001-01-03"}
zSetDecodedEntriesStreamingDF = zSetEntriesEncodedStreamingDF.withColumn("customer", unbase64(zSetEntriesEncodedStreamingDF.encodedCustomer).cast("string"))
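# unbase64 produces a binary column, so the cast to string recovers the original customer JSON text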
# parse the JSON in the Customer record and store in a temporary view called CustomerRecords
# the decoded customer record matches the kafkaJSONSchema declared above
zSetDecodedEntriesStreamingDF.withColumn("customer", from_json("customer", kafkaJSONSchema))\
    .select(col('customer.*'))\
    .createOrReplaceTempView("CustomerRecords")
# JSON parsing will set non-existent fields to null, so let's select just the fields we want, where they are not null as a new dataframe called emailAndBirthDayStreamingDF
sql_statement = """
SELECT *
FROM CustomerRecords
WHERE email IS NOT NULL
AND birthDay IS NOT NULL
"""
emailAndBirthDayStreamingDF = spark.sql(sql_statement)
# Split the birth year as a separate field from the birthday
emailAndBirthDayStreamingDF = emailAndBirthDayStreamingDF.withColumn('birthYear', split(emailAndBirthDayStreamingDF.birthDay,"-").getItem(0))
# Select only the birth year and email fields as a new streaming data frame called emailAndBirthYearStreamingDF
emailAndBirthYearStreamingDF = emailAndBirthDayStreamingDF.select(col('email'), col('birthYear'))
# using the spark application object, read a streaming dataframe from the Kafka topic stedi-events as the source
# Be sure to specify the option that reads all the events from the topic including those that were published before you started the spark stream
kafkaEventsDF = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "stedi-events")\
    .option("startingOffsets", "earliest")\
    .load()
# cast the value column in the streaming dataframe as a STRING
kafkaEventsDF = kafkaEventsDF.selectExpr("CAST(value AS string) value")
# parse the JSON from the single column "value" with a json object in it, like this:
# +------------+
# | value |
# +------------+
# |{"custom"...|
# +------------+
#
# and create separated fields like this:
# +------------+-----+-----------+
# | customer|score| riskDate |
# +------------+-----+-----------+
# |"sam@tes"...| -1.4| 2020-09...|
# +------------+-----+-----------+
#
# storing them in a temporary view called CustomerRisk
kafkaEventsDF.withColumn("value", from_json("value", kafkaEventschema))\
    .select(col('value.customer'), col('value.score'), col('value.riskDate'))\
    .createOrReplaceTempView("CustomerRisk")
# execute a sql statement against a temporary view, selecting the customer and the score from the temporary view, creating a dataframe called customerRiskStreamingDF
sql_statement = "SELECT customer, score FROM CustomerRisk"
customerRiskStreamingDF = spark.sql(sql_statement)
# join the streaming dataframes on the email address to get the risk score and the birth year in the same dataframe
stediScoreStreamingDF = customerRiskStreamingDF.join(emailAndBirthYearStreamingDF, expr("customer = email"))
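# this is an inner stream-stream join, so only risk events whose customer matches an email seen on
# the redis-server stream are emitted downstream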
# sink the joined dataframes to a new kafka topic to send the data to the STEDI graph application
# +--------------------+-----+--------------------+---------+
# | customer|score| email|birthYear|
# +--------------------+-----+--------------------+---------+
# |Santosh.Phillips@...| -0.5|Santosh.Phillips@...| 1960|
# |Sean.Howard@test.com| -3.0|Sean.Howard@test.com| 1958|
# |Suresh.Clark@test...| -5.0|Suresh.Clark@test...| 1956|
# | Lyn.Davis@test.com| -4.0| Lyn.Davis@test.com| 1955|
# |Sarah.Lincoln@tes...| -2.0|Sarah.Lincoln@tes...| 1959|
# |Sarah.Clark@test.com| -4.0|Sarah.Clark@test.com| 1957|
# +--------------------+-----+--------------------+---------+
#
# In this JSON Format {"customer":"Santosh.Fibonnaci@test.com","score":"28.5","email":"Santosh.Fibonnaci@test.com","birthYear":"1963"}
query = stediScoreStreamingDF.selectExpr("to_json(struct(*)) AS value").writeStream \
    .outputMode('append') \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("failOnDataLoss", "false") \
    .option("checkpointLocation", "checkpoint") \
    .option("topic", "your_topic_name") \
    .start()
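# the checkpoint directory lets the query resume from its last committed offsets on restart, and
# failOnDataLoss=false keeps the stream running even if some Kafka offsets are no longer available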
query_console = stediScoreStreamingDF.selectExpr("to_json(struct(*)) AS value").writeStream \
    .outputMode('append') \
    .format('console') \
    .option('truncate', False) \
    .start()
query.awaitTermination()
query_console.awaitTermination()
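# One way to spot-check the sink output is with the Kafka console consumer (a sketch; the topic
# name is the placeholder used in the sink above):
#   kafka-console-consumer --bootstrap-server localhost:9092 --topic your_topic_name --from-beginning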