added spark_processing and sending data to cassandra
ansharfz committed Jun 16, 2024
1 parent 590f681 commit 13fae3d
Showing 7 changed files with 45 additions and 48 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -1,5 +1,8 @@
 .gitignore
 random-name-api-venv/
-dags/__pycache__
 logs/
 plugins/
+config/
+dags/__pycache__/kafka_stream.cpython-38.pyc
+spark/
+cassandra/
10 changes: 6 additions & 4 deletions Dockerfile
@@ -1,12 +1,14 @@
 FROM apache/airflow:2.9.0-python3.11
+COPY requirements.txt /
 
 USER root
-RUN sudo apt-get update && sudo apt-get install -y gcc libpq-dev
+RUN sudo apt-get update && sudo apt-get install -y gcc libpq-dev \
+    openjdk-17-jre-headless
 
 USER airflow
-ADD requirements.txt .
-RUN pip install -r requirements.txt
 
+ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+RUN pip3 install "apache-airflow==2.9.0" apache-airflow-providers-apache-spark==4.7.1
+RUN pip3 install --no-cache-dir -r /requirements.txt
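Note on the Dockerfile change: the headless JRE and the JAVA_HOME export are what let Spark code run from the Airflow image, since both pyspark and the Spark provider's spark-submit need a JVM at runtime. A minimal smoke test, illustrative only and not part of this commit, that can be run inside the built image:

# smoke_test.py -- illustrative only, not part of this commit.
# Run inside the built Airflow image to confirm the JVM/PySpark wiring.
import os
import subprocess

from pyspark.sql import SparkSession

# Should print the path exported by the Dockerfile's ENV instruction.
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))

# Fails with a non-zero exit code if no usable JVM is on PATH.
subprocess.run(["java", "-version"], check=True)

# Starting a local SparkSession exercises the JVM end to end.
spark = SparkSession.builder.master("local[1]").appName("smoke").getOrCreate()
print("Spark", spark.version)  # expect 3.4.2, matching requirements.txt
spark.stop()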
7 changes: 0 additions & 7 deletions dags/dags.py
@@ -1,7 +1,5 @@
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
-from airflow.providers.apache.spark.operators \
-    .spark_submit import SparkSubmitOperator
 
 from kafka_stream import stream_data
 from datetime import datetime
@@ -22,9 +20,4 @@
     python_callable=stream_data,
     dag=dag)
 
-processing_task = SparkSubmitOperator(
-    task_id='process_data',
-    application='/opt/airflow/dags/spark_processing.py',
-    dag=dag)
-
 streaming_task
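Note on the DAG change: with the SparkSubmitOperator task removed, the DAG only produces Kafka messages, and spark_processing.py has to be launched some other way. A sketch of a manual submission is below; the master URL spark://spark:7077 is an assumption (the compose diff in this commit does not show port 7077), and both packages are pinned to the same Scala build, since the configuration removed in spark_processing.py mixed _2.12 and _2.13 artifacts, which fails at runtime:

# submit_job.py -- hypothetical helper, not part of this commit.
import subprocess

subprocess.run(
    [
        "spark-submit",
        # Assumed master URL; adjust to the actual spark service setup.
        "--master", "spark://spark:7077",
        # Both connectors on the same Scala build.
        "--packages",
        "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0,"
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2",
        "/opt/airflow/dags/spark_processing.py",
    ],
    check=True,
)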
20 changes: 7 additions & 13 deletions dags/kafka_stream.py
@@ -1,30 +1,21 @@
-from kafka import KafkaProducer, KafkaConsumer
+from kafka import KafkaProducer
 
 import json
 import requests
 
 
 def create_kafka_producer():
 
-    producer = KafkaProducer(bootstrap_servers=['kafka:29092'],
+    producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                              value_serializer=lambda x: json.dumps(x)
                              .encode('utf-8'))
     return producer
 
 
-def create_kafka_consumer(topic):
-    consumer = KafkaConsumer(topic, bootstrap_servers=['kafka:29092'],
-                             auto_offset_reset='earliest',
-                             value_deserializer=lambda x: json
-                             .loads(x.decode('utf-8')))
-    return consumer
-
-
 def get_data():
 
     res = requests.get('https://randomuser.me/api')
     res = res.json()
-    res = res['res'][0]
+    res = res['results'][0]
 
     return res
 
@@ -39,7 +30,7 @@ def format_data(res):
                                  {res['location']['street']['name']}"
     data["city"] = res['location']['city']
     data["country"] = res['location']['country']
-    data["postcode"] = int(res['location']['postcode'])
+    data["postcode"] = res['location']['postcode']
     data["latitude"] = float(res['location']['coordinates']['latitude'])
     data["longitude"] = float(res['location']['coordinates']['longitude'])
     data["email"] = res["email"]
@@ -55,3 +46,6 @@ def stream_data():
     data = format_data(res)
     producer.send(topic, value=data)
     producer.flush()
+
+
+stream_data()
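Two notes on kafka_stream.py: the broker address now matches the in-network listener defined in docker-compose.yaml, and the module-level stream_data() call added at the end runs on every import, so merely parsing dags.py (which imports this module) triggers a round of producing. Since create_kafka_consumer was deleted, a throwaway consumer like the sketch below can stand in for it when checking that messages actually arrive; the topic name user_data is an assumption taken from spark_processing.py, because the topic variable used by stream_data() sits outside this hunk:

# verify_topic.py -- illustrative only, not part of this commit.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user_data",                       # assumed topic, per spark_processing.py
    bootstrap_servers=["kafka:9092"],  # in-network listener from docker-compose.yaml
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,         # stop iterating after 10s of silence
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)

for message in consumer:
    print(message.value)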
32 changes: 16 additions & 16 deletions dags/spark_processing.py
@@ -13,15 +12,13 @@
 def create_spark_session():
     try:
         spark = SparkSession \
-            .builder \
-            .appName('SparkStructuredStreaming') \
-            .config("spark.jars.packages",
-                    "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0, \
-                    org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.2") \
-            .config("spark.cassandra.connection.auth.username", "cassandra") \
-            .config("spark.cassandra.connection.auth.password", "cassandra") \
-            .config("spark.cassandra.connection.port", 9042) \
-            .getOrCreate()
+            .builder \
+            .appName('SparkStructuredStreaming') \
+            .config('spark.cassandra.connection.host', 'cassandra') \
+            .config("spark.cassandra.connection.port", 9042) \
+            .config("spark.cassandra.auth.username", "cassandra") \
+            .config("spark.cassandra.auth.password", "cassandra") \
+            .getOrCreate()
         spark.sparkContext.setLogLevel("ERROR")
         logging.info('Spark session created successfully')
     except Exception:
@@ -36,7 +34,7 @@ def create_initial_dataframe(spark_session):
     df = spark_session \
         .readStream \
         .format("kafka") \
-        .option("kafka.bootstrap.servers", "kafka:29092") \
+        .option("kafka.bootstrap.servers", "kafka:9092") \
         .option("subscribe", "user_data") \
         .option("delimeter", ",") \
         .option("startingOffsets", "earliest") \
@@ -50,9 +48,6 @@
 
 
 def create_final_dataframe(df, spark_session):
-    """
-    Modifies the initial dataframe, and creates the final dataframe.
-    """
     schema = StructType([
         StructField("full_name", StringType(), False),
         StructField("gender", StringType(), False),
@@ -68,15 +63,20 @@ def create_final_dataframe(df, spark_session):
     df = df.selectExpr("CAST(value AS STRING)") \
         .select(from_json(col("value"), schema).alias("data")) \
         .select("data.*")
     print(df)
     return df
 
 
 def start_streaming(df):
     logging.info("Streaming started")
     my_query = (df.writeStream
-                .format("com.apache.spark.sql.cassandra")
+                .format("org.apache.spark.sql.cassandra")
                 .outputMode("append")
-                .options(table="random_names", keyspace="spark_streaming"))
+                .option("checkpointLocation", "/opt/spark/checkpoint")
+                .options(table="random_names", keyspace="spark_streaming")
+                .option("kafka.request.timeout.ms", "30000")
+                .option("kafka.retry.backoff.ms", "500")
+                .option("kafka.session.timeout.ms", "60000")
+                .start())
 
     return my_query.awaitTermination()
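Note on the Cassandra sink: the connector does not create its target, so the spark_streaming keyspace and random_names table must exist before start_streaming() runs, and the newly added checkpointLocation is what lets the query track progress and resume after a restart. A hypothetical setup script follows; it assumes the cassandra-driver package is available, and the column list is inferred from format_data() and the partially visible StructType, so it should be adjusted to the real schema:

# create_cassandra_objects.py -- hypothetical setup, not part of this commit.
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

auth = PlainTextAuthProvider(username="cassandra", password="cassandra")
cluster = Cluster(["cassandra"], port=9042, auth_provider=auth)
session = cluster.connect()

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS spark_streaming
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")

# Column list inferred from format_data() and the partial schema above;
# adjust to match the actual StructType.
session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streaming.random_names (
        full_name text PRIMARY KEY,
        gender text,
        address text,
        city text,
        country text,
        postcode text,
        latitude double,
        longitude double,
        email text
    )
""")

cluster.shutdown()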
17 changes: 11 additions & 6 deletions docker-compose.yaml
@@ -250,6 +250,8 @@ services:
       - airflow-network
     ports:
       - "2181:2181"
+    volumes:
+      - zookeeper-data:/data
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
       ZOOKEEPER_TICK_TIME: 2000
@@ -268,18 +270,20 @@
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092, PLAINTEXT_HOST://0.0.0.0:29092
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
 
   spark:
     image: apache/spark:3.4.2-python3
-    container_name: spark_master
+    container_name: spark
     ports:
-      - 8085:8080
+      - 8085:8085
     working_dir: /opt/spark
-    command: /opt/spark/sbin/start-master.sh
+    command: >
+      /opt/spark/sbin/start-master.sh
     environment:
-      SPARK_UI_PORT: 8080
+      SPARK_UI_PORT: 8085
       SPARK_MODE: master
       SPARK_RPC_AUTHENTICATION_ENABLED: no
       SPARK_RPC_ENCRYPTION_ENABLED: no
@@ -316,7 +320,8 @@ volumes:
     name: cassandra-data
   spark-data:
     name: spark-data
-
+  zookeeper-data:
+    name: zookeeper-data
 networks:
   airflow-network:
     name: airflow-network
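Note on the listener swap: containers on the compose network now reach the broker at kafka:9092, while host-side clients use localhost:29092; this is the counterpart of the kafka:29092 to kafka:9092 edits in kafka_stream.py and spark_processing.py. The sketch below shows which bootstrap address matches which advertised listener; it assumes port 29092 is published to the host, since the kafka service's ports mapping sits outside this hunk:

# bootstrap_demo.py -- illustrative only, not part of this commit.
from kafka import KafkaProducer

IN_COMPOSE_NETWORK = False  # True when running inside another compose service

bootstrap = ["kafka:9092"] if IN_COMPOSE_NETWORK else ["localhost:29092"]

producer = KafkaProducer(bootstrap_servers=bootstrap)
producer.send("user_data", b'{"ping": "ok"}')  # raw bytes; no serializer configured
producer.flush()
producer.close()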
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,3 +1,3 @@
 psycopg2
 kafka-python
-apache-airflow==2.9.0
+pyspark==3.4.2
