feature/implement-unit-testing - Implement Unit Testing (#3)
Co-authored-by: Abel Tavares <abel.tavares@ctw.bmwgroup.com>
abeltavares and Abel Tavares authored Feb 10, 2024
1 parent 4958ec5 commit 7e1fa36
Showing 8 changed files with 848 additions and 38 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/run_black.yml
@@ -0,0 +1,38 @@
name: Run Black Formatter

on:
  pull_request:
    types: [opened, synchronize, reopened]
  push:
    branches:
      - main

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: psf/black@stable

      - name: Add label if failure
        if: failure()
        run: |
          curl --request POST \
            --url "https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.number }}/labels" \
            --header "authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \
            --header "Content-Type: application/json" \
            --data-raw '{"labels": ["Formatter Failed"]}'
      - name: Check and remove label if success
        if: success()
        run: |
          labels=$(curl -s \
            --request GET \
            --url "https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.number }}/labels" \
            --header "authorization: Bearer ${{ secrets.GITHUB_TOKEN }}")
          if [[ $labels == *"Formatter Failed"* ]]; then
            curl --request DELETE \
              --url "https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.number }}/labels/Formatter%20Failed" \
              --header "authorization: Bearer ${{ secrets.GITHUB_TOKEN }}"
          fi
66 changes: 66 additions & 0 deletions .github/workflows/run_tests.yml
@@ -0,0 +1,66 @@
name: Run Tests

on:
  pull_request:
    types: [opened, synchronize, reopened]
  push:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout repository
        uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.10.12

      - name: Install dependencies
        run: |
          pip install --upgrade pip
          pip install requests
          pip install psycopg2-binary
          pip install python-dotenv
          pip install apache-airflow==2.8.1
          pip install apache-airflow[cncf.kubernetes]
          pip install pandas
          pip install Flask-Session==0.5.0
      - name: Initialize Airflow database
        run: airflow db migrate

      - name: Run tests
        run: |
          python -m unittest discover tests
          python tests/dags_test.py
      - name: Add label if failure
        if: failure()
        run: |
          curl --request POST \
            --url "https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.number }}/labels" \
            --header "authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \
            --header "Content-Type: application/json" \
            --data-raw '{"labels": ["Tests Failed"]}'
      - name: Check and remove label if present
        if: success()
        run: |
          labels=$(curl -s \
            --request GET \
            --url "https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.number }}/labels" \
            --header "authorization: Bearer ${{ secrets.GITHUB_TOKEN }}")
          if [[ $labels == *"Tests Failed"* ]]; then
            curl --request DELETE \
              --url "https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.number }}/labels/Tests%20Failed" \
              --header "authorization: Bearer ${{ secrets.GITHUB_TOKEN }}"
          fi
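
The `Run tests` step above also executes `tests/dags_test.py`. That file is not shown in this diff; as a purely hypothetical sketch, an Airflow DAG-integrity test of this kind usually looks something like:

```python
import unittest

from airflow.models import DagBag


class TestDagIntegrity(unittest.TestCase):
    def setUp(self):
        # Parse every DAG file in the dags/ folder, skipping Airflow's examples
        self.dagbag = DagBag(dag_folder="dags", include_examples=False)

    def test_no_import_errors(self):
        # Any syntax error or bad import in a DAG file shows up here
        self.assertEqual(
            len(self.dagbag.import_errors),
            0,
            f"DAG import errors: {self.dagbag.import_errors}",
        )


if __name__ == "__main__":
    unittest.main()
```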
17 changes: 17 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,17 @@
repos:
  - repo: local
    hooks:
      - id: unit-tests
        name: Run Unit Tests
        entry: |
          python3 -c "
          import subprocess
          import sys
          TEST_RESULT = subprocess.call(['python3', '-m', 'unittest', 'discover', 'tests'])
          sys.exit(TEST_RESULT)
          "
        language: system
  - repo: https://github.com/psf/black
    rev: 22.10.0
    hooks:
      - id: black
133 changes: 117 additions & 16 deletions README.md
@@ -11,6 +11,10 @@

MarketTrackPipe is an automated Apache Airflow data pipeline for collecting and storing stock and cryptocurrency market data. The pipeline retrieves daily data for the top 5 stocks and top 5 cryptocurrencies, ranked by market performance, from the Alpha Vantage, Financial Modeling Prep, and CoinMarketCap APIs, and stores it in a PostgreSQL database. The pipeline is containerized with Docker and written in Python 3.

The pipeline follows object-oriented programming principles to ensure modularity, maintainability, and extensibility. Each component of the pipeline is designed as a separate class with well-defined responsibilities.

Unit tests cover each component of the workflow to keep the pipeline reliable. They validate the behavior of every class in isolation and catch regressions before they reach the main branch.
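
For example, a minimal component test (an illustrative sketch, not an actual test file from the repository; it assumes the `CryptoApiClient` constructor signature shown in the `core/market_data_processor.py` diff below) might look like:

```python
import unittest
from unittest.mock import MagicMock

from core.market_data_processor import CryptoApiClient


class TestCryptoApiClient(unittest.TestCase):
    def setUp(self):
        # Inject a mock logger so the test needs no logging configuration
        self.client = CryptoApiClient("dummy-key", logger=MagicMock())

    def test_client_exposes_get_data(self):
        # The engine only depends on the BaseApiClient interface
        self.assertTrue(callable(self.client.get_data))


if __name__ == "__main__":
    unittest.main()
```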

## Project Components


@@ -28,25 +32,123 @@ MarketTrackPipe is an automated Apache Airflow data pipeline for collecting and
```

- `core`: Contains core functionality for processing market data; the class structure is shown below, and a condensed code sketch follows this list.
```mermaid
classDiagram
    class BaseApiClient {
        <<abstract>>
        +logger: logging.Logger
        <<abstractmethod>>
        +@abstractmethod get_data(): Dict[str, List[str]]
    }
    class StockApiClient {
        +ALPHA_API_KEY: str
        +PREP_API_KEY: str
        +ALPHA_BASE_URL: str
        +PREP_BASE_URL: str
        +logger: logging.Logger
        +get_stocks(): Dict[str, List[str]]
        +get_data(symbols: Dict[str, List[str]]): Dict[str, List[Dict]]
    }
    class CryptoApiClient {
        +COIN_API_KEY: str
        +logger: logging.Logger
        +get_data(): Dict[str, List[Dict]]
    }
    class Storage {
        +host: str
        +port: int
        +database: str
        +user: str
        +password: str
        +conn
        +cur
        +logger: logging.Logger
        +_connect()
        +_close()
        +store_data(data: Dict[str, List[Dict[str, any]]], data_type: str): None
    }
    class MarketDataEngine {
        +api_client: BaseApiClient
        +db_connector: Storage
        +logger: logging.Logger
        +process_stock_data()
        +process_crypto_data()
    }
    BaseApiClient <|-- StockApiClient
    BaseApiClient <|-- CryptoApiClient
    MarketDataEngine "1" --> "1" BaseApiClient
    MarketDataEngine "1" --> "1" Storage
```
<br>

- `dags`: Contains the Apache Airflow DAG definitions for orchestrating the data collection and storage process.
- `tests`: Contains the unit tests for testing individual components of the project.
- `init.sql`: SQL script for creating and initializing the database schema.
```mermaid
graph TD;
    subgraph DB
        schema[market_data]
        stock[stock_data]
        crypto[crypto_data]
    end
    subgraph Fields
        date_collected
        symbol
        name
        market_cap
        volume
        price
        change_percent
    end
    schema --> |Schema| stock & crypto
    stock & crypto -->|Table| gainers & losers & actives
    gainers & losers & actives --> Fields
```
<br>

- `docker-compose.yml`: Defines the services and configures the project's containers, setting up the environment (Postgres, pgAdmin, Airflow).
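
As a condensed sketch of the abstract-base pattern in the `core` class diagram above (signatures follow the diagram; bodies are elided, and the returned keys are illustrative):

```python
import logging
from abc import ABC, abstractmethod
from typing import Dict, List


class BaseApiClient(ABC):
    """Common interface that every market-data API client implements."""

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    @abstractmethod
    def get_data(self) -> Dict[str, List[Dict]]:
        """Fetch market data from the upstream API."""


class CryptoApiClient(BaseApiClient):
    def __init__(self, COIN_API_KEY: str, logger: logging.Logger):
        super().__init__(logger=logger)
        self.COIN_API_KEY = COIN_API_KEY

    def get_data(self) -> Dict[str, List[Dict]]:
        # The real implementation queries the CoinMarketCap API;
        # the keys here are placeholders for illustration only
        return {"gainers": [], "losers": [], "actives": []}
```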

The `MarketDataEngine` class within `core/market_data_processor.py` encapsulates the logic for retrieving and storing market data. The `market_data_dag.py` file within the `dags` directory sets up the Apache Airflow DAGs for collecting and storing market data.
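
Condensed from the `dags/market_data_dag.py` diff below, the wiring looks roughly like this (the clients, storage, and logger are injected, so each piece can be mocked in unit tests; in the DAG itself the `process_*` calls run inside scheduled tasks rather than at import time):

```python
import logging
import os

from core.market_data_processor import (
    MarketDataEngine,
    StockApiClient,
    Storage,
)

logger = logging.getLogger(__name__)

# API client and storage are constructed once and handed to the engine
stock_api_client = StockApiClient(
    os.environ["ALPHA_API_KEY"], os.environ["PREP_API_KEY"], logger
)
db_connector = Storage(
    os.getenv("POSTGRES_HOST"),
    os.getenv("POSTGRES_PORT"),
    os.getenv("POSTGRES_DB"),
    os.getenv("POSTGRES_USER"),
    os.getenv("POSTGRES_PASSWORD"),
    logger,
)
stock_engine = MarketDataEngine(stock_api_client, db_connector, logger)
stock_engine.process_stock_data()
```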

The `init.sql` script defines two schemas in the `market_data` database, one for `stock_data` and another for `crypto_data`, and then creates tables within each schema to store `gainer`, `loser`, and `active` data for both stocks and cryptocurrencies.

The columns for each table are as follows:

- `id` : a unique identifier for each row in the table
- `date_collected` : the date on which the data was collected, defaulting to the current date
- `symbol` : the stock or crypto symbol
- `name` : the name of the stock or crypto
- `market_cap` : the market capitalization of the stock or crypto
- `volume` : the trading volume of the stock or crypto
- `price` : the price of the stock or crypto
- `change_percent` : the percentage change in the price of the stock or crypto
<br>
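For a quick spot check of stored rows, a small query sketch (it assumes the schema layout above; the exact table name, e.g. `stock_data.gainers`, is an assumption based on the diagram):

```python
import os

import psycopg2

# Assumes the same POSTGRES_* environment variables the pipeline uses
conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    port=os.getenv("POSTGRES_PORT"),
    dbname=os.getenv("POSTGRES_DB"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
)
with conn, conn.cursor() as cur:
    # Table name is an assumption; adjust to match init.sql
    cur.execute(
        "SELECT symbol, name, price, change_percent "
        "FROM stock_data.gainers "
        "ORDER BY date_collected DESC LIMIT 5"
    )
    for row in cur.fetchall():
        print(row)
conn.close()
```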
```mermaid
graph TD;
    subgraph MarketTrackPipe
        A((Airflow))
        D(Docker)
        P(PostgreSQL)
        G(pgAdmin)
    end
    subgraph Core
        MDE(MarketDataEngine)
        SAPI(StockApiClient)
        CAPI(CryptoApiClient)
        STR(Storage)
    end
    subgraph Dags
        MD_DAG_stocks(process_stock_data)
        MD_DAG_crypto(process_crypto_data)
    end
    D --> A & P & G
    P --> G
    A --> Dags
    Dags --> MDE
    MDE --> SAPI & CAPI
    SAPI & CAPI --> API
    API --> SAPI & CAPI
    SAPI & CAPI --> STR
    STR --> P
    style A fill:#f9f,stroke:#333,stroke-width:4px;
    style D fill:#bbf,stroke:#333,stroke-width:2px;
    style P fill:#f9f,stroke:#333,stroke-width:4px;
    style MDE fill:#f9f,stroke:#333,stroke-width:4px;
    style MD_DAG_stocks fill:#f9f,stroke:#333,stroke-width:4px;
    style MD_DAG_crypto fill:#f9f,stroke:#333,stroke-width:4px;
```

## Requirements

@@ -81,7 +183,6 @@ The columns for each table are as follows:
airflow trigger_dag data_collection_storage_crypto
```


## Setting up Pre-commit Hooks (Developer Setup)

To ensure code quality and run unit tests before committing changes, MarketTrackPipe uses [pre-commit](https://pre-commit.com/) hooks. Follow these steps to set it up:
@@ -97,9 +198,10 @@ To ensure code quality and run unit tests before committing changes, MarketTrack
```bash
pre-commit install
```

This will install the pre-commit hook into your git repository.
<br>

3. Now, every time you commit changes, pre-commit will automatically run the unit tests to ensure code quality. The same tests also run in a GitHub Actions workflow on every pull request to the repository.

## Usage
@@ -137,4 +239,3 @@ Additionally, a GitHub Action is configured to automatically run the black forma
Please make sure to run `pip install pre-commit` and `pre-commit install` as mentioned in the setup instructions to enable the pre-commit hook on your local development environment.

Contributors are encouraged to follow the black code style guidelines when making changes to the codebase.

14 changes: 3 additions & 11 deletions core/market_data_processor.py
Expand Up @@ -6,14 +6,6 @@
from typing import Dict, List
from abc import ABC, abstractmethod

# Configure logging
logging.basicConfig(
level=logging.WARNING,
format="[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
)

logger = logging.getLogger(__name__)


class BaseApiClient(ABC):

@@ -206,7 +198,7 @@ def get_data(self, symbols: Dict[str, List[str]]) -> Dict[str, List[Dict]]:


 class CryptoApiClient(BaseApiClient):
-    def __init__(self, COIN_API_KEY: str, logger: logging.Logger = None):
+    def __init__(self, COIN_API_KEY: str, logger: logging.Logger):
         super().__init__(logger=logger)
         self.COIN_API_KEY = COIN_API_KEY

@@ -353,7 +345,7 @@ def __init__(
         database: str,
         user: str,
         password: str,
-        logger: logging.Logger = logger,
+        logger: logging.Logger,
     ):
         self.host = host
         self.port = port
@@ -454,7 +446,7 @@ def __init__(
         self,
         api_client: BaseApiClient,
         db_connector: "Storage",
-        logger: logging.Logger = logger,
+        logger: logging.Logger,
     ):
         self.api_client = api_client
         self.db_connector = db_connector
32 changes: 21 additions & 11 deletions dags/market_data_dag.py
Expand Up @@ -4,6 +4,15 @@
from airflow.models import DAG
from datetime import datetime
from dotenv import load_dotenv
import logging

# Configure logging
logging.basicConfig(
level=logging.WARNING,
format="[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
)

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()
@@ -43,30 +52,31 @@

 # Create instances of the classes
 stock_api_client = StockApiClient(
-    os.environ["ALPHA_API_KEY"], os.environ["PREP_API_KEY"]
+    os.environ["ALPHA_API_KEY"], os.environ["PREP_API_KEY"], logger
 )
-crypto_api_client = CryptoApiClient(os.environ["COIN_API_KEY"])
+crypto_api_client = CryptoApiClient(os.environ["COIN_API_KEY"], logger)
 db_connector = Storage(
-    os.getenv["POSTGRES_HOST"],
-    os.getenv["POSTGRES_PORT"],
-    os.getenv["POSTGRES_DB"],
-    os.getenv["POSTGRES_USER"],
-    os.getenv["POSTGRES_PASSWORD"],
+    os.getenv("POSTGRES_HOST"),
+    os.getenv("POSTGRES_PORT"),
+    os.getenv("POSTGRES_DB"),
+    os.getenv("POSTGRES_USER"),
+    os.getenv("POSTGRES_PASSWORD"),
+    logger,
 )
-stock_engine = MarketDataEngine(stock_api_client, db_connector)
-crypto_engine = MarketDataEngine(crypto_api_client, db_connector)
+stock_engine = MarketDataEngine(stock_api_client, db_connector, logger)
+crypto_engine = MarketDataEngine(crypto_api_client, db_connector, logger)

 # Create the DAG for stock data collection and storage
 dag_stocks = DAG(
-    "data_collection_storage_stocks",
+    "process_stock_data",
     default_args=default_args_stocks,
     schedule_interval="0 23 * * 1-5",  # Schedule to run every day at 11 PM from Monday to Friday
     description="Collect and store stock data",
 )

 # Create the DAG for cryptocurrency data collection and storage
 dag_cryptos = DAG(
-    "data_collection_storage_crypto",
+    "process_crypto_data",
     default_args=default_args_cryptos,
     schedule_interval="0 23 * * *",  # Schedule to run every day at 11 PM
     description="Collect and store cryptocurrency data",
