Merge pull request #733 from harrystech/ynaim94/DENG-1805/upgrade-with-dbt

DENG-1805: Upgrade arthur tables + downstream dbt
ynaim94-harrys authored Jun 14, 2022
2 parents 207d716 + 77940a3 commit a46af69
Showing 5 changed files with 73 additions and 23 deletions.
4 changes: 2 additions & 2 deletions bin/docker_run.sh
@@ -175,7 +175,7 @@ case "$action" in
             --volume ~/.ssh:/home/arthur/.ssh:ro \
             --volume "$data_warehouse_path:/opt/data-warehouse" \
             --volume "$(pwd):/opt/src/arthur-redshift-etl" \
-            --volume /var/run/docker.sock:/var/run/docker.sock:ro \
+            --volume /var/run/docker.sock:/var/run/docker.sock \
             $publish_arg \
             $profile_arg \
             "arthur-redshift-etl:$tag"
@@ -194,7 +194,7 @@ case "$action" in
             --volume ~/.aws:/home/arthur/.aws \
             --volume ~/.ssh:/home/arthur/.ssh:ro \
             --volume "$data_warehouse_path:/opt/data-warehouse" \
-            --volume /var/run/docker.sock:/var/run/docker.sock:ro \
+            --volume /var/run/docker.sock:/var/run/docker.sock \
             $publish_arg \
             $profile_arg \
             "arthur-redshift-etl:$tag"
2 changes: 1 addition & 1 deletion docker/entrypoint.sh
@@ -31,6 +31,6 @@ if [[ -r "/opt/local/redshift_etl/venv/bin/activate" ]]; then
     fi
 fi
 
-echo arthur >> sudo chmod 777 /var/run/docker.sock
+echo arthur | sudo -S chmod 777 /var/run/docker.sock
 
 exec "$@"
47 changes: 39 additions & 8 deletions python/etl/commands.py
@@ -42,7 +42,7 @@
 import etl.templates
 import etl.unload
 import etl.validate
-from etl.dbt import DbtModelIdentifier, DBTProject
+from etl.dbt import DBPProject, TableIdentifier
 from etl.errors import ETLError, ETLSystemError, InvalidArgumentError
 from etl.text import join_with_single_quotes
 from etl.util import croak, isoformat_datetime_string
@@ -1188,6 +1188,13 @@ def add_arguments(self, parser):
             help="include views that are downstream of selected relations without any CTAS before"
             " (this is the default and only useful with '--only-selected', for debugging only)",
         )
+
+        parser.add_argument(
+            "--include-dbt",
+            action="store_true",
+            default=False,
+            help="after the upgrade, also build dbt models downstream of the selected relations",
+        )
         group = parser.add_mutually_exclusive_group()
         group.add_argument(
             "--with-staging-schemas",
@@ -1214,7 +1221,7 @@ def callback(self, args):
             logger.warning("Option '--into-schema' implies '--only-selected'")
             args.only_selected = True
         dw_config = etl.config.get_dw_config()
-        relations = self.find_relation_descriptions(
+        selected_relations = self.find_relation_descriptions(
             args,
             default_scheme="s3",
             required_relation_selector=dw_config.required_in_full_load_selector,
@@ -1231,8 +1238,8 @@
         wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int(
             "resources.RedshiftCluster.wlm_query_slots", 1
         )
-        etl.load.upgrade_data_warehouse(
-            relations,
+        selected_relations = etl.load.upgrade_data_warehouse(
+            selected_relations,
             args.pattern,
             max_concurrency=max_concurrency,
             wlm_query_slots=wlm_query_slots,
@@ -1246,6 +1253,28 @@
             dry_run=args.dry_run,
         )
 
+        if not args.include_dbt or args.only_selected:
+            return
+
+        dbt_target = "etl_staging" if args.use_staging_schemas else "dev"
+
+        arthur_table_identifiers = [
+            TableIdentifier(*etl.names.TableName.from_identifier(relation.identifier).to_tuple())
+            for relation in selected_relations
+        ]
+        dbt_project = DBTProject.from_env()
+        dbt_downstream_parents = " ".join(
+            [
+                f"{parent}+"
+                for parent in dbt_project.find_arthur_leaf_dbt_childs(arthur_table_identifiers)
+            ]
+        )
+        if not dbt_downstream_parents:
+            logger.info("No downstream dbt model found")
+            return
+        dbt_project.build_image()
+        dbt_project.run_cmd(f"dbt build -t {dbt_target} -s {dbt_downstream_parents}")
+
 
 class UpdateDataWarehouseCommand(SubCommand):
     uses_monitor = True
@@ -1785,12 +1814,14 @@ def callback(self, args):
         if not args.include_dbt:
             return
 
-        dbt_model_identifiers = [
-            DbtModelIdentifier(*relation.identifier.split(".")) for relation in relations
+        arthur_table_identifier = [
+            TableIdentifier(*etl.names.TableName.from_identifier(relation.identifier).to_tuple())
+            for relation in relations
         ]
-        dbt_project = DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"])
 
+        dbt_project = DBTProject.from_env()
         dbt_downstream_parents = " ".join(
-            [f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(dbt_model_identifiers)]
+            [f"{parent}+" for parent in dbt_project.find_arthur_leaf_dbt_childs(arthur_table_identifier)]
         )
         if not dbt_downstream_parents:
             logging.info("No dbt downstream dependents found")
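
For context, the new '--include-dbt' path maps each upgraded relation to a (schema, table) pair and joins the matching dbt models into a graph selector, where a trailing '+' selects a model plus everything downstream of it. A standalone sketch of that shape, with hypothetical identifiers and model paths (the real code parses identifiers via etl.names.TableName):

    from collections import namedtuple

    TableIdentifier = namedtuple("TableIdentifier", ["schema", "table"])

    # "www.orders" is a hypothetical Arthur relation identifier
    identifiers = [TableIdentifier(*"www.orders".split("."))]

    # hypothetical model paths, as find_arthur_leaf_dbt_childs would yield them
    leaf_childs = ["models/staging/stg_orders", "models/marts/fct_orders"]
    selector = " ".join(f"{model}+" for model in leaf_childs)
    print(f"dbt build -t dev -s {selector}")
    # -> dbt build -t dev -s models/staging/stg_orders+ models/marts/fct_orders+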
38 changes: 28 additions & 10 deletions python/etl/dbt.py
@@ -10,7 +10,7 @@
 logger = logging.getLogger(__name__)
 logger.addHandler(logging.NullHandler())
 
-DbtModelIdentifier = namedtuple("DbtModelIdentifier", ["schema", "table"])
+TableIdentifier = namedtuple("TableIdentifier", ["schema", "table"])
 
 DBTRelation = namedtuple("DBTRelation", ["name", "depends_on", "type", "is_required"])
 
@@ -23,6 +23,10 @@ def __init__(self, dbt_project_root, dbt_profiles_dir):
         self.client = docker.from_env()
         self.tag = "arthur_dbt:latest"
 
+    @classmethod
+    def from_env(cls):
+        return DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"])
+
     def build_image(self):
         logging.info("Building DBT image")
         img = self.client.api.build(
@@ -34,10 +38,12 @@ def build_image(self):
                 continue
         logging.info(f"{self.tag} docker image built")
 
-    def run_cmd(self, cmd):
+    def run_cmd(self, cmd, detach=True, logs=True):
+        if logs and not detach:
+            raise ValueError("Logs cannot be set to True while detach is False")
         try:
             logging.info(f"Executing inside dbt container {self.tag}: $ {cmd}")
-            return self.client.containers.run(
+            dbt_container = self.client.containers.run(
                 self.tag,
                 cmd,
                 volumes={
@@ -46,9 +52,21 @@
                 },
                 stderr=True,
                 stdout=True,
-            ).decode("utf-8")
+                detach=detach,
+            )
+            gen = dbt_container.logs(follow=True, stream=True)
+            dbt_stdout = []
+            try:
+                # Print logs as they are received
+                for raw_line in gen:
+                    logline = raw_line.decode("utf-8").strip()
+                    logger.info(f"{self.tag} # {logline}")
+                    dbt_stdout.append(logline)
+            except StopIteration:
+                pass
+            return dbt_stdout
         except docker.errors.ContainerError as exc:
-            print(exc.container.logs())
+            logger.error(exc.container.logs())
             raise
 
     @staticmethod
@@ -58,7 +76,8 @@ def get_files_in_path(path, file_types=None, prefix=""):
                 if (not file_types or file.split(".")[-1] in file_types) and file.startswith(prefix):
                     yield (root, file)
 
-    def show_downstream_dbt_parents(self, dbt_model_indentifiers: Sequence[DbtModelIdentifier]):
+    def find_arthur_leaf_dbt_childs(self, arthur_table_identifier: Sequence[TableIdentifier]):
+        """Find dbt models that source data from Arthur models."""
         dbt_sql_files = self.get_files_in_path(self.local_dbt_path, file_types=("sql"))
         db_source_regex = r"db_source\(\s*'(.*)'\s*,\s*'(.*)'\s*\)"
 
@@ -68,12 +87,11 @@
             db_sources = re.findall(db_source_regex, sql_file)
             for db_source in db_sources:
                 schema, table = db_source
-                for dmi in dbt_model_indentifiers:
+                for dmi in arthur_table_identifier:
                     if dmi.schema == schema and dmi.table == table:
                         yield os.path.splitext(sql_file_path)[0]
 
-    def parse_dbt_run_stdout(self, res):
-        res_list = res.strip().split("\n")
+    def parse_dbt_run_stdout(self, res_list: list):
         relations = []
         for e in res_list:
             try:
@@ -87,7 +105,7 @@
 
         return relations
 
-    def render_dbt_list(self, dbt_relations, with_dependencies=False, with_dependents=False):
+    def render_dbt_list(self, dbt_relations):
         current_index = {relation.name: i + 1 for i, relation in enumerate(dbt_relations)}
         width_selected = max(len(name) for name in current_index)
         line_template = (
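
For context, find_arthur_leaf_dbt_childs works by scanning model SQL files for db_source() references and matching the captured (schema, table) pairs against the Arthur identifiers. A small sketch of the match, with a made-up model body:

    import re

    db_source_regex = r"db_source\(\s*'(.*)'\s*,\s*'(.*)'\s*\)"
    sql_file = "select * from {{ db_source('www', 'orders') }}"
    print(re.findall(db_source_regex, sql_file))  # -> [('www', 'orders')]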
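
The reworked run_cmd switches docker-py from a blocking run, which returns all output as bytes only after the container exits, to a detached container whose logs are streamed line by line as they are produced. A minimal sketch of that pattern (assumes docker-py and a locally built arthur_dbt:latest image):

    import docker

    client = docker.from_env()
    container = client.containers.run("arthur_dbt:latest", "dbt --version", detach=True)
    for raw_line in container.logs(stream=True, follow=True):
        # lines arrive while the container is still running
        print(raw_line.decode("utf-8").strip())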
5 changes: 3 additions & 2 deletions python/etl/load.py
@@ -1227,7 +1227,7 @@ def upgrade_data_warehouse(
     target_schema: Optional[str] = None,
     skip_copy=False,
     dry_run=False,
-) -> None:
+) -> List[RelationDescription]:
     """
     Push new (structural) changes and fresh data through data warehouse.
@@ -1260,7 +1260,7 @@
         continue_from=continue_from,
     )
     if not selected_relations:
-        return
+        return selected_relations
 
     involved_execution_levels = frozenset(
         funcy.distinct(relation.execution_level for relation in selected_relations)
@@ -1302,6 +1302,7 @@
     etl.data_warehouse.create_schemas(traversed_schemas, use_staging=use_staging, dry_run=dry_run)
 
     create_relations(relations, max_concurrency, wlm_query_slots, statement_timeout, dry_run=dry_run)
+    return selected_relations
 
 
 def update_data_warehouse(
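
The signature change means the upgrade step now reports which relations it actually selected, so the caller in commands.py can chain the dbt build onto exactly that set. A toy sketch of the changed contract, with stand-in string identifiers (the real list holds RelationDescription objects):

    from typing import List

    def upgrade_data_warehouse(relations: List[str], pattern: str) -> List[str]:
        selected = [r for r in relations if r.startswith(pattern)]
        # ... create schemas and load the selected relations ...
        return selected  # previously this function returned None

    print(upgrade_data_warehouse(["www.orders", "www.users"], "www"))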
