From fa250dec84a34fdbe16f419cd516fb84f3a2882c Mon Sep 17 00:00:00 2001 From: Abel Tavares <121238257+abeltavares@users.noreply.github.com> Date: Sun, 4 Feb 2024 00:19:12 +0000 Subject: [PATCH 1/2] Update init.sql --- init.sql | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/init.sql b/init.sql index 0f2d33d..90f2144 100644 --- a/init.sql +++ b/init.sql @@ -1,12 +1,12 @@ -- Create a schema for stock_data -CREATE SCHEMA IF NOT EXISTS stock_data; +CREATE SCHEMA stock_data; -- Create a schema for crypto data -CREATE SCHEMA IF NOT EXISTS crypto_data; +CREATE SCHEMA crypto_data; -- Create tables for stock data -- Create a table to store gainers data -CREATE TABLE IF NOT EXISTS stock_data.gainers ( +CREATE TABLE stock_data.gainers ( id SERIAL PRIMARY KEY, date_collected DATE NOT NULL DEFAULT CURRENT_DATE, symbol VARCHAR(20) NOT NULL, @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS stock_data.gainers ( ); -- Create a table to store losers data -CREATE TABLE IF NOT EXISTS stock_data.losers ( +CREATE TABLE stock_data.losers ( id SERIAL PRIMARY KEY, date_collected DATE NOT NULL DEFAULT CURRENT_DATE, symbol VARCHAR(20) NOT NULL, @@ -30,7 +30,7 @@ CREATE TABLE IF NOT EXISTS stock_data.losers ( ); -- Create a table to store actives data -CREATE TABLE IF NOT EXISTS stock_data.actives ( +CREATE TABLE stock_data.actives ( id SERIAL PRIMARY KEY, date_collected DATE NOT NULL DEFAULT CURRENT_DATE, symbol VARCHAR(20) NOT NULL, @@ -43,7 +43,7 @@ CREATE TABLE IF NOT EXISTS stock_data.actives ( -- Create tables for crypto data -- Create a table to store gainers data -CREATE TABLE IF NOT EXISTS crypto_data.gainers ( +CREATE TABLE crypto_data.gainers ( id SERIAL PRIMARY KEY, date_collected DATE NOT NULL DEFAULT CURRENT_DATE, symbol VARCHAR(20) NOT NULL, @@ -55,7 +55,7 @@ CREATE TABLE IF NOT EXISTS crypto_data.gainers ( ); -- Create a table to store losers data -CREATE TABLE IF NOT EXISTS crypto_data.losers ( +CREATE TABLE crypto_data.losers ( id SERIAL PRIMARY KEY, date_collected DATE NOT NULL DEFAULT CURRENT_DATE, symbol VARCHAR(20) NOT NULL, @@ -67,7 +67,7 @@ CREATE TABLE IF NOT EXISTS crypto_data.losers ( ); -- Create a table to store actives data -CREATE TABLE IF NOT EXISTS crypto_data.actives ( +CREATE TABLE crypto_data.actives ( id SERIAL PRIMARY KEY, date_collected DATE NOT NULL DEFAULT CURRENT_DATE, symbol VARCHAR(20) NOT NULL, From 4958ec57446735d06164da2ee713bc4932a44869 Mon Sep 17 00:00:00 2001 From: Abel Tavares <121238257+abeltavares@users.noreply.github.com> Date: Fri, 9 Feb 2024 17:31:02 +0000 Subject: [PATCH 2/2] feature/implement-oop - Implement OOP (#2) Co-authored-by: Abel Tavares --- README.md | 112 +++++--- core/__init__.py | 0 core/market_data_processor.py | 494 ++++++++++++++++++++++++++++++++ dags/data_collection_storage.py | 421 --------------------------- dags/market_data_dag.py | 154 ++++------ 5 files changed, 622 insertions(+), 559 deletions(-) create mode 100644 core/__init__.py create mode 100644 core/market_data_processor.py delete mode 100644 dags/data_collection_storage.py diff --git a/README.md b/README.md index 715786a..8101006 100644 --- a/README.md +++ b/README.md @@ -9,42 +9,33 @@ # MarketTrackPipe -MarketTrackPipe is an automated Apache Airflow data pipeline for collecting, storing, and backing up stock and cryptocurrency market data. 
The pipeline retrieves daily data for the top 5 stocks and top 5 cryptocurrencies based on market performance from Alpha Vantage, Financial Modeling Prep, and CoinMarketCap APIs and stores it in a PostgreSQL database. Additionally, the pipeline includes a monthly backup function that stores the data from the database in an AWS S3 bucket. The pipeline is containerized using Docker and written in Python 3.
+MarketTrackPipe is an automated Apache Airflow data pipeline for collecting and storing stock and cryptocurrency market data. The pipeline retrieves daily data for the top 5 stocks and top 5 cryptocurrencies based on market performance from the Alpha Vantage, Financial Modeling Prep, and CoinMarketCap APIs and stores it in a PostgreSQL database. The pipeline is containerized using Docker and written in Python 3.
 
 ## Project Components
 
-The pipeline consists of two Python scripts in `dags` folder:
-- `data_collection_storage.py`: Contains functions for retrieving stock and crypto performance data from APIs and storing the data in a PostgreSQL database, as well as a function for backing up the data to Amazon S3.
-- `market_data_dag.py`: Sets up the DAGs for collecting and storing stock data from the financialmodelingprep and Alpha Advantage APIs, as well as cryptocurrency data from the CoinMarketCap API. Additionally, it sets up a DAG for backing up the data in the PostgreSQL database to Amazon S3 on the last day of every month.
+```
+├── core
+│   ├── __init__.py
+│   └── market_data_processor.py
+├── dags
+│   └── market_data_dag.py
+├── docker-compose.yaml
+├── init.sql
+└── tests
+    ├── dags_test.py
+    └── tests_market_data_processor.py
+```
 
-The `data_collection_storage_stocks` DAG consists of the following tasks:
-
-1. `get_stocks`: Retrieves the symbol of the top 5 stocks according to market performance.
-
-2. `get_stock_data`: Retrieves detailed information of the stocks retrieved in task 1.
-
-3. `store_stock_data`: Stores the stock data in a PostgreSQL database.
-
-DAG runs every day at 11 PM from Monday to Friday.
-
-The `data_collection_storage_crypto` DAG consists of the following tasks:
-
-1. `get_crypto_data`: Retrieves data for the top 5 cryptocurrencies according to market performance.
-
-2. `store_crypto_data`: Stores the cryptocurrency data in a PostgreSQL database.
-
-DAG runs every day at 11 PM.
-
-The `backup_data` DAG consists of the following task:
-
-1. `backup_data`: Extracts data from the PostgreSQL database and stores it in an Amazon S3 bucket in parquet file format.
+- `core`: Contains core functionality for processing market data.
+- `dags`: Contains the Apache Airflow DAG definitions for orchestrating the data collection and storage process.
+- `tests`: Contains the unit tests for testing individual components of the project.
+- `init.sql`: SQL script for creating and initializing the database schema.
+- `docker-compose.yaml`: Defines the services and configures the project's containers, setting up the environment (postgres, pgadmin, airflow).
+
+The `MarketDataEngine` class within `core/market_data_processor.py` encapsulates the logic for retrieving and storing market data. The `market_data_dag.py` file within the `dags` directory sets up the Apache Airflow DAGs for collecting and storing market data.
 
-The `docker-compose.yml` file is used to define the services and configure the project's containers, setting up the environment (postgres, pgadmin, airflow).
-
-The `init.sql` file is used to create and initialize the database schema when the docker compose command is executed.
-
-It creates creates two schemas in `market_data` database, one for `stock_data` and another for `crypto_data`, and then creates tables within each schema to store `gainer`, `loser`, and `active` data for both stock and crypto.
+The `init.sql` script defines two schemas in the `market_data` database, one for `stock_data` and another for `crypto_data`, and then creates tables within each schema to store `gainer`, `loser`, and `active` data for both stock and crypto.
 
 The columns for each table are as follows:
 
@@ -60,32 +51,56 @@ The columns for each table are as follows:
 ## Requirements
 
 - [Docker](https://www.docker.com/get-started)
+- [pre-commit](https://pre-commit.com/) (Developer)
 
 ## Setup
 
-1. Clone the repository:
+1. Clone the repository: - $ git clone https://github.com/abeltavares/MarketTrackPipe.git + ```bash + git clone https://github.com/abeltavares/MarketTrackPipe.git + ``` 2. Create an '.env' file in the project's root directory with the required environment variables (refer to the example .env file in the project). -3. Start the Docker containers:
+3. Start the Docker containers: + + ```bash + docker-compose up + ``` - $ docker-compose up +4. Access the Airflow web server: -4. Access the Airflow web server:
Go to the Airflow web UI at http://localhost:8080 and turn on the DAGs.
 
    Alternatively, you can trigger the DAGs manually by running the following commands in your terminal:
 
+   ```bash
+   airflow dags trigger data_collection_storage_stocks
+   airflow dags trigger data_collection_storage_crypto
+   ```
 
-    $ airflow trigger_dag data_collection_storage_stocks
-    $ airflow trigger_dag data_collection_storage_crypto
-    $ airflow trigger_dag backup_data
 
-That's it! You should now be able to collect and store stock and cryptocurrency data using MarketTrackPipe.
 
+## Setting up Pre-commit Hooks (Developer Setup)
+
+To ensure code quality and run unit tests before committing changes, MarketTrackPipe uses [pre-commit](https://pre-commit.com/) hooks. Follow these steps to set it up:
+
+1. Install `pre-commit` by running the following command in your terminal:
+
+   ```bash
+   pip install pre-commit
+   ```
+
+2. Run the following command to set up pre-commit:
+
+   ```bash
+   pre-commit install
+   ```
+
+   This will install the pre-commit hook into your git repository.
3. Now, every time you commit changes, pre-commit will automatically run unit tests to ensure code quality. Additionally, these tests are also executed in a GitHub Actions workflow on every pull request to the repository.
 
 ## Usage
 
After setting up the workflow, you can access the Apache Airflow web UI to monit
 
 To access the data stored in the PostgreSQL database, you have two options:
 
-1. Use the command-line tool `psql` to run SQL queries directly. The database credentials and connection information can be found in the '.env' file as well. Using psql, you can connect to the database, execute queries, and save the output to a file or use it as input for other scripts or applications.
-
-    $ docker exec -it my-postgres psql -U postgres -d market_data
+1. **Command-line tool `psql`**: You can use `psql` to run SQL queries directly. Find the database credentials and connection information in the '.env' file. Use the following command in your terminal to connect to the database, substituting the container name and user from your '.env' file:
 
+   ```bash
+   docker exec -it [container] psql -U [user] -d market_data
+   ```
+
 2. Use `pgAdmin`, a web-based visual interface. To access it, navigate to http://localhost:5050 in your web browser and log in using the credentials defined in the `.env` file in the project root directory. From there, you can interactively browse the tables created by the pipeline, run queries, and extract the desired data for analysis or visualization.
 
 Choose the option that suits you best depending on your familiarity with SQL and preference for a graphical or command-line interface.
 
@@ -112,3 +128,13 @@ This project is open to contributions. If you have any suggestions or improvemen
 ## Copyright
 
 © 2023 Abel Tavares
+
+
+The codebase of this project follows the [black](https://github.com/psf/black) code style. To ensure consistent formatting, the [pre-commit](https://pre-commit.com/) hook is set up to run the black formatter before each commit.
+
+Additionally, a GitHub Action is configured to automatically run the black formatter on every pull request, ensuring that the codebase remains formatted correctly.
+
+Please make sure to run `pip install pre-commit` and `pre-commit install` as mentioned in the setup instructions to enable the pre-commit hook on your local development environment.
+
+Contributors are encouraged to follow the black code style guidelines when making changes to the codebase.
diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/core/market_data_processor.py b/core/market_data_processor.py
new file mode 100644
index 0000000..5f8a6e7
--- /dev/null
+++ b/core/market_data_processor.py
@@ -0,0 +1,494 @@
+import logging
+from abc import ABC, abstractmethod
+from typing import Dict, List
+
+import psycopg2
+import requests
+
+# Configure logging
+logging.basicConfig(
+    level=logging.WARNING,
+    format="[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
+)
+
+logger = logging.getLogger(__name__)
+
+
+class BaseApiClient(ABC):
+
+    # Define constant for the top gainers, losers, and actives
+    TOP_PERFORMANCE_LIMIT = 5
+
+    def __init__(self, logger: logging.Logger) -> None:
+        self.logger = logger
+
+    @abstractmethod
+    def get_data(self) -> Dict[str, List[str]]:
+        """
+        Abstract method to get data. Child classes must implement this method.
+        """
+        pass
+
+
+class StockApiClient(BaseApiClient):
+    """
+    A client for retrieving stock data from multiple APIs.
+
+    Attributes:
+        ALPHA_BASE_URL (str): The base URL for the Alpha Vantage API.
+        PREP_BASE_URL (str): The base URL for the Financial Modeling Prep API.
+        ALPHA_API_KEY (str): The API key for the Alpha Vantage API.
+        PREP_API_KEY (str): The API key for the Financial Modeling Prep API.
+        logger (logging.Logger): The logger object for logging messages.
+
+    Methods:
+        get_stocks(): Retrieves the symbols of the top 5 stocks for gainers, losers, and actives.
+        get_data(symbols: Dict[str, List[str]]) -> Dict[str, List[Dict]]: Retrieves the volume, price, change percent, market cap, and name for the given symbols from Alpha Vantage's API.
+    """
+
+    ALPHA_BASE_URL = "https://www.alphavantage.co/query?"
+    PREP_BASE_URL = "https://financialmodelingprep.com/api/v3/"
+
+    def __init__(
+        self, ALPHA_API_KEY: str, PREP_API_KEY: str, logger: logging.Logger = logger
+    ):
+        """
+        Initializes a new instance of the StockApiClient class.
+
+        Args:
+            ALPHA_API_KEY (str): The API key for the Alpha Vantage API.
+            PREP_API_KEY (str): The API key for the Financial Modeling Prep API.
+            logger (logging.Logger, optional): The logger object for logging messages. Defaults to the module-level logger.
+        """
+        super().__init__(logger=logger)
+        self.ALPHA_API_KEY = ALPHA_API_KEY
+        self.PREP_API_KEY = PREP_API_KEY
+
+    def get_stocks(self) -> Dict[str, List[str]]:
+        """
+        Get the symbols of the top 5 stocks for gainers, losers, and actives.
+
+        Returns:
+            Dict[str, List[str]]: A dictionary with lists of symbols for gainers, losers, and actives.
+
+        Raises:
+            Exception: If any of the requests fails or if no data was retrieved.
+        """
+        # Define the URLs for the requested market performances
+        urls = {
+            "gainers": f"{self.PREP_BASE_URL}stock_market/gainers?apikey={self.PREP_API_KEY}",
+            "losers": f"{self.PREP_BASE_URL}stock_market/losers?apikey={self.PREP_API_KEY}",
+            "actives": f"{self.PREP_BASE_URL}stock_market/actives?apikey={self.PREP_API_KEY}",
+        }
+
+        # Initialize the dictionary to store the stocks
+        stocks = {"gainers": [], "losers": [], "actives": []}
+
+        # Send a GET request to each URL
+        for performance, url in urls.items():
+            try:
+                response = requests.get(url, timeout=5)
+
+                # Check if the request was successful
+                response.raise_for_status()
+
+                # Retrieve the data from the API response
+                data = response.json()
+
+                # Check if the data is empty
+                if not data:
+                    self.logger.error(f"No data was retrieved for '{performance}'")
+                    raise ValueError(f"No data was retrieved for '{performance}'")
+
+                # Get symbol of top stocks in the specified market performance
+                stock_symbols = [
+                    item["symbol"] for item in data[: self.TOP_PERFORMANCE_LIMIT]
+                ]
+
+                # Store the stocks in the dictionary
+                stocks[performance] = stock_symbols
+                self.logger.info(f"Successfully retrieved stocks for '{performance}'.")
+            except requests.exceptions.RequestException as req_error:
+                self.logger.error(
+                    f"Error during API request for '{performance}': {req_error}"
+                )
+                raise
+            except (ValueError, IndexError, KeyError, TypeError) as data_error:
+                self.logger.error(
+                    f"Error processing data for '{performance}': {data_error}"
+                )
+                raise
+
+        return stocks
+
+    def get_data(self, symbols: Dict[str, List[str]]) -> Dict[str, List[Dict]]:
+        """
+        Retrieves the volume, price, change percent, market cap, and name for the given symbols from Alpha Vantage's API.
+
+        Args:
+            symbols (Dict[str, List[str]]): A dictionary of symbols for the stocks to retrieve data for, with the symbol type (gainers, losers, actives) as the key and a list of symbols as the value.
+
+        Returns:
+            Dict[str, List[Dict]]: A dictionary of dictionaries for each symbol type (gainers, losers, actives) with the symbol as the key and a dictionary of volume, price, change percent, market cap, and name as the value.
+        """
+        quote_endpoint = "GLOBAL_QUOTE"
+        overview_endpoint = "profile"
+
+        stock_data = {}
+
+        for symbol_type, symbol_list in symbols.items():
+            stock_data[symbol_type] = []
+            for symbol in symbol_list:
+                try:
+                    # Build the URL to request data for the given symbol from the global quote endpoint
+                    alpha_url = f"{self.ALPHA_BASE_URL}function={quote_endpoint}&symbol={symbol}&apikey={self.ALPHA_API_KEY}"
+                    # Request data from the API and convert the response to a dictionary.
+                    # The timeout, matching get_stocks, keeps a stalled API call from hanging the task.
+                    alpha_response = requests.get(alpha_url, timeout=5)
+                    alpha_response.raise_for_status()  # Raise an error for unsuccessful responses
+                    quote_data = alpha_response.json()
+
+                    if not quote_data["Global Quote"]:
+                        self.logger.error(
+                            f"No alpha data was retrieved for symbol {symbol}"
+                        )
+                        raise KeyError(
+                            f"No alpha data was retrieved for symbol {symbol}"
+                        )
+
+                    # Extract the volume, price, and change percent data from the response
+                    volume = quote_data["Global Quote"]["06. volume"]
+                    price = quote_data["Global Quote"]["05. price"]
+                    change_percent = quote_data["Global Quote"]["10. change percent"]
+
+                    # Build the URL to request data for the given symbol from the profile endpoint
+                    overview_url = f"{self.PREP_BASE_URL}{overview_endpoint}/{symbol}?apikey={self.PREP_API_KEY}"
+                    # Request data from the API and convert the response to a dictionary
+                    prep_response = requests.get(overview_url, timeout=5)
+                    prep_response.raise_for_status()  # Raise an error for unsuccessful responses
+                    overview_data = prep_response.json()
+
+                    if (
+                        not overview_data[0]["companyName"]
+                        or not overview_data[0]["mktCap"]
+                    ):
+                        self.logger.error(
+                            f"No prep data was retrieved for symbol {symbol}"
+                        )
+                        raise KeyError(
+                            f"No prep data was retrieved for symbol {symbol}"
+                        )
+
+                    # Extract the name and market cap data from the response
+                    name = overview_data[0]["companyName"]
+                    market_cap = overview_data[0]["mktCap"]
+
+                    # Append the data to the stock_data list
+                    stock_data[symbol_type].append(
+                        {
+                            "symbol": symbol,
+                            "volume": volume,
+                            "price": price,
+                            "change_percent": change_percent.rstrip("%"),
+                            "market_cap": market_cap,
+                            "name": name,
+                        }
+                    )
+                    self.logger.info("Successfully retrieved stock data.")
+                except requests.exceptions.RequestException as req_error:
+                    self.logger.error(
+                        f"Error during API request for {symbol}: {req_error}"
+                    )
+                except (ValueError, TypeError) as error:
+                    self.logger.error(
+                        f"An error occurred while retrieving data for {symbol}: {error}"
+                    )
+
+        return stock_data
+
+
+class CryptoApiClient(BaseApiClient):
+    def __init__(self, COIN_API_KEY: str, logger: logging.Logger = logger):
+        super().__init__(logger=logger)
+        self.COIN_API_KEY = COIN_API_KEY
+
+    def get_data(self) -> Dict[str, List[Dict]]:
+        """
+        Gets the top gainers, losers, and active cryptocurrencies on CoinMarketCap.
+
+        Returns:
+            dict: A dictionary containing the top gainers, losers, and most active cryptocurrencies.
+        """
+        # Define the API endpoint
+        url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest"
+
+        # Set the parameters for the API request
+        parameters = {
+            "start": "1",
+            "limit": "100",
+            "convert": "USD",
+            "sort": "percent_change_24h",
+        }
+
+        # Add the API key to the request headers
+        headers = {
+            "Accepts": "application/json",
+            "X-CMC_PRO_API_KEY": self.COIN_API_KEY,
+        }
+
+        try:
+            # Send the API request
+            response = requests.get(url, headers=headers, params=parameters, timeout=5)
+
+            # Check if the API request was successful
+            response.raise_for_status()
+
+            # Parse the response JSON data
+            data = response.json()
+
+            # Extract the top gainers, top losers, and top active cryptos
+            top_gainers = data.get("data", [])[: self.TOP_PERFORMANCE_LIMIT]
+            top_losers = data.get("data", [])[-self.TOP_PERFORMANCE_LIMIT :]
+            top_active = sorted(
+                data.get("data", []),
+                key=lambda x: x["quote"]["USD"]["volume_24h"],
+                reverse=True,
+            )[: self.TOP_PERFORMANCE_LIMIT]
+
+            # Create the dictionaries for gainers, losers, and active cryptos
+            gainer_list = []
+            loser_list = []
+            active_list = []
+
+            for gainer in top_gainers:
+                try:
+                    gainer_dict = {
+                        "symbol": gainer.get("symbol", ""),
+                        "name": gainer.get("name", ""),
+                        "volume": gainer["quote"]["USD"].get("volume_24h", ""),
+                        "price": gainer["quote"]["USD"].get("price", ""),
+                        "change_percent": gainer["quote"]["USD"].get(
+                            "percent_change_24h", ""
+                        ),
+                        "market_cap": gainer["quote"]["USD"].get("market_cap", ""),
+                    }
+                except KeyError as e:
+                    self.logger.error(
+                        f"KeyError while processing gainer data: {e}, Data: {gainer}"
+                    )
+                    continue  # Skip this entry rather than appending stale data
+                gainer_list.append(gainer_dict)
+
+            for loser in top_losers:
+                try:
+                    loser_dict = {
+                        "symbol": loser.get("symbol", ""),
+                        "name": loser.get("name", ""),
+                        "volume": loser["quote"]["USD"].get("volume_24h", ""),
+                        "price": loser["quote"]["USD"].get("price", ""),
+                        "change_percent": loser["quote"]["USD"].get(
+                            "percent_change_24h", ""
+                        ),
+                        "market_cap": loser["quote"]["USD"].get("market_cap", ""),
+                    }
+                except KeyError as e:
+                    self.logger.error(
+                        f"KeyError while processing loser data: {e}, Data: {loser}"
+                    )
+                    continue  # Skip this entry rather than appending stale data
+                loser_list.append(loser_dict)
+
+            for active in top_active:
+                try:
+                    active_dict = {
+                        "symbol": active.get("symbol", ""),
+                        "name": active.get("name", ""),
+                        "volume": active["quote"]["USD"].get("volume_24h", ""),
+                        "price": active["quote"]["USD"].get("price", ""),
+                        "change_percent": active["quote"]["USD"].get(
+                            "percent_change_24h", ""
+                        ),
+                        "market_cap": active["quote"]["USD"].get("market_cap", ""),
+                    }
+                except KeyError as e:
+                    self.logger.error(
+                        f"KeyError while processing active data: {e}, Data: {active}"
+                    )
+                    continue  # Skip this entry rather than appending stale data
+                active_list.append(active_dict)
+
+            self.logger.info("Successfully retrieved crypto data.")
+            return {
+                "gainers": gainer_list,
+                "losers": loser_list,
+                "actives": active_list,
+            }
+
+        except requests.exceptions.RequestException as req_error:
+            self.logger.error(
+                f"Error during API request for cryptocurrencies: {req_error}"
+            )
+            return None
+        except ValueError as data_error:
+            self.logger.error(
+                f"Error processing data for cryptocurrencies: {data_error}"
+            )
+            return None
+
+
+class Storage:
+    """
+    A class that handles storing data in a database.
+
+    Attributes:
+        host (str): The host address of the database.
+        port (int): The port number of the database.
+        database (str): The name of the database.
+        user (str): The username for accessing the database.
+        password (str): The password for accessing the database.
+ logger (logging.Logger): The logger object for logging messages. + conn: The database connection object. + cur: The database cursor object. + """ + + def __init__( + self, + host: str, + port: int, + database: str, + user: str, + password: str, + logger: logging.Logger = logger, + ): + self.host = host + self.port = port + self.database = database + self.user = user + self.password = password + self.logger = logger + self.conn = None + self.cur = None + + def _connect(self): + try: + self.conn = psycopg2.connect( + host=self.host, + port=self.port, + database=self.database, + user=self.user, + password=self.password, + ) + self.cur = self.conn.cursor() + except psycopg2.Error as e: + self.logger.error(f"Error connecting to the database: {e}") + raise + + def _close(self): + try: + if self.cur: + self.cur.close() + if self.conn: + self.conn.close() + except psycopg2.Error as e: + self.logger.error(f"Error closing the database connection: {e}") + + def store_data(self, data: Dict[str, List[Dict[str, any]]], data_type: str) -> None: + try: + # Check if data_type is a string + if not isinstance(data_type, str): + self.logger.error("data_type must be a string") + raise TypeError("data_type must be a string") + + self.logger.info(f"Storing {data_type} data in the database.") + + self._connect() + schema_name = f"{data_type}_data" + with self.conn, self.cur: + for key, asset_list in data.items(): + table = f"{schema_name}.{key}" + for asset_data in asset_list: + symbol = asset_data["symbol"] + name = asset_data["name"] + volume = asset_data["volume"] + price = asset_data["price"] + market_cap = asset_data["market_cap"] + change_percent = asset_data["change_percent"] + + if not all([symbol, name]): + self.logger.error( + f"One or more required fields are missing from the {data_type} data for symbol: {symbol}, name: {name}" + ) + raise ValueError( + f"One or more required fields are missing from the {data_type} data" + ) + + self.cur.execute( + f"INSERT INTO {table} (symbol, name, market_cap, volume, price, change_percent) VALUES (%s, %s, %s, %s, %s, %s)", + (symbol, name, market_cap, volume, price, change_percent), + ) + self.conn.commit() + # Log a message for successful data storage + self.logger.info( + f"Successfully stored {len(asset_list)} entries in the {table} table." + ) + except psycopg2.Error as error: + self.logger.error( + f"An error occurred while storing the {data_type} data in the database: {error}" + ) + if self.conn: + self.conn.rollback() + finally: + self._close() + + +class MarketDataEngine: + """ + Class representing a market data engine. + + Attributes: + api_client (BaseApiClient): The API client used to fetch market data. + db_connector (Storage): The database connector used to store market data. + logger (logging.Logger): The logger used for logging. + + Methods: + process_stock_data: Fetches stock data from the API client and stores it in the database. + process_crypto_data: Fetches crypto data from the API client and stores it in the database. + """ + + def __init__( + self, + api_client: BaseApiClient, + db_connector: "Storage", + logger: logging.Logger = logger, + ): + self.api_client = api_client + self.db_connector = db_connector + self.logger = logger + + def process_stock_data(self): + """ + Fetches stock data from the API client and stores it in the database. + If the stock data retrieval fails, a warning message is logged. 
+ """ + try: + stocks = self.api_client.get_stocks() + stock_data = self.api_client.get_data(stocks) + if stock_data is not None: + self.db_connector.store_data(stock_data, "stock") + else: + self.logger.warning( + "Stock data retrieval failed. No data stored in the database." + ) + except Exception as e: + self.logger.error(f"Error processing stock data: {e}") + + def process_crypto_data(self): + """ + Fetches crypto data from the API client and stores it in the database. + If the crypto data retrieval fails, a warning message is logged. + """ + try: + crypto_data = self.api_client.get_data() + if crypto_data is not None: + self.db_connector.store_data(crypto_data, "crypto") + else: + self.logger.warning( + "Crypto data retrieval failed. No data stored in the database." + ) + except Exception as e: + self.logger.error(f"Error processing crypto data: {e}") diff --git a/dags/data_collection_storage.py b/dags/data_collection_storage.py deleted file mode 100644 index 3a1d099..0000000 --- a/dags/data_collection_storage.py +++ /dev/null @@ -1,421 +0,0 @@ -""" -A Python script with functions for retrieving stock and crypto performance data from Alpha Vantage, Financial Modeling Prep and CoinMarketCap APIs and storing the data in a PostgreSQL database. - -Additionally, the script includes a backup_data function for extracting and storing the data from the database to an S3 bucket on a monthly basis. -""" - -# Import necessary modules -import requests -import time -import os -import psycopg2 -import boto3 -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq -import psycopg2 -from datetime import datetime, timedelta - - -# Define the base URL and API key for Alpha Advantage API -ALPHA_BASE_URL = "https://www.alphavantage.co/query?" -ALPHA_API_KEY = os.environ['ALPHA_API_KEY'] - -# Define the base URL and API key for Financial Modeling Prep API -PREP_BASE_URL = "https://financialmodelingprep.com/api/v3/" -PREP_API_KEY = os.environ['PREP_API_KEY'] - -# Define the API key for CoinMarketCap API -COIN_API_KEY = os.environ['COIN_API_KEY'] - -# These variables are used to make API requests to Alpha Advantage, Financial Modeling Prep and CoinMarketCap -# The base URL and API key are used to build the complete URL to make the request - -# Get the database configuration from environment variables -host = os.environ.get("POSTGRES_HOST") -port = os.environ.get("POSTGRES_PORT") -database = os.environ.get("POSTGRES_DB") -user = os.environ.get("POSTGRES_USER") -password = os.environ.get("POSTGRES_PASSWORD") - - -def get_stocks() -> dict: - """ - Get the symbols of the top 5 stocks for gainers, losers, and actives. 
- :return: a dictionary with lists of symbols for gainers, losers, and actives - :raise: Exception if any of the requests fails or if no data was retrieved - """ - # Define the URLs for the requested market performances - urls = { - 'gainers': f"{PREP_BASE_URL}stock_market/gainers?apikey={PREP_API_KEY}", - 'losers': f"{PREP_BASE_URL}stock_market/losers?apikey={PREP_API_KEY}", - 'actives': f"{PREP_BASE_URL}stock_market/actives?apikey={PREP_API_KEY}" - } - - # Initialize the dictionary to store the stocks - stocks = {'gainers': [], 'losers': [], 'actives': []} - - # Send a GET request to each URL - for performance, url in urls.items(): - response = requests.get(url, timeout=5) - - # Check if the request was successful - if response.status_code != 200: - raise Exception(f"Failed to retrieve data from the API for '{performance}': {response.text}") - - # Retrieve the data from the API response - data = response.json() - - # Check if the data is empty - if not data: - raise Exception(f"No data was retrieved for '{performance}'") - - # Get symbol of top 5 stocks in the specified market performance - stock_symbols = [item['symbol'] for item in data[:5]] - - # Store the stocks in the dictionary - stocks[performance] = stock_symbols - - return stocks - - -def get_stock_data(symbols: dict) -> dict: - """ - Retrieves the volume, price, change percent, market cap, and name for the given symbols from Alpha Vantage's API. - :param symbols: A dictionary of symbols for the stocks to retrieve data for, with the symbol type (gainers, losers, actives) as the key and a list of symbols as the value - :return: A dictionary of dictionaries for each symbol type (gainers, losers, actives) with the symbol as the key and a dictionary of volume, price, change percent, market cap, and name as the value - """ - quote_endpoint = "GLOBAL_QUOTE" - overview_endpoint = "profile" - stock_data = {} - for symbol_type, symbol_list in symbols.items(): - stock_data[symbol_type] = [] - for symbol in symbol_list: - try: - # Build the URL to request data for the given symbol from the global quote endpoint - alpha_url = f"{ALPHA_BASE_URL}function={quote_endpoint}&symbol={symbol}&apikey={ALPHA_API_KEY}" - # Request data from the API and convert the response to a dictionary - alpha_response = requests.get(alpha_url) - quote_data = alpha_response.json() - - # Validate the data returned from the API - if "Error Message" in quote_data: - raise ValueError(f"Error retrieving data for symbol {symbol}: {quote_data['Error Message']}") - - # Extract the volume, price, and change percent data from the response - volume = quote_data["Global Quote"]["06. volume"] - price = quote_data["Global Quote"]["05. price"] - change_percent = quote_data["Global Quote"]["10. 
change percent"] - - # Build the URL to request data for the given symbol from the profile endpoint - overview_url = f"{PREP_BASE_URL}{overview_endpoint}/{symbol}?apikey={PREP_API_KEY}" - # Request data from the API and convert the response to a dictionary - prep_response = requests.get(overview_url) - overview_data = prep_response.json() - - # Validate the data returned from the API - if "Error Message" in overview_data: - raise ValueError(f"Error retrieving data for symbol {symbol}: {quote_data['Error Message']}") - - # Extract the name and market cap data from the response - name = overview_data[0]['companyName'] - market_cap = overview_data[0]['mktCap'] - - # Append the data to the stock_data list - stock_data[symbol_type].append({ - "symbol": symbol, - "volume": volume, - "price": price, - "change_percent": change_percent.rstrip('%'), - "market_cap": market_cap, - "name": name - }) - except (ValueError, KeyError) as error: - print(f"An error occurred while retrieving data for symbol {symbol}: {error}") - # Pause until the next full minute - time.sleep(55) - return stock_data - - -def store_stock_data(data: dict)-> None: - """ - Store the stock market data in a PostgreSQL database - :param data: A dictionary with keys 'gainers', 'losers', and 'actives', each with a list of stock data - """ - - # Set the schema name to use for storing the stock data - schema_name = "stock_data" - - # Connect to the database - conn = None - cur = None - try: - # Connect to the database using the configuration from environment variables - conn = psycopg2.connect( - host=host, - port=port, - database=database, - user=user, - password=password - ) - - # Create a cursor to execute SQL queries - cur = conn.cursor() - - # Loop through the stock data for each key in the dictionary - for key, stock_list in data.items(): - # Create a table name based on the key name - table = f"{schema_name}.{key}" - # Loop through the stock data - for stock_data in stock_list: - # Extract the relevant information - symbol = stock_data["symbol"] - name = stock_data["name"] - volume = stock_data["volume"] - price = stock_data["price"] - market_cap = stock_data["market_cap"] - change_percent = stock_data["change_percent"] - - # Validate the data - if not all([symbol, name]): - raise ValueError("One or more required fields are missing from the stock data") - - # Insert the data into the table - cur.execute(f"INSERT INTO {table} (symbol, name, market_cap, volume, price, change_percent) VALUES (%s, %s, %s, %s, %s, %s)", - (symbol, name, market_cap, volume, price, change_percent)) - - # Commit the changes to the database - conn.commit() - except (psycopg2.Error, ValueError, TypeError) as error: - print(f"An error occurred while storing the data in the database: {error}") - # Rollback the changes if there was an error - if conn: - conn.rollback() - finally: - if cur: - cur.close() - if conn: - conn.close() - - -def get_crypto_data() -> dict: - """ - Gets the top gainers, losers, and active cryptocurrencies on CoinMarketCap. - - Returns: - dict: A dictionary containing the top gainers, losers, and most active cryptocurrencies. 
- """ - # Define the API endpoint - url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest' - - # Set the parameters for the API request - parameters = { - 'start': '1', - 'limit': '100', - 'convert': 'USD', - 'sort': 'percent_change_24h' - } - - # Add the API key to the request headers - headers = { - 'Accepts': 'application/json', - 'X-CMC_PRO_API_KEY': COIN_API_KEY - } - - try: - # Send the API request - response = requests.get(url, headers=headers, params=parameters) - - # Check if the API request was successful - if response.status_code == 200: - # Parse the response JSON data - data = response.json() - - # Extract the top gainers, top losers, and top active cryptos - top_gainers = data['data'][:5] - top_losers = data['data'][-5:] - top_active = sorted(data['data'], key=lambda x: x['quote']['USD']['volume_24h'], reverse=True)[:5] - - # Create the dictionaries for gainers, losers, and active cryptos - gainer_list = [] - loser_list = [] - active_list = [] - - for gainer in top_gainers: - gainer_dict = { - 'symbol': gainer['symbol'], - 'name': gainer['name'], - 'volume': gainer['quote']['USD']['volume_24h'], - 'price': gainer['quote']['USD']['price'], - 'change_percent': gainer['quote']['USD']['percent_change_24h'], - 'market_cap': gainer['quote']['USD']['market_cap'] - } - gainer_list.append(gainer_dict) - - for loser in top_losers: - loser_dict = { - 'symbol': loser['symbol'], - 'name': loser['name'], - 'volume': loser['quote']['USD']['volume_24h'], - 'price': loser['quote']['USD']['price'], - 'change_percent': loser['quote']['USD']['percent_change_24h'], - 'market_cap': loser['quote']['USD']['market_cap'] - } - loser_list.append(loser_dict) - - for active in top_active: - active_dict = { - 'symbol': active['symbol'], - 'name': active['name'], - 'volume': active['quote']['USD']['volume_24h'], - 'price': active['quote']['USD']['price'], - 'change_percent': active['quote']['USD']['percent_change_24h'], - 'market_cap': active['quote']['USD']['market_cap'] - } - active_list.append(active_dict) - - return {'gainers': gainer_list, 'losers': loser_list, 'actives': active_list} - else: - print(f"Error: Request failed with status code {response.status_code}.") - return None - - except requests.exceptions.RequestException as e: - print(f"Error: {e}") - return None - - -def store_crypto_data(data: dict) -> None: - """ - Store the crypto market data in a PostgreSQL database - :param data: A dictionary with keys 'gainers', 'losers', and 'actives', each with a list of crypto data - """ - - # Set the schema name to use for storing the crypto data - schema_name = "crypto_data" - - # Connect to the database - conn = None - cur = None - try: - # Connect to the database using the configuration from environment variables - conn = psycopg2.connect( - host=host, - port=port, - database=database, - user=user, - password=password - ) - - # Create a cursor to execute SQL queries - cur = conn.cursor() - - # Loop through the crypto data for each key in the dictionary - for key, crypto_list in data.items(): - # Create a table name based on the key name - table = f"{schema_name}.{key}" - # Loop through the crypto data - for crypto_data in crypto_list: - # Extract the relevant information - symbol = crypto_data["symbol"] - name = crypto_data["name"] - volume = crypto_data["volume"] - price = crypto_data["price"] - market_cap = crypto_data["market_cap"] - change_percent = crypto_data["change_percent"] - print(change_percent) - - # Validate the data - if not all([symbol, name]): - raise 
ValueError("One or more required fields are missing from the crypto data") - - # Insert the data into the table - cur.execute(f"INSERT INTO {table} (symbol, name, market_cap, volume, price, change_percent) VALUES (%s, %s, %s, %s, %s, %s)", - (symbol, name, market_cap, volume, price, change_percent)) - - # Commit the changes to the database - conn.commit() - except (psycopg2.Error, ValueError, TypeError) as error: - print(f"An error occurred while storing the data in the database: {error}") - # Rollback the changes if there was an error - if conn: - conn.rollback() - finally: - if cur: - cur.close() - if conn: - conn.close() - - -def backup_data(bucket_name: str) -> None: - """ - Extracts and stores data from database to S3 for the current month. - - Args: - - bucket_name (str): The name of the S3 bucket to store the data in. - """ - # Connect to the database - conn = None - cur = None - try: - # Connect to the database using the configuration from environment variables - conn = psycopg2.connect( - host=host, - port=port, - database=database, - user=user, - password=password - ) - - # Get the current month - current_month = datetime.today().replace(day=1, hour=0, minute=0, second=0, microsecond=0) - start_date = current_month.strftime('%Y-%m-%d') - end_date = (current_month + timedelta(days=31)).strftime('%Y-%m-%d') - - # Define the SQL query to extract the data for the current month - # Define a list of performance types and their corresponding tables - performance_types = { - 'stocks_gainers': 'stock_data.gainers', - 'stocks_losers': 'stock_data.losers', - 'stocks_actives': 'stock_data.actives', - 'crypto_gainers': 'crypto_data.gainers', - 'crypto_losers': 'crypto_data.losers', - 'crypto_actives': 'crypto_data.actives' - } - - # Initialize an empty list to store the generated SQL queries - queries = [] - - # Loop through each performance type and table, and generate the corresponding SQL query - for performance_type, table in performance_types.items(): - query = (f"SELECT '{performance_type}' as performance_type, " - f"date_collected, symbol, name, market_cap, volume, " - f"price, change_percent FROM market_data.{table} " - f"WHERE date_collected >= '{start_date}' AND date_collected < '{end_date}'") - - queries.append(query) - - # Combine all queries using UNION ALL to create the final SQL statement - final_query = " UNION ALL ".join(queries) - - - # Execute the query and retrieve the data - cur = conn.cursor() - cur.execute(final_query) - rows = cur.fetchall() - - # Create a Pandas DataFrame with the retrieved data - df = pd.DataFrame(rows, columns=[desc[0] for desc in cur.description]) - - # Store the data as a Parquet file in an S3 bucket - s3 = boto3.resource('s3') - key = f"market_data/{current_month.year}/{current_month.month}/data.parquet" - pq.write_table(pa.Table.from_pandas(df), f"s3://{bucket_name}/{key}") - - except Exception as e: - print(f"Error backing up data to S3: {e}") - finally: - if cur: - cur.close() - if conn: - conn.close() - \ No newline at end of file diff --git a/dags/market_data_dag.py b/dags/market_data_dag.py index 337fb6e..c328324 100644 --- a/dags/market_data_dag.py +++ b/dags/market_data_dag.py @@ -1,123 +1,87 @@ -""" -These scripts set up three DAGs for collecting, storing and backing up financial market data, including stock data and cryptocurrency data. 
-The DAG for stock data collection and storage consists of three tasks: - -get_stocks: retrieves the symbol of the top 5 stocks accordingly with market performance -get_stock_data: retrieves detailed information of the stocks retrieved in task 1 -store_stock_data_in_database: stores the stock data in a database -The DAG for cryptocurrency data collection and storage consists of two tasks: - -get_crypto_data: retrieves data for the top 5 cryptocurrencies accordingly with market performance -store_crypto_data_in_database: stores the cryptocurrency data in a database -Both DAGs run at 11 PM, as specified by the schedule_interval parameter. Task dependencies are established such that get_stocks (for the stock data DAG) and get_crypto_data (for the cryptocurrency data DAG) must complete before their respective downstream tasks. Similarly, get_stock_data must complete before store_stock_data for the stock data DAG. +import os +import sys +from airflow.operators.python import PythonOperator +from airflow.models import DAG +from datetime import datetime +from dotenv import load_dotenv -The data_collection_storage_stocks DAG is scheduled to run everyday at 11 PM from Monday to Friday, as specified by the schedule_interval parameter. On the other hand, the data_collection_storage_crypto DAG is scheduled to run everyday at 11 PM, without any day-of-week restrictions. -Task dependencies are established such that get_stocks must complete before get_stock_data, and get_stock_data must complete before store_stock_data. Similarly, get_crypto_data must complete before store_crypto_data. +# Load environment variables from .env file +load_dotenv() -The third DAG, backup_data, is created for backing up the data from the database to S3 on the last day of every month at 11:59 PM. +# Find the parent directory +parent_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(parent_dir) -The script makes use of PythonOperator to define the tasks, and passes output of one task to the input of the next using op_kwargs and op_args parameters. The get_stocks_data, get_crypto_data and backup_data tasks have no dependencies on any previous tasks. 
-""" +# Add the project root to the Python path +sys.path.insert(0, project_root) -from airflow.operators.python import PythonOperator -from airflow.models import DAG -from datetime import datetime, timedelta -from data_collection_storage import ( - get_stocks, get_stock_data, store_stock_data, - get_crypto_data, store_crypto_data, backup_data +from core.market_data_processor import ( + StockApiClient, + CryptoApiClient, + Storage, + MarketDataEngine, ) # Define default arguments for the DAGs default_args_stocks = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': datetime(2023, 3, 15), - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 0, + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2023, 3, 15), + "email_on_failure": False, + "email_on_retry": False, + "retries": 0, } default_args_cryptos = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': datetime(2023, 3, 15), - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 0, + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2023, 3, 15), + "email_on_failure": False, + "email_on_retry": False, + "retries": 0, } -default_args_backup = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': datetime(2023, 3, 15), - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 0, -} +# Create instances of the classes +stock_api_client = StockApiClient( + os.environ["ALPHA_API_KEY"], os.environ["PREP_API_KEY"] +) +crypto_api_client = CryptoApiClient(os.environ["COIN_API_KEY"]) +db_connector = Storage( + os.getenv["POSTGRES_HOST"], + os.getenv["POSTGRES_PORT"], + os.getenv["POSTGRES_DB"], + os.getenv["POSTGRES_USER"], + os.getenv["POSTGRES_PASSWORD"], +) +stock_engine = MarketDataEngine(stock_api_client, db_connector) +crypto_engine = MarketDataEngine(crypto_api_client, db_connector) # Create the DAG for stock data collection and storage dag_stocks = DAG( - 'data_collection_storage_stocks', - default_args=default_args_stocks, - schedule_interval='0 23 * * 1-5', # Schedule to run everyday at 11 PM from Monday to Friday - description='Collect and store stock data' # Add description for the DAG + "data_collection_storage_stocks", + default_args=default_args_stocks, + schedule_interval="0 23 * * 1-5", # Schedule to run everyday at 11 PM from Monday to Friday + description="Collect and store stock data", ) # Create the DAG for cryptocurrency data collection and storage dag_cryptos = DAG( - 'data_collection_storage_crypto', - default_args=default_args_cryptos, - schedule_interval='0 23 * * *', # Schedule to run everyday at 11 PM - description='Collect and store cryptocurrency data' # Add description for the DAG -) - -# Create the DAG for backup -dag_backup = DAG( - 'backup_data', - default_args=default_args_backup, - description='Extract and store data from database to S3 monthly', - schedule_interval='59 23 L * *', # Run on the last day of every month at 11:59 PM + "data_collection_storage_crypto", + default_args=default_args_cryptos, + schedule_interval="0 23 * * *", # Schedule to run everyday at 11 PM + description="Collect and store cryptocurrency data", ) -# Define the tasks for stock data collection and storage -get_stocks_task = PythonOperator( - task_id='get_stocks', - python_callable=get_stocks, - dag=dag_stocks, -) - -get_stock_data_task = PythonOperator( - task_id='get_stock_data', - python_callable=get_stock_data, - op_kwargs={'symbols': get_stocks_task.output}, - dag=dag_stocks, -) - -store_stock_data_task 
= PythonOperator( - task_id='store_stock_data', - python_callable=store_stock_data, - op_kwargs={'data': get_stock_data_task.output}, +# Define the task for stock data collection and storage +process_stock_data_task = PythonOperator( + task_id="get_stocks", + python_callable=stock_engine.process_stock_data, dag=dag_stocks, ) # Define the tasks for cryptocurrency data collection and storage -get_crypto_data_task = PythonOperator( - task_id='get_crypto_data', - python_callable=get_crypto_data, +process_crypto_data_task = PythonOperator( + task_id="get_crypto", + python_callable=crypto_engine.process_crypto_data, dag=dag_cryptos, ) - -store_crypto_data_task = PythonOperator( - task_id='store_crypto_data', - python_callable=store_crypto_data, - op_args=[get_crypto_data_task.output], - dag=dag_cryptos, -) - -# Define task for database backup -backup_task = PythonOperator( - task_id='backup_data', - python_callable=backup_data, - op_kwargs={'bucket_name': 'marketdata6498'}, #replace with your bucket name - dag=dag_backup -)
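
For anyone reviewing the refactor outside Airflow, here is a minimal sketch of how the new classes wire together, mirroring the object graph that `market_data_dag.py` builds. The script and logger names are illustrative; it assumes the repository root is on `PYTHONPATH`, the Postgres service from the compose file is running, and the API keys from `.env` are exported in the environment.

```python
import logging
import os

# Configure logging before importing the module, whose own
# logging.basicConfig(level=WARNING) would otherwise win.
logging.basicConfig(level=logging.INFO)

from core.market_data_processor import (
    CryptoApiClient,
    MarketDataEngine,
    StockApiClient,
    Storage,
)

# Illustrative logger name, not taken from the repository.
logger = logging.getLogger("market_data_smoke_test")

# Build the same object graph the DAG file builds, without Airflow.
stock_client = StockApiClient(
    os.environ["ALPHA_API_KEY"], os.environ["PREP_API_KEY"], logger
)
crypto_client = CryptoApiClient(os.environ["COIN_API_KEY"], logger)
storage = Storage(
    os.environ["POSTGRES_HOST"],
    os.environ["POSTGRES_PORT"],
    os.environ["POSTGRES_DB"],
    os.environ["POSTGRES_USER"],
    os.environ["POSTGRES_PASSWORD"],
    logger,
)

# Each engine performs one end-to-end collect-and-store pass.
MarketDataEngine(stock_client, storage, logger).process_stock_data()
MarketDataEngine(crypto_client, storage, logger).process_crypto_data()
```

Because `MarketDataEngine` only depends on the `BaseApiClient` and `Storage` interfaces, the same wiring works in the unit tests with stubbed clients and a fake storage object.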