Merge pull request #728 from harrystech/ynaim94/DENG-1806/show-downstream-dbt

DENG-1806: Show downstream dependents in dbt
ynaim94-harrys authored Jun 10, 2022
2 parents 394da2c + 00d1be4 commit 207d716
Showing 8 changed files with 160 additions and 5 deletions.
9 changes: 8 additions & 1 deletion Dockerfile
@@ -20,18 +20,25 @@
RUN yum install -y \
    jq \
    libyaml-devel \
    openssh-clients \
    passwd \
    postgresql \
    procps-ng \
    python3 \
    python3-devel \
    sudo \
    tar \
    tmux \
    vim-minimal

RUN amazon-linux-extras install docker

# Run as non-privileged user "arthur".
-RUN useradd --comment 'Arthur ETL' --user-group --create-home arthur && \
+RUN useradd --comment 'Arthur ETL' --user-group --groups wheel --create-home arthur && \
    mkdir --parent /opt/data-warehouse "/opt/local/$PROJ_NAME" /opt/src/arthur-redshift-etl && \
    chown -R arthur.arthur /opt/*

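# Give arthur a known password so scripts inside the container can authenticate to sudo
# (arthur is in the "wheel" group above).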
RUN echo "arthur:arthur" | chpasswd

USER arthur

# The .bashrc will ensure the virtual environment is activated when running interactive shells.
6 changes: 6 additions & 0 deletions bin/docker_run.sh
@@ -166,13 +166,16 @@ case "$action" in
    docker run --rm --interactive --tty \
        --env ARTHUR_DEFAULT_PREFIX="$target_env" \
        --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \
        --env DBT_ROOT="$data_warehouse_path/dbt" \
        --env DBT_PROFILES_DIR="${HOME}/.dbt/profiles.yml" \
        --sysctl net.ipv4.tcp_keepalive_time=300 \
        --sysctl net.ipv4.tcp_keepalive_intvl=60 \
        --sysctl net.ipv4.tcp_keepalive_probes=9 \
        --volume ~/.aws:/home/arthur/.aws \
        --volume ~/.ssh:/home/arthur/.ssh:ro \
        --volume "$data_warehouse_path:/opt/data-warehouse" \
        --volume "$(pwd):/opt/src/arthur-redshift-etl" \
        --volume /var/run/docker.sock:/var/run/docker.sock:ro \
        $publish_arg \
        $profile_arg \
        "arthur-redshift-etl:$tag"
@@ -183,12 +186,15 @@ case "$action" in
    docker run --rm --interactive --tty \
        --env ARTHUR_DEFAULT_PREFIX="$target_env" \
        --env DATA_WAREHOUSE_CONFIG="/opt/data-warehouse/$config_path" \
        --env DBT_ROOT="$data_warehouse_path/dbt" \
        --env DBT_PROFILES_DIR="${HOME}/.dbt/profiles.yml" \
        --sysctl net.ipv4.tcp_keepalive_time=300 \
        --sysctl net.ipv4.tcp_keepalive_intvl=60 \
        --sysctl net.ipv4.tcp_keepalive_probes=9 \
        --volume ~/.aws:/home/arthur/.aws \
        --volume ~/.ssh:/home/arthur/.ssh:ro \
        --volume "$data_warehouse_path:/opt/data-warehouse" \
        --volume /var/run/docker.sock:/var/run/docker.sock:ro \
        $publish_arg \
        $profile_arg \
        "arthur-redshift-etl:$tag"
2 changes: 2 additions & 0 deletions docker/entrypoint.sh
@@ -31,4 +31,6 @@ if [[ -r "/opt/local/redshift_etl/venv/bin/activate" ]]; then
    fi
fi

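# Give the non-root "arthur" user access to the Docker socket mounted from the host;
# sudo -S reads the password (set via chpasswd in the Dockerfile) from stdin.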
echo arthur | sudo -S chmod 777 /var/run/docker.sock

exec "$@"
31 changes: 30 additions & 1 deletion python/etl/commands.py
@@ -42,6 +42,7 @@
import etl.templates
import etl.unload
import etl.validate
from etl.dbt import DbtModelIdentifier, DBTProject
from etl.errors import ETLError, ETLSystemError, InvalidArgumentError
from etl.text import join_with_single_quotes
from etl.util import croak, isoformat_datetime_string
@@ -1762,20 +1763,48 @@ def add_arguments(self, parser):
action="store_true",
help="show list of dependents (upstream) for every relation",
)
group.add_argument(
"--include-dbt",
action="store_true",
help="show list of dependents (upstream) for every relation",
)

    def callback(self, args):
        dw_config = etl.config.get_dw_config()
        relations = self.find_relation_descriptions(
            args, required_relation_selector=dw_config.required_in_full_load_selector, return_all=True
        )
-        etl.load.show_downstream_dependents(
+        relations = etl.load.show_downstream_dependents(
            relations,
            args.pattern,
            continue_from=args.continue_from,
            with_dependencies=args.with_dependencies,
            with_dependents=args.with_dependents,
        )

        if not args.include_dbt:
            return

        dbt_model_identifiers = [
            DbtModelIdentifier(*relation.identifier.split(".")) for relation in relations
        ]
        dbt_project = DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"])
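        # In dbt's node-selection syntax, "model+" selects a model plus everything downstream of it.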
dbt_downstream_parents = " ".join(
[f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(dbt_model_identifiers)]
)
        if not dbt_downstream_parents:
            logging.info("No dbt downstream dependents found")
            return

        logging.info("dbt downstream dependents found")
        dbt_project.build_image()
        dbt_stdout = dbt_project.run_cmd(
            "dbt list -t dev --exclude redshift --output json "
            f"--resource-type model -s {dbt_downstream_parents}"
        )
        dbt_relations = dbt_project.parse_dbt_run_stdout(dbt_stdout)
        dbt_project.render_dbt_list(dbt_relations)


class ShowUpstreamDependenciesCommand(SubCommand):
    def __init__(self):
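In use, the new flag extends the existing dependents listing: invoked along the lines of `arthur show_downstream_dependents www.orders --include-dbt` (the command and pattern names here are illustrative), it first prints the Redshift relations as before, then builds the arthur_dbt image and appends any dbt models that select from those relations.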
109 changes: 109 additions & 0 deletions python/etl/dbt.py
@@ -0,0 +1,109 @@
import json
import logging
import os
import re
from collections import namedtuple
from typing import Sequence

import docker

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

DbtModelIdentifier = namedtuple("DbtModelIdentifier", ["schema", "table"])

DBTRelation = namedtuple("DBTRelation", ["name", "depends_on", "type", "is_required"])


class DBTProject:
    def __init__(self, dbt_project_root, dbt_profiles_dir):
        self.dbt_root = dbt_project_root
        self.dbt_profiles_dir = dbt_profiles_dir
        self.local_dbt_path = "dbt"
        self.client = docker.from_env()
        self.tag = "arthur_dbt:latest"

    def build_image(self):
        logger.info("Building DBT image")
        img = self.client.api.build(
            path=self.local_dbt_path, tag=self.tag, dockerfile="Dockerfile", quiet=False, nocache=False
        )

        # Wait for the image to build by draining the build-output generator.
        for _ in img:
            continue
        logger.info(f"{self.tag} docker image built")

    def run_cmd(self, cmd):
        try:
            logger.info(f"Executing inside dbt container {self.tag}: $ {cmd}")
            return self.client.containers.run(
                self.tag,
                cmd,
                volumes={
                    self.dbt_root: {"bind": "/dbt", "mode": "rw"},
                    self.dbt_profiles_dir: {"bind": "/root/.dbt/profiles.yml", "mode": "ro"},
                },
                stderr=True,
                stdout=True,
            ).decode("utf-8")
        except docker.errors.ContainerError as exc:
            logger.error(exc.container.logs())
            raise

    @staticmethod
    def get_files_in_path(path, file_types=None, prefix=""):
        for root, _, files in os.walk(path):
            for file in files:
                if (not file_types or file.split(".")[-1] in file_types) and file.startswith(prefix):
                    yield (root, file)

    def show_downstream_dbt_parents(self, dbt_model_identifiers: Sequence[DbtModelIdentifier]):
        dbt_sql_files = self.get_files_in_path(self.local_dbt_path, file_types=("sql",))
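        # Models reference warehouse relations via db_source('schema', 'table') calls in their
        # SQL; the regex captures both arguments so they can be matched against Arthur relations.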
        db_source_regex = r"db_source\(\s*'(.*)'\s*,\s*'(.*)'\s*\)"

        for model_path, sql_file_path in dbt_sql_files:
            with open(os.path.join(model_path, sql_file_path), "r") as f:
                sql_file = f.read()
            db_sources = re.findall(db_source_regex, sql_file)
            for db_source in db_sources:
                schema, table = db_source
                for dmi in dbt_model_identifiers:
                    if dmi.schema == schema and dmi.table == table:
                        yield os.path.splitext(sql_file_path)[0]

    def parse_dbt_run_stdout(self, res):
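        # `dbt list --output json` prints one JSON object per node; any non-JSON
        # log lines that dbt mixes into stdout are skipped below.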
        res_list = res.strip().split("\n")
        relations = []
        for e in res_list:
            try:
                d = json.loads(e)
            except json.decoder.JSONDecodeError:
                continue
            d["depends_on"] = [node.split(".")[-1] for node in d["depends_on"]["nodes"]]
            d["type"] = d["config"]["materialized"].upper()
            d["is_required"] = "required" in d["config"]["tags"]
            relations.append(DBTRelation(d["name"], d["depends_on"], d["type"], d["is_required"]))

        return relations

    def render_dbt_list(self, dbt_relations, with_dependencies=False, with_dependents=False):
        # Nothing to render if dbt selected no models downstream.
        if not dbt_relations:
            return

        current_index = {relation.name: i + 1 for i, relation in enumerate(dbt_relations)}
        width_selected = max(len(name) for name in current_index)
        line_template = (
            "{relation.name:{width}s}"
            " # {relation.type} index={index:4d}"
            " flag={flag:9s}"
            " is_required={relation.is_required}"
        )

        for relation in dbt_relations:
            print(
                line_template.format(
                    flag="DBT",
                    index=current_index[relation.name],
                    relation=relation,
                    width=width_selected,
                )
            )
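Taken together, DBTProject is driven from commands.py roughly like this (a sketch, not part of the commit; `stg_orders` is a placeholder model name, and the two environment variables hold the host paths exported by bin/docker_run.sh, which resolve correctly because the dbt container is started as a sibling through the mounted Docker socket):

    import os

    from etl.dbt import DBTProject

    dbt_project = DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"])
    dbt_project.build_image()
    dbt_stdout = dbt_project.run_cmd("dbt list -t dev --output json --resource-type model -s stg_orders+")
    dbt_project.render_dbt_list(dbt_project.parse_dbt_run_stdout(dbt_stdout))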
5 changes: 3 additions & 2 deletions python/etl/load.py
@@ -1426,7 +1426,7 @@ def show_downstream_dependents(
    continue_from: Optional[str] = None,
    with_dependencies: Optional[bool] = False,
    with_dependents: Optional[bool] = False,
-) -> None:
+) -> List[RelationDescription]:
    """
    List the execution order of loads or updates.
@@ -1441,7 +1441,7 @@
        relations, selector, include_dependents=True, continue_from=continue_from
    )
    if not selected_relations:
-        return
+        return selected_relations

    directly_selected_relations = etl.relation.find_matches(selected_relations, selector)
    selected = frozenset(relation.identifier for relation in directly_selected_relations)
@@ -1533,6 +1533,7 @@ def show_downstream_dependents(
                width=width_dep,
            )
        )
    return selected_relations


def show_upstream_dependencies(relations: Sequence[RelationDescription], selector: TableSelector):
2 changes: 1 addition & 1 deletion requirements-docs.txt
@@ -1,4 +1,4 @@
myst-parser~=0.16
-Sphinx~=4.5
+Sphinx==4.3.2
sphinx-autodoc-typehints~=1.12
sphinx-book-theme~=0.2
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,6 +2,7 @@
arrow==1.2.2
boto3==1.22.4
botocore~=1.25
docker==5.0.3
funcy==1.17
jmespath==0.10.0
jsonschema==3.2.0
