diff --git a/.gitignore b/.gitignore
index f21f848..e6928ca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,8 @@
 .gitignore
 random-name-api-venv/
+dags/__pycache__
 logs/
+plugins/
 config/
-dags/__pycache__/kafka_stream.cpython-38.pyc
+spark/
+cassandra/
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 1083ee2..81ac855 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,12 +1,14 @@
 FROM apache/airflow:2.9.0-python3.11
+COPY requirements.txt /
 USER root
-RUN sudo apt-get update && sudo apt-get install -y gcc libpq-dev
+RUN sudo apt-get update && sudo apt-get install -y gcc libpq-dev \
+openjdk-17-jre-headless
 USER airflow
-ADD requirements.txt .
-RUN pip install -r requirements.txt
-
+ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+RUN pip3 install "apache-airflow==2.9.0" apache-airflow-providers-apache-spark==4.7.1
+RUN pip3 install --no-cache-dir -r /requirements.txt
diff --git a/dags/dags.py b/dags/dags.py
index 5948d07..2aa651e 100644
--- a/dags/dags.py
+++ b/dags/dags.py
@@ -1,7 +1,5 @@
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
-from airflow.providers.apache.spark.operators \
-    .spark_submit import SparkSubmitOperator
 from kafka_stream import stream_data
 from datetime import datetime
 
@@ -22,9 +20,4 @@
     python_callable=stream_data,
     dag=dag)
 
-processing_task = SparkSubmitOperator(
-    task_id='process_data',
-    application='/opt/airflow/dags/spark_processing.py',
-    dag=dag)
-
 streaming_task
diff --git a/dags/kafka_stream.py b/dags/kafka_stream.py
index 530f01e..58c7302 100644
--- a/dags/kafka_stream.py
+++ b/dags/kafka_stream.py
@@ -1,30 +1,21 @@
-from kafka import KafkaProducer, KafkaConsumer
+from kafka import KafkaProducer
 import json
 import requests
 
 
 def create_kafka_producer():
-
-    producer = KafkaProducer(bootstrap_servers=['kafka:29092'],
+    producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                              value_serializer=lambda x: json.dumps(x)
                              .encode('utf-8'))
     return producer
 
 
-def create_kafka_consumer(topic):
-    consumer = KafkaConsumer(topic, bootstrap_servers=['kafka:29092'],
-                             auto_offset_reset='earliest',
-                             value_deserializer=lambda x: json
-                             .loads(x.decode('utf-8')))
-    return consumer
-
-
 def get_data():
     res = requests.get('https://randomuser.me/api')
     res = res.json()
-    res = res['res'][0]
+    res = res['results'][0]
 
     return res
 
 
 def format_data(res):
@@ -39,7 +30,7 @@ def format_data(res):
             {res['location']['street']['name']}"
     data["city"] = res['location']['city']
     data["country"] = res['location']['country']
-    data["postcode"] = int(res['location']['postcode'])
+    data["postcode"] = res['location']['postcode']
     data["latitude"] = float(res['location']['coordinates']['latitude'])
     data["longitude"] = float(res['location']['coordinates']['longitude'])
     data["email"] = res["email"]
@@ -55,3 +46,6 @@ def stream_data():
     data = format_data(res)
     producer.send(topic, value=data)
     producer.flush()
+
+
+stream_data()
diff --git a/dags/spark_processing.py b/dags/spark_processing.py
index b3cc0ef..5ed7d83 100644
--- a/dags/spark_processing.py
+++ b/dags/spark_processing.py
@@ -13,15 +13,13 @@
 def create_spark_session():
     try:
         spark = SparkSession \
-            .builder \
-            .appName('SparkStructuredStreaming') \
-            .config("spark.jars.packages",
-                    "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0, \
-                    org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.2") \
-            .config("spark.cassandra.connection.auth.username", "cassandra") \
-            .config("spark.cassandra.connection.auth.password", "cassandra") \
-            .config("spark.cassandra.connection.port", 9042) \
-            .getOrCreate()
+            .builder \
+            .appName('SparkStructuredStreaming') \
+            .config('spark.cassandra.connection.host', 'cassandra') \
+            .config("spark.cassandra.connection.port", 9042) \
+            .config("spark.cassandra.auth.username", "cassandra") \
+            .config("spark.cassandra.auth.password", "cassandra") \
+            .getOrCreate()
         spark.sparkContext.setLogLevel("ERROR")
         logging.info('Spark session created successfully')
     except Exception:
@@ -36,7 +34,7 @@
     df = spark_session \
         .readStream \
         .format("kafka") \
-        .option("kafka.bootstrap.servers", "kafka:29092") \
+        .option("kafka.bootstrap.servers", "kafka:9092") \
         .option("subscribe", "user_data") \
         .option("delimeter", ",") \
         .option("startingOffsets", "earliest") \
@@ -50,9 +48,6 @@
 
 
 def create_final_dataframe(df, spark_session):
-    """
-    Modifies the initial dataframe, and creates the final dataframe.
-    """
     schema = StructType([
         StructField("full_name", StringType(), False),
         StructField("gender", StringType(), False),
@@ -68,15 +63,20 @@
     df = df.selectExpr("CAST(value AS STRING)") \
         .select(from_json(col("value"), schema).alias("data")) \
         .select("data.*")
-    print(df)
+
     return df
 
 
 def start_streaming(df):
     logging.info("Streaming started")
     my_query = (df.writeStream
-                .format("com.apache.spark.sql.cassandra")
+                .format("org.apache.spark.sql.cassandra")
                 .outputMode("append")
-                .options(table="random_names", keyspace="spark_streaming"))
+                .option("checkpointLocation", "/opt/spark/checkpoint")
+                .options(table="random_names", keyspace="spark_streaming")
+                .option("kafka.request.timeout.ms", "30000")
+                .option("kafka.retry.backoff.ms", "500")
+                .option("kafka.session.timeout.ms", "60000")
+                .start())
 
     return my_query.awaitTermination()
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 23a32ca..42be34a 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -250,6 +250,8 @@ services:
       - airflow-network
     ports:
       - "2181:2181"
+    volumes:
+      - zookeeper-data:/data
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
       ZOOKEEPER_TICK_TIME: 2000
@@ -268,18 +270,20 @@
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092, PLAINTEXT_HOST://0.0.0.0:29092
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
 
   spark:
     image: apache/spark:3.4.2-python3
-    container_name: spark_master
+    container_name: spark
     ports:
-      - 8085:8080
+      - 8085:8085
     working_dir: /opt/spark
-    command: /opt/spark/sbin/start-master.sh
+    command: >
+      /opt/spark/sbin/start-master.sh
     environment:
-      SPARK_UI_PORT: 8080
+      SPARK_UI_PORT: 8085
       SPARK_MODE: master
       SPARK_RPC_AUTHENTICATION_ENABLED: no
       SPARK_RPC_ENCRYPTION_ENABLED: no
@@ -316,7 +320,8 @@ volumes:
     name: cassandra-data
   spark-data:
     name: spark-data
-
+  zookeeper-data:
+    name: zookeeper-data
 networks:
   airflow-network:
     name: airflow-network
diff --git a/requirements.txt b/requirements.txt
index 8309967..5cc0273 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
 psycopg2
 kafka-python
-apache-airflow==2.9.0
+pyspark==3.4.2