diff --git a/bin/docker_run.sh b/bin/docker_run.sh
index cff92058..91daa397 100755
--- a/bin/docker_run.sh
+++ b/bin/docker_run.sh
@@ -175,7 +175,7 @@ case "$action" in
         --volume ~/.ssh:/home/arthur/.ssh:ro \
         --volume "$data_warehouse_path:/opt/data-warehouse" \
         --volume "$(pwd):/opt/src/arthur-redshift-etl" \
-        --volume /var/run/docker.sock:/var/run/docker.sock:ro \
+        --volume /var/run/docker.sock:/var/run/docker.sock \
         $publish_arg \
         $profile_arg \
         "arthur-redshift-etl:$tag"
@@ -194,7 +194,7 @@ case "$action" in
         --volume ~/.aws:/home/arthur/.aws \
         --volume ~/.ssh:/home/arthur/.ssh:ro \
         --volume "$data_warehouse_path:/opt/data-warehouse" \
-        --volume /var/run/docker.sock:/var/run/docker.sock:ro \
+        --volume /var/run/docker.sock:/var/run/docker.sock \
         $publish_arg \
         $profile_arg \
         "arthur-redshift-etl:$tag"
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index 9f447c1c..577d57dc 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -31,6 +31,6 @@ if [[ -r "/opt/local/redshift_etl/venv/bin/activate" ]]; then
     fi
 fi
 
-echo arthur >> sudo chmod 777 /var/run/docker.sock
+echo arthur | sudo -S chmod 777 /var/run/docker.sock
 
 exec "$@"
diff --git a/python/etl/commands.py b/python/etl/commands.py
index 75be649d..660a11c9 100644
--- a/python/etl/commands.py
+++ b/python/etl/commands.py
@@ -42,7 +42,7 @@ import etl.templates
 import etl.unload
 import etl.validate
 
-from etl.dbt import DbtModelIdentifier, DBTProject
+from etl.dbt import DBTProject, TableIdentifier
 from etl.errors import ETLError, ETLSystemError, InvalidArgumentError
 from etl.text import join_with_single_quotes
 from etl.util import croak, isoformat_datetime_string
@@ -1188,6 +1188,13 @@ def add_arguments(self, parser):
             help="include views that are downstream of selected relations without any CTAS before"
             " (this is the default and only useful with '--only-selected', for debugging only)",
         )
+
+        parser.add_argument(
+            "--include-dbt",
+            action="store_true",
+            default=False,
+            help="after upgrading, also build dbt models that are downstream of the selected relations",
+        )
         group = parser.add_mutually_exclusive_group()
         group.add_argument(
             "--with-staging-schemas",
@@ -1214,7 +1221,7 @@ def callback(self, args):
             logger.warning("Option '--into-schema' implies '--only-selected'")
             args.only_selected = True
         dw_config = etl.config.get_dw_config()
-        relations = self.find_relation_descriptions(
+        selected_relations = self.find_relation_descriptions(
             args,
             default_scheme="s3",
             required_relation_selector=dw_config.required_in_full_load_selector,
@@ -1231,8 +1238,8 @@ def callback(self, args):
         wlm_query_slots = args.wlm_query_slots or etl.config.get_config_int(
             "resources.RedshiftCluster.wlm_query_slots", 1
         )
-        etl.load.upgrade_data_warehouse(
-            relations,
+        selected_relations = etl.load.upgrade_data_warehouse(
+            selected_relations,
             args.pattern,
             max_concurrency=max_concurrency,
             wlm_query_slots=wlm_query_slots,
@@ -1246,6 +1253,28 @@ def callback(self, args):
             dry_run=args.dry_run,
         )
 
+        if not args.include_dbt or args.only_selected:
+            return
+
+        dbt_target = "etl_staging" if args.use_staging_schemas else "dev"
+
+        arthur_table_identifiers = [
+            TableIdentifier(*etl.names.TableName.from_identifier(relation.identifier).to_tuple())
+            for relation in selected_relations
+        ]
+        dbt_project = DBTProject.from_env()
+        dbt_downstream_parents = " ".join(
+            [
+                f"{parent}+"
+                for parent in dbt_project.find_arthur_leaf_dbt_childs(arthur_table_identifiers)
+            ]
+        )
+        if not dbt_downstream_parents:
+            logger.info("No downstream dbt model found")
+            return
+        dbt_project.build_image()
+        dbt_project.run_cmd(f"dbt build -t {dbt_target} -s {dbt_downstream_parents}")
+
 
 class UpdateDataWarehouseCommand(SubCommand):
     uses_monitor = True
@@ -1785,12 +1814,14 @@ def callback(self, args):
         if not args.include_dbt:
             return
 
-        dbt_model_identifiers = [
-            DbtModelIdentifier(*relation.identifier.split(".")) for relation in relations
+        arthur_table_identifier = [
+            TableIdentifier(*etl.names.TableName.from_identifier(relation.identifier).to_tuple())
+            for relation in relations
         ]
-        dbt_project = DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"])
+
+        dbt_project = DBTProject.from_env()
         dbt_downstream_parents = " ".join(
-            [f"{parent}+" for parent in dbt_project.show_downstream_dbt_parents(dbt_model_identifiers)]
+            [f"{parent}+" for parent in dbt_project.find_arthur_leaf_dbt_childs(arthur_table_identifier)]
         )
         if not dbt_downstream_parents:
             logging.info("No dbt downstream dependents found")
diff --git a/python/etl/dbt.py b/python/etl/dbt.py
index 41796c69..25bee018 100644
--- a/python/etl/dbt.py
+++ b/python/etl/dbt.py
@@ -10,7 +10,7 @@
 logger = logging.getLogger(__name__)
 logger.addHandler(logging.NullHandler())
 
-DbtModelIdentifier = namedtuple("DbtModelIdentifier", ["schema", "table"])
+TableIdentifier = namedtuple("TableIdentifier", ["schema", "table"])
 DBTRelation = namedtuple("DBTRelation", ["name", "depends_on", "type", "is_required"])
 
 
@@ -23,6 +23,10 @@ def __init__(self, dbt_project_root, dbt_profiles_dir):
         self.client = docker.from_env()
         self.tag = "arthur_dbt:latest"
 
+    @classmethod
+    def from_env(cls):
+        return DBTProject(os.environ["DBT_ROOT"], os.environ["DBT_PROFILES_DIR"])
+
     def build_image(self):
         logging.info("Building DBT image")
         img = self.client.api.build(
@@ -34,10 +38,12 @@ def build_image(self):
                 continue
         logging.info(f"{self.tag} docker image built")
 
-    def run_cmd(self, cmd):
+    def run_cmd(self, cmd, detach=True, logs=True):
+        if logs and not detach:
+            raise ValueError("Logs cannot be set to True while detach is False")
         try:
             logging.info(f"Executing inside dbt container {self.tag}: $ {cmd}")
-            return self.client.containers.run(
+            dbt_container = self.client.containers.run(
                 self.tag,
                 cmd,
                 volumes={
@@ -46,9 +52,21 @@ def run_cmd(self, cmd):
                 },
                 stderr=True,
                 stdout=True,
-            ).decode("utf-8")
+                detach=detach,
+            )
+            gen = dbt_container.logs(follow=True, stream=True)
+            dbt_stdout = []
+            try:
+                # Print logs as they are received
+                for raw_line in gen:
+                    logline = raw_line.decode("utf-8").strip()
+                    logger.info(f"{self.tag} # {logline}")
+                    dbt_stdout.append(logline)
+            except StopIteration:
+                pass
+            return dbt_stdout
         except docker.errors.ContainerError as exc:
-            print(exc.container.logs())
+            logger.error(exc.container.logs())
             raise
 
     @staticmethod
@@ -58,7 +76,8 @@ def get_files_in_path(path, file_types=None, prefix=""):
                 if (not file_types or file.split(".")[-1] in file_types) and file.startswith(prefix):
                     yield (root, file)
 
-    def show_downstream_dbt_parents(self, dbt_model_indentifiers: Sequence[DbtModelIdentifier]):
+    def find_arthur_leaf_dbt_childs(self, arthur_table_identifier: Sequence[TableIdentifier]):
+        """Find dbt models that source data from Arthur models."""
         dbt_sql_files = self.get_files_in_path(self.local_dbt_path, file_types=("sql"))
         db_source_regex = r"db_source\(\s*'(.*)'\s*,\s*'(.*)'\s*\)"
 
@@ -68,12 +87,11 @@ def show_downstream_dbt_parents(self, dbt_model_indentifiers: Sequence[DbtModelI
             db_sources = re.findall(db_source_regex, sql_file)
             for db_source in db_sources:
                 schema, table = db_source
-                for dmi in dbt_model_indentifiers:
+                for dmi in arthur_table_identifier:
                     if dmi.schema == schema and dmi.table == table:
                         yield os.path.splitext(sql_file_path)[0]
 
-    def parse_dbt_run_stdout(self, res):
-        res_list = res.strip().split("\n")
+    def parse_dbt_run_stdout(self, res_list: list):
         relations = []
         for e in res_list:
             try:
@@ -87,7 +105,7 @@ def parse_dbt_run_stdout(self, res):
 
         return relations
 
-    def render_dbt_list(self, dbt_relations, with_dependencies=False, with_dependents=False):
+    def render_dbt_list(self, dbt_relations):
         current_index = {relation.name: i + 1 for i, relation in enumerate(dbt_relations)}
         width_selected = max(len(name) for name in current_index)
         line_template = (
diff --git a/python/etl/load.py b/python/etl/load.py
index fce6edef..5312ae94 100644
--- a/python/etl/load.py
+++ b/python/etl/load.py
@@ -1227,7 +1227,7 @@ def upgrade_data_warehouse(
     target_schema: Optional[str] = None,
     skip_copy=False,
     dry_run=False,
-) -> None:
+) -> List[RelationDescription]:
     """
     Push new (structural) changes and fresh data through data warehouse.
 
@@ -1260,7 +1260,7 @@ def upgrade_data_warehouse(
         continue_from=continue_from,
     )
     if not selected_relations:
-        return
+        return selected_relations
 
     involved_execution_levels = frozenset(
         funcy.distinct(relation.execution_level for relation in selected_relations)
@@ -1302,6 +1302,7 @@ def upgrade_data_warehouse(
     etl.data_warehouse.create_schemas(traversed_schemas, use_staging=use_staging, dry_run=dry_run)
 
     create_relations(relations, max_concurrency, wlm_query_slots, statement_timeout, dry_run=dry_run)
+    return selected_relations
 
 
 def update_data_warehouse(
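
For reference, the detached-run pattern that the new run_cmd uses can be exercised on its own. Below is a minimal sketch against the Docker SDK, assuming a reachable local daemon and an existing arthur_dbt:latest image; the command string is a placeholder, not the project's actual invocation. One SDK fact worth noting: with detach=True, containers.run() returns immediately and does not raise ContainerError on a non-zero exit, so the exit code must be checked explicitly.

import docker

client = docker.from_env()
# detach=True returns a Container object right away instead of blocking until exit.
container = client.containers.run("arthur_dbt:latest", "dbt --version", detach=True)

# Stream log lines while the command is still running, as run_cmd does above.
for raw_line in container.logs(stream=True, follow=True):
    print(raw_line.decode("utf-8").rstrip())

# With detach=True a non-zero exit does not raise ContainerError, so check it here.
exit_status = container.wait()
if exit_status["StatusCode"] != 0:
    raise RuntimeError(f"dbt container exited with {exit_status['StatusCode']}")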
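
Similarly, the db_source() scan in find_arthur_leaf_dbt_childs boils down to the single regex shown in the diff. A quick sketch with a made-up dbt model body illustrates what it captures:

import re

db_source_regex = r"db_source\(\s*'(.*)'\s*,\s*'(.*)'\s*\)"
sql_file = "select * from {{ db_source('www', 'orders') }} where id > 0"

# Each match yields a (schema, table) pair to compare against the TableIdentifier tuples.
print(re.findall(db_source_regex, sql_file))  # [('www', 'orders')]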