# 04 Airflow_main_dag.py
# Ref: https://medium.com/analytics-vidhya/a-gentle-introduction-to-data-workflows-with-apache-airflow-and-apache-spark-6c2cd9aee573
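# STEP 1: Import the modules needed for the workflow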
from datetime import timedelta, datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# STEP 2: Define a start date for the DAG
# Here it is a fixed date rather than a dynamically computed "yesterday"
yesterday = datetime(2020, 12, 10)
# GCS paths of the PySpark scripts and names for the Dataproc jobs
SPARK_CODE = 'gs://stackoverflow-dataset-677/01_user.py'
SPARK_CODE2 = 'gs://stackoverflow-dataset-677/02_user_comments_join.py'
dataproc_job_name = 'extract_users_job_dataproc'
dataproc_job_name2 = 'extract_comments_join_users_dataproc'
# STEP 3: Set default arguments for the DAG
default_dag_args = {
    'start_date': yesterday,
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'project_id': models.Variable.get('project_id')
}
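# This DAG reads the Airflow Variables 'project_id' and 'dataproc_zone'.
# A minimal sketch of setting them, assuming the Airflow 1.x CLI
# (the values below are placeholders for your environment):
#   airflow variables --set project_id your-gcp-project-id
#   airflow variables --set dataproc_zone us-central1-a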
# STEP 4: Define the DAG
# Set the DAG name, add a description, define the schedule interval,
# and pass the default arguments defined above.
with models.DAG(
        'comments_extract_user_join_spark_workflow',
        description='DAG for extracting comments and merging them with user names',
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:
    # STEP 5: Set operators
    # BashOperator: a simple task that prints the current date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )
    # dataproc_operator
    # Create a small Cloud Dataproc cluster
    create_dataproc = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('dataproc_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')
    # Submit the first PySpark job: extract users
    run_spark = dataproc_operator.DataProcPySparkOperator(
        task_id='run_spark',
        main=SPARK_CODE,
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        job_name=dataproc_job_name
    )
    # Submit the second PySpark job: join comments with users
    run_spark2 = dataproc_operator.DataProcPySparkOperator(
        task_id='run_spark2',
        main=SPARK_CODE2,
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        job_name=dataproc_job_name2
    )
    # dataproc_operator
    # Delete the Cloud Dataproc cluster. The ALL_DONE trigger rule runs this
    # task even if upstream tasks fail, so the cluster is not left running.
    delete_dataproc = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
    # STEP 6: Set DAG dependencies
    # Each task runs only after the previous one has finished.
    print_date >> create_dataproc >> run_spark >> run_spark2 >> delete_dataproc
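# A quick way to exercise a single task outside the scheduler, assuming the
# Airflow 1.x CLI (the execution date below is an arbitrary example):
#   airflow test comments_extract_user_join_spark_workflow print_date 2020-12-10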