# sparkpyeventskafkastreamtoconsole.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, DateType, FloatType
# using the spark application object, read a streaming dataframe from the Kafka topic stedi-events as the source
# Be sure to specify the option that reads all the events from the topic including those that were published before you started the spark stream
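# create the Spark session for this job and reduce log verbosity to warnings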
spark = SparkSession.builder.appName("kafkaEvents").getOrCreate()
spark.sparkContext.setLogLevel('WARN')
kafkaEventsDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "stedi-events") \
    .option("startingOffsets", "earliest") \
    .load()
# cast the value column in the streaming dataframe as a STRING
kafkaEventsDF = kafkaEventsDF.selectExpr("CAST(value AS string) value")
# parse the JSON from the single column "value" with a json object in it, like this:
# +------------+
# | value |
# +------------+
# |{"custom"...|
# +------------+
#
# and create separated fields like this:
# +------------+-----+-----------+
# | customer|score| riskDate |
# +------------+-----+-----------+
# |"sam@tes"...| -1.4| 2020-09...|
# +------------+-----+-----------+
#
# storing them in a temporary view called CustomerRisk
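# schema describing the fields inside the JSON value of each stedi-events message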
kafkaEventschema = StructType(
    [
        StructField("customer", StringType()),
        StructField("score", FloatType()),
        StructField("riskDate", DateType())
    ]
)
kafkaEventsDF.withColumn("value", from_json("value", kafkaEventschema)) \
    .select(col("value.customer"), col("value.score"), col("value.riskDate")) \
    .createOrReplaceTempView("CustomerRisk")
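# the temporary view exposes the streaming dataframe to the Spark SQL query below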
# execute a SQL statement against the temporary view, selecting the customer and the score, and store the result in a dataframe called customerRiskStreamingDF
customerRiskStreamingDF = spark.sql("SELECT customer, score FROM CustomerRisk")
# sink the customerRiskStreamingDF dataframe to the console in append mode
#
# It should output like this:
#
# +--------------------+-----+
# |customer            |score|
# +--------------------+-----+
# |Spencer.Davis@tes...|  8.0|
# +--------------------+-----+
customerRiskStreamingDF.writeStream.outputMode("append").format("console").start().awaitTermination()
# Run this Python script from the terminal with:
# /home/workspace/submit-event-kafka-streaming.sh
# Verify that the output data looks correct
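# For reference, a submit script for this kind of job typically wraps spark-submit
# with the Kafka integration package. The package version and paths below are
# assumptions and may differ in your workspace, e.g.:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
#       /home/workspace/sparkpyeventskafkastreamtoconsole.py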