Skip to content

Commit

Permalink
dependency injection
Browse files Browse the repository at this point in the history
  • Loading branch information
abeltavares committed May 26, 2024
1 parent 09ac123 commit 7d7b932
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
22 changes: 8 additions & 14 deletions core/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,30 @@
from core.base_api import ApiClientFactory
from core.storage import Storage

logging.basicConfig(
level=logging.WARNING,
format="[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
)

logger = logging.getLogger(__name__)


class DataProcessor:
    """Fetch market data for a single asset type and persist it.

    All collaborators are injected, which keeps this class free of
    construction logic and easy to test with fakes.
    """

    def __init__(
        self,
        asset_type: str,
        api_client_factory: "ApiClientFactory",
        db_connector: "Storage",
        logger: logging.Logger,
    ):
        # asset_type selects which concrete API client the factory builds.
        self.asset_type = asset_type
        self.api_client = api_client_factory.get_client(asset_type)
        self.logger = logger
        self.db_connector = db_connector

    def get_data(self):
        """Return data from the injected API client.

        Raises:
            Exception: re-raises whatever the API client raised, after logging.
        """
        try:
            return self.api_client.get_data()
        except Exception as e:
            # Lazy %-args: the message is only formatted if the record is emitted.
            self.logger.error("Error getting %s data: %s", self.asset_type, e)
            raise

    def store_data(self, data):
        """Store *data* for this asset type.

        A ``None`` payload is treated as "nothing retrieved": it is logged
        as a warning and deliberately not written to the database.

        Raises:
            Exception: re-raises whatever the storage layer raised, after logging.
        """
        try:
            if data is not None:
                self.db_connector.store_data(data, self.asset_type)
                self.logger.info(
                    "%s data stored successfully.", self.asset_type.capitalize()
                )
            else:
                self.logger.warning(
                    "No %s data retrieved. Nothing stored in the database.",
                    self.asset_type,
                )
        except Exception as e:
            self.logger.error("Error storing %s data: %s", self.asset_type, e)
            raise
12 changes: 11 additions & 1 deletion dags/market_data_dag.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
from datetime import datetime
from utils import read_json
from core.data_processor import DataProcessor
from core.base_api import ApiClientFactory
from core.storage import Storage

# Load the pipeline configuration once at import time.
# Debug `print(CONFIG)` removed: it dumped the full config (potentially
# including sensitive settings) to stdout on every DAG-file parse.
CONFIG = read_json("mdp_config.json")

default_args = {
"owner": CONFIG.get("owner", "airflow"),
Expand All @@ -24,7 +28,13 @@ def create_market_data_dag(asset_type, dag_id, description):
description=description,
)

market_processor = DataProcessor(asset_type)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("DataPipeline")

api_client_factory = ApiClientFactory(logger)
db_connector = Storage(logger)

market_processor = DataProcessor(asset_type, api_client_factory, db_connector, logger)

with dag:
get_data_task = PythonOperator(
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
airflow==2.8.0
apache-airflow==2.8.0
requests==2.26.0
psycopg2-binary==2.9.1
python-dotenv==0.19.2
Expand Down

0 comments on commit 7d7b932

Please sign in to comment.