Commit

Sync ADA code with private repo version (#9)
* sync code

Signed-off-by: Julia Farias <julia.farias@ibm.com>

* fix numpy version

Signed-off-by: Julia Farias <julia.farias@ibm.com>

* fix pandas version

Signed-off-by: Julia Farias <julia.farias@ibm.com>

* fix typing_extensions version

Signed-off-by: Julia Farias <julia.farias@ibm.com>

* remove internal content

Signed-off-by: Julia Farias <julia.farias@ibm.com>

---------

Signed-off-by: Julia Farias <julia.farias@ibm.com>
juliaalfarias authored Mar 25, 2024
1 parent 1e7c7be commit 8aadbaa
Showing 10 changed files with 252 additions and 63 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -127,3 +127,5 @@ dmypy.json

# Pyre type checker
.pyre/
.DS_Store
models/.DS_Store
112 changes: 112 additions & 0 deletions .secrets.baseline
@@ -0,0 +1,112 @@
{
"exclude": {
"files": "^.secrets.baseline$",
"lines": null
},
"generated_at": "2023-10-23T12:06:31Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
},
{
"name": "ArtifactoryDetector"
},
{
"name": "AzureStorageKeyDetector"
},
{
"base64_limit": 4.5,
"name": "Base64HighEntropyString"
},
{
"name": "BasicAuthDetector"
},
{
"name": "BoxDetector"
},
{
"name": "CloudantDetector"
},
{
"ghe_instance": "github.ibm.com",
"name": "GheDetector"
},
{
"name": "GitHubTokenDetector"
},
{
"hex_limit": 3,
"name": "HexHighEntropyString"
},
{
"name": "IbmCloudIamDetector"
},
{
"name": "IbmCosHmacDetector"
},
{
"name": "JwtTokenDetector"
},
{
"keyword_exclude": null,
"name": "KeywordDetector"
},
{
"name": "MailchimpDetector"
},
{
"name": "NpmDetector"
},
{
"name": "PrivateKeyDetector"
},
{
"name": "SlackDetector"
},
{
"name": "SoftlayerDetector"
},
{
"name": "SquareOAuthDetector"
},
{
"name": "StripeDetector"
},
{
"name": "TwilioKeyDetector"
}
],
"results": {
"tests/test_app.py": [
{
"hashed_secret": "2d19398bd2ad12800e216c8cac4203dd927bc25d",
"is_secret": false,
"is_verified": false,
"line_number": 55,
"type": "Secret Keyword",
"verified_result": null
},
{
"hashed_secret": "c5ab283dcc3668bb26394c66513c98ff0c234692",
"is_secret": false,
"is_verified": false,
"line_number": 76,
"type": "Base64 High Entropy String",
"verified_result": null
},
{
"hashed_secret": "4d48c6376ebfec3af94c3d9a1b33f4c045e361f7",
"is_secret": false,
"is_verified": false,
"line_number": 79,
"type": "Secret Keyword",
"verified_result": null
}
]
},
"version": "0.13.1+ibm.61.dss",
"word_list": {
"file": null,
"hash": null
}
}
24 changes: 6 additions & 18 deletions Dockerfile
@@ -1,30 +1,18 @@
FROM python:3.7

ENV OPEN_JDK_VERSION 8
ENV JAVA_HOME /usr/lib/jvm/java-${OPEN_JDK_VERSION}-openjdk-amd64

RUN echo "deb http://ftp.us.debian.org/debian stretch main" >> /etc/apt/sources.list && \
apt-get update

RUN echo 'deb http://ftp.debian.org/debian stretch-backports main' | tee /etc/apt/sources.list.d/stretch-backports.list
FROM python:3.9
WORKDIR /app

RUN apt-get update --yes && \
apt-get install --yes --no-install-recommends \
"openjdk-${OPEN_JDK_VERSION}-jre-headless" \
ca-certificates-java && \
update-ca-certificates -f && \
apt-get install --yes --no-install-recommends && \
apt-get clean && rm -rf /var/lib/apt/lists/*

RUN mkdir app

COPY app.py app/app.py
COPY models/ app/models/
COPY app.py app.py
COPY models/ models/
COPY requirements.txt requirements.txt

RUN pip3 install -r requirements.txt
RUN pip3 install --no-cache-dir -r requirements.txt

EXPOSE 7000

WORKDIR /app

ENTRYPOINT [ "python3", "/app/app.py" ]
43 changes: 38 additions & 5 deletions app.py
@@ -56,14 +56,33 @@ def authentication_layer() -> bool:


def retrieve_data_from_scheduling(
object_id=None, object_name=None, query_select="base_query"
object_id=None, object_name=None, query_select="dag_query"
) -> DataFrame:
"""Connect to Scheduling database, execute SQL query and retrieve desired data."""
try:
outlier_min = int(request.headers.get("outlier_min", 0))
outlier_max = int(request.headers.get("outlier_max", 1440))
limit = int(request.headers.get("limit", 25))
except:
raise Exception("Invalid parameter, use integer")

try:
stddev_multiplier = float(request.headers.get("stddev_multiplier", 3))
score_threshold = float(request.headers.get("score_threshold", 1.4))
except:
raise Exception("Invalid parameter, use float")
dag_id = request.headers.get("dag_id", "")
conn = None
params = {
"object_id": object_id,
"object_name": object_name,
"query_select": query_select,
"outlier_min": outlier_min,
"outlier_max": outlier_max,
"limit": limit,
"stddev_multiplier": stddev_multiplier,
"score_threshold": score_threshold,
"dag_id": dag_id
}

query = build_query(**params)
@@ -76,11 +95,20 @@ def retrieve_data_from_scheduling(
host=os.getenv("HOST"),
port=os.getenv("API_PORT"),
)
airflow_df = pd.read_sql(query, conn)
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()

labels = []
for name in cursor.description:
labels.append(name[0])

airflow_df = pd.DataFrame(results, columns=labels)
except pg.DatabaseError:
raise pg.DatabaseError
finally:
conn.close()
if conn:
conn.close()

return airflow_df

@@ -105,7 +133,7 @@ def dag_id(dag_id=None):
try:
authentication_layer()
airflow_replica_df = retrieve_data_from_scheduling(
object_id="dag_id", object_name=dag_id
object_id="dag_id", object_name=dag_id, query_select="dag_query"
)
airflow_replica_df = airflow_replica_df.to_json(orient="records")
return airflow_replica_df
@@ -121,7 +149,7 @@ def task_id(task_id=None):
try:
authentication_layer()
airflow_replica_df = retrieve_data_from_scheduling(
object_id="task_id", object_name=task_id
object_id="task_id", object_name=task_id, query_select="task_query"
)
airflow_replica_df = airflow_replica_df.to_json(orient="records")
return airflow_replica_df
@@ -136,5 +164,10 @@ def page_not_found(e):
return error_handler("Route not found.", HTTPStatus.NOT_FOUND)


@app.get("/health/ping")
async def health_check():
return {"status": "available"}


if __name__ == "__main__":
app.run(host="0.0.0.0", port=7000, debug=True)
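
With this change the query tunables come from request headers instead of hard-coded values. A minimal client sketch, assuming the service is reachable on localhost:7000; the exact route path and any auth headers expected by authentication_layer() are not shown in this diff, so both are placeholders:

import requests  # hypothetical client script, not part of the service

headers = {
    "outlier_min": "0",           # minutes; runs at or below this are dropped
    "outlier_max": "1440",        # minutes; runs at or above this are dropped
    "limit": "25",                # number of most recent runs to consider
    "stddev_multiplier": "3",     # widens the band around the average
    "score_threshold": "1.4",     # final multiplier applied to the score
    "dag_id": "example_dag",      # only used by the task-level query
}

# "/dag_id/example_dag" is an assumed path for illustration; check the
# @app.route decorators in app.py for the real endpoints.
resp = requests.get("http://localhost:7000/dag_id/example_dag", headers=headers)
print(resp.json())  # records produced by DataFrame.to_json(orient="records")

The new health probe can be hit the same way: requests.get("http://localhost:7000/health/ping").
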
9 changes: 7 additions & 2 deletions models/sql_query.py
@@ -6,12 +6,17 @@ def build_query(**params):
params = {
"object_id": params.get("object_id"),
"object_name": params.get("object_name"),
"query_select": params.get("query_select")
"query_select": params.get("query_select"),
"outlier_min": params.get("outlier_min"),
"outlier_max": params.get("outlier_max"),
"limit": params.get("limit"),
"stddev_multiplier": params.get("stddev_multiplier"),
"score_threshold": params.get("score_threshold"),
"dag_id": params.get("dag_id"),
}

query_template = queries.get(params.get("query_select"))


j = JinjaSql(param_style="pyformat")
query, bind_params = j.prepare_query(query_template, params)

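
build_query() now forwards the extra tunables into whichever template query_select picks. A rough sketch of what JinjaSql does with them, using a toy template rather than the real ones in models/templates/templates.py:

from jinjasql import JinjaSql

template = "select * from dag_run where dag_id = {{ dag_id }} limit {{ limit }}"

j = JinjaSql(param_style="pyformat")
query, bind_params = j.prepare_query(template, {"dag_id": "example_dag", "limit": 25})

# query now holds pyformat placeholders (something like %(dag_id_1)s) and
# bind_params maps those names to the supplied values, so the pair could be
# passed together to a DB-API cursor, e.g. cursor.execute(query, bind_params).
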
65 changes: 49 additions & 16 deletions models/templates/templates.py
@@ -1,30 +1,62 @@
DAG_QUERY = """
select
{{object_id}},
count(dag_id) count_runs,
ceiling(avg(total_time)) average,
ceiling(percentile_cont(0.5) within group (order by (total_time))) median,
ceiling(max(total_time)) maximum,
ceiling(min(total_time)) minimum,
ceiling(stddev(total_time)) standard_deviation,
ceiling(variance(total_time)) variance_,
ceiling((avg(total_time) + {{stddev_multiplier}}*ceiling(stddev(total_time)))*{{score_threshold}}) score
from (select dag_id,
count(*) over (partition by dag_id) as _count_rows,
row_number() over (partition by dag_id order by end_date desc) as _row_number,
extract(epoch from (end_date - start_date))/60 total_time
from dag_run
where
{{object_id}} in ('{{object_name}}')
and state in ('success')
and start_date is not null) dr
where total_time > {{outlier_min}}
and total_time < {{outlier_max}}
and _row_number <= {{limit}}
group by {{object_id}}
order by {{object_id}};
"""

BASE_QUERY = """
TASK_QUERY = """
select
{{object_id}},
max(runs) count_runs,
dag_id,
count(task_id) count_runs,
ceiling(avg(total_time)) average,
ceiling(percentile_cont(0.5) within group (order by (total_time))) median,
ceiling(max(total_time)) maximum,
ceiling(min(total_time)) minimum,
ceiling(cast(stddev(total_time) as integer)) standard_deviation,
ceiling(cast(variance(total_time) as bigint)) variance_,
ceiling(((ceiling(percentile_cont(0.5) within group (order by (total_time))) + ceiling(cast(stddev(total_time) as integer)))/ceiling(percentile_cont(0.5) within group (order by (total_time))))*ceiling(percentile_cont(0.5) within group (order by (total_time)))*1.2) score
from (
select *,
row_number() over (partition by task_id, dag_id order by dag_id) runs,
ceiling(stddev(total_time)) standard_deviation,
ceiling(variance(total_time)) variance_,
ceiling((avg(total_time) + {{stddev_multiplier}}*ceiling(stddev(total_time)))*{{score_threshold}}) score
from (select task_id, dag_id,
count(*) over (partition by task_id) as _count_rows,
row_number() over (partition by task_id order by end_date desc) as _row_number,
extract(epoch from (end_date - start_date))/60 total_time
from task_instance
where
{{object_id}} in ('{{object_name}}')
and task_id not in ('start', 'end', 'check_end', 'end_failure', 'end.end_failure', 'end_success', 'end.end_success')
and dag_id in ('{{dag_id}}')
and operator = 'CrdTriggerOperator'
and state in ('success')
and start_date is not null
and try_number != 0) ti
group by {{object_id}}
where total_time > {{outlier_min}}
and total_time < {{outlier_max}}
and _row_number <= {{limit}}
group by {{object_id}}, dag_id
order by {{object_id}};
"""


ALL_QUERY = """
select
task_id,
@@ -34,9 +66,9 @@
ceiling(percentile_cont(0.5) within group (order by (total_time))) median,
ceiling(max(total_time)) maximum,
ceiling(min(total_time)) minimum,
ceiling(cast(stddev(total_time) as integer)) standard_deviation,
ceiling(cast(variance(total_time) as bigint)) variance_,
ceiling(((ceiling(percentile_cont(0.5) within group (order by (total_time))) + ceiling(cast(stddev(total_time) as integer)))/ceiling(percentile_cont(0.5) within group (order by (total_time))))*ceiling(percentile_cont(0.5) within group (order by (total_time)))*1.2) score
ceiling(stddev(total_time)) standard_deviation,
ceiling(variance(total_time)) variance_,
ceiling((avg(total_time) + {{stddev_multiplier}}*ceiling(stddev(total_time)))*{{score_threshold}}) score
from (
select *,
row_number() over (partition by task_id, dag_id order by dag_id) runs,
@@ -52,6 +84,7 @@
"""

queries = {
"base_query": BASE_QUERY,
"all_query" : ALL_QUERY,
}
"dag_query": DAG_QUERY,
"task_query": TASK_QUERY,
"all_query": ALL_QUERY,
}
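
The updated templates all compute the anomaly score the same way: score = ceiling((avg(total_time) + stddev_multiplier * ceiling(stddev(total_time))) * score_threshold). A quick worked example in Python using the header defaults from app.py (3 and 1.4) and made-up run times:

import math
import statistics

run_times = [30, 32, 35, 40, 55]   # hypothetical run durations in minutes
stddev_multiplier = 3.0            # default from app.py
score_threshold = 1.4              # default from app.py

avg = statistics.mean(run_times)                 # 38.4
stddev = math.ceil(statistics.stdev(run_times))  # sample stddev rounded up, as in the SQL: 11
score = math.ceil((avg + stddev_multiplier * stddev) * score_threshold)
print(score)                                     # 100 for this sample

The stddev_multiplier and score_threshold headers let callers widen or tighten this score without changing the SQL.
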
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -0,0 +1,4 @@
[tool.poetry]
name = "epm-ada"
version = "1.0.0"
description = "A microservice to retrieve key analytics metrics for task and DAG level from Airflow database instance."
24 changes: 19 additions & 5 deletions requirements.txt
@@ -1,7 +1,21 @@
cryptography==3.4.7
Flask==2.0.3
Flask_RESTful==0.3.9
aniso8601==9.0.1
cffi==1.15.1
click==8.1.3
cryptography==41.0.2
Flask==2.2.5
Flask-RESTful==0.3.9
itsdangerous==2.1.2
Jinja2==3.0.2
jinjasql==0.1.8
pandas==1.0.1
psycopg2==2.7.7
MarkupSafe==2.1.2
numpy>=1.21.6
pandas>=1.3.5
psycopg2-binary==2.9.5
pycparser==2.21
python-dateutil==2.8.2
pytz==2022.7.1
six==1.16.0
Werkzeug==2.2.3
pytest-mock==3.3.1
asgiref==3.7.2
typing_extensions>=4.7.0