Objective: Enhance the data pipeline by introducing Apache Flink SQL for complex event processing before ingesting the data into Apache Pinot through Apache Kafka.
Ensure that Docker Compose is running with the necessary services, including Apache Kafka, Apache Flink, and Apache Pinot.
Navigate to the directory where the necessary files and scripts are located.
Description: Initialize all necessary services by running the setup command. Ensure that the validation phase succeeds, which indicates that all services are correctly configured and running.
Description: Start the Apache Flink SQL client to manage streaming jobs for data processing.
docker-compose run sql-client
Description: Use the Flink SQL client to create tables for movies and ratings, perform data transformations, and run example queries, which you will later replace with your own SQL.
Create the movies table:
CREATE TABLE Movies (
  movieId INT,
  title STRING,
  releaseYear INT,
  country STRING,
  rating DOUBLE,
  genres ARRAY<STRING>,
  actors ARRAY<STRING>,
  directors ARRAY<STRING>,
  composers ARRAY<STRING>,
  screenwriters ARRAY<STRING>,
  productionCompanies ARRAY<STRING>,
  cinematographer STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'movies',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
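With 'format' = 'json', Flink maps top-level JSON fields to table columns by name, so each message on the movies topic should be a JSON object whose keys match the column names above; by default a missing field is read as NULL. The sketch below builds such a message in plain Python. The helper name `to_movie_message` and the sample values are hypothetical, and rejecting unknown keys is a stricter check than Flink applies, added only to catch typos such as `movieID`.

```python
import json

# Column names of the Movies table, in declaration order.
MOVIES_COLUMNS = [
    "movieId", "title", "releaseYear", "country", "rating", "genres",
    "actors", "directors", "composers", "screenwriters",
    "productionCompanies", "cinematographer",
]


def to_movie_message(record: dict) -> bytes:
    """Serialize a movie dict as a JSON message for the 'movies' topic.

    Hypothetical helper: raises ValueError for keys the table does not declare.
    Absent keys are allowed; Flink reads them as NULL by default.
    """
    unknown = set(record) - set(MOVIES_COLUMNS)
    if unknown:
        raise ValueError(f"unknown fields: {sorted(unknown)}")
    return json.dumps(record).encode("utf-8")


# Illustrative sample record (partial: the remaining columns become NULL).
sample = {
    "movieId": 1,
    "title": "Lethal Weapon",
    "releaseYear": 1987,
    "country": "USA",
    "genres": ["Action", "Crime"],
    "actors": ["Mel Gibson", "Danny Glover"],
    "directors": ["Richard Donner"],
}
message = to_movie_message(sample)
```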
Create the ratings table:
CREATE TABLE MovieRatings (
  movieId INT,
  rating DOUBLE,
  ratingTimeMillis BIGINT, -- Read the epoch milliseconds as BIGINT
  ratingTime AS TO_TIMESTAMP_LTZ(ratingTimeMillis, 3) -- Convert to TIMESTAMP_LTZ
) WITH (
  'connector' = 'kafka',
  'topic' = 'movie_ratings',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
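The computed column ratingTime uses TO_TIMESTAMP_LTZ(ratingTimeMillis, 3), which interprets the BIGINT as milliseconds since the Unix epoch and yields a TIMESTAMP_LTZ(3): an absolute instant with millisecond precision that the SQL client displays in the session time zone. The following sketch reproduces that conversion in Python, rendering the instant in UTC. The function names and the sample message are hypothetical; only the field names come from the table definition.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def rating_time(rating_time_millis: int) -> datetime:
    # Same idea as TO_TIMESTAMP_LTZ(ratingTimeMillis, 3): epoch millis -> instant.
    # Integer timedelta arithmetic avoids float rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=rating_time_millis)


def parse_rating(message: bytes) -> dict:
    """Decode a hypothetical movie_ratings message and add the computed column."""
    row = json.loads(message)
    row["ratingTime"] = rating_time(row["ratingTimeMillis"])
    return row


row = parse_rating(b'{"movieId": 1, "rating": 8.5, "ratingTimeMillis": 1700000000123}')
```

Here 1700000000123 ms corresponds to 2023-11-14 22:13:20.123 UTC; a session in another time zone would display the same instant with a different wall-clock time.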
Execute a few example transformations and analytical queries:
SELECT * FROM Movies WHERE LOWER(title) LIKE '%lethal weapon%';
-- or
SELECT movieId, AVG(rating) AS avgRating FROM MovieRatings GROUP BY movieId;
-- or
SELECT movieId, rating, ratingTime FROM MovieRatings LIMIT 10;
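Unlike the other two queries, the GROUP BY query produces an updating result: each new rating for a movie retracts the previous average and emits a new one. In retract mode (what the SQL client shows in its changelog result mode), Flink labels rows +I (insert), -U (update before) and +U (update after). The pure-Python sketch below is a simplified model of that behavior, not Flink's implementation; the function name and input shape are hypothetical.

```python
from collections import defaultdict


def avg_rating_changelog(ratings):
    """Simplified changelog for:
    SELECT movieId, AVG(rating) AS avgRating FROM MovieRatings GROUP BY movieId

    ratings: iterable of (movieId, rating) pairs in arrival order.
    Yields (row_kind, movieId, avgRating) tuples.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for movie_id, rating in ratings:
        if counts[movie_id]:
            # Retract the previously emitted average for this key.
            yield ("-U", movie_id, sums[movie_id] / counts[movie_id])
        sums[movie_id] += rating
        counts[movie_id] += 1
        kind = "+U" if counts[movie_id] > 1 else "+I"
        yield (kind, movie_id, sums[movie_id] / counts[movie_id])


changes = list(avg_rating_changelog([(1, 8.0), (2, 6.0), (1, 9.0)]))
```

The third rating produces two rows for movie 1: a retraction of 8.0 followed by the new average 8.5.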
Define a Kafka sink, then join the ratings and movies tables and write the result into it:
CREATE TABLE RatedMoviesSink -- (1)
(
  movieId INT,
  title STRING,
  releaseYear INT,
  actors ARRAY<STRING>,
  rating DOUBLE,
  ratingTime TIMESTAMP(3),
  PRIMARY KEY (movieId) NOT ENFORCED -- Declare the PRIMARY KEY constraint
) WITH (
  'connector' = 'upsert-kafka', -- This enables updates and deletes
  'topic' = 'rated_movies',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json', -- Key format is JSON, matching the value format
  'value.format' = 'json' -- Values are serialized in JSON
);

INSERT INTO RatedMoviesSink -- (2)
SELECT m.movieId, m.title, m.releaseYear, m.actors, r.rating, r.ratingTime
FROM MovieRatings r
JOIN Movies m ON r.movieId = m.movieId;
(1) Defining a Kafka sink
(2) Writing the result of the join to the Kafka sink
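The upsert-kafka connector writes each joined row as a Kafka record keyed by the primary key, movieId; a reader that materializes the topic treats a later record with the same key as replacing the earlier one (and a record with a null value as a delete). Because the key is movieId alone, only the most recent rating per movie survives in that view. The sketch below models this in plain Python. It is a simplification under a stated assumption: all movies are loaded before any rating arrives, whereas Flink's regular join keeps both sides in state and also emits matches when a movie shows up after its ratings. The function name and sample data are hypothetical, and the ratingTime strings are placeholders.

```python
def join_and_upsert(movies, ratings):
    """Simplified model of the INSERT INTO RatedMoviesSink job.

    movies:  dict mapping movieId -> movie dict (assumed fully loaded up front).
    ratings: iterable of rating dicts, in arrival order.
    Returns the latest joined row per movieId.
    """
    latest = {}
    for r in ratings:
        m = movies.get(r["movieId"])
        if m is None:
            # Inner join: a rating without a matching movie produces no row here.
            continue
        # Upsert semantics: a new row for the same key replaces the old one.
        latest[r["movieId"]] = {
            "movieId": m["movieId"],
            "title": m["title"],
            "releaseYear": m["releaseYear"],
            "actors": m["actors"],
            "rating": r["rating"],
            "ratingTime": r["ratingTime"],
        }
    return latest


movies = {
    1: {"movieId": 1, "title": "Lethal Weapon", "releaseYear": 1987,
        "actors": ["Mel Gibson", "Danny Glover"]},
}
ratings = [
    {"movieId": 1, "rating": 8.0, "ratingTime": "t1"},
    {"movieId": 2, "rating": 5.0, "ratingTime": "t2"},  # no matching movie
    {"movieId": 1, "rating": 9.0, "ratingTime": "t3"},
]
result = join_and_upsert(movies, ratings)
```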
Description: After processing the data with Apache Flink, go to the Apache Pinot query console to query your rated and processed data.
# Open your web browser and navigate to the Apache Pinot UI to execute queries http://localhost:9000/#/query
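Besides the web console, Pinot can be queried over HTTP by POSTing a JSON body of the form {"sql": "..."} to the broker's /query/sql endpoint. The standard-library sketch below assumes the broker is exposed on its default port 8099 and that the joined data lands in a Pinot table named rated_movies; both are assumptions, since the port mapping and the table name come from the workshop's compose file and Pinot table configuration, which are not shown here. The helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: broker reachable on its default port; adjust to your compose setup.
BROKER_URL = "http://localhost:8099/query/sql"


def build_query_request(sql: str, url: str = BROKER_URL) -> urllib.request.Request:
    """Build a POST request carrying the SQL statement as JSON."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def run_query(sql: str, url: str = BROKER_URL) -> dict:
    """Send the query and return the decoded JSON response (needs a running broker)."""
    with urllib.request.urlopen(build_query_request(sql, url)) as resp:
        return json.load(resp)


# Hypothetical table name; requires the workshop services to be running:
# run_query("SELECT movieId, title, rating FROM rated_movies LIMIT 10")
```

Keeping request construction separate from sending makes it easy to inspect the request without a running cluster.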
To stop and remove all services related to this part of the workshop, execute:
make destroy