- Directory: `03_pinot-kafka-flink`
- Objective: Enhance the data pipeline by introducing Apache Flink SQL for complex event processing before ingesting the data into Apache Pinot through Apache Kafka.
- Setup:
  - Ensure that Docker Compose is running with the necessary services, including Apache Kafka, Apache Flink, and Apache Pinot.
  - Navigate to the `03_pinot-kafka-flink` directory, where the necessary files and scripts are located.
- Tasks:
  - Description: Initialize all necessary services by running the `make` command. Ensure that the validation phase succeeds, indicating that all services are correctly configured and running.
    - Action: `make`
  - Description: Start the Apache Flink SQL client to manage streaming jobs for data processing.
    - Action: `docker-compose run sql-client`
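      Once the client is up, a quick sanity check (both are standard Flink SQL client commands; the `tableau` result mode is optional):

      ```sql
      -- Render query results inline rather than in the paged result view
      SET 'sql-client.execution.result-mode' = 'tableau';
      -- List the tables registered in the current catalog (empty at this point)
      SHOW TABLES;
      ```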
  - Description: Use the Flink SQL client to create tables for movies and ratings, perform data transformations, and run example queries, which you will later replace with your own SQL.
    - Actions:
      - Create the movies table:

        ```sql
        CREATE TABLE Movies (
          movieId INT,
          title STRING,
          releaseYear INT,
          country STRING,
          rating DOUBLE,
          genres ARRAY<STRING>,
          actors ARRAY<STRING>,
          directors ARRAY<STRING>,
          composers ARRAY<STRING>,
          screenwriters ARRAY<STRING>,
          productionCompanies ARRAY<STRING>,
          cinematographer STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'movies',
          'properties.bootstrap.servers' = 'kafka:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json'
        );
        ```
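        To confirm the table registered with the expected schema:

        ```sql
        -- Show the resolved schema of the new table
        DESCRIBE Movies;
        ```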
      - Create the ratings table:

        ```sql
        CREATE TABLE MovieRatings (
          movieId INT,
          rating DOUBLE,
          ratingTimeMillis BIGINT,                             -- Read the epoch milliseconds as BIGINT
          ratingTime AS TO_TIMESTAMP_LTZ(ratingTimeMillis, 3)  -- Convert to TIMESTAMP_LTZ
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'movie_ratings',
          'properties.bootstrap.servers' = 'kafka:9092',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        );
        ```
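        `ratingTime` is a plain computed column here; Flink only treats it as an event-time attribute once a watermark is declared. If you want event-time windows later, a hypothetical variant of the table (the name `MovieRatingsTimed` is made up for illustration) adds a single line:

        ```sql
        -- Hypothetical variant of the ratings table: a watermark makes
        -- ratingTime an event-time attribute usable in window operations.
        CREATE TABLE MovieRatingsTimed (
          movieId INT,
          rating DOUBLE,
          ratingTimeMillis BIGINT,
          ratingTime AS TO_TIMESTAMP_LTZ(ratingTimeMillis, 3),
          -- Tolerate up to 5 seconds of out-of-order rating events
          WATERMARK FOR ratingTime AS ratingTime - INTERVAL '5' SECOND
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'movie_ratings',
          'properties.bootstrap.servers' = 'kafka:9092',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        );
        ```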
      - Execute a few example transformations and analytical queries:

        ```sql
        SELECT * FROM Movies WHERE LOWER(title) LIKE '%lethal weapon%';
        -- or
        SELECT movieId, AVG(rating) AS avgRating FROM MovieRatings GROUP BY movieId;
        -- or
        SELECT movieId, rating, ratingTime FROM MovieRatings LIMIT 10;
        ```
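        Building on the hypothetical `MovieRatingsTimed` table sketched above, a tumbling-window aggregate is a typical next step for this kind of complex event processing; a sketch, not part of the workshop scripts:

        ```sql
        -- Average rating per movie over one-minute event-time windows
        SELECT movieId, window_start, window_end, AVG(rating) AS avgRating
        FROM TABLE(
          TUMBLE(TABLE MovieRatingsTimed, DESCRIPTOR(ratingTime), INTERVAL '1' MINUTES))
        GROUP BY movieId, window_start, window_end;
        ```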
      - Perform a join between the movies and ratings tables and run test queries:

        ```sql
        CREATE TABLE RatedMoviesSink (  -- (1)
          movieId INT,
          title STRING,
          releaseYear INT,
          actors ARRAY<STRING>,
          rating DOUBLE,
          ratingTime TIMESTAMP_LTZ(3),        -- Matches the type produced by TO_TIMESTAMP_LTZ
          PRIMARY KEY (movieId) NOT ENFORCED  -- Declare the PRIMARY KEY constraint
        ) WITH (
          'connector' = 'upsert-kafka',       -- This enables updates and deletes
          'topic' = 'rated_movies',
          'properties.bootstrap.servers' = 'kafka:9092',
          'key.format' = 'json',              -- Key format is JSON, matching the value format
          'value.format' = 'json'             -- Values are serialized in JSON
        );

        INSERT INTO RatedMoviesSink  -- (2)
        SELECT m.movieId, m.title, m.releaseYear, m.actors, r.rating, r.ratingTime
        FROM MovieRatings r
        JOIN Movies m ON r.movieId = m.movieId;
        ```

        1. Defining a Kafka sink.
        2. Writing the result of the join to the Kafka sink.
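        Because `upsert-kafka` tables are also readable as changelog sources, you can sanity-check the join output from the same SQL client (a continuous query; cancel it once rows appear):

        ```sql
        -- Read the upserted join results back from the rated_movies topic
        SELECT movieId, title, rating, ratingTime FROM RatedMoviesSink;
        ```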
  - Description: After processing the data with Apache Flink, go to the Apache Pinot query console to query your rated and processed data.
    - Action: Open your web browser and navigate to the Apache Pinot UI to execute queries: http://localhost:9000/#/query
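    An example of the kind of query to try there, assuming a Pinot table named `rated_movies` was created over the sink topic (the table name is an assumption; check the table list in the UI):

    ```sql
    -- Pinot SQL: top 10 movies by average rating
    SELECT title, AVG(rating) AS avgRating, COUNT(*) AS numRatings
    FROM rated_movies
    GROUP BY title
    ORDER BY avgRating DESC
    LIMIT 10;
    ```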
- To stop and remove all services related to this part of the workshop, execute: `make destroy`