From f2adce030f999c624e20d1b8865408231f4a4494 Mon Sep 17 00:00:00 2001 From: 121238257 Date: Fri, 24 May 2024 21:02:29 +0100 Subject: [PATCH] dags --- dags/market_data_dag.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/dags/market_data_dag.py b/dags/market_data_dag.py index 717ebbe..0ad02af 100644 --- a/dags/market_data_dag.py +++ b/dags/market_data_dag.py @@ -1,19 +1,18 @@ from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime -import logging -from utils.market_data_processor_utils import read_json +from utils import read_json from core.data_processor import DataProcessor -config = read_json("mdp_config.json") +CONFIG = read_json("mdp_config.json") default_args = { - "owner": config.get("owner", "airflow"), + "owner": CONFIG.get("owner", "airflow"), "depends_on_past": False, "start_date": datetime.now(), - "email_on_failure": config.get("email_on_failure", False), - "email_on_retry": config.get("email_on_retry", False), - "retries": config.get("retries", 1), + "email_on_failure": CONFIG.get("email_on_failure", False), + "email_on_retry": CONFIG.get("email_on_retry", False), + "retries": CONFIG.get("retries", 1), } @@ -21,7 +20,7 @@ def create_market_data_dag(asset_type, dag_id, description): dag = DAG( dag_id, default_args=default_args, - schedule_interval=config["assets"][asset_type]["schedule_interval"], + schedule_interval=CONFIG["assets"][asset_type]["schedule_interval"], description=description, ) @@ -33,9 +32,12 @@ def create_market_data_dag(asset_type, dag_id, description): python_callable=market_processor.get_data, ) - store_data_task = PythonOperator( - task_id=f"store_{asset_type}_data", - python_callable=market_processor.store_data, + store_data_task = ( + PythonOperator( + task_id=f"store_{asset_type}_data", + python_callable=market_processor.store_data, + op_args=[get_data_task.output], + ), ) get_data_task >> store_data_task @@ -44,4 +46,6 @@ def create_market_data_dag(asset_type, dag_id, description): create_market_data_dag("stocks", "process_stock_data", "Collect and store stock data") -create_market_data_dag("cryptos", "process_crypto_data", "Collect and store crypto data") +create_market_data_dag( + "cryptos", "process_crypto_data", "Collect and store crypto data" +)