
Reddit-News-Data-Pipeline

Pipeline

  • Pipeline architecture (diagram image)

  • Airflow DAG (graph image)
Workflow

  • Data is collected from two APIs (the Reddit API and NewsAPI) for a specified subject (see the collection sketch after this list).

    • Top, hot, and new posts are fetched for the specified subject.
    • NewsAPI returns the top headlines for the specified category.
  • The collected data is stored in the landing zone bucket (S3-compatible object storage).

    • The AWS SDK for Python (Boto3) is used to manage the object storage (see the upload sketch after this list).
  • Data is read from the landing zone bucket and cleaned with the pandas library (see the cleaning sketch after this list).

    • Special characters are removed.
    • Unnecessary new lines are removed.
    • Quotes are standardized.
    • Useful keys are extracted from the data.
  • The transformed and cleaned data is sent to the processed zone bucket.

  • Staging tables are created and the transformed data is copied into them. These tasks are handled by Airflow's PostgresOperator (see the staging and quality-check sketch after this list).

  • Data quality checks are executed to make sure that end users get accurate data.

    • Airflow's SQLColumnCheckOperator checks criteria for null values, minimum and maximum values, and distinct values.
    • Airflow's SQLTableCheckOperator checks criteria for the number of rows in a table and its date range.
  • If the data quality checks pass, the data is stored in the data warehouse tables.
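
Collection sketch: a minimal example of the API step, assuming PRAW for the Reddit API and plain requests for NewsAPI. The environment variable names, subject, and category below are placeholders, not this repository's actual configuration.

    import os
    import requests
    import praw

    def fetch_reddit_posts(subject: str, limit: int = 25) -> list:
        """Fetch top, hot, and new posts for the specified subreddit."""
        reddit = praw.Reddit(
            client_id=os.environ["REDDIT_CLIENT_ID"],          # placeholder env var names
            client_secret=os.environ["REDDIT_CLIENT_SECRET"],
            user_agent="reddit-news-pipeline",
        )
        subreddit = reddit.subreddit(subject)
        posts = []
        # Collect the three listings mentioned in the workflow: top, hot, and new.
        for listing in (subreddit.top(limit=limit), subreddit.hot(limit=limit), subreddit.new(limit=limit)):
            for post in listing:
                posts.append({"id": post.id, "title": post.title, "selftext": post.selftext, "score": post.score})
        return posts

    def fetch_top_headlines(category: str) -> dict:
        """Fetch the top headlines for the specified category from NewsAPI."""
        response = requests.get(
            "https://newsapi.org/v2/top-headlines",
            params={"category": category, "apiKey": os.environ["NEWS_API_KEY"]},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()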
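
Upload sketch: raw results can be written to the landing zone with Boto3 pointed at MinIO's S3-compatible endpoint. The endpoint, credentials, bucket, and key below are placeholders.

    import json
    import os
    import boto3

    # Point the S3 client at the MinIO endpoint instead of AWS.
    s3 = boto3.client(
        "s3",
        endpoint_url="http://minio:9000",
        aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
        aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
    )

    def upload_raw(payload: dict, bucket: str = "landing-zone", key: str = "reddit/raw.json") -> None:
        """Store a raw API response as a JSON object in the landing zone bucket."""
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(payload).encode("utf-8"))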
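
Cleaning sketch: the pandas transformations could look roughly like this; the column names are assumptions based on the Reddit fields shown above, not the repository's actual schema.

    import pandas as pd

    def clean_posts(df: pd.DataFrame) -> pd.DataFrame:
        """Apply the cleaning rules listed above to the text columns."""
        cleaned = df[["id", "title", "selftext", "score"]].copy()   # extract only the useful keys
        for col in ("title", "selftext"):
            cleaned[col] = (
                cleaned[col]
                .str.replace(r"[\r\n]+", " ", regex=True)           # remove unnecessary new lines
                .str.replace(r"[“”]", '"', regex=True)              # standardize double quotes
                .str.replace(r"[‘’]", "'", regex=True)              # standardize single quotes
                .str.replace(r"[^\w\s.,!?'\"-]", "", regex=True)    # remove remaining special characters
                .str.strip()
            )
        return cleaned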
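
Staging and quality-check sketch: a rough outline of the Airflow tasks, assuming a recent Airflow 2.x with the postgres and common-sql provider packages installed. Connection IDs, the SQL file path, and table and column names are placeholders.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.providers.common.sql.operators.sql import (
        SQLColumnCheckOperator,
        SQLTableCheckOperator,
    )

    with DAG(
        dag_id="reddit_news_staging_sketch",
        start_date=datetime(2024, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        # Create the staging table and copy the transformed data into it.
        create_staging = PostgresOperator(
            task_id="create_staging_table",
            postgres_conn_id="postgres_default",
            sql="sql/create_staging_reddit.sql",   # placeholder SQL file
        )

        # Column-level criteria: null values, min/max values, and distinct values.
        column_checks = SQLColumnCheckOperator(
            task_id="column_quality_checks",
            conn_id="postgres_default",
            table="staging_reddit_posts",
            column_mapping={
                "id": {"null_check": {"equal_to": 0}, "distinct_check": {"geq_to": 1}},
                "score": {"min": {"geq_to": 0}},
            },
        )

        # Table-level criteria: number of rows and the date range of the table.
        table_checks = SQLTableCheckOperator(
            task_id="table_quality_checks",
            conn_id="postgres_default",
            table="staging_reddit_posts",
            checks={
                "row_count_check": {"check_statement": "COUNT(*) >= 1"},
                "date_range_check": {"check_statement": "MAX(created_utc) >= CURRENT_DATE - INTERVAL '7 days'"},
            },
        )

        create_staging >> column_checks >> table_checks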

Tools:

  • Cloud: Chabokan
  • Containerization: Docker, Docker Compose
  • Orchestration: Apache Airflow
  • Transformation: pandas
  • Cloud Storage Buckets: MinIO
  • Data Warehouse: PostgreSQL

How to set up

  1. Create an account on Reddit and get your API credentials (client ID and secret).

  2. Get an API key from NewsAPI.

  3. Install Docker.

  4. Based on the template.env file, set up your cloud infrastructure. A FERNET_KEY is needed for the Airflow configuration; you can generate one with the code below:

    from cryptography.fernet import Fernet
    fernet_key = Fernet.generate_key()
    print(fernet_key.decode())  # your FERNET_KEY; keep it in a secure place!
    
  5. Run docker compose build. (You can also follow the official Airflow instructions for running Airflow with Docker Compose.)

  6. Run docker compose up airflow-init.

  7. Run docker compose up.

  8. Schedule your Airflow DAG, or trigger it manually.

  9. Run docker compose down when you are done.

Chat with database

Using LangChain and Hugging Face, a language model is leveraged to interact with the database; see the sketch below.
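
A minimal sketch of this idea, assuming the langchain, langchain-community, and langchain-huggingface packages; the connection string, model repo_id, and question below are placeholders rather than the repository's actual setup.

    import os

    from langchain_community.utilities import SQLDatabase
    from langchain_huggingface import HuggingFaceEndpoint
    from langchain.chains import create_sql_query_chain

    # Connect LangChain to the warehouse and to a hosted Hugging Face model (placeholder URI and repo_id).
    db = SQLDatabase.from_uri("postgresql+psycopg2://airflow:airflow@localhost:5432/warehouse")
    llm = HuggingFaceEndpoint(
        repo_id="mistralai/Mistral-7B-Instruct-v0.2",
        huggingfacehub_api_token=os.environ["HUGGINGFACEHUB_API_TOKEN"],
        temperature=0.1,
    )

    # The chain turns a natural-language question into a SQL query over the warehouse schema.
    chain = create_sql_query_chain(llm, db)
    sql_query = chain.invoke({"question": "How many Reddit posts were loaded yesterday?"})
    print(db.run(sql_query))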

About

Calling APIs, uploading the raw results to S3 buckets, transforming them with pandas, and then storing them in a PostgreSQL database after quality checks. Airflow, Docker, and MinIO were used.
