  • Directory: 03_pinot-kafka-flink

  • Objective: Enhance the data pipeline by introducing Apache Flink SQL for complex event processing before ingesting the data into Apache Pinot through Apache Kafka.

  • Setup:

    • Ensure that Docker Compose is running with the necessary services, including Apache Kafka, Apache Flink, and Apache Pinot.

    • Navigate to the 03_pinot-kafka-flink directory where the necessary files and scripts are located.

  • Tasks:

Step 1: Start Services Using Make

  • Description: Initialize all necessary services by running the make command. Ensure that the validation phase succeeds, indicating that all services are correctly configured and running.

  • Action:

    make

Step 2: Start the Flink SQL Client

  • Description: Start the Apache Flink SQL client to manage streaming jobs for data processing.

  • Action:

    docker-compose run sql-client
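
  • Optional check: Once the SQL shell is up, a couple of commands confirm the session works before you create any tables. Both are standard Flink SQL client commands; the tableau result mode prints query results directly in the terminal (the quoted SET syntax assumes a reasonably recent Flink version):

    -- List the tables registered in the current session (empty at first)
    SHOW TABLES;

    -- Print results inline in the terminal rather than in the paged view
    SET 'sql-client.execution.result-mode' = 'tableau';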

Step 3: Process Data with Flink SQL

  • Description: Use the Flink SQL client to create tables for movies and ratings, perform data transformations, and run example queries, which you can later replace with your own SQL.

  • Actions:

    • Create the movies table:

      CREATE TABLE Movies
      (
          movieId             INT,
          title               STRING,
          releaseYear         INT,
          country             STRING,
          rating              DOUBLE,
          genres              ARRAY<STRING>,
          actors              ARRAY<STRING>,
          directors           ARRAY<STRING>,
          composers           ARRAY<STRING>,
          screenwriters       ARRAY<STRING>,
          productionCompanies ARRAY<STRING>,
          cinematographer     STRING
      ) WITH (
            'connector' = 'kafka',
            'topic' = 'movies',
            'properties.bootstrap.servers' = 'kafka:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
            );
    • Create the ratings table (an event-time variant with a watermark is sketched after this list):

      CREATE TABLE MovieRatings
      (
          movieId          INT,
          rating           DOUBLE,
          ratingTimeMillis BIGINT,                            -- Read the epoch milliseconds as BIGINT
          ratingTime AS TO_TIMESTAMP_LTZ(ratingTimeMillis, 3) -- Convert to TIMESTAMP_LTZ
      ) WITH (
            'connector' = 'kafka',
            'topic' = 'movie_ratings',
            'properties.bootstrap.servers' = 'kafka:9092',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
            );
    • Execute a few example transformations and analytical queries:

      SELECT *
      FROM Movies
      WHERE LOWER(title) LIKE '%lethal weapon%';
      
      -- or
      SELECT movieId, AVG(rating) AS avgRating
      FROM MovieRatings
      GROUP BY movieId;
      
      -- or
      SELECT movieId, rating, ratingTime
      FROM MovieRatings LIMIT 10;
    • Perform a join between the movies and ratings tables and run test queries (a read-back check on the sink follows after this list):

      CREATE TABLE RatedMoviesSink    -- (1)
      (
          movieId     INT,
          title       STRING,
          releaseYear INT,
          actors      ARRAY<STRING>,
          rating      DOUBLE,
          ratingTime  TIMESTAMP(3),
          PRIMARY KEY (movieId) NOT ENFORCED -- Declare the PRIMARY KEY constraint
      ) WITH (
            'connector' = 'upsert-kafka', -- This enables updates and deletes
            'topic' = 'rated_movies',
            'properties.bootstrap.servers' = 'kafka:9092',
            'key.format' = 'json', -- Key format is JSON, matching the value format
            'value.format' = 'json' -- Values are serialized in JSON
            );
      
      INSERT INTO RatedMoviesSink     -- (2)
      SELECT m.movieId,
             m.title,
             m.releaseYear,
             m.actors,
             r.rating,
             r.ratingTime
      FROM MovieRatings r
               JOIN
           Movies m
           ON r.movieId = m.movieId;
      1. Defining a Kafka sink

      2. Writing the result of the join to the Kafka sink
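
    • Optional check: To confirm the join job is producing rows, read the sink table back in the SQL client. This is a sketch; the query streams joined records from the rated_movies topic and stops after ten rows:

      -- Sample the joined records written to the sink
      SELECT movieId, title, rating, ratingTime
      FROM RatedMoviesSink
      LIMIT 10;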
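
    • Optional (event-time windows): The MovieRatings table above declares no watermark, so event-time windowed queries over it will not emit results. The sketch below is not part of the workshop files: it registers a hypothetical MovieRatingsTimed variant that adds a watermark, then computes per-movie averages over one-minute tumbling windows using Flink's windowing table-valued functions (available since Flink 1.13):

      CREATE TABLE MovieRatingsTimed
      (
          movieId          INT,
          rating           DOUBLE,
          ratingTimeMillis BIGINT,
          ratingTime AS TO_TIMESTAMP_LTZ(ratingTimeMillis, 3),
          -- Tolerate up to 5 seconds of out-of-order events
          WATERMARK FOR ratingTime AS ratingTime - INTERVAL '5' SECOND
      ) WITH (
            'connector' = 'kafka',
            'topic' = 'movie_ratings',
            'properties.bootstrap.servers' = 'kafka:9092',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
            );

      -- Average rating per movie per one-minute window
      SELECT window_start, window_end, movieId, AVG(rating) AS avgRating
      FROM TABLE(
          TUMBLE(TABLE MovieRatingsTimed, DESCRIPTOR(ratingTime), INTERVAL '1' MINUTE))
      GROUP BY window_start, window_end, movieId;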

Step 4: Query Processed Data in Pinot

  • Description: After processing the data with Apache Flink, open the Apache Pinot query console to query the rated and processed data.

  • Action:

    # Open your web browser and navigate to the Apache Pinot UI to execute queries
    http://localhost:9000/#/query
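
  • Example: The Pinot table name and schema are defined by the workshop's Pinot configuration, so the query below is only a sketch; it assumes a table named rated_movies fed by the rated_movies topic, with columns matching the Flink sink. Adjust the names to match your actual schema:

    SELECT title, rating, ratingTime
    FROM rated_movies
    ORDER BY ratingTime DESC
    LIMIT 10;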

Clean Up

  • To stop and remove all services related to this part of the workshop, execute:

    make destroy

Troubleshooting

  • If you encounter issues during setup or execution, check the logs of the affected service:

    docker logs <service_name>