-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ajusta o código para utilizar XCom_pull de múltiplos Tasks IDs. #140
Merged
Merged
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
7b4e024
remove ast
edulauer f257cac
refactor has_matches for xcom_pull
edulauer 9b27b76
create function send_notification in DouDigestGenerator
edulauer 78495a7
remove ast
edulauer e7f1304
remove ensure_list function (solved with pydantic)
edulauer 6060f87
Merge branch 'main' into fix_xcom_pull
edulauer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -8,7 +8,6 @@ | |||||
[] - Definir sufixo do título do email a partir de configuração | ||||||
""" | ||||||
|
||||||
import ast | ||||||
import logging | ||||||
import os | ||||||
import sys | ||||||
|
@@ -344,12 +343,41 @@ def perform_searches( | |||||
|
||||||
return search_dict | ||||||
|
||||||
def has_matches(self, search_result: list, skip_null: bool) -> str: | ||||||
def _ensure_list_of_searches(self, specs: DAGConfig) -> list: | ||||||
"""Ensure that the search attribute is returned as a list, | ||||||
whether it's originally a single element or a list.""" | ||||||
|
||||||
# is it a single search or a list of searchers? | ||||||
if isinstance(specs.search, list): | ||||||
searches = specs.search | ||||||
else: | ||||||
searches = [specs.search] | ||||||
|
||||||
return searches | ||||||
|
||||||
def get_xcom_pull_tasks(self, num_searches, **context): | ||||||
"""Retrieve XCom values from multiple tasks and append them to a new list. | ||||||
Function required for Airflow version 2.10.0 or later | ||||||
(https://github.com/apache/airflow/issues/41983). | ||||||
""" | ||||||
search_results = [] | ||||||
for counter in range(1, num_searches + 1): | ||||||
search_results.append(context["ti"].xcom_pull( | ||||||
task_ids=f'exec_searchs.exec_search_{counter}')) | ||||||
|
||||||
return search_results | ||||||
|
||||||
|
||||||
def has_matches(self, num_searches: int, skip_null: bool, **context) -> str: | ||||||
"""Check if search has matches and return to skip notification or not""" | ||||||
|
||||||
if skip_null: | ||||||
search_results = self.get_xcom_pull_tasks(num_searches=num_searches, | ||||||
**context) | ||||||
|
||||||
skip_notification = True | ||||||
search_result = ast.literal_eval(search_result) | ||||||
for search in search_result: | ||||||
|
||||||
for search in search_results: | ||||||
items = ["contains" for k, v in search["result"].items() if v] | ||||||
if items: | ||||||
skip_notification = False | ||||||
|
@@ -378,6 +406,20 @@ def select_terms_from_db(self, sql: str, conn_id: str): | |||||
|
||||||
return terms_df.to_json(orient="columns") | ||||||
|
||||||
def send_notification(self, | ||||||
num_searches: int, | ||||||
specs: DAGConfig, | ||||||
report_date: str, | ||||||
**context) -> str: | ||||||
"""Send user notification using class Notifier | ||||||
""" | ||||||
search_report = self.get_xcom_pull_tasks(num_searches=num_searches, | ||||||
**context) | ||||||
|
||||||
notifier = Notifier(specs) | ||||||
|
||||||
notifier.send_notification(search_report=search_report, report_date=report_date) | ||||||
|
||||||
def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: | ||||||
"""Creates the DAG object and tasks | ||||||
|
||||||
|
@@ -413,11 +455,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: | |||||
|
||||||
with TaskGroup(group_id="exec_searchs") as tg_exec_searchs: | ||||||
|
||||||
# is it a single search or a list of searchers? | ||||||
if isinstance(specs.search, list): | ||||||
searches = specs.search | ||||||
else: | ||||||
searches = [specs.search] | ||||||
searches = self._ensure_list_of_searches(specs=specs) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Já é lista (vide comentário acima). |
||||||
|
||||||
for counter, subsearch in enumerate(searches, 1): | ||||||
|
||||||
|
@@ -476,14 +514,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: | |||||
task_id="has_matches", | ||||||
python_callable=self.has_matches, | ||||||
op_kwargs={ | ||||||
"search_result": "{{ ti.xcom_pull(task_ids=" | ||||||
+ str( | ||||||
[ | ||||||
f"exec_searchs.exec_search_{count}" | ||||||
for count in range(1, len(searches) + 1) | ||||||
] | ||||||
) | ||||||
+ ") }}", | ||||||
"num_searches": len(searches), | ||||||
"skip_null": specs.report.skip_null, | ||||||
}, | ||||||
) | ||||||
|
@@ -492,16 +523,10 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: | |||||
|
||||||
send_notification_task = PythonOperator( | ||||||
task_id="send_notification", | ||||||
python_callable=Notifier(specs).send_notification, | ||||||
python_callable=self.send_notification, | ||||||
op_kwargs={ | ||||||
"search_report": "{{ ti.xcom_pull(task_ids=" | ||||||
+ str( | ||||||
[ | ||||||
f"exec_searchs.exec_search_{count}" | ||||||
for count in range(1, len(searches) + 1) | ||||||
] | ||||||
) | ||||||
+ ") }}", | ||||||
"num_searches": len(searches), | ||||||
"specs": specs, | ||||||
"report_date": template_ano_mes_dia_trigger_local_time, | ||||||
}, | ||||||
) | ||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
O validator do Pydantic já garante que será sempre uma lista. Essa função agora é desnecessária.