Skip to content

Commit

Permalink
.gitignore
Browse files Browse the repository at this point in the history
  • Loading branch information
abeltavares committed May 24, 2024
1 parent 8fd9ad8 commit 69bd0d1
Show file tree
Hide file tree
Showing 10 changed files with 429 additions and 627 deletions.
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Ignore files generated by Python
__pycache__/
**/__pycache__
*.pyc

# Ignore log files
Expand All @@ -21,4 +21,6 @@ data/*.json
# Ignore Airflow files
plugins/
logs/
airflow.cfg
airflow.cfg

module/
Empty file added CONTRIBUTING.md
Empty file.
Binary file added assets/marketpipe_logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
93 changes: 93 additions & 0 deletions core/crypto_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import requests
import logging
from dotenv import load_dotenv
import os
from base_api import BaseApiClient
from utils.market_data_processor_utils import read_json


COIN_BASE_URL = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest?"


load_dotenv()


class CryptoApiClient(BaseApiClient):
"""
A client for retrieving cryptocurrency data from the CoinMarketCap API.
This class inherits from the BaseApiClient abstract base class and implements the get_data method.
Attributes:
logger (logging.Logger): The logger object for logging messages.
Methods:
__init__(self, COIN_API_KEY: str, logger: logging.Logger): Initializes a new instance of the CryptoApiClient class.
"""

def __init__(self, logger: logging.Logger):
"""
Initializes a new instance of the CryptoApiClient class.
Args:
logger (logging.Logger): The logger object for logging messages.
"""
super().__init__(logger=logger)
self.symbols = read_json("mdp_config.json")['assets']['cryptos']['symbols']

def get_data(self) -> dict[str, dict[str, any]]:
"""
Retrieves market data for the given list of symbols from CoinMarketCap API.
Returns:
Dict[str, Dict[str, str]]: A dictionary containing the retrieved data for each symbol.
"""
parameters = {
"start": "1",
"limit": "100",
"convert": "USD",
"sort": "percent_change_24h",
}

headers = {
"Accepts": "application/json",
"X-CMC_PRO_API_KEY": os.getenv("COIN_API_KEY"),
}

crypto_data = {}

for symbol in self.symbols:
try:
parameters["symbol"] = symbol

response = requests.get(COIN_BASE_URL, headers=headers, params=parameters)
response.raise_for_status()

data = response.json()
symbol_data = data.get("data", [])[0]

symbol_info = {
"name": symbol_data.get("name", ""),
"volume": symbol_data["quote"]["USD"].get("volume_24h", ""),
"price": symbol_data["quote"]["USD"].get("price", ""),
"change_percent": symbol_data["quote"]["USD"].get(
"percent_change_24h", ""
),
"market_cap": symbol_data["quote"]["USD"].get("market_cap", ""),
}

crypto_data[symbol] = symbol_info

self.logger.info(f"Successfully retrieved data for symbol {symbol}.")
except requests.exceptions.RequestException as req_error:
self.logger.error(
f"Error during API request for {symbol}: {req_error}"
)
raise
except (IndexError, KeyError) as data_error:
self.logger.error(
f"Error processing data for {symbol}: {data_error}"
)
raise

return crypto_data
94 changes: 94 additions & 0 deletions core/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import psycopg2
import logging
from dotenv import load_dotenv
import os

load_dotenv()


class Storage:
"""
A class that handles storing data in a database.
Attributes:
logger (logging.Logger): The logger object for logging messages.
conn: The database connection object.
cur: The database cursor object.
"""

def __init__(self, logger: logging.Logger):
self.logger = logger
self.conn = None
self.cur = None

def _connect(self):
try:
self.conn = psycopg2.connect(
host=os.getenv("POSTGRES_HOST"),
port=os.getenv("POSTGRES_PORT"),
database=os.getenv("POSTGRES_DB"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
)
self.cur = self.conn.cursor()
except psycopg2.Error as e:
self.logger.error(f"Error connecting to the database: {e}")
raise

def _close(self):
try:
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
except psycopg2.Error as e:
self.logger.error(f"Error closing the database connection: {e}")

def store_data(self, data: dict[str, dict[str, any]], table: str) -> None:
if not isinstance(table, str):
error_msg = "Table name must be a string"
self.logger.error(error_msg)
raise TypeError(error_msg)
try:
self.logger.info("Storing data in the database.")

self._connect()

with self.conn, self.cur:
for symbol, asset_data in data.items():
name = asset_data["name"]
volume = asset_data["volume"]
price = asset_data["price"]
market_cap = asset_data["market_cap"]
change_percent = asset_data["change_percent"]

if not all([symbol, name]):
self.logger.error(
f"One or more required fields are missing from the {table} data for symbol: {symbol}, name: {name}"
)
raise ValueError(
f"One or more required fields are missing from the {table} data"
)

self.cur.execute(
f"INSERT INTO {table} (symbol, name, market_cap, volume, price, change_percent) VALUES (%s, %s, %s, %s, %s, %s)",
(symbol, name, market_cap, volume, price, change_percent),
)

self.logger.info(
f"Successfully stored data for symbol {symbol} in the {table} table."
)

self.conn.commit()

except psycopg2.Error as error:
self.logger.error(
f"An error occurred while storing data in the database: {error}"
)
if self.conn:
self.conn.rollback()
finally:
self._close()



116 changes: 33 additions & 83 deletions dags/market_data_dag.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,47 @@
import os
import sys
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime
from dotenv import load_dotenv
import logging
from utils.market_data_processor_utils import read_json
from core.data_processor import DataProcessor

# Configure logging
logging.basicConfig(
level=logging.WARNING,
format="[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
)
config = read_json("mdp_config.json")

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Find the parent directory
parent_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(parent_dir)
default_args = {
"owner": config.get("owner", "airflow"),
"depends_on_past": False,
"start_date": datetime.now(),
"email_on_failure": config.get("email_on_failure", False),
"email_on_retry": config.get("email_on_retry", False),
"retries": config.get("retries", 1),
}

# Add the project root to the Python path
sys.path.insert(0, project_root)

from core.market_data_processor import (
StockApiClient,
CryptoApiClient,
Storage,
MarketDataEngine,
)
def create_market_data_dag(asset_type, dag_id, description):
dag = DAG(
dag_id,
default_args=default_args,
schedule_interval=config["assets"][asset_type]["schedule_interval"],
description=description,
)

# Define default arguments for the DAGs
default_args_stocks = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 3, 15),
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
}
market_processor = DataProcessor(asset_type)

default_args_cryptos = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 3, 15),
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
}
with dag:
get_data_task = PythonOperator(
task_id=f"get_{asset_type}_data",
python_callable=market_processor.get_data,
)

# Create instances of the classes
stock_api_client = StockApiClient(
os.environ["ALPHA_API_KEY"], os.environ["PREP_API_KEY"], logger
)
crypto_api_client = CryptoApiClient(os.environ["COIN_API_KEY"], logger)
db_connector = Storage(
os.getenv("POSTGRES_HOST"),
os.getenv("POSTGRES_PORT"),
os.getenv("POSTGRES_DB"),
os.getenv("POSTGRES_USER"),
os.getenv("POSTGRES_PASSWORD"),
logger,
)
stock_engine = MarketDataEngine(stock_api_client, db_connector, logger)
crypto_engine = MarketDataEngine(crypto_api_client, db_connector, logger)
store_data_task = PythonOperator(
task_id=f"store_{asset_type}_data",
python_callable=market_processor.store_data,
)

# Create the DAG for stock data collection and storage
dag_stocks = DAG(
"process_stock_data",
default_args=default_args_stocks,
schedule_interval="0 23 * * 1-5", # Schedule to run everyday at 11 PM from Monday to Friday
description="Collect and store stock data",
)
get_data_task >> store_data_task

# Create the DAG for cryptocurrency data collection and storage
dag_cryptos = DAG(
"process_crypto_data",
default_args=default_args_cryptos,
schedule_interval="0 23 * * *", # Schedule to run everyday at 11 PM
description="Collect and store cryptocurrency data",
)
return dag

# Define the task for stock data collection and storage
process_stock_data_task = PythonOperator(
task_id="get_stocks",
python_callable=stock_engine.process_stock_data,
dag=dag_stocks,
)

# Define the tasks for cryptocurrency data collection and storage
process_crypto_data_task = PythonOperator(
task_id="get_crypto",
python_callable=crypto_engine.process_crypto_data,
dag=dag_cryptos,
)
create_market_data_dag("stocks", "process_stock_data", "Collect and store stock data")
create_market_data_dag("cryptos", "process_crypto_data", "Collect and store crypto data")
27 changes: 27 additions & 0 deletions database_setup/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Create the schema
CREATE SCHEMA IF NOT EXISTS market_data;


-- Create a table for stock data
CREATE TABLE IF NOT EXISTS market_data.stocks (
id SERIAL PRIMARY KEY,
date_collected DATE NOT NULL DEFAULT CURRENT_DATE,
symbol VARCHAR(20) NOT NULL,
name VARCHAR(50) NOT NULL,
market_cap DECIMAL(20,2) NOT NULL,
volume INT NOT NULL,
price DECIMAL(10,2) NOT NULL,
change_percent DECIMAL(15,8) NOT NULL
);

-- Create a table for cryptocurrency data
CREATE TABLE IF NOT EXISTS market_data.cryptos (
id SERIAL PRIMARY KEY,
date_collected DATE NOT NULL DEFAULT CURRENT_DATE,
symbol VARCHAR(20) NOT NULL,
name VARCHAR(50) NOT NULL,
market_cap DECIMAL(20,2) NOT NULL,
volume INT NOT NULL,
price DECIMAL(25,15) NOT NULL,
change_percent DECIMAL(50,30) NOT NULL
);
Loading

0 comments on commit 69bd0d1

Please sign in to comment.